Module type Rpc_parallel.Creator
type ('worker, 'query, 'response) _function
type ('worker, 'query, 'response) _direct
type worker
type worker_state
type connection_state
val create_rpc : ?name:string -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Deferred.t) -> bin_input:'query Bin_prot.Type_class.t -> bin_output:'response Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response) _function
[create_rpc ?name ~f ~bin_input ~bin_output ()] will create an [Rpc.Rpc.t] with [name] if specified and use [f] as an implementation for this Rpc. It returns back a [_function], a type-safe Rpc protocol.
val create_pipe : ?name:string -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Pipe.Reader.t Async.Deferred.t) -> bin_input:'query Bin_prot.Type_class.t -> bin_output:'response Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response Async.Pipe.Reader.t) _function
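[create_pipe ?name ~f ~bin_input ~bin_output ()] will create an [Rpc.Pipe_rpc.t] with [name] if specified. The implementation for this Rpc is a function that creates a [Pipe.Reader.t] and a [Pipe.Writer.t], then calls [f arg ~writer] and returns the reader. (Note: in the signature shown here, [f] takes no [~writer] argument and itself returns the [Pipe.Reader.t]; this sentence appears to describe an older interface.)
Notice that [aborted] is not exposed. The pipe is closed upon [aborted].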
val create_direct_pipe : ?name:string -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Async.Deferred.t) -> bin_input:'query Bin_prot.Type_class.t -> bin_output:'response Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response) _direct
[create_direct_pipe ?name ~f ~bin_input ~bin_output ()] will create an [Rpc.Pipe_rpc.t] with [name] if specified.
val create_one_way : ?name:string -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> unit) -> bin_input:'query Bin_prot.Type_class.t -> unit -> (worker, 'query, unit) _function
[create_one_way ?name ~f ~bin_input ()] will create an [Rpc.One_way.t] with [name] if specified and use [f] as an implementation.
val create_reverse_pipe : ?name:string -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'update Async.Pipe.Reader.t -> 'response Async.Deferred.t) -> bin_query:'query Bin_prot.Type_class.t -> bin_update:'update Bin_prot.Type_class.t -> bin_response:'response Bin_prot.Type_class.t -> unit -> (worker, 'query * 'update Async.Pipe.Reader.t, 'response) _function
[create_reverse_pipe ?name ~f ~bin_query ~bin_update ~bin_response ()] generates a function allowing you to send a [query] and a pipe of [update]s to a worker. The worker will send back a [response]. It is up to you whether to send a [response] before or after finishing with the pipe; Rpc_parallel doesn't care.
val create_reverse_direct_pipe : ?name:string -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'update Async.Pipe.Reader.t -> 'response Async.Deferred.t) -> bin_query:'query Bin_prot.Type_class.t -> bin_update:'update Bin_prot.Type_class.t -> bin_response:'response Bin_prot.Type_class.t -> unit -> (worker, 'query * ('update Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Core.Or_error.t Async.Deferred.t), 'response) _function
[create_reverse_direct_pipe ?name ~f ~bin_query ~bin_update ~bin_response ()] generates a function allowing you to send a [query] and a direct stream of [update]s to a worker. The worker will send back a [response]. It is up to you whether to send a [response] before or after finishing with the pipe; Rpc_parallel doesn't care.
val of_async_rpc : f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Deferred.t) -> ('query, 'response) Async.Rpc.Rpc.t -> (worker, 'query, 'response) _function
[of_async_rpc ~f rpc] is the analog to [create_rpc] but instead of creating an Rpc protocol, it uses the supplied one.
val of_async_pipe_rpc : f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Pipe.Reader.t Async.Deferred.t) -> ('query, 'response, Core.Error.t) Async.Rpc.Pipe_rpc.t -> (worker, 'query, 'response Async.Pipe.Reader.t) _function
[of_async_pipe_rpc ~f rpc] is the analog to [create_pipe] but instead of creating a Pipe rpc protocol, it uses the supplied one.
Notice that [aborted] is not exposed. The pipe is closed upon [aborted].
val of_async_direct_pipe_rpc : f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Async.Deferred.t) -> ('query, 'response, Core.Error.t) Async.Rpc.Pipe_rpc.t -> (worker, 'query, 'response) _direct
[of_async_direct_pipe_rpc ~f rpc] is the analog to [create_direct_pipe] but instead of creating a Pipe rpc protocol, it uses the supplied one.
val of_async_one_way_rpc : f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> unit) -> 'query Async.Rpc.One_way.t -> (worker, 'query, unit) _function
[of_async_one_way_rpc ~f rpc] is the analog to [create_one_way] but instead of creating a One_way rpc protocol, it uses the supplied one.