Client

A single Channel represents a single connection to the server. Because gRPC is based on HTTP/2, there is no need to create multiple connections to the server: many concurrent RPC calls can be performed through a single multiplexed connection. See Overview for more details.

async with Channel(host, port) as channel:
    pass

A single server can implement several services, so you can reuse one channel for all corresponding service stubs:

foo_svc = FooServiceStub(channel)
bar_svc = BarServiceStub(channel)
baz_svc = BazServiceStub(channel)
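Because calls are multiplexed over one connection, unary calls on stubs that share a channel can be awaited concurrently. The sketch below illustrates only the calling pattern with asyncio.gather(); FakeStub is a hypothetical stand-in that performs no network I/O and is not part of grpclib:

```python
import asyncio


class FakeStub:
    """Hypothetical stand-in for a generated stub; performs no network I/O."""

    def __init__(self, name):
        self.name = name

    async def Method(self, request):
        await asyncio.sleep(0.01)  # pretend to wait for the server
        return f"{self.name}:{request}"


async def main():
    foo_svc, bar_svc = FakeStub("foo"), FakeStub("bar")
    # gather() runs the calls concurrently and returns the results
    # in the same order as its arguments.
    return await asyncio.gather(
        foo_svc.Method("a"),
        bar_svc.Method("b"),
        foo_svc.Method("c"),
    )


replies = asyncio.run(main())
print(replies)
```

With real stubs the same pattern should apply, since the simple call form is a coroutine.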

There are two ways to call RPC methods:

  • simple, suitable for unary-unary calls:

    reply = await stub.Method(Request())
    
  • advanced, suitable for streaming calls:

    async with stub.BiDiMethod.open() as stream:
        await stream.send_request()  # needed to initiate a call
        while True:
            task = await task_queue.get()
            if task is None:
                await stream.end()
                break
            else:
                await stream.send_message(task)
                result = await stream.recv_message()
                await result_queue.put(result)
    

See reference docs for all method types and for the Stream methods and attributes.

Secure Channels

Here is how to establish a secure connection to a public gRPC server:

channel = Channel(host, port, ssl=True)
                              ^^^^^^^^

In this case grpclib uses system CA certificates. grpclib also has built-in support for the certifi package, which contains an up-to-date copy of Mozilla’s collection of CA certificates. All you need to do is install it and keep it up to date; this is preferable to relying on system CA certificates:

$ pip3 install certifi

Another way to tell grpclib which CA certificates to use is the ssl.get_default_verify_paths() function:

channel = Channel(host, port, ssl=ssl.get_default_verify_paths())
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This function also reads the SSL_CERT_FILE and SSL_CERT_DIR environment variables, which override your system defaults. It returns a DefaultVerifyPaths named tuple, which you can customize to provide your own cafile and capath values without using environment variables or placing certificates into a distribution-specific directory:

ssl.get_default_verify_paths()._replace(cafile=YOUR_CA_FILE)
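As a small illustration of the customization above, the following sketch replaces only the cafile field and leaves the other fields at their system values. The path assigned to YOUR_CA_FILE is a placeholder, and the final Channel call is shown only as a comment:

```python
import ssl

# Placeholder path to a CA bundle; replace it with your own file.
YOUR_CA_FILE = "/path/to/ca-bundle.pem"

default_paths = ssl.get_default_verify_paths()
custom_paths = default_paths._replace(cafile=YOUR_CA_FILE)

# Only cafile changes; capath keeps its system value.
print(custom_paths.cafile)
print(custom_paths.capath == default_paths.capath)

# The customized tuple can then be passed as the ssl argument, e.g.
# channel = Channel(host, port, ssl=custom_paths)
```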

grpclib also allows you to use a custom SSL configuration by providing an SSLContext object. We have a simple mTLS auth example in our code repository to illustrate how this works.
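A minimal sketch of building such a context with the standard ssl module follows. It is not the example from the repository: make_client_context and its parameters are hypothetical names. It assumes that a custom context should advertise the "h2" protocol through ALPN, which is how HTTP/2 is negotiated over TLS:

```python
import ssl
from typing import Optional


def make_client_context(
    cafile: Optional[str] = None,
    client_cert: Optional[str] = None,
    client_key: Optional[str] = None,
) -> ssl.SSLContext:
    """Build a client-side SSLContext (hypothetical helper, not part of grpclib)."""
    # Verifies the server certificate and hostname; the default CA
    # certificates are used when cafile is None.
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # HTTP/2 over TLS is negotiated through ALPN using the "h2" identifier.
    ctx.set_alpn_protocols(["h2"])
    if client_cert is not None:
        # Present a client certificate, as needed for mutual TLS.
        ctx.load_cert_chain(client_cert, client_key)
    return ctx


ctx = make_client_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED)

# The context can then be passed as the ssl argument, e.g.
# channel = Channel(host, port, ssl=ctx)
```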

Reference

class grpclib.client.Stream(channel: Channel, method_name: str, metadata: _Metadata, cardinality: Cardinality, send_type: Type[_SendType], recv_type: Type[_RecvType], *, codec: CodecBase, status_details_codec: Optional[StatusDetailsCodecBase], dispatch: _DispatchChannelEvents, deadline: Optional[Deadline] = None)

Represents gRPC method call - HTTP/2 request/stream, and everything you need to communicate with server in order to get response.

In order to work directly with a stream, you should open a request with ServiceMethod.open() like this:

request = cafe_pb2.LatteOrder(
    size=cafe_pb2.SMALL,
    temperature=70,
    sugar=3,
)
async with client.MakeLatte.open() as stream:
    await stream.send_message(request, end=True)
    reply: empty_pb2.Empty = await stream.recv_message()

initial_metadata: Optional[_Metadata] = None

This property contains initial metadata, received with headers from the server. It is None initially, and a multi-dict object after the recv_initial_metadata() coroutine succeeds.

trailing_metadata: Optional[_Metadata] = None

This property contains trailing metadata, received with trailers from the server. It is None initially, and a multi-dict object after the recv_trailing_metadata() coroutine succeeds.

peer: Optional[Peer] = None

Connection’s peer info of type Peer

async send_request(*, end: bool = False) -> None

Coroutine to send request headers with metadata to the server.

New HTTP/2 stream will be created during this coroutine call.

Note

This coroutine will be called implicitly during first send_message() coroutine call, if not called before explicitly.

Parameters:

end – end outgoing stream if there are no messages to send in a streaming request

async send_message(message: _SendType, *, end: bool = False) -> None

Coroutine to send message to the server.

If client sends UNARY request, then you should call this coroutine only once. If client sends STREAM request, then you can call this coroutine as many times as you need.

Warning

It is important to finally end the stream from the client side when you have finished sending messages.

You can do this in two ways:

  • specify end=True argument while sending last message - and last DATA frame will include END_STREAM flag;

  • call end() coroutine after sending last message - and extra HEADERS frame with END_STREAM flag will be sent.

First approach is preferred, because it doesn’t require sending additional HTTP/2 frame.
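The preferred approach can be wrapped in a small helper that marks only the last message with end=True. This is a sketch, not part of grpclib: send_all and RecordingStream are hypothetical names, and RecordingStream merely records calls so that the example runs without a server. For an empty sequence the helper falls back to send_request(end=True), assuming request headers have not been sent yet:

```python
import asyncio


async def send_all(stream, messages):
    """Send every message, passing end=True with the last one."""
    messages = list(messages)
    if not messages:
        # No messages at all: end the outgoing stream with the request headers.
        await stream.send_request(end=True)
        return
    *head, last = messages
    for message in head:
        await stream.send_message(message)
    await stream.send_message(last, end=True)


class RecordingStream:
    """Hypothetical stand-in that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    async def send_request(self, *, end=False):
        self.calls.append(("send_request", end))

    async def send_message(self, message, *, end=False):
        self.calls.append(("send_message", message, end))


async def main():
    stream, empty_stream = RecordingStream(), RecordingStream()
    await send_all(stream, ["a", "b", "c"])
    await send_all(empty_stream, [])
    return stream.calls, empty_stream.calls


calls, empty_calls = asyncio.run(main())
print(calls)
print(empty_calls)
```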

async end() -> None

Coroutine to end stream from the client-side.

It should be used to finally end stream from the client-side when we’re finished sending messages to the server and stream wasn’t closed with last DATA frame. See send_message() for more details.

HTTP/2 stream will have half-closed (local) state after this coroutine call.

async recv_initial_metadata() -> None

Coroutine to wait for headers with initial metadata from the server.

Note

This coroutine will be called implicitly during first recv_message() coroutine call, if not called before explicitly.

May raise GRPCError if server returned non-Status.OK in trailers-only response.

When this coroutine finishes, you can access received initial metadata by using initial_metadata attribute.

async recv_message() -> Optional[_RecvType]

Coroutine to receive incoming message from the server.

If server sends UNARY response, then you can call this coroutine only once. If server sends STREAM response, then you should call this coroutine several times, until it returns None when the server has ended the stream. To simplify your code in this case, Stream implements the async iteration protocol, so you can use it like this:

async for message in stream:
    do_smth_with(message)

or even like this:

messages = [msg async for msg in stream]
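To make the relationship between the two styles concrete, here is a simplified model. FakeStream is a hypothetical stand-in, not grpclib's implementation: its recv_message() returns None once the messages run out, and its iteration methods stop on that None. The explicit loop and the async comprehension then yield the same messages:

```python
import asyncio


class FakeStream:
    """Hypothetical stand-in for a stream; not grpclib's implementation."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def recv_message(self):
        # Return the next message, or None once the "server" ended the stream.
        return self._messages.pop(0) if self._messages else None

    def __aiter__(self):
        return self

    async def __anext__(self):
        message = await self.recv_message()
        if message is None:
            raise StopAsyncIteration
        return message


async def main():
    # Explicit loop: call recv_message() until it returns None.
    manual = []
    stream = FakeStream(["a", "b"])
    while True:
        message = await stream.recv_message()
        if message is None:
            break
        manual.append(message)

    # Equivalent form using the async iteration protocol.
    iterated = [msg async for msg in FakeStream(["a", "b"])]
    return manual, iterated


manual, iterated = asyncio.run(main())
print(manual == iterated)
```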

HTTP/2 has a flow control mechanism, so the client acknowledges the DATA frames that make up a message only after you consume that message by calling this coroutine.

Returns:

message

async recv_trailing_metadata() -> None

Coroutine to wait for trailers with trailing metadata from the server.

Note

This coroutine will be called implicitly at exit from this call (context manager’s exit), if not called before explicitly.

May raise GRPCError if server returned non-Status.OK in trailers.

When this coroutine finishes, you can access received trailing metadata by using trailing_metadata attribute.

async cancel() -> None

Coroutine to cancel this request/stream.

Client will send RST_STREAM frame to the server, so it will be explicitly informed that there is nothing to expect from the client regarding this request/stream.

class grpclib.client.Channel(host: Optional[str] = None, port: Optional[int] = None, *, loop: Optional[AbstractEventLoop] = None, path: Optional[str] = None, codec: Optional[CodecBase] = None, status_details_codec: Optional[StatusDetailsCodecBase] = None, ssl: Union[None, bool, SSLContext, DefaultVerifyPaths] = None, config: Optional[Configuration] = None)

Represents a connection to the server, which can be used with generated stub classes to perform gRPC calls.

channel = Channel()
client = cafe_grpc.CoffeeMachineStub(channel)

...

request = cafe_pb2.LatteOrder(
    size=cafe_pb2.SMALL,
    temperature=70,
    sugar=3,
)
reply: empty_pb2.Empty = await client.MakeLatte(request)

...

channel.close()

Initialize connection to the server

Parameters:
  • host – server host name.

  • port – server port number.

  • loop – (deprecated) asyncio-compatible event loop

  • path – server socket path. If specified, host and port should be omitted (must be None).

  • codec – instance of a codec to encode and decode messages, if omitted ProtoCodec is used by default

  • status_details_codec – instance of a status details codec to decode error details in a trailing metadata, if omitted ProtoStatusDetailsCodec is used by default

  • ssl – True or SSLContext object or ssl.DefaultVerifyPaths object; if True, default SSL context is used.

close() -> None

Closes connection to the server.

class grpclib.client.UnaryUnaryMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])

Represents UNARY-UNARY gRPC method type.

async __call__(message: _SendType, *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> _RecvType

Coroutine to perform defined call.

Parameters:
  • message – message

  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

message
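The two accepted metadata shapes can be sketched with plain Python values. The header names and values below are placeholders, and the call at the end is shown only as a comment because it needs a generated stub and a running server:

```python
# Metadata as a mapping: one value per key.
metadata_as_dict = {
    "authorization": "Bearer EXAMPLE-TOKEN",  # placeholder value
    "x-request-id": "42",  # hypothetical header name
}

# Metadata as a list of pairs: the same data, where a key may also
# appear more than once.
metadata_as_pairs = [
    ("authorization", "Bearer EXAMPLE-TOKEN"),
    ("x-request-id", "42"),
]

print(dict(metadata_as_pairs) == metadata_as_dict)

# Either form could then be passed to a call together with a timeout, e.g.
# reply = await stub.Method(Request(), timeout=5.0, metadata=metadata_as_pairs)
```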

open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> Stream[_SendType, _RecvType]

Creates and returns Stream object to perform request to the server.

Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes Stream object for you. Actual request will be sent only during Stream.send_request() or Stream.send_message() coroutine call.

Parameters:
  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

Stream object

class grpclib.client.UnaryStreamMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])

Represents UNARY-STREAM gRPC method type.

async __call__(message: _SendType, *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> List[_RecvType]

Coroutine to perform defined call.

Parameters:
  • message – message

  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

sequence of messages

open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> Stream[_SendType, _RecvType]

Creates and returns Stream object to perform request to the server.

Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes Stream object for you. Actual request will be sent only during Stream.send_request() or Stream.send_message() coroutine call.

Parameters:
  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

Stream object

class grpclib.client.StreamUnaryMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])

Represents STREAM-UNARY gRPC method type.

async __call__(messages: Sequence[_SendType], *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> _RecvType

Coroutine to perform defined call.

Parameters:
  • messages – sequence of messages

  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

message

open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> Stream[_SendType, _RecvType]

Creates and returns Stream object to perform request to the server.

Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes Stream object for you. Actual request will be sent only during Stream.send_request() or Stream.send_message() coroutine call.

Parameters:
  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

Stream object

class grpclib.client.StreamStreamMethod(channel: Channel, name: str, request_type: Type[_SendType], reply_type: Type[_RecvType])

Represents STREAM-STREAM gRPC method type.

async __call__(messages: Sequence[_SendType], *, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> List[_RecvType]

Coroutine to perform defined call.

Parameters:
  • messages – sequence of messages

  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

sequence of messages

open(*, timeout: Optional[float] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) -> Stream[_SendType, _RecvType]

Creates and returns Stream object to perform request to the server.

Nothing will happen to the current underlying HTTP/2 connection during this method call. It just initializes Stream object for you. Actual request will be sent only during Stream.send_request() or Stream.send_message() coroutine call.

Parameters:
  • timeout (float) – request timeout (seconds)

  • metadata – custom request metadata, dict or list of pairs

Returns:

Stream object