Amqp_client_eio.Stream
Extension to Eio.Stream, which allows closing the stream. A closed stream might still hold messages. Once the last message is taken off a close stream, the stream will raise a user defined exception.
Posting to or closing a closed stream will raise same exception.
This is a far from perfect implementation:
This should be re-written to use a promise for indicating that the channel has been closed. This way any pending publishers can be cancelled. Still need to figure out how reliably signal consumers. Well these could wait also wait on the promise. If the promise is ready, they check if there are more messages. ( Could start with a nonblocking read before checking ). Would be so easy if we had a switch (and could fork). We only want one message to be posted.
type 'a t = {
mutex : Stdlib.Mutex.t; | |
id : Eio.Private.Ctf.id; | |
capacity : int; | |
readers : 'a item Eio.Private.Waiters.t; | |
writers : ( unit, exn ) Stdlib.result Eio.Private.Waiters.t; | |
items : 'a Stdlib.Queue.t; | |
condition : Stdlib.Condition.t; | |
mutable flow : bool; | (* If false, receiver will be blocked receiving from the queue *) |
mutable closed : exn option; |
}
val create : ?capacity:int -> unit -> 'a t
val send : 'a t -> ?force:bool -> 'b -> unit
Push a message onto the stream.
val wait_empty : 'a t -> unit
val receive : 'a t -> 'b
Pop the first element of the stream.
val close : ?message:'a -> 'b t -> exn -> unit
Close the stream. Reading from a closed stream will raise the close exception once empty, if the stream was closed with notify_consumers
. Closing an already closed stream does nothing (and will not update the close reason).
val is_empty : 'a t -> bool
val is_full : 'a t -> bool
val is_closed : 'a t -> bool