module Udp

: sig

A grab-bag of performance-oriented, UDP-oriented network tools. These provide some convenience, but they are more complex than basic applications require.

Defaults are chosen for typical UDP applications. Buffering is via Iobuf conventions, where a typical packet-handling loop iteration is read-flip_lo-process-reset.
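
The read-flip_lo-process-reset cycle can be pictured with a toy window over a fixed buffer. This is only a sketch in plain OCaml: the record type and the functions fill, flip_lo, reset, and iteration are hypothetical stand-ins written for illustration, not the Iobuf implementation.

```ocaml
(* Toy model of an Iobuf-style window [lo, hi) over a fixed buffer.
   All names here are hypothetical; this is not the Iobuf implementation. *)
type buf = { data : Bytes.t; mutable lo : int; mutable hi : int }

let create capacity = { data = Bytes.create capacity; lo = 0; hi = capacity }

(* "read": a receive writes the packet at the window start and advances lo. *)
let fill t packet =
  let len = String.length packet in
  assert (len <= t.hi - t.lo);
  Bytes.blit_string packet 0 t.data t.lo len;
  t.lo <- t.lo + len

(* flip_lo: the window becomes the bytes just written, [0, old lo). *)
let flip_lo t =
  t.hi <- t.lo;
  t.lo <- 0

(* reset: the window covers the whole buffer again, ready for writing. *)
let reset t =
  t.lo <- 0;
  t.hi <- Bytes.length t.data

let contents t = Bytes.sub_string t.data t.lo (t.hi - t.lo)

(* One loop iteration: read, flip_lo, process, reset. *)
let iteration t packet process =
  fill t packet;
  flip_lo t;
  process (contents t);
  reset t
```

After each iteration the window again spans the whole buffer, so the same buffer can receive the next packet without allocation.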

While these functions are oriented toward UDP, they work with any files that satisfy Fd.supports_nonblock.

For zero-copy Bigstring.t transfers, we must ensure no buffering between the receive loop and caller. So, an interface like Tcp.connect, with something like (Bigstring.t * Socket.Address.Inet.t) Pipe.Reader.t, won't work.

Instead, we use synchronous callbacks.

#
type write_buffer = (Core.Std.read_write, Core.Std.Iobuf.seek) Core.Std.Iobuf.t
#
val default_capacity : int

The default buffer capacity for UDP-oriented buffers is 1472, determined as the typical Ethernet MTU (1500 octets) less the typical combined IPv4 and UDP header length (20 + 8 = 28 octets). Using buffers of this size, one avoids accidentally creating messages that will be dropped on send because they exceed the MTU, and can receive the largest corresponding UDP message.

While this number is merely typical and not guaranteed to work in all cases, defining it in one place makes it easy to share and change. For example, another MTU in common use is 9000 for Jumbo frames, so the value of default_capacity might change to 8972 in the future.
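
The arithmetic behind both figures can be checked directly. In this small sketch the 28 octets are split into a 20-octet IPv4 header (no options) and an 8-octet UDP header; payload_capacity is a hypothetical helper, not part of this module.

```ocaml
(* Payload capacity = MTU minus the IPv4 header (20 octets, no options)
   and the UDP header (8 octets). [payload_capacity] is a hypothetical
   helper written for illustration only. *)
let ipv4_header_len = 20
let udp_header_len = 8

let payload_capacity ~mtu = mtu - ipv4_header_len - udp_header_len

let standard_ethernet = payload_capacity ~mtu:1500
let jumbo_frames = payload_capacity ~mtu:9000
```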

#
module Config : sig

A typical receive loop calls the before function prior to invoking its callback, to prepare a packet buffer for reading, and the after function afterwards, to prepare the buffer for writing (for the next iteration).

One can specify ~before:ignore or ~after:ignore to disable the default action, as when doing buffer management in the callback. One can also specify an action, such as ~after:Iobuf.compact for use with read_loop on a connection-oriented socket or file. It's often convenient to use the same interface for UDP, TCP, and file variants of the same protocol.

stop terminates a typical loop as soon as possible once it becomes determined.

max_ready limits the number of receive loop iterations within an Fd.every_ready_to iteration, to prevent starvation of other Async jobs.
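
How these fields interact can be sketched with a simplified, synchronous model. Everything below is an assumption made for illustration: the config record mirrors only four of the fields, stop is modelled as a polled function rather than a deferred, and one_round and recv are hypothetical names, not functions of this module.

```ocaml
(* Toy model of the loop hooks. Hypothetical names; [stop] is a polled
   function here rather than an Async deferred. *)
type 'buf config = {
  before : 'buf -> unit;
  after : 'buf -> unit;
  stop : unit -> bool;
  max_ready : int;
}

(* Handle at most [max_ready] pending packets in one readiness round.
   [recv buf] returns false when nothing is pending (it would block).
   Returns the number of packets handled in this round. *)
let one_round config buf ~recv ~callback =
  let rec go handled =
    if handled >= config.max_ready || config.stop () then handled
    else if not (recv buf) then handled
    else begin
      config.before buf;
      callback buf;
      config.after buf;
      go (handled + 1)
    end
  in
  go 0
```

With five packets pending and max_ready set to 3, the first round handles three packets and returns, leaving the other two for the next round, which is what gives other jobs a chance to run in between.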

#
type t = {
# capacity
: int;
# init
: write_buffer;
# before
: write_buffer -> unit;
# after
: write_buffer -> unit;
# stop
: unit Import.Deferred.t;
# max_ready
: int;
}
#
val max_ready : t -> int
#
val stop : t -> unit Import.Deferred.t
#
val after : t -> write_buffer -> unit
#
val before : t -> write_buffer -> unit
#
val init : t -> write_buffer
#
val capacity : t -> int
#
module Fields : sig
#
val names : string list
#
val max_ready : (t, int) Fieldslib.Field.t
#
val stop : (t, unit Import.Deferred.t) Fieldslib.Field.t
#
val after : (t, write_buffer -> unit) Fieldslib.Field.t
#
val before : (t, write_buffer -> unit) Fieldslib.Field.t
#
val init : (t, write_buffer) Fieldslib.Field.t
#
val capacity : (t, int) Fieldslib.Field.t
#
val fold : init:'acc__ -> capacity:('acc__ -> (t, int) Fieldslib.Field.t -> 'acc__) -> init:('acc__ -> (t, write_buffer) Fieldslib.Field.t -> 'acc__) -> before:('acc__ -> (t, write_buffer -> unit) Fieldslib.Field.t -> 'acc__) -> after:('acc__ -> (t, write_buffer -> unit) Fieldslib.Field.t -> 'acc__) -> stop:('acc__ -> (t, unit Import.Deferred.t) Fieldslib.Field.t -> 'acc__) -> max_ready:('acc__ -> (t, int) Fieldslib.Field.t -> 'acc__) -> 'acc__
#
val make_creator : capacity:((t, int) Fieldslib.Field.t -> 'compile_acc__ -> ('input__ -> int) * 'compile_acc__) -> init:((t, write_buffer) Fieldslib.Field.t -> 'compile_acc__ -> ('input__ -> write_buffer) * 'compile_acc__) -> before:((t, write_buffer -> unit) Fieldslib.Field.t -> 'compile_acc__ -> ('input__ -> write_buffer -> unit) * 'compile_acc__) -> after:((t, write_buffer -> unit) Fieldslib.Field.t -> 'compile_acc__ -> ('input__ -> write_buffer -> unit) * 'compile_acc__) -> stop:((t, unit Import.Deferred.t) Fieldslib.Field.t -> 'compile_acc__ -> ('input__ -> unit Import.Deferred.t) * 'compile_acc__) -> max_ready:((t, int) Fieldslib.Field.t -> 'compile_acc__ -> ('input__ -> int) * 'compile_acc__) -> 'compile_acc__ -> ('input__ -> t) * 'compile_acc__
#
val create : capacity:int -> init:write_buffer -> before:(write_buffer -> unit) -> after:(write_buffer -> unit) -> stop:unit Import.Deferred.t -> max_ready:int -> t
#
val map : capacity:((t, int) Fieldslib.Field.t -> int) -> init:((t, write_buffer) Fieldslib.Field.t -> write_buffer) -> before:((t, write_buffer -> unit) Fieldslib.Field.t -> write_buffer -> unit) -> after:((t, write_buffer -> unit) Fieldslib.Field.t -> write_buffer -> unit) -> stop:((t, unit Import.Deferred.t) Fieldslib.Field.t -> unit Import.Deferred.t) -> max_ready:((t, int) Fieldslib.Field.t -> int) -> t
#
val iter : capacity:((t, int) Fieldslib.Field.t -> unit) -> init:((t, write_buffer) Fieldslib.Field.t -> unit) -> before:((t, write_buffer -> unit) Fieldslib.Field.t -> unit) -> after:((t, write_buffer -> unit) Fieldslib.Field.t -> unit) -> stop:((t, unit Import.Deferred.t) Fieldslib.Field.t -> unit) -> max_ready:((t, int) Fieldslib.Field.t -> unit) -> unit
#
val for_all : capacity:((t, int) Fieldslib.Field.t -> bool) -> init:((t, write_buffer) Fieldslib.Field.t -> bool) -> before:((t, write_buffer -> unit) Fieldslib.Field.t -> bool) -> after:((t, write_buffer -> unit) Fieldslib.Field.t -> bool) -> stop:((t, unit Import.Deferred.t) Fieldslib.Field.t -> bool) -> max_ready:((t, int) Fieldslib.Field.t -> bool) -> bool
#
val exists : capacity:((t, int) Fieldslib.Field.t -> bool) -> init:((t, write_buffer) Fieldslib.Field.t -> bool) -> before:((t, write_buffer -> unit) Fieldslib.Field.t -> bool) -> after:((t, write_buffer -> unit) Fieldslib.Field.t -> bool) -> stop:((t, unit Import.Deferred.t) Fieldslib.Field.t -> bool) -> max_ready:((t, int) Fieldslib.Field.t -> bool) -> bool
#
val to_list : capacity:((t, int) Fieldslib.Field.t -> 'elem__) -> init:((t, write_buffer) Fieldslib.Field.t -> 'elem__) -> before:((t, write_buffer -> unit) Fieldslib.Field.t -> 'elem__) -> after:((t, write_buffer -> unit) Fieldslib.Field.t -> 'elem__) -> stop:((t, unit Import.Deferred.t) Fieldslib.Field.t -> 'elem__) -> max_ready:((t, int) Fieldslib.Field.t -> 'elem__) -> 'elem__ list
#
val map_poly : ([<
| `Read
| `Set_and_create
], t, 'x0) Fieldslib.Field.user -> 'x0 list
#
module Direct : sig
#
val iter : t -> capacity:((t, int) Fieldslib.Field.t -> t -> int -> unit) -> init:((t, write_buffer) Fieldslib.Field.t -> t -> write_buffer -> unit) -> before:((t, write_buffer -> unit) Fieldslib.Field.t -> t -> (write_buffer -> unit) -> unit) -> after:((t, write_buffer -> unit) Fieldslib.Field.t -> t -> (write_buffer -> unit) -> unit) -> stop:((t, unit Import.Deferred.t) Fieldslib.Field.t -> t -> unit Import.Deferred.t -> unit) -> max_ready:((t, int) Fieldslib.Field.t -> t -> int -> unit) -> unit
#
val fold : t -> init:'acc__ -> capacity:('acc__ -> (t, int) Fieldslib.Field.t -> t -> int -> 'acc__) -> init:('acc__ -> (t, write_buffer) Fieldslib.Field.t -> t -> write_buffer -> 'acc__) -> before:('acc__ -> (t, write_buffer -> unit) Fieldslib.Field.t -> t -> (write_buffer -> unit) -> 'acc__) -> after:('acc__ -> (t, write_buffer -> unit) Fieldslib.Field.t -> t -> (write_buffer -> unit) -> 'acc__) -> stop:('acc__ -> (t, unit Import.Deferred.t) Fieldslib.Field.t -> t -> unit Import.Deferred.t -> 'acc__) -> max_ready:('acc__ -> (t, int) Fieldslib.Field.t -> t -> int -> 'acc__) -> 'acc__
end
end
#
val create : ?capacity:int -> ?init:write_buffer -> ?before:(write_buffer -> unit) -> ?after:(write_buffer -> unit) -> ?stop:unit Import.Deferred.t -> ?max_ready:int -> unit -> t
end
#
val sendto_sync : unit -> (Import.Fd.t -> ([>
| Core.Std.read
], Core.Std.Iobuf.seek) Core.Std.Iobuf.t -> Import.Socket.Address.Inet.t -> [
| `Not_ready
| `Ok
]) Core.Std.Or_error.t

sendto_sync sock buf addr does not try again if sock is not ready to write. Instead, it returns `Not_ready immediately.

Short writes are distinguished by buf not being empty afterward.

sendto sock buf addr retries if sock is not ready to write.

Raises Unix_error in the case of output errors. See also Iobuf.sendto_nonblocking_no_sigpipe and Bigstring.sendto_nonblocking_no_sigpipe.
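
The calling pattern implied by the signature (unwrap the Or_error once, then inspect the result of each attempt) can be sketched with plain stand-ins. In this illustration the standard result type replaces Or_error.t, the sock and buf records replace the file descriptor and Iobuf, the address argument is omitted, and try_send is a hypothetical helper; none of this is the module's real implementation.

```ocaml
(* Toy stand-ins with hypothetical names: [result] replaces Or_error.t and
   simple records replace the socket and the Iobuf. *)
type buf = { data : string; mutable pos : int }

let is_empty b = b.pos >= String.length b.data

type sock = { mutable ready : bool; sent : Buffer.t }

(* Same shape as the signature: unit -> (send function) or an error. *)
let sendto_sync () : (sock -> buf -> [ `Not_ready | `Ok ], string) result =
  Ok (fun sock buf ->
    if not sock.ready then `Not_ready
    else begin
      let remaining = String.length buf.data - buf.pos in
      Buffer.add_string sock.sent (String.sub buf.data buf.pos remaining);
      buf.pos <- String.length buf.data;
      `Ok
    end)

(* Unwrap the capability, then classify the outcome of a single attempt. *)
let try_send sock buf =
  match sendto_sync () with
  | Error msg -> `Unsupported msg
  | Ok send ->
    (match send sock buf with
     | `Not_ready -> `Retry_later
     | `Ok -> if is_empty buf then `Sent else `Short_write)
```

A real caller would typically unwrap the Or_error once at startup and keep the resulting function, rather than repeating the match for every packet.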
#
val sendto : unit -> (Import.Fd.t -> ([>
| Core.Std.read
], Core.Std.Iobuf.seek) Core.Std.Iobuf.t -> Import.Socket.Address.Inet.t -> unit Import.Deferred.t) Core.Std.Or_error.t
#
val bind : ?ifname:string -> Import.Socket.Address.Inet.t -> ([
| `Bound
], Import.Socket.Address.Inet.t) Import.Socket.t Import.Deferred.t
#
val bind_any : unit -> ([
| `Bound
], Import.Socket.Address.Inet.t) Import.Socket.t Import.Deferred.t
#
val recvfrom_loop : ?config:Config.t -> Import.Fd.t -> (write_buffer -> Import.Socket.Address.Inet.t -> unit) -> unit Import.Deferred.t

recvfrom_loop_with_buffer_replacement callback calls callback synchronously on each message received. callback returns the packet buffer for subsequent iterations, so it can replace the initial packet buffer when necessary. This enables immediate buffer reuse in the common case and fallback to allocation if we want to save the packet buffer for asynchronous processing.
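
The buffer-replacement idea can be sketched without any networking. In this illustration a Bytes.t stands in for the packet buffer and a list of strings stands in for the socket; loop_with_replacement, callback, and saved are hypothetical names, and the rule that packets starting with '!' are kept is invented purely for the example.

```ocaml
(* Toy model with hypothetical names: a plain Bytes.t stands in for the
   packet buffer, and a list of strings stands in for the socket. *)
let capacity = 16

(* Run [callback] on each packet; whatever buffer it returns is used for the
   next packet. Returns the buffer in use when the input is exhausted. *)
let loop_with_replacement packets callback =
  List.fold_left
    (fun buf packet ->
       let len = String.length packet in
       Bytes.blit_string packet 0 buf 0 len;
       callback buf len)
    (Bytes.create capacity) packets

(* Buffers kept for later (asynchronous) processing, with their lengths. *)
let saved : (Bytes.t * int) list ref = ref []

(* Common case: return the same buffer for immediate reuse. Packets that
   start with '!' are kept, so a fresh buffer is returned in their place. *)
let callback buf len =
  if len > 0 && Bytes.get buf 0 = '!' then begin
    saved := (buf, len) :: !saved;
    Bytes.create capacity
  end else buf
```

Only the kept packets cost an allocation; every other iteration reuses the buffer it was handed.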

#
val recvfrom_loop_with_buffer_replacement : ?config:Config.t -> Import.Fd.t -> (write_buffer -> Import.Socket.Address.Inet.t -> write_buffer) -> write_buffer Import.Deferred.t
#
val read_loop : ?config:Config.t -> Import.Fd.t -> (write_buffer -> unit) -> unit Import.Deferred.t
#
val read_loop_with_buffer_replacement : ?config:Config.t -> Import.Fd.t -> (write_buffer -> write_buffer) -> write_buffer Import.Deferred.t
#
val recvmmsg_loop : (?config:Config.t -> ?create_srcs:bool -> ?max_count:int -> ?bufs:write_buffer array -> ?on_wouldblock:(unit -> unit) -> Import.Fd.t -> (?srcs:Core.Std.Unix.sockaddr array -> write_buffer array -> count:int -> unit) -> unit Import.Deferred.t) Core.Std.Or_error.t

recvmmsg_loop socket callback iteratively receives up to max_count packets at a time on socket and passes them to callback. Each packet is up to capacity bytes. If create_srcs is set, the source addresses are collected and passed to callback as srcs.

callback ?srcs bufs ~count processes count packets synchronously. callback may replace packet buffers in bufs and take ownership of the corresponding originals. srcs contains the corresponding source addresses of the packets in bufs, if requested, and will similarly be reused when callback returns.

Config.init config is used as a prototype for bufs and as one of the elements.
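
The batching contract, where callback sees a reused array of which only the first count entries are valid, can be sketched as follows. This is a toy model under stated assumptions: strings stand in for packet buffers, a list stands in for the socket's receive queue, and receive_batch and batch_loop are hypothetical names rather than part of this module.

```ocaml
(* Toy model with hypothetical names: strings stand in for packet buffers and
   a list stands in for the socket's receive queue. *)
let max_count = 4

(* Fill up to [Array.length bufs] slots from [pending]; return the number
   filled and the packets still pending. *)
let receive_batch bufs pending =
  let rec go i rest =
    match rest with
    | p :: tl when i < Array.length bufs ->
      bufs.(i) <- p;
      go (i + 1) tl
    | _ -> (i, rest)
  in
  go 0 pending

(* Deliver everything in batches. The array is reused between batches, so
   only the first [count] entries are meaningful in each call. *)
let rec batch_loop bufs pending callback =
  match receive_batch bufs pending with
  | 0, _ -> ()
  | count, rest ->
    callback bufs ~count;
    batch_loop bufs rest callback
```

Entries at index count and beyond may still hold data from an earlier batch, which is why the callback must respect count instead of scanning the whole array.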

#
val recvmmsg_no_sources_loop : (?config:Config.t -> Import.Fd.t -> ?max_count:int -> ?bufs:write_buffer array -> ?on_wouldblock:(unit -> unit) -> (write_buffer array -> count:int -> unit) -> unit Import.Deferred.t) Core.Std.Or_error.t

recvmmsg_no_sources_loop socket callback is identical to recvmmsg_loop, but can be used when sources are ignored, to avoid some overhead incurred by optional arguments.

end