Przeglądaj źródła

Replaced hardcoded Service trait with generic EventChannels.

Kestrel 2 lat temu
rodzic
commit
0e72adb77a

+ 32 - 1
fleck/src/helper.rs

@@ -3,6 +3,37 @@ use std::any::Any;
 pub trait AsAny {
     fn as_any(&self) -> &dyn Any;
 }
+
 impl<T: Any> AsAny for T {
-    fn as_any(&self) -> &dyn Any { self }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+pub trait IntoWeak<T> {
+    fn as_weak(self) -> std::rc::Weak<T>;
+}
+
+impl<T> IntoWeak<T> for std::rc::Rc<T> {
+    fn as_weak(self) -> std::rc::Weak<T> {
+        std::rc::Rc::downgrade(&self)
+    }
+}
+
+impl<'a, T> IntoWeak<T> for &'a std::rc::Rc<T> {
+    fn as_weak(self) -> std::rc::Weak<T> {
+        std::rc::Rc::downgrade(&self)
+    }
+}
+
+impl<T> IntoWeak<T> for std::rc::Weak<T> {
+    fn as_weak(self) -> std::rc::Weak<T> {
+        self
+    }
+}
+
+impl<'a, T> IntoWeak<T> for &'a std::rc::Weak<T> {
+    fn as_weak(self) -> std::rc::Weak<T> {
+        self.clone()
+    }
 }

+ 8 - 4
fleck/src/io.rs

@@ -101,13 +101,17 @@ impl SocketWrapperChannel {
             match self.socket.borrow().send_to(&pkt.0, pkt.1) {
                 Ok(len) => {
                     if len != pkt.0.len() {
-                        log::warn!("Packet of {} bytes truncated to {} bytes when sending", pkt.0.len(), len);
+                        log::warn!(
+                            "Packet of {} bytes truncated to {} bytes when sending",
+                            pkt.0.len(),
+                            len
+                        );
                     }
                     queue.pop_front();
-                },
+                }
                 Err(e) => {
                     log::info!("Error while sending packet: {:?}", e);
-                    break
+                    break;
                 }
             }
         }
@@ -208,7 +212,7 @@ impl Default for FleckIO {
             local: Default::default(),
             poll,
             major,
-            minor
+            minor,
         }
     }
 }

+ 31 - 34
fleck/src/lib.rs

@@ -5,34 +5,28 @@ mod msg;
 mod node;
 
 mod crypto;
-mod io;
 mod helper;
+mod io;
 pub mod service;
 
 pub mod prelude {
+    pub use crate::helper::{AsAny, IntoWeak};
     pub use crate::msg::Message;
-    pub use crate::service::{Service, ServiceAPI};
+    pub use crate::service::Service;
 }
 
 use prelude::*;
 
