Up

Module Pipe_rpc = Async_extra.Import.Rpc_kernel.Pipe_rpc

Signature

type ('query, 'response, 'error) t
module Id : sig .. end
module Metadata : sig .. end
val create : ?client_pushes_back:unit -> name:string -> version:int -> bin_query:'query Core_kernel.Std.Bin_prot.Type_class.t -> bin_response:'response Core_kernel.Std.Bin_prot.Type_class.t -> bin_error:'error Core_kernel.Std.Bin_prot.Type_class.t -> unit -> ('query, 'response, 'error) t
val bin_query : ('query, _, _) t -> 'query Core_kernel.Std.Bin_prot.Type_class.t
val bin_response : (_, 'response, _) t -> 'response Core_kernel.Std.Bin_prot.Type_class.t
val bin_error : (_, _, 'error) t -> 'error Core_kernel.Std.Bin_prot.Type_class.t
val implement : ('query, 'response, 'error) t -> ('connection_state -> 'query -> aborted:unit Async_kernel.Std.Deferred.t -> ('response Async_kernel.Std.Pipe.Reader.t, 'error) Core_kernel.Std.Result.t Async_kernel.Std.Deferred.t) -> 'connection_state Rpc_kernel.Implementation.t
module Direct_stream_writer : sig .. end
A Direct_stream_writer.t is a simple object for responding to a Pipe_rpc query.
val implement_direct : ('query, 'response, 'error) t -> ('connection_state -> 'query -> 'response Direct_stream_writer.t -> (unit, 'error) Core_kernel.Std.Result.t Async_kernel.Std.Deferred.t) -> 'connection_state Rpc_kernel.Implementation.t

Similar to implement, but you are given the writer instead of providing a writer and the writer is a Direct_stream_writer.t instead of a Pipe.Writer.t.

The main advantage of this interface is that it consumes far less memory per open query.

Though the implementation function is given a writer immediately, the result of the client's call to dispatch will not be determined until after the implementation function returns. Elements written before the function returns will be queued up to be written after the function returns.

This has (..., 'error) Result.t as its return type to represent the possibility of the call itself being somehow erroneous (but understood - the outer Or_error.t encompasses failures of that nature). Note that this cannot be done simply by making 'response a result type, since ('response Pipe.Reader.t, 'error) Result.t is distinct from ('response, 'error) Result.t Pipe.Reader.t.

Closing the pipe has the effect of calling abort.

val dispatch_exn : ('query, 'response, 'error) t -> Rpc_kernel.Connection.t -> 'query -> ('response Async_kernel.Std.Pipe.Reader.t * Metadata.t) Async_kernel.Std.Deferred.t
module Pipe_message : sig .. end
The input type of the f passed to dispatch_iter.
module Pipe_response : sig .. end
The output type of the f passed to dispatch_iter.
val dispatch_iter : ('query, 'response, 'error) t -> Rpc_kernel.Connection.t -> 'query -> f:('response Pipe_message.t -> Pipe_response.t) -> (Id.t, 'error) Core_kernel.Std.Result.t Core_kernel.Std.Or_error.t Async_kernel.Std.Deferred.t

Calling dispatch_iter t conn query ~f is similar to calling dispatch t conn query and then iterating over the result pipe with f. The main advantage it offers is that its memory usage is much lower, making it more suitable for situations where many queries are open at once.

f may be fed any number of Update _ messages, followed by a single Closed _ message.

f can cause the connection to stop reading messages off of its underlying Reader.t by returning Wait _. This is the same as what happens when a client stops reading from the pipe returned by dispatch when the Pipe_rpc.t has client_pushes_back set.

When successful, dispatch_iter returns an Id.t after the subscription is started. This may be fed to abort with the same Pipe_rpc.t and Connection.t as the call to dispatch_iter to cancel the subscription, which will fill in the implementation's aborted Deferred.t. Calling it with a different Pipe_rpc.t or Connection.t has undefined behavior.

val abort : (_, _, _) t -> Rpc_kernel.Connection.t -> Id.t -> unit

abort rpc connection id given an RPC and the id returned as part of a call to dispatch, abort requests that the other side of the connection stop sending updates.

close_reason metadata will be determined sometime after the pipe associated with metadata is closed. Its value will indicate what caused the pipe to be closed.

val name : (_, _, _) t -> string
val version : (_, _, _) t -> int
val description : (_, _, _) t -> Rpc_kernel.Description.t