A Consumer is used to augment our notion of flushing (Pipe.upstream_flushed and
Pipe.downstream_flushed) to include the time spent processing an element once it
has been removed from the pipe. It can be thought of as sitting at the end of a
pipe, or between two pipes, and it provides more detailed feedback on the time an
element spends outside of the pipe proper. So we have the following two cases:
    Pipe --> Consumer
    Pipe --> Consumer --> Pipe --> ...
The time outside of the pipe can be broken down into two parts: a part (probably short-lived) during which the consumer processes the elements in some way, and a downstream portion where the consumer acts as a sentinel to report when the element has been fully processed.
For instance, consider the simple case of a pipe attached to an Async.Writer
that is writing elements to disk. Part one would be whatever transform the consumer
applies to the elements in the pipe before it hands them off to the writer, and part
two would be waiting for the writer to finish writing the transformed element to
disk. A more complex case is chaining two pipes together (maybe with a transform
like map). Part one in this case is the transform and the write to the downstream
pipe, and part two is waiting for that pipe (and any further pipes in the chain) to
flush.
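The writer case could be sketched roughly as follows. This is only an illustration, not the library's own implementation: the helper name write_pipe_to_writer and the String.uppercase transform are hypothetical, and the code assumes the Core and Async libraries are available.

```ocaml
open Core
open Async

(* Hypothetical helper: drain [reader] into [writer] through a consumer, so
   that flush notifications on the pipe also wait for the writer to flush. *)
let write_pipe_to_writer (reader : string Pipe.Reader.t) (writer : Writer.t)
  : unit Deferred.t
  =
  let consumer =
    Pipe.add_consumer reader ~downstream_flushed:(fun () ->
      (* Part two: the elements count as flushed once the writer has flushed. *)
      Writer.flushed writer >>| fun () -> `Ok)
  in
  let rec loop () =
    Pipe.read' ~consumer reader
    >>= function
    | `Eof -> return ()
    | `Ok lines ->
      (* Part one: transform each element and hand it to the writer. *)
      Queue.iter lines ~f:(fun line ->
        Writer.write_line writer (String.uppercase line));
      (* Tell the pipe that part one is done for this batch. *)
      Pipe.Consumer.values_sent_downstream consumer;
      loop ()
  in
  loop ()
```

With this arrangement, a deferred returned by Pipe.downstream_flushed on the pipe should not become determined until the batch has been handed to the writer and the writer itself has flushed.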
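In each case the consumer is responsible for indicating when:
- part one has finished, by passing itself to the various read functions (e.g.
  read and read') and calling values_sent_downstream when it has sent the values
  downstream.
- part two has finished, that is, when the resulting values have been flushed
  downstream, by supplying an appropriate function to ~downstream_flushed when
  add_consumer is called.
If a reader does not use a consumer to do the reading then an element is considered
flushed the moment it leaves the pipe. This can make flush notifications fire too
early, since an entire queue of elements may be removed by a single call to read'
and then processed over a long period.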
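For the case of a consumer sitting between two pipes, a minimal sketch might look like the following. The function name map_via_consumer is hypothetical and this is not how the library's own map is necessarily written; it only shows where values_sent_downstream and ~downstream_flushed fit, assuming Core and Async.

```ocaml
open Core
open Async

(* Hypothetical helper: apply [f] to every element of [input] and forward the
   results to a fresh pipe, reporting progress through a consumer. *)
let map_via_consumer (input : 'a Pipe.Reader.t) ~(f : 'a -> 'b) : 'b Pipe.Reader.t =
  let output_reader, output_writer = Pipe.create () in
  let consumer =
    Pipe.add_consumer input ~downstream_flushed:(fun () ->
      (* Part two: wait for the downstream pipe (and anything after it). *)
      Pipe.downstream_flushed output_writer)
  in
  let rec loop () =
    Pipe.read' ~consumer input
    >>= function
    | `Eof ->
      Pipe.close output_writer;
      return ()
    | `Ok queue ->
      if Pipe.is_closed output_writer
      then (
        (* Nobody is reading downstream any more, so stop reading upstream. *)
        Pipe.close_read input;
        return ())
      else (
        (* Part one: transform the batch and enqueue it downstream. *)
        let pushback = Pipe.transfer_in output_writer ~from:(Queue.map queue ~f) in
        Pipe.Consumer.values_sent_downstream consumer;
        pushback >>= loop)
  in
  don't_wait_for (loop ());
  output_reader
```

If the ~consumer argument were dropped from the call to read', the upstream pipe would treat each batch as flushed as soon as read' returned, regardless of how long f and the downstream pipe took to deal with it.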
val values_sent_downstream : t -> unit