event.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // we have some complex types in here, but it's okay. imo splitting things out makes them *harder*
  2. // to follow in this case.
  3. #![allow(clippy::type_complexity)]
  4. use std::cell::RefCell;
  5. use std::{
  6. collections::{HashMap, VecDeque},
  7. rc::{Rc, Weak},
  8. };
  9. pub enum SubscriptionFunction<Host: 'static, Data: 'static> {
  10. ByRef(Box<dyn Fn(&Host, &mut Data)>),
  11. ByValue(Box<dyn Fn(&Host, Data) -> Option<Data>>),
  12. Consume(Box<dyn Fn(&Host, Data)>),
  13. }
  14. impl<Host: 'static, Data: 'static> SubscriptionFunction<Host, Data> {
  15. fn invoke(&self, host: &Host, mut data: Data) -> Option<Data> {
  16. match self {
  17. Self::ByRef(f) => {
  18. f(host, &mut data);
  19. Some(data)
  20. },
  21. Self::ByValue(f) => f(host, data),
  22. Self::Consume(f) => {
  23. f(host, data);
  24. None
  25. },
  26. }
  27. }
  28. }
  29. struct ConcreteEventSub<Host: 'static, Data: 'static> {
  30. host: Weak<Host>,
  31. callback: SubscriptionFunction<Host, Data>,
  32. }
  33. trait EventSub<Data: 'static> {
  34. fn is_healthy(&self) -> bool;
  35. fn invoke(&self, data: Data) -> Option<Data>;
  36. }
  37. impl<Host: 'static, Data: 'static> EventSub<Data> for ConcreteEventSub<Host, Data> {
  38. fn is_healthy(&self) -> bool {
  39. self.host.strong_count() > 0
  40. }
  41. fn invoke(&self, data: Data) -> Option<Data> {
  42. self.host
  43. .upgrade()
  44. .and_then(|h| self.callback.invoke(h.as_ref(), data))
  45. }
  46. }
  47. type Subscription<Data, Priority> = (Priority, Rc<dyn EventSub<Data>>);
  48. pub struct Channel<Tag: 'static, Data: 'static, Priority: 'static> {
  49. subs: RefCell<Vec<Subscription<Data, Priority>>>,
  50. queue: RefCell<VecDeque<Data>>,
  51. _ghost: std::marker::PhantomData<(Tag, Priority)>,
  52. }
  53. impl<Tag: 'static, Data: 'static, Priority: 'static> Default for Channel<Tag, Data, Priority> {
  54. fn default() -> Self {
  55. Self {
  56. subs: RefCell::new(Default::default()),
  57. queue: RefCell::new(Default::default()),
  58. _ghost: std::marker::PhantomData,
  59. }
  60. }
  61. }
  62. impl<Tag: 'static, Data: 'static> Channel<Tag, Data, NoPriorityTag> {
  63. pub fn sub_ref<
  64. Host: 'static,
  65. HC: crate::helper::IntoWeak<Host>,
  66. CB: Fn(&Host, &mut Data) + 'static,
  67. >(
  68. &self,
  69. who: HC,
  70. cb: CB,
  71. ) {
  72. let sub = Rc::new(ConcreteEventSub {
  73. host: who.as_weak(),
  74. callback: SubscriptionFunction::ByRef(Box::new(cb)),
  75. });
  76. self.subs.borrow_mut().push((NoPriorityTag {}, sub));
  77. }
  78. pub fn sub_opt<
  79. Host: 'static,
  80. HC: crate::helper::IntoWeak<Host>,
  81. CB: Fn(&Host, Data) -> Option<Data> + 'static,
  82. >(
  83. &self,
  84. who: HC,
  85. cb: CB,
  86. ) {
  87. let sub = Rc::new(ConcreteEventSub {
  88. host: who.as_weak(),
  89. callback: SubscriptionFunction::ByValue(Box::new(cb)),
  90. });
  91. self.subs.borrow_mut().push((NoPriorityTag {}, sub));
  92. }
  93. pub fn sub_eat<
  94. Host: 'static,
  95. HC: crate::helper::IntoWeak<Host>,
  96. CB: Fn(&Host, Data) + 'static,
  97. >(
  98. &self,
  99. who: HC,
  100. cb: CB,
  101. ) {
  102. let sub = Rc::new(ConcreteEventSub {
  103. host: who.as_weak(),
  104. callback: SubscriptionFunction::Consume(Box::new(cb)),
  105. });
  106. self.subs.borrow_mut().push((NoPriorityTag {}, sub));
  107. }
  108. }
  109. impl<Tag: 'static, Data: 'static, Priority: PartialOrd> Channel<Tag, Data, Priority> {
  110. pub fn sub_ref<
  111. Host: 'static,
  112. HC: crate::helper::IntoWeak<Host>,
  113. CB: Fn(&Host, &mut Data) + 'static,
  114. >(
  115. &self,
  116. p: Priority,
  117. who: HC,
  118. cb: CB,
  119. ) {
  120. let sub = Rc::new(ConcreteEventSub {
  121. host: who.as_weak(),
  122. callback: SubscriptionFunction::ByRef(Box::new(cb)),
  123. });
  124. let mut subs = self.subs.borrow_mut();
  125. subs.push((p, sub));
  126. // XXX: what happens if we actually get an undefined order?...
  127. subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
  128. }
  129. pub fn sub_opt<
  130. Host: 'static,
  131. HC: crate::helper::IntoWeak<Host>,
  132. CB: Fn(&Host, Data) -> Option<Data> + 'static,
  133. >(
  134. &self,
  135. p: Priority,
  136. who: HC,
  137. cb: CB,
  138. ) {
  139. let sub = Rc::new(ConcreteEventSub {
  140. host: who.as_weak(),
  141. callback: SubscriptionFunction::ByValue(Box::new(cb)),
  142. });
  143. let mut subs = self.subs.borrow_mut();
  144. subs.push((p, sub));
  145. // XXX: what happens if we actually get an undefined order?...
  146. subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
  147. }
  148. pub fn sub_eat<
  149. Host: 'static,
  150. HC: crate::helper::IntoWeak<Host>,
  151. CB: Fn(&Host, Data) + 'static,
  152. >(
  153. &self,
  154. p: Priority,
  155. who: HC,
  156. cb: CB,
  157. ) {
  158. let sub = Rc::new(ConcreteEventSub {
  159. host: who.as_weak(),
  160. callback: SubscriptionFunction::Consume(Box::new(cb)),
  161. });
  162. let mut subs = self.subs.borrow_mut();
  163. subs.push((p, sub));
  164. // XXX: what happens if we actually get an undefined order?...
  165. subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
  166. }
  167. }
  168. impl<Tag: 'static, Data: 'static, Priority: 'static> Channel<Tag, Data, Priority> {
  169. pub fn queue(&self, data: Data) {
  170. self.queue.borrow_mut().push_back(data);
  171. }
  172. }
  173. trait ChannelMetadata {
  174. fn queue_len(&self) -> usize;
  175. fn fire_all(&self) -> usize;
  176. }
  177. impl<Tag: 'static, Data: 'static, Priority: 'static> ChannelMetadata
  178. for Channel<Tag, Data, Priority>
  179. {
  180. fn queue_len(&self) -> usize {
  181. self.queue.borrow().len()
  182. }
  183. fn fire_all(&self) -> usize {
  184. let mut count = 0;
  185. if self.subs.borrow().len() == 0 {
  186. return self.queue.borrow_mut().drain(..).count();
  187. }
  188. while !self.queue.borrow().is_empty() {
  189. let mut event = self.queue.borrow_mut().pop_front();
  190. for sub in self.subs.borrow().iter() {
  191. event = sub.1.invoke(event.unwrap());
  192. if event.is_none() {
  193. break;
  194. }
  195. }
  196. count += 1;
  197. }
  198. count
  199. }
  200. }
  201. pub trait ChannelSpec: 'static {
  202. type Tag: 'static;
  203. type Data: 'static;
  204. type Priority: 'static + Clone;
  205. }
  206. #[derive(Clone)]
  207. pub struct NoPriorityTag;
  208. impl<Tag: 'static, Data: 'static, Priority: 'static + Clone + PartialOrd> ChannelSpec
  209. for (Tag, Data, Priority)
  210. {
  211. type Tag = Tag;
  212. type Data = Data;
  213. type Priority = Priority;
  214. }
  215. impl<Tag: 'static, Data: 'static> ChannelSpec for (Tag, Data) {
  216. type Tag = Tag;
  217. type Data = Data;
  218. type Priority = NoPriorityTag;
  219. }
  220. type NamedChannelMetadata = (&'static str, Rc<dyn ChannelMetadata>);
  221. #[derive(Default)]
  222. pub struct EventRoot {
  223. pub(crate) channels: RefCell<HashMap<std::any::TypeId, Rc<dyn std::any::Any>>>,
  224. metadata: RefCell<HashMap<std::any::TypeId, NamedChannelMetadata>>,
  225. }
  226. impl EventRoot {
  227. pub fn create_channel<CS: ChannelSpec>(&self) {
  228. let tid = std::any::TypeId::of::<Channel<CS::Tag, CS::Data, CS::Priority>>();
  229. let ch = Rc::new(Channel::<CS::Tag, CS::Data, CS::Priority>::default());
  230. if self
  231. .metadata
  232. .borrow_mut()
  233. .insert(tid, (std::any::type_name::<CS>(), ch.clone()))
  234. .is_some()
  235. {
  236. panic!(
  237. "Tried recreating already-existing channel {}!",
  238. std::any::type_name::<CS>()
  239. );
  240. }
  241. self.channels.borrow_mut().insert(tid, ch);
  242. }
  243. pub fn channel<'l, 's: 'l, CS: ChannelSpec>(
  244. &'s self,
  245. ) -> Rc<Channel<CS::Tag, CS::Data, CS::Priority>> {
  246. let tid = std::any::TypeId::of::<Channel<CS::Tag, CS::Data, CS::Priority>>();
  247. let own = self.channels.borrow();
  248. if own.contains_key(&tid) {
  249. let entry = own.get(&tid).unwrap().clone();
  250. entry
  251. .downcast::<Channel<CS::Tag, CS::Data, CS::Priority>>()
  252. .expect("internal inconsistency?")
  253. } else {
  254. panic!(
  255. "Asked for channel {} that has not been created!",
  256. std::any::type_name::<CS>()
  257. )
  258. }
  259. }
  260. pub(crate) fn fire_all(&self) {
  261. let mut any = true;
  262. while any {
  263. any = false;
  264. for ch in self.metadata.borrow().iter() {
  265. if ch.1 .1.queue_len() > 0 {
  266. log::trace!(
  267. "Queue {} has {} event(s) to fire",
  268. ch.1 .0,
  269. &ch.1 .1.queue_len()
  270. );
  271. ch.1 .1.fire_all();
  272. any = true;
  273. }
  274. }
  275. }
  276. }
  277. }
  278. #[cfg(test)]
  279. mod tests {
  280. use std::ops::Deref;
  281. use std::{
  282. cell::{Cell, RefCell},
  283. rc::Rc,
  284. };
  285. use super::ChannelMetadata;
  286. use super::EventRoot;
  287. struct Receiver {
  288. int_count: Cell<usize>,
  289. }
  290. impl Receiver {
  291. fn receive_i32(&self, _val: &mut i32) {
  292. self.int_count.set(self.int_count.get() + 1);
  293. }
  294. }
  295. struct OrderedReceiver {
  296. id: i32,
  297. order: Rc<RefCell<Vec<i32>>>,
  298. }
  299. impl OrderedReceiver {
  300. fn receive_i32(&self, _val: &mut i32) {
  301. self.order.borrow_mut().push(self.id)
  302. }
  303. }
  304. struct IntTag;
  305. type IntChannel = (IntTag, i32);
  306. #[derive(PartialEq, PartialOrd, Clone)]
  307. enum IntPriority {
  308. First,
  309. Second,
  310. Third,
  311. }
  312. type IntPriorityChannel = (IntTag, i32, IntPriority);
  313. #[test]
  314. fn simple_fire() {
  315. let root = EventRoot::default();
  316. root.create_channel::<IntChannel>();
  317. let recv = Rc::new(Receiver {
  318. int_count: Cell::new(0),
  319. });
  320. root.channel::<IntChannel>()
  321. .sub_ref(&recv, Receiver::receive_i32);
  322. root.channel::<IntChannel>().queue(0i32);
  323. assert_eq!(recv.int_count.get(), 0);
  324. root.channel::<IntChannel>().fire_all();
  325. assert_eq!(recv.int_count.get(), 1);
  326. }
  327. #[test]
  328. fn priority_fire() {
  329. let root = EventRoot::default();
  330. root.create_channel::<IntPriorityChannel>();
  331. let order = Rc::new(RefCell::new(Vec::new()));
  332. let recv1 = Rc::new(OrderedReceiver {
  333. id: 1,
  334. order: order.clone(),
  335. });
  336. let recv2 = Rc::new(OrderedReceiver {
  337. id: 2,
  338. order: order.clone(),
  339. });
  340. let recv3 = Rc::new(OrderedReceiver {
  341. id: 3,
  342. order: order.clone(),
  343. });
  344. root.channel::<IntPriorityChannel>().sub_ref(
  345. IntPriority::Second,
  346. &recv2,
  347. OrderedReceiver::receive_i32,
  348. );
  349. root.channel::<IntPriorityChannel>().sub_ref(
  350. IntPriority::First,
  351. &recv1,
  352. OrderedReceiver::receive_i32,
  353. );
  354. root.channel::<IntPriorityChannel>().sub_ref(
  355. IntPriority::Third,
  356. &recv3,
  357. OrderedReceiver::receive_i32,
  358. );
  359. root.channel::<IntPriorityChannel>().queue(0i32);
  360. assert_eq!(order.borrow().deref(), &vec![]);
  361. root.channel::<IntPriorityChannel>().fire_all();
  362. assert_eq!(order.borrow().deref(), &vec![1, 2, 3]);
  363. }
  364. }