In grpc-mate, product_read_servicer.py shows how to implement server streaming and bi-directional streaming in gRPC.
protobuf
service ProductReadService {
    // download products by category
    // used to demo server-side stream
    rpc DownloadProducts (DownloadProductsRequest) returns (stream Product) {
    }
    // search products and return all matched products
    // used to demo simple grpc call
    rpc SearchProducts (SearchProductsRequest) returns (SearchProductsResponse) {
    }
    // calculate each product's score based on a simple rule
    // used to demo bi-directional stream
    rpc CalculateProductScore (stream Product) returns (stream CalculateProductScoreResponse) {
    }
    rpc DownloadProductImage(DownloadProductImageRequest) returns(stream DataChunk){
    }
}
Server Stream
def DownloadProducts(self, request, context):
    with session_scope() as session:
        result = session.query(DBProduct) \
            .filter(DBProduct.category == request.category) \
            .all()
        for product in result:
            yield db_product_to_protobuf_product(product)
To return a stream of data, the server-side method has no return statement; it uses yield instead. This makes a call to DownloadProducts return a Python generator. The gRPC framework takes care of pulling each element from the generator and sending it as part of the stream, and it also handles the end of the stream automatically.
To call the server stream, we invoke the gRPC method as usual, but it returns an iterator, so on the client side we can iterate over the server stream.
def test_DownloadProducts_exist(grpc_stub):
    faker = Faker()
    category = faker.name()
    # save to db
    with session_scope() as session:
        for idx in range(5):
            product = DBProduct(product_name=f'{faker.name()}_{idx}',
                                product_price=Decimal(faker.random_int() / 100),
                                product_status=InStock,
                                category=category)
            session.add(product)
    result = grpc_stub.DownloadProducts(DownloadProductsRequest(category=category))
    # assert we have 5 items
    assert len(list(result)) == 5
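To see why a yield-based handler behaves like a stream, here is a minimal sketch of the same pattern in plain Python, with no gRPC involved. The names download_products and FAKE_DB are hypothetical stand-ins for the servicer method and the database. The real framework does considerably more, for example serialization and network transport.

```python
import inspect

# Hypothetical in-memory stand-in for the product table.
FAKE_DB = [
    {"name": "apple", "category": "fruit"},
    {"name": "carrot", "category": "vegetable"},
    {"name": "banana", "category": "fruit"},
]


def download_products(category):
    """Yield matching products one at a time, like a server-streaming handler."""
    for row in FAKE_DB:
        if row["category"] == category:
            yield row["name"]


# Calling the function runs none of its body; it only creates a generator.
stream = download_products("fruit")
is_generator = inspect.isgenerator(stream)

# The consumer pulls items one by one, much as a framework or client would.
first = next(stream)
# Draining the rest; the generator finishing is what signals end of stream.
rest = list(stream)
```

The consumer never needs to know how many items there are in advance: it keeps pulling until the generator is exhausted.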
Bi-directional Stream
def CalculateProductScore(self, request_iterator, context):
    for product in request_iterator:
        yield CalculateProductScoreResponse(product=product, score=int(product.product_price * 2))
A bi-directional stream is a combination of a client stream and a server stream: it accepts a stream of input and produces a stream of output. Thanks to Python's generator design, this is very simple in Python: we iterate over the input stream and yield each result as soon as it is ready, and the gRPC framework takes care of the rest.
To call the bi-directional service, we can write a generator function that produces the input messages and then iterate over the returned results, as in the example below. Calling product_generator returns a Python iterator, which can be passed as the argument to CalculateProductScore. The result of CalculateProductScore is also an iterator, so we can iterate over it as well.
def product_generator():
    for i in range(0, 5):
        yield Product(product_id=i, product_name=f'product_name_{i}', product_price=i,
                      product_status=InStock, category='category')


def test_CalculateProductScore(grpc_stub):
    product_iterator = product_generator()
    result = grpc_stub.CalculateProductScore(product_iterator)
    all_result = list(result)
    assert len(all_result) == 5
    for response in all_result:
        assert int(response.product.product_price * 2) == response.score
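The generator-in, generator-out shape of the bi-directional handler can also be tried in plain Python. The sketch below uses the hypothetical names product_prices, calculate_score, and events, and chains two generators to record the order in which items are produced. It only illustrates the lazy, one-at-a-time evaluation of generators. In a real gRPC call the two sides talk over the network, so the ordering is not guaranteed to interleave this neatly.

```python
events = []  # records the order in which requests and responses are produced


def product_prices():
    """Hypothetical request stream: yields the prices 0, 1, 2."""
    for price in range(3):
        events.append(f"request {price}")
        yield price


def calculate_score(request_iterator):
    """Mimics the bi-directional handler: one response per incoming request."""
    for price in request_iterator:
        events.append(f"response {price * 2}")
        yield price * 2


# Nothing runs until the result is iterated; list() drives both generators.
scores = list(calculate_score(product_prices()))
```

Each response is produced as soon as its request has been read, without first collecting the whole input. This is what lets the handler start answering before the client has finished sending.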