123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- // we have some complex types in here, but it's okay. imo splitting things out makes them *harder*
- // to follow in this case.
- #![allow(clippy::type_complexity)]
- use std::cell::RefCell;
- use std::{
- collections::{HashMap, VecDeque},
- rc::{Rc, Weak},
- };
/// A subscriber callback in one of the three calling conventions a channel supports.
///
/// The variant determines what happens to the event once the callback has run
/// (see `SubscriptionFunction::invoke`).
pub enum SubscriptionFunction<Host, Data>
where
    Host: 'static,
    Data: 'static,
{
    /// Sees the event by mutable reference; the (possibly modified) event is always
    /// handed back afterwards.
    ByRef(Box<dyn Fn(&Host, &mut Data)>),
    /// Takes the event by value and decides itself: `Some(event)` hands it back,
    /// `None` keeps it.
    ByValue(Box<dyn Fn(&Host, Data) -> Option<Data>>),
    /// Takes the event by value and never hands it back.
    Consume(Box<dyn Fn(&Host, Data)>),
}
- impl<Host: 'static, Data: 'static> SubscriptionFunction<Host, Data> {
- fn invoke(&self, host: &Host, mut data: Data) -> Option<Data> {
- match self {
- Self::ByRef(f) => {
- f(host, &mut data);
- Some(data)
- },
- Self::ByValue(f) => f(host, data),
- Self::Consume(f) => {
- f(host, data);
- None
- },
- }
- }
- }
- struct ConcreteEventSub<Host: 'static, Data: 'static> {
- host: Weak<Host>,
- callback: SubscriptionFunction<Host, Data>,
- }
/// A subscription with its host type erased, so a channel can store subscribers of
/// different host types in a single list.
trait EventSub<Data>
where
    Data: 'static,
{
    /// Reports whether the subscription is still usable (for the implementation in this
    /// file: whether the host object is still alive).
    fn is_healthy(&self) -> bool;

    /// Delivers `data` to the subscriber.
    ///
    /// Returns `Some(event)` when the event should continue on to the next subscriber and
    /// `None` when it has been consumed.
    fn invoke(&self, data: Data) -> Option<Data>;
}
- impl<Host: 'static, Data: 'static> EventSub<Data> for ConcreteEventSub<Host, Data> {
- fn is_healthy(&self) -> bool {
- self.host.strong_count() > 0
- }
- fn invoke(&self, data: Data) -> Option<Data> {
- self.host
- .upgrade()
- .and_then(|h| self.callback.invoke(h.as_ref(), data))
- }
- }
- type Subscription<Data, Priority> = (Priority, Rc<dyn EventSub<Data>>);
- pub struct Channel<Tag: 'static, Data: 'static, Priority: 'static> {
- subs: RefCell<Vec<Subscription<Data, Priority>>>,
- queue: RefCell<VecDeque<Data>>,
- _ghost: std::marker::PhantomData<(Tag, Priority)>,
- }
- impl<Tag: 'static, Data: 'static, Priority: 'static> Default for Channel<Tag, Data, Priority> {
- fn default() -> Self {
- Self {
- subs: RefCell::new(Default::default()),
- queue: RefCell::new(Default::default()),
- _ghost: std::marker::PhantomData,
- }
- }
- }
- impl<Tag: 'static, Data: 'static> Channel<Tag, Data, NoPriorityTag> {
- pub fn sub_ref<
- Host: 'static,
- HC: crate::helper::IntoWeak<Host>,
- CB: Fn(&Host, &mut Data) + 'static,
- >(
- &self,
- who: HC,
- cb: CB,
- ) {
- let sub = Rc::new(ConcreteEventSub {
- host: who.as_weak(),
- callback: SubscriptionFunction::ByRef(Box::new(cb)),
- });
- self.subs.borrow_mut().push((NoPriorityTag {}, sub));
- }
- pub fn sub_opt<
- Host: 'static,
- HC: crate::helper::IntoWeak<Host>,
- CB: Fn(&Host, Data) -> Option<Data> + 'static,
- >(
- &self,
- who: HC,
- cb: CB,
- ) {
- let sub = Rc::new(ConcreteEventSub {
- host: who.as_weak(),
- callback: SubscriptionFunction::ByValue(Box::new(cb)),
- });
- self.subs.borrow_mut().push((NoPriorityTag {}, sub));
- }
- pub fn sub_eat<
- Host: 'static,
- HC: crate::helper::IntoWeak<Host>,
- CB: Fn(&Host, Data) + 'static,
- >(
- &self,
- who: HC,
- cb: CB,
- ) {
- let sub = Rc::new(ConcreteEventSub {
- host: who.as_weak(),
- callback: SubscriptionFunction::Consume(Box::new(cb)),
- });
- self.subs.borrow_mut().push((NoPriorityTag {}, sub));
- }
- }
- impl<Tag: 'static, Data: 'static, Priority: PartialOrd> Channel<Tag, Data, Priority> {
- pub fn sub_ref<
- Host: 'static,
- HC: crate::helper::IntoWeak<Host>,
- CB: Fn(&Host, &mut Data) + 'static,
- >(
- &self,
- p: Priority,
- who: HC,
- cb: CB,
- ) {
- let sub = Rc::new(ConcreteEventSub {
- host: who.as_weak(),
- callback: SubscriptionFunction::ByRef(Box::new(cb)),
- });
- let mut subs = self.subs.borrow_mut();
- subs.push((p, sub));
- // XXX: what happens if we actually get an undefined order?...
- subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
- }
- pub fn sub_opt<
- Host: 'static,
- HC: crate::helper::IntoWeak<Host>,
- CB: Fn(&Host, Data) -> Option<Data> + 'static,
- >(
- &self,
- p: Priority,
- who: HC,
- cb: CB,
- ) {
- let sub = Rc::new(ConcreteEventSub {
- host: who.as_weak(),
- callback: SubscriptionFunction::ByValue(Box::new(cb)),
- });
- let mut subs = self.subs.borrow_mut();
- subs.push((p, sub));
- // XXX: what happens if we actually get an undefined order?...
- subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
- }
- pub fn sub_eat<
- Host: 'static,
- HC: crate::helper::IntoWeak<Host>,
- CB: Fn(&Host, Data) + 'static,
- >(
- &self,
- p: Priority,
- who: HC,
- cb: CB,
- ) {
- let sub = Rc::new(ConcreteEventSub {
- host: who.as_weak(),
- callback: SubscriptionFunction::Consume(Box::new(cb)),
- });
- let mut subs = self.subs.borrow_mut();
- subs.push((p, sub));
- // XXX: what happens if we actually get an undefined order?...
- subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
- }
- }
- impl<Tag: 'static, Data: 'static, Priority: 'static> Channel<Tag, Data, Priority> {
- pub fn queue(&self, data: Data) {
- self.queue.borrow_mut().push_back(data);
- }
- }
/// The part of a channel's interface that does not mention its `Tag`, `Data` or
/// `Priority` types, so channels of different types can be driven through one trait object.
trait ChannelMetadata {
    /// Number of events currently waiting in the channel's queue.
    fn queue_len(&self) -> usize;

