Prechádzať zdrojové kódy

Started cleaner FD interface.

Kestrel 2 rokov pred
rodič
commit
f4c5243972

+ 0 - 6
fleck/examples/simple_node.rs

@@ -5,11 +5,5 @@ fn main() {
 
     let fleck = fleck::Fleck::new();
 
-    fleck.bind_udp_socket(3535, None);
-
-    /*fleck.services().with_service(|node: &fleck_core::Nodes| {
-        node.self_node().gen_keypair();
-    });*/
-
     fleck.run();
 }

+ 6 - 6
fleck/src/fleck_core.rs

@@ -1,8 +1,8 @@
-pub(crate) mod lowlevel;
-mod nodes;
+// pub(crate) mod lowlevel;
+// mod nodes;
 
-pub use nodes::Node;
-pub use nodes::Nodes;
+// pub use nodes::Node;
+// pub use nodes::Nodes;
 
 pub mod channel_tags {
     pub struct SendPacketTag {}
@@ -18,7 +18,7 @@ pub enum SendOrder {
     Encrypt,
     Send,
 }
-pub type SendPacketChannel = (channel_tags::SendPacketTag, crate::io::Packet, SendOrder);
+// pub type SendPacketChannel = (channel_tags::SendPacketTag, crate::io::Packet, SendOrder);
 
 #[derive(PartialEq, PartialOrd)]
 pub enum ReceiveOrder {
@@ -27,7 +27,7 @@ pub enum ReceiveOrder {
     Parse,
     Dispatch,
 }
-pub type ReceivePacketChannel = (channel_tags::ReceivePacketTag, crate::io::Packet, ReceiveOrder);
+// pub type ReceivePacketChannel = (channel_tags::ReceivePacketTag, crate::io::Packet, ReceiveOrder);
 
 /// Minor ticks (roughly once per second): for retries, status updates, etc.
 pub type MinorTickChannel = (channel_tags::MinorTickTag, ());

+ 95 - 63
fleck/src/io.rs

@@ -1,13 +1,18 @@
-use std::{rc::Rc, cell::RefCell, collections::HashMap};
+use std::{
+    cell::{Cell, RefCell},
+    collections::HashMap,
+    rc::Rc,
+};
 
 use mio::{Events, Interest, Poll, Token};
 
-use crate::{Fleck, prelude::ChannelSpec, service::event::NoPriorityTag};
+use crate::{prelude::ChannelSpec, service::event::NoPriorityTag, Fleck};
 
-use self::udp::UdpData;
+pub mod builder;
+// pub(super) mod stream;
+// pub(super) mod udp;
 
-pub(super) mod udp;
-pub(super) mod stream;
+use builder::DefaultServiceBuilder;
 
 lazy_static::lazy_static! {
     pub static ref MULTICAST_ADDRESS : std::net::SocketAddrV4 =
@@ -27,13 +32,15 @@ pub(super) trait AbstractFdService: 'static {
 
 pub(super) struct IO {
     poll: Rc<RefCell<mio::Poll>>,
-    services: HashMap<Token, Box<dyn AbstractFdService>>,
+    services: RefCell<HashMap<Token, Rc<dyn AbstractFdService>>>,
 }
 
 impl IO {
     pub(super) fn new() -> Self {
         Self {
-            poll: Rc::new(RefCell::new(mio::Poll::new().expect("couldn't create mio::Poll"))),
+            poll: Rc::new(RefCell::new(
+                mio::Poll::new().expect("couldn't create mio::Poll"),
+            )),
             services: Default::default(),
         }
     }
@@ -41,10 +48,16 @@ impl IO {
     pub(super) fn run(&self, api: &Fleck) {
         let mut events = mio::Events::with_capacity(128);
         loop {
+            log::trace!("Waiting for events...");
             let pr = self.poll.borrow_mut().poll(&mut events, None);
-            if pr.is_err() { continue }
+            log::trace!("Polled!");
+            if pr.is_err() {
+                continue;
+            }
+            log::trace!("Waking up for events");
+            let svcs = self.services.borrow();
             for evt in &events {
-                let svc = self.services.get(&evt.token()).unwrap();
+                let svc = svcs.get(&evt.token()).unwrap();
                 if evt.is_readable() {
                     svc.ready_read(api);
                 }
@@ -57,22 +70,41 @@ impl IO {
         }
     }
 
-    pub(super) fn add_service<S: AbstractFdService>(&mut self, srv: S) {
+    pub(super) fn add_service<'l>(&self, fleck: &'l Fleck) -> DefaultServiceBuilder<'l> {
+        let tok = Token(self.services.borrow().len() + 1);
+
+        builder::DefaultServiceBuilder::new(fleck, self.poll.clone(), tok)
+    }
+
+    /*pub(super) fn add_service<S: AbstractFdService>(&mut self, srv: S) {
         let tok = Token(self.services.len() + 1);
-        self.services.insert(tok, Box::new(srv));
+        self.services.insert(tok, Rc::new(srv));
     }
 
-    pub(super) fn add_udp_service<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData>(&mut self, udp: mio::net::UdpSocket) {
+    pub(super) fn add_udp_service<CB: 'static + udp::UdpCallback>(&mut self, udp: mio::net::UdpSocket, on_read: CB) -> impl Fn(std::net::SocketAddr, Vec<u8>) {
         let token = Token(self.services.len() + 1);
-        self.services.insert(token, Box::new(udp::UdpService::<Incoming, Outgoing, DataType> {
+        let srv = Rc::new(udp::UdpService {
+            socket: RefCell::new(udp),
+            token,
+            poll: self.poll.clone(),
+            send_queue: Default::default(),
+            write_registered: Cell::new(false),
+            read_callback: Rc::new(on_read),
+        });
+        self.services.insert(token, srv);
+
+        |addr, data| {
+
+        }
+        /*self.services.insert(token, Box::new(udp::UdpService::<Incoming, Outgoing, DataType> {
             socket: RefCell::new(udp),
             token,
             poll: self.poll.clone(),
             send_queue: Default::default(),
             write_registered: std::cell::Cell::new(false),
             _ghost: std::marker::PhantomData
-        }));
-    }
+        }));*/
+    }*/
 }
 
 /*pub struct FleckIO {
@@ -241,51 +273,51 @@ impl FleckIO {
 
 /*
 
-        // minor tick registration
-        let mut minor = timerfd::TimerFd::new().unwrap();
-        minor.set_state(
-            timerfd::TimerState::Periodic {
-                current: std::time::Duration::new(1, 0),
-                interval: std::time::Duration::new(1, 0),
-            },
-            timerfd::SetTimeFlags::Default,
-        );
-        poll.registry()
-            .register(
-                &mut SourceFd(&minor.as_raw_fd()),
-                Self::MINOR_TICK,
-                Interest::READABLE,
-            )
-            .expect("couldn't register read interest for minor timer?");
-
-        // major tick registration
-        let mut major = timerfd::TimerFd::new().unwrap();
-        major.set_state(
-            timerfd::TimerState::Periodic {
-                current: std::time::Duration::new(1, 0),
-                interval: std::time::Duration::new(15, 0),
-            },
-            timerfd::SetTimeFlags::Default,
-        );
-        poll.registry()
-            .register(
-                &mut SourceFd(&major.as_raw_fd()),
-                Self::MAJOR_TICK,
-                Interest::READABLE,
-            )
-            .expect("couldn't register read interest for major timer?");
-
-        // global socket
-        let mut global_socket = mio::net::UdpSocket::bind(
-            std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, DEFAULT_PORT).into(),
-        )
-        .expect("couldn't listen on UDP port?");
-        global_socket
-            .join_multicast_v4(&MULTICAST_ADDRESS.ip(), &std::net::Ipv4Addr::UNSPECIFIED)
-            .expect("couldn't join multicast group?");
-
-        poll.registry()
-            .register(&mut global_socket, Self::GLOBAL, Interest::READABLE)
-            .expect("couldn't register read interest in UDP port?");
- *
- * */
+       // minor tick registration
+       let mut minor = timerfd::TimerFd::new().unwrap();
+       minor.set_state(
+           timerfd::TimerState::Periodic {
+               current: std::time::Duration::new(1, 0),
+               interval: std::time::Duration::new(1, 0),
+           },
+           timerfd::SetTimeFlags::Default,
+       );
+       poll.registry()
+           .register(
+               &mut SourceFd(&minor.as_raw_fd()),
+               Self::MINOR_TICK,
+               Interest::READABLE,
+           )
+           .expect("couldn't register read interest for minor timer?");
+
+       // major tick registration
+       let mut major = timerfd::TimerFd::new().unwrap();
+       major.set_state(
+           timerfd::TimerState::Periodic {
+               current: std::time::Duration::new(1, 0),
+               interval: std::time::Duration::new(15, 0),
+           },
+           timerfd::SetTimeFlags::Default,
+       );
+       poll.registry()
+           .register(
+               &mut SourceFd(&major.as_raw_fd()),
+               Self::MAJOR_TICK,
+               Interest::READABLE,
+           )
+           .expect("couldn't register read interest for major timer?");
+
+       // global socket
+       let mut global_socket = mio::net::UdpSocket::bind(
+           std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, DEFAULT_PORT).into(),
+       )
+       .expect("couldn't listen on UDP port?");
+       global_socket
+           .join_multicast_v4(&MULTICAST_ADDRESS.ip(), &std::net::Ipv4Addr::UNSPECIFIED)
+           .expect("couldn't join multicast group?");
+
+       poll.registry()
+           .register(&mut global_socket, Self::GLOBAL, Interest::READABLE)
+           .expect("couldn't register read interest in UDP port?");
+*
+* */

+ 416 - 0
fleck/src/io/builder.rs

@@ -0,0 +1,416 @@
+use std::{cell::{RefCell, Cell}, os::unix::prelude::{OwnedFd, AsRawFd, FromRawFd, RawFd}, rc::Rc, collections::VecDeque, io::{Read, Write}};
+
+use mio::{Token, unix::SourceFd};
+
+use crate::{prelude::{ChannelSpec, Service}, service::event::NoPriorityTag};
+use super::AbstractFdService;
+
+pub type NoChannel = ((), ());
+
+pub struct MulticastSpec {
+    group: std::net::IpAddr,
+    iface: std::net::IpAddr,
+}
+
+pub struct NoData;
+
+pub struct UdpData {
+    bind_addr: Option<std::net::SocketAddr>,
+    join_multicast: Option<MulticastSpec>,
+}
+
+pub struct StreamData {
+    fd: Option<RawFd>,
+}
+
+pub struct TimerData {
+    timer: Option<timerfd::TimerState>,
+}
+
+struct InterestRegistration {
+    poll: Rc<RefCell<mio::Poll>>,
+    write: Cell<bool>,
+    token: Token,
+}
+
+impl InterestRegistration {
+    fn subscribe_read(&self, source: &mut dyn mio::event::Source) {
+        self.poll.borrow_mut().registry().register(
+            source, self.token, mio::Interest::READABLE,
+        ).expect("couldn't subscribe?")
+    }
+
+    fn subscribe_write(&self, source: &mut dyn mio::event::Source) {
+        if self.write.get() { return }
+        self.poll.borrow_mut().registry().reregister(
+            source, self.token, mio::Interest::READABLE.add(mio::Interest::WRITABLE),
+        ).expect("couldn't subscribe?");
+        self.write.set(true);
+    }
+
+    fn unsubscribe_write(&self, source: &mut dyn mio::event::Source) {
+        self.poll.borrow_mut().registry().reregister(
+            source, self.token, mio::Interest::READABLE,
+        ).expect("couldn't subscribe?");
+        self.write.set(false);
+    }
+}
+
+pub type DefaultServiceBuilder<'l> = ServiceBuilder<'l, NoChannel, NoChannel, NoData>;
+
+// Source: where the service writes packets to (recv)
+// Sink: where the service reads packets from (send)
+pub struct ServiceBuilder<'l, SourceChannel: ChannelSpec, SinkChannel: ChannelSpec, Data> {
+    fleck: &'l crate::Fleck,
+    poll: Rc<RefCell<mio::Poll>>,
+    token: Token,
+
+    source_priority: Option<SourceChannel::Priority>,
+    sink_priority: Option<SinkChannel::Priority>,
+
+    data: Data,
+
+    _ghost: std::marker::PhantomData<(SourceChannel, SinkChannel)>,
+}
+
+impl<'l, SourceChannel: ChannelSpec, SinkChannel: ChannelSpec>
+    ServiceBuilder<'l, SourceChannel, SinkChannel, NoData>
+{
+    pub(super) fn new(fleck: &'l crate::Fleck, poll: Rc<RefCell<mio::Poll>>, token: Token) -> Self {
+        Self {
+            fleck,
+            poll,
+            token,
+
+            source_priority: None,
+            sink_priority: None,
+
+            data: NoData,
+
+            _ghost: Default::default(),
+        }
+    }
+}
+
+impl<'l, SinkChannel: ChannelSpec, Data> ServiceBuilder<'l, NoChannel, SinkChannel, Data> {
+    pub fn source<SourceChannel: ChannelSpec<Priority = NoPriorityTag>>(
+        self,
+    ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
+        ServiceBuilder {
+            fleck: self.fleck,
+            poll: self.poll,
+            token: self.token,
+
+            source_priority: Some(NoPriorityTag),
+            sink_priority: self.sink_priority,
+
+            data: self.data,
+
+            _ghost: Default::default(),
+        }
+    }
+
+    pub fn source_with<Priority: PartialOrd, SourceChannel: ChannelSpec<Priority = Priority>>(
+        self,
+        p: Priority,
+    ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
+        ServiceBuilder {
+            fleck: self.fleck,
+            poll: self.poll,
+            token: self.token,
+
+            source_priority: Some(p),
+            sink_priority: self.sink_priority,
+
+            data: self.data,
+
+            _ghost: Default::default(),
+        }
+    }
+}
+
+impl<'l, SourceChannel: ChannelSpec, Data> ServiceBuilder<'l, SourceChannel, NoChannel, Data> {
+    pub fn sink<SinkChannel: ChannelSpec<Priority = NoPriorityTag>>(
+        self,
+    ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
+        ServiceBuilder {
+            fleck: self.fleck,
+            poll: self.poll,
+            token: self.token,
+
+            source_priority: self.source_priority,
+            sink_priority: Some(NoPriorityTag),
+
+            data: self.data,
+
+            _ghost: Default::default(),
+        }
+    }
+
+    pub fn sink_with<Priority: PartialOrd, SinkChannel: ChannelSpec<Priority = Priority>>(
+        self,
+        p: Priority,
+    ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
+        ServiceBuilder {
+            fleck: self.fleck,
+            poll: self.poll,
+            token: self.token,
+
+            source_priority: self.source_priority,
+            sink_priority: Some(p),
+
+            data: self.data,
+
+            _ghost: Default::default(),
+        }
+    }
+}
+
+impl<'l,
+        PacketType: 'static + From<(std::net::SocketAddr, Vec<u8>)> + Into<(std::net::SocketAddr, Vec<u8>)>,
+        SourceChannel: ChannelSpec<Data = PacketType>,
+        SinkChannel: ChannelSpec<Data = PacketType>,
+    > ServiceBuilder<'l, SourceChannel, SinkChannel, NoData>
+{
+    pub fn udp(self) -> ServiceBuilder<'l, SourceChannel, SinkChannel, UdpData> {
+        ServiceBuilder {
+            fleck: self.fleck,
+            poll: self.poll,
+            token: self.token,
+
+            source_priority: self.source_priority,
+            sink_priority: self.sink_priority,
+
+            data: UdpData {
+                bind_addr: None,
+                join_multicast: None,
+            },
+
+            _ghost: Default::default(),
+        }
+    }
+}
+
+impl<'l,
+        PacketType: 'static + From<(std::net::SocketAddr, Vec<u8>)> + Into<(std::net::SocketAddr, Vec<u8>)>,
+        SourceChannel: ChannelSpec<Data = PacketType>,
+        SinkChannel: ChannelSpec<Data = PacketType>,
+    > ServiceBuilder<'l, SourceChannel, SinkChannel, UdpData>
+{
+    pub fn set_bind_address(mut self, bind_addr: std::net::SocketAddr) -> Self {
+        self.data.bind_addr = Some(bind_addr);
+        self
+    }
+
+    pub fn set_multicast_group(
+        mut self,
+        group_addr: std::net::IpAddr,
+        interface_addr: std::net::IpAddr,
+    ) -> Self {
+        self.data.join_multicast = Some(MulticastSpec {
+            group: group_addr,
+            iface: interface_addr,
+        });
+        self
+    }
+
+    pub fn build(self) {
+        
+    }
+}
+
+/* timerfd support */
+impl<'l, SourceChannel: ChannelSpec<Data = ()>>
+    ServiceBuilder<'l, SourceChannel, ((), ()), NoData>
+{
+    pub fn timer(self) -> ServiceBuilder<'l, SourceChannel, ((), ()), TimerData> {
+        ServiceBuilder {
+            fleck: self.fleck,
+            poll: self.poll,
+            token: self.token,
+
+            source_priority: self.source_priority,
+            sink_priority: self.sink_priority,
+
+            data: TimerData { timer: None },
+
+            _ghost: Default::default(),
+        }
+    }
+}
+
+struct Timer<Tag: 'static> {
+    token: Token,
+    timer: timerfd::TimerFd,
+    _ghost: std::marker::PhantomData<Tag>,
+}
+
+impl<Tag: 'static> Service for Rc<Timer<Tag>> {}
+impl<Tag: 'static> AbstractFdService for Timer<Tag> {
+    fn ready_read(&self, api: &crate::Fleck) {
+        self.timer.read();
+        api.queue::<(Tag, ())>(());
+    }
+
+    fn ready_write(&self, api: &crate::Fleck) {
+        unreachable!();
+    }
+
+    fn token(&self) -> Token {
+        self.token
+    }
+}
+
+impl<'l, SourceChannel: ChannelSpec<Data = ()>>
+    ServiceBuilder<'l, SourceChannel, ((), ()), TimerData>
+{
+    pub fn setup(mut self, state: timerfd::TimerState) -> Self {
+        self.data.timer = Some(state);
+        self
+    }
+
+    pub fn build(self) {
+        let mut timer = timerfd::TimerFd::new_custom(timerfd::ClockId::Monotonic, true, false).expect("couldn't create timer");
+        timer.set_state(self.data.timer.expect("no setup done for timer?"), timerfd::SetTimeFlags::Default);
+
+        let timer = Rc::new(Timer::<SourceChannel::Tag> { token: self.token, timer, _ghost: Default::default() });
+
+        self.poll.borrow().registry().register(
+            &mut SourceFd(&timer.timer.as_raw_fd()),
+            self.token,
+            mio::Interest::READABLE
+        ).expect("couldn't register timer read interest?");
+
+        self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, timer);
+    }
+}
+
+/* stream support */
+impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>>
+    ServiceBuilder<'l, SourceChannel, SinkChannel, NoData>
+{
+    pub fn stream(self) -> ServiceBuilder<'l, SourceChannel, SinkChannel, StreamData> {
+        ServiceBuilder {
+            fleck: self.fleck,
+            poll: self.poll,
+            token: self.token,
+
+            source_priority: self.source_priority,
+            sink_priority: self.sink_priority,
+
+            data: StreamData { fd: None },
+
+            _ghost: Default::default(),
+        }
+    }
+}
+
+impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>>
+    ServiceBuilder<'l, SourceChannel, SinkChannel, StreamData>
+{
+    pub fn fd(mut self, fd: RawFd) -> Self {
+        self.data.fd = Some(fd);
+        self
+    }
+}
+
+impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static, SinkPriority: 'static + Clone + PartialOrd>
+    ServiceBuilder<'l, SourceChannel, (SinkTag, Vec<u8>, SinkPriority), StreamData>
+{
+    pub fn build(self) {
+        let svc = Rc::new(StreamService::<SourceChannel, (SinkTag, Vec<u8>, SinkPriority)> {
+            interest: InterestRegistration { poll: self.poll, write: Cell::new(false), token: self.token },
+            queue: Default::default(),
+            file: RefCell::new(unsafe { std::fs::File::from_raw_fd(self.data.fd.expect("Building stream IO service with no FD?")) }),
+            sink_priority: RefCell::new(Some(self.sink_priority.expect("no sink priority given!"))),
+            _ghost: Default::default(),
+        });
+
+        svc.interest.subscribe_read(&mut SourceFd(&svc.file.borrow().as_raw_fd()));
+
+        self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, svc.clone());
+        self.fleck.services.borrow_mut().add_service(svc.clone());
+    }
+}
+
+impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static>
+    ServiceBuilder<'l, SourceChannel, (SinkTag, Vec<u8>), StreamData>
+{
+    pub fn build(self) {
+        let svc = Rc::new(StreamService::<SourceChannel, (SinkTag, Vec<u8>)> {
+            interest: InterestRegistration { poll: self.poll, write: Cell::new(false), token: self.token },
+            queue: Default::default(),
+            file: RefCell::new(unsafe { std::fs::File::from_raw_fd(self.data.fd.expect("Building stream IO service with no FD?")) }),
+            sink_priority: RefCell::new(Some(self.sink_priority.expect("no sink priority given!"))),
+            _ghost: Default::default(),
+        });
+
+        svc.interest.subscribe_read(&mut SourceFd(&svc.file.borrow().as_raw_fd()));
+
+        self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, svc.clone());
+        self.fleck.services.borrow_mut().add_service(svc.clone());
+    }
+}
+
+pub struct StreamService<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>> {
+    interest: InterestRegistration,
+    queue: RefCell<VecDeque<u8>>,
+    file: RefCell<std::fs::File>,
+    sink_priority: RefCell<Option<SinkChannel::Priority>>,
+    _ghost: std::marker::PhantomData<(SourceChannel, SinkChannel)>,
+}
+
+/*impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>, Priority = SinkPriority>, SinkPriority: 'static + PartialOrd + Clone> Service for Rc<StreamService<SourceChannel, SinkChannel>> {
+    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
+        eroot.channel::<SinkChannel>().sub_opt(self.sink_priority.clone(), self, StreamService::sink_data);
+    }
+}*/
+
+impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static, SinkPriority: 'static + Clone + PartialOrd> Service for Rc<StreamService<SourceChannel, (SinkTag, Vec<u8>, SinkPriority)>> {
+    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
+        eroot.channel::<(SinkTag, Vec<u8>, SinkPriority)>().sub_opt(self.sink_priority.borrow_mut().take().expect("registering channel with no priority?"), self, StreamService::sink_data);
+    }
+}
+
+impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static> Service for Rc<StreamService<SourceChannel, (SinkTag, Vec<u8>)>> {
+    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
+        eroot.channel::<(SinkTag, Vec<u8>)>().sub_opt(self, StreamService::sink_data);
+    }
+}
+
+impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>> StreamService<SourceChannel, SinkChannel> {
+    fn sink_data(&self, api: &crate::Fleck, mut data: Vec<u8>) -> Option<Vec<u8>> {
+        self.queue.borrow_mut().extend(data.drain(..));
+        self.interest.subscribe_write(&mut SourceFd(&self.file.borrow().as_raw_fd()));
+        None
+    }
+}
+
+impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>> AbstractFdService for StreamService<SourceChannel, SinkChannel> {
+    fn ready_read(&self, api: &crate::Fleck) {
+        let mut buf = [0u8; 1024];
+        while let Ok(bytes) = self.file.borrow_mut().read(&mut buf) {
+            api.queue::<SourceChannel>(Vec::from(&buf[..bytes]));
+        }
+
+    }
+    fn ready_write(&self, _api: &crate::Fleck) {
+        let mut queue = self.queue.borrow_mut();
+        while !queue.is_empty() {
+            let (data, _) = queue.as_slices();
+            match self.file.borrow_mut().write(data) {
+                Ok(len) => {
+                    queue.drain(..len);
+                },
+                Err(_) => {
+                    return
+                }
+            }
+        }
+
+        self.interest.unsubscribe_write(&mut SourceFd(&self.file.borrow().as_raw_fd()));
+    }
+    fn token(&self) -> Token {
+        self.interest.token
+    }
+}

