module Rpc

: sig

A library for building asynchronous RPC-style protocols.

The approach here is to keep the server-side implementation of an RPC (an Implementation.t) separate from the interface that it exports (an Rpc.t, a State_rpc.t or a Pipe_rpc.t; we refer to these generically as RPC interfaces). A server builds the Implementation.t out of an RPC interface and a function that implements the RPC, while the client dispatches a request using the same RPC interface.

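The Implementation.t hides the type of the query and the response, whereas the Rpc.t is polymorphic in the query and response type. Because the types are hidden, implementations of RPCs with different query and response types share a single type, which allows you to build an Implementations.t out of a list of Implementation.ts.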
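The type-hiding idea can be sketched with a small stand-alone model. This is not the library's code: strings stand in for the bin_prot wire format, and the names rpc, implementation, implement and handle are hypothetical, chosen only for this illustration.

```ocaml
(* Toy model only: strings stand in for the wire format, and every name
   here is hypothetical rather than taken from the library. *)

(* An interface is polymorphic in its query and response types. *)
type ('q, 'r) rpc = {
  name : string;
  version : int;
  query_of_string : string -> 'q;
  string_of_response : 'r -> string;
}

(* An implementation hides 'q and 'r behind an existential, so
   implementations of different RPCs share one type. *)
type 'state implementation =
  | Impl : ('q, 'r) rpc * ('state -> 'q -> 'r) -> 'state implementation

let implement rpc f = Impl (rpc, f)

(* Decode the query, run the handler, encode the response. *)
let handle (Impl (rpc, f)) state raw_query =
  rpc.string_of_response (f state (rpc.query_of_string raw_query))

let incr_rpc : (int, int) rpc =
  { name = "incr"; version = 1;
    query_of_string = int_of_string; string_of_response = string_of_int }

let shout_rpc : (string, string) rpc =
  { name = "shout"; version = 1;
    query_of_string = (fun s -> s); string_of_response = (fun s -> s) }

(* Both entries have type [int implementation] even though their query
   and response types differ. *)
let implementations : int implementation list =
  [ implement incr_rpc (fun step n -> n + step);
    implement shout_rpc (fun _ s -> String.uppercase_ascii s) ]
```

Once the query and response types are hidden, the only thing a caller of handle can do is feed in raw bytes and get raw bytes back, which is all a server loop needs.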

Each RPC also comes with a version number. This lets a server support several versions of what is essentially the same RPC at once. You can think of the version as an extension of the RPC's name; in fact, each RPC is uniquely identified by its (name, version) pair. RPCs with the same name but different versions should implement similar functionality.
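As an illustration of identification by (name, version), here is a minimal stand-alone sketch. The Registry module, the call function and the `Unknown_rpc tag are assumptions made for this example and are not part of the library.

```ocaml
(* Hypothetical registry keyed by (name, version); handlers are plain
   string functions purely for illustration. *)
module Key = struct
  type t = string * int
  let compare = compare
end

module Registry = Map.Make (Key)

(* Same name, different versions: two distinct entries. *)
let registry =
  Registry.empty
  |> Registry.add ("greet", 1) (fun who -> "hello " ^ who)
  |> Registry.add ("greet", 2) (fun who -> "hello, " ^ who ^ "!")

(* Look a handler up by the full (name, version) pair. *)
let call ~name ~version query =
  match Registry.find_opt (name, version) registry with
  | Some handler -> Ok (handler query)
  | None -> Error (`Unknown_rpc (name, version))
```

Old clients can keep calling version 1 while new clients call version 2, and a request for a version that was never registered is reported rather than silently routed to a neighbour.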

#
module Implementation : sig
#
type 'connection_state t

A 'connection_state t is something that knows how to respond to one query, given a 'connection_state. That is, you can create a 'connection_state t by providing a function which takes a query *and* a 'connection_state and provides a response.

The reason for this is that RPCs often need to consult some shared piece of state, such as looking a value up in a master structure. Taking the state as an argument means that Implementation.t's can be created before the master structure exists.
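A minimal sketch of this idea, using a stand-alone toy type rather than the library's Implementation.t. The names implementation, lookup_price and make_prices are hypothetical, and a Hashtbl plays the role of the master structure.

```ocaml
(* Toy model: an implementation is a function from connection state
   and query to response. All names are hypothetical. *)
type 'connection_state implementation =
  'connection_state -> string -> string option

(* Defined at module level, before any price table exists. *)
let lookup_price : (string, float) Hashtbl.t implementation =
 fun prices symbol ->
  Option.map (Printf.sprintf "%.2f") (Hashtbl.find_opt prices symbol)

(* The master structure is only built later, e.g. at server start-up. *)
let make_prices () =
  let prices = Hashtbl.create 16 in
  Hashtbl.replace prices "ABC" 12.5;
  prices

(* Only at this point is the state handed to the implementation. *)
let answer symbol = lookup_price (make_prices ()) symbol
```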

#
module Description : sig
#
type t = {
# name
: string;
# version
: int;
}
#
val t_of_sexp : Sexplib.Sexp.t -> t
#
val sexp_of_t : t -> Sexplib.Sexp.t
#
val compare : t -> t -> int
end
#
val description : _ t -> Description.t
#
val lift : 'a t -> f:('b -> 'a) -> 'b t

We may want to use an 'a t implementation (perhaps provided by someone else) in a 'b t context. We can do this as long as we can map our state into the state expected by the original implementer.
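The shape of lift can be shown on a toy implementation type. This is a sketch under the assumption that an implementation behaves like a function of the state; the session record and count_hits are hypothetical.

```ocaml
(* Toy model: an implementation is a function of state and query. *)
type 'state implementation = 'state -> string -> string

(* Same shape as the documented [lift]: map our state into the state
   the original implementation expects, then delegate. *)
let lift (t : 'a implementation) ~(f : 'b -> 'a) : 'b implementation =
 fun b query -> t (f b) query

(* An implementation written by someone else; it only needs a counter. *)
let count_hits : int ref implementation =
 fun counter _query ->
  incr counter;
  string_of_int !counter

(* Our richer connection state. *)
type session = { user : string; hits : int ref }

(* Reuse [count_hits] by projecting the counter out of the session. *)
let count_hits_in_session : session implementation =
  lift count_hits ~f:(fun session -> session.hits)
```

Note the direction of f: it goes from the new state 'b to the old state 'a, because the state is an input to the implementation.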

end
#
module Implementations : sig
#
type 'connection_state t

A 'connection_state Implementations.t is something that knows how to respond to many different queries. It is conceptually a package of 'connection_state Implementation.t's.

#
val null : unit -> 'connection_state t

A server that can handle no queries.

#
val create : implementations:'connection_state Implementation.t list -> on_unknown_rpc:[
| `Raise
| `Continue
| `Close_connection
| `Call of rpc_tag:string -> version:int -> [
| `Close_connection
| `Continue
]
] -> ('connection_state t, [
| `Duplicate_implementations of Implementation.Description.t list
]) Core.Std.Result.t

create ~implementations ~on_unknown_rpc creates a server capable of responding to the RPCs implemented in the implementation list. Be careful about setting on_unknown_rpc to `Raise: other programs may mistakenly connect to this one, and an unknown RPC from such a program would then cause this one to crash.
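The `Duplicate_implementations error can be illustrated with a stand-alone sketch. The record types and the duplicates helper below are hypothetical, and exactly which descriptions the library puts in the error list is an assumption here: this model reports each description that occurs more than once.

```ocaml
(* Toy stand-ins for Implementation.Description.t and Implementation.t. *)
type description = { name : string; version : int }

type 'state implementation = {
  description : description;
  handler : 'state -> string -> string;
}

(* Descriptions that occur more than once, each reported once, in
   sorted order. *)
let duplicates impls =
  let sorted = List.sort compare (List.map (fun i -> i.description) impls) in
  let rec go acc = function
    | a :: (b :: _ as rest) ->
      go (if a = b && not (List.mem a acc) then a :: acc else acc) rest
    | _ -> List.rev acc
  in
  go [] sorted

(* Mirrors the documented result shape: Ok on success, otherwise the
   list of clashing (name, version) descriptions. *)
let create ~implementations =
  match duplicates implementations with
  | [] -> Ok implementations
  | dups -> Error (`Duplicate_implementations dups)

(* Hypothetical helper for building test implementations. *)
let echo name version =
  { description = { name; version }; handler = (fun _ query -> query) }
```

Two implementations with the same name but different versions are accepted; only a repeated (name, version) pair is an error.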

#
val create_exn : implementations:'connection_state Implementation.t list -> on_unknown_rpc:[
| `Raise
| `Continue
| `Close_connection
| `Call of rpc_tag:string -> version:int -> [
| `Close_connection
| `Continue
]
] -> 'connection_state t
end
#
module type Connection = Rpc_intf.Connection with module Implementations := Implementations
#
module Connection : Connection
#
module Rpc : sig
#
type ('query, 'response) t
#
val create : name:string -> version:int -> bin_query:'query Core.Std.Bin_prot.Type_class.t -> bin_response:'response Core.Std.Bin_prot.Type_class.t -> ('query, 'response) t
#
val name : (_, _) t -> string

name, version, bin_query and bin_response return the same values as were passed to create.

#
val version : (_, _) t -> int
#
val bin_query : ('query, _) t -> 'query Core.Std.Bin_prot.Type_class.t
#
val bin_response : (_, 'response) t -> 'response Core.Std.Bin_prot.Type_class.t
#
val implement : ('query, 'response) t -> ('connection_state -> 'query -> 'response Import.Deferred.t) -> 'connection_state Implementation.t
#
val implement' : ('query, 'response) t -> ('connection_state -> 'query -> 'response) -> 'connection_state Implementation.t
#
val dispatch : ('query, 'response) t -> Connection.t -> 'query -> 'response Core.Std.Or_error.t Import.Deferred.t
#
val dispatch_exn : ('query, 'response) t -> Connection.t -> 'query -> 'response Import.Deferred.t
end
#
module Pipe_rpc : sig
#
type ('query, 'response, 'error) t
#
module Id : sig
#
type t
end
#
val create : ?client_pushes_back:unit -> name:string -> version:int -> bin_query:'query Core.Std.Bin_prot.Type_class.t -> bin_response:'response Core.Std.Bin_prot.Type_class.t -> bin_error:'error Core.Std.Bin_prot.Type_class.t -> unit -> ('query, 'response, 'error) t
#
val bin_query : ('query, _, _) t -> 'query Core.Std.Bin_prot.Type_class.t
#
val bin_response : (_, 'response, _) t -> 'response Core.Std.Bin_prot.Type_class.t
#
val bin_error : (_, _, 'error) t -> 'error Core.Std.Bin_prot.Type_class.t
#
val implement : ('query, 'response, 'error) t -> ('connection_state -> 'query -> aborted:unit Import.Deferred.t -> ('response Import.Pipe.Reader.t, 'error) Core.Std.Result.t Import.Deferred.t) -> 'connection_state Implementation.t
#
val dispatch : ('query, 'response, 'error) t -> Connection.t -> 'query -> ('response Import.Pipe.Reader.t * Id.t, 'error) Core.Std.Result.t Core.Std.Or_error.t Import.Deferred.t

This has (..., 'error) Result.t as its return type to represent the possibility that the call itself is erroneous in a way that the implementation understands and reports. Failures that are not understood in this way are covered by the outer Or_error.t. Note that this cannot be done simply by making 'response a result type, since ('response Pipe.Reader.t, 'error) Result.t is distinct from ('response, 'error) Result.t Pipe.Reader.t.

Closing the pipe has the effect of calling abort.
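The distinction between the two result shapes mentioned for dispatch can be seen in a small sketch in which lists stand in for pipes. The type names call_level and element_level and the subscribe function are hypothetical.

```ocaml
(* Lists stand in for pipes in this sketch. *)

(* Shape used by dispatch: either the call as a whole is rejected, or
   a stream of plain responses is returned. *)
type ('response, 'error) call_level = ('response list, 'error) result

(* The other shape: the call always yields a stream, and each element
   may independently be an error. *)
type ('response, 'error) element_level = ('response, 'error) result list

(* A call-level error means no stream is ever produced. *)
let subscribe ~authorized : (int, string) call_level =
  if authorized then Ok [ 1; 2; 3 ] else Error "not authorized"

(* Element-level errors can only be reported inside the stream. *)
let readings : (int, string) element_level =
  [ Ok 1; Error "sensor glitch"; Ok 3 ]
```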

#
val dispatch_exn : ('query, 'response, 'error) t -> Connection.t -> 'query -> ('response Import.Pipe.Reader.t * Id.t) Import.Deferred.t
#
val abort : (_, _, _) t -> Connection.t -> Id.t -> unit

abort rpc connection id: given an RPC and the id returned as part of a call to dispatch, abort requests that the other side of the connection stop sending updates.

#
val name : (_, _, _) t -> string
#
val version : (_, _, _) t -> int
end
#
module State_rpc : sig

A state RPC is an easy way for two processes to synchronize a data structure by sending updates over the wire. It is essentially a pipe RPC that first transfers the initial state of the data structure and then a stream of updates, which are applied to the state under the covers.
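The "applies the updates under the covers" step can be sketched as a fold. This is a toy model with lists standing in for pipes; apply_updates is a hypothetical helper, and pairing each update with the state obtained after applying it is an assumption about what the ('state * 'update) stream returned by dispatch carries.

```ocaml
(* Lists stand in for pipes; [apply_updates] is a hypothetical helper.
   Each output pair holds the state after applying the update, together
   with the update itself (an assumption, see the lead-in). *)
let apply_updates ~update initial updates =
  let _final, pairs =
    List.fold_left
      (fun (state, acc) u ->
        let state = update state u in
        (state, (state, u) :: acc))
      (initial, []) updates
  in
  List.rev pairs

(* Example data structure: a set of ints kept as a list. *)
type update = Add of int | Remove of int

(* Same shape as the [update] argument of dispatch:
   'state -> 'update -> 'state. *)
let update set = function
  | Add x -> if List.mem x set then set else x :: set
  | Remove x -> List.filter (fun y -> y <> x) set
```

The client supplies only the pure update function; it never has to thread the state through by hand.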

#
type ('query, 'state, 'update, 'error) t
#
module Id : sig
#
type t
end
#
val create : ?client_pushes_back:unit -> name:string -> version:int -> bin_query:'query Core.Std.Bin_prot.Type_class.t -> bin_state:'state Core.Std.Bin_prot.Type_class.t -> bin_update:'update Core.Std.Bin_prot.Type_class.t -> bin_error:'error Core.Std.Bin_prot.Type_class.t -> unit -> ('query, 'state, 'update, 'error) t
#
val bin_query : ('query, _, _, _) t -> 'query Core.Std.Bin_prot.Type_class.t
#
val bin_state : (_, 'state, _, _) t -> 'state Core.Std.Bin_prot.Type_class.t
#
val bin_update : (_, _, 'update, _) t -> 'update Core.Std.Bin_prot.Type_class.t
#
val bin_error : (_, _, _, 'error) t -> 'error Core.Std.Bin_prot.Type_class.t
#
val implement : ('query, 'state, 'update, 'error) t -> ('connection_state -> 'query -> aborted:unit Import.Deferred.t -> ('state * 'update Import.Pipe.Reader.t, 'error) Core.Std.Result.t Import.Deferred.t) -> 'connection_state Implementation.t
#
val dispatch : ('query, 'state, 'update, 'error) t -> Connection.t -> 'query -> update:('state -> 'update -> 'state) -> ('state * ('state * 'update) Import.Pipe.Reader.t * Id.t, 'error) Core.Std.Result.t Core.Std.Or_error.t Import.Deferred.t
#
val abort : (_, _, _, _) t -> Connection.t -> Id.t -> unit
#
val name : (_, _, _, _) t -> string
#
val version : (_, _, _, _) t -> int
end
#
module Any : sig
#
type t =
# | Rpc : ('q, 'r) Rpc.t -> t
# | Pipe : ('q, 'r, 'e) Pipe_rpc.t -> t
# | State : ('q, 's, 'u, 'e) State_rpc.t -> t
end
end