A library for building asynchronous RPC-style protocols.

The approach here is to keep the server-side implementation of an RPC (an Implementation.t) separate from the interface that it exports (an Rpc.t, a State_rpc.t or a Pipe_rpc.t; we refer to these generically as RPC interfaces). A server builds the Implementation.t out of an RPC interface and a function that implements the RPC, while the client dispatches a request using the same RPC interface.

The Implementation.t hides the types of the query and the response, whereas the Rpc.t is polymorphic in the query and response types. This allows you to build an Implementations.t out of a list of Implementation.t values, even when the underlying RPCs have different query and response types.
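
The following is a minimal sketch of why hiding the query and response types makes such a list possible. It does not use this library: the record type rpc, the string codecs (standing in for bin_prot type classes), and the functions implement and handle are all hypothetical names chosen for illustration.

```ocaml
(* Hypothetical stand-in for an RPC interface: polymorphic in the query
   and response types, with string codecs in place of bin_prot. *)
type ('q, 'r) rpc = {
  name : string;
  version : int;
  query_of_string : string -> 'q;
  string_of_response : 'r -> string;
}

(* The implementation hides 'q and 'r behind an existential constructor,
   so only the connection state remains visible in its type. *)
type 'state implementation =
  | Implementation : ('q, 'r) rpc * ('state -> 'q -> 'r) -> 'state implementation

let implement rpc f = Implementation (rpc, f)

(* Run one implementation on a serialized query. *)
let handle state impl raw_query =
  match impl with
  | Implementation (rpc, f) ->
    rpc.string_of_response (f state (rpc.query_of_string raw_query))

let double_rpc =
  { name = "double"; version = 1;
    query_of_string = int_of_string; string_of_response = string_of_int }

let shout_rpc =
  { name = "shout"; version = 1;
    query_of_string = (fun s -> s); string_of_response = (fun s -> s) }

(* Different query and response types, yet a single ordinary list. *)
let implementations : unit implementation list =
  [ implement double_rpc (fun () n -> 2 * n);
    implement shout_rpc (fun () s -> String.uppercase_ascii s) ]

let replies = List.map2 (handle ()) implementations [ "21"; "hi" ]
```

Here replies pairs each implementation with one serialized query. The list type mentions only the connection state (unit), which is the property the paragraph above relies on.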

Each RPC also comes with a version number. This is meant to allow support of multiple different versions of what is essentially the same RPC. You can think of it as an extension to the name of the RPC, and in fact, each RPC is uniquely identified by its (name, version) pair. RPCs with the same name but different versions should implement similar functionality.
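
As a rough illustration of the (name, version) identification, the sketch below keeps two versions of one RPC side by side in a table keyed by that pair. The table, the handlers, and the call function are hypothetical and operate on plain strings; they are not part of this library.

```ocaml
(* Hypothetical handlers on plain strings; real RPCs would carry typed
   queries and responses. *)
let handlers : (string * int, string -> string) Hashtbl.t = Hashtbl.create 8

let () =
  (* Both versions offer similar functionality under the same name. *)
  Hashtbl.replace handlers ("greet", 1) (fun who -> "hello " ^ who);
  Hashtbl.replace handlers ("greet", 2) (fun who -> "hello, " ^ who ^ "!")

(* Look up by the full (name, version) pair; the name alone is not enough. *)
let call ~name ~version query =
  match Hashtbl.find_opt handlers (name, version) with
  | Some handler -> Ok (handler query)
  | None -> Error (Printf.sprintf "unknown rpc %s (version %d)" name version)

let v1 = call ~name:"greet" ~version:1 "world"
let v2 = call ~name:"greet" ~version:2 "world"
let v3 = call ~name:"greet" ~version:3 "world"
```

An old client can keep calling version 1 while a new client calls version 2, and a request for a version nobody registered is reported as unknown.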

module Implementation : sig .. end
type 'connection_state t
A 'connection_state t is something which knows how to respond to one query, given a 'connection_state. That is, you can create a 'connection_state t by providing a function which takes a query *and* a 'connection_state and provides a response.

The reason for this is that RPCs often need to consult some shared state, for example to look something up in a master structure. This way, Implementation.t values can be created without having the master structure in hand.

module Description : sig .. end
type t = {
name : string;
version : int;
}
val t_of_sexp : Sexplib.Sexp.t -> t
val sexp_of_t : t -> Sexplib.Sexp.t
val description : 'a t -> Description.t
val lift : 'a t -> f:('b -> 'a) -> 'b t
We may want to use an 'a t implementation (perhaps provided by someone else) in a 'b t context. We can do this as long as we can map our state into the state expected by the original implementer.
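
A minimal sketch of the idea behind lift, assuming an implementation can be modelled as a plain function from connection state and query to response. The type implementation below, the session record, and whoami are hypothetical; only the shape of lift mirrors the signature above.

```ocaml
(* Hypothetical model: an implementation is a function from a connection
   state and a query to a response (strings for brevity). *)
type 'state implementation = 'state -> string -> string

(* lift pre-composes the state with f, so an 'a implementation can run
   wherever a 'b state is available. *)
let lift (impl : 'a implementation) ~(f : 'b -> 'a) : 'b implementation =
  fun state query -> impl (f state) query

(* Written by "someone else": it only needs a user name as its state. *)
let whoami : string implementation =
  fun user _query -> "you are " ^ user

(* Our server keeps a richer per-connection record. *)
type session = { user : string; request_count : int }

let whoami_for_session : session implementation =
  lift whoami ~f:(fun session -> session.user)

let reply = whoami_for_session { user = "alice"; request_count = 3 } "whoami"
```

Note that whoami is defined before any state exists; the state is supplied only when a query is handled, which is also why implementations can be built without the master structure in hand.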
module Implementations : sig .. end
type 'connection_state t
A 'connection_state Implementations.t is something which knows how to respond to many different queries. It is conceptually a package of 'connection_state Implementation.t's.
val null : unit -> 'connection_state t
A server that can handle no queries.
val create : implementations:'connection_state Implementation.t list ->
on_unknown_rpc:[ `Call of rpc_tag:string -> version:int -> unit
               | `Ignore
               | `Raise ] ->
('connection_state t,
 [ `Duplicate_implementations of Implementation.Description.t list ])
Core.Std.Result.t
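create ~implementations ~on_unknown_rpc creates a server capable of responding to the RPCs implemented in the implementation list. As the result type indicates, it returns `Duplicate_implementations with the offending descriptions when the list contains duplicates.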
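
The duplicate check suggested by the result type of create could be sketched as follows. This is a simplified model, not the library's code: it assumes that two implementations clash when they share a description, it omits on_unknown_rpc entirely, and the implementation record with a string handler is hypothetical.

```ocaml
type description = { name : string; version : int }

(* Hypothetical model: an implementation is a description plus a handler. *)
type implementation = { description : description; handler : string -> string }

(* Build a lookup table, collecting every description that was already
   registered by an earlier element of the list. *)
let create implementations :
    ((description, string -> string) Hashtbl.t,
     [ `Duplicate_implementations of description list ]) result =
  let table = Hashtbl.create 16 in
  let duplicates =
    List.filter_map
      (fun impl ->
        if Hashtbl.mem table impl.description then Some impl.description
        else begin
          Hashtbl.replace table impl.description impl.handler;
          None
        end)
      implementations
  in
  match duplicates with
  | [] -> Ok table
  | _ :: _ -> Error (`Duplicate_implementations duplicates)

let echo =
  { description = { name = "echo"; version = 1 }; handler = (fun q -> q) }

let echo_v2 =
  { description = { name = "echo"; version = 2 }; handler = (fun q -> q ^ q) }

let ok_case = create [ echo; echo_v2 ]
let clash = create [ echo; echo_v2; echo ]
```

In this model ok_case succeeds because the two entries differ in version, while clash reports the repeated description instead of silently keeping one of the handlers.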
val create_exn : implementations:'connection_state Implementation.t list ->
on_unknown_rpc:[ `Call of rpc_tag:string -> version:int -> unit
               | `Ignore
               | `Raise ] ->
'connection_state t
module type Connection = Rpc_intf.Connection with module Implementations = Implementations
module Connection : Connection
module Rpc : sig .. end
type ('query, 'response) t
val create : name:string ->
version:int ->
bin_query:'query Core.Std.Bin_prot.Type_class.t ->
bin_response:'response Core.Std.Bin_prot.Type_class.t ->
('query, 'response) t
val name : ('a, 'b) t -> string
name and version return the same values as were passed to create.
val version : ('a, 'b) t -> int
val bin_query : ('query, 'a) t -> 'query Core.Std.Bin_prot.Type_class.t
val bin_response : ('a, 'response) t -> 'response Core.Std.Bin_prot.Type_class.t
val implement : ('query, 'response) t ->
('connection_state -> 'query -> 'response Import.Deferred.t) ->
'connection_state Implementation.t
val dispatch : ('query, 'response) t ->
Connection.t -> 'query -> 'response Core.Std.Or_error.t Import.Deferred.t
val dispatch_exn : ('query, 'response) t ->
Connection.t -> 'query -> 'response Import.Deferred.t
module Pipe_rpc : sig .. end
type ('query, 'response, 'error) t
module Id : sig .. end
type t
val create : name:string ->
version:int ->
bin_query:'query Core.Std.Bin_prot.Type_class.t ->
bin_response:'response Core.Std.Bin_prot.Type_class.t ->
bin_error:'error Core.Std.Bin_prot.Type_class.t ->
('query, 'response, 'error) t
val bin_query : ('query, 'a, 'b) t -> 'query Core.Std.Bin_prot.Type_class.t
val bin_response : ('a, 'response, 'b) t -> 'response Core.Std.Bin_prot.Type_class.t
val bin_error : ('a, 'b, 'error) t -> 'error Core.Std.Bin_prot.Type_class.t
val implement : ('query, 'response, 'error) t ->
('connection_state ->
 'query ->
 aborted:unit Import.Deferred.t ->
 ('response Import.Pipe.Reader.t, 'error) Core.Std.Result.t Import.Deferred.t) ->
'connection_state Implementation.t
val dispatch : ('query, 'response, 'error) t ->
Connection.t ->
'query ->
('response Import.Pipe.Reader.t * Id.t, 'error) Core.Std.Result.t
Core.Std.Or_error.t Import.Deferred.t
The return type includes (..., 'error) Result.t to represent the possibility that the call itself was erroneous but understood by the other side; failures that were not understood are covered by the outer Or_error.t. Note that this cannot be done simply by making 'response a result type, since ('response Pipe.Reader.t, 'error) Result.t is distinct from ('response, 'error) Result.t Pipe.Reader.t.
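
The distinction between the two types can be seen in a small type-level sketch. A list stands in for Pipe.Reader.t here, which is an assumption made only to keep the example self-contained; the type names call_level and element_level are hypothetical.

```ocaml
(* A list stands in for Pipe.Reader.t; this illustrates the types only,
   not how pipes behave at runtime. *)
type 'a pipe = 'a list

(* Shape used by dispatch: the call as a whole either fails with an
   'error or yields a stream of plain responses. *)
type ('response, 'error) call_level = ('response pipe, 'error) result

(* Shape obtained by making 'response itself a result: the call always
   yields a stream, and each element may individually be an error. *)
type ('response, 'error) element_level = ('response, 'error) result pipe

let rejected : (int, string) call_level = Error "no such feed"
let accepted : (int, string) call_level = Ok [ 1; 2; 3 ]
let mixed : (int, string) element_level = [ Ok 1; Error "bad tick"; Ok 3 ]

(* With the call-level shape, an error means no stream was ever opened. *)
let describe_call = function
  | Error e -> "rejected before any data: " ^ e
  | Ok items -> Printf.sprintf "streaming %d items" (List.length items)

(* With the element-level shape, errors are interleaved with data. *)
let count_element_errors items =
  List.length (List.filter Result.is_error items)
```

With the first shape a caller learns about a rejected query before holding any pipe at all, which the second shape cannot express.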

Closing the pipe has the effect of calling abort.

val dispatch_exn : ('query, 'response, 'error) t ->
Connection.t ->
'query -> ('response Import.Pipe.Reader.t * Id.t) Import.Deferred.t
val abort : ('a, 'b, 'c) t -> Connection.t -> Id.t -> unit
abort rpc connection id: given an RPC and the id returned as part of a call to dispatch, abort requests that the other side of the connection stop sending updates.
val name : ('a, 'b, 'c) t -> string
val version : ('a, 'b, 'c) t -> int
module State_rpc : sig .. end
A state RPC is an easy way for two processes to synchronize a data structure by sending updates over the wire. It is essentially a pipe RPC that first sends the initial state of the data structure and then a stream of updates, with the updates applied for you behind the scenes.
type ('query, 'state, 'update, 'error) t
module Id : sig .. end
type t
val create : name:string ->
version:int ->
bin_query:'query Core.Std.Bin_prot.Type_class.t ->
bin_state:'state Core.Std.Bin_prot.Type_class.t ->
bin_update:'update Core.Std.Bin_prot.Type_class.t ->
bin_error:'error Core.Std.Bin_prot.Type_class.t ->
('query, 'state, 'update, 'error) t
val bin_query : ('query, 'a, 'b, 'c) t -> 'query Core.Std.Bin_prot.Type_class.t
val bin_state : ('a, 'state, 'b, 'c) t -> 'state Core.Std.Bin_prot.Type_class.t
val bin_update : ('a, 'b, 'update, 'c) t -> 'update Core.Std.Bin_prot.Type_class.t
val bin_error : ('a, 'b, 'c, 'error) t -> 'error Core.Std.Bin_prot.Type_class.t
val implement : ('query, 'state, 'update, 'error) t ->
('connection_state ->
 'query ->
 aborted:unit Import.Deferred.t ->
 ('state * 'update Import.Pipe.Reader.t, 'error) Core.Std.Result.t
 Import.Deferred.t) ->
'connection_state Implementation.t
val dispatch : ('query, 'state, 'update, 'error) t ->
Connection.t ->
'query ->
update:('state -> 'update -> 'state) ->
('state * ('state * 'update) Import.Pipe.Reader.t * Id.t, 'error)
Core.Std.Result.t Core.Std.Or_error.t Import.Deferred.t
val abort : ('a, 'b, 'c, 'd) t -> Connection.t -> Id.t -> unit
val name : ('a, 'b, 'c, 'd) t -> string
val version : ('a, 'b, 'c, 'd) t -> int
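
To illustrate how the update argument of State_rpc.dispatch relates the initial state to the stream of ('state * 'update) pairs, here is a small model in which lists stand in for pipes. It assumes that each pair holds the state after the update has been applied, which the signature alone does not settle. The function apply_updates and the update type are hypothetical.

```ocaml
(* Hypothetical client-side model: fold the updates over the initial
   state, recording each new state together with the update that
   produced it. *)
let apply_updates ~update initial updates =
  let step (state, acc) u =
    let state' = update state u in
    (state', (state', u) :: acc)
  in
  let final, rev_pairs = List.fold_left step (initial, []) updates in
  (final, List.rev rev_pairs)

(* Example state: an association list; example updates: set or remove. *)
type update = Set of string * int | Remove of string

let update state = function
  | Set (k, v) -> (k, v) :: List.remove_assoc k state
  | Remove k -> List.remove_assoc k state

let final, history =
  apply_updates ~update [ ("a", 1) ] [ Set ("b", 2); Remove "a"; Set ("b", 5) ]
```

The caller only supplies update and the library side does the folding, so both processes converge on the same structure as long as they apply the same updates in the same order.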