+ 37 - 12
fleck/src/io/stream.rs

@@ -1,23 +1,27 @@
 use std::cell::Cell;
+use std::cell::RefCell;
 use std::collections::VecDeque;
 use std::fs::File;
 use std::io::Read;
 use std::io::Write;
 use std::os::unix::prelude::AsRawFd;
 use std::rc::Rc;
-use std::cell::RefCell;
 
-use mio::Interest;
 use mio::unix::SourceFd;
+use mio::Interest;
 use mio::Token;
 
-use crate::Fleck;
+use super::AbstractFdService;
 use crate::prelude::*;
 use crate::service::event::NoPriorityTag;
-use super::AbstractFdService;
+use crate::Fleck;
 
 pub trait StreamData: 'static + Into<Vec<u8>> + From<Vec<u8>> {}
-pub struct StreamService<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Data = DataType>, DataType: StreamData> {
+pub struct StreamService<
+    Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>,
+    Outgoing: ChannelSpec<Data = DataType>,
+    DataType: StreamData,
+> {
     stream: RefCell<File>,
     token: Token,
     poll: Rc<RefCell<mio::Poll>>,
@@ -26,13 +30,25 @@ pub struct StreamService<Incoming: ChannelSpec<Priority = NoPriorityTag, Data =
     _ghost: std::marker::PhantomData<(Incoming, Outgoing, DataType)>,
 }
 
-impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: StreamData> Service for Rc<StreamService<Incoming, Outgoing, DataType>> {
-    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) { 
-        eroot.channel::<Outgoing>().sub_opt(self, StreamService::handle_outgoing);
+impl<
+        Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>,
+        Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>,
+        DataType: StreamData,
+    > Service for Rc<StreamService<Incoming, Outgoing, DataType>>
+{
+    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
+        eroot
+            .channel::<Outgoing>()
+            .sub_opt(self, StreamService::handle_outgoing);
     }
 }
 
-impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: StreamData> StreamService<Incoming, Outgoing, DataType> {
+impl<
+        Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>,
+        Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>,
+        DataType: StreamData,
+    > StreamService<Incoming, Outgoing, DataType>
+{
     fn handle_outgoing(&self, _api: &Fleck, packet: DataType) -> Option<DataType> {
         self.send_queue.borrow_mut().extend(packet.into().iter());
 
@@ -55,7 +71,9 @@ impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing:
     }
 
     fn register_write_interest(&self) {
-        if self.write_registered.get() { return }
+        if self.write_registered.get() {
+            return;
+        }
         self.poll
             .borrow()
             .registry()
@@ -69,7 +87,12 @@ impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing:
     }
 }
 
-impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: StreamData> AbstractFdService for StreamService<Incoming, Outgoing, DataType> {
+impl<
+        Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>,
+        Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>,
+        DataType: StreamData,
+    > AbstractFdService for StreamService<Incoming, Outgoing, DataType>
+{
     fn ready_read(&self, api: &Fleck) {
         let mut buf = [0u8; 2048];
         while let Ok(len) = self.stream.borrow_mut().read(&mut buf) {
@@ -94,7 +117,9 @@ impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing:
             }
         }
 
-        if queue.is_empty() { self.clear_write_interest() }
+        if queue.is_empty() {
+            self.clear_write_interest()
+        }
     }
 
     fn token(&self) -> Token {

+ 26 - 25
fleck/src/io/udp.rs

@@ -1,38 +1,36 @@
+use std::cell::{Cell, RefCell};
 use std::collections::VecDeque;
 use std::net::SocketAddr;
-use std::rc::Rc;
-use std::cell::{Cell,RefCell};
 use std::ops::DerefMut;
+use std::rc::Rc;
 
 use mio::net::UdpSocket;
-use mio::Token;
 use mio::Interest;
+use mio::Token;
 
-use crate::Fleck;
-use crate::prelude::*;
-use crate::service::event::NoPriorityTag;
 use super::AbstractFdService;
+use crate::Fleck;
 
-pub trait UdpData: 'static + Into<(SocketAddr, Vec<u8>)> + From<(SocketAddr, Vec<u8>)> {}
+pub trait UdpCallback {
+    fn invoke(&self, addr: SocketAddr, data: Vec<u8>);
+}
 
-impl<T: 'static + Into<(SocketAddr, Vec<u8>)> + From<(SocketAddr, Vec<u8>)>> UdpData for T { }
+impl<T: Fn(SocketAddr, Vec<u8>)> UdpCallback for T {
+    fn invoke(&self, addr: SocketAddr, data: Vec<u8>) {
+        (self)(addr, data);
+    }
+}
 
-pub struct UdpService<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Data = DataType>, DataType: UdpData> {
+pub struct UdpService {
     pub(super) socket: RefCell<UdpSocket>,
     pub(super) token: Token,
     pub(super) poll: Rc<RefCell<mio::Poll>>,
     pub(super) send_queue: RefCell<VecDeque<(SocketAddr, Vec<u8>)>>,
     pub(super) write_registered: Cell<bool>,
-    pub(super) _ghost: std::marker::PhantomData<(Incoming, Outgoing, DataType)>,
-}
-
-impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData> Service for Rc<UdpService<Incoming, Outgoing, DataType>> {
-    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) { 
-        eroot.channel::<Outgoing>().sub_opt(self, UdpService::handle_outgoing);
-    }
+    pub(super) read_callback: Rc<dyn UdpCallback>,
 }
 
-impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData> UdpService<Incoming, Outgoing, DataType> {
+impl UdpService {
     fn clear_write_interest(&self) {
         self.poll
             .borrow()
@@ -47,7 +45,9 @@ impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing:
     }
 
     fn register_write_interest(&self) {
-        if self.write_registered.get() { return }
+        if self.write_registered.get() {
+            return;
+        }
         self.poll
             .borrow()
             .registry()
@@ -60,21 +60,20 @@ impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing:
         self.write_registered.set(true);
     }
 
-
-    fn handle_outgoing(&self, _api: &Fleck, packet: DataType) -> Option<DataType> {
+    /*fn handle_outgoing(&self, _api: &Fleck, packet: DataType) -> Option<DataType> {
         self.send_queue.borrow_mut().push_back(packet.into());
 
         self.register_write_interest();
 
         None
-    }
+    }*/
 }
 
-impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData> AbstractFdService for UdpService<Incoming, Outgoing, DataType> {
-    fn ready_read(&self, api: &Fleck) {
+impl AbstractFdService for UdpService {
+    fn ready_read(&self, _api: &Fleck) {
         let mut buf = [0u8; 2048];
         while let Ok((len, addr)) = self.socket.borrow_mut().recv_from(&mut buf) {
-            api.queue::<Incoming>((addr, Vec::from(&buf[..len])).into());
+            (self.read_callback)(addr, Vec::from(&buf[..len]));
         }
     }
 
@@ -100,7 +99,9 @@ impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing:
             }
         }
 
-        if queue.is_empty() { self.clear_write_interest() }
+        if queue.is_empty() {
+            self.clear_write_interest()
+        }
     }
 
     fn token(&self) -> Token {

+ 68 - 18
fleck/src/lib.rs

@@ -1,29 +1,47 @@
 use std::cell::RefCell;
 use std::collections::HashMap;
-use std::os::unix::prelude::OwnedFd;
+use std::os::unix::prelude::{AsRawFd, OwnedFd};
 use std::rc::Rc;
 
 mod msg;
 // mod crypto;
+pub mod fleck_core;
 mod helper;
 mod io;
 pub mod service;
-// pub mod fleck_core;
 
 pub mod prelude {
     pub use crate::helper::{AsAny, IntoWeak};
-    pub use crate::msg::{Message,MessageParams};
-    pub use crate::service::{
-        ChannelSpec, EventRoot, Service,
-    };
+    pub use crate::msg::{Message, MessageParams};
+    pub use crate::service::{ChannelSpec, EventRoot, Service};
     // pub use crate::fleck_core;
 }
 
 use prelude::*;
 
+struct SimpleTag;
+
+#[derive(Default)]
+struct SimpleListener<T: 'static>(std::marker::PhantomData<T>);
+
+impl<T: 'static> Service for Rc<SimpleListener<T>> {
+    fn register_channels(&self, eroot: &mut EventRoot) {
+        eroot.create_channel::<(SimpleTag, T)>();
+        eroot
+            .channel::<(SimpleTag, T)>()
+            .sub_opt(self, SimpleListener::<T>::tick);
+    }
+}
+
+impl<T: 'static> SimpleListener<T> {
+    fn tick(&self, _: &Fleck, _: T) -> Option<T> {
+        log::trace!("Ticked!");
+        None
+    }
+}
+
 pub struct Fleck {
     io: RefCell<io::IO>,
-    fd_services: RefCell<HashMap<mio::Token, Rc<dyn io::AbstractFdService>>>,
     services: RefCell<service::ServiceStack>,
 }
 
@@ -31,15 +49,46 @@ impl Fleck {
     pub fn new() -> Rc<Self> {
         let res = Rc::new(Self {
             io: RefCell::new(io::IO::new()),
-            fd_services: Default::default(),
             services: Default::default(),
         });
 
         res.register_core_services();
+        res.set_up_timers();
 
         res
     }
 
+    fn set_up_timers(&self) {
+        // minor tick registration
+        let mut minor =
+            timerfd::TimerFd::new_custom(timerfd::ClockId::Monotonic, true, false).unwrap();
+        minor.set_state(
+            timerfd::TimerState::Periodic {
+                current: std::time::Duration::new(1, 0),
+                interval: std::time::Duration::new(1, 0),
+            },
+            timerfd::SetTimeFlags::Default,
+        );
+
+        self.add_service::<SimpleListener<Vec<u8>>>();
+        self.add_service::<SimpleListener<()>>();
+
+        /*self.add_io()
+            .sink::<(SimpleTag, Vec<u8>)>()
+            .source::<(SimpleTag, Vec<u8>)>()
+            .stream()
+            .fd(minor.as_raw_fd())
+            .build();*/
+
+        self.add_io()
+            .source::<(SimpleTag, ())>()
+            .timer()
+            .setup(timerfd::TimerState::Periodic { current: std::time::Duration::new(1, 0), interval: std::time::Duration::new(1, 0) })
+            .build();
+
+        Box::leak(Box::new(minor));
+    }
+
     fn register_core_services(&self) {
         // initial outgoing/incoming/minor/major channels
         // self.services.borrow_mut().create_io_channels();
@@ -59,18 +108,20 @@ impl Fleck {
     }
 
     pub fn run(&self) {
-        let io = io::IO::new();
+        /*let io = io::IO::new();
 
         // TODO: insert/build global socket, minor, major timerfds
 
-        io.run(self);
+        io.run(self);*/
+        self.io.borrow().run(self);
     }
 }
 
-struct SockIncomingTag {}
-struct SockOutgoingTag {}
-
 impl Fleck {
+    pub fn add_io(&self) -> io::builder::DefaultServiceBuilder<'_> {
+        self.io.borrow().add_service(self)
+    }
+    /*
     pub fn bind_udp_socket(&self, port: u16, multicast: Option<std::net::Ipv4Addr>) {
         let socket = mio::net::UdpSocket::bind(
             std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, port).into(),
@@ -78,22 +129,21 @@ impl Fleck {
         .expect("couldn't listen on UDP port?");
 
         if let Some(mcast) = multicast {
-            socket 
+            socket
                 .join_multicast_v4(&mcast, &std::net::Ipv4Addr::UNSPECIFIED)
                 .expect("couldn't join multicast group?");
         }
 
-        self.io.borrow_mut().add_udp_service::<(SockIncomingTag, (std::net::SocketAddr, Vec<u8>)),(SockOutgoingTag, (std::net::SocketAddr, Vec<u8>)),_>(socket);
+        /*self.io.borrow_mut().add_udp_service::<(SockIncomingTag, (std::net::SocketAddr, Vec<u8>)),(SockOutgoingTag, (std::net::SocketAddr, Vec<u8>)),_>(socket);*/
     }
 
     /// Add a UDP socket to the Fleck polling loop.
-    pub fn add_udp_socket(&self, socket: mio::net::UdpSocket) {
-        
-    }
+    pub fn add_udp_socket(&self, socket: mio::net::UdpSocket) {}
     /// Add a file descriptor (stream socket, pipe, etc) to the Fleck polling loop.
     pub fn add_raw_fd(&self, socket: OwnedFd) {
         //self.io.borrow_mut().add_service(io::
     }
+    */
 
     /// Add a service to the Fleck instance. Must not be invoked from a service context.
     pub fn add_service<S: Default + 'static>(&self)

+ 7 - 7
fleck/src/msg.rs

@@ -83,13 +83,13 @@ pub struct MessageRegistry {
 }
 
 impl MessageRegistry {
-    pub fn add_message_type<MT: std::fmt::Debug + MessageParams + Serialize + DeserializeOwned>(&mut self) {
+    pub fn add_message_type<MT: std::fmt::Debug + MessageParams + Serialize + DeserializeOwned>(
+        &mut self,
+    ) {
         let derived_typeid = message_type::<MT>();
         self.deser.insert(
             derived_typeid,
-            Box::new(|data| {
-                Some(Box::new(bincode::deserialize::<MT>(data).ok()?))
-            }),
+            Box::new(|data| Some(Box::new(bincode::deserialize::<MT>(data).ok()?))),
         );
     }
 
@@ -114,9 +114,9 @@ impl ParsePacket {}
 impl Service for std::rc::Rc<ParsePacket> {
     fn register_channels(&self, eroot: &mut super::EventRoot) {
         /*eroot
-            .channel::<fleck_core::ReceivePacketChannel>()
-            // .with::<order::Preprocessing>()
-            .subscribe(fleck_core::ReceiveOrder::Parse, self, ParsePacket::process_incoming);*/
+        .channel::<fleck_core::ReceivePacketChannel>()
+        // .with::<order::Preprocessing>()
+        .subscribe(fleck_core::ReceiveOrder::Parse, self, ParsePacket::process_incoming);*/
     }
 }
 

+ 6 - 1
fleck/src/service.rs

@@ -40,8 +40,13 @@ impl ServiceStack {
     where
         Rc<S>: Service,
     {
-        let srv = Rc::new(S::default());
+        self.add_service(Rc::new(S::default()));
+    }
 
+    pub fn add_service<S: 'static>(&mut self, srv: Rc<S>)
+    where
+        Rc<S>: Service,
+    {
         srv.register_msgs(&mut self.message_registry);
         srv.register_channels(&mut self.eroot);
 

+ 44 - 26
fleck/src/service/event.rs

@@ -9,16 +9,16 @@ pub enum SubscriptionFunction<Host: 'static, Context: 'static + ?Sized, Data: 's
     ByValue(Box<dyn Fn(&Host, &Context, Data) -> Option<Data>>),
 }
 
-impl<Host: 'static, Context: 'static + ?Sized, Data: 'static> SubscriptionFunction<Host, Context, Data> {
+impl<Host: 'static, Context: 'static + ?Sized, Data: 'static>
+    SubscriptionFunction<Host, Context, Data>
+{
     fn invoke(&self, host: &Host, context: &Context, mut data: Data) -> Option<Data> {
         match self {
             Self::ByRef(f) => {
                 f(host, context, &mut data);
                 Some(data)
-            },
-            Self::ByValue(f) => {
-                f(host, context, data)
             }
+            Self::ByValue(f) => f(host, context, data),
         }
     }
 }
@@ -54,7 +54,7 @@ pub struct Channel<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Prior
 
     queue: RefCell<VecDeque<Data>>,
 
-    _ghost: std::marker::PhantomData<(Tag,Priority)>,
+    _ghost: std::marker::PhantomData<(Tag, Priority)>,
 }
 
 impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static> Default
@@ -69,7 +69,9 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
     }
 }
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Channel<Tag, Context, Data, NoPriorityTag> {
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static>
+    Channel<Tag, Context, Data, NoPriorityTag>
+{
     pub fn sub_ref<
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
@@ -105,7 +107,9 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Channel<Tag, Contex
     }
 }
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOrd> Channel<Tag, Context, Data, Priority> {
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOrd>
+    Channel<Tag, Context, Data, Priority>
+{
     pub fn sub_ref<
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
@@ -124,7 +128,7 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOr
         let mut subs = self.subs.borrow_mut();
         subs.push((p.into(), sub));
         // XXX: what happens if we actually get an undefined order?...
-        subs.sort_by(|a,b| a.0.partial_cmp(&b.0).unwrap());
+        subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
     }
 
     pub fn sub_opt<
@@ -145,11 +149,13 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOr
         let mut subs = self.subs.borrow_mut();
         subs.push((p.into(), sub));
         // XXX: what happens if we actually get an undefined order?...
-        subs.sort_by(|a,b| a.0.partial_cmp(&b.0).unwrap());
+        subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
     }
 }
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static> Channel<Tag, Context, Data, Priority> {
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
+    Channel<Tag, Context, Data, Priority>
+{
     pub fn queue(&self, data: Data) {
         self.queue.borrow_mut().push_back(data);
     }
@@ -162,7 +168,9 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
 
             for sub in self.subs.borrow().iter() {
                 event = sub.1.invoke(context, event.unwrap());
-                if event.is_none() { break }
+                if event.is_none() {
+                    break;
+                }
             }
 
             count += 1;
@@ -177,8 +185,8 @@ trait ChannelMetadata<Context: 'static + ?Sized> {
     fn fire_all(&self, context: &Context) -> usize;
 }
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static> ChannelMetadata<Context>
-    for Channel<Tag, Context, Data, Priority>
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
+    ChannelMetadata<Context> for Channel<Tag, Context, Data, Priority>
 {
     fn queue_len(&self) -> usize {
         self.queue.borrow().len()
@@ -192,12 +200,15 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
 pub trait ChannelSpec: 'static {
     type Tag: 'static;
     type Data: 'static;
-    type Priority: 'static;
+    type Priority: 'static + Clone;
 }
 
-pub struct NoPriorityTag {}
+#[derive(Clone)]
+pub struct NoPriorityTag;
 
-impl<Tag: 'static, Data: 'static, Priority: 'static + PartialOrd> ChannelSpec for (Tag, Data, Priority) {
+impl<Tag: 'static, Data: 'static, Priority: 'static + Clone + PartialOrd> ChannelSpec
+    for (Tag, Data, Priority)
+{
     type Tag = Tag;
     type Data = Data;
     type Priority = Priority;
@@ -235,9 +246,7 @@ impl<Context: 'static + ?Sized> EventRoot<Context> {
         self.channels.insert(tid, ch);
     }
 
-    pub fn channel<CS: ChannelSpec>(
-        &self,
-    ) -> &Channel<CS::Tag, Context, CS::Data, CS::Priority> {
+    pub fn channel<CS: ChannelSpec>(&self) -> &Channel<CS::Tag, Context, CS::Data, CS::Priority> {
         let tid = std::any::TypeId::of::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>();
 
         match self.channels.get(&tid) {
@@ -305,7 +314,7 @@ mod tests {
     enum IntPriority {
         First,
         Second,
-        Third
+        Third,
     }
     type IntPriorityChannel = (IntTag, i32, IntPriority);
 
@@ -348,12 +357,21 @@ mod tests {
             order: order.clone(),
         });
 
-        root.channel::<IntPriorityChannel>()
-            .sub_ref(IntPriority::Second, &recv2, OrderedReceiver::receive_i32);
-        root.channel::<IntPriorityChannel>()
-            .sub_ref(IntPriority::First, &recv1, OrderedReceiver::receive_i32);
-        root.channel::<IntPriorityChannel>()
-            .sub_ref(IntPriority::Third, &recv3, OrderedReceiver::receive_i32);
+        root.channel::<IntPriorityChannel>().sub_ref(
+            IntPriority::Second,
+            &recv2,
+            OrderedReceiver::receive_i32,
+        );
+        root.channel::<IntPriorityChannel>().sub_ref(
+            IntPriority::First,
+            &recv1,
+            OrderedReceiver::receive_i32,
+        );
+        root.channel::<IntPriorityChannel>().sub_ref(
+            IntPriority::Third,
+            &recv3,
+            OrderedReceiver::receive_i32,
+        );
         root.channel::<IntPriorityChannel>().queue(0i32);
         assert_eq!(order.borrow().deref(), &vec![]);
         root.channel::<IntPriorityChannel>().do_fire_all(&());