Module Rpc_parallel__.Map_reduce
module Config : sig ... end
module type Worker = sig ... end
Map functions
module type Map_function = sig ... end
module type Map_function_with_init_spec = sig ... end
module Make_map_function_with_init : functor (S : Map_function_with_init_spec) -> Map_function with type Param.t = S.Param.t and type Input.t = S.Input.t and type Output.t = S.Output.t
module type Map_function_spec = sig ... end
module Make_map_function : functor (S : Map_function_spec) -> Map_function with type Param.t = unit and type Input.t = S.Input.t and type Output.t = S.Output.t
val map_unordered : Config.t -> 'a Async.Pipe.Reader.t -> m:(module Map_function with type Input.t = 'a and type Output.t = 'b and type Param.t = 'param) -> param:'param -> ('b * int) Async.Pipe.Reader.t Async.Deferred.t
The map_unordered operation takes an 'a Pipe.Reader.t along with a Map_function and sends the 'a values to workers for mapping. Each pair in the resulting ('b * int) Pipe.Reader.t contains the mapped value and the index of the value in the input pipe.
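Because results are unordered, the index is what ties each output back to its input. The following is a minimal sketch in plain OCaml (standard library only, no Async or Rpc_parallel), assuming the pairs have already been read from the result pipe into a list. The helper restore_order and the sample data are hypothetical and not part of the library.

```ocaml
(* Hypothetical helper: given (mapped value, input index) pairs in arbitrary
   arrival order, return the mapped values sorted by input index. *)
let restore_order (results : ('b * int) list) : 'b list =
  results
  |> List.sort (fun (_, i) (_, j) -> compare i j)
  |> List.map fst

let () =
  (* Made-up arrival order for three inputs mapped to upper case. *)
  let arrived = [ ("C", 2); ("A", 0); ("B", 1) ] in
  assert (restore_order arrived = [ "A"; "B"; "C" ])
```

Sorting requires collecting every result first, so this only suits cases where the whole output fits in memory.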
val map : Config.t -> 'a Async.Pipe.Reader.t -> m:(module Map_function with type Input.t = 'a and type Output.t = 'b and type Param.t = 'param) -> param:'param -> 'b Async.Pipe.Reader.t Async.Deferred.t
The map operation is similar to map_unordered, but the result is a 'b Pipe.Reader.t where the mapped values are guaranteed to be in the same order as the input values.
val find_map : Config.t -> 'a Async.Pipe.Reader.t -> m:(module Map_function with type Input.t = 'a and type Output.t = 'b option and type Param.t = 'param) -> param:'param -> 'b option Async.Deferred.t
The find_map operation takes an 'a Pipe.Reader.t along with a Map_function that returns 'b option values. As soon as map returns Some value, all workers are stopped and Some value is returned. If map never returns Some value then None is returned. If more than one worker returns Some value, one value is chosen arbitrarily and returned.
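The arbitrary choice can be pictured with a sequential stand-in. This sketch uses only the OCaml standard library (List.find_map needs OCaml 4.10 or later), and half_if_even is a hypothetical map function. It lists every result a worker could report, and notes that a sequential search always picks the first while the parallel operation may return any of them.

```ocaml
(* Hypothetical stand-in for the map function: Some for even inputs. *)
let half_if_even n = if n mod 2 = 0 then Some (n / 2) else None

let () =
  let inputs = [ 3; 8; 5; 6 ] in
  (* Every Some value that a worker could report for these inputs. *)
  let candidates = List.filter_map half_if_even inputs in
  assert (candidates = [ 4; 3 ]);
  (* A sequential search returns the first match; the parallel find_map
     may return any one of the candidates. *)
  assert (List.find_map half_if_even inputs = Some 4);
  (* No input maps to Some value, so the result is None. *)
  assert (List.find_map half_if_even [ 1; 3 ] = None)
```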
Map-reduce functions
module type Map_reduce_function = sig ... end
module type Map_reduce_function_with_init_spec = sig ... end
module Make_map_reduce_function_with_init : functor (S : Map_reduce_function_with_init_spec) -> Map_reduce_function with type Param.t = S.Param.t and type Accum.t = S.Accum.t and type Input.t = S.Input.t
module type Map_reduce_function_spec = sig ... end
module Make_map_reduce_function : functor (S : Map_reduce_function_spec) -> Map_reduce_function with type Param.t = unit and type Accum.t = S.Accum.t and type Input.t = S.Input.t
val map_reduce_commutative : Config.t -> 'a Async.Pipe.Reader.t -> m:(module Map_reduce_function with type Accum.t = 'accum and type Input.t = 'a and type Param.t = 'param) -> param:'param -> 'accum option Async.Deferred.t
The map_reduce_commutative operation takes an 'a Pipe.Reader.t along with a Map_reduce_function and applies the map function to 'a values (in an unspecified order), resulting in 'accum values. The combine function is then called to combine the 'accum values (in an unspecified order) into a single 'accum value. Commutative map-reduce assumes that combine is associative and commutative.
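To see why both properties matter, here is a sequential model in plain OCaml (standard library only). map_reduce_model is a hypothetical helper, and returning None for empty input is an assumption of the model suggested by the option return type. Shuffling the input stands in for the unspecified processing order.

```ocaml
(* Hypothetical sequential model: map every input, then fold the
   accumulators in list order. Empty input yields None (an assumption). *)
let map_reduce_model ~map ~combine = function
  | [] -> None
  | x :: rest ->
    Some (List.fold_left (fun acc y -> combine acc (map y)) (map x) rest)

let () =
  let inputs = [ 1; 2; 3; 4 ] in
  let shuffled = [ 3; 1; 4; 2 ] in
  (* Addition is associative and commutative: order does not matter. *)
  let sum_of_squares = map_reduce_model ~map:(fun x -> x * x) ~combine:( + ) in
  assert (sum_of_squares inputs = Some 30);
  assert (sum_of_squares shuffled = Some 30);
  (* Concatenation is associative but not commutative: the result depends
     on processing order, so it does not fit the commutative operation. *)
  let concat = map_reduce_model ~map:string_of_int ~combine:( ^ ) in
  assert (concat inputs = Some "1234");
  assert (concat shuffled = Some "3142")
```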
val map_reduce : Config.t -> 'a Async.Pipe.Reader.t -> m:(module Map_reduce_function with type Accum.t = 'accum and type Input.t = 'a and type Param.t = 'param) -> param:'param -> 'accum option Async.Deferred.t
The map_reduce operation makes strong guarantees about the order in which the values are processed by combine. For a list a_0, a_1, a_2, ..., a_n of 'a values, the noncommutative map-reduce operation applies the map function to produce acc_{i,i+1} from each a_i. The combine function is used to compute combine acc_{i,j} acc_{j,k} for i<j<k, producing acc_{i,k}. The map and combine functions are called repeatedly until the entire list is reduced to a single acc_{0,n+1} value. Noncommutative map-reduce assumes that combine is associative.
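The interval rule can be modelled sequentially in plain OCaml (standard library only). The pairwise rounds below are just one possible schedule chosen for illustration, not a description of how the library groups its calls, and combine_adjacent and reduce_ordered are hypothetical helpers. Only neighbouring accumulators are combined, with the left interval as the first argument.

```ocaml
(* One round: combine neighbouring accumulators pairwise, left operand first,
   so acc_{i,j} and acc_{j,k} become acc_{i,k}. A trailing unpaired
   accumulator is carried over unchanged. *)
let rec combine_adjacent combine = function
  | a :: b :: rest -> combine a b :: combine_adjacent combine rest
  | short -> short

(* Repeat rounds until a single accumulator remains; None for empty input
   (an assumption of this model). *)
let rec reduce_ordered combine = function
  | [] -> None
  | [ acc ] -> Some acc
  | accs -> reduce_ordered combine (combine_adjacent combine accs)

let () =
  (* The map step: one accumulator per input, in input order. *)
  let accs = List.map string_of_int [ 1; 2; 3; 4; 5 ] in
  (* Concatenation is associative but not commutative. *)
  assert (reduce_ordered ( ^ ) accs = Some "12345");
  (* Associativity makes the tree-shaped grouping agree with a left fold. *)
  assert (reduce_ordered ( ^ ) accs = Some (List.fold_left ( ^ ) "" accs))
```

Because operands are never reordered, any associative combine gives the same answer under this schedule as under a left-to-right fold.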