aboutsummaryrefslogtreecommitdiff
path: root/src/data
diff options
context:
space:
mode:
authorCrozet Sébastien <developer@crozet.re>2020-10-05 19:04:18 +0200
committerCrozet Sébastien <developer@crozet.re>2020-10-05 19:04:18 +0200
commit93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848 (patch)
tree5d602450c5b5e1c0c08eeffd3196b373b4312a08 /src/data
parent2d0a888484dd296cc785caf978252dd97b58e10a (diff)
downloadrapier-93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848.tar.gz
rapier-93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848.tar.bz2
rapier-93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848.zip
Use the publish-subscribe mechanism to handle collider removals across pipelines.
Diffstat (limited to 'src/data')
-rw-r--r--src/data/mod.rs1
-rw-r--r--src/data/pubsub.rs19
2 files changed, 15 insertions, 5 deletions
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 7c00442..5d3efa6 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -2,3 +2,4 @@
pub mod arena;
pub(crate) mod graph;
+pub mod pubsub;
diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs
index b3b263c..1ac7498 100644
--- a/src/data/pubsub.rs
+++ b/src/data/pubsub.rs
@@ -1,16 +1,18 @@
//! Publish-subscribe mechanism for internal events.
+use serde::export::PhantomData;
use std::collections::VecDeque;
/// The position of a subscriber on a pub-sub queue.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
-pub struct PubSubCursor {
+pub struct PubSubCursor<T> {
// Index of the next message to read.
id: u32,
next: u32,
+ _phantom: PhantomData<T>,
}
-impl PubSubCursor {
+impl<T> PubSubCursor<T> {
fn id(&self, num_deleted: u32) -> usize {
(self.id - num_deleted) as usize
}
@@ -53,18 +55,25 @@ impl<T> PubSub<T> {
/// Subscribe to the queue.
///
/// A subscription cannot be cancelled.
- pub fn subscribe(&mut self) -> PubSubCursor {
+ pub fn subscribe(&mut self) -> PubSubCursor<T> {
let cursor = PubSubCursor {
next: self.messages.len() as u32 + self.deleted_messages,
id: self.offsets.len() as u32 + self.deleted_offsets,
+ _phantom: PhantomData,
};
self.offsets.push_back(cursor.next);
cursor
}
+ /// Read the i-th message not yet read by the given subsciber.
+ pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> {
+ self.messages
+ .get(cursor.next(self.deleted_messages) as usize + i)
+ }
+
/// Get the messages not yet read by the given subscriber.
- pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator<Item = &T> {
+ pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> {
let next = cursor.next(self.deleted_messages);
// TODO: use self.queue.range(next..) once it is stabilised.
@@ -77,7 +86,7 @@ impl<T> PubSub<T> {
/// Makes the given subscribe acknowledge all the messages in the queue.
///
/// A subscriber cannot read acknowledged messages any more.
- pub fn ack(&mut self, cursor: &mut PubSubCursor) {
+ pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) {
// Update the cursor.
cursor.next = self.messages.len() as u32 + self.deleted_messages;
self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;