Module Amqp_client_eio.Stream

Extension to Eio.Stream which allows closing the stream. A closed stream might still hold messages. Once the last message is taken off a closed stream, further reads raise a user-defined exception.

Posting to or closing a closed stream will raise the same exception.
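
The drain-then-raise behaviour described above can be pictured with a small single-threaded model. This is only a sketch built on Stdlib.Queue, not the module's implementation: it has no fibers, no capacity limit and never blocks, and the names `model` and `Closed` are chosen for illustration.

```ocaml
(* Simplified model of the documented close semantics (assumption:
   single-threaded, unbounded, non-blocking). *)
exception Closed

type 'a model = {
  items : 'a Queue.t;
  mutable closed : exn option;  (* Some e once the stream is closed *)
}

let create () = { items = Queue.create (); closed = None }

(* Posting to a closed stream raises the close exception. *)
let send t v =
  match t.closed with
  | Some e -> raise e
  | None -> Queue.push v t.items

(* Messages queued before the close are still delivered; the close
   exception is raised only once the queue has been drained. *)
let receive t =
  match Queue.take_opt t.items with
  | Some v -> v
  | None ->
    (match t.closed with
     | Some e -> raise e
     | None -> failwith "model only: a real stream would block here")

let close t e =
  match t.closed with
  | Some _ -> ()
  | None -> t.closed <- Some e

let () =
  let s = create () in
  send s 1;
  send s 2;
  close s Closed;
  (* The two queued messages survive the close. *)
  assert (receive s = 1);
  assert (receive s = 2);
  (* Only now does reading raise, and so does posting. *)
  assert (try ignore (receive s); false with Closed -> true);
  assert (try send s 3; false with Closed -> true)
```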

note

This is a far from perfect implementation:

This should be rewritten to use a promise to indicate that the channel has been closed, so that any pending publishers can be cancelled. It is still unclear how to signal consumers reliably. They could also wait on the promise: if the promise is resolved, they check whether there are more messages (they could start with a non-blocking read before checking). This would be easy if we had a switch (and could fork). We only want one message to be posted.

type 'a item = ( 'a, exn ) Stdlib.result
type 'a t = {
mutex : Stdlib.Mutex.t;
id : Eio.Private.Ctf.id;
capacity : int;
readers : 'a item Eio.Private.Waiters.t;
writers : ( unit, exn ) Stdlib.result Eio.Private.Waiters.t;
items : 'a Stdlib.Queue.t;
condition : Stdlib.Condition.t;
mutable flow : bool;(*

If false, receiver will be blocked receiving from the queue

*)
mutable closed : exn option;
}
val with_lock : Stdlib.Mutex.t -> ( unit -> 'a ) -> 'b
val create : ?capacity:int -> unit -> 'a t
val send : 'a t -> ?force:bool -> 'b -> unit

Push a message onto the stream.

  • raises Closed

    if the stream has been closed

  • parameter force

    if true, ignore max_capacity and send will not block

val wait_empty : 'a t -> unit
val receive : 'a t -> 'b

Pop the first element of the stream.

  • raises exception

    if the stream has been closed and there are no more messages on the stream

val close : ?message:'a -> 'b t -> exn -> unit

Close the stream. Reading from a closed stream will raise the close exception once empty, if the stream was closed with notify_consumers. Closing an already closed stream does nothing (and will not update the close reason).

  • parameter message

    Post a message onto the queue after it is closed, guaranteeing that it is the last message on the queue (weak guarantee). Note that this might block until the stream has room.
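
As a rough illustration of the message parameter, the sketch below models a close that first marks the stream as closed and then enqueues one final message behind whatever is already queued. It follows the description of close above, but it is a single-threaded Stdlib model rather than the module's code: it never blocks, and `model` and `Connection_lost` are hypothetical names.

```ocaml
exception Connection_lost  (* hypothetical close reason *)

type 'a model = { items : 'a Queue.t; mutable closed : exn option }

let create () = { items = Queue.create (); closed = None }

let receive t =
  match Queue.take_opt t.items with
  | Some v -> v
  | None ->
    (match t.closed with
     | Some e -> raise e
     | None -> failwith "model only: a real stream would block here")

(* Record the close reason first, then append the optional final
   message so it sits behind everything that was already queued. *)
let close ?message t reason =
  match t.closed with
  | Some _ -> ()  (* already closed: the first reason is kept *)
  | None ->
    t.closed <- Some reason;
    Option.iter (fun m -> Queue.push m t.items) message

let () =
  let s = create () in
  Queue.push "data" s.items;
  close ~message:"eof" s Connection_lost;
  (* Earlier messages come out first, the final message last. *)
  assert (receive s = "data");
  assert (receive s = "eof");
  assert (try ignore (receive s); false with Connection_lost -> true)
```

A consumer can treat the final message as an in-band end-of-stream marker and stop reading before the close exception is ever raised.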

val is_empty : 'a t -> bool
val is_full : 'a t -> bool
val is_closed : 'a t -> bool