Bläddra i källkod

Minor cleanups.

Kestrel 2 år sedan
förälder
incheckning
356db60b48

+ 2 - 1
fleck/examples/simple_node.rs

@@ -10,7 +10,7 @@ fn main() {
 
     // generate ephemeral keypair
     api.with_service(|ns: &fleck_core::NodeService| {
-        ns.self_node().gen_keypair();
+        ns.build_ephemeral_self_node();
     });
 
     // create UDP socket for communication with a local multicast group
@@ -31,5 +31,6 @@ fn main() {
         }
     });
 
+    // run and perform all automatic services, but don't do anything interesting...
     api.run();
 }

+ 4 - 3
fleck/src/fleck_core.rs

@@ -5,9 +5,9 @@ pub mod msg;
 pub mod node;
 pub mod peer;
 
-pub use msg::{Message,MessageService,MessageParams,MessageChannel};
-pub use node::{Node,NodeService,NodeRegistrationChannel};
-pub use peer::{Peer,PeerService};
+pub use msg::{Message, MessageChannel, MessageParams, MessageService};
+pub use node::{Node, NodeRegistrationChannel, NodeService};
+pub use peer::{Peer, PeerService};
 
 pub mod channel_tags {
     pub struct SendPacketTag {}
@@ -64,6 +64,7 @@ impl crate::service::DefaultService for CoreInitService {
 
         api.add_service::<msg::MessageService>();
         api.add_service::<node::NodeService>();
+        api.add_service::<discovery::DiscoveryService>();
         api.add_service::<discovery::LocalDiscovery>();
         api.add_service::<peer::PeerService>();
         api.add_service::<crypto::SignPacket>();

+ 76 - 64
fleck/src/fleck_core/crypto.rs

@@ -20,13 +20,13 @@ pub enum PacketHeader {
     #[default]
     NoCrypto,
     Signed(Signature),
-    // u64 here is the nonce
-    Encrypted(u64),
+    // u128 here is the nonce
+    Encrypted(u128),
 }
 
 pub struct SymmetricKey {
     key: [u8; 16],
-    nonce: [u8; 16],
+    nonce: u128,
 }
 
 pub(crate) struct VerifyPacketSignature {
@@ -38,39 +38,34 @@ impl Service for VerifyPacketSignature {
         Self { api }
     }
     fn setup(self: &std::rc::Rc<Self>) {
-        self.api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(
-            fleck_core::ReceiveOrder::Verify,
-            self,
-            Self::check,
-        );
+        self.api
+            .channel::<fleck_core::ReceivePacketChannel>()
+            .sub_opt(fleck_core::ReceiveOrder::Verify, self, Self::check);
     }
 }
 
 impl VerifyPacketSignature {
-    fn check(
-        &self,
-        mut message: fleck_core::msg::Message,
-    ) -> Option<fleck_core::msg::Message> {
-        match message.crypto_header {
-            PacketHeader::Signed(signature) => {
-                // check that we have a pubkey for the node, reject if not
-                let pubkey = message.node.as_ref().and_then(|n| Some(n.pubkey()));
-                if pubkey.is_none() {
-                    return None
-                }
-                let pubkey = pubkey.unwrap();
-
-                message.crypto_header = PacketHeader::NoCrypto;
-                let to_verify = bincode::serialize(&message)
-                    .expect("assume that we can re-serialize the message");
-
-                use ed25519_dalek::Verifier;
-                pubkey.verify(&to_verify, &signature).ok()?;
-            }
-            _ => {}
+    fn check(&self, mut message: fleck_core::msg::Message) -> Option<fleck_core::msg::Message> {
+        // we don't care about the message if it isn't supposed to be signed
+        if !message.saved_params.unwrap().signed {
+            return Some(message);
         }
 
-        Some(message)
+        if let PacketHeader::Signed(signature) = message.crypto_header {
+            let pubkey = message.node.as_ref()?.pubkey();
+            // reset the crypto header for verification
+            message.crypto_header = PacketHeader::NoCrypto;
+            let to_verify =
+                bincode::serialize(&message).expect("assume that we can re-serialize the message");
+
+            use ed25519_dalek::Verifier;
+            pubkey.verify(&to_verify, &signature).ok()?;
+            // verification passed!
+            Some(message)
+        } else {
+            // it was supposed to have a signature, but it didn't...
+            None
+        }
     }
 }
 
@@ -132,18 +127,18 @@ pub enum KeyExchangeState {
 impl std::fmt::Debug for KeyExchangeState {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::NoState => {
-                f.write_str("KeyExchangeState::NoState")
-            },
-            Self::Given(pkey) => {
-                f.write_fmt(format_args!("KeyExchangeState::Given {{ pkey: {:?} }}", pkey))
-            },
+            Self::NoState => f.write_str("KeyExchangeState::NoState"),
+            Self::Given(pkey) => f.write_fmt(format_args!(
+                "KeyExchangeState::Given {{ pkey: {:?} }}",
+                pkey
+            )),
             Self::WaitForResponse(_) => {
                 f.write_str("KeyExchangeState::WaitForResponse { redacted }")
-            },
-            Self::Completed(pkey, _) => {
-                f.write_fmt(format_args!("KeyExchangeState::Completed {{ pkey: {:?}, redacted }}", pkey))
-            },
+            }
+            Self::Completed(pkey, _) => f.write_fmt(format_args!(
+                "KeyExchangeState::Completed {{ pkey: {:?}, redacted }}",
+                pkey
+            )),
         }
     }
 }
@@ -155,15 +150,22 @@ pub struct EncryptionService {
 
 impl Service for EncryptionService {
     fn new(api: std::rc::Rc<crate::API>) -> Self {
-        Self { api, state: Default::default() }
+        Self {
+            api,
+            state: Default::default(),
+        }
     }
 
     fn setup(self: &std::rc::Rc<Self>) {
         self.api.with_service(|msg: &fleck_core::MessageService| {
             msg.add_message_type::<KeyExchange>();
         });
-        self.api.channel::<fleck_core::node::NodeRegistrationChannel>().sub_ref(self, Self::new_node);
-        self.api.channel::<fleck_core::msg::MessageChannel<KeyExchange>>().sub_eat(self, Self::incoming);
+        self.api
+            .channel::<fleck_core::node::NodeRegistrationChannel>()
+            .sub_ref(self, Self::new_node);
+        self.api
+            .channel::<fleck_core::msg::MessageChannel<KeyExchange>>()
+            .sub_eat(self, Self::incoming);
     }
 }
 
@@ -183,11 +185,12 @@ impl EncryptionService {
         // send notification with new secret
         let send = || {
             log::trace!("sending KeyExchange message");
-            self.api.queue::<fleck_core::SendPacketChannel>(fleck_core::msg::Message::build(
-                KeyExchange {
-                    key: x25519_dalek::PublicKey::from(&secret)
-                }
-            ).with_peer(with.clone()));
+            self.api.queue::<fleck_core::SendPacketChannel>(
+                fleck_core::msg::Message::build(KeyExchange {
+                    key: x25519_dalek::PublicKey::from(&secret),
+                })
+                .with_peer(with.clone()),
+            );
         };
 
         let mut states = self.state.borrow_mut();
@@ -196,43 +199,50 @@ impl EncryptionService {
             KeyExchangeState::NoState => {
                 send();
                 states.insert(with.clone(), KeyExchangeState::WaitForResponse(secret));
-            },
+            }
             KeyExchangeState::Given(pubkey) => {
                 send();
                 let result = secret.diffie_hellman(&pubkey);
 
                 let sk = SymmetricKey {
                     key: result.as_bytes()[0..16].try_into().unwrap(),
-                    nonce: result.as_bytes()[16..32].try_into().unwrap(),
+                    nonce: u128::from_be_bytes(result.as_bytes()[16..32].try_into().unwrap()),
                 };
 
                 log::trace!("Completed key negotiation with peer!");
                 states.insert(with.clone(), KeyExchangeState::Completed(pubkey, sk));
-            },
+            }
             KeyExchangeState::WaitForResponse(secret) => {
                 send();
                 states.insert(with.clone(), KeyExchangeState::WaitForResponse(secret));
-            },
+            }
             KeyExchangeState::Completed(_pubkey, _symmetric_key) => {
-                log::error!("asked to begin handshake when already have a good encryption key?");
-                return
-            },
+                log::warn!("asked to begin handshake when already have a good encryption key --- did the peer die?");
+                return;
+            }
         }
     }
 
     fn incoming(&self, msg: (fleck_core::msg::Metadata, KeyExchange)) {
         let mut states = self.state.borrow_mut();
-        let peer = msg.0.peer.expect("incoming KeyExchange should have a peer?");
+        let peer = msg
+            .0
+            .peer
+            .expect("incoming KeyExchange should have a peer?");
 
         let state = states.remove(&peer).unwrap_or(KeyExchangeState::NoState);
-        log::trace!("incoming key exchange message: {:?}, state: {:?}", msg.1, &state);
+        log::trace!(
+            "incoming key exchange message: {:?}, state: {:?}",
+            msg.1,
+            &state
+        );
         match state {
             KeyExchangeState::NoState => {
                 states.insert(peer.clone(), KeyExchangeState::Given(msg.1.key));
 
                 drop(states);
                 self.begin_handshake(&peer);
-            },
+            }
             KeyExchangeState::Given(peer_pubkey) => {
                 if peer_pubkey != msg.1.key {
                     log::info!("peer DH key changed!");
@@ -241,19 +251,19 @@ impl EncryptionService {
 
                 drop(states);
                 self.begin_handshake(&peer);
-            },
+            }
             KeyExchangeState::WaitForResponse(secret) => {
                 // key handshake is finished!
                 let result = secret.diffie_hellman(&msg.1.key);
 
                 let sk = SymmetricKey {
                     key: result.as_bytes()[0..16].try_into().unwrap(),
-                    nonce: result.as_bytes()[16..32].try_into().unwrap(),
+                    nonce: u128::from_be_bytes(result.as_bytes()[16..32].try_into().unwrap()),
                 };
 
                 states.insert(peer.clone(), KeyExchangeState::Completed(msg.1.key, sk));
                 drop(states);
-            },
+            }
             KeyExchangeState::Completed(peer_pubkey, _symmetric_key) => {
                 if peer_pubkey == msg.1.key {
                     // it's a repeat, so nothing to do
@@ -263,9 +273,11 @@ impl EncryptionService {
                 states.insert(peer.clone(), KeyExchangeState::Given(msg.1.key));
                 drop(states);
                 self.begin_handshake(&peer);
-            },
+            }
         }
-        log::trace!("after processing incoming key exchange message, state: {:?}", self.state.borrow().get(&peer));
-
+        log::trace!(
+            "after processing incoming key exchange message, state: {:?}",
+            self.state.borrow().get(&peer)
+        );
     }
 }

