aboutsummaryrefslogtreecommitdiff
path: root/src/data
diff options
context:
space:
mode:
Diffstat (limited to 'src/data')
-rw-r--r--src/data/mod.rs1
-rw-r--r--src/data/pubsub.rs19
2 files changed, 15 insertions, 5 deletions
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 7c00442..5d3efa6 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -2,3 +2,4 @@
pub mod arena;
pub(crate) mod graph;
+pub mod pubsub;
diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs
index b3b263c..1ac7498 100644
--- a/src/data/pubsub.rs
+++ b/src/data/pubsub.rs
@@ -1,16 +1,18 @@
//! Publish-subscribe mechanism for internal events.
+use serde::export::PhantomData;
use std::collections::VecDeque;
/// The position of a subscriber on a pub-sub queue.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
-pub struct PubSubCursor {
+pub struct PubSubCursor<T> {
// Index of the next message to read.
id: u32,
next: u32,
+ _phantom: PhantomData<T>,
}
-impl PubSubCursor {
+impl<T> PubSubCursor<T> {
fn id(&self, num_deleted: u32) -> usize {
(self.id - num_deleted) as usize
}
@@ -53,18 +55,25 @@ impl<T> PubSub<T> {
/// Subscribe to the queue.
///
/// A subscription cannot be cancelled.
- pub fn subscribe(&mut self) -> PubSubCursor {
+ pub fn subscribe(&mut self) -> PubSubCursor<T> {
let cursor = PubSubCursor {
next: self.messages.len() as u32 + self.deleted_messages,
id: self.offsets.len() as u32 + self.deleted_offsets,
+ _phantom: PhantomData,
};
self.offsets.push_back(cursor.next);
cursor
}
+ /// Read the i-th message not yet read by the given subsciber.
+ pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> {
+ self.messages
+ .get(cursor.next(self.deleted_messages) as usize + i)
+ }
+
/// Get the messages not yet read by the given subscriber.
- pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator<Item = &T> {
+ pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> {
let next = cursor.next(self.deleted_messages);
// TODO: use self.queue.range(next..) once it is stabilised.
@@ -77,7 +86,7 @@ impl<T> PubSub<T> {
/// Makes the given subscribe acknowledge all the messages in the queue.
///
/// A subscriber cannot read acknowledged messages any more.
- pub fn ack(&mut self, cursor: &mut PubSubCursor) {
+ pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) {
// Update the cursor.
cursor.next = self.messages.len() as u32 + self.deleted_messages;
self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;