In grpc-mate, product_update_servicer.py shows how to do a client-streaming upload in gRPC.
protobuf
service ProductUpdateService {
  // upload products into Elasticsearch so that we can search on them
  // used to demo client-side streaming
  rpc UploadProduct (stream Product) returns (UploadProductResponse) {
  }
}

message UploadProductResponse {
  enum ResultStatus {
    SUCCESS = 0;
    FAILED = 1;
  }
  ResultStatus result_status = 1;
}
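After running grpc_tools.protoc, a client-streaming RPC like this surfaces as a stub method that takes an iterable of Product messages and returns a single UploadProductResponse. A quick sketch (the channel address and the one-product list are placeholders, and it assumes Product lives in the same generated module; the stub class name follows gRPC's standard <ServiceName>Stub convention):

import grpc

from grpc_mate import product_search_engine_pb2_grpc
from grpc_mate.product_search_engine_pb2 import Product

channel = grpc.insecure_channel('localhost:50051')  # placeholder address
stub = product_search_engine_pb2_grpc.ProductUpdateServiceStub(channel)
# pass any iterator of Product messages; the call returns one response
response = stub.UploadProduct(iter([Product(product_name='demo')]))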
server side
class ProductUpdateServiceServicer(grpc_mate.product_search_engine_pb2_grpc.ProductUpdateServiceServicer):
    def UploadProduct(self, request_iterator, context):
        with session_scope() as session:
            for product in request_iterator:
                db_product = DBProduct()
                for k in product.DESCRIPTOR.fields_by_name:
                    setattr(db_product, k, getattr(product, k))
                db_product.product_id = None  # let the database assign the primary key
                session.add(db_product)
        return UploadProductResponse(result_status=UploadProductResponse.SUCCESS)
- From the server side we get a request iterator that iterates over all the uploaded objects; in this case it is a stream of Product messages.
- We can reuse session_scope to access the session object without worrying about session management (see the session_scope sketch after the code below).
- Since DBProduct is defined with exactly the same attributes as the Product message, we can copy the attributes between the two objects with the code below, which is super useful when a message has lots of fields.
db_product = DBProduct()
for k in product.DESCRIPTOR.fields_by_name:
    setattr(db_product, k, getattr(product, k))
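The session_scope helper is defined elsewhere in grpc-mate; it most likely follows the standard SQLAlchemy commit-on-success, rollback-on-error recipe. A minimal sketch under that assumption (the Session and engine names here are assumptions, not grpc-mate's actual code):

from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite:///:memory:')  # assumed module-level engine
Session = sessionmaker(bind=engine)

@contextmanager
def session_scope():
    # commit if the block succeeds, roll back on any exception,
    # and always close the session afterwards
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()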
client side
I will use a unit test to show how to make a streaming call from the client. I've already discussed how to use pytest-grpc to test a gRPC servicer in a previous article, and I will use the same approach in test_product_update_servicer.py. One thing is different this time: we also use SQLAlchemy for data persistence, so we need a way to initialize the DB before each test and destroy it after use. Thanks to pytest's fixtures, we can easily achieve this with the code below:
@pytest.fixture(autouse=True, scope='function')
def create_schema():
    if str(engine.url) == 'sqlite:///:memory:':
        Base.metadata.create_all(engine)
        yield None
        Base.metadata.drop_all(engine)
The create_schema fixture is executed before every test method and yields nothing; after the test method completes, it drops all the tables, so every test method in the module gets a clean DB state to work with.
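For example, because the schema is recreated per test, a test can assert on DB state without interference from other tests. A hypothetical sketch (test_clean_state is made up for illustration; it assumes DBProduct and session_scope are importable):

def test_clean_state():
    # the autouse fixture has already run create_all, so the schema
    # exists but contains no rows yet
    with session_scope() as session:
        assert session.query(DBProduct).count() == 0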
To call the gRPC method, we simply use a list to simulate the iterator and pass it to the client call. Thanks to Python's iterator design, apart from a list we could also write a custom iterator or a generator to feed a practically limitless client stream to the gRPC server (see the generator sketch after the test code below).
products = [
    Product(product_name='product_name_1', product_price=1.0, product_status=InStock, category='category_1'),
    Product(product_name='product_name_2', product_price=2.0, product_status=InStock, category='category_2')]
grpc_stub.UploadProduct(iter(products))
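As a concrete illustration of the generator variant, here is a minimal sketch (product_stream and its count parameter are made up for illustration; it assumes the same Product, InStock, and UploadProductResponse imports and the pytest-grpc grpc_stub fixture as the test above):

def product_stream(count):
    # lazily yield `count` products; nothing is materialized in memory,
    # so the same pattern scales to arbitrarily long streams
    for i in range(count):
        yield Product(product_name=f'product_name_{i}',
                      product_price=float(i),
                      product_status=InStock,
                      category=f'category_{i}')

response = grpc_stub.UploadProduct(product_stream(100))
assert response.result_status == UploadProductResponse.SUCCESS

The stub pulls messages from the generator one at a time as it sends them, so the client never holds the whole stream in memory.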