+ 25 - 15
fleck/src/fleck_core/discovery.rs

@@ -1,12 +1,12 @@
 use crate::prelude::*;
 
 use serde::{Deserialize, Serialize};
-use std::{cell::RefCell, rc::Rc, collections::HashSet};
+use std::{cell::RefCell, collections::HashSet, rc::Rc};
 
 #[derive(Serialize, Deserialize, Debug)]
 struct DiscoveryMsg {
     pkey: fleck_core::crypto::PublicKey,
-    ttl: u8
+    ttl: u8,
 }
 
 impl fleck_core::msg::MessageParams for DiscoveryMsg {
@@ -26,7 +26,12 @@ impl Service for DiscoveryService {
     }
 
     fn setup(self: &Rc<Self>) {
-        self.api.channel::<fleck_core::msg::MessageChannel<DiscoveryMsg>>()
+        self.api.with_service(|msg: &fleck_core::MessageService| {
+            msg.add_message_type::<DiscoveryMsg>();
+        });
+
+        self.api
+            .channel::<fleck_core::msg::MessageChannel<DiscoveryMsg>>()
             .sub_eat(self, Self::discovery);
     }
 }
@@ -34,12 +39,11 @@ impl Service for DiscoveryService {
 impl DiscoveryService {
     fn discovery(&self, msg: (fleck_core::msg::Metadata, DiscoveryMsg)) {
         let discovery = msg.1;
-        log::trace!("received discovery message");
 
         // this will automatically ignore self-messages because we already know our own pubkey
         self.api.with_service(|ns: &fleck_core::NodeService| {
             if ns.node_by_pubkey(&discovery.pkey).is_none() {
-                // XXX ns.inform_of_peer(discovery.pkey, msg.0.peer.unwrap());
+                ns.inform_of(discovery.pkey, msg.0.peer.unwrap());
             }
         });
     }
@@ -59,16 +63,15 @@ impl Service for LocalDiscovery {
         }
     }
     fn setup(self: &Rc<Self>) {
-        self.api.with_service(|m: &fleck_core::MessageService| {
-            m.add_message_type::<DiscoveryMsg>();
-        });
-        self.api.channel::<fleck_core::MajorTickChannel>()
+        self.api
+            .channel::<fleck_core::MajorTickChannel>()
             .sub_ref(self, LocalDiscovery::process_major_tick);
     }
 }
 
 impl LocalDiscovery {
     pub fn add_multicast_peer(&self, peer: fleck_core::peer::Peer) {
+        // TODO: verify that the peer is in fact a multicast address...
         self.targets.borrow_mut().push(peer);
     }
 
@@ -103,7 +106,9 @@ impl Service for PeerDiscovery {
     }
 
     fn setup(self: &Rc<Self>) {
-        self.api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(fleck_core::ReceiveOrder::Annotate, self, Self::discover);
+        self.api
+            .channel::<fleck_core::ReceivePacketChannel>()
+            .sub_opt(fleck_core::ReceiveOrder::Annotate, self, Self::discover);
     }
 }
 
