Module Consumer

Signature

type t

A Consumer is used to augment our notion of flushing (Pipe.upstream_flushed and Pipe.downstream_flushed) to include the time spent processing an element once it has been removed from the pipe. It can be thought of as sitting at the end of a pipe, or between two pipes, and it provides more detailed feedback on the time an element spends outside of the pipe proper. So we have the following two cases:

        Pipe --> Consumer
        Pipe --> Consumer --> Pipe --> ...
      

The time outside of the pipe can be broken down into two parts: a first part (probably short-lived) during which the consumer processes the elements in some way, and a downstream part during which the consumer acts as a sentinel and reports when the element has been fully processed.

For instance, consider the simple case of a pipe attached to an Async.Std.Writer that is writing elements to disk. Part one would be whatever transform the consumer applies to the elements in the pipe before it hands them off to the writer, and part two would be waiting for the writer to finish writing the transformed element to disk. A more complex case is chaining two pipes together (maybe with a transform like map). Part one in this case is the transform and the write to the downstream pipe, and part two is waiting for that pipe (and any further pipes in the chain) to flush.
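The writer case can be sketched roughly as follows. This is an illustration rather than the library's own code: write_all and the upper-casing transform are hypothetical, and the sketch assumes a recent Async in which the entry points are Core and Async (older releases spell them Core.Std and Async.Std).

```ocaml
open Core
open Async

(* Hypothetical helper: drain [reader] into [writer] one element at a time. *)
let write_all (reader : string Pipe.Reader.t) (writer : Writer.t) : unit Deferred.t =
  let consumer =
    Pipe.add_consumer reader ~downstream_flushed:(fun () ->
      (* Part two: the element is fully processed once the writer has
         flushed everything handed to it so far. *)
      Writer.flushed writer >>| fun () -> `Ok)
  in
  let rec loop () =
    (* Reading with [~consumer] attaches the consumer to the element. *)
    Pipe.read ~consumer reader
    >>= function
    | `Eof -> return ()
    | `Ok line ->
      (* Part one: the local transform and the hand-off to the writer. *)
      Writer.write_line writer (String.uppercase line);
      Pipe.Consumer.values_sent_downstream consumer;
      loop ()
  in
  loop ()
```

With this arrangement, a caller waiting on Pipe.downstream_flushed for the pipe would be expected to wait for the writer's flush as well, instead of only for the element to leave the pipe.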

In each case the consumer is responsible for indicating when:

  • it has finished any local work, by attaching itself to elements via the ~consumer argument to read and read' and then calling values_sent_downstream once it has sent the values downstream.
  • any further processing has been completed, by providing an appropriate function to ~downstream_flushed when add_consumer is called.
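For the pipe-to-pipe case, the two responsibilities listed above might look like the sketch below. The name map_pipe is hypothetical and this is not necessarily how Pipe.map is implemented. It assumes a recent Async (open Core and open Async), and it does not handle the downstream reader being closed early.

```ocaml
open Core
open Async

(* Hypothetical helper: forward [input] through [f] into a fresh pipe. *)
let map_pipe (input : int Pipe.Reader.t) ~(f : int -> int) : int Pipe.Reader.t =
  let output_reader, output_writer = Pipe.create () in
  let consumer =
    Pipe.add_consumer input ~downstream_flushed:(fun () ->
      (* Second responsibility: further processing is complete once the
         downstream pipe, and anything chained after it, has flushed. *)
      Pipe.downstream_flushed output_writer)
  in
  let rec loop () =
    (* [read'] removes a whole queue; [~consumer] attaches to all of it. *)
    Pipe.read' ~consumer input
    >>= function
    | `Eof ->
      Pipe.close output_writer;
      return ()
    | `Ok batch ->
      (* First responsibility: do the local work (transform and write),
         then report that the values have been sent downstream. *)
      Pipe.transfer_in output_writer ~from:(Queue.map batch ~f)
      >>= fun () ->
      Pipe.Consumer.values_sent_downstream consumer;
      loop ()
  in
  don't_wait_for (loop ());
  output_reader
```

A possible use is Pipe.to_list (map_pipe (Pipe.of_list [1; 2; 3]) ~f:(fun x -> x * 2)), which should yield the doubled elements in their original order.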

If a reader does not use a consumer to do the reading, then an element is considered flushed the moment it leaves the pipe. This can give misleading results: a single call to read' removes an entire queue of elements at once, so all of them count as flushed immediately even though processing them may take a long time.

val values_sent_downstream : t -> unit