Events

You can listen() for client-side events by using a Channel instance as the target:

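from grpclib.client import Channel
from grpclib.events import listen, SendRequest

channel = Channel()

async def send_request(event: SendRequest):
    # metadata is mutable, so this key is sent along with the request
    event.metadata['injected'] = 'successfully'

listen(channel, SendRequest, send_request)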

For server-side events, use a Server instance as the target:

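from grpclib.server import Server
from grpclib.events import listen, RecvRequest

server = Server([service])  # service: your service implementation

async def recv_request(event: RecvRequest):
    # read the value injected by the client-side listener
    print(event.metadata.get('injected'))

listen(server, RecvRequest, recv_request)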

There are two types of event properties:

  • mutable: you can change these properties, and the change takes effect

  • read-only: you can only read them
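To make the distinction concrete, here is a minimal sketch that models it with a hypothetical ToyEvent class. The class, its READ_ONLY set, and the AttributeError it raises are assumptions for illustration only. They are not grpclib's implementation, and how grpclib reacts to an assignment to a read-only property is not stated in this reference.

```python
class ToyEvent:
    """Hypothetical stand-in for an event; not a grpclib class."""

    # Names treated as read-only in this model (an assumption).
    READ_ONLY = frozenset({'method_name'})

    def __init__(self, metadata, method_name):
        # Bypass the guard below while the event is being constructed.
        object.__setattr__(self, 'metadata', metadata)
        object.__setattr__(self, 'method_name', method_name)

    def __setattr__(self, name, value):
        if name in self.READ_ONLY:
            raise AttributeError(f'read-only property: {name!r}')
        object.__setattr__(self, name, value)


event = ToyEvent(metadata={}, method_name='/pkg.Service/Method')

# Mutable property: in-place changes and reassignment are both kept.
event.metadata['injected'] = 'successfully'
event.metadata = dict(event.metadata, extra='value')

# Read-only property: reading works, assigning is rejected in this model.
try:
    event.method_name = '/pkg.Service/Other'
except AttributeError:
    rejected = True
else:
    rejected = False
```

In this model the metadata changes persist on the event, while method_name keeps its original value.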

Callbacks are called in the order they were added: first added, first called. Any callback can call event.interrupt() to stop the remaining callbacks from being called for that particular event:

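from grpclib.const import Status
from grpclib.events import listen, RecvRequest
from grpclib.exceptions import GRPCError

async def authn_error(stream):
    raise GRPCError(Status.UNAUTHENTICATED)

async def recv_request(event: RecvRequest):
    # SECRET is the expected token, defined elsewhere
    if event.metadata.get('auth-token') != SECRET:
        # provide custom RPC handler
        event.method_func = authn_error
        # skip any callbacks registered after this one
        event.interrupt()

listen(server, RecvRequest, recv_request)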
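The ordering and interruption rules can be sketched with a small stand-alone model. ToyEvent and dispatch below are hypothetical names written for this illustration; they imitate the behaviour described above and are not grpclib's dispatcher.

```python
import asyncio


class ToyEvent:
    """Hypothetical event with an interrupt flag; not a grpclib class."""

    def __init__(self):
        self.interrupted = False

    def interrupt(self):
        self.interrupted = True


async def dispatch(callbacks, event):
    """Call callbacks in registration order until one of them interrupts."""
    for callback in callbacks:
        await callback(event)
        if event.interrupted:
            break  # later callbacks are skipped for this event only


calls = []


async def first(event):
    calls.append('first')


async def second(event):
    calls.append('second')
    event.interrupt()


async def third(event):
    calls.append('third')  # never reached: `second` interrupted the event


asyncio.run(dispatch([first, second, third], ToyEvent()))
print(calls)  # ['first', 'second']
```

Because the flag lives on the event object, a new event starts with the full list of callbacks again.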

Common Events

grpclib.events.listen(target: IEventsTarget, event_type: Type[_EventType], callback: Callable[[_EventType], Coroutine[Any, Any, None]]) → None

Registers a listener function for the given target and event type.

async def callback(event: SomeEvent):
    print(event.data)

listen(target, SomeEvent, callback)

class grpclib.events.SendMessage(**kwargs)

Dispatched before a message is sent to the other party

Parameters:
  • message (mutable) – message to send

class grpclib.events.RecvMessage(**kwargs)

Dispatched after a message is received from the other party

Parameters:
  • message (mutable) – received message
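As a sketch of how these two events might be used together, the helper below counts messages in each direction. MessageCounter and attach are hypothetical names; the only grpclib names used are listen, SendMessage, and RecvMessage from this reference. The grpclib import is kept inside attach so that the callbacks can be exercised without a live channel or server.

```python
from collections import Counter


class MessageCounter:
    """Hypothetical helper that counts messages in each direction."""

    def __init__(self):
        self.counts = Counter()

    async def on_send(self, event):
        # event.message is the message about to be sent
        self.counts['sent'] += 1

    async def on_recv(self, event):
        # event.message is the message that was just received
        self.counts['received'] += 1

    def attach(self, target):
        """Register both callbacks on a Channel or a Server."""
        from grpclib.events import listen, SendMessage, RecvMessage

        listen(target, SendMessage, self.on_send)
        listen(target, RecvMessage, self.on_recv)
```

Since both events are available on either side, the same helper could be attached with counter.attach(channel) on the client or counter.attach(server) on the server.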

Client-Side Events

See also SendMessage and RecvMessage; you can listen for them on the client side as well.

class grpclib.events.SendRequest(**kwargs)

Dispatched before a request is sent to the server

Parameters:
  • metadata (mutable) – invocation metadata

  • method_name (read-only) – RPC’s method name

  • deadline (read-only) – request’s Deadline

  • content_type (read-only) – request’s content type

class grpclib.events.RecvInitialMetadata(**kwargs)

Dispatched after headers with initial metadata are received from the server

Parameters:
  • metadata (mutable) – initial metadata

class grpclib.events.RecvTrailingMetadata(**kwargs)

Dispatched after trailers with trailing metadata are received from the server

Parameters:
  • metadata (mutable) – trailing metadata

  • status (read-only) – status of the RPC call

  • status_message (read-only) – description of the status

  • status_details (read-only) – additional status details
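A possible client-side combination of these events is sketched below: one callback adds a request identifier to the outgoing metadata, another records the final status of each call. The 'x-request-id' key, the statuses list, and the attach function are assumptions made for this example and are not part of grpclib.

```python
import uuid

# Collected (status, status_message) pairs, one per finished call.
statuses = []


async def add_request_id(event):
    # metadata is mutable on SendRequest, so the new key is sent to the server
    event.metadata['x-request-id'] = uuid.uuid4().hex


async def record_status(event):
    # status and status_message are read-only on RecvTrailingMetadata
    statuses.append((event.status, event.status_message))


def attach(channel):
    """Register both callbacks on a client Channel."""
    from grpclib.events import listen, SendRequest, RecvTrailingMetadata

    listen(channel, SendRequest, add_request_id)
    listen(channel, RecvTrailingMetadata, record_status)
```

Keeping the callbacks as plain coroutine functions makes them easy to test with stand-in event objects before attaching them to a real channel.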

Server-Side Events

See also RecvMessage and SendMessage; you can listen for them on the server side as well.

class grpclib.events.RecvRequest(**kwargs)

Dispatched after a request is received from the client

Parameters:
  • metadata (mutable) – invocation metadata

  • method_func (mutable) – coroutine function that processes this request; it accepts a Stream

  • method_name (read-only) – RPC’s method name

  • deadline (read-only) – request’s Deadline

  • content_type (read-only) – request’s content type

  • user_agent (read-only) – request’s user agent

  • peer (read-only) – request’s Peer

class grpclib.events.SendInitialMetadata(**kwargs)

Dispatched before headers with initial metadata are sent to the client

Parameters:
  • metadata (mutable) – initial metadata

class grpclib.events.SendTrailingMetadata(**kwargs)

Dispatched before trailers with trailing metadata are sent to the client

Parameters:
  • metadata (mutable) – trailing metadata

  • status (read-only) – status of the RPC call

  • status_message (read-only) – description of the status

  • status_details (read-only) – additional status details
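On the server side, the same pattern might look like the sketch below: one callback records which method each incoming request targets, another tags the initial metadata of every response. SERVER_NAME, the 'x-served-by' key, the seen list, and attach are hypothetical and chosen only for this illustration.

```python
SERVER_NAME = 'example-server'  # hypothetical identifier for this process

# Collected (method_name, user_agent) pairs, one per received request.
seen = []


async def record_request(event):
    # method_name and user_agent are read-only on RecvRequest
    seen.append((event.method_name, event.user_agent))


async def tag_response(event):
    # metadata is mutable on SendInitialMetadata, so the key reaches the client
    event.metadata['x-served-by'] = SERVER_NAME


def attach(server):
    """Register both callbacks on a Server."""
    from grpclib.events import listen, RecvRequest, SendInitialMetadata

    listen(server, RecvRequest, record_request)
    listen(server, SendInitialMetadata, tag_response)
```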