In grpc-mate we have product_read_servicer.py to show how to do server stream and bi-directional streaming in grpc.

python-grpc

protobuf

service ProductReadService {
    //download product by category
    //used to demo server side stream
    rpc DownloadProducts (DownloadProductsRequest) returns (stream Product) {

    }
    //search product and return all matched products
    //used to demo simple grpc call
    rpc SearchProducts (SearchProductsRequest) returns (SearchProductsResponse) {
    }

    //calcualte each proeuct sore based on simple rule
    //used to demo bi directional stream
    rpc CalculateProductScore (stream Product) returns (stream CalculateProductScoreResponse) {
    }

    rpc DownloadProductImage(DownloadProductImageRequest) returns(stream DataChunk){
    }
}

Server Stream

    def DownloadProducts(self, request, context):
        with session_scope() as session:
            result = session.query(DBProduct) \
                .filter(DBProduct.category == request.category) \
                .all()
            for product in result:
                yield db_product_to_protobuf_product(product)

in order to return a stream of data, the there is no return statement in the server side, it use yield instead, this will make the call to DownloadProducts to return a python generator, grpc framework will take care of call next element of the generator as a stream, it will also handle the end of stream automatially

to call the server stream, we could call the grpc method normally, but it will return an iterator, so that in clint side, we could iterator over the server stream

def test_DownloadProducts_exist(grpc_stub):
    faker = Faker()
    category = faker.name()
    # save to db
    with session_scope() as session:
        for idx in range(5):
            product = DBProduct(product_name=f'{faker.name()}_{idx}',
                                product_price=Decimal(faker.random_int() / 100),
                                product_status=InStock,
                                category=category)
            session.add(product)
    result = grpc_stub.DownloadProducts(DownloadProductsRequest(category=category))

    # assert we have 5 items
    assert len(list(result)) == 5

bi-directional Stream

    def CalculateProductScore(self, request_iterator, context):
        for product in request_iterator:
            yield CalculateProductScoreResponse(product=product, score=int(product.product_price * 2))

bi-directional stream is a combination of client stream and server stream, it will accept a stream of input and out put a stream of data as output, thanks to Python's generator design, this is super simple in python, we could iterator the input stream and just yield result out wen it's ready, the grpc framework will take care of the rest

to call the bi-directional service, we could also make a generator function to generate the input paramter and iterate the result from the output parameter, look the example below

the call of function product_generator will result in a python iterator,this itertor could be used as parameter of CalculateProductScore, the result of CalculateProductScore is also an iterator, we could iterate the result as well.

def product_generator():
    for i in range(0, 5):
        yield Product(product_id=i, product_name=f'product_name_{i}', product_price=i, product_status=InStock,category='category')

def test_CalculateProductScore(grpc_stub):
    product_iterator = product_generator()
    result = grpc_stub.CalculateProductScore(product_iterator)
    all_result = list(result)
    assert len(all_result) == 5
    for response in all_result:
        assert int(response.product.product_price * 2) == response.score