module Consumer:sig
..end
type
t
Consumer
is used to augment our notion of flushing (Pipe.flushed
) to include
the time spent processing an element once it has been removed from the pipe. It can
be thought of as sitting at the end of a pipe, or between two pipes, and it provides
more detailed feedback on the time an element spends outside of the pipe proper.
So we have the following two cases:
Pipe --> Consumer Pipe --> Consumer --> Pipe --> ...
The time outside of the pipe can be broken down into two parts: a part (probably short lived) during which the consumer processes the elements in some way, and a downstream portion where the consumer acts as a sentinal to report when the element has been fully processed.
For instance, consider the simple case of a pipe attached to an Async.Std.Writer
that is writing elements to disk. Part one would be whatever transform the consumer
applies to the elements in the pipe before it hands them off to the writer, and part
two would be waiting for the writer to finish writing the transformed element to
disk. A more complex case is chaining two pipes together (maybe with a transform
like map
). Part one in this case is the transform and the write to the downstream
pipe, and part two is waiting for that pipe (and any further pipes in the chain) to
flush.
In each case the consumer is responsible for indicating when:
read
and read'
) and calling values_sent_downstream
when it has
sent the values downstream.~downstream_flushed
when add_consumer
is called).read'
but are processed over a long
period.val values_sent_downstream : t -> unit