Browse Source

Clippy nits and refactoring.

Kestrel 2 years ago
parent
commit
f18257de41

+ 5 - 6
fleck/examples/simple_node.rs

@@ -5,16 +5,15 @@ use fleck::fleck_core;
 fn main() {
     pretty_env_logger::init_timed();
 
-    let fleck = fleck::Fleck::new();
-    let api = fleck.api();
+    let fleck = fleck::API::new();
 
     // generate ephemeral keypair
-    api.with_service(|ns: &fleck_core::NodeService| {
+    fleck.with_service(|ns: &fleck_core::NodeService| {
         ns.build_ephemeral_self_node();
     });
 
     // create UDP socket for communication with a local multicast group
-    let sockref = api.with_service(|io: &fleck_core::io::IOService| {
+    let sockref = fleck.with_service(|io: &fleck_core::io::IOService| {
         fleck_core::io::UdpSocketBuilder::default()
             .bind_to(SocketAddr::new(
                 std::net::Ipv4Addr::UNSPECIFIED.into(),
@@ -25,12 +24,12 @@ fn main() {
     });
 
     // enable local discovery
-    api.with_service(|ld: &fleck_core::discovery::LocalDiscovery| {
+    fleck.with_service(|ld: &fleck_core::discovery::LocalDiscovery| {
         for peer in sockref.multicast_peers() {
             ld.add_multicast_peer(peer);
         }
     });
 
     // run and perform all automatic services, but don't do anything interesting...
-    api.run();
+    fleck.run();
 }

+ 2 - 3
fleck/src/fleck_core.rs

@@ -1,5 +1,5 @@
 //! This module contains the core services (and channel definitions) that make up a `fleck` node.
-//! 
+//!
 //! Currently, the following externally-accessible services are added by default:
 //! - [`msg::MessageService`]
 //! - [`node::NodeService`]
@@ -7,7 +7,6 @@
 //! - [`peer::PeerService`]
 //! - [`crypto::EncryptionService`]
 
-
 pub mod crypto;
 pub mod discovery;
 pub mod io;
@@ -62,7 +61,7 @@ impl crate::service::DefaultService for CoreInitService {
         let io_api = api.clone();
         api.with_service(move |io: &io::IOService| {
             let minor_api = io_api.clone();
-            let major_api = io_api.clone();
+            let major_api = io_api;
             io::TimerFdBuilder::default()
                 .interval(std::time::Duration::from_secs(1))
                 .trigger(move || minor_api.queue::<MinorTickChannel>(()))

+ 1 - 2
fleck/src/fleck_core/crypto.rs

@@ -194,7 +194,7 @@ impl EncryptionService {
         };
 
         let mut states = self.state.borrow_mut();
-        let state = states.remove(&with).unwrap_or(KeyExchangeState::NoState);
+        let state = states.remove(with).unwrap_or(KeyExchangeState::NoState);
         match state {
             KeyExchangeState::NoState => {
                 send();
@@ -218,7 +218,6 @@ impl EncryptionService {
             }
             KeyExchangeState::Completed(_pubkey, _symmetric_key) => {
                 log::warn!("asked to begin handshake when already have a good encryption key --- did the peer die?");
-                return;
             }
         }
     }

+ 3 - 3
fleck/src/fleck_core/discovery.rs

@@ -120,7 +120,7 @@ impl PeerDiscovery {
             None => return Some(msg),
         };
         // check to see if we know this peer yet
-        if self.known.borrow().contains(&peer) {
+        if self.known.borrow().contains(peer) {
             self.new_peer(peer.clone());
         }
 
@@ -129,13 +129,13 @@ impl PeerDiscovery {
 
     fn new_peer(&self, peer: fleck_core::Peer) {
         let mut known = self.known.borrow_mut();
-        known.insert(peer.clone());
+        known.insert(peer);
 
         self.api.queue::<fleck_core::SendPacketChannel>(
             DiscoveryMsg {
                 pkey: self
                     .api
-                    .with_service(|ns: &fleck_core::NodeService| ns.self_node().pubkey().clone()),
+                    .with_service(|ns: &fleck_core::NodeService| *ns.self_node().pubkey()),
                 ttl: 2,
             }
             .into(),

+ 7 - 215
fleck/src/fleck_core/io.rs

@@ -1,12 +1,17 @@
 use std::{
     cell::{Cell, RefCell},
-    collections::{HashMap, VecDeque},
-    os::unix::prelude::{AsRawFd, RawFd},
+    collections::HashMap,
+    os::unix::prelude::RawFd,
     rc::Rc,
 };
 
 use crate::prelude::*;
 
+mod udp;
+pub use udp::UdpSocketBuilder;
+mod timerfd;
+pub use self::timerfd::TimerFdBuilder;
+
 pub struct InterestRegistration {
     poll: Rc<RefCell<mio::Poll>>,
     token: mio::Token,
@@ -106,216 +111,3 @@ impl IOService {
         }
     }
 }
-
-#[derive(Default)]
-pub struct UdpSocketBuilder {
-    bind: Option<std::net::SocketAddr>,
-    multicast: Vec<(std::net::IpAddr, std::net::IpAddr)>,
-}
-
-impl UdpSocketBuilder {
-    pub fn bind_to(mut self, addr: std::net::SocketAddr) -> Self {
-        self.bind = Some(addr);
-        self
-    }
-
-    pub fn join_multicast(mut self, group: std::net::IpAddr, iface: std::net::IpAddr) -> Self {
-        self.multicast.push((group, iface));
-        self
-    }
-
-    pub fn build(self, io: &IOService) -> UdpSocketRef {
-        let socket =
-            mio::net::UdpSocket::bind(self.bind.expect("no bind address given to UDP socket!"))
-                .expect("couldn't bind UDP socket");
-        let interest = io.register_interest(socket.as_raw_fd());
-
-        for mcast in &self.multicast {
-            use std::net::IpAddr;
-            match (mcast.0, mcast.1) {
-                (IpAddr::V4(group), IpAddr::V4(iface)) => {
-                    socket
-                        .join_multicast_v4(&group, &iface)
-                        .expect("couldn't join multicast group");
-                }
-                (IpAddr::V6(_group), IpAddr::V6(_iface)) => {
-                    todo!()
-                }
-                _ => panic!("Multicast specification mixes ipv4 and ipv6 addresses!"),
-            }
-        }
-
-        let sock = Rc::new(UdpSocket {
-            api: io.api.clone(),
-            interest,
-            socket,
-            queue: Default::default(),
-            peers: Default::default(),
-            multicast_joined: self.multicast.iter().map(|e| e.0).collect(),
-        });
-
-        io.register_handler(&sock.interest, sock.clone());
-
-        UdpSocketRef { sock }
-    }
-}
-
-pub struct UdpSocketRef {
-    sock: Rc<UdpSocket>,
-}
-
-impl UdpSocketRef {
-    pub fn multicast_peers(&self) -> Vec<fleck_core::peer::Peer> {
-        let port = self.sock.socket.local_addr().unwrap().port();
-        let rh: Rc<dyn FdHandler> = self.sock.clone();
-        self.sock
-            .multicast_joined
-            .iter()
-            .map(|ip| {
-                self.sock
-                    .peer_for(&rh, &std::net::SocketAddr::new(*ip, port))
-            })
-            .collect()
-    }
-}
-
-struct UdpSocket {
-    api: std::rc::Rc<crate::API>,
-    interest: InterestRegistration,
-    socket: mio::net::UdpSocket,
-    queue: RefCell<VecDeque<(std::net::SocketAddr, Vec<u8>)>>,
-    peers: RefCell<HashMap<std::net::SocketAddr, super::peer::Peer>>,
-    multicast_joined: Vec<std::net::IpAddr>,
-}
-
-impl UdpSocket {
-    fn peer_for(&self, rc: &Rc<dyn FdHandler>, addr: &std::net::SocketAddr) -> super::peer::Peer {
-        let mut peers = self.peers.borrow_mut();
-        if let Some(peer) = peers.get(addr) {
-            peer.clone()
-        } else {
-            let peer = super::peer::Peer {
-                data: Rc::new(super::peer::PeerData::new(
-                    rc.clone(),
-                    super::peer::PeerAddress::Udp(*addr),
-                )),
-            };
-            peers.insert(*addr, peer.clone());
-            peer
-        }
-    }
-}
-
-impl FdHandler for UdpSocket {
-    fn ready_read(&self, rc: &Rc<dyn FdHandler>) {
-        let mut buf = [0u8; 2048];
-        while let Ok((bytes, addr)) = self.socket.recv_from(&mut buf) {
-            if let Ok(mut msg) = bincode::deserialize::<super::msg::Message>(&buf[..bytes]) {
-                msg.peer = Some(self.peer_for(rc, &addr));
-                self.api.queue::<super::ReceivePacketChannel>(msg);
-            }
-        }
-    }
-
-    fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
-        let mut queue = self.queue.borrow_mut();
-        while !queue.is_empty() {
-            let packet = queue.front().unwrap();
-
-            if let Ok(len) = self.socket.send_to(&packet.1, packet.0) {
-                if len != packet.1.len() {
-                    log::error!(
-                        "Sent packet truncated to {} of {} bytes",
-                        len,
-                        packet.1.len()
-                    );
-                }
-
-                drop(packet);
-                queue.pop_front();
-            } else {
-                return;
-            }
-        }
-
-        self.interest.update_interest(mio::Interest::READABLE);
-    }
-
-    fn dispatch(&self, msg: fleck_core::msg::Message) {
-        let peer = msg.peer.clone().unwrap(); // if we got this far, the peer points to us
-
-        if let super::peer::PeerAddress::Udp(addr) = peer.data.address_info {
-            self.queue.borrow_mut().push_back((
-                addr,
-                bincode::serialize(&msg).expect("couldn't serialize message?"),
-            ));
-            self.interest
-                .update_interest(mio::Interest::READABLE.add(mio::Interest::WRITABLE));
-        } else {
-            log::error!("packet dispatched to UdpSocket without UDP address!");
-        }
-    }
-}
-
-#[derive(Default)]
-pub struct TimerFdBuilder {
-    interval: Option<std::time::Duration>,
-    trigger: Option<Box<dyn Fn()>>,
-}
-
-impl TimerFdBuilder {
-    pub fn interval(mut self, interval: std::time::Duration) -> Self {
-        self.interval = Some(interval);
-        self
-    }
-
-    pub fn trigger<F: 'static + Fn()>(mut self, f: F) -> Self {
-        self.trigger = Some(Box::new(f));
-        self
-    }
-
-    pub fn build(self, io: &IOService) {
-        let mut timer = timerfd::TimerFd::new().expect("couldn't create new timerfd?");
-
-        timer.set_state(
-            timerfd::TimerState::Periodic {
-                current: std::time::Duration::from_millis(1),
-                interval: self
-                    .interval
-                    .expect("building timerfd with no interval set?"),
-            },
-            timerfd::SetTimeFlags::Default,
-        );
-
-        let interest = io.register_interest(timer.as_raw_fd());
-
-        let sock = Rc::new(TimerFd {
-            interest,
-            timer,
-            trigger: self.trigger.expect("building timerfd with no trigger?"),
-        });
-
-        io.register_handler(&sock.interest, sock.clone());
-    }
-}
-
-struct TimerFd {
-    interest: InterestRegistration,
-    timer: timerfd::TimerFd,
-    trigger: Box<dyn Fn()>,
-}
-
-impl FdHandler for TimerFd {
-    fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
-        self.timer.read();
-        (self.trigger)();
-    }
-
-    fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
-        unreachable!()
-    }
-
-    fn dispatch(&self, _msg: fleck_core::msg::Message) {
-        unreachable!()
-    }
-}

+ 65 - 0
fleck/src/fleck_core/io/timerfd.rs

@@ -0,0 +1,65 @@
+use super::{fleck_core, FdHandler, IOService, InterestRegistration};
+use std::{os::unix::prelude::AsRawFd, rc::Rc};
+
+#[derive(Default)]
+pub struct TimerFdBuilder {
+    interval: Option<std::time::Duration>,
+    trigger: Option<Box<dyn Fn()>>,
+}
+
+impl TimerFdBuilder {
+    pub fn interval(mut self, interval: std::time::Duration) -> Self {
+        self.interval = Some(interval);
+        self
+    }
+
+    pub fn trigger<F: 'static + Fn()>(mut self, f: F) -> Self {
+        self.trigger = Some(Box::new(f));
+        self
+    }
+
+    pub fn build(self, io: &IOService) {
+        let mut timer = timerfd::TimerFd::new().expect("couldn't create new timerfd?");
+
+        timer.set_state(
+            timerfd::TimerState::Periodic {
+                current: std::time::Duration::from_millis(1),
+                interval: self
+                    .interval
+                    .expect("building timerfd with no interval set?"),
+            },
+            timerfd::SetTimeFlags::Default,
+        );
+
+        let interest = io.register_interest(timer.as_raw_fd());
+
+        let sock = Rc::new(TimerFd {
+            interest,
+            timer,
+            trigger: self.trigger.expect("building timerfd with no trigger?"),
+        });
+
+        io.register_handler(&sock.interest, sock.clone());
+    }
+}
+
+struct TimerFd {
+    interest: InterestRegistration,
+    timer: timerfd::TimerFd,
+    trigger: Box<dyn Fn()>,
+}
+
+impl FdHandler for TimerFd {
+    fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
+        self.timer.read();
+        (self.trigger)();
+    }
+
+    fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
+        unreachable!()
+    }
+
+    fn dispatch(&self, _msg: fleck_core::msg::Message) {
+        unreachable!()
+    }
+}

+ 154 - 0
fleck/src/fleck_core/io/udp.rs

@@ -0,0 +1,154 @@
+use super::{fleck_core, FdHandler, IOService, InterestRegistration};
+use fleck_core::peer::{Peer, PeerAddress, PeerData};
+use std::{
+    cell::RefCell,
+    collections::{HashMap, VecDeque},
+    os::unix::prelude::{AsRawFd, RawFd},
+    rc::Rc,
+};
+
+#[derive(Default)]
+pub struct UdpSocketBuilder {
+    bind: Option<std::net::SocketAddr>,
+    multicast: Vec<(std::net::IpAddr, std::net::IpAddr)>,
+}
+
+impl UdpSocketBuilder {
+    pub fn bind_to(mut self, addr: std::net::SocketAddr) -> Self {
+        self.bind = Some(addr);
+        self
+    }
+
+    pub fn join_multicast(mut self, group: std::net::IpAddr, iface: std::net::IpAddr) -> Self {
+        self.multicast.push((group, iface));
+        self
+    }
+
+    pub fn build(self, io: &IOService) -> UdpSocketRef {
+        let socket =
+            mio::net::UdpSocket::bind(self.bind.expect("no bind address given to UDP socket!"))
+                .expect("couldn't bind UDP socket");
+        let interest = io.register_interest(socket.as_raw_fd());
+
+        for mcast in &self.multicast {
+            use std::net::IpAddr;
+            match (mcast.0, mcast.1) {
+                (IpAddr::V4(group), IpAddr::V4(iface)) => {
+                    socket
+                        .join_multicast_v4(&group, &iface)
+                        .expect("couldn't join multicast group");
+                }
+                (IpAddr::V6(_group), IpAddr::V6(_iface)) => {
+                    todo!()
+                }
+                _ => panic!("Multicast specification mixes ipv4 and ipv6 addresses!"),
+            }
+        }
+
+        let sock = Rc::new(UdpSocket {
+            api: io.api.clone(),
+            interest,
+            socket,
+            queue: Default::default(),
+            peers: Default::default(),
+            multicast_joined: self.multicast.iter().map(|e| e.0).collect(),
+        });
+
+        io.register_handler(&sock.interest, sock.clone());
+
+        UdpSocketRef { sock }
+    }
+}
+
+pub struct UdpSocketRef {
+    sock: Rc<UdpSocket>,
+}
+
+impl UdpSocketRef {
+    pub fn multicast_peers(&self) -> Vec<Peer> {
+        let port = self.sock.socket.local_addr().unwrap().port();
+        let rh: Rc<dyn FdHandler> = self.sock.clone();
+        self.sock
+            .multicast_joined
+            .iter()
+            .map(|ip| {
+                self.sock
+                    .peer_for(&rh, &std::net::SocketAddr::new(*ip, port))
+            })
+            .collect()
+    }
+}
+
+struct UdpSocket {
+    api: std::rc::Rc<crate::API>,
+    interest: InterestRegistration,
+    socket: mio::net::UdpSocket,
+    queue: RefCell<VecDeque<(std::net::SocketAddr, Vec<u8>)>>,
+    peers: RefCell<HashMap<std::net::SocketAddr, Peer>>,
+    multicast_joined: Vec<std::net::IpAddr>,
+}
+
+impl UdpSocket {
+    fn peer_for(&self, rc: &Rc<dyn FdHandler>, addr: &std::net::SocketAddr) -> Peer {
+        let mut peers = self.peers.borrow_mut();
+        if let Some(peer) = peers.get(addr) {
+            peer.clone()
+        } else {
+            let peer = Peer {
+                data: Rc::new(PeerData::new(rc.clone(), PeerAddress::Udp(*addr))),
+            };
+            peers.insert(*addr, peer.clone());
+            peer
+        }
+    }
+}
+
+impl FdHandler for UdpSocket {
+    fn ready_read(&self, rc: &Rc<dyn FdHandler>) {
+        let mut buf = [0u8; 2048];
+        while let Ok((bytes, addr)) = self.socket.recv_from(&mut buf) {
+            if let Ok(mut msg) = bincode::deserialize::<fleck_core::msg::Message>(&buf[..bytes]) {
+                msg.peer = Some(self.peer_for(rc, &addr));
+                self.api.queue::<fleck_core::ReceivePacketChannel>(msg);
+            }
+        }
+    }
+
+    fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
+        let mut queue = self.queue.borrow_mut();
+        while !queue.is_empty() {
+            let packet = queue.front().unwrap();
+
+            if let Ok(len) = self.socket.send_to(&packet.1, packet.0) {
+                if len != packet.1.len() {
+                    log::error!(
+                        "Sent packet truncated to {} of {} bytes",
+                        len,
+                        packet.1.len()
+                    );
+                }
+
+                queue.pop_front();
+            } else {
+                return;
+            }
+        }
+
+        self.interest.update_interest(mio::Interest::READABLE);
+    }
+
+    fn dispatch(&self, msg: fleck_core::msg::Message) {
+        let peer = msg.peer.clone().unwrap(); // if we got this far, the peer points to us
+
+        if let PeerAddress::Udp(addr) = peer.data.address_info {
+            self.queue.borrow_mut().push_back((
+                addr,
+                bincode::serialize(&msg).expect("couldn't serialize message?"),
+            ));
+            self.interest
+                .update_interest(mio::Interest::READABLE.add(mio::Interest::WRITABLE));
+        } else {
+            log::error!("packet dispatched to UdpSocket without UDP address!");
+        }
+    }
+}

+ 2 - 7
fleck/src/fleck_core/msg.rs

@@ -1,5 +1,4 @@
 use std::{
-    any::Any,
     cell::RefCell,
     collections::HashMap,
     hash::{Hash, Hasher},
@@ -20,6 +19,7 @@ pub trait MessageParams: 'static {
 
 #[derive(Clone, Copy, Debug)]
 pub(crate) struct SavedMessageParams {
+    #[allow(unused)]
     pub(crate) name: &'static str,
     pub(crate) encrypted: bool,
     pub(crate) signed: bool,
@@ -57,10 +57,6 @@ pub struct Message {
     #[serde(skip_deserializing)]
     pub(crate) saved_params: Option<SavedMessageParams>,
 
-    #[serde(skip_serializing)]
-    #[serde(skip_deserializing)]
-    pub(crate) parsed: Option<Box<dyn Any>>,
-
     #[serde(skip_serializing)]
     #[serde(skip_deserializing)]
     pub(crate) peer: Option<super::peer::Peer>,
@@ -86,7 +82,6 @@ impl Message {
                 data: bincode::serialize(&from).expect("couldn't serialize message"),
             },
             saved_params: Some(SavedMessageParams::save::<M>()),
-            parsed: Some(Box::new(from)),
             peer: None,
             node: None,
         }
@@ -119,7 +114,7 @@ impl<T: 'static + Serialize + MessageParams> From<T> for Message {
 pub struct MessageChannelTag;
 pub type MessageChannel<M> = (MessageChannelTag, (Metadata, M));
 
-type Deserializer = dyn Fn(&Message) -> ();
+type Deserializer = dyn Fn(&Message);
 
 pub struct MessageService {
     api: Rc<crate::API>,

+ 3 - 5
fleck/src/fleck_core/node.rs

@@ -164,15 +164,13 @@ impl NodeService {
 
 impl Service for NodeService {
     fn new(api: Rc<crate::API>) -> Self {
-        let ret = Self {
+        Self {
             api,
             all_nodes: Default::default(),
             // node_by_pubkey: Default::default(),
             node_by_peer: Default::default(),
             self_node: Default::default(),
-        };
-
-        ret
+        }
     }
 
     fn setup(self: &Rc<Self>) {
@@ -189,7 +187,7 @@ impl NodeService {
         // annotate node based on peer
         if let Some(peer) = &msg.peer {
             let map = self.node_by_peer.borrow();
-            match map.get(&peer) {
+            match map.get(peer) {
                 Some(node) => {
                     msg.node = Some(node.clone());
                 }

+ 4 - 1
fleck/src/fleck_core/peer.rs

@@ -41,7 +41,10 @@ impl PeerData {
 
 impl std::fmt::Debug for PeerData {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.write_fmt(format_args!("PeerData {{ address_info: {:?} }}", self.address_info))
+        f.write_fmt(format_args!(
+            "PeerData {{ address_info: {:?} }}",
+            self.address_info
+        ))
     }
 }
 

+ 9 - 9
fleck/src/helper.rs

@@ -11,29 +11,29 @@ impl<T: Any> AsAny for T {
 }
 
 pub trait IntoWeak<T> {
-    fn as_weak(self) -> std::rc::Weak<T>;
+    fn as_weak(&self) -> std::rc::Weak<T>;
 }
 
 impl<T> IntoWeak<T> for std::rc::Rc<T> {
-    fn as_weak(self) -> std::rc::Weak<T> {
-        std::rc::Rc::downgrade(&self)
+    fn as_weak(&self) -> std::rc::Weak<T> {
+        std::rc::Rc::downgrade(self)
     }
 }
 
 impl<'a, T> IntoWeak<T> for &'a std::rc::Rc<T> {
-    fn as_weak(self) -> std::rc::Weak<T> {
-        std::rc::Rc::downgrade(&self)
+    fn as_weak(&self) -> std::rc::Weak<T> {
+        std::rc::Rc::downgrade(self)
     }
 }
 
 impl<T> IntoWeak<T> for std::rc::Weak<T> {
-    fn as_weak(self) -> std::rc::Weak<T> {
-        self
+    fn as_weak(&self) -> std::rc::Weak<T> {
+        self.clone()
     }
 }
 
 impl<'a, T> IntoWeak<T> for &'a std::rc::Weak<T> {
-    fn as_weak(self) -> std::rc::Weak<T> {
-        self.clone()
+    fn as_weak(&self) -> std::rc::Weak<T> {
+        (*self).clone()
     }
 }

+ 4 - 17
fleck/src/lib.rs

@@ -10,7 +10,7 @@
 //! global state is stored. The [`API`] instance stores a collection of
 //! [`Channel`](service/event/struct.Channel.html) and [`Service`](service/trait.Service.html)
 //! instances. Services listen on specific channels, and events are sent to channels to be handled.
-//! 
+//!
 //! For example, the [`NodeService`](fleck_core/node/struct.NodeService.html) will send events on a
 //! channel whenever a new node joins the `fleck` network. To reduce shared global state, channels
 //! are identified by a static [`ChannelSpec`](service/event/trait.ChannelSpec.html) type, which is
@@ -22,7 +22,6 @@
 //! See the documentation for the [`fleck_core`] module to see the core
 //! services and channels that make up a standard `fleck` node.
 
-
 use std::rc::Rc;
 
 pub mod fleck_core;
@@ -37,6 +36,7 @@ pub mod prelude {
 
 use prelude::*;
 
+/// Entry point to a running fleck instance.
 pub struct API {
     services: std::cell::RefCell<service::ServiceStack>,
 }
@@ -55,6 +55,7 @@ impl API {
         res
     }
 
+    /// Run the fleck instance. Note that this function will not return until fleck is shut down.
     pub fn run(self: &Rc<Self>) {
         // check state
         self.with_service(|node: &fleck_core::NodeService| {
@@ -69,7 +70,7 @@ impl API {
 }
 
 impl API {
-    /// Add a service to the Fleck instance.
+    /// Add a service to the fleck instance.
     pub fn add_service<S: Service + 'static>(&self) {
         self.services.borrow().give_service::<S>();
     }
@@ -100,17 +101,3 @@ impl API {
             .queue(data);
     }
 }
-
-pub struct Fleck {
-    api: Rc<API>,
-}
-
-impl Fleck {
-    pub fn new() -> Self {
-        Self { api: API::new() }
-    }
-
-    pub fn api(&self) -> Rc<API> {
-        self.api.clone()
-    }
-}

+ 0 - 1
fleck/src/service.rs

@@ -1,7 +1,6 @@
 use std::any::TypeId;
 use std::cell::RefCell;
 use std::collections::HashMap;
-use std::ops::Deref;
 use std::rc::{Rc, Weak};
 
 pub mod event;

+ 9 - 16
fleck/src/service/event.rs

@@ -7,7 +7,7 @@ use std::{
 pub enum SubscriptionFunction<Host: 'static, Data: 'static> {
     ByRef(Box<dyn Fn(&Host, &mut Data)>),
     ByValue(Box<dyn Fn(&Host, Data) -> Option<Data>>),
-    Consume(Box<dyn Fn(&Host, Data) -> ()>),
+    Consume(Box<dyn Fn(&Host, Data)>),
 }
 
 impl<Host: 'static, Data: 'static> SubscriptionFunction<Host, Data> {
@@ -43,13 +43,13 @@ impl<Host: 'static, Data: 'static> EventSub<Data> for ConcreteEventSub<Host, Dat
     fn invoke(&self, data: Data) -> Option<Data> {
         self.host
             .upgrade()
-            .map(|h| self.callback.invoke(h.as_ref(), data))
-            .flatten()
+            .and_then(|h| self.callback.invoke(h.as_ref(), data))
     }
 }
 
+type Subscription<Data, Priority> = (Priority, Rc<dyn EventSub<Data>>);
 pub struct Channel<Tag: 'static, Data: 'static, Priority: 'static> {
-    subs: RefCell<Vec<(Priority, Rc<dyn EventSub<Data>>)>>,
+    subs: RefCell<Vec<Subscription<Data, Priority>>>,
 
     queue: RefCell<VecDeque<Data>>,
 
@@ -136,7 +136,7 @@ impl<Tag: 'static, Data: 'static, Priority: PartialOrd> Channel<Tag, Data, Prior
         });
 
         let mut subs = self.subs.borrow_mut();
-        subs.push((p.into(), sub));
+        subs.push((p, sub));
         // XXX: what happens if we actually get an undefined order?...
         subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
     }
@@ -157,7 +157,7 @@ impl<Tag: 'static, Data: 'static, Priority: PartialOrd> Channel<Tag, Data, Prior
         });
 
         let mut subs = self.subs.borrow_mut();
-        subs.push((p.into(), sub));
+        subs.push((p, sub));
         // XXX: what happens if we actually get an undefined order?...
         subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
     }
@@ -249,18 +249,11 @@ impl<Tag: 'static, Data: 'static> ChannelSpec for (Tag, Data) {
     type Priority = NoPriorityTag;
 }
 
+type NamedChannelMetadata = (&'static str, Rc<dyn ChannelMetadata>);
+#[derive(Default)]
 pub struct EventRoot {
     pub(crate) channels: RefCell<HashMap<std::any::TypeId, Rc<dyn std::any::Any>>>,
-    metadata: RefCell<HashMap<std::any::TypeId, (&'static str, Rc<dyn ChannelMetadata>)>>,
-}
-
-impl Default for EventRoot {
-    fn default() -> Self {
-        Self {
-            channels: Default::default(),
-            metadata: Default::default(),
-        }
-    }
+    metadata: RefCell<HashMap<std::any::TypeId, NamedChannelMetadata>>,
 }
 
 impl EventRoot {