Server

A single Server can serve an arbitrary number of services:

server = Server([foo_svc, bar_svc, baz_svc])

To monitor the health of your services you can use the standard gRPC health checking protocol; details are here: Health Checking.

There is a special gRPC reflection protocol to inspect running servers and call their methods using command-line tools; details are here: Reflection. It is as simple as using curl.

It is also important to handle the server’s exit properly:

with graceful_exit([server]):
    await server.start(host, port)
    print(f'Serving on {host}:{port}')
    await server.wait_closed()

graceful_exit() helps you handle SIGINT and SIGTERM signals.

When things become more complicated, you can start using AsyncExitStack and asynccontextmanager() to manage the lifecycle of your application and its resources:

async with AsyncExitStack() as stack:
    db = await stack.enter_async_context(setup_db())
    foo_svc = FooService(db)

    server = Server([foo_svc])
    stack.enter_context(graceful_exit([server]))
    await server.start(host, port)
    print(f'Serving on {host}:{port}')
    await server.wait_closed()
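The setup_db() helper above is not defined in this document; a minimal, self-contained sketch of such an asynccontextmanager-based resource (with a stand-in database object in place of a real client) could look like this:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


class FakeDB:
    """Stand-in for a real database client (illustration only)."""
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


@asynccontextmanager
async def setup_db():
    # A real implementation would establish a connection or pool here.
    db = FakeDB()
    try:
        yield db
    finally:
        await db.close()  # always runs, even if the server errors out


async def main():
    async with AsyncExitStack() as stack:
        db = await stack.enter_async_context(setup_db())
        # ... create services with `db`, start the server, wait_closed(), etc.
        return db


db = asyncio.run(main())
print(db.closed)  # True: the stack closed the database on exit
```

The stack unwinds its contexts in reverse order, so the database is closed only after everything that depends on it has shut down.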

Reference

class grpclib.server.Stream(stream: protocol.Stream, method_name: str, cardinality: Cardinality, recv_type: Type[_RecvType], send_type: Type[_SendType], *, codec: CodecBase, status_details_codec: Optional[StatusDetailsCodecBase], dispatch: _DispatchServerEvents, deadline: Optional[Deadline] = None, user_agent: Optional[str] = None)

Represents a gRPC method call: an HTTP/2 request/stream, plus everything you need to communicate with the client in order to handle this request.

As you can see, every method handler accepts a single positional argument, stream:

async def MakeLatte(self, stream: grpclib.server.Stream):
    task: cafe_pb2.LatteOrder = await stream.recv_message()
    ...
    await stream.send_message(empty_pb2.Empty())

This is true for every gRPC method type.

deadline

Deadline of the current request

metadata: Optional[_Metadata]

Invocation metadata, received with headers from the client. Represented as a multi-dict object.

user_agent

Client’s user-agent

peer

Connection’s peer info of type Peer

async recv_message() Optional[_RecvType]

Coroutine to receive incoming message from the client.

If the client sends a UNARY request, you can call this coroutine only once. If the client sends a STREAM request, you should call this coroutine several times, until it returns None, signalling that the client has ended the stream. To simplify your code in this case, the Stream class implements the async iteration protocol, so you can use it like this:

async for message in stream:
    do_smth_with(message)

or even like this:

messages = [msg async for msg in stream]

HTTP/2 has a flow control mechanism, so the server will acknowledge received DATA frames as a message only after the user awaits this coroutine.

Returns:

message

async send_initial_metadata(*, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) None

Coroutine to send headers with initial metadata to the client.

In gRPC you can send initial metadata as soon as possible, because gRPC doesn’t use the :status pseudo-header to indicate the success or failure of the current request. gRPC uses trailers for this purpose, and trailers are sent during the send_trailing_metadata() call, which should be made at the end.

Note

This coroutine will be called implicitly during the first send_message() coroutine call, if it wasn’t called explicitly before.

Parameters:

metadata – custom initial metadata, dict or list of pairs

async send_message(message: _SendType) None

Coroutine to send a message to the client.

If the server sends a UNARY response, you should call this coroutine only once. If the server sends a STREAM response, you can call this coroutine as many times as you need.

Parameters:

message – message object
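A self-contained sketch of a STREAM-response handler, using a minimal stand-in for the Stream class so it can run here (in a real server the stream is provided by grpclib, and the handler names and messages are illustrative):

```python
import asyncio


class FakeStream:
    """Minimal stand-in for grpclib.server.Stream (illustration only)."""
    def __init__(self, request):
        self._request = request
        self.sent = []

    async def recv_message(self):
        return self._request

    async def send_message(self, message):
        self.sent.append(message)


async def ListOrders(stream):
    # UNARY request: recv_message() is called exactly once
    request = await stream.recv_message()
    # STREAM response: send_message() may be called any number of times
    for order in ['latte', 'espresso']:
        await stream.send_message(order)


stream = FakeStream('all orders')
asyncio.run(ListOrders(stream))
print(stream.sent)  # ['latte', 'espresso']
```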

async send_trailing_metadata(*, status: Status = Status.OK, status_message: Optional[str] = None, status_details: Optional[Any] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) None

Coroutine to send trailers with trailing metadata to the client.

This coroutine also makes it possible to send trailers-only responses when a failure occurs while handling the current request, i.e. when the status is not OK.

Note

This coroutine will be called implicitly at exit from the request handler, with an appropriate status code, if it wasn’t called explicitly during handler execution.

Parameters:
  • status – resulting status of this request

  • status_message – description for a status

  • status_details – additional error details, encoded using the status_details_codec

  • metadata – custom trailing metadata, dict or list of pairs

async cancel() None

Coroutine to cancel this request/stream.

The server will send a RST_STREAM frame to the client, so the client is explicitly informed that there is nothing more to expect from the server regarding this request/stream.

class grpclib.server.Server(handlers: Collection[IServable], *, loop: Optional[AbstractEventLoop] = None, codec: Optional[CodecBase] = None, status_details_codec: Optional[StatusDetailsCodecBase] = None, config: Optional[Configuration] = None)

HTTP/2 server, which uses gRPC service handlers to handle requests.

A handler is a subclass of an abstract base class generated from a .proto file:

class CoffeeMachine(cafe_grpc.CoffeeMachineBase):

    async def MakeLatte(self, stream):
        task: cafe_pb2.LatteOrder = await stream.recv_message()
        ...
        await stream.send_message(empty_pb2.Empty())

server = Server([CoffeeMachine()])
Parameters:
  • handlers – list of handlers

  • loop – (deprecated) asyncio-compatible event loop

  • codec – instance of a codec to encode and decode messages; if omitted, ProtoCodec is used by default

  • status_details_codec – instance of a status details codec to encode error details in trailing metadata; if omitted, ProtoStatusDetailsCodec is used by default

async start(host: Optional[str] = None, port: Optional[int] = None, *, path: Optional[str] = None, family: socket.AddressFamily = AddressFamily.AF_UNSPEC, flags: socket.AddressInfo = AddressInfo.AI_PASSIVE, sock: Optional[socket] = None, backlog: int = 100, ssl: Optional[_ssl.SSLContext] = None, reuse_address: Optional[bool] = None, reuse_port: Optional[bool] = None) None

Coroutine to start the server.

Parameters:
  • host – can be a string containing an IPv4/v6 address or a domain name. If host is None, the server will be bound to all available interfaces.

  • port – port number.

  • path – UNIX domain socket path. If specified, host and port should be omitted (must be None).

  • family – can be set to either socket.AF_INET or socket.AF_INET6 to force the socket to use IPv4 or IPv6. If not set, it will be determined from host.

  • flags – is a bitmask for getaddrinfo().

  • sock – sock can optionally be specified in order to use a preexisting socket object. If specified, host and port should be omitted (must be None).

  • backlog – is the maximum number of queued connections passed to listen().

  • ssl – can be set to an SSLContext to enable SSL over the accepted connections.

  • reuse_address – tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire.

  • reuse_port – tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created.
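A hedged sketch of building an SSLContext for the ssl parameter (the helper name and certificate/key paths are placeholders; gRPC runs over HTTP/2, so the context advertises 'h2' via ALPN):

```python
import ssl


def make_server_ssl_context(certfile: str, keyfile: str) -> ssl.SSLContext:
    # gRPC runs over HTTP/2, so advertise 'h2' via ALPN
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.set_alpn_protocols(['h2'])
    ctx.load_cert_chain(certfile, keyfile)
    return ctx


# Usage (paths are placeholders for your own files):
# ctx = make_server_ssl_context('server.crt', 'server.key')
# await server.start('127.0.0.1', 50051, ssl=ctx)
```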

close() None

Stops accepting new connections and cancels all currently running requests. Request handlers are able to handle CancelledError and exit properly.

async wait_closed() None

Coroutine to wait until all existing request handlers exit properly.

grpclib.utils.graceful_exit(servers: Collection[IClosable], *, loop: Optional[AbstractEventLoop] = None, signals: Collection[int] = (SIGINT, SIGTERM)) Iterator[None]

Utility context-manager to help properly shut down servers in response to OS signals.

By default this context-manager handles SIGINT and SIGTERM signals.

There are two stages:

  1. the first received signal closes the servers

  2. subsequent signals raise a SystemExit exception

Example:

async def main(...):
    ...
    with graceful_exit([server]):
        await server.start(host, port)
        print('Serving on {}:{}'.format(host, port))
        await server.wait_closed()
        print('Server closed')

The first stage calls server.close(), and await server.wait_closed() should complete successfully without errors. If a server wasn’t started yet, the second stage runs to prevent it from starting.

The second stage raises a SystemExit exception, but you will receive asyncio.CancelledError in your async def main() coroutine. You can use try..finally constructs and context-managers to handle this error properly.

This context-manager is designed to work in cooperation with asyncio.run() function:

if __name__ == '__main__':
    asyncio.run(main())
Parameters:
  • servers – list of servers

  • loop – (deprecated) asyncio-compatible event loop

  • signals – set of the OS signals to handle

Note

Not supported on Windows