Browse Source

Reorganization and removal of cumbersome priority system.

Kestrel 2 years ago
parent
commit
9e2b1e5bbd

+ 5 - 7
fleck/src/crypto.rs

@@ -44,9 +44,8 @@ pub(crate) struct VerifyPacketSignature {}
 impl Service for std::rc::Rc<VerifyPacketSignature> {
     fn register_channels(&self, eroot: &mut EventRoot) {
         eroot
-            .priority_channel::<fleck_core::ReceivePacketChannel>()
-            .with::<order::Preprocessing>()
-            .subscribe(self, VerifyPacketSignature::check);
+            .channel::<fleck_core::ReceivePacketChannel>()
+            .subscribe(fleck_core::ReceiveOrder::Verify, self, VerifyPacketSignature::check);
     }
 }
 
@@ -66,7 +65,7 @@ impl VerifyPacketSignature {
                 message.crypto_header = PacketHeader::NoCrypto;
                 let to_verify = bincode::serialize(&message).expect("assume that we can re-serialize the message");
 
-                signature.verify(to_verify, signature);
+                // signature.verify(to_verify, signature);
                 
                 // let pubkey = api.services().with_service(|ns: &fleck_core::Nodes| { ns.self_node().pubkey() });
 
@@ -85,9 +84,8 @@ pub(crate) struct SignPacket {}
 impl Service for std::rc::Rc<SignPacket> {
     fn register_channels(&self, eroot: &mut EventRoot) {
         eroot
-            .priority_channel::<fleck_core::SendPacketChannel>()
-            .with::<order::Between<order::Postprocessing, order::Last>>()
-            .subscribe(self, SignPacket::sign);
+            .channel::<fleck_core::SendPacketChannel>()
+            .subscribe(fleck_core::SendOrder::Sign, self, SignPacket::sign);
     }
 }
 

+ 34 - 0
fleck/src/fleck_core.rs

@@ -0,0 +1,34 @@
+mod lowlevel;
+mod nodes;
+
+pub use nodes::Node;
+pub use nodes::Nodes;
+
+pub mod channel_tags {
+    pub struct SendPacketTag {}
+    pub struct ReceivePacketTag {}
+    pub struct MinorTickTag {}
+    pub struct MajorTickTag {}
+}
+
+#[derive(PartialEq, PartialOrd)]
+pub enum SendOrder {
+    Serialize,
+    Sign,
+    Encrypt,
+    Send,
+}
+pub type SendPacketChannel = (channel_tags::SendPacketTag, crate::io::Packet, SendOrder);
+#[derive(PartialEq, PartialOrd)]
+pub enum ReceiveOrder {
+    Decryption,
+    Verify,
+    Parse,
+    Dispatch,
+}
+pub type ReceivePacketChannel = (channel_tags::ReceivePacketTag, crate::io::Packet, ReceiveOrder);
+pub type MinorTickChannel = (channel_tags::MinorTickTag, ());
+pub type MajorTickChannel = (channel_tags::MajorTickTag, ());
+
+pub(crate) use lowlevel::LocalDiscovery;
+pub(crate) use lowlevel::SendPacket;

+ 94 - 0
fleck/src/fleck_core/lowlevel.rs

@@ -0,0 +1,94 @@
+use crate::prelude::*;
+
+use serde::{Deserialize, Serialize};
+use std::rc::Rc;
+
+#[derive(Default)]
+pub(crate) struct SendPacket {}
+
+impl Service for Rc<SendPacket> {
+    fn register_channels(&self, eroot: &mut EventRoot) {
+        eroot
+            .channel::<fleck_core::SendPacketChannel>()
+            // .with::<order::Last>()
+            .subscribe(fleck_core::SendOrder::Send, self, SendPacket::process_outgoing);
+    }
+}
+
+impl SendPacket {
+    fn process_outgoing(&self, api: &crate::Fleck, packet: &mut FleckEvent<FleckPacket>) {
+        // use default channel if not specified
+        if packet.data.io_channel.is_none() {
+            packet.data.io_channel = Some(api.raw_io().global().clone());
+        }
+
+        // serialize if needed
+        if let (false, Some(msg)) = (packet.data.data.as_ref().is_some(), packet.data.msg.take()) {
+            packet.data.data = Some(bincode::serialize(&msg).expect("failed to serialize message"));
+        }
+
+        match &mut packet.data.io_channel {
+            Some(ch) => ch.clone().send_packet(&mut packet.data),
+            _ => {
+                println!("tried to send packet with no channel?");
+            }
+        }
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct LocalDiscovery;
+
+#[derive(Serialize, Deserialize, Debug)]
+struct DiscoveryMsg {
+    pkey: crate::crypto::PublicKey,
+}
+
+impl MessageParams for DiscoveryMsg {
+    const NAME: &'static str = "DiscoveryMsg";
+    const ENCRYPTED: bool = false;
+}
+
+impl Service for Rc<LocalDiscovery> {
+    fn register_msgs(&self, registry: &mut crate::msg::MessageRegistry) {
+        registry.add_message_type::<DiscoveryMsg>();
+    }
+
+    fn register_channels(self: &Self, eroot: &mut EventRoot) {
+        eroot
+            .channel::<fleck_core::MajorTickChannel>()
+            .subscribe(self, LocalDiscovery::process_major_tick);
+        eroot
+            .channel::<fleck_core::ReceivePacketChannel>()
+            .subscribe(fleck_core::ReceiveOrder::Dispatch, self, LocalDiscovery::packet);
+    }
+}
+
+impl LocalDiscovery {
+    fn packet(&self, api: &crate::Fleck, pkt: &mut FleckEvent<FleckPacket>) {
+        if let Some(discovery) = pkt.data.as_msg::<DiscoveryMsg>() {
+            log::trace!("received discovery message");
+
+            // this will automatically ignore self-messages because we already know the pubkey
+            api.with_service(|nodes: &fleck_core::Nodes| {
+                if nodes.node_by_pubkey(&discovery.pkey).is_none() {
+                    let node = fleck_core::Node::build_with_addr(pkt.data.addr.unwrap());
+                    node.set_pubkey(discovery.pkey);
+                    nodes.inform_of(node);
+                }
+            });
+        }
+    }
+
+    fn process_major_tick(&self, api: &crate::Fleck, _: &mut FleckEvent<()>) {
+        api.queue::<fleck_core::SendPacketChannel>(FleckPacket {
+            addr: Some((*crate::io::MULTICAST_ADDRESS).into()),
+            data: None,
+            io_channel: Some(api.raw_io().local().clone()),
+            node: None,
+            msg: Some(crate::msg::Message::build(DiscoveryMsg {
+                pkey: api.with_service(|nodes: &fleck_core::Nodes| nodes.self_node().pubkey().unwrap().to_owned())
+            })),
+        });
+    }
+}

+ 1 - 1
fleck/src/service/nodes.rs → fleck/src/fleck_core/nodes.rs

@@ -2,7 +2,7 @@ use rand::prelude::*;
 
 use std::{rc::Rc, collections::{HashMap, HashSet}, cell::{RefCell, Ref}};
 
-use super::Service;
+use crate::prelude::*;
 
 use crate::crypto::{Keypair, PublicKey};
 

+ 1 - 1
fleck/src/io.rs

@@ -22,7 +22,7 @@ pub struct Packet {
     pub(crate) addr: Option<std::net::SocketAddr>,
     pub(crate) data: Option<Vec<u8>>,
     pub(crate) io_channel: Option<std::rc::Rc<dyn IOChannel>>,
-    pub node: Option<crate::service::core::Node>,
+    pub node: Option<crate::fleck_core::Node>,
     pub msg: Option<crate::msg::Message>,
 }
 

+ 8 - 6
fleck/src/lib.rs

@@ -7,14 +7,16 @@ mod crypto;
 mod helper;
 mod io;
 pub mod service;
+pub mod fleck_core;
 
 pub mod prelude {
     pub use crate::helper::{AsAny, IntoWeak};
     pub use crate::io::Packet as FleckPacket;
-    pub use crate::msg::Message;
+    pub use crate::msg::{Message,MessageParams};
     pub use crate::service::{
-        core as fleck_core, event::Event as FleckEvent, order, ChannelSpec, EventRoot, Service,
+        event::Event as FleckEvent, ChannelSpec, EventRoot, Service,
     };
+    pub use crate::fleck_core;
 }
 
 use prelude::*;
@@ -41,13 +43,13 @@ impl Fleck {
         // initial incoming/minor/major channels
         svcs.create_io_channels();
         // Node registration
-        svcs.give_service::<service::core::Nodes>();
+        svcs.give_service::<fleck_core::Nodes>();
         // Local discovery
-        svcs.give_service::<service::core::LocalDiscovery>();
+        svcs.give_service::<fleck_core::LocalDiscovery>();
         // Parsing incoming packets
         svcs.give_service::<msg::ParsePacket>();
         // Actually sending packets
-        svcs.give_service::<service::core::SendPacket>();
+        svcs.give_service::<fleck_core::SendPacket>();
         // Signing packets
         svcs.give_service::<crypto::SignPacket>();
     }
@@ -88,7 +90,7 @@ impl Fleck {
         self.services
             .borrow()
             .event_root()
-            .priority_channel::<CS>()
+            .channel::<CS>()
             .queue(data);
     }
 }

+ 3 - 3
fleck/src/msg.rs

@@ -114,9 +114,9 @@ impl ParsePacket {}
 impl Service for std::rc::Rc<ParsePacket> {
     fn register_channels(&self, eroot: &mut super::EventRoot) {
         eroot
-            .priority_channel::<fleck_core::ReceivePacketChannel>()
-            .with::<order::Preprocessing>()
-            .subscribe(self, ParsePacket::process_incoming);
+            .channel::<fleck_core::ReceivePacketChannel>()
+            // .with::<order::Preprocessing>()
+            .subscribe(fleck_core::ReceiveOrder::Parse, self, ParsePacket::process_incoming);
     }
 }
 

+ 0 - 8
fleck/src/node.rs

@@ -1,8 +0,0 @@
-pub struct NodeID(u64);
-pub struct NodeKey(Vec<u8>);
-
-pub struct FleckNode {
-    id: Option<NodeID>,
-    addr: Option<std::net::IpAddr>,
-    key: Option<NodeKey>,
-}

+ 8 - 55
fleck/src/service.rs

@@ -4,59 +4,12 @@ use std::ops::Deref;
 use std::rc::Rc;
 
 use crate::io::Packet;
+use crate::fleck_core;
 
 pub(crate) mod event;
-mod lowlevel;
-mod nodes;
-mod priority;
-
-pub use priority::{Never, ServicePriority as BetweenPriority};
 
 pub use event::ChannelSpec;
 
-use self::priority::AbstractServicePriority;
-
-pub mod order {
-    pub use super::priority::{Never, ServicePriority as Between};
-
-    pub struct FirstTag;
-    pub type First = super::priority::Fixpoint<FirstTag>;
-
-    pub type Preprocessing = Between<First, Processing>;
-    pub type Processing = Between<First, Last>;
-    pub type Postprocessing = Between<First, Processing>;
-
-    pub struct LastTag;
-    pub type Last = super::priority::Fixpoint<LastTag>;
-}
-
-pub mod core {
-    pub(crate) use super::lowlevel::{LocalDiscovery, SendPacket};
-
-    pub use super::nodes::{Node, Nodes};
-
-    pub mod channel_tags {
-        #[derive(Default)]
-        pub struct ReceivePacketTag {}
-        #[derive(Default)]
-        pub struct MinorTickTag {}
-        #[derive(Default)]
-        pub struct MajorTickTag {}
-    }
-
-    pub use super::lowlevel::SendPacketChannel;
-    pub mod receive_order {
-        use super::super::order;
-        pub type Decryption = order::First;
-        pub type Verify = order::Between<Decryption, Parse>;
-        pub type Parse = order::Preprocessing;
-        pub type Dispatch = order::Processing;
-    }
-    pub type ReceivePacketChannel = (channel_tags::ReceivePacketTag, crate::io::Packet);
-    pub type MinorTickChannel = (channel_tags::MinorTickTag, ());
-    pub type MajorTickChannel = (channel_tags::MajorTickTag, ());
-}
-
 pub type EventRoot = event::EventRoot<crate::Fleck>;
 
 #[allow(unused_variables)]
@@ -130,25 +83,25 @@ impl ServiceStack {
 impl ServiceStack {
     pub(crate) fn process_incoming(&self, api: &crate::Fleck, packet: Packet) {
         self.eroot
-            .priority_channel::<core::ReceivePacketChannel>()
+            .channel::<fleck_core::ReceivePacketChannel>()
             .queue(packet);
         self.eroot.fire_all(api);
     }
 
     pub(crate) fn process_minor_tick(&self, api: &crate::Fleck) {
-        self.eroot.channel::<core::MinorTickChannel>().queue(());
+        self.eroot.channel::<fleck_core::MinorTickChannel>().queue(());
         self.eroot.fire_all(api);
     }
 
     pub(crate) fn process_major_tick(&self, api: &crate::Fleck) {
-        self.eroot.channel::<core::MajorTickChannel>().queue(());
+        self.eroot.channel::<fleck_core::MajorTickChannel>().queue(());
         self.eroot.fire_all(api);
     }
 
     pub(crate) fn create_io_channels(&mut self) {
-        self.eroot
-            .create_priority_channel::<core::ReceivePacketChannel>();
-        self.eroot.create_channel::<core::MinorTickChannel>();
-        self.eroot.create_channel::<core::MajorTickChannel>();
+        self.eroot.create_channel::<fleck_core::SendPacketChannel>();
+        self.eroot.create_channel::<fleck_core::ReceivePacketChannel>();
+        self.eroot.create_channel::<fleck_core::MinorTickChannel>();
+        self.eroot.create_channel::<fleck_core::MajorTickChannel>();
     }
 }

+ 83 - 132
fleck/src/service/event.rs

@@ -4,8 +4,6 @@ use std::{
     rc::{Rc, Weak},
 };
 
-use super::priority::AbstractServicePriority;
-
 pub struct Event<Data: 'static> {
     pub data: Data,
     pub emptied: bool,
@@ -34,92 +32,29 @@ impl<Host: 'static, Context: 'static + ?Sized, Data: 'static> EventSub<Context,
     }
 }
 
-pub struct EventChannel<Tag: 'static, Context: 'static + ?Sized, Data: 'static> {
-    subs: RefCell<Vec<Box<dyn EventSub<Context, Data>>>>,
+pub struct Channel<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static> {
+    /*subs: RefCell<super::priority::TotalOrder<Rc<dyn EventSub<Context, Data>>>>,
+    order_cache: RefCell<Option<Vec<Rc<dyn EventSub<Context, Data>>>>>,*/
+    subs: RefCell<Vec<(Priority, Rc<dyn EventSub<Context, Data>>)>>,
 
     queue: RefCell<VecDeque<Data>>,
 
-    _ghost: std::marker::PhantomData<(Tag,)>,
+    _ghost: std::marker::PhantomData<(Tag,Priority)>,
 }
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Default
-    for EventChannel<Tag, Context, Data>
-{
-    fn default() -> Self {
-        Self {
-            subs: RefCell::new(Vec::new()),
-            queue: Default::default(),
-            _ghost: std::marker::PhantomData,
-        }
-    }
-}
-
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> EventChannel<Tag, Context, Data> {
-    pub fn subscribe<
-        Host: 'static,
-        HC: crate::helper::IntoWeak<Host>,
-        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
-    >(
-        &self,
-        who: HC,
-        cb: CB,
-    ) {
-        self.subs.borrow_mut().push(Box::new(ConcreteEventSub {
-            host: who.as_weak(),
-            callback: Box::new(cb),
-        }));
-    }
-
-    pub fn queue(&self, data: Data) {
-        self.queue.borrow_mut().push_back(data);
-    }
-
-    fn do_fire_all(&self, context: &Context) -> usize {
-        let mut count = 0;
-
-        while !self.queue.borrow().is_empty() {
-            let mut event = Event {
-                data: self.queue.borrow_mut().pop_front().unwrap(),
-                emptied: false,
-            };
-
-            for sub in self.subs.borrow().iter() {
-                sub.invoke(context, &mut event);
-                if event.emptied {
-                    break;
-                }
-            }
-
-            count += 1;
-        }
-
-        count
-    }
-}
-
-pub struct PriorityChannel<Tag: 'static, Context: 'static + ?Sized, Data: 'static> {
-    subs: RefCell<super::priority::TotalOrder<Rc<dyn EventSub<Context, Data>>>>,
-    order_cache: RefCell<Option<Vec<Rc<dyn EventSub<Context, Data>>>>>,
-
-    queue: RefCell<VecDeque<Data>>,
-
-    _ghost: std::marker::PhantomData<(Tag,)>,
-}
-
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Default
-    for PriorityChannel<Tag, Context, Data>
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static> Default
+    for Channel<Tag, Context, Data, Priority>
 {
     fn default() -> Self {
         Self {
             subs: RefCell::new(Default::default()),
-            order_cache: Default::default(),
             queue: RefCell::new(Default::default()),
             _ghost: std::marker::PhantomData,
         }
     }
 }
 