-use io::FleckIO;
-
 pub struct Fleck {
-    io: Rc<FleckIO>,
+    io: Rc<io::FleckIO>,
     services: RefCell<service::ServiceStack>,
-    message_registry: msg::MessageRegistry,
-
-    queued_packets: RefCell<Vec<io::Packet>>,
 }
 
 impl Fleck {
     pub fn new() -> Rc<Self> {
         let res = Rc::new(Self {
-            io: Rc::new(FleckIO::default()),
-            services: RefCell::new(service::ServiceStack::new()),
-            message_registry: msg::MessageRegistry::new(),
-            queued_packets: RefCell::new(vec![]),
+            io: Default::default(),
+            services: Default::default(),
         });
 
         res.register_core_services();
@@ -42,53 +36,56 @@ impl Fleck {
 
     fn register_core_services(&self) {
         let mut svcs = self.services.borrow_mut();
+        // initial incoming/minor/major channels
+        svcs.create_io_channels();
         // Node registration
-        svcs.give_service(service::core::Nodes::default());
+        svcs.give_service::<service::core::Nodes>();
         // Local discovery
-        svcs.give_service(service::core::LocalDiscovery::default());
+        svcs.give_service::<service::core::LocalDiscovery>();
         // Parsing incoming packets
-        svcs.give_service(service::core::ParsePacket::default());
+        svcs.give_service::<service::core::ParsePacket>();
         // Actually sending packets
-        svcs.give_service(service::core::SendPacket::default());
+        svcs.give_service::<service::core::SendPacket>();
     }
 
     pub fn run(&self) {
         self.io.poll(self);
     }
-
-    fn send_queued_packets(&self) {
-        let svcs = self.services.borrow();
-        for mut pkt in self.queued_packets.borrow_mut().drain(..) {
-            svcs.process_outgoing(self, &mut pkt);
-        }
-    }
 }
 
-impl ServiceAPI for Fleck {
-    fn raw_io(&self) -> &FleckIO {
+impl Fleck {
+    pub(crate) fn raw_io(&self) -> &io::FleckIO {
         &self.io
     }
-    fn send_packet(&self, pkt: io::Packet) {
-        self.queued_packets.borrow_mut().push(pkt);
-    }
-    fn services(&self) -> std::cell::Ref<service::ServiceStack> {
+    pub fn services(&self) -> std::cell::Ref<service::ServiceStack> {
         self.services.borrow()
     }
+    pub fn queue<CS: service::ChannelSpec>(&self, data: CS::Data) {
+        self.services
+            .borrow()
+            .event_root()
+            .channel::<CS>()
+            .queue(data);
+    }
+    pub fn queue_priority<CS: service::ChannelSpec>(&self, data: CS::Data) {
+        self.services
+            .borrow()
+            .event_root()
+            .priority_channel::<CS>()
+            .queue(data);
+    }
 }
 
 impl io::IOFeedback for Fleck {
-    fn packet(&self, mut packet: io::Packet) {
-        self.services.borrow().process_incoming(self, &mut packet);
-        self.send_queued_packets();
+    fn packet(&self, packet: io::Packet) {
+        self.services.borrow().process_incoming(self, packet);
     }
 
     fn minor_tick(&self) {
         self.services.borrow().process_minor_tick(self);
-        self.send_queued_packets();
     }
 
     fn major_tick(&self) {
         self.services.borrow().process_major_tick(self);
-        self.send_queued_packets();
     }
 }

+ 10 - 9
fleck/src/msg.rs

@@ -21,19 +21,26 @@ pub struct Message {
     data: Vec<u8>,
 }
 
-pub struct MessageBuilder {}
-
 impl Message {
     pub fn build<M: MessageParams + Serialize>(from: M) -> Self {
         Self {
             magic: MESSAGE_MAGIC,
             ty: message_type::<M>(),
-            data: vec![]
+            data: vec![],
         }
     }
+
+    pub fn is_type<M: MessageParams>(&self) -> bool {
+        self.ty == message_type::<M>()
+    }
+
+    pub fn downcast<M: MessageParams + DeserializeOwned>(&self) -> Option<M> {
+        todo!()
+    }
 }
 
 type Deserializer = dyn Fn(&[u8]) -> Option<Box<dyn std::any::Any>>;
+#[derive(Default)]
 pub struct MessageRegistry {
     deser: HashMap<u16, Box<Deserializer>>,
 }
@@ -44,12 +51,6 @@ pub trait MessageParams {
 }
 
 impl MessageRegistry {
-    pub(crate) fn new() -> Self {
-        Self {
-            deser: Default::default(),
-        }
-    }
-
     pub fn add_message_type<MT: MessageParams + Serialize + DeserializeOwned>(&mut self) {
         let derived_typeid = message_type::<MT>();
         self.deser.insert(

+ 74 - 121
fleck/src/service.rs

@@ -1,17 +1,20 @@
 use std::any::TypeId;
 use std::cell::RefCell;
 use std::collections::HashMap;
-use std::ops::DerefMut;
 use std::ops::Deref;
+use std::ops::DerefMut;
 use std::{borrow::BorrowMut, rc::Rc};
 
 use crate::io::{FleckIO, Packet};
 
+mod event;
 mod lowlevel;
-mod priority;
 mod nodes;
+mod priority;
 
-pub use priority::{Never, ServicePriority as Priority};
+pub use priority::{Never, ServicePriority as BetweenPriority};
+
+pub use event::ChannelSpec;
 
 use self::priority::AbstractServicePriority;
 
@@ -35,166 +38,116 @@ pub mod order {
 }
 
 pub mod core {
-    pub(crate) use super::lowlevel::{LocalDiscovery, SendPacket, ParsePacket};
+    pub(crate) use super::lowlevel::{LocalDiscovery, ParsePacket, SendPacket};
 
     pub use super::nodes::Nodes;
-}
 
-pub trait ServiceAPI {
-    fn raw_io(&self) -> &FleckIO;
-    fn send_packet(&self, pkt: Packet);
-    fn services(&self) -> std::cell::Ref<ServiceStack>;
+    pub mod channel_tags {
+        #[derive(Default)]
+        pub struct ReceivePacketTag {}
+        #[derive(Default)]
+        pub struct MinorTickTag {}
+        #[derive(Default)]
+        pub struct MajorTickTag {}
+    }
+
+    pub use super::lowlevel::SendPacketChannel;
+    pub type ReceivePacketChannel = (channel_tags::ReceivePacketTag, crate::io::Packet);
+    pub type MinorTickChannel = (channel_tags::MinorTickTag, ());
+    pub type MajorTickChannel = (channel_tags::MajorTickTag, ());
 }
 
-pub trait Service : std::any::Any + crate::helper::AsAny {
-    fn process_incoming(&mut self, _api: &dyn ServiceAPI, _packet: &mut Packet) {}
-    fn process_outgoing(&mut self, _api: &dyn ServiceAPI, _packet: &mut Packet) {}
-    fn process_minor_tick(&mut self, _api: &dyn ServiceAPI) {}
-    fn process_major_tick(&mut self, _api: &dyn ServiceAPI) {}
+pub type EventRoot = event::EventRoot<crate::Fleck>;
 
-    fn register_msgs(&self, _registry: &mut crate::msg::MessageRegistry) {}
-}
+#[allow(unused_variables)]
+pub trait Service: std::any::Any + crate::helper::AsAny {
+    /*fn process_incoming(&mut self, api: &crate::Fleck, packet: &mut Packet) {}
+    fn process_outgoing(&mut self, api: &crate::Fleck, packet: &mut Packet) {}
+    fn process_minor_tick(&mut self, api: &crate::Fleck) {}
+    fn process_major_tick(&mut self, api: &crate::Fleck) {}*/
 
-pub trait IncomingPriority {
-    type Priority: priority::AbstractServicePriority;
+    fn register_channels(&self, eroot: &mut EventRoot) {}
+    fn register_msgs(&self, registry: &mut crate::msg::MessageRegistry) {}
 }
 
-pub trait OutgoingPriority {
-    type Priority: priority::AbstractServicePriority;
+trait ServiceExt {
+    fn build() -> Self;
 }
 
-#[macro_export]
-macro_rules! service_priority {
-    ($what:ty, in: $i:ty, out: $o:ty) => {
-        impl fleck::service::IncomingPriority for $what {
-            type Priority = $i;
-        }
-        impl fleck::service::OutgoingPriority for $what {
-            type Priority = $o;
-        }
-    };
-    ($what:ty, in: $i:ty) => {
-        impl fleck::service::IncomingPriority for $what {
-            type Priority = $i;
-        }
-        impl fleck::service::OutgoingPriority for $what {
-            type Priority = fleck::service::order::Never;
-        }
-    };
-    ($what:ty, out: $o:ty) => {
-        impl fleck::service::IncomingPriority for $what {
-            type Priority = fleck::service::order::Never;
-        }
-        impl fleck::service::OutgoingPriority for $what {
-            type Priority = $o;
-        }
-    };
-    ($what:ty) => {
-        impl fleck::service::IncomingPriority for $what {
-            type Priority = fleck::service::order::Never;
-        }
-        impl fleck::service::OutgoingPriority for $what {
-            type Priority = fleck::service::order::Never;
-        }
-    };
+impl<T: Service + Default> ServiceExt for T {
+    fn build() -> Self {
+        Default::default()
+    }
 }
 
 #[derive(Default)]
 pub struct ServiceStack {
-    services: Vec<Rc<RefCell<dyn Service>>>,
+    services: Vec<Box<dyn Service>>,
 
-    service_map: HashMap<TypeId, Rc<RefCell<dyn Service>>>,
+    service_map: HashMap<TypeId, Box<dyn Service>>,
 
-    incoming_order: priority::TotalOrder<Rc<RefCell<dyn Service>>>,
-    outgoing_order: priority::TotalOrder<Rc<RefCell<dyn Service>>>,
-
-    incoming_cache: RefCell<Option<Vec<Rc<RefCell<dyn Service>>>>>,
-    outgoing_cache: RefCell<Option<Vec<Rc<RefCell<dyn Service>>>>>,
+    eroot: EventRoot,
+    message_registry: crate::msg::MessageRegistry,
 }
 
 impl ServiceStack {
-    pub(crate) fn new() -> Self {
-        Self::default()
-    }
+    pub fn give_service<S: 'static + Default>(&mut self)
+    where
+        Rc<S>: Service,
+    {
+        let srv = Rc::new(S::default());
 
-    pub fn give_service<S: 'static + Service + IncomingPriority + OutgoingPriority>(
-        &mut self,
-        srv: S,
-    ) {
-        let srv = Rc::new(RefCell::new(srv));
-
-        self.incoming_cache = RefCell::new(None);
-        self.outgoing_cache = RefCell::new(None);
-
-        if !<<S as IncomingPriority>::Priority as AbstractServicePriority>::NEVER {
-            self.incoming_order
-                .add_priority::<<S as IncomingPriority>::Priority>(srv.clone());
-        }
-        if !<<S as OutgoingPriority>::Priority as AbstractServicePriority>::NEVER {
-            self.outgoing_order
-                .add_priority::<<S as OutgoingPriority>::Priority>(srv.clone());
-        }
-        self.service_map.insert(TypeId::of::<S>(), srv.clone());
-        self.services.push(srv);
+        srv.register_msgs(&mut self.message_registry);
+        srv.register_channels(&mut self.eroot);
+
+        self.service_map
+            .insert(TypeId::of::<S>(), Box::new(srv.clone()));
+        self.services.push(Box::new(srv));
     }
 
-    pub fn with_service<S: Service + 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R {
+    pub fn with_service<S: 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R
+    where
+        Rc<S>: Service,
+    {
         let id = TypeId::of::<S>();
 
         self.service_map
             .get(&id)
             .expect("asked for service that doesn't exist!")
-            .borrow()
             .deref()
             .as_any()
             .downcast_ref()
             .map(f)
             .unwrap()
     }
+
+    pub fn event_root(&self) -> &EventRoot {
+        &self.eroot
+    }
 }
 
 impl ServiceStack {
-    pub(crate) fn process_incoming(&self, api: &dyn ServiceAPI, packet: &mut Packet) {
-        let mut cache = self.incoming_cache.borrow_mut();
-        let services = cache.get_or_insert_with(|| self.incoming_order.clone().order());
-
-        for srv in services {
-            if packet.is_clear() { break }
-            srv.as_ref()
-                .borrow_mut()
-                .deref_mut()
-                .process_incoming(api, packet);
-        }
+    pub(crate) fn process_incoming(&self, api: &crate::Fleck, packet: Packet) {
+        self.eroot
+            .priority_channel::<core::ReceivePacketChannel>()
+            .queue(packet);
+        self.eroot.fire_all(api);
     }
 
-    pub(crate) fn process_outgoing(&self, api: &dyn ServiceAPI, packet: &mut Packet) {
-        let mut cache = self.outgoing_cache.borrow_mut();
-        let services = cache.get_or_insert_with(|| self.outgoing_order.clone().order());
-
-        for srv in services {
-            if packet.is_clear() { break }
-            srv.as_ref()
-                .borrow_mut()
-                .deref_mut()
-                .process_outgoing(api, packet);
-        }
+    pub(crate) fn process_minor_tick(&self, api: &crate::Fleck) {
+        self.eroot.channel::<core::MinorTickChannel>().queue(());
+        self.eroot.fire_all(api);
     }
 
-    pub(crate) fn process_minor_tick(&self, api: &dyn ServiceAPI) {
-        for srv in &self.services {
-            srv.as_ref()
-                .borrow_mut()
-                .deref_mut()
-                .process_minor_tick(api);
-        }
+    pub(crate) fn process_major_tick(&self, api: &crate::Fleck) {
+        self.eroot.channel::<core::MajorTickChannel>().queue(());
+        self.eroot.fire_all(api);
     }
 
-    pub(crate) fn process_major_tick(&self, api: &dyn ServiceAPI) {
-        for srv in &self.services {
-            srv.as_ref()
-                .borrow_mut()
-                .deref_mut()
-                .process_major_tick(api);
-        }
+    pub(crate) fn create_io_channels(&mut self) {
+        self.eroot
+            .create_priority_channel::<core::ReceivePacketChannel>();
+        self.eroot.create_channel::<core::MinorTickChannel>();
+        self.eroot.create_channel::<core::MajorTickChannel>();
     }
 }

+ 409 - 0
fleck/src/service/event.rs

@@ -0,0 +1,409 @@
+use std::cell::RefCell;
+use std::{
+    collections::{HashMap, VecDeque},
+    rc::{Rc, Weak},
+};
+
+use super::priority::AbstractServicePriority;
+
+pub struct Event<Data: 'static> {
+    pub data: Data,
+    pub emptied: bool,
+}
+
+struct ConcreteEventSub<Host, Context: 'static + ?Sized, Data: 'static> {
+    host: Weak<Host>,
+    callback: Box<dyn Fn(&Host, &Context, &mut Event<Data>)>,
+}
+
+trait EventSub<Context: 'static + ?Sized, Data: 'static> {
+    fn is_healthy(&self) -> bool;
+    fn invoke(&self, context: &Context, data: &mut Event<Data>);
+}
+
+impl<Host: 'static, Context: 'static + ?Sized, Data: 'static> EventSub<Context, Data>
+    for ConcreteEventSub<Host, Context, Data>
+{
+    fn is_healthy(&self) -> bool {
+        self.host.strong_count() > 0
+    }
+    fn invoke(&self, context: &Context, data: &mut Event<Data>) {
+        self.host
+            .upgrade()
+            .map(|h| (self.callback)(h.as_ref(), context, data));
+    }
+}
+
+pub struct EventChannel<Tag: 'static, Context: 'static + ?Sized, Data: 'static> {
+    subs: RefCell<Vec<Box<dyn EventSub<Context, Data>>>>,
+
+    queue: RefCell<VecDeque<Data>>,
+
+    _ghost: std::marker::PhantomData<(Tag,)>,
+}
+
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Default
+    for EventChannel<Tag, Context, Data>
+{
+    fn default() -> Self {
+        Self {
+            subs: RefCell::new(Vec::new()),
+            queue: Default::default(),
+            _ghost: std::marker::PhantomData,
+        }
+    }
+}
+
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> EventChannel<Tag, Context, Data> {
+    pub fn subscribe<
+        Host: 'static,
+        HC: crate::helper::IntoWeak<Host>,
+        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
+    >(
+        &self,
+        who: HC,
+        cb: CB,
+    ) {
+        self.subs.borrow_mut().push(Box::new(ConcreteEventSub {
+            host: who.as_weak(),
+            callback: Box::new(cb),
+        }));
+    }
+
+    pub fn queue(&self, data: Data) {
+        self.queue.borrow_mut().push_back(data);
+    }
+
+    fn do_fire_all(&self, context: &Context) -> usize {
+        let mut count = 0;
+
+        while !self.queue.borrow().is_empty() {
+            let mut event = Event {
+                data: self.queue.borrow_mut().pop_front().unwrap(),
+                emptied: false,
+            };
+
+            for sub in self.subs.borrow().iter() {
+                sub.invoke(context, &mut event);
+                if event.emptied {
+                    break;
+                }
+            }
+
+            count += 1;
+        }
+
+        count
+    }
+}
+
+pub struct PriorityChannel<Tag: 'static, Context: 'static + ?Sized, Data: 'static> {
+    subs: RefCell<super::priority::TotalOrder<Rc<dyn EventSub<Context, Data>>>>,
+    order_cache: RefCell<Option<Vec<Rc<dyn EventSub<Context, Data>>>>>,
+
+    queue: RefCell<VecDeque<Data>>,
+
+    _ghost: std::marker::PhantomData<(Tag,)>,
+}
+
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Default
+    for PriorityChannel<Tag, Context, Data>
+{
+    fn default() -> Self {
+        Self {
+            subs: RefCell::new(Default::default()),
+            order_cache: Default::default(),
+            queue: RefCell::new(Default::default()),
+            _ghost: std::marker::PhantomData,
+        }
+    }
+}
+
+pub struct PrioritySubscriptionBuilder<
+    'l,
+    Tag: 'static,
+    Context: 'static + ?Sized,
+    Data: 'static,
+    Priority: AbstractServicePriority,
+> {
+    parent: &'l PriorityChannel<Tag, Context, Data>,
+    _ghost: std::marker::PhantomData<Priority>,
+}
+
+impl<
+        'l,
+        Tag: 'static,
+        Context: 'static + ?Sized,
+        Data: 'static,
+        Priority: AbstractServicePriority,
+    > PrioritySubscriptionBuilder<'l, Tag, Context, Data, Priority>
+{
+    pub fn subscribe<
+        Host: 'static,
+        HC: crate::helper::IntoWeak<Host>,
+        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
+    >(
+        &self,
+        who: HC,
+        cb: CB,
+    ) {
+        self.parent.subscribe::<Priority, Host, HC, CB>(who, cb);
+    }
+}
+
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> PriorityChannel<Tag, Context, Data> {
+    pub fn subscribe<
+        P: super::AbstractServicePriority,
+        Host: 'static,
+        HC: crate::helper::IntoWeak<Host>,
+        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
+    >(
+        &self,
+        who: HC,
+        cb: CB,
+    ) {
+        let sub = Rc::new(ConcreteEventSub {
+            host: who.as_weak(),
+            callback: Box::new(cb),
+        });
+
+        self.subs.borrow_mut().add_priority::<P>(sub);
+        self.order_cache.borrow_mut().take();
+    }
+
+    pub fn with<P: AbstractServicePriority>(
+        &self,
+    ) -> PrioritySubscriptionBuilder<Tag, Context, Data, P> {
+        PrioritySubscriptionBuilder {
+            parent: self,
+            _ghost: std::marker::PhantomData,
+        }
+    }
+
+    pub fn queue(&self, data: Data) {
+        self.queue.borrow_mut().push_back(data);
+    }
+
+    fn do_fire_all(&self, context: &Context) -> usize {
+        let mut cache = self.order_cache.borrow_mut();
+        let subs = cache.get_or_insert_with(|| self.subs.borrow().clone().order());
+
+        let mut count = 0;
+
+        while !self.queue.borrow().is_empty() {
+            let mut event = Event {
+                data: self.queue.borrow_mut().pop_front().unwrap(),
+                emptied: false,
+            };
+
+            for sub in subs.iter() {
+                sub.invoke(context, &mut event);
+                if event.emptied {
+                    break;
+                }
+            }
+
+            count += 1;
+        }
+
+        count
+    }
+}
+
+trait ChannelMetadata<Context: 'static + ?Sized> {
+    fn queue_len(&self) -> usize;
+    fn fire_all(&self, context: &Context) -> usize;
+}
+
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> ChannelMetadata<Context>
+    for EventChannel<Tag, Context, Data>
+{
+    fn queue_len(&self) -> usize {
+        self.queue.borrow().len()
+    }
+
+    fn fire_all(&self, context: &Context) -> usize {
+        self.do_fire_all(context)
+    }
+}
+
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> ChannelMetadata<Context>
+    for PriorityChannel<Tag, Context, Data>
+{
+    fn queue_len(&self) -> usize {
+        self.queue.borrow().len()
+    }
+
+    fn fire_all(&self, context: &Context) -> usize {
+        self.do_fire_all(context)
+    }
+}
+
+pub trait ChannelSpec {
+    type Tag: 'static;
+    type Data: 'static;
+}
+
+impl<Tag: 'static, Data: 'static> ChannelSpec for (Tag, Data) {
+    type Tag = Tag;
+    type Data = Data;
+}
+
+pub struct EventRoot<Context: 'static + ?Sized> {
+    channels: HashMap<std::any::TypeId, Rc<dyn std::any::Any>>,
+    metadata: HashMap<std::any::TypeId, (&'static str, Rc<dyn ChannelMetadata<Context>>)>,
+    _ghost: std::marker::PhantomData<Context>,
+}
+
+impl<Context: 'static + ?Sized> Default for EventRoot<Context> {
+    fn default() -> Self {
+        Self {
+            channels: Default::default(),
+            metadata: Default::default(),
+            _ghost: std::marker::PhantomData,
+        }
+    }
+}
+
+impl<Context: 'static + ?Sized> EventRoot<Context> {
+    pub fn create_channel<CS: ChannelSpec>(&mut self) {
+        let tid = std::any::TypeId::of::<EventChannel<CS::Tag, Context, CS::Data>>();
+
+        let ch = Rc::new(EventChannel::<CS::Tag, Context, CS::Data>::default());
+        self.metadata
+            .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
+        self.channels.insert(tid, ch);
+    }
+
+    pub fn create_priority_channel<CS: ChannelSpec>(&mut self) {
+        let tid = std::any::TypeId::of::<PriorityChannel<CS::Tag, Context, CS::Data>>();
+
+        let ch = Rc::new(PriorityChannel::<CS::Tag, Context, CS::Data>::default());
+        self.metadata
+            .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
+        self.channels.insert(tid, ch);
+    }
+
+    pub fn channel<CS: ChannelSpec>(&self) -> &EventChannel<CS::Tag, Context, CS::Data> {
+        let tid = std::any::TypeId::of::<EventChannel<CS::Tag, Context, CS::Data>>();
+
+        match self.channels.get(&tid) {
+            Some(ch) => ch.downcast_ref().expect("internal inconsistency"),
+            None => {
+                panic!("Asked for channel that has not been created!")
+            }
+        }
+    }
+
+    pub fn priority_channel<CS: ChannelSpec>(
+        &self,
+    ) -> &PriorityChannel<CS::Tag, Context, CS::Data> {
+        let tid = std::any::TypeId::of::<PriorityChannel<CS::Tag, Context, CS::Data>>();
+
+        match self.channels.get(&tid) {
+            Some(ch) => ch.downcast_ref().expect("internal inconsistency"),
+            None => {
+                panic!("Asked for priority channel that has not been created!")
+            }
+        }
+    }
+
+    pub(crate) fn fire_all(&self, context: &Context) {
+        let mut any = true;
+        while any {
+            any = false;
+
+            for ch in &self.metadata {
+                let count = ch.1 .1.fire_all(context);
+                if count > 0 {
+                    log::trace!("Queue {} processed {} event(s)", ch.1 .0, count);
+                }
+                any = any || count > 0;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::ops::Deref;
+    use std::{
+        cell::{Cell, RefCell},
+        rc::Rc,
+    };
+
+    use super::{Event, EventRoot};
+    use crate::service::order;
+
+    struct Receiver {
+        int_count: Cell<usize>,
+    }
+
+    impl Receiver {
+        fn receive_i32(&self, _ctx: &(), _val: &mut Event<i32>) {
+            self.int_count.set(self.int_count.get() + 1);
+        }
+    }
+
+    struct OrderedReceiver {
+        id: i32,
+        order: Rc<RefCell<Vec<i32>>>,
+    }
+
+    impl OrderedReceiver {
+        fn receive_i32(&self, _ctx: &(), _val: &mut Event<i32>) {
+            self.order.borrow_mut().push(self.id)
+        }
+    }
+
+    struct IntTag;
+
+    type IntChannel = (IntTag, i32);
+
+    #[test]
+    fn simple_fire() {
+        let mut root = EventRoot::default();
+
+        root.create_channel::<IntChannel>();
+
+        let recv = Rc::new(Receiver {
+            int_count: Cell::new(0),
+        });
+
+        root.channel::<IntChannel>()
+            .subscribe(&recv, Receiver::receive_i32);
+        root.channel::<IntChannel>().queue(0i32);
+        assert_eq!(recv.int_count.get(), 0);
+        root.channel::<IntChannel>().do_fire_all(&());
+        assert_eq!(recv.int_count.get(), 1);
+    }
+
+    #[test]
+    fn priority_fire() {
+        let mut root = EventRoot::default();
+
+        root.create_priority_channel::<IntChannel>();
+
+        let order = Rc::new(RefCell::new(Vec::new()));
+
+        let recv1 = Rc::new(OrderedReceiver {
+            id: 1,
+            order: order.clone(),
+        });
+        let recv2 = Rc::new(OrderedReceiver {
+            id: 2,
+            order: order.clone(),
+        });
+
+        root.priority_channel::<IntChannel>()
+            .with::<order::First>()
+            .subscribe(&recv1, OrderedReceiver::receive_i32);
+        root.priority_channel::<IntChannel>()
+            .with::<order::Last>()
+            .subscribe(&recv2, OrderedReceiver::receive_i32);
+        root.priority_channel::<IntChannel>().queue(0i32);
+        assert_eq!(order.borrow().deref(), &vec![]);
+        root.priority_channel::<IntChannel>().do_fire_all(&());
+        assert_eq!(order.borrow().deref(), &vec![1, 2]);
+    }
+}

+ 51 - 32
fleck/src/service/lowlevel.rs

@@ -1,20 +1,27 @@
-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};
+use std::rc::Rc;
 
-use super::{order, Priority, Service, ServiceAPI};
+use super::core::MajorTickChannel;
+use super::event::Event;
+use super::{order, BetweenPriority, Service};
 use crate::io::Packet;
 
 use crate as fleck;
 use crate::msg::MessageParams;
 
-crate::service_priority!(
-    ParsePacket,
-    in: Priority<order::First, order::Preprocessing>
-);
 #[derive(Default)]
 pub(crate) struct ParsePacket {}
 
-impl Service for ParsePacket {
-    fn process_incoming(&mut self, api: &dyn ServiceAPI, _packet: &mut Packet) {
+impl ParsePacket {}
+
+impl Service for Rc<ParsePacket> {
+    fn register_channels(&self, eroot: &mut super::EventRoot) {
+        // eroot.channel::<>().subscribe(self, Self::process_incoming);
+    }
+}
+
+impl ParsePacket {
+    fn process_incoming(&mut self, api: &crate::Fleck, _packet: &mut Packet) {
         // try deserializing
         // get message registry from api
         // remove data from packet, add parsed Msg instead
@@ -22,27 +29,36 @@ impl Service for ParsePacket {
     }
 }
 
-crate::service_priority!(
-    SendPacket,
-    out: Priority<order::Postprocessing, order::Last>
-);
+pub struct SendPacketTag {}
+pub type SendPacketChannel = (SendPacketTag, crate::io::Packet);
+
 #[derive(Default)]
 pub(crate) struct SendPacket {}
 
-impl Service for SendPacket {
-    fn process_outgoing(&mut self, api: &dyn ServiceAPI, packet: &mut crate::io::Packet) {
+impl Service for Rc<SendPacket> {
+    fn register_channels(&self, eroot: &mut super::EventRoot) {
+        eroot.create_priority_channel::<SendPacketChannel>();
+        eroot
+            .priority_channel::<SendPacketChannel>()
+            .with::<order::Last>()
+            .subscribe(self, |s, api, packet| s.process_outgoing(api, packet));
+    }
+}
+
+impl SendPacket {
+    fn process_outgoing(&self, api: &crate::Fleck, packet: &mut Event<crate::io::Packet>) {
         // use default channel if not specified
-        if packet.io_channel.is_none() {
-            packet.io_channel = Some(api.raw_io().global().clone());
+        if packet.data.io_channel.is_none() {
+            packet.data.io_channel = Some(api.raw_io().global().clone());
         }
 
         // serialize if needed
-        if let (false, Some(msg)) = (packet.data.as_ref().is_some(), packet.msg.take()) {
-            packet.data = Some(bincode::serialize(&msg).expect("failed to serialize message"));
+        if let (false, Some(msg)) = (packet.data.data.as_ref().is_some(), packet.data.msg.take()) {
+            packet.data.data = Some(bincode::serialize(&msg).expect("failed to serialize message"));
         }
 
-        match &mut packet.io_channel {
-            Some(ch) => ch.clone().send_packet(packet),
+        match &mut packet.data.io_channel {
+            Some(ch) => ch.clone().send_packet(&mut packet.data),
             _ => {
                 println!("tried to send packet with no channel?");
             }
@@ -50,14 +66,12 @@ impl Service for SendPacket {
     }
 }
 
-crate::service_priority!(LocalDiscovery, out: Priority<order::Processing, order::Postprocessing>);
 #[derive(Default)]
 pub(crate) struct LocalDiscovery;
 
-
 #[derive(Serialize, Deserialize)]
 struct DiscoveryMsg {
-    pkey: crate::crypto::PublicKey
+    pkey: crate::crypto::PublicKey,
 }
 
 impl MessageParams for DiscoveryMsg {
@@ -65,25 +79,30 @@ impl MessageParams for DiscoveryMsg {
     const ENCRYPTED: bool = false;
 }
 
-impl Service for LocalDiscovery {
+impl Service for Rc<LocalDiscovery> {
     fn register_msgs(&self, registry: &mut fleck::msg::MessageRegistry) {
         registry.add_message_type::<DiscoveryMsg>();
     }
 
-    fn process_incoming(&mut self, api: &dyn ServiceAPI, packet: &mut Packet) {
-
+    fn register_channels(self: &Self, eroot: &mut super::EventRoot) {
+        log::info!("LocalDiscovery registering channels...");
+        eroot
+            .channel::<MajorTickChannel>()
+            .subscribe(self, LocalDiscovery::process_major_tick);
     }
+}
 
-    fn process_major_tick(&mut self, api: &dyn ServiceAPI) {
-        api.services().with_service(|n: &super::nodes::Nodes| {
-            
-        });
+impl LocalDiscovery {
+    fn process_major_tick(&self, api: &crate::Fleck, _: &mut Event<()>) {
+        log::info!("LocalDiscovery major tick!");
 
-        api.send_packet(Packet {
+        api.queue_priority::<SendPacketChannel>(Packet {
             addr: Some((*crate::io::MULTICAST_ADDRESS).into()),
             data: None,
             io_channel: Some(api.raw_io().local().clone()),
-            msg: Some(crate::msg::Message::build(DiscoveryMsg { pkey: Default::default() })),
+            msg: Some(crate::msg::Message::build(DiscoveryMsg {
+                pkey: Default::default(),
+            })),
         });
     }
 }

+ 5 - 9
fleck/src/service/nodes.rs

@@ -1,7 +1,8 @@
+use std::rc::Rc;
+
 use super::Service;
-use crate as fleck;
 
-use crate::crypto::{PublicKey,Keypair};
+use crate::crypto::{Keypair, PublicKey};
 
 pub struct Node {
     addr: Option<std::net::SocketAddr>,
@@ -9,11 +10,8 @@ pub struct Node {
     keypair: Option<Keypair>,
 }
 
-crate::service_priority!(Nodes);
 #[derive(Default)]
-pub struct Nodes {
-    
-}
+pub struct Nodes {}
 
 impl Nodes {
     pub fn self_node(&self) -> &Node {
@@ -25,6 +23,4 @@ impl Nodes {
     }
 }
 
-impl Service for Nodes {
-
-}
+impl Service for Rc<Nodes> {}