Module Rpc.Connection
include module type of sig ... end
type t = Async_rpc_kernel__Rpc.Connection.t
val sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
val create : ?implementations:'s Async_rpc_kernel__.Implementations.t -> connection_state:(t -> 's) -> ?handshake_timeout:Core_kernel.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core_kernel.Info.t -> ?time_source:Async_kernel.Synchronous_time_source.t -> Async_rpc_kernel__.Transport.t -> (t, Core_kernel.Exn.t) Core_kernel.Result.t Async_kernel.Deferred.t
val contains_magic_prefix : bool Bin_prot.Type_class.reader
val description : t -> Core_kernel.Info.t
val add_heartbeat_callback : t -> (unit -> unit) -> unit
val close : ?streaming_responses_flush_timeout:Core_kernel.Time_ns.Span.t -> ?reason:Core_kernel.Info.t -> t -> unit Async_kernel.Deferred.t
val close_finished : t -> unit Async_kernel.Deferred.t
val close_reason : t -> on_close:[ `finished | `started ] -> Core_kernel.Info.t Async_kernel.Deferred.t
val is_closed : t -> bool
val bytes_to_write : t -> int
val flushed : t -> unit Async_kernel.Deferred.t
val with_close : ?implementations:'s Async_rpc_kernel__.Implementations.t -> ?handshake_timeout:Core_kernel.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> connection_state:(t -> 's) -> Async_rpc_kernel__.Transport.t -> dispatch_queries:(t -> 'a Async_kernel.Deferred.t) -> on_handshake_error:[ `Call of Core_kernel.Exn.t -> 'a Async_kernel.Deferred.t | `Raise ] -> 'a Async_kernel.Deferred.t
val server_with_close : ?handshake_timeout:Core_kernel.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> Async_rpc_kernel__.Transport.t -> implementations:'s Async_rpc_kernel__.Implementations.t -> connection_state:(t -> 's) -> on_handshake_error:[ `Call of Core_kernel.Exn.t -> unit Async_kernel.Deferred.t | `Ignore | `Raise ] -> unit Async_kernel.Deferred.t
val create : ?implementations:'s Implementations.t -> connection_state:(t -> 's) -> ?max_message_size:int -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> Async_rpc__.Import.Reader.t -> Async_rpc__.Import.Writer.t -> (t, Core.Exn.t) Core.Result.t Async_rpc__.Import.Deferred.t
These functions are mostly the same as the ones with the same names in Async_rpc_kernel.Rpc.Connection; see Connection_intf in that library for documentation. The differences are that:
- they take an Async_unix.Reader.t, Async_unix.Writer.t and max_message_size instead of a Transport.t
- they use Time instead of Time_ns
val contains_magic_prefix : Async_rpc__.Import.Reader.t -> bool Async_rpc__.Import.Deferred.t
As of Feb 2017, the RPC protocol started to contain a magic number so that one can identify RPC communication. The bool returned by contains_magic_prefix says whether this magic number was observed.
This operation is a "peek" that does not advance any pointers associated with the reader. In particular, it makes sense to call create on a reader after calling this function.
val with_close : ?implementations:'s Implementations.t -> ?max_message_size:int -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> connection_state:(t -> 's) -> Async_rpc__.Import.Reader.t -> Async_rpc__.Import.Writer.t -> dispatch_queries:(t -> 'a Async_rpc__.Import.Deferred.t) -> on_handshake_error:[ `Raise | `Call of Core.Exn.t -> 'a Async_rpc__.Import.Deferred.t ] -> 'a Async_rpc__.Import.Deferred.t
val server_with_close : ?max_message_size:int -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> Async_rpc__.Import.Reader.t -> Async_rpc__.Import.Writer.t -> implementations:'s Implementations.t -> connection_state:(t -> 's) -> on_handshake_error:[ `Raise | `Ignore | `Call of Core.Exn.t -> unit Async_rpc__.Import.Deferred.t ] -> unit Async_rpc__.Import.Deferred.t
type transport_maker = Async_rpc__.Import.Fd.t -> max_message_size:int -> Transport.t
A function creating a transport from a file descriptor. It is responsible for setting the low-level parameters of the underlying transport.
For instance, to set up a transport using Async.{Reader,Writer} and set a buffer age limit on the writer, you can pass this to the functions of this module:
~make_transport:(fun fd ~max_message_size -> Rpc.Transport.of_fd fd ~max_message_size ~buffer_age_limit:`Unlimited)
type on_handshake_error = [ `Raise | `Ignore | `Call of Core.Exn.t -> unit ]
val serve : implementations:'s Implementations.t -> initial_connection_state:('address -> t -> 's) -> where_to_listen:('address, 'listening_on) Async_rpc__.Import.Tcp.Where_to_listen.t -> ?max_connections:int -> ?backlog:int -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?auth:('address -> bool) -> ?on_handshake_error:on_handshake_error -> ?on_handler_error:[ `Raise | `Ignore | `Call of 'address -> exn -> unit ] -> unit -> ('address, 'listening_on) Async_rpc__.Import.Tcp.Server.t Async_rpc__.Import.Deferred.t
serve implementations ~port ?on_handshake_error () starts a server with the given implementation on port. The optional auth function will be called on all incoming connections with the address info of the client and will disconnect the client immediately if it returns false. This auth mechanism is generic and does nothing other than disconnect the client -- any logging or record of the reasons is the responsibility of the auth function itself.
val serve_with_transport : handshake_timeout:Core.Time.Span.t option -> heartbeat_config:Heartbeat_config.t option -> implementations:'s Implementations.t -> description:Core.Info.t -> connection_state:(t -> 's) -> on_handshake_error:on_handshake_error -> Transport.t -> unit Async_rpc__.Import.Deferred.t
val client : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> _ Async_rpc__.Import.Tcp.Where_to_connect.t -> (t, Core.Exn.t) Core.Result.t Async_rpc__.Import.Deferred.t
client where_to_connect () connects to the server at where_to_connect and returns the connection or an Error if a connection could not be made. It is the responsibility of the caller to eventually call close.
In client and with_client, the handshake_timeout encompasses both the TCP connection timeout and the timeout for this module's own handshake.
val client' : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> 'transport Async_rpc__.Import.Tcp.Where_to_connect.t -> ('transport * t, Core.Exn.t) Core.Result.t Async_rpc__.Import.Deferred.t
Similar to client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.
val with_client : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> _ Async_rpc__.Import.Tcp.Where_to_connect.t -> (t -> 'a Async_rpc__.Import.Deferred.t) -> ('a, Core.Exn.t) Core.Result.t Async_rpc__.Import.Deferred.t
with_client where_to_connect f connects to the server at where_to_connect and runs f until an exception is thrown or until the returned Deferred is fulfilled.
NOTE: As with with_close, you should be careful when using this with Pipe_rpc. See with_close for more information.
val with_client' : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> 'transport Async_rpc__.Import.Tcp.Where_to_connect.t -> (remote_server:'transport -> t -> 'a Async_rpc__.Import.Deferred.t) -> ('a, Core.Exn.t) Core.Result.t Async_rpc__.Import.Deferred.t
Similar to with_client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.