-pub struct PrioritySubscriptionBuilder<
+/*pub struct PrioritySubscriptionBuilder<
     'l,
     Tag: 'static,
     Context: 'static + ?Sized,
@@ -150,10 +85,11 @@ impl<
         self.parent.subscribe::<Priority, Host, HC, CB>(who, cb);
     }
 }
+*/
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> PriorityChannel<Tag, Context, Data> {
+
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Channel<Tag, Context, Data, NoPriorityTag> {
     pub fn subscribe<
-        P: super::AbstractServicePriority,
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
         CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
@@ -167,27 +103,39 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> PriorityChannel<Tag
             callback: Box::new(cb),
         });
 
-        self.subs.borrow_mut().add_priority::<P>(sub);
-        self.order_cache.borrow_mut().take();
+        self.subs.borrow_mut().push((NoPriorityTag {}, sub));
     }
+}
 
-    pub fn with<P: AbstractServicePriority>(
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOrd> Channel<Tag, Context, Data, Priority> {
+    pub fn subscribe<
+        Host: 'static,
+        HC: crate::helper::IntoWeak<Host>,
+        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
+    >(
         &self,
-    ) -> PrioritySubscriptionBuilder<Tag, Context, Data, P> {
-        PrioritySubscriptionBuilder {
-            parent: self,
-            _ghost: std::marker::PhantomData,
-        }
+        p: Priority,
+        who: HC,
+        cb: CB,
+    ) {
+        let sub = Rc::new(ConcreteEventSub {
+            host: who.as_weak(),
+            callback: Box::new(cb),
+        });
+
+        let mut subs = self.subs.borrow_mut();
+        subs.push((p.into(), sub));
+        // XXX: what happens if we actually get an undefined order?...
+        subs.sort_by(|a,b| a.0.partial_cmp(&b.0).unwrap());
     }
+}
 
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static> Channel<Tag, Context, Data, Priority> {
     pub fn queue(&self, data: Data) {
         self.queue.borrow_mut().push_back(data);
     }
 
     fn do_fire_all(&self, context: &Context) -> usize {
-        let mut cache = self.order_cache.borrow_mut();
-        let subs = cache.get_or_insert_with(|| self.subs.borrow().clone().order());
-
         let mut count = 0;
 
         while !self.queue.borrow().is_empty() {
@@ -196,8 +144,8 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> PriorityChannel<Tag
                 emptied: false,
             };
 
-            for sub in subs.iter() {
-                sub.invoke(context, &mut event);
+            for sub in self.subs.borrow().iter() {
+                sub.1.invoke(context, &mut event);
                 if event.emptied {
                     break;
                 }
@@ -215,20 +163,8 @@ trait ChannelMetadata<Context: 'static + ?Sized> {
     fn fire_all(&self, context: &Context) -> usize;
 }
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> ChannelMetadata<Context>
-    for EventChannel<Tag, Context, Data>
-{
-    fn queue_len(&self) -> usize {
-        self.queue.borrow().len()
-    }
-
-    fn fire_all(&self, context: &Context) -> usize {
-        self.do_fire_all(context)
-    }
-}
-
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> ChannelMetadata<Context>
-    for PriorityChannel<Tag, Context, Data>
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static> ChannelMetadata<Context>
+    for Channel<Tag, Context, Data, Priority>
 {
     fn queue_len(&self) -> usize {
         self.queue.borrow().len()
@@ -242,11 +178,21 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> ChannelMetadata<Con
 pub trait ChannelSpec {
     type Tag: 'static;
     type Data: 'static;
+    type Priority: 'static;
+}
+
+pub struct NoPriorityTag {}
+
+impl<Tag: 'static, Data: 'static, Priority: 'static + PartialOrd> ChannelSpec for (Tag, Data, Priority) {
+    type Tag = Tag;
+    type Data = Data;
+    type Priority = Priority;
 }
 
 impl<Tag: 'static, Data: 'static> ChannelSpec for (Tag, Data) {
     type Tag = Tag;
     type Data = Data;
+    type Priority = NoPriorityTag;
 }
 
 pub struct EventRoot<Context: 'static + ?Sized> {
@@ -267,24 +213,24 @@ impl<Context: 'static + ?Sized> Default for EventRoot<Context> {
 
 impl<Context: 'static + ?Sized> EventRoot<Context> {
     pub fn create_channel<CS: ChannelSpec>(&mut self) {
-        let tid = std::any::TypeId::of::<EventChannel<CS::Tag, Context, CS::Data>>();
+        let tid = std::any::TypeId::of::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>();
 
-        let ch = Rc::new(EventChannel::<CS::Tag, Context, CS::Data>::default());
+        let ch = Rc::new(Channel::<CS::Tag, Context, CS::Data, CS::Priority>::default());
         self.metadata
             .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
         self.channels.insert(tid, ch);
     }
 
-    pub fn create_priority_channel<CS: ChannelSpec>(&mut self) {
+    /*pub fn create_priority_channel<CS: ChannelSpec>(&mut self) {
         let tid = std::any::TypeId::of::<PriorityChannel<CS::Tag, Context, CS::Data>>();
 
         let ch = Rc::new(PriorityChannel::<CS::Tag, Context, CS::Data>::default());
         self.metadata
             .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
         self.channels.insert(tid, ch);
-    }
+    }*/
 
-    pub fn channel<CS: ChannelSpec>(&self) -> &EventChannel<CS::Tag, Context, CS::Data> {
+    /*pub fn channel<CS: ChannelSpec>(&self) -> &EventChannel<CS::Tag, Context, CS::Data> {
         let tid = std::any::TypeId::of::<EventChannel<CS::Tag, Context, CS::Data>>();
 
         match self.channels.get(&tid) {
@@ -296,26 +242,20 @@ impl<Context: 'static + ?Sized> EventRoot<Context> {
                 )
             }
         }
-    }
+    }*/
 
-    pub fn priority_channel<CS: ChannelSpec>(
+    pub fn channel<CS: ChannelSpec>(
         &self,
-    ) -> &PriorityChannel<CS::Tag, Context, CS::Data> {
-        let tid = std::any::TypeId::of::<PriorityChannel<CS::Tag, Context, CS::Data>>();
+    ) -> &Channel<CS::Tag, Context, CS::Data, CS::Priority> {
+        let tid = std::any::TypeId::of::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>();
 
         match self.channels.get(&tid) {
             Some(ch) => ch.downcast_ref().expect("internal inconsistency"),
             None => {
-                if self.channels.contains_key(&std::any::TypeId::of::<
-                    EventChannel<CS::Tag, Context, CS::Data>,
-                >()) {
-                    panic!("Asked for priority channel {}, which does not exist, but there is a non-priority channel!", std::any::type_name::<CS>())
-                } else {
-                    panic!(
-                        "Asked for priority channel {} that has not been created!",
-                        std::any::type_name::<CS>()
-                    )
-                }
+                panic!(
+                    "Asked for channel {} that has not been created!",
+                    std::any::type_name::<CS>()
+                )
             }
         }
     }
@@ -345,7 +285,6 @@ mod tests {
     };
 
     use super::{Event, EventRoot};
-    use crate::service::order;
 
     struct Receiver {
         int_count: Cell<usize>,
@@ -371,6 +310,14 @@ mod tests {
     struct IntTag;
     type IntChannel = (IntTag, i32);
 
+    #[derive(PartialEq, PartialOrd)]
+    enum IntPriority {
+        First,
+        Second,
+        Third
+    }
+    type IntPriorityChannel = (IntTag, i32, IntPriority);
+
     #[test]
     fn simple_fire() {
         let mut root = EventRoot::default();
@@ -393,7 +340,7 @@ mod tests {
     fn priority_fire() {
         let mut root = EventRoot::default();
 
-        root.create_priority_channel::<IntChannel>();
+        root.create_channel::<IntPriorityChannel>();
 
         let order = Rc::new(RefCell::new(Vec::new()));
 
@@ -405,16 +352,20 @@ mod tests {
             id: 2,
             order: order.clone(),
         });
+        let recv3 = Rc::new(OrderedReceiver {
+            id: 3,
+            order: order.clone(),
+        });
 
-        root.priority_channel::<IntChannel>()
-            .with::<order::First>()
-            .subscribe(&recv1, OrderedReceiver::receive_i32);
-        root.priority_channel::<IntChannel>()
-            .with::<order::Last>()
-            .subscribe(&recv2, OrderedReceiver::receive_i32);
-        root.priority_channel::<IntChannel>().queue(0i32);
+        root.channel::<IntPriorityChannel>()
+            .subscribe(IntPriority::Second, &recv2, OrderedReceiver::receive_i32);
+        root.channel::<IntPriorityChannel>()
+            .subscribe(IntPriority::First, &recv1, OrderedReceiver::receive_i32);
+        root.channel::<IntPriorityChannel>()
+            .subscribe(IntPriority::Third, &recv3, OrderedReceiver::receive_i32);
+        root.channel::<IntPriorityChannel>().queue(0i32);
         assert_eq!(order.borrow().deref(), &vec![]);
-        root.priority_channel::<IntChannel>().do_fire_all(&());
-        assert_eq!(order.borrow().deref(), &vec![1, 2]);
+        root.channel::<IntPriorityChannel>().do_fire_all(&());
+        assert_eq!(order.borrow().deref(), &vec![1, 2, 3]);
     }
 }

+ 0 - 105
fleck/src/service/lowlevel.rs

@@ -1,105 +0,0 @@
-use serde::{Deserialize, Serialize};
-use std::rc::Rc;
-
-use super::core::MajorTickChannel;
-use super::event::Event;
-use super::{order, BetweenPriority, Service};
-use crate::io::Packet;
-
-use crate as fleck;
-use crate::msg::MessageParams;
-
-pub struct SendPacketTag {}
-pub type SendPacketChannel = (SendPacketTag, crate::io::Packet);
-
-#[derive(Default)]
-pub(crate) struct SendPacket {}
-
-impl Service for Rc<SendPacket> {
-    fn register_channels(&self, eroot: &mut super::EventRoot) {
-        eroot.create_priority_channel::<SendPacketChannel>();
-        eroot
-            .priority_channel::<SendPacketChannel>()
-            .with::<order::Last>()
-            .subscribe(self, SendPacket::process_outgoing);
-    }
-}
-
-impl SendPacket {
-    fn process_outgoing(&self, api: &crate::Fleck, packet: &mut Event<crate::io::Packet>) {
-        // use default channel if not specified
-        if packet.data.io_channel.is_none() {
-            packet.data.io_channel = Some(api.raw_io().global().clone());
-        }
-
-        // serialize if needed
-        if let (false, Some(msg)) = (packet.data.data.as_ref().is_some(), packet.data.msg.take()) {
-            packet.data.data = Some(bincode::serialize(&msg).expect("failed to serialize message"));
-        }
-
-        match &mut packet.data.io_channel {
-            Some(ch) => ch.clone().send_packet(&mut packet.data),
-            _ => {
-                println!("tried to send packet with no channel?");
-            }
-        }
-    }
-}
-
-#[derive(Default)]
-pub(crate) struct LocalDiscovery;
-
-#[derive(Serialize, Deserialize, Debug)]
-struct DiscoveryMsg {
-    pkey: crate::crypto::PublicKey,
-}
-
-impl MessageParams for DiscoveryMsg {
-    const NAME: &'static str = "DiscoveryMsg";
-    const ENCRYPTED: bool = false;
-}
-
-impl Service for Rc<LocalDiscovery> {
-    fn register_msgs(&self, registry: &mut fleck::msg::MessageRegistry) {
-        registry.add_message_type::<DiscoveryMsg>();
-    }
-
-    fn register_channels(self: &Self, eroot: &mut super::EventRoot) {
-        eroot
-            .channel::<MajorTickChannel>()
-            .subscribe(self, LocalDiscovery::process_major_tick);
-        eroot
-            .priority_channel::<super::core::ReceivePacketChannel>()
-            .with::<order::Processing>()
-            .subscribe(self, LocalDiscovery::packet);
-    }
-}
-
-impl LocalDiscovery {
-    fn packet(&self, api: &crate::Fleck, pkt: &mut Event<Packet>) {
-        if let Some(discovery) = pkt.data.as_msg::<DiscoveryMsg>() {
-            log::trace!("received discovery message");
-
-            // this will automatically ignore self-messages because we already know the pubkey
-            api.with_service(|nodes: &super::core::Nodes| {
-                if nodes.node_by_pubkey(&discovery.pkey).is_none() {
-                    let node = super::core::Node::build_with_addr(pkt.data.addr.unwrap());
-                    node.set_pubkey(discovery.pkey);
-                    nodes.inform_of(node);
-                }
-            });
-        }
-    }
-
-    fn process_major_tick(&self, api: &crate::Fleck, _: &mut Event<()>) {
-        api.queue_priority::<SendPacketChannel>(Packet {
-            addr: Some((*crate::io::MULTICAST_ADDRESS).into()),
-            data: None,
-            io_channel: Some(api.raw_io().local().clone()),
-            node: None,
-            msg: Some(crate::msg::Message::build(DiscoveryMsg {
-                pkey: api.with_service(|nodes: &super::core::Nodes| nodes.self_node().pubkey().unwrap().to_owned())
-            })),
-        });
-    }
-}

+ 0 - 201
fleck/src/service/priority.rs

@@ -1,201 +0,0 @@
-use std::{
-    any::TypeId,
-    collections::{HashMap, HashSet},
-};
-
-pub trait AbstractServicePriority: 'static + Default {
-    /// What this priority comes strictly after
-    type After: AbstractServicePriority;
-    /// What this priority comes strictly before
-    type Before: AbstractServicePriority;
-
-    /// Is this a fully-abstract node that should be merged during processing?
-    const SKIP: bool = false;
-
-    /// Is this a Never sentinel type?
-    const NEVER: bool = false;
-}
-
-#[derive(Default)]
-pub struct ServicePriority<After: AbstractServicePriority, Before: AbstractServicePriority> {
-    _ghost: std::marker::PhantomData<(After, Before)>,
-}
-
-impl<After: AbstractServicePriority, Before: AbstractServicePriority> AbstractServicePriority
-    for ServicePriority<After, Before>
-{
-    type After = After;
-    type Before = Before;
-
-    const SKIP: bool = false;
-}
-
-#[derive(Default)]
-pub struct Never;
-
-pub struct MaxTag;
-pub struct MinTag;
-
-#[allow(unused)]
-pub type MinPriority = Fixpoint<MinTag>;
-#[allow(unused)]
-pub type MaxPriority = Fixpoint<MaxTag>;
-
-pub struct Fixpoint<Tag: 'static> {
-    _ghost: std::marker::PhantomData<Tag>,
-}
-
-impl AbstractServicePriority for Never {
-    type After = Self;
-    type Before = Self;
-
-    const NEVER: bool = true;
-}
-
-impl<Tag: 'static> Default for Fixpoint<Tag> {
-    fn default() -> Self {
-        Self {
-            _ghost: std::marker::PhantomData,
-        }
-    }
-}
-
-impl<Tag: 'static> AbstractServicePriority for Fixpoint<Tag> {
-    type After = Self;
-    type Before = Self;
-
-    const SKIP: bool = true;
-}
-
-#[derive(Clone)]
-pub(crate) struct TotalOrder<Assoc: Clone> {
-    // directional adjacency-map of happens-after relationships
-    nodes: HashMap<TypeId, HashSet<TypeId>>,
-    associated: HashMap<TypeId, Vec<Option<Assoc>>>,
-    skip: HashSet<TypeId>,
-}
-
-impl<Assoc: Clone> Default for TotalOrder<Assoc> {
-    fn default() -> Self {
-        Self {
-            nodes: Default::default(),
-            associated: Default::default(),
-            skip: Default::default(),
-        }
-    }
-}
-
-type VisitedSet = HashSet<TypeId>;
-impl<Assoc: Clone> TotalOrder<Assoc> {
-    fn walk_priorities<SP: AbstractServicePriority>(
-        &mut self,
-        visited: &mut VisitedSet,
-        assoc: &Option<Assoc>,
-    ) {
-        let tid = TypeId::of::<SP>();
-        if visited.contains(&tid) {
-            return;
-        }
-        visited.insert(tid);
-        if SP::SKIP {
-            self.skip.insert(tid);
-        }
-
-        // add happens-after link
-        let after_id = TypeId::of::<SP::After>();
-        let before_id = TypeId::of::<SP::Before>();
-        self.nodes
-            .entry(tid)
-            .or_insert_with(Default::default)
-            .insert(after_id);
-        self.nodes
-            .entry(before_id)
-            .or_insert_with(Default::default)
-            .insert(tid);
-
-        // if we're not at an internal node, continue
-        if self
-            .associated
-            .entry(tid)
-            .or_insert(Vec::new())
-            .iter()
-            .all(Option::is_some)
-        {
-            self.walk_priorities::<SP::Before>(visited, assoc);
-            self.walk_priorities::<SP::After>(visited, assoc);
-        }
-    }
-
-    fn add_priority_internal<SP: AbstractServicePriority>(&mut self, assoc: Option<Assoc>) {
-        let tid = TypeId::of::<SP>();
-        self.associated
-            .entry(tid)
-            .or_insert_with(Vec::new)
-            .push(assoc.clone());
-
-        let mut visited = VisitedSet::default();
-        self.walk_priorities::<SP>(&mut visited, &assoc);
-    }
-
-    pub(crate) fn add_priority<SP: AbstractServicePriority>(&mut self, assoc: Assoc) {
-        self.add_priority_internal::<SP>(Some(assoc));
-    }
-}
-
-impl<Assoc: Clone> TotalOrder<Assoc> {
-    pub(crate) fn order(mut self) -> Vec<Assoc> {
-        let mut tsort = topological_sort::TopologicalSort::<TypeId>::new();
-
-        for node in self.nodes {
-            tsort.insert(node.0);
-            for adj in node.1 {
-                tsort.insert(adj);
-
-                if adj != node.0 || !self.skip.contains(&adj) {
-                    tsort.add_dependency(adj, node.0);
-                }
-            }
-        }
-
-        let mut tid_order = vec![];
-        while tsort.pop().map(|tid| tid_order.push(tid)).is_some() {}
-        // we expect there to be two items left in the tsort: MaxPriority and Never
-        if !tsort.is_empty() {
-            panic!("Circular dependency detected! tsort: {:?}", tsort);
-        }
-
-        tid_order
-            .iter()
-            .flat_map(|tid| self.associated.remove(tid))
-            .flatten()
-            .map(|a| a.unwrap())
-            .collect()
-    }
-}
-
-#[cfg(test)]
-mod order_tests {
-    use super::{AbstractServicePriority, ServicePriority, TotalOrder};
-    use super::{MaxPriority, MinPriority};
-
-    #[test]
-    fn single_priority_test() {
-        let mut order = TotalOrder::default();
-        order.add_priority::<ServicePriority<MinPriority, MaxPriority>>("simple priority");
-        // println!("nodes: {:?}", order.nodes);
-        let result = order.order();
-        // println!("ordering: {:?}", result);
-        assert_eq!(result, vec!["simple priority"]);
-    }
-
-    #[test]
-    fn two_priority_test() {
-        let mut order = TotalOrder::default();
-        type P1 = ServicePriority<MinPriority, MaxPriority>;
-        type P2 = ServicePriority<P1, MaxPriority>;
-        order.add_priority::<P1>("P1");
-        order.add_priority::<P2>("P2");
-        let result = order.order();
-        assert_eq!(result, vec!["P1", "P2"]);
-    }
-}