@@ -112,7 +117,7 @@ impl PeerDiscovery {
         let peer = match &msg.peer {
             Some(peer) => peer,
             // don't touch if it doesn't have a peer
-            None => { return Some(msg) }
+            None => return Some(msg),
         };
         // check to see if we know this peer yet
         if self.known.borrow().contains(&peer) {
@@ -126,9 +131,14 @@ impl PeerDiscovery {
         let mut known = self.known.borrow_mut();
         known.insert(peer.clone());
 
-        self.api.queue::<fleck_core::SendPacketChannel>(DiscoveryMsg {
-            pkey: self.api.with_service(|ns: &fleck_core::NodeService| ns.self_node().pubkey().clone()),
-            ttl: 2,
-        }.into());
+        self.api.queue::<fleck_core::SendPacketChannel>(
+            DiscoveryMsg {
+                pkey: self
+                    .api
+                    .with_service(|ns: &fleck_core::NodeService| ns.self_node().pubkey().clone()),
+                ttl: 2,
+            }
+            .into(),
+        );
     }
 }

+ 4 - 1
fleck/src/fleck_core/io.rs

@@ -195,7 +195,10 @@ impl UdpSocket {
             peer.clone()
         } else {
             let peer = super::peer::Peer {
-                data: Rc::new(super::peer::PeerData::new(rc.clone(), super::peer::PeerAddress::Udp(*addr)))
+                data: Rc::new(super::peer::PeerData::new(
+                    rc.clone(),
+                    super::peer::PeerAddress::Udp(*addr),
+                )),
             };
             peers.insert(*addr, peer.clone());
             peer

+ 10 - 8
fleck/src/fleck_core/msg.rs

@@ -128,20 +128,21 @@ pub struct MessageService {
 
 impl Service for MessageService {
     fn new(api: Rc<crate::API>) -> Self {
-        Self { api, deser: Default::default() }
+        Self {
+            api,
+            deser: Default::default(),
+        }
     }
     fn setup(self: &Rc<Self>) {
-        self.api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(
-            fleck_core::ReceiveOrder::Parse,
-            self,
-            Self::parse,
-        );
+        self.api
+            .channel::<fleck_core::ReceivePacketChannel>()
+            .sub_opt(fleck_core::ReceiveOrder::Parse, self, Self::parse);
     }
 }
 
 impl MessageService {
     pub fn add_message_type<MT: std::fmt::Debug + MessageParams + Serialize + DeserializeOwned>(
-        &self
+        &self,
     ) {
         let derived_typeid = message_type::<MT>();
         let api = self.api.clone();
@@ -150,10 +151,11 @@ impl MessageService {
             Box::new(
                 move |message| match bincode::deserialize::<MT>(&message.content.data) {
                     Ok(content) => {
+                        log::trace!("packet deserialized: {:?}", content);
                         api.queue::<MessageChannel<MT>>((message.metadata(), content));
                     }
                     Err(_) => {
-                        log::trace!("Packet failed deserialization step?");
+                        log::info!("Packet failed deserialization step!");
                     }
                 },
             ),

+ 66 - 67
fleck/src/fleck_core/node.rs

@@ -2,8 +2,8 @@ use rand::prelude::*;
 
 use std::{
     cell::{Ref, RefCell},
-    collections::{HashMap, HashSet},
-    rc::{Weak, Rc},
+    collections::HashMap,
+    rc::{Rc, Weak},
 };
 
 use crate::prelude::*;
@@ -37,7 +37,7 @@ impl<'l, T> std::ops::Deref for InnerRef<'l, T> {
     }
 }
 
-#[derive(Debug,Default)]
+#[derive(Debug, Default)]
 pub struct Node {
     pubkey: PublicKey,
     peer: Option<fleck_core::peer::Peer>,
@@ -45,13 +45,33 @@ pub struct Node {
 }
 
 impl Node {
-    pub fn build_with_peer(peer: fleck_core::peer::Peer) -> Self {
+    pub fn build_with_peer(pubkey: PublicKey, peer: fleck_core::peer::Peer) -> Self {
         Self {
+            pubkey,
             peer: Some(peer),
             ..Default::default()
         }
     }
 
+    pub fn build_ephemeral() -> Self {
+        let mut csprng = StdRng::from_entropy();
+        let kp = Keypair::generate(&mut csprng);
+
+        Self {
+            pubkey: kp.public,
+            peer: None,
+            keypair: RefCell::new(Some(kp)),
+        }
+    }
+
+    pub fn build_with_keypair(kp: Keypair) -> Self {
+        Self {
+            pubkey: kp.public,
+            peer: None,
+            keypair: RefCell::new(Some(kp)),
+        }
+    }
+
     pub fn peer(&self) -> Option<fleck_core::peer::Peer> {
         self.peer.clone()
     }
@@ -62,37 +82,6 @@ impl Node {
     pub(crate) fn keypair(&self) -> Option<InnerRef<'_, Keypair>> {
         InnerRef::build(self.keypair.borrow())
     }
-
-    /*pub fn set_pubkey(&self, pk: PublicKey) {
-        let mut pkey = self.pubkey.borrow_mut();
-        if pkey.is_some() {
-            panic!("Tried to set public key on node that already has a public key!")
-        }
-        *pkey = Some(pk);
-    }
-
-    pub fn set_keypair(&self, kp: Keypair) {
-        let mut keyp = self.keypair.borrow_mut();
-        if keyp.is_some() {
-            panic!("Tried to set keypair on node that already has a keypair!")
-        }
-        *keyp = Some(kp);
-    }
-
-    pub fn gen_keypair(&self) {
-        let mut pkey = self.pubkey.borrow_mut();
-        let mut keyp = self.keypair.borrow_mut();
-        if pkey.is_some() {
-            panic!("Tried to generate keypair for node that already has a pubkey!")
-        }
-        if keyp.is_some() {
-            panic!("Tried to generate keypair for node that already has a keypair!")
-        }
-        let mut csprng = StdRng::from_entropy();
-        let kp = Keypair::generate(&mut csprng);
-        *pkey = Some(kp.public);
-        *keyp = Some(kp);
-    }*/
 }
 
 pub struct RegTag {}
@@ -108,11 +97,19 @@ pub struct NodeService {
 
 impl NodeService {
     pub fn self_node(&self) -> Rc<Node> {
-        self.self_node.borrow().upgrade().expect("asking for self node before it's been initialized!")
+        self.self_node
+            .borrow()
+            .upgrade()
+            .expect("asking for self node before it's been initialized!")
     }
 
-    pub fn use_ephemeral(&self) {
-        
+    pub fn build_ephemeral_self_node(&self) {
+        assert_eq!(self.self_node.borrow().strong_count(), 0);
+
+        let nself = Rc::new(Node::build_ephemeral());
+
+        self.all_nodes.borrow_mut().push(nself.clone());
+        *self.self_node.borrow_mut() = nself.as_weak();
     }
 
     pub fn node_by_pubkey(&self, pubkey: &PublicKey) -> Option<Rc<Node>> {
@@ -133,35 +130,36 @@ impl NodeService {
             .collect()
     }
 
-    // inform of new nodes 
-    /*
-    pub fn inform_of_peer(&self, peer: fleck_core::peer::Peer) -> Rc<Node> {
-        let mut map = self.node_by_peer.borrow_mut();
-        if !map.contains_key(&peer) {
-            let node = Rc::new(Node::build_with_peer(peer.clone()));
-            map.insert(peer, node.clone());
-            self.all_nodes.borrow_mut().push(node.clone());
-
-            self.api.queue::<NodeRegistrationChannel>(node);
+    pub fn inform_of(&self, pubkey: PublicKey, peer: fleck_core::Peer) {
+        // do we know of a node with these properties already?
+        let pref = &peer;
+        log::trace!(
+            "checking to see if we already know of peer: ({:?}, {:?})",
+            pubkey,
+            peer
+        );
+        for node in self.all_nodes.borrow().iter() {
+            log::trace!("\toption: {:?}, {:?}", node.pubkey, node.peer);
         }
-        
-        map.get(&peer).unwrap().clone()
-    }
-    */
-
-    /*pub fn inform_of_peer_pubkey(&self, peer: fleck_core::peer::Peer, pkey: fleck_core::crypto::PublicKey) -> Rc<Node> {
-        log::trace!("informed of peer {:?} / {:?}", pkey, peer);
-        let ret = self.inform_of_peer(peer);
-        match ret.pubkey() {
-            Some(existing_key) => {
-                if *existing_key != pkey {
-                    log::warn!("Public key of node has changed!");
-                }
-            },
-            None => ret.set_pubkey(pkey),
+        if self
+            .all_nodes
+            .borrow()
+            .iter()
+            .filter(|n| n.peer.as_ref() == Some(pref) && n.pubkey == pubkey)
+            .count()
+            != 0
+        {
+            return;
         }
-        ret
-    }*/
+
+        // nope, it's a new node
+        log::trace!("Informed of new peer!");
+        let node = Rc::new(Node::build_with_peer(pubkey, peer.clone()));
+
+        self.all_nodes.borrow_mut().push(node.clone());
+        self.node_by_peer.borrow_mut().insert(peer, node.clone());
+        self.api.channel::<NodeRegistrationChannel>().queue(node);
+    }
 }
 
 impl Service for NodeService {
@@ -180,7 +178,8 @@ impl Service for NodeService {
     fn setup(self: &Rc<Self>) {
         self.api.create_channel::<NodeRegistrationChannel>();
 
-        self.api.channel::<fleck_core::ReceivePacketChannel>()
+        self.api
+            .channel::<fleck_core::ReceivePacketChannel>()
             .sub_ref(fleck_core::ReceiveOrder::Annotate, self, Self::annotate);
     }
 }
@@ -197,7 +196,7 @@ impl NodeService {
                 None => {
                     // the interesting case, it's a new node!
                     // TODO: send them a request for their public key
-                },
+                }
             }
         }
     }

+ 8 - 8
fleck/src/fleck_core/peer.rs

@@ -1,6 +1,6 @@
 //! Peer: someone we can communicate with directly
 
-use std::{rc::Rc, cell::Cell};
+use std::{cell::Cell, rc::Rc};
 
 use crate::prelude::*;
 
@@ -60,7 +60,7 @@ impl PartialEq for Peer {
 }
 
 impl Eq for Peer {
-    fn assert_receiver_is_total_eq(&self) { }
+    fn assert_receiver_is_total_eq(&self) {}
 }
 
 #[derive(Default)]
@@ -68,16 +68,16 @@ pub struct PeerService {}
 
 impl DefaultService for PeerService {
     fn setup(self: &Rc<Self>, api: Rc<crate::API>) {
-        api.channel::<fleck_core::SendPacketChannel>()
-            .sub_opt(fleck_core::SendOrder::Dispatch, self, Self::dispatch);
+        api.channel::<fleck_core::SendPacketChannel>().sub_opt(
+            fleck_core::SendOrder::Dispatch,
+            self,
+            Self::dispatch,
+        );
     }
 }
 
 impl PeerService {
-    fn dispatch(
-        &self,
-        msg: super::msg::Message,
-    ) -> Option<super::msg::Message> {
+    fn dispatch(&self, msg: super::msg::Message) -> Option<super::msg::Message> {
         if msg.peer.is_none() {
             log::error!("Message got to dispatch with no peer selected?");
             return None;

+ 4 - 8
fleck/src/lib.rs

@@ -7,7 +7,7 @@ pub mod service;
 pub mod prelude {
     pub use crate::fleck_core;
     pub use crate::helper::{AsAny, IntoWeak};
-    pub use crate::service::{ChannelSpec, Service, DefaultService};
+    pub use crate::service::{ChannelSpec, DefaultService, Service};
 }
 
 use prelude::*;
@@ -45,13 +45,11 @@ impl API {
 
 impl API {
     /// Add a service to the Fleck instance.
-    pub fn add_service<S: Service + 'static>(&self)
-    {
+    pub fn add_service<S: Service + 'static>(&self) {
         self.services.borrow().give_service::<S>();
     }
     /// Access a pre-existing service
-    pub fn with_service<S: Service + 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R
-    {
+    pub fn with_service<S: Service + 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R {
         self.services.borrow().with_service(f)
     }
     /// Create an event channel from a specification
@@ -84,9 +82,7 @@ pub struct Fleck {
 
 impl Fleck {
     pub fn new() -> Self {
-        Self {
-            api: API::new()
-        }
+        Self { api: API::new() }
     }
 
     pub fn api(&self) -> Rc<API> {

+ 8 - 11
fleck/src/service.rs

@@ -2,7 +2,7 @@ use std::any::TypeId;
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::ops::Deref;
-use std::rc::{Weak, Rc};
+use std::rc::{Rc, Weak};
 
 pub(crate) mod event;
 
@@ -15,7 +15,9 @@ pub trait Service: std::any::Any + crate::helper::AsAny {
 
     /// Please don't implement this.
     #[doc(hidden)]
-    fn _setup_wrapper(self: &Rc<Self>, _api: Rc<crate::API>) { self.setup(); }
+    fn _setup_wrapper(self: &Rc<Self>, _api: Rc<crate::API>) {
+        self.setup();
+    }
 }
 
 #[allow(unused_variables)]
@@ -47,23 +49,18 @@ impl ServiceStack {
     pub(crate) fn give_api(&mut self, api: Weak<crate::API>) {
         self.api = api;
     }
-    pub(crate) fn give_service<S: 'static + Service>(&self)
-    {
+    pub(crate) fn give_service<S: 'static + Service>(&self) {
         let srv = S::new(self.api.upgrade().expect("no API given"));
         self.add_service(srv);
     }
 
-    pub(crate) fn add_service<S: 'static + Service>(&self, srv: S)
-    {
+    pub(crate) fn add_service<S: 'static + Service>(&self, srv: S) {
         let srv = Rc::new(srv);
         srv._setup_wrapper(self.api.upgrade().expect("no API given"));
-        self.service_map
-            .borrow_mut()
-            .insert(TypeId::of::<S>(), srv);
+        self.service_map.borrow_mut().insert(TypeId::of::<S>(), srv);
     }
 
-    pub fn with_service<'l, S: Service + 'static, R, F: 'l + FnOnce(&S) -> R>(&'l self, f: F) -> R
-    {
+    pub fn with_service<'l, S: Service + 'static, R, F: 'l + FnOnce(&S) -> R>(&'l self, f: F) -> R {
         let id = TypeId::of::<S>();
 
         // note: we don't want to hold a reference to service_map when invoking f()

+ 46 - 43
fleck/src/service/event.rs

@@ -10,9 +10,7 @@ pub enum SubscriptionFunction<Host: 'static, Data: 'static> {
     Consume(Box<dyn Fn(&Host, Data) -> ()>),
 }
 
-impl<Host: 'static, Data: 'static>
-    SubscriptionFunction<Host, Data>
-{
+impl<Host: 'static, Data: 'static> SubscriptionFunction<Host, Data> {
     fn invoke(&self, host: &Host, mut data: Data) -> Option<Data> {
         match self {
             Self::ByRef(f) => {
@@ -38,9 +36,7 @@ trait EventSub<Data: 'static> {
     fn invoke(&self, data: Data) -> Option<Data>;
 }
 
-impl<Host: 'static, Data: 'static> EventSub<Data>
-    for ConcreteEventSub<Host, Data>
-{
+impl<Host: 'static, Data: 'static> EventSub<Data> for ConcreteEventSub<Host, Data> {
     fn is_healthy(&self) -> bool {
         self.host.strong_count() > 0
     }
@@ -60,9 +56,7 @@ pub struct Channel<Tag: 'static, Data: 'static, Priority: 'static> {
     _ghost: std::marker::PhantomData<(Tag, Priority)>,
 }
 
-impl<Tag: 'static, Data: 'static, Priority: 'static> Default
-    for Channel<Tag, Data, Priority>
-{
+impl<Tag: 'static, Data: 'static, Priority: 'static> Default for Channel<Tag, Data, Priority> {
     fn default() -> Self {
         Self {
             subs: RefCell::new(Default::default()),
@@ -72,9 +66,7 @@ impl<Tag: 'static, Data: 'static, Priority: 'static> Default
     }
 }
 
-impl<Tag: 'static, Data: 'static>
-    Channel<Tag, Data, NoPriorityTag>
-{
+impl<Tag: 'static, Data: 'static> Channel<Tag, Data, NoPriorityTag> {
     pub fn sub_ref<
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
@@ -127,9 +119,7 @@ impl<Tag: 'static, Data: 'static>
     }
 }
 
-impl<Tag: 'static, Data: 'static, Priority: PartialOrd>
-    Channel<Tag, Data, Priority>
-{
+impl<Tag: 'static, Data: 'static, Priority: PartialOrd> Channel<Tag, Data, Priority> {
     pub fn sub_ref<
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
@@ -187,20 +177,38 @@ impl<Tag: 'static, Data: 'static, Priority: PartialOrd>
             callback: SubscriptionFunction::Consume(Box::new(cb)),
         });
 
-        self.subs.borrow_mut().push((p, sub));
+        let mut subs = self.subs.borrow_mut();
+        subs.push((p, sub));
+        // XXX: what happens if we actually get an undefined order?...
+        subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
     }
 }
 
-impl<Tag: 'static, Data: 'static, Priority: 'static>
-    Channel<Tag, Data, Priority>
-{
+impl<Tag: 'static, Data: 'static, Priority: 'static> Channel<Tag, Data, Priority> {
     pub fn queue(&self, data: Data) {
         self.queue.borrow_mut().push_back(data);
     }
+}
+
+trait ChannelMetadata {
+    fn queue_len(&self) -> usize;
+    fn fire_all(&self) -> usize;
+}
+
+impl<Tag: 'static, Data: 'static, Priority: 'static> ChannelMetadata
+    for Channel<Tag, Data, Priority>
+{
+    fn queue_len(&self) -> usize {
+        self.queue.borrow().len()
+    }
 
-    fn do_fire_all(&self) -> usize {
+    fn fire_all(&self) -> usize {
         let mut count = 0;
 
+        if self.subs.borrow().len() == 0 {
+            return self.queue.borrow_mut().drain(..).count();
+        }
+
         while !self.queue.borrow().is_empty() {
             let mut event = self.queue.borrow_mut().pop_front();
 
@@ -218,23 +226,6 @@ impl<Tag: 'static, Data: 'static, Priority: 'static>
     }
 }
 
-trait ChannelMetadata {
-    fn queue_len(&self) -> usize;
-    fn fire_all(&self) -> usize;
-}
-
-impl<Tag: 'static, Data: 'static, Priority: 'static>
-    ChannelMetadata for Channel<Tag, Data, Priority>
-{
-    fn queue_len(&self) -> usize {
-        self.queue.borrow().len()
-    }
-
-    fn fire_all(&self) -> usize {
-        self.do_fire_all()
-    }
-}
-
 pub trait ChannelSpec: 'static {
     type Tag: 'static;
     type Data: 'static;
@@ -277,9 +268,17 @@ impl EventRoot {
         let tid = std::any::TypeId::of::<Channel<CS::Tag, CS::Data, CS::Priority>>();
 
         let ch = Rc::new(Channel::<CS::Tag, CS::Data, CS::Priority>::default());
-        self.metadata
+        if self
+            .metadata
             .borrow_mut()
-            .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
+            .insert(tid, (std::any::type_name::<CS>(), ch.clone()))
+            .is_some()
+        {
+            panic!(
+                "Tried recreating already-existing channel {}!",
+                std::any::type_name::<CS>()
+            );
+        }
         self.channels.borrow_mut().insert(tid, ch);
     }
 
@@ -310,11 +309,15 @@ impl EventRoot {
             any = false;
 
             for ch in self.metadata.borrow().iter() {
-                let count = ch.1 .1.fire_all();
-                if count > 0 {
-                    log::trace!("Queue {} processed {} event(s)", ch.1 .0, count);
+                if ch.1 .1.queue_len() > 0 {
+                    log::trace!(
+                        "Queue {} has {} event(s) to fire",
+                        ch.1 .0,
+                        &ch.1 .1.queue_len()
+                    );
+                    ch.1 .1.fire_all();
+                    any = true;
                 }
-                any = any || count > 0;
             }
         }
     }