sig
type ('a, 'phantom) t
type ('a, 'phantom) pipe = ('a, 'phantom) Pipe.t
module Writer :
sig
type phantom
type 'a t = ('a, Pipe.Writer.phantom) Pipe.pipe
val invariant : 'a Pipe.Writer.t -> unit
val sexp_of_t :
('a -> Sexplib.Sexp.t) -> 'a Pipe.Writer.t -> Sexplib.Sexp.t
end
module Reader :
sig
type phantom
type 'a t = ('a, Pipe.Reader.phantom) Pipe.pipe
val invariant : 'a Pipe.Reader.t -> unit
val sexp_of_t :
('a -> Sexplib.Sexp.t) -> 'a Pipe.Reader.t -> Sexplib.Sexp.t
end
val create : unit -> 'a Pipe.Reader.t * 'a Pipe.Writer.t
val init : ('a Pipe.Writer.t -> unit Deferred.t) -> 'a Pipe.Reader.t
val of_list : 'a list -> 'a Pipe.Reader.t
val close : 'a Pipe.Writer.t -> unit
val close_read : 'a Pipe.Reader.t -> unit
val is_closed : ('a, 'b) Pipe.t -> bool
val closed : ('a, 'b) Pipe.t -> unit Deferred.t
module Flushed_result :
sig
type t = [ `Ok | `Reader_closed ]
val sexp_of_t : Pipe.Flushed_result.t -> Sexplib.Sexp.t
end
val upstream_flushed : ('a, 'b) Pipe.t -> Pipe.Flushed_result.t Deferred.t
val downstream_flushed :
('a, 'b) Pipe.t -> Pipe.Flushed_result.t Deferred.t
module Consumer :
sig type t val values_sent_downstream : Pipe.Consumer.t -> unit end
val add_consumer :
'a Pipe.Reader.t ->
downstream_flushed:(unit -> Pipe.Flushed_result.t Deferred.t) ->
Pipe.Consumer.t
val length : ('a, 'b) Pipe.t -> int
val is_empty : ('a, 'b) Pipe.t -> bool
val pushback : 'a Pipe.Writer.t -> unit Deferred.t
val write' : 'a Pipe.Writer.t -> 'a Core.Std.Queue.t -> unit Deferred.t
val write : 'a Pipe.Writer.t -> 'a -> unit Deferred.t
val write_without_pushback' :
'a Pipe.Writer.t -> 'a Core.Std.Queue.t -> unit
val write_without_pushback : 'a Pipe.Writer.t -> 'a -> unit
val write_when_ready :
'a Pipe.Writer.t ->
f:(('a -> unit) -> 'b) -> [ `Closed | `Ok of 'b ] Deferred.t
val read' :
?consumer:Pipe.Consumer.t ->
'a Pipe.Reader.t -> [ `Eof | `Ok of 'a Core.Std.Queue.t ] Deferred.t
val read :
?consumer:Pipe.Consumer.t ->
'a Pipe.Reader.t -> [ `Eof | `Ok of 'a ] Deferred.t
val read_at_most :
?consumer:Pipe.Consumer.t ->
'a Pipe.Reader.t ->
num_values:int -> [ `Eof | `Ok of 'a Core.Std.Queue.t ] Deferred.t
val read_exactly :
?consumer:Pipe.Consumer.t ->
'a Pipe.Reader.t ->
num_values:int ->
[ `Eof | `Exactly of 'a Core.Std.Queue.t | `Fewer of 'a Core.Std.Queue.t ]
Deferred.t
val read_now' :
?consumer:Pipe.Consumer.t ->
'a Pipe.Reader.t ->
[ `Eof | `Nothing_available | `Ok of 'a Core.Std.Queue.t ]
val read_now :
?consumer:Pipe.Consumer.t ->
'a Pipe.Reader.t -> [ `Eof | `Nothing_available | `Ok of 'a ]
val peek : 'a Pipe.Reader.t -> 'a option
val clear : 'a Pipe.Reader.t -> unit
val read_all : 'a Pipe.Reader.t -> 'a Core.Std.Queue.t Deferred.t
val values_available : 'a Pipe.Reader.t -> [ `Eof | `Ok ] Deferred.t
type ('a, 'b, 'c, 'accum) fold =
?consumer:Pipe.Consumer.t ->
'a Pipe.Reader.t ->
init:'accum -> f:('accum -> 'b -> 'c) -> 'accum Deferred.t
val fold' : ('a, 'a Core.Std.Queue.t, 'accum Deferred.t, 'accum) Pipe.fold
val fold : ('a, 'a, 'accum, 'accum) Pipe.fold
type ('a, 'b, 'c) iter =
?consumer:Pipe.Consumer.t ->
?continue_on_error:bool ->
'a Pipe.Reader.t -> f:('b -> 'c) -> unit Deferred.t
val iter' : ('a, 'a Core.Std.Queue.t, unit Deferred.t) Pipe.iter
val iter : ('a, 'a, unit Deferred.t) Pipe.iter
val iter_without_pushback : ('a, 'a, unit) Pipe.iter
val transfer' :
'a Pipe.Reader.t ->
'b Pipe.Writer.t ->
f:('a Core.Std.Queue.t -> 'b Core.Std.Queue.t Deferred.t) ->
unit Deferred.t
val transfer :
'a Pipe.Reader.t -> 'b Pipe.Writer.t -> f:('a -> 'b) -> unit Deferred.t
val transfer_id : 'a Pipe.Reader.t -> 'a Pipe.Writer.t -> unit Deferred.t
val map' :
'a Pipe.Reader.t ->
f:('a Core.Std.Queue.t -> 'b Core.Std.Queue.t Deferred.t) ->
'b Pipe.Reader.t
val map : 'a Pipe.Reader.t -> f:('a -> 'b) -> 'b Pipe.Reader.t
val filter_map' :
'a Pipe.Reader.t -> f:('a -> 'b option Deferred.t) -> 'b Pipe.Reader.t
val filter_map :
'a Pipe.Reader.t -> f:('a -> 'b option) -> 'b Pipe.Reader.t
val filter : 'a Pipe.Reader.t -> f:('a -> bool) -> 'a Pipe.Reader.t
val interleave : 'a Pipe.Reader.t list -> 'a Pipe.Reader.t
val concat : 'a Pipe.Reader.t list -> 'a Pipe.Reader.t
val to_stream_deprecated : 'a Pipe.Reader.t -> 'a Async_stream.t
val of_stream_deprecated : 'a Async_stream.t -> 'a Pipe.Reader.t
val drain : 'a Pipe.Reader.t -> unit Deferred.t
val drain_and_count : 'a Pipe.Reader.t -> int Deferred.t
val to_list : 'a Pipe.Reader.t -> 'a list Deferred.t
val hash : ('a, 'b) Pipe.t -> int
val equal : ('a, 'b) Pipe.t -> ('a, 'b) Pipe.t -> bool
val size_budget : ('a, 'b) Pipe.t -> int
val set_size_budget : ('a, 'b) Pipe.t -> int -> unit
val show_debug_messages : bool Pervasives.ref
val check_invariant : bool Pervasives.ref
val sexp_of_t :
('a -> Sexplib.Sexp.t) ->
('phantom -> Sexplib.Sexp.t) -> ('a, 'phantom) Pipe.t -> Sexplib.Sexp.t
val sexp_of_pipe :
('a -> Sexplib.Sexp.t) ->
('phantom -> Sexplib.Sexp.t) ->
('a, 'phantom) Pipe.pipe -> Sexplib.Sexp.t
end