    /// Fires the channel's queued events and returns how many were processed.
    fn fire_all(&self) -> usize;
}
- impl<Tag: 'static, Data: 'static, Priority: 'static> ChannelMetadata
- for Channel<Tag, Data, Priority>
- {
- fn queue_len(&self) -> usize {
- self.queue.borrow().len()
- }
- fn fire_all(&self) -> usize {
- let mut count = 0;
- if self.subs.borrow().len() == 0 {
- return self.queue.borrow_mut().drain(..).count();
- }
- while !self.queue.borrow().is_empty() {
- let mut event = self.queue.borrow_mut().pop_front();
- for sub in self.subs.borrow().iter() {
- event = sub.1.invoke(event.unwrap());
- if event.is_none() {
- break;
- }
- }
- count += 1;
- }
- count
- }
- }
/// Compile-time description of a channel: names the three type parameters of the
/// `Channel` it stands for.
///
/// Implemented in this file for the tuples `(Tag, Data)` and `(Tag, Data, Priority)`.
pub trait ChannelSpec: 'static {
    /// Marker type that tells apart channels carrying the same `Data`.
    type Tag: 'static;
    /// Type of the events carried by the channel.
    type Data: 'static;
    /// Type of the priority attached to each subscription.
    type Priority: 'static + Clone;
}
/// Priority type of channels that have no priorities: a zero-sized marker.
///
/// NOTE: this type must never implement `PartialOrd`. `Channel` has one set of
/// `sub_ref`/`sub_opt`/`sub_eat` methods for `Priority = NoPriorityTag` and another for
/// `Priority: PartialOrd`; the two inherent impls may only coexist because this type is
/// not `PartialOrd`.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoPriorityTag;
- impl<Tag: 'static, Data: 'static, Priority: 'static + Clone + PartialOrd> ChannelSpec
- for (Tag, Data, Priority)
- {
- type Tag = Tag;
- type Data = Data;
- type Priority = Priority;
- }
- impl<Tag: 'static, Data: 'static> ChannelSpec for (Tag, Data) {
- type Tag = Tag;
- type Data = Data;
- type Priority = NoPriorityTag;
- }
- type NamedChannelMetadata = (&'static str, Rc<dyn ChannelMetadata>);
- #[derive(Default)]
- pub struct EventRoot {
- pub(crate) channels: RefCell<HashMap<std::any::TypeId, Rc<dyn std::any::Any>>>,
- metadata: RefCell<HashMap<std::any::TypeId, NamedChannelMetadata>>,
- }
- impl EventRoot {
- pub fn create_channel<CS: ChannelSpec>(&self) {
- let tid = std::any::TypeId::of::<Channel<CS::Tag, CS::Data, CS::Priority>>();
- let ch = Rc::new(Channel::<CS::Tag, CS::Data, CS::Priority>::default());
- if self
- .metadata
- .borrow_mut()
- .insert(tid, (std::any::type_name::<CS>(), ch.clone()))
- .is_some()
- {
- panic!(
- "Tried recreating already-existing channel {}!",
- std::any::type_name::<CS>()
- );
- }
- self.channels.borrow_mut().insert(tid, ch);
- }
- pub fn channel<'l, 's: 'l, CS: ChannelSpec>(
- &'s self,
- ) -> Rc<Channel<CS::Tag, CS::Data, CS::Priority>> {
- let tid = std::any::TypeId::of::<Channel<CS::Tag, CS::Data, CS::Priority>>();
- let own = self.channels.borrow();
- if own.contains_key(&tid) {
- let entry = own.get(&tid).unwrap().clone();
- entry
- .downcast::<Channel<CS::Tag, CS::Data, CS::Priority>>()
- .expect("internal inconsistency?")
- } else {
- panic!(
- "Asked for channel {} that has not been created!",
- std::any::type_name::<CS>()
- )
- }
- }
- pub(crate) fn fire_all(&self) {
- let mut any = true;
- while any {
- any = false;
- for ch in self.metadata.borrow().iter() {
- if ch.1 .1.queue_len() > 0 {
- log::trace!(
- "Queue {} has {} event(s) to fire",
- ch.1 .0,
- &ch.1 .1.queue_len()
- );
- ch.1 .1.fire_all();
- any = true;
- }
- }
- }
- }
- }
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::{
        cell::{Cell, RefCell},
        rc::Rc,
    };

    use super::ChannelMetadata;
    use super::EventRoot;

    /// Subscriber that counts how many `i32` events it has been handed.
    struct Receiver {
        int_count: Cell<usize>,
    }

    impl Receiver {
        fn receive_i32(&self, _val: &mut i32) {
            let seen = self.int_count.get();
            self.int_count.set(seen + 1);
        }
    }

    /// Subscriber that appends its own `id` to a shared log whenever it is handed an
    /// event, so a test can read back the order in which subscribers were called.
    struct OrderedReceiver {
        id: i32,
        order: Rc<RefCell<Vec<i32>>>,
    }

    impl OrderedReceiver {
        fn receive_i32(&self, _val: &mut i32) {
            self.order.borrow_mut().push(self.id)
        }
    }

    struct IntTag;

    /// `i32` channel without priorities.
    type IntChannel = (IntTag, i32);

    #[derive(PartialEq, PartialOrd, Clone)]
    enum IntPriority {
        First,
        Second,
        Third,
    }

    /// `i32` channel whose subscribers are ordered by `IntPriority`.
    type IntPriorityChannel = (IntTag, i32, IntPriority);

    /// A queued event reaches the subscriber only once the channel is fired.
    #[test]
    fn simple_fire() {
        let root = EventRoot::default();
        root.create_channel::<IntChannel>();
        let recv = Rc::new(Receiver {
            int_count: Cell::new(0),
        });

        let channel = root.channel::<IntChannel>();
        channel.sub_ref(&recv, Receiver::receive_i32);
        channel.queue(0i32);
        assert_eq!(recv.int_count.get(), 0);

        channel.fire_all();
        assert_eq!(recv.int_count.get(), 1);
    }

    /// Subscribers are called in priority order, not in subscription order.
    #[test]
    fn priority_fire() {
        let root = EventRoot::default();
        root.create_channel::<IntPriorityChannel>();

        let order = Rc::new(RefCell::new(Vec::new()));
        let make = |id| {
            Rc::new(OrderedReceiver {
                id,
                order: order.clone(),
            })
        };
        let (recv1, recv2, recv3) = (make(1), make(2), make(3));

        // Deliberately subscribed out of priority order.
        let channel = root.channel::<IntPriorityChannel>();
        channel.sub_ref(IntPriority::Second, &recv2, OrderedReceiver::receive_i32);
        channel.sub_ref(IntPriority::First, &recv1, OrderedReceiver::receive_i32);
        channel.sub_ref(IntPriority::Third, &recv3, OrderedReceiver::receive_i32);

        channel.queue(0i32);
        assert_eq!(order.borrow().deref(), &vec![]);

        channel.fire_all();
        assert_eq!(order.borrow().deref(), &vec![1, 2, 3]);
    }
}
|