Up

Module Connection

Signature

val sexp_of_t : t -> Sexplib.Sexp.t
val create : ?implementations:'s Async_rpc_kernel.Implementations.t -> connection_state:(t -> 's) -> ?handshake_timeout:Core_kernel.Std.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core_kernel.Std.Info.t -> Async_rpc_kernel.Transport.t -> (t, Core_kernel.Std.Exn.t) Core_kernel.Std.Result.t Async_kernel.Std.Deferred.t
val description : t -> Core_kernel.Std.Info.t
val add_heartbeat_callback : t -> (unit -> unit) -> unit
val close : ?streaming_responses_flush_timeout:Core_kernel.Std.Time_ns.Span.t -> ?reason:Core_kernel.Std.Info.t -> t -> unit Async_kernel.Std.Deferred.t
val close_finished : t -> unit Async_kernel.Std.Deferred.t
val is_closed : t -> bool
val bytes_to_write : t -> int
val flushed : t -> unit Async_kernel.Std.Deferred.t
val with_close : ?implementations:'s Async_rpc_kernel.Implementations.t -> ?handshake_timeout:Core_kernel.Std.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> connection_state:(t -> 's) -> Async_rpc_kernel.Transport.t -> dispatch_queries:(t -> 'a Async_kernel.Std.Deferred.t) -> on_handshake_error:[
| `Raise
] -> 'a Async_kernel.Std.Deferred.t
val server_with_close : ?handshake_timeout:Core_kernel.Std.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> Async_rpc_kernel.Transport.t -> implementations:'s Async_rpc_kernel.Implementations.t -> connection_state:(t -> 's) -> on_handshake_error:[
| `Ignore
| `Raise
] -> unit Async_kernel.Std.Deferred.t
val create : ?implementations:'s Implementations.t -> connection_state:(t -> 's) -> ?max_message_size:int -> ?handshake_timeout:Core.Std.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Std.Info.t -> Async_extra.Import.Reader.t -> Async_extra.Import.Writer.t -> (t, Core.Std.Exn.t) Core.Std.Result.t Async_extra.Import.Deferred.t

These functions are mostly the same as the ones with the same names in Async_rpc_kernel.Std.Rpc.Connection; see Connection_intf in that library for documentation. The differences are that:

  • they take an Async_unix.Std.Reader.t, Async_unix.Std.Writer.t and max_message_size instead of a Transport.t
  • they use Time instead of Time_ns
val with_close : ?implementations:'s Implementations.t -> ?max_message_size:int -> ?handshake_timeout:Core.Std.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> connection_state:(t -> 's) -> Async_extra.Import.Reader.t -> Async_extra.Import.Writer.t -> dispatch_queries:(t -> 'a Async_extra.Import.Deferred.t) -> on_handshake_error:[
| `Raise
] -> 'a Async_extra.Import.Deferred.t
val server_with_close : ?max_message_size:int -> ?handshake_timeout:Core.Std.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> Async_extra.Import.Reader.t -> Async_extra.Import.Writer.t -> implementations:'s Implementations.t -> connection_state:(t -> 's) -> on_handshake_error:[
| `Raise
| `Ignore
] -> unit Async_extra.Import.Deferred.t
type transport_maker = Async_extra.Import.Fd.t -> max_message_size:int -> Transport.t

A function creating a transport from a file descriptor. It is responsible for setting the low-level parameters of the underlying transport.

For instance to setup a transport using Async.Std.{Reader,Writer} and set a buffer age limit on the writer, you can pass this to the functions of this module:


        ~make_transport:(fun fd ~max_message_size ->
          Rpc.Transport.of_fd fd ~max_message_size ~buffer_age_limit:`Unlimited)
      
val serve : implementations:'s Implementations.t -> initial_connection_state:('address -> t -> 's) -> where_to_listen:('address, 'listening_on) Async_extra.Tcp.Where_to_listen.t -> ?max_connections:int -> ?backlog:int -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Std.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?auth:('address -> bool) -> ?on_handshake_error:[
| `Raise
| `Ignore
| `Call of Core.Std.Exn.t -> unit
] -> unit -> ('address, 'listening_on) Async_extra.Tcp.Server.t Async_extra.Import.Deferred.t

serve implementations ~port ?on_handshake_error () starts a server with the given implementation on port. The optional auth function will be called on all incoming connections with the address info of the client and will disconnect the client immediately if it returns false. This auth mechanism is generic and does nothing other than disconnect the client - any logging or record of the reasons is the responsibility of the auth function itself.

module Client_implementations : sig .. end
val client : host:string -> port:int -> ?via_local_interface:Async_extra.Import.Unix.Inet_addr.t -> ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Std.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Std.Info.t -> unit -> (t, Core.Std.Exn.t) Core.Std.Result.t Async_extra.Import.Deferred.t

client ~host ~port () connects to the server at (host,port) and returns the connection or an Error if a connection could not be made. It is the responsibility of the caller to eventually call close.

In client and with_client, the handshake_timeout encompasses both the TCP connection timeout and the timeout for this module's own handshake.

val with_client : host:string -> port:int -> ?via_local_interface:Async_extra.Import.Unix.Inet_addr.t -> ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Std.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> (t -> 'a Async_extra.Import.Deferred.t) -> ('a, Core.Std.Exn.t) Core.Std.Result.t Async_extra.Import.Deferred.t

with_client ~host ~port f connects to the server at (host,port) and runs f until an exception is thrown or until the returned Deferred is fulfilled.

NOTE: As with with_close, you should be careful when using this with Pipe_rpc. See with_close for more information.