Server
A single Server can serve an arbitrary number of services:
server = Server([foo_svc, bar_svc, baz_svc])
To monitor the health of your services, you can use the standard gRPC health checking protocol; see Health Checking for details.
There is also a gRPC reflection protocol for inspecting running servers and calling their methods with command-line tools; see Reflection for details. It is as simple as using curl.
It is also important to handle the server’s exit properly:
with graceful_exit([server]):
    await server.start(host, port)
    print(f'Serving on {host}:{port}')
    await server.wait_closed()
graceful_exit() helps you handle SIGINT and SIGTERM signals.
When things become complicated, you can start using AsyncExitStack and asynccontextmanager() to manage the lifecycle of your application and the resources it uses:
async with AsyncExitStack() as stack:
    db = await stack.enter_async_context(setup_db())
    foo_svc = FooService(db)
    server = Server([foo_svc])
    stack.enter_context(graceful_exit([server]))
    await server.start(host, port)
    print(f'Serving on {host}:{port}')
    await server.wait_closed()
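The snippet above leaves setup_db() undefined. A minimal sketch of how such a resource could be written with asynccontextmanager() follows. FakeDB and its connect()/close() methods are hypothetical stand-ins for a real database client, and the events list exists only to make the teardown order visible.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []  # records lifecycle steps, for illustration only


class FakeDB:
    """Hypothetical stand-in for a real database client."""

    async def connect(self):
        events.append('db connected')

    async def close(self):
        events.append('db closed')


@asynccontextmanager
async def setup_db():
    db = FakeDB()
    await db.connect()
    try:
        yield db  # the application runs while the context is open
    finally:
        await db.close()  # runs even if the body raises or is cancelled


async def main():
    async with AsyncExitStack() as stack:
        db = await stack.enter_async_context(setup_db())
        assert isinstance(db, FakeDB)
        events.append('serving')  # server start/wait_closed would go here
    events.append('stack exited')


asyncio.run(main())
```

Resources entered on the stack are released in reverse order when the block exits, so the database is closed only after the serving code has finished.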
Reference
- class grpclib.server.Stream(stream: protocol.Stream, method_name: str, cardinality: Cardinality, recv_type: Type[_RecvType], send_type: Type[_SendType], *, codec: CodecBase, status_details_codec: Optional[StatusDetailsCodecBase], dispatch: _DispatchServerEvents, deadline: Optional[Deadline] = None, user_agent: Optional[str] = None)
Represents a gRPC method call (an HTTP/2 request/stream) and everything you need to communicate with the client in order to handle this request.
As you can see, every method handler accepts a single positional argument, stream:
async def MakeLatte(self, stream: grpclib.server.Stream):
    task: cafe_pb2.LatteOrder = await stream.recv_message()
    ...
    await stream.send_message(empty_pb2.Empty())
This is true for every gRPC method type.
- metadata: Optional[_Metadata]
Invocation metadata, received with headers from the client. Represented as a multi-dict object.
- user_agent
Client’s user-agent
- async recv_message() Optional[_RecvType]
Coroutine to receive incoming message from the client.
If the client sends a UNARY request, you can call this coroutine only once. If the client sends a STREAM request, you should call this coroutine repeatedly until it returns None, which means the client has ended the stream. To simplify your code in this case, the Stream class implements the async iteration protocol, so you can use it like this:
async for message in stream:
    do_smth_with(message)
or even like this:
messages = [msg async for msg in stream]
HTTP/2 has a flow control mechanism, so the server acknowledges received DATA frames as a message only after the user consumes this coroutine.
- Returns
message
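The link between recv_message() returning None and async iteration can be sketched with a stand-in class. FakeStream below is hypothetical and is not grpclib's implementation; it only mimics the contract described above so the example runs without a server.

```python
import asyncio


class FakeStream:
    """Hypothetical stand-in mimicking the documented recv_message() contract."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def recv_message(self):
        # Return the next message, or None once the client has ended the stream.
        if self._messages:
            return self._messages.pop(0)
        return None

    def __aiter__(self):
        return self

    async def __anext__(self):
        message = await self.recv_message()
        if message is None:
            raise StopAsyncIteration
        return message


async def handler(stream):
    # Equivalent to calling recv_message() in a loop until it returns None.
    return [msg async for msg in stream]


received = asyncio.run(handler(FakeStream(['a', 'b', 'c'])))
```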
- async send_initial_metadata(*, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) None
Coroutine to send headers with initial metadata to the client.
In gRPC you can send initial metadata as soon as possible, because gRPC doesn’t use the :status pseudo header to indicate success or failure of the current request. gRPC uses trailers for this purpose, and trailers are sent during the send_trailing_metadata() call, which should be made at the end.
Note
This coroutine will be called implicitly during the first send_message() coroutine call, if not called explicitly before.
- Parameters
metadata – custom initial metadata, dict or list of pairs
- async send_message(message: _SendType) None
Coroutine to send message to the client.
If the server sends a UNARY response, you should call this coroutine only once. If the server sends a STREAM response, you can call this coroutine as many times as you need.
- Parameters
message – message object
- async send_trailing_metadata(*, status: Status = Status.OK, status_message: Optional[str] = None, status_details: Optional[Any] = None, metadata: Optional[Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]]]] = None) None
Coroutine to send trailers with trailing metadata to the client.
This coroutine allows sending trailers-only responses when a failure occurs while handling the current request, i.e. when status is not OK.
Note
This coroutine will be called implicitly at exit from the request handler, with an appropriate status code, if not called explicitly during handler execution.
- Parameters
status – resulting status of this coroutine call
status_message – description for a status
metadata – custom trailing metadata, dict or list of pairs
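As a sketch of the explicit form, the handler below rejects an invalid request by sending trailers with a non-OK status before any message is sent. RecordingStream and the Status enum are simplified stand-ins so the example runs without grpclib (the real enum is assumed to be grpclib.const.Status); the validation rule and the metadata key are invented for illustration.

```python
import asyncio
from enum import Enum


class Status(Enum):
    """Stand-in for grpclib.const.Status, using standard gRPC code values."""
    OK = 0
    INVALID_ARGUMENT = 3


class RecordingStream:
    """Hypothetical stand-in that records calls instead of writing to HTTP/2."""

    def __init__(self, request):
        self._request = request
        self.calls = []

    async def recv_message(self):
        return self._request

    async def send_message(self, message):
        self.calls.append(('message', message))

    async def send_trailing_metadata(self, *, status=Status.OK,
                                     status_message=None, metadata=None):
        self.calls.append(('trailers', status, status_message, metadata))


async def MakeLatte(stream):
    order = await stream.recv_message()
    if order.get('size') not in ('small', 'large'):
        # Trailers-only response: no message is sent, only the failure status.
        await stream.send_trailing_metadata(
            status=Status.INVALID_ARGUMENT,
            status_message='unknown size',
            metadata={'x-request-field': 'size'},
        )
        return
    await stream.send_message({'ok': True})


stream = RecordingStream({'size': 'huge'})
asyncio.run(MakeLatte(stream))
```

On the success path the handler only sends a message and returns, relying on the implicit call described in the note above.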
- class grpclib.server.Server(handlers: Collection[IServable], *, loop: Optional[AbstractEventLoop] = None, codec: Optional[CodecBase] = None, status_details_codec: Optional[StatusDetailsCodecBase] = None, config: Optional[Configuration] = None)
HTTP/2 server, which uses gRPC service handlers to handle requests.
A handler is a subclass of the abstract base class generated from the .proto file:
class CoffeeMachine(cafe_grpc.CoffeeMachineBase):

    async def MakeLatte(self, stream):
        task: cafe_pb2.LatteOrder = await stream.recv_message()
        ...
        await stream.send_message(empty_pb2.Empty())

server = Server([CoffeeMachine()])
- Parameters
handlers – list of handlers
loop – (deprecated) asyncio-compatible event loop
codec – instance of a codec to encode and decode messages, if omitted ProtoCodec is used by default
status_details_codec – instance of a status details codec to encode error details in a trailing metadata, if omitted ProtoStatusDetailsCodec is used by default
- async start(host: Optional[str] = None, port: Optional[int] = None, *, path: Optional[str] = None, family: socket.AddressFamily = AddressFamily.AF_UNSPEC, flags: socket.AddressInfo = AddressInfo.AI_PASSIVE, sock: Optional[socket] = None, backlog: int = 100, ssl: Optional[_ssl.SSLContext] = None, reuse_address: Optional[bool] = None, reuse_port: Optional[bool] = None) None
Coroutine to start the server.
- Parameters
host – can be a string, containing IPv4/v6 address or domain name. If host is None, server will be bound to all available interfaces.
port – port number.
path – UNIX domain socket path. If specified, host and port should be omitted (must be None).
family – can be set to either socket.AF_INET or socket.AF_INET6 to force the socket to use IPv4 or IPv6. If not set it will be determined from host.
flags – is a bitmask for getaddrinfo().
sock – sock can optionally be specified in order to use a preexisting socket object. If specified, host and port should be omitted (must be None).
backlog – is the maximum number of queued connections passed to listen().
ssl – can be set to an SSLContext to enable SSL over the accepted connections.
reuse_address – tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire.
reuse_port – tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created.
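For the sock parameter, here is a sketch of preparing a socket yourself, in this case to let the operating system choose a free port. It uses only the standard library. The commented-out start() call shows where the socket would be handed over, and binding to 127.0.0.1 is an assumption made for local testing.

```python
import socket

# Create and bind the socket ourselves; port 0 asks the OS for a free port.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 0))

host, port = sock.getsockname()  # the port actually assigned by the OS

# The bound socket would then be passed instead of host and port:
#     await server.start(sock=sock)

sock.close()  # closed here only because this sketch never starts a server
```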
- grpclib.utils.graceful_exit(servers: Collection[IClosable], *, loop: Optional[AbstractEventLoop] = None, signals: Collection[int] = (Signals.SIGINT, Signals.SIGTERM)) Iterator[None]
Utility context-manager to help properly shut down the server in response to OS signals.
By default this context-manager handles SIGINT and SIGTERM signals.
There are two stages:
1. the first received signal closes the servers
2. subsequent signals raise a SystemExit exception
Example:
async def main(...):
    ...
    with graceful_exit([server]):
        await server.start(host, port)
        print('Serving on {}:{}'.format(host, port))
        await server.wait_closed()
        print('Server closed')
The first stage calls server.close(), and await server.wait_closed() should complete successfully without errors. If the server wasn’t started yet, the second stage runs to prevent the server from starting.
The second stage raises a SystemExit exception, but you will receive asyncio.CancelledError in your async def main() coroutine. You can use try..finally constructs and context-managers to properly handle this error.
This context-manager is designed to work in cooperation with the asyncio.run() function:
if __name__ == '__main__':
    asyncio.run(main())
- Parameters
servers – list of servers
loop – (deprecated) asyncio-compatible event loop
signals – set of the OS signals to handle
Note
Not supported on Windows
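The second stage reaches your code as asyncio.CancelledError. The sketch below reproduces only that part with the standard library: the main task is cancelled by hand rather than by a signal, and asyncio.sleep() stands in for server.wait_closed(), so none of this is grpclib's own code.

```python
import asyncio

events = []  # records what happened, for illustration only


async def main():
    try:
        await asyncio.sleep(3600)  # stands in for server.wait_closed()
    finally:
        # Runs when the task is cancelled, so resources can be released here.
        events.append('cleanup')


async def runner():
    task = asyncio.create_task(main())
    await asyncio.sleep(0)  # let main() start and reach its await
    task.cancel()  # simulates the cancellation triggered by the second stage
    try:
        await task
    except asyncio.CancelledError:
        events.append('cancelled')


asyncio.run(runner())
```

The finally block runs before the cancellation propagates, which is why try..finally and context-managers are enough to release resources in this situation.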