// we have some complex types in here, but it's okay. imo splitting things out makes them *harder* // to follow in this case. #![allow(clippy::type_complexity)] use std::cell::RefCell; use std::{ collections::{HashMap, VecDeque}, rc::{Rc, Weak}, }; pub enum SubscriptionFunction { ByRef(Box), ByValue(Box Option>), Consume(Box), } impl SubscriptionFunction { fn invoke(&self, host: &Host, mut data: Data) -> Option { match self { Self::ByRef(f) => { f(host, &mut data); Some(data) }, Self::ByValue(f) => f(host, data), Self::Consume(f) => { f(host, data); None }, } } } struct ConcreteEventSub { host: Weak, callback: SubscriptionFunction, } trait EventSub { fn is_healthy(&self) -> bool; fn invoke(&self, data: Data) -> Option; } impl EventSub for ConcreteEventSub { fn is_healthy(&self) -> bool { self.host.strong_count() > 0 } fn invoke(&self, data: Data) -> Option { self.host .upgrade() .and_then(|h| self.callback.invoke(h.as_ref(), data)) } } type Subscription = (Priority, Rc>); pub struct Channel { subs: RefCell>>, queue: RefCell>, _ghost: std::marker::PhantomData<(Tag, Priority)>, } impl Default for Channel { fn default() -> Self { Self { subs: RefCell::new(Default::default()), queue: RefCell::new(Default::default()), _ghost: std::marker::PhantomData, } } } impl Channel { pub fn sub_ref< Host: 'static, HC: crate::helper::IntoWeak, CB: Fn(&Host, &mut Data) + 'static, >( &self, who: HC, cb: CB, ) { let sub = Rc::new(ConcreteEventSub { host: who.as_weak(), callback: SubscriptionFunction::ByRef(Box::new(cb)), }); self.subs.borrow_mut().push((NoPriorityTag {}, sub)); } pub fn sub_opt< Host: 'static, HC: crate::helper::IntoWeak, CB: Fn(&Host, Data) -> Option + 'static, >( &self, who: HC, cb: CB, ) { let sub = Rc::new(ConcreteEventSub { host: who.as_weak(), callback: SubscriptionFunction::ByValue(Box::new(cb)), }); self.subs.borrow_mut().push((NoPriorityTag {}, sub)); } pub fn sub_eat< Host: 'static, HC: crate::helper::IntoWeak, CB: Fn(&Host, Data) + 'static, >( &self, who: HC, cb: CB, ) { let sub = Rc::new(ConcreteEventSub { host: who.as_weak(), callback: SubscriptionFunction::Consume(Box::new(cb)), }); self.subs.borrow_mut().push((NoPriorityTag {}, sub)); } } impl Channel { pub fn sub_ref< Host: 'static, HC: crate::helper::IntoWeak, CB: Fn(&Host, &mut Data) + 'static, >( &self, p: Priority, who: HC, cb: CB, ) { let sub = Rc::new(ConcreteEventSub { host: who.as_weak(), callback: SubscriptionFunction::ByRef(Box::new(cb)), }); let mut subs = self.subs.borrow_mut(); subs.push((p, sub)); // XXX: what happens if we actually get an undefined order?... subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); } pub fn sub_opt< Host: 'static, HC: crate::helper::IntoWeak, CB: Fn(&Host, Data) -> Option + 'static, >( &self, p: Priority, who: HC, cb: CB, ) { let sub = Rc::new(ConcreteEventSub { host: who.as_weak(), callback: SubscriptionFunction::ByValue(Box::new(cb)), }); let mut subs = self.subs.borrow_mut(); subs.push((p, sub)); // XXX: what happens if we actually get an undefined order?... subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); } pub fn sub_eat< Host: 'static, HC: crate::helper::IntoWeak, CB: Fn(&Host, Data) + 'static, >( &self, p: Priority, who: HC, cb: CB, ) { let sub = Rc::new(ConcreteEventSub { host: who.as_weak(), callback: SubscriptionFunction::Consume(Box::new(cb)), }); let mut subs = self.subs.borrow_mut(); subs.push((p, sub)); // XXX: what happens if we actually get an undefined order?... subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); } } impl Channel { pub fn queue(&self, data: Data) { self.queue.borrow_mut().push_back(data); } } trait ChannelMetadata { fn queue_len(&self) -> usize; fn fire_all(&self) -> usize; } impl ChannelMetadata for Channel { fn queue_len(&self) -> usize { self.queue.borrow().len() } fn fire_all(&self) -> usize { let mut count = 0; if self.subs.borrow().len() == 0 { return self.queue.borrow_mut().drain(..).count(); } while !self.queue.borrow().is_empty() { let mut event = self.queue.borrow_mut().pop_front(); for sub in self.subs.borrow().iter() { event = sub.1.invoke(event.unwrap()); if event.is_none() { break; } } count += 1; } count } } pub trait ChannelSpec: 'static { type Tag: 'static; type Data: 'static; type Priority: 'static + Clone; } #[derive(Clone)] pub struct NoPriorityTag; impl ChannelSpec for (Tag, Data, Priority) { type Tag = Tag; type Data = Data; type Priority = Priority; } impl ChannelSpec for (Tag, Data) { type Tag = Tag; type Data = Data; type Priority = NoPriorityTag; } type NamedChannelMetadata = (&'static str, Rc); #[derive(Default)] pub struct EventRoot { pub(crate) channels: RefCell>>, metadata: RefCell>, } impl EventRoot { pub fn create_channel(&self) { let tid = std::any::TypeId::of::>(); let ch = Rc::new(Channel::::default()); if self .metadata .borrow_mut() .insert(tid, (std::any::type_name::(), ch.clone())) .is_some() { panic!( "Tried recreating already-existing channel {}!", std::any::type_name::() ); } self.channels.borrow_mut().insert(tid, ch); } pub fn channel<'l, 's: 'l, CS: ChannelSpec>( &'s self, ) -> Rc> { let tid = std::any::TypeId::of::>(); let own = self.channels.borrow(); if own.contains_key(&tid) { let entry = own.get(&tid).unwrap().clone(); entry .downcast::>() .expect("internal inconsistency?") } else { panic!( "Asked for channel {} that has not been created!", std::any::type_name::() ) } } pub(crate) fn fire_all(&self) { let mut any = true; while any { any = false; for ch in self.metadata.borrow().iter() { if ch.1 .1.queue_len() > 0 { log::trace!( "Queue {} has {} event(s) to fire", ch.1 .0, &ch.1 .1.queue_len() ); ch.1 .1.fire_all(); any = true; } } } } } #[cfg(test)] mod tests { use std::ops::Deref; use std::{ cell::{Cell, RefCell}, rc::Rc, }; use super::ChannelMetadata; use super::EventRoot; struct Receiver { int_count: Cell, } impl Receiver { fn receive_i32(&self, _val: &mut i32) { self.int_count.set(self.int_count.get() + 1); } } struct OrderedReceiver { id: i32, order: Rc>>, } impl OrderedReceiver { fn receive_i32(&self, _val: &mut i32) { self.order.borrow_mut().push(self.id) } } struct IntTag; type IntChannel = (IntTag, i32); #[derive(PartialEq, PartialOrd, Clone)] enum IntPriority { First, Second, Third, } type IntPriorityChannel = (IntTag, i32, IntPriority); #[test] fn simple_fire() { let root = EventRoot::default(); root.create_channel::(); let recv = Rc::new(Receiver { int_count: Cell::new(0), }); root.channel::() .sub_ref(&recv, Receiver::receive_i32); root.channel::().queue(0i32); assert_eq!(recv.int_count.get(), 0); root.channel::().fire_all(); assert_eq!(recv.int_count.get(), 1); } #[test] fn priority_fire() { let root = EventRoot::default(); root.create_channel::(); let order = Rc::new(RefCell::new(Vec::new())); let recv1 = Rc::new(OrderedReceiver { id: 1, order: order.clone(), }); let recv2 = Rc::new(OrderedReceiver { id: 2, order: order.clone(), }); let recv3 = Rc::new(OrderedReceiver { id: 3, order: order.clone(), }); root.channel::().sub_ref( IntPriority::Second, &recv2, OrderedReceiver::receive_i32, ); root.channel::().sub_ref( IntPriority::First, &recv1, OrderedReceiver::receive_i32, ); root.channel::().sub_ref( IntPriority::Third, &recv3, OrderedReceiver::receive_i32, ); root.channel::().queue(0i32); assert_eq!(order.borrow().deref(), &vec![]); root.channel::().fire_all(); assert_eq!(order.borrow().deref(), &vec![1, 2, 3]); } }