aboutsummaryrefslogtreecommitdiff
path: root/src/data
diff options
context:
space:
mode:
authorSébastien Crozet <developer@crozet.re>2020-10-06 16:53:54 +0200
committerGitHub <noreply@github.com>2020-10-06 16:53:54 +0200
commit24a25f8ae7a62c5c5afa24825b063fbb1b603922 (patch)
tree5302f5282fe963b72dbd9e94f422994b6ab11eca /src/data
parent99f28ba4b4a14254b4160a191cbeb15211cdd2d2 (diff)
parent25b8486ebf8bdfa0d165300a30877293e9e40c51 (diff)
downloadrapier-24a25f8ae7a62c5c5afa24825b063fbb1b603922.tar.gz
rapier-24a25f8ae7a62c5c5afa24825b063fbb1b603922.tar.bz2
rapier-24a25f8ae7a62c5c5afa24825b063fbb1b603922.zip
Merge pull request #28 from dimforge/raycast
Add the QueryPipeline for ray-casting and other geometrical queries in the future
Diffstat (limited to 'src/data')
-rw-r--r--src/data/mod.rs1
-rw-r--r--src/data/pubsub.rs175
2 files changed, 176 insertions, 0 deletions
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 7c00442..5d3efa6 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -2,3 +2,4 @@
pub mod arena;
pub(crate) mod graph;
+pub mod pubsub;
diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs
new file mode 100644
index 0000000..b2c9e27
--- /dev/null
+++ b/src/data/pubsub.rs
@@ -0,0 +1,175 @@
+//! Publish-subscribe mechanism for internal events.
+
+use std::collections::VecDeque;
+use std::marker::PhantomData;
+
+/// A permanent subscription to a pub-sub queue.
+#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
+pub struct Subscription<T> {
+ // Position on the cursor array.
+ id: u32,
+ _phantom: PhantomData<T>,
+}
+
+#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
+struct PubSubCursor {
+ // Position on the offset array.
+ id: u32,
+ // Index of the next message to read.
+ // NOTE: Having this here is not actually necessary because
+ // this value is supposed to be equal to `offsets[self.id]`.
+ // However, we keep it because it lets us avoid one lookup
+ // on the `offsets` array inside of message-polling loops
+ // based on `read_ith`.
+ next: u32,
+}
+
+impl PubSubCursor {
+ fn id(&self, num_deleted: u32) -> usize {
+ (self.id - num_deleted) as usize
+ }
+
+ fn next(&self, num_deleted: u32) -> usize {
+ (self.next - num_deleted) as usize
+ }
+}
+
+/// A pub-sub queue.
+#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
+pub struct PubSub<T> {
+ deleted_messages: u32,
+ deleted_offsets: u32,
+ messages: VecDeque<T>,
+ offsets: VecDeque<u32>,
+ cursors: Vec<PubSubCursor>,
+}
+
+impl<T> PubSub<T> {
+ /// Create a new empty pub-sub queue.
+ pub fn new() -> Self {
+ Self {
+ deleted_offsets: 0,
+ deleted_messages: 0,
+ messages: VecDeque::new(),
+ offsets: VecDeque::new(),
+ cursors: Vec::new(),
+ }
+ }
+
+ fn reset_shifts(&mut self) {
+ for offset in &mut self.offsets {
+ *offset -= self.deleted_messages;
+ }
+
+ for cursor in &mut self.cursors {
+ cursor.id -= self.deleted_offsets;
+ cursor.next -= self.deleted_messages;
+ }
+
+ self.deleted_offsets = 0;
+ self.deleted_messages = 0;
+ }
+
+ /// Publish a new message.
+ pub fn publish(&mut self, message: T) {
+ if self.offsets.is_empty() {
+ // No subscribers, drop the message.
+ return;
+ }
+
+ self.messages.push_back(message);
+ }
+
+ /// Subscribe to the queue.
+ ///
+ /// A subscription cannot be cancelled.
+ #[must_use]
+ pub fn subscribe(&mut self) -> Subscription<T> {
+ let cursor = PubSubCursor {
+ next: self.messages.len() as u32 + self.deleted_messages,
+ id: self.offsets.len() as u32 + self.deleted_offsets,
+ };
+
+ let subscription = Subscription {
+ id: self.cursors.len() as u32,
+ _phantom: PhantomData,
+ };
+
+ self.offsets.push_back(cursor.next);
+ self.cursors.push(cursor);
+ subscription
+ }
+
+ /// Read the i-th message not yet read by the given subsciber.
+ pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> {
+ let cursor = &self.cursors[sub.id as usize];
+ self.messages
+ .get(cursor.next(self.deleted_messages) as usize + i)
+ }
+
+ /// Get the messages not yet read by the given subscriber.
+ pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> {
+ let cursor = &self.cursors[sub.id as usize];
+ let next = cursor.next(self.deleted_messages);
+
+ // TODO: use self.queue.range(next..) once it is stabilised.
+ MessageRange {
+ queue: &self.messages,
+ next,
+ }
+ }
+
+ /// Makes the given subscribe acknowledge all the messages in the queue.
+ ///
+ /// A subscriber cannot read acknowledged messages any more.
+ pub fn ack(&mut self, sub: &Subscription<T>) {
+ // Update the cursor.
+ let cursor = &mut self.cursors[sub.id as usize];
+
+ self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;
+ cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
+
+ cursor.next = self.messages.len() as u32 + self.deleted_messages;
+ self.offsets.push_back(cursor.next);
+
+ // Now clear the messages we don't need to
+ // maintain in memory anymore.
+ while self.offsets.front() == Some(&u32::MAX) {
+ self.offsets.pop_front();
+ self.deleted_offsets += 1;
+ }
+
+ // There must be at least one offset otherwise
+ // that would mean we have no subscribers.
+ let next = self.offsets.front().unwrap();
+ let num_to_delete = *next - self.deleted_messages;
+
+ for _ in 0..num_to_delete {
+ self.messages.pop_front();
+ }
+
+ self.deleted_messages += num_to_delete;
+
+ if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 {
+ // Don't let the deleted_* shifts grow indefinitely otherwise
+ // they will end up overflowing, breaking everything.
+ self.reset_shifts();
+ }
+ }
+}
+
+struct MessageRange<'a, T> {
+ queue: &'a VecDeque<T>,
+ next: usize,
+}
+
+impl<'a, T> Iterator for MessageRange<'a, T> {
+ type Item = &'a T;
+
+ #[inline(always)]
+ fn next(&mut self) -> Option<&'a T> {
+ let result = self.queue.get(self.next);
+ self.next += 1;
+ result
+ }
+}