Module Async_udp

A grab-bag of performance-oriented, UDP-oriented network tools. These provide some convenience, but they are more complex than basic applications require.

Defaults are chosen for typical UDP applications. Buffering is via Iobuf conventions, where a typical packet-handling loop iteration is read -> flip_lo -> process -> reset.
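
As a rough illustration of that loop shape, the sketch below walks one buffer through the four steps by hand. It assumes a Core release in which Iobuf is reachable via open Core, as the signatures on this page suggest. Iobuf.Fill.stringo stands in for the socket read, and one_iteration is a hypothetical name.

```ocaml
open Core

(* One hand-rolled iteration of read -> flip_lo -> process -> reset.
   [Iobuf.Fill.stringo] stands in for the socket read (an assumption made
   for illustration); a real loop would fill the buffer from the network. *)
let one_iteration (buf : (read_write, Iobuf.seek) Iobuf.t) ~incoming =
  (* read: writing advances the lower bound past the new bytes *)
  Iobuf.Fill.stringo buf incoming;
  (* flip_lo: narrow the window to exactly the bytes just written *)
  Iobuf.flip_lo buf;
  (* process: consume the packet from the window *)
  let payload = Iobuf.Consume.stringo buf in
  (* reset: restore the full window for the next packet *)
  Iobuf.reset buf;
  payload
```

Because reset restores the full window, the same buffer can be passed to one_iteration again without reallocating.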

While these functions are oriented toward UDP, they work with any files that satisfy Fd.supports_nonblock.

For zero-copy Bigstring.t transfers, we must ensure no buffering between the receive loop and caller. So an interface like Tcp.connect, with something like (Bigstring.t * Socket.Address.Inet.t) Pipe.Reader.t, won't work. Instead, we use synchronous callbacks.

type write_buffer = (Core.read_write, Iobuf.seek) Iobuf.t
val default_capacity : int

The default buffer capacity for UDP-oriented buffers is 1472, determined as the typical Ethernet MTU (1500 octets) less the typical combined IPv4 and UDP header length (20 + 8 = 28 octets). Using buffers of this size, one avoids accidentally creating messages that will be dropped on send because they exceed the MTU, and can receive the largest corresponding UDP message.

While this number is merely typical and not guaranteed to work in all cases, defining it in one place makes it easy to share and change. For example, another MTU in common use is 9000 for Jumbo frames, so the value of default_capacity might change to 8972 in the future.
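
The arithmetic behind both figures can be checked with a few lines of plain OCaml. This is only a sketch: udp_payload_capacity and the two header constants are hypothetical names, and the header sizes assume IPv4 without options.

```ocaml
(* Hypothetical helper: payload room left in one frame of the given MTU
   once the IPv4 header (20 octets, no options) and the UDP header
   (8 octets) are subtracted. *)
let ipv4_header_len = 20
let udp_header_len = 8

let udp_payload_capacity ~mtu = mtu - ipv4_header_len - udp_header_len

let () =
  Printf.printf "standard Ethernet: %d\n" (udp_payload_capacity ~mtu:1500);
  Printf.printf "jumbo frames:      %d\n" (udp_payload_capacity ~mtu:9000)
```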

module Config : sig ... end

A typical receive loop implicitly calls Iobuf.flip_lo before calling its callback, to prepare the packet buffer for reading by the callback, and Iobuf.reset afterward, to prepare for the next iteration.

val sendto_sync : unit -> (Async.Fd.t -> ([> Core.read ], Iobuf.seek) Iobuf.t -> Async.Socket.Address.Inet.t -> Async.Unix.Syscall_result.Unit.t) Core.Or_error.t

sendto_sync sock buf addr does not try again if sock is not ready to write. Instead, it returns EWOULDBLOCK immediately.

Short writes are distinguished by buf not being empty afterward.

See also Iobuf.sendto_nonblocking_no_sigpipe and Bigstring.sendto_nonblocking_no_sigpipe.

Raises Failure on internal errors, but returns Unix.error via Unix.Syscall_result.Unit.t rather than raising Unix_error.
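
A minimal sketch of how a caller might use sendto_sync, based only on the signature and notes above. try_send_once and the three result tags are hypothetical. The sketch assumes that Syscall_result.Unit exposes is_error and that the outer Or_error reports whether the operation is available at all.

```ocaml
open Core
open Async

(* Hypothetical wrapper: attempt one non-blocking send and classify the
   outcome instead of retrying. *)
let try_send_once fd buf addr =
  match Async_udp.sendto_sync () with
  | Error e -> Error e
  | Ok sendto_sync ->
    let result = sendto_sync fd buf addr in
    if Unix.Syscall_result.Unit.is_error result
    then Ok `Unix_error (* includes EWOULDBLOCK when the socket is not ready *)
    else if Iobuf.is_empty buf
    then Ok `Sent (* the whole window was written *)
    else Ok `Short_write (* bytes remain in [buf] *)
```

The caller decides what to do on `Unix_error, for example dropping the packet or scheduling another attempt, which is the point of the non-retrying variant.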

val send_sync : unit -> (Async.Fd.t -> ([> Core.read ], Iobuf.seek) Iobuf.t -> Async.Unix.Syscall_result.Unit.t) Core.Or_error.t

send_sync sock buf has identical semantics to sendto_sync, but is intended for connected UDP sockets (and therefore does not require a "to" address).

See also Iobuf.send_nonblocking_no_sigpipe and Bigstring.send_nonblocking_no_sigpipe.

Raises Failure on internal errors, but returns Unix.error via Unix.Syscall_result.Unit.t rather than raising Unix_error.

val sendto : unit -> (Async.Fd.t -> ([> Core.read ], Iobuf.seek) Iobuf.t -> Async.Socket.Address.Inet.t -> unit Async.Deferred.t) Core.Or_error.t

sendto sock buf addr retries if sock is not ready to write.

Raises Unix_error in the case of Unix output errors, and Failure on internal errors.
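
For comparison with the synchronous variant, here is a small sketch of the retrying sendto. send_message is a hypothetical helper, and Iobuf.of_string is used on the assumption that it returns a buffer whose window covers the whole string.

```ocaml
open Core
open Async

(* Hypothetical helper: send [message] to [addr], relying on [sendto]'s
   own retry to wait until the socket is writable. *)
let send_message fd addr ~message =
  match Async_udp.sendto () with
  | Error e -> return (Error e)
  | Ok sendto ->
    (* The buffer's window is the whole message. *)
    let buf = Iobuf.of_string message in
    sendto fd buf addr >>| fun () -> Ok ()
```

The returned deferred becomes determined once the send has completed. A Unix_error raised by the send is not captured in the Or_error and would have to be handled separately.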

val send : unit -> (Async.Fd.t -> ([> Core.read ], Iobuf.seek) Iobuf.t -> unit Async.Deferred.t) Core.Or_error.t

send sock buf retries if sock is not ready to write.

Raises Unix_error in the case of Unix output errors, and Failure on internal errors.

val bind : ?ifname:string -> ?source:Async.Unix.Inet_addr.t -> ?reuseaddr:bool -> Async.Socket.Address.Inet.t -> ([ `Bound ], Async.Socket.Address.Inet.t) Async.Socket.t

bind ?ifname ?source address creates a socket bound to address and, if address is a multicast address,

  • joins the multicast group.
  • if ifname is specified, joins the multicast group on ifname.
  • if source is specified, joins the multicast group with source.
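
A short sketch of the multicast case, following the signature above, in which bind returns the socket directly. The group address 239.1.2.3, port 5000, and interface name "eth0" are placeholder values, and multicast_socket is a hypothetical name.

```ocaml
open Core
open Async

(* 239.1.2.3:5000 and "eth0" are placeholders chosen for illustration. *)
let multicast_socket () =
  let group = Unix.Inet_addr.of_string "239.1.2.3" in
  let address = Socket.Address.Inet.create group ~port:5000 in
  (* Because [address] is multicast, [bind] also joins the group on "eth0". *)
  Async_udp.bind ~ifname:"eth0" address
```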
val bind_any : unit -> ([ `Bound ], Async.Socket.Address.Inet.t) Async.Socket.t
module Loop_result : sig ... end
val recvfrom_loop : ?config:Config.t -> Async.Fd.t -> (write_buffer -> Async.Socket.Address.Inet.t -> unit) -> Loop_result.t Async.Deferred.t

Loops, including recvfrom_loop, terminate normally when the socket is closed.
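
A minimal receiver built from bind and recvfrom_loop might look like the sketch below. log_packets is a hypothetical name. The sketch assumes the callback sees the buffer already flipped, so Iobuf.length gives the packet size.

```ocaml
open Core
open Async

(* Hypothetical receiver: log the size and sender of every packet on [port]. *)
let log_packets ~port =
  let sock = Async_udp.bind (Socket.Address.Inet.create_bind_any ~port) in
  Async_udp.recvfrom_loop (Socket.fd sock) (fun buf from ->
    (* Runs synchronously; the loop reuses [buf] after this call returns. *)
    printf
      "%d bytes from %s\n"
      (Iobuf.length buf)
      (Socket.Address.Inet.to_string from))
```

The resulting deferred becomes determined with a Loop_result.t when the loop ends, for instance after the socket is closed.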

val recvfrom_loop_with_buffer_replacement : ?config:Config.t -> Async.Fd.t -> (write_buffer -> Async.Socket.Address.Inet.t -> write_buffer) -> Loop_result.t Async.Deferred.t

recvfrom_loop_with_buffer_replacement callback calls callback synchronously on each message received. callback returns the packet buffer for subsequent iterations, so it can replace the initial packet buffer when necessary. This enables immediate buffer reuse in the common case and fallback to allocation if we want to save the packet buffer for asynchronous processing.
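
The reuse-or-replace decision can be sketched as follows. The size threshold and the process_later and process_now callbacks are hypothetical. Sizing the replacement buffer with default_capacity is an assumption that should be adjusted to match the Config in use.

```ocaml
open Core
open Async

(* Hypothetical policy: packets larger than [threshold] bytes are handed to
   [process_later], which keeps the buffer; smaller ones are handled in place. *)
let receive_with_handoff fd ~threshold ~process_later ~process_now =
  Async_udp.recvfrom_loop_with_buffer_replacement fd (fun buf from ->
    if Iobuf.length buf > threshold
    then (
      process_later buf from;
      (* The old buffer now belongs to [process_later]; hand the loop a fresh
         one. Sizing it with [default_capacity] is an assumption. *)
      Iobuf.create ~len:Async_udp.default_capacity)
    else (
      process_now buf from;
      (* Common case: return the same buffer so the loop reuses it. *)
      buf))
```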

val read_loop : ?config:Config.t -> Async.Fd.t -> (write_buffer -> unit) -> Loop_result.t Async.Deferred.t
val read_loop_with_buffer_replacement : ?config:Config.t -> Async.Fd.t -> (write_buffer -> write_buffer) -> Loop_result.t Async.Deferred.t
val recvmmsg_loop : (?config:Config.t -> ?max_count:int -> ?on_wouldblock:(unit -> unit) -> Async.Fd.t -> (write_buffer array -> count:int -> unit) -> Loop_result.t Async.Deferred.t) Core.Or_error.t

recvmmsg_loop socket callback iteratively receives up to max_count packets at a time on socket and passes them to callback. Each packet can be at most Iobuf.capacity bytes, the capacity of its buffer.

callback bufs ~count processes count packets synchronously.

Config.init config is used as a prototype for bufs and as one of the elements.
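
Since recvmmsg_loop is itself wrapped in Or_error, a caller has to unwrap it before use. The sketch below shows one way to do that. count_bytes and total_bytes_received are hypothetical names. It assumes that only the first count elements of bufs hold packets and that each of those buffers has been flipped so its window covers the received bytes.

```ocaml
open Core
open Async

(* Hypothetical running total, updated from the synchronous callback. *)
let total_bytes_received = ref 0

let count_bytes fd =
  match Async_udp.recvmmsg_loop with
  | Error e -> return (Error e)
  | Ok recvmmsg_loop ->
    recvmmsg_loop fd (fun bufs ~count ->
      (* Only the first [count] buffers carry packets in this batch. *)
      for i = 0 to count - 1 do
        total_bytes_received := !total_bytes_received + Iobuf.length bufs.(i)
      done)
    >>| fun loop_result -> Ok loop_result
```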

val default_recvmmsg_loop_max_count : int
module Ready_iter : sig ... end
val custom_on_readable_loop : ?config:Config.t -> Async.Fd.t -> syscall_name:string -> f:(Core.Unix.File_descr.t -> Ready_iter.t) -> Loop_result.t Async.Deferred.t

If you need a custom read operation on an FD that we don't provide, use this to turn it into a loop with the same features as our other APIs.