Client
A single Channel
represents a single connection to
the server. Because gRPC is based on HTTP/2, there is no need to create multiple
connections to the server, many concurrent RPC calls can be performed through
a single multiplexed connection. See Overview for more details.
async with Channel(host, port) as channel:
pass
A single server can implement several services, so you can reuse one channel for all corresponding service stubs:
foo_svc = FooServiceStub(channel)
bar_svc = BarServiceStub(channel)
baz_svc = BazServiceStub(channel)
There are two ways to call RPC methods:
simple, suitable for unary-unary calls:
reply = await stub.Method(Request())
advanced, suitable for streaming calls:
async with stub.BiDiMethod.open() as stream: await stream.send_request() # needed to initiate a call while True: task = await task_queue.get() if task is None: await stream.end() break else: await stream.send_message(task) result = await stream.recv_message() await result_queue.add(task)
See reference docs for all method types and for the
Stream
methods and attributes.
Secure Channels
Here is how to establish a secure connection to a public gRPC server:
channel = Channel(host, port, ssl=True)
^^^^^^^^
In this case grpclib
uses system CA certificates. But grpclib
has also
a built-in support for a certifi package which contains actual Mozilla’s
collection of CA certificates. All you need is to install it and keep it
updated – this is a more favorable way than relying on system CA certificates:
$ pip3 install certifi
grpclib
also allows you to use a custom SSL configuration by providing a
SSLContext
object. We have a simple mTLS auth example
in our code repository to illustrate how this works.
Reference
- class grpclib.client.Stream(channel: Channel, method_name: str, metadata: _Metadata, cardinality: Cardinality, send_type: Type[_SendType], recv_type: Type[_RecvType], *, codec: CodecBase, status_details_codec: Optional[StatusDetailsCodecBase], dispatch: _DispatchChannelEvents, deadline: Optional[Deadline] = None)
Represents gRPC method call - HTTP/2 request/stream, and everything you need to communicate with server in order to get response.
In order to work directly with stream, you should
ServiceMethod.open()
request like this:request = cafe_pb2.LatteOrder( size=cafe_pb2.SMALL, temperature=70, sugar=3, ) async with client.MakeLatte.open() as stream: await stream.send_message(request, end=True) reply: empty_pb2.Empty = await stream.recv_message()
- initial_metadata: Optional[_Metadata] = None
This property contains initial metadata, received with headers from the server. It equals to
None
initially, and to a multi-dict object afterrecv_initial_metadata()
coroutine succeeds.
- trailing_metadata: Optional[_Metadata] = None
This property contains trailing metadata, received with trailers from the server. It equals to
None
initially, and to a multi-dict object afterrecv_trailing_metadata()
coroutine succeeds.
- async send_request(*, end: bool = False) None
Coroutine to send request headers with metadata to the server.
New HTTP/2 stream will be created during this coroutine call.
Note
This coroutine will be called implicitly during first
send_message()
coroutine call, if not called before explicitly.- Parameters
end – end outgoing stream if there are no messages to send in a streaming request
- async send_message(message: _SendType, *, end: bool = False) None
Coroutine to send message to the server.
If client sends UNARY request, then you should call this coroutine only once. If client sends STREAM request, then you can call this coroutine as many times as you need.
Warning
It is important to finally end stream from the client-side when you finished sending messages.
You can do this in two ways:
specify
end=True
argument while sending last message - and last DATA frame will include END_STREAM flag;call
end()
coroutine after sending last message - and extra HEADERS frame with END_STREAM flag will be sent.
First approach is preferred, because it doesn’t require sending additional HTTP/2 frame.
- async end() None
Coroutine to end stream from the client-side.
It should be used to finally end stream from the client-side when we’re finished sending messages to the server and stream wasn’t closed with last DATA frame. See
send_message()
for more details.HTTP/2 stream will have half-closed (local) state after this coroutine call.
- async recv_initial_metadata() None
Coroutine to wait for headers with initial metadata from the server.
Note
This coroutine will be called implicitly during first
recv_message()
coroutine call, if not called before explicitly.May raise
GRPCError
if server returned non-Status.OK
in trailers-only response.When this coroutine finishes, you can access received initial metadata by using
initial_metadata
attribute.
- async recv_message() Optional[_RecvType]
Coroutine to receive incoming message from the server.
If server sends UNARY response, then you can call this coroutine only once. If server sends STREAM response, then you should call this coroutine several times, until it returns None when the server has ended the stream. To simplify you code in this case,
Stream
implements async iterations protocol, so you can use it like this:async for message in stream: do_smth_with(message)
or even like this:
messages = [msg async for msg in stream]
HTTP/2 has flow control mechanism, so client will acknowledge received DATA frames as a message only after user consumes this coroutine.
- Returns
message
- async recv_trailing_metadata() None
Coroutine to wait for trailers with trailing metadata from the server.
Note
This coroutine will be called implicitly at exit from this call (context manager’s exit), if not called before explicitly.
May raise
GRPCError
if server returned non-Status.OK
in trailers.When this coroutine finishes, you can access received trailing metadata by using
trailing_metadata
attribute.
- class grpclib.client.Channel(host: Optional[str] = None, port: Optional[int] = None, *, loop: Optional[AbstractEventLoop] = None, path: Optional[str] = None, codec: Optional[CodecBase] = None, status_details_codec: Optional[StatusDetailsCodecBase] = None, ssl: Union[None, bool, SSLContext] = None, config: Optional[Configuration] = None)
Represents a connection to the server, which can be used with generated stub classes to perform gRPC calls.
channel = Channel() client = cafe_grpc.CoffeeMachineStub(channel) ... request = cafe_pb2.LatteOrder( size=cafe_pb2.SMALL, temperature=70, sugar=3, ) reply: empty_pb2.Empty = await client.MakeLatte(request) ... channel.close()
Initialize connection to the server
- Parameters
host – server host name.
port – server port number.
loop – (deprecated) asyncio-compatible event loop
path – server socket path. If specified, host and port should be omitted (must be None).
codec – instance of a codec to encode and decode messages, if omitted
ProtoCodec
is used by defaultstatus_details_codec – instance of a status details codec to decode error details in a trailing metadata, if omitted
ProtoStatusDetailsCodec
is used by defaultssl –
True
orSSLContext
object; ifTrue
, default SSL context is used.
- class grpclib.client.UnaryUnaryMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])
Represents UNARY-UNARY gRPC method type.
- async __call__(message: _SendType, *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) _RecvType
Coroutine to perform defined call.
- Parameters
message – message
timeout (float) – request timeout (seconds)
metadata – custom request metadata, dict or list of pairs
- Returns
message
- open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) Stream[_SendType, _RecvType]
Creates and returns
Stream
object to perform request to the server.Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes
Stream
object for you. Actual request will be sent only duringStream.send_request()
orStream.send_message()
coroutine call.
- class grpclib.client.UnaryStreamMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])
Represents UNARY-STREAM gRPC method type.
- async __call__(message: _SendType, *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) List[_RecvType]
Coroutine to perform defined call.
- Parameters
message – message
timeout (float) – request timeout (seconds)
metadata – custom request metadata, dict or list of pairs
- Returns
sequence of messages
- open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) Stream[_SendType, _RecvType]
Creates and returns
Stream
object to perform request to the server.Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes
Stream
object for you. Actual request will be sent only duringStream.send_request()
orStream.send_message()
coroutine call.
- class grpclib.client.StreamUnaryMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])
Represents STREAM-UNARY gRPC method type.
- async __call__(messages: Sequence[_SendType], *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) _RecvType
Coroutine to perform defined call.
- Parameters
messages – sequence of messages
timeout (float) – request timeout (seconds)
metadata – custom request metadata, dict or list of pairs
- Returns
message
- open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) Stream[_SendType, _RecvType]
Creates and returns
Stream
object to perform request to the server.Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes
Stream
object for you. Actual request will be sent only duringStream.send_request()
orStream.send_message()
coroutine call.
- class grpclib.client.StreamStreamMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])
Represents STREAM-STREAM gRPC method type.
- async __call__(messages: Sequence[_SendType], *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) List[_RecvType]
Coroutine to perform defined call.
- Parameters
messages – sequence of messages
timeout (float) – request timeout (seconds)
metadata – custom request metadata, dict or list of pairs
- Returns
sequence of messages
- open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) Stream[_SendType, _RecvType]
Creates and returns
Stream
object to perform request to the server.Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes
Stream
object for you. Actual request will be sent only duringStream.send_request()
orStream.send_message()
coroutine call.