Quellcode durchsuchen

WIP changeover of network I/O to use channels.

Kestrel vor 2 Jahren
Ursprung
Commit
b6f1cb9366

+ 4 - 2
fleck/examples/simple_node.rs

@@ -5,9 +5,11 @@ fn main() {
 
     let fleck = fleck::Fleck::new();
 
-    fleck.services().with_service(|node: &fleck_core::Nodes| {
+    fleck.bind_udp_socket(3535, None);
+
+    /*fleck.services().with_service(|node: &fleck_core::Nodes| {
         node.self_node().gen_keypair();
-    });
+    });*/
 
     fleck.run();
 }

+ 6 - 5
fleck/src/fleck_core.rs

@@ -1,4 +1,4 @@
-mod lowlevel;
+pub(crate) mod lowlevel;
 mod nodes;
 
 pub use nodes::Node;
@@ -13,12 +13,13 @@ pub mod channel_tags {
 
 #[derive(PartialEq, PartialOrd)]
 pub enum SendOrder {
-    Serialize,
+    Route,
     Sign,
     Encrypt,
     Send,
 }
 pub type SendPacketChannel = (channel_tags::SendPacketTag, crate::io::Packet, SendOrder);
+
 #[derive(PartialEq, PartialOrd)]
 pub enum ReceiveOrder {
     Decryption,
@@ -27,8 +28,8 @@ pub enum ReceiveOrder {
     Dispatch,
 }
 pub type ReceivePacketChannel = (channel_tags::ReceivePacketTag, crate::io::Packet, ReceiveOrder);
+
+/// Minor ticks (roughly once per second): for retries, status updates, etc.
 pub type MinorTickChannel = (channel_tags::MinorTickTag, ());
+/// Major ticks (several times per minute): for retries, cleanup tasks, periodic queries, etc
 pub type MajorTickChannel = (channel_tags::MajorTickTag, ());
-
-pub(crate) use lowlevel::LocalDiscovery;
-pub(crate) use lowlevel::SendPacket;

+ 8 - 1
fleck/src/fleck_core/nodes.rs

@@ -94,6 +94,9 @@ impl Node {
     }
 }
 
+pub struct RegTag {}
+pub type NodeRegistrationChannel = (RegTag, Rc<Node>);
+
 pub struct Nodes {
     all_nodes: RefCell<Vec<Rc<Node>>>,
     self_node: Rc<Node>,
@@ -132,4 +135,8 @@ impl Nodes {
     }
 }
 
-impl Service for Rc<Nodes> {}
+impl Service for Rc<Nodes> {
+    fn register_channels(&self, eroot: &mut EventRoot) {
+        eroot.create_channel::<NodeRegistrationChannel>();
+    }
+}

+ 126 - 122
fleck/src/io.rs

@@ -1,12 +1,14 @@
-use std::cell::{Cell, RefCell};
-use std::collections::VecDeque;
-use std::ops::DerefMut;
-use std::os::unix::prelude::AsRawFd;
-use std::rc::Rc;
+use std::{rc::Rc, cell::RefCell, collections::HashMap};
 
-use mio::unix::SourceFd;
 use mio::{Events, Interest, Poll, Token};
 
+use crate::{Fleck, prelude::ChannelSpec, service::event::NoPriorityTag};
+
+use self::udp::UdpData;
+
+pub(super) mod udp;
+pub(super) mod stream;
+
 lazy_static::lazy_static! {
     pub static ref MULTICAST_ADDRESS : std::net::SocketAddrV4 =
         std::net::SocketAddrV4::new(
@@ -17,62 +19,73 @@ lazy_static::lazy_static! {
 
 pub const DEFAULT_PORT: u16 = 3535;
 
-#[derive(Default)]
-pub struct Packet {
-    pub(crate) addr: Option<std::net::SocketAddr>,
-    pub(crate) data: Option<Vec<u8>>,
-    pub(crate) io_channel: Option<std::rc::Rc<dyn IOChannel>>,
-    pub node: Option<crate::fleck_core::Node>,
-    pub msg: Option<crate::msg::Message>,
+pub(super) trait AbstractFdService: 'static {
+    fn ready_read(&self, api: &Fleck);
+    fn ready_write(&self, api: &Fleck);
+    fn token(&self) -> Token;
 }
 
-impl Packet {
-    pub fn assume_msg(&self) -> &crate::msg::Message {
-        self.msg.as_ref().expect("told we could assume a Message was present")
-    }
+pub(super) struct IO {
+    poll: Rc<RefCell<mio::Poll>>,
+    services: HashMap<Token, Box<dyn AbstractFdService>>,
+}
 
-    pub fn assume_msg_mut(&mut self) -> &mut crate::msg::Message {
-        self.msg.as_mut().expect("told we could assume a Message was present")
+impl IO {
+    pub(super) fn new() -> Self {
+        Self {
+            poll: Rc::new(RefCell::new(mio::Poll::new().expect("couldn't create mio::Poll"))),
+            services: Default::default(),
+        }
     }
 
-    pub fn as_msg<T: crate::msg::MessageParams + serde::de::DeserializeOwned>(&self) -> Option<&T> {
-        self.msg.as_ref().and_then(|m| m.downcast())
-    }
+    pub(super) fn run(&self, api: &Fleck) {
+        let mut events = mio::Events::with_capacity(128);
+        loop {
+            let pr = self.poll.borrow_mut().poll(&mut events, None);
+            if pr.is_err() { continue }
+            for evt in &events {
+                let svc = self.services.get(&evt.token()).unwrap();
+                if evt.is_readable() {
+                    svc.ready_read(api);
+                }
+                if evt.is_writable() {
+                    svc.ready_write(api);
+                }
+            }
 
-    pub fn is_clear(&self) -> bool {
-        self.addr.is_none()
-            && self.data.is_none()
-            && self.io_channel.is_none()
-            && self.msg.is_none()
+            api.services().event_root().fire_all(api);
+        }
     }
 
-    pub fn clear(&mut self) {
-        self.addr.take();
-        self.data.take();
-        self.io_channel.take();
-        self.msg.take();
+    pub(super) fn add_service<S: AbstractFdService>(&mut self, srv: S) {
+        let tok = Token(self.services.len() + 1);
+        self.services.insert(tok, Box::new(srv));
     }
-}
 
-pub(crate) trait IOChannel {
-    fn send_packet(&self, packet: &mut Packet);
-}
-
-pub(crate) trait IOFeedback {
-    fn packet(&self, packet: Packet);
-    fn minor_tick(&self);
-    fn major_tick(&self);
+    pub(super) fn add_udp_service<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData>(&mut self, udp: mio::net::UdpSocket) {
+        let token = Token(self.services.len() + 1);
+        self.services.insert(token, Box::new(udp::UdpService::<Incoming, Outgoing, DataType> {
+            socket: RefCell::new(udp),
+            token,
+            poll: self.poll.clone(),
+            send_queue: Default::default(),
+            write_registered: std::cell::Cell::new(false),
+            _ghost: std::marker::PhantomData
+        }));
+    }
 }
 
-pub struct FleckIO {
+/*pub struct FleckIO {
     poll: Rc<RefCell<mio::Poll>>,
-    local: Vec<std::rc::Rc<dyn IOChannel>>,
-    global: std::rc::Rc<SocketWrapperChannel>,
+    services: HashMap<Token, Box<dyn AbstractFdServiceHelper>>,
+    // local: Vec<std::rc::Rc<dyn IOChannel>>,
+    // global: std::rc::Rc<SocketWrapperChannel>,
 
-    minor: timerfd::TimerFd,
-    major: timerfd::TimerFd,
-}
+    // minor: timerfd::TimerFd,
+    // major: timerfd::TimerFd,
+}*/
 
+/*
 struct SocketWrapperChannel {
     poll: Rc<RefCell<mio::Poll>>,
     socket: RefCell<mio::net::UdpSocket>,
@@ -91,20 +104,6 @@ impl SocketWrapperChannel {
     }
 
     fn read_packets(&self, feedback: &dyn IOFeedback) {
-        let mut buffer = [0u8; 2048];
-
-        loop {
-            match self.socket.borrow().recv_from(&mut buffer) {
-                Ok((len, addr)) => {
-                    feedback.packet(Packet {
-                        addr: Some(addr),
-                        data: Some(buffer[..len].into()),
-                        ..Default::default()
-                    });
-                }
-                Err(_) => break,
-            }
-        }
     }
 
     fn write_packets(&self) {
@@ -143,8 +142,9 @@ impl SocketWrapperChannel {
         }
     }
 }
+*/
 
-impl IOChannel for SocketWrapperChannel {
+/*impl IOChannel for SocketWrapperChannel {
     fn send_packet(&self, packet: &mut Packet) {
         assert!(packet.data.is_some());
         assert!(packet.addr.is_some());
@@ -165,74 +165,26 @@ impl IOChannel for SocketWrapperChannel {
                 .expect("couldn't update interest?");
         }
     }
-}
+}*/
 
-impl Default for FleckIO {
+/*impl Default for FleckIO {
     fn default() -> Self {
         let poll = Poll::new().expect("couldn't create mio::Poll");
 
-        // minor tick registration
-        let mut minor = timerfd::TimerFd::new().unwrap();
-        minor.set_state(
-            timerfd::TimerState::Periodic {
-                current: std::time::Duration::new(1, 0),
-                interval: std::time::Duration::new(1, 0),
-            },
-            timerfd::SetTimeFlags::Default,
-        );
-        poll.registry()
-            .register(
-                &mut SourceFd(&minor.as_raw_fd()),
-                Self::MINOR_TICK,
-                Interest::READABLE,
-            )
-            .expect("couldn't register read interest for minor timer?");
-
-        // major tick registration
-        let mut major = timerfd::TimerFd::new().unwrap();
-        major.set_state(
-            timerfd::TimerState::Periodic {
-                current: std::time::Duration::new(1, 0),
-                interval: std::time::Duration::new(15, 0),
-            },
-            timerfd::SetTimeFlags::Default,
-        );
-        poll.registry()
-            .register(
-                &mut SourceFd(&major.as_raw_fd()),
-                Self::MAJOR_TICK,
-                Interest::READABLE,
-            )
-            .expect("couldn't register read interest for major timer?");
-
-        // global socket
-        let mut global_socket = mio::net::UdpSocket::bind(
-            std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, DEFAULT_PORT).into(),
-        )
-        .expect("couldn't listen on UDP port?");
-        global_socket
-            .join_multicast_v4(&MULTICAST_ADDRESS.ip(), &std::net::Ipv4Addr::UNSPECIFIED)
-            .expect("couldn't join multicast group?");
-
-        poll.registry()
-            .register(&mut global_socket, Self::GLOBAL, Interest::READABLE)
-            .expect("couldn't register read interest in UDP port?");
-
         let poll = Rc::new(RefCell::new(poll));
 
         Self {
-            global: Rc::new(SocketWrapperChannel::new(poll.clone(), global_socket)),
-            local: Default::default(),
             poll,
-            major,
-            minor,
+            ..Default::default()
         }
     }
 }
+*/
 
+/*
 impl FleckIO {
-    fn handle_event(&self, feedback: &dyn IOFeedback, event: &mio::event::Event) {
-        match event.token() {
+    fn handle_event(&self, api: &Fleck, event: &mio::event::Event) {
+        /*match event.token() {
             Self::GLOBAL => {
                 if event.is_readable() {
                     self.global.read_packets(feedback);
@@ -254,17 +206,17 @@ impl FleckIO {
             _ => {
                 println!("unknown token!");
             }
-        }
+        }*/
     }
 
-    pub(crate) fn poll(&self, feedback: &dyn IOFeedback) {
+    pub(crate) fn poll(&self, api: &Fleck) {
         let mut events = Events::with_capacity(128);
         loop {
             let pr = self.poll.borrow_mut().poll(&mut events, None);
             match pr {
                 Ok(()) => {
                     for evt in &events {
-                        self.handle_event(feedback, evt);
+                        self.handle_event(api, evt);
                     }
                 }
                 Err(_e) => {}
@@ -272,16 +224,68 @@ impl FleckIO {
         }
     }
 
-    pub(crate) fn local(&self) -> std::rc::Rc<dyn IOChannel> {
+    /*pub(crate) fn local(&self) -> std::rc::Rc<dyn IOChannel> {
         // HACK: for now we just use the global
         self.global.clone()
     }
 
     pub(crate) fn global(&self) -> std::rc::Rc<dyn IOChannel> {
         self.global.clone()
-    }
+    }*/
 
     const GLOBAL: Token = Token(0);
     const MAJOR_TICK: Token = Token(1);
     const MINOR_TICK: Token = Token(2);
 }
+*/
+
+/*
+
+        // minor tick registration
+        let mut minor = timerfd::TimerFd::new().unwrap();
+        minor.set_state(
+            timerfd::TimerState::Periodic {
+                current: std::time::Duration::new(1, 0),
+                interval: std::time::Duration::new(1, 0),
+            },
+            timerfd::SetTimeFlags::Default,
+        );
+        poll.registry()
+            .register(
+                &mut SourceFd(&minor.as_raw_fd()),
+                Self::MINOR_TICK,
+                Interest::READABLE,
+            )
+            .expect("couldn't register read interest for minor timer?");
+
+        // major tick registration
+        let mut major = timerfd::TimerFd::new().unwrap();
+        major.set_state(
+            timerfd::TimerState::Periodic {
+                current: std::time::Duration::new(1, 0),
+                interval: std::time::Duration::new(15, 0),
+            },
+            timerfd::SetTimeFlags::Default,
+        );
+        poll.registry()
+            .register(
+                &mut SourceFd(&major.as_raw_fd()),
+                Self::MAJOR_TICK,
+                Interest::READABLE,
+            )
+            .expect("couldn't register read interest for major timer?");
+
+        // global socket
+        let mut global_socket = mio::net::UdpSocket::bind(
+            std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, DEFAULT_PORT).into(),
+        )
+        .expect("couldn't listen on UDP port?");
+        global_socket
+            .join_multicast_v4(&MULTICAST_ADDRESS.ip(), &std::net::Ipv4Addr::UNSPECIFIED)
+            .expect("couldn't join multicast group?");
+
+        poll.registry()
+            .register(&mut global_socket, Self::GLOBAL, Interest::READABLE)
+            .expect("couldn't register read interest in UDP port?");
+ *
+ * */

+ 103 - 0
fleck/src/io/stream.rs

@@ -0,0 +1,103 @@
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::fs::File;
+use std::io::Read;
+use std::io::Write;
+use std::os::unix::prelude::AsRawFd;
+use std::rc::Rc;
+use std::cell::RefCell;
+
+use mio::Interest;
+use mio::unix::SourceFd;
+use mio::Token;
+
+use crate::Fleck;
+use crate::prelude::*;
+use crate::service::event::NoPriorityTag;
+use super::AbstractFdService;
+
+pub trait StreamData: 'static + Into<Vec<u8>> + From<Vec<u8>> {}
+pub struct StreamService<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Data = DataType>, DataType: StreamData> {
+    stream: RefCell<File>,
+    token: Token,
+    poll: Rc<RefCell<mio::Poll>>,
+    send_queue: RefCell<VecDeque<u8>>,
+    write_registered: Cell<bool>,
+    _ghost: std::marker::PhantomData<(Incoming, Outgoing, DataType)>,
+}
+
+impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: StreamData> Service for Rc<StreamService<Incoming, Outgoing, DataType>> {
+    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) { 
+        eroot.channel::<Outgoing>().sub_opt(self, StreamService::handle_outgoing);
+    }
+}
+
+impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: StreamData> StreamService<Incoming, Outgoing, DataType> {
+    fn handle_outgoing(&self, _api: &Fleck, packet: DataType) -> Option<DataType> {
+        self.send_queue.borrow_mut().extend(packet.into().iter());
+
+        self.register_write_interest();
+
+        None
+    }
+
+    fn clear_write_interest(&self) {
+        self.poll
+            .borrow()
+            .registry()
+            .reregister(
+                &mut SourceFd(&self.stream.borrow().as_raw_fd()),
+                self.token,
+                Interest::READABLE,
+            )
+            .expect("couldn't update interest?");
+        self.write_registered.set(false);
+    }
+
+    fn register_write_interest(&self) {
+        if self.write_registered.get() { return }
+        self.poll
+            .borrow()
+            .registry()
+            .reregister(
+                &mut SourceFd(&self.stream.borrow().as_raw_fd()),
+                self.token,
+                Interest::READABLE.add(Interest::WRITABLE),
+            )
+            .expect("couldn't update interest?");
+        self.write_registered.set(true);
+    }
+}
+
+impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: StreamData> AbstractFdService for StreamService<Incoming, Outgoing, DataType> {
+    fn ready_read(&self, api: &Fleck) {
+        let mut buf = [0u8; 2048];
+        while let Ok(len) = self.stream.borrow_mut().read(&mut buf) {
+            api.queue::<Incoming>(Vec::from(&buf[..len]).into());
+        }
+    }
+
+    fn ready_write(&self, _api: &Fleck) {
+        let mut queue = self.send_queue.borrow_mut();
+        while !queue.is_empty() {
+            let pkt = queue.front_mut().unwrap();
+
+            let (data, _) = queue.as_slices();
+
+            match self.stream.borrow_mut().write(data) {
+                Ok(len) => {
+                    queue.drain(0..len);
+                }
+                Err(e) => {
+                    break;
+                }
+            }
+        }
+
+        if queue.is_empty() { self.clear_write_interest() }
+    }
+
+    fn token(&self) -> Token {
+        self.token
+    }
+}

+ 109 - 0
fleck/src/io/udp.rs

@@ -0,0 +1,109 @@
+use std::collections::VecDeque;
+use std::net::SocketAddr;
+use std::rc::Rc;
+use std::cell::{Cell,RefCell};
+use std::ops::DerefMut;
+
+use mio::net::UdpSocket;
+use mio::Token;
+use mio::Interest;
+
+use crate::Fleck;
+use crate::prelude::*;
+use crate::service::event::NoPriorityTag;
+use super::AbstractFdService;
+
+pub trait UdpData: 'static + Into<(SocketAddr, Vec<u8>)> + From<(SocketAddr, Vec<u8>)> {}
+
+impl<T: 'static + Into<(SocketAddr, Vec<u8>)> + From<(SocketAddr, Vec<u8>)>> UdpData for T { }
+
+pub struct UdpService<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Data = DataType>, DataType: UdpData> {
+    pub(super) socket: RefCell<UdpSocket>,
+    pub(super) token: Token,
+    pub(super) poll: Rc<RefCell<mio::Poll>>,
+    pub(super) send_queue: RefCell<VecDeque<(SocketAddr, Vec<u8>)>>,
+    pub(super) write_registered: Cell<bool>,
+    pub(super) _ghost: std::marker::PhantomData<(Incoming, Outgoing, DataType)>,
+}
+
+impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData> Service for Rc<UdpService<Incoming, Outgoing, DataType>> {
+    fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) { 
+        eroot.channel::<Outgoing>().sub_opt(self, UdpService::handle_outgoing);
+    }
+}
+
+impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData> UdpService<Incoming, Outgoing, DataType> {
+    fn clear_write_interest(&self) {
+        self.poll
+            .borrow()
+            .registry()
+            .reregister(
+                self.socket.borrow_mut().deref_mut(),
+                self.token,
+                Interest::READABLE,
+            )
+            .expect("couldn't update interest?");
+        self.write_registered.set(false);
+    }
+
+    fn register_write_interest(&self) {
+        if self.write_registered.get() { return }
+        self.poll
+            .borrow()
+            .registry()
+            .reregister(
+                self.socket.borrow_mut().deref_mut(),
+                self.token,
+                Interest::READABLE.add(Interest::WRITABLE),
+            )
+            .expect("couldn't update interest?");
+        self.write_registered.set(true);
+    }
+
+
+    fn handle_outgoing(&self, _api: &Fleck, packet: DataType) -> Option<DataType> {
+        self.send_queue.borrow_mut().push_back(packet.into());
+
+        self.register_write_interest();
+
+        None
+    }
+}
+
+impl<Incoming: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, Outgoing: ChannelSpec<Priority = NoPriorityTag, Data = DataType>, DataType: UdpData> AbstractFdService for UdpService<Incoming, Outgoing, DataType> {
+    fn ready_read(&self, api: &Fleck) {
+        let mut buf = [0u8; 2048];
+        while let Ok((len, addr)) = self.socket.borrow_mut().recv_from(&mut buf) {
+            api.queue::<Incoming>((addr, Vec::from(&buf[..len])).into());
+        }
+    }
+
+    fn ready_write(&self, _api: &Fleck) {
+        let mut queue = self.send_queue.borrow_mut();
+        while !queue.is_empty() {
+            let pkt = queue.front().unwrap();
+            match self.socket.borrow_mut().send_to(&pkt.1, pkt.0) {
+                Ok(len) => {
+                    if len != pkt.1.len() {
+                        log::warn!(
+                            "Packet of {} bytes truncated to {} bytes when sending",
+                            pkt.1.len(),
+                            len
+                        );
+                    }
+                    queue.pop_front();
+                }
+                Err(e) => {
+                    log::info!("Error while sending packet: {:?}", e);
+                    break;
+                }
+            }
+        }
+
+        if queue.is_empty() { self.clear_write_interest() }
+    }
+
+    fn token(&self) -> Token {
+        self.token
+    }
+}

+ 56 - 31
fleck/src/lib.rs

@@ -1,35 +1,37 @@
 use std::cell::RefCell;
+use std::collections::HashMap;
+use std::os::unix::prelude::OwnedFd;
 use std::rc::Rc;
 
 mod msg;
-
-mod crypto;
+// mod crypto;
 mod helper;
 mod io;
 pub mod service;
-pub mod fleck_core;
+// pub mod fleck_core;
 
 pub mod prelude {
     pub use crate::helper::{AsAny, IntoWeak};
-    pub use crate::io::Packet as FleckPacket;
     pub use crate::msg::{Message,MessageParams};
     pub use crate::service::{
-        event::Event as FleckEvent, ChannelSpec, EventRoot, Service,
+        ChannelSpec, EventRoot, Service,
     };
-    pub use crate::fleck_core;
+    // pub use crate::fleck_core;
 }
 
 use prelude::*;
 
 pub struct Fleck {
-    io: Rc<io::FleckIO>,
+    io: RefCell<io::IO>,
+    fd_services: RefCell<HashMap<mio::Token, Rc<dyn io::AbstractFdService>>>,
     services: RefCell<service::ServiceStack>,
 }
 
 impl Fleck {
     pub fn new() -> Rc<Self> {
         let res = Rc::new(Self {
-            io: Default::default(),
+            io: RefCell::new(io::IO::new()),
+            fd_services: Default::default(),
             services: Default::default(),
         });
 
@@ -39,32 +41,62 @@ impl Fleck {
     }
 
     fn register_core_services(&self) {
-        let mut svcs = self.services.borrow_mut();
-        // initial incoming/minor/major channels
-        svcs.create_io_channels();
+        // initial outgoing/incoming/minor/major channels
+        // self.services.borrow_mut().create_io_channels();
+
+        /*
         // Node registration
-        svcs.give_service::<fleck_core::Nodes>();
-        // Local discovery
-        svcs.give_service::<fleck_core::LocalDiscovery>();
-        // Parsing incoming packets
-        svcs.give_service::<msg::ParsePacket>();
+        self.add_service::<fleck_core::Nodes>();
+        // Local node discovery
+        self.add_service::<fleck_core::lowlevel::LocalDiscovery>();
         // Actually sending packets
-        svcs.give_service::<fleck_core::SendPacket>();
+        self.add_service::<fleck_core::lowlevel::SendPacket>();
+        // Parsing incoming packets
+        self.add_service::<msg::ParsePacket>();
         // Signing packets
-        svcs.give_service::<crypto::SignPacket>();
+        self.add_service::<crypto::SignPacket>();
+        */
     }
 
     pub fn run(&self) {
-        self.io.poll(self);
+        let io = io::IO::new();
+
+        // TODO: insert/build global socket, minor, major timerfds
+
+        io.run(self);
     }
 }
 
+struct SockIncomingTag {}
+struct SockOutgoingTag {}
+
 impl Fleck {
-    pub(crate) fn raw_io(&self) -> &io::FleckIO {
-        &self.io
+    pub fn bind_udp_socket(&self, port: u16, multicast: Option<std::net::Ipv4Addr>) {
+        let socket = mio::net::UdpSocket::bind(
+            std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, port).into(),
+        )
+        .expect("couldn't listen on UDP port?");
+
+        if let Some(mcast) = multicast {
+            socket 
+                .join_multicast_v4(&mcast, &std::net::Ipv4Addr::UNSPECIFIED)
+                .expect("couldn't join multicast group?");
+        }
+
+        self.io.borrow_mut().add_udp_service::<(SockIncomingTag, (std::net::SocketAddr, Vec<u8>)),(SockOutgoingTag, (std::net::SocketAddr, Vec<u8>)),_>(socket);
+    }
+
+    /// Add a UDP socket to the Fleck polling loop.
+    pub fn add_udp_socket(&self, socket: mio::net::UdpSocket) {
+        
+    }
+    /// Add a file descriptor (stream socket, pipe, etc) to the Fleck polling loop.
+    pub fn add_raw_fd(&self, socket: OwnedFd) {
+        //self.io.borrow_mut().add_service(io::
     }
+
     /// Add a service to the Fleck instance. Must not be invoked from a service context.
-    pub fn add_service<S: Service + Default + 'static>(&self)
+    pub fn add_service<S: Default + 'static>(&self)
     where
         Rc<S>: Service,
     {
@@ -86,16 +118,9 @@ impl Fleck {
             .channel::<CS>()
             .queue(data);
     }
-    pub fn queue_priority<CS: service::ChannelSpec>(&self, data: CS::Data) {
-        self.services
-            .borrow()
-            .event_root()
-            .channel::<CS>()
-            .queue(data);
-    }
 }
 
-impl io::IOFeedback for Fleck {
+/*impl io::IOFeedback for Fleck {
     fn packet(&self, packet: io::Packet) {
         self.services.borrow().process_incoming(self, packet);
     }
@@ -107,4 +132,4 @@ impl io::IOFeedback for Fleck {
     fn major_tick(&self) {
         self.services.borrow().process_major_tick(self);
     }
-}
+}*/

+ 8 - 8
fleck/src/msg.rs

@@ -43,7 +43,7 @@ fn message_type<M: MessageParams>() -> u16 {
 pub struct Message {
     magic: u64,
     pub(crate) ty: u16,
-    pub(crate) crypto_header: crate::crypto::PacketHeader,
+    // pub(crate) crypto_header: crate::crypto::PacketHeader,
     pub(crate) data: Vec<u8>,
 
     #[serde(skip_serializing)]
@@ -60,7 +60,7 @@ impl Message {
         Self {
             magic: MESSAGE_MAGIC,
             ty: message_type::<M>(),
-            crypto_header: Default::default(),
+            // crypto_header: Default::default(),
             data: bincode::serialize(&from).expect("couldn't serialize message"),
             saved_params: Some(SavedMessageParams::save::<M>()),
             parsed: Some(Box::new(from)),
@@ -94,13 +94,13 @@ impl MessageRegistry {
     }
 
     pub(crate) fn deserialize(&self, message: &mut Message) -> Option<Box<dyn std::any::Any>> {
-        match message.crypto_header {
+        /*match message.crypto_header {
             crate::crypto::PacketHeader::NoCrypto => {}
             _ => {
                 log::error!("given packet with crypto still applied to deserialize!");
                 return None;
             }
-        }
+        }*/
 
         (self.deser.get(&message.ty)?)(&message.data)
     }
@@ -113,15 +113,15 @@ impl ParsePacket {}
 
 impl Service for std::rc::Rc<ParsePacket> {
     fn register_channels(&self, eroot: &mut super::EventRoot) {
-        eroot
+        /*eroot
             .channel::<fleck_core::ReceivePacketChannel>()
             // .with::<order::Preprocessing>()
-            .subscribe(fleck_core::ReceiveOrder::Parse, self, ParsePacket::process_incoming);
+            .subscribe(fleck_core::ReceiveOrder::Parse, self, ParsePacket::process_incoming);*/
     }
 }
 
 impl ParsePacket {
-    fn process_incoming(&self, api: &crate::Fleck, event: &mut FleckEvent<FleckPacket>) {
+    /*fn process_incoming(&self, api: &crate::Fleck, event: &mut FleckEvent<FleckPacket>) {
         // try deserializing
         let message: Option<crate::msg::Message> =
             bincode::deserialize(event.data.data.as_ref().unwrap()).ok();
@@ -140,5 +140,5 @@ impl ParsePacket {
         event.data.data.take();
 
         event.data.msg = Some(message);
-    }
+    }*/
 }

+ 2 - 5
fleck/src/service.rs

@@ -3,9 +3,6 @@ use std::collections::HashMap;
 use std::ops::Deref;
 use std::rc::Rc;
 
-use crate::io::Packet;
-use crate::fleck_core;
-
 pub(crate) mod event;
 
 pub use event::ChannelSpec;
@@ -48,8 +45,6 @@ impl ServiceStack {
         srv.register_msgs(&mut self.message_registry);
         srv.register_channels(&mut self.eroot);
 
-        log::trace!("storing service {} as {:?}", std::any::type_name::<S>(), TypeId::of::<S>());
-
         self.service_map
             .insert(TypeId::of::<S>(), Box::new(srv.clone()));
         self.services.push(Box::new(srv));
@@ -80,6 +75,7 @@ impl ServiceStack {
     }
 }
 
+/*
 impl ServiceStack {
     pub(crate) fn process_incoming(&self, api: &crate::Fleck, packet: Packet) {
         self.eroot
@@ -105,3 +101,4 @@ impl ServiceStack {
         self.eroot.create_channel::<fleck_core::MajorTickChannel>();
     }
 }
+*/

+ 70 - 79
fleck/src/service/event.rs

@@ -4,19 +4,33 @@ use std::{
     rc::{Rc, Weak},
 };
 
-pub struct Event<Data: 'static> {
-    pub data: Data,
-    pub emptied: bool,
+pub enum SubscriptionFunction<Host: 'static, Context: 'static + ?Sized, Data: 'static> {
+    ByRef(Box<dyn Fn(&Host, &Context, &mut Data)>),
+    ByValue(Box<dyn Fn(&Host, &Context, Data) -> Option<Data>>),
 }
 
-struct ConcreteEventSub<Host, Context: 'static + ?Sized, Data: 'static> {
+impl<Host: 'static, Context: 'static + ?Sized, Data: 'static> SubscriptionFunction<Host, Context, Data> {
+    fn invoke(&self, host: &Host, context: &Context, mut data: Data) -> Option<Data> {
+        match self {
+            Self::ByRef(f) => {
+                f(host, context, &mut data);
+                Some(data)
+            },
+            Self::ByValue(f) => {
+                f(host, context, data)
+            }
+        }
+    }
+}
+
+struct ConcreteEventSub<Host: 'static, Context: 'static + ?Sized, Data: 'static> {
     host: Weak<Host>,
-    callback: Box<dyn Fn(&Host, &Context, &mut Event<Data>)>,
+    callback: SubscriptionFunction<Host, Context, Data>,
 }
 
 trait EventSub<Context: 'static + ?Sized, Data: 'static> {
     fn is_healthy(&self) -> bool;
-    fn invoke(&self, context: &Context, data: &mut Event<Data>);
+    fn invoke(&self, context: &Context, data: Data) -> Option<Data>;
 }
 
 impl<Host: 'static, Context: 'static + ?Sized, Data: 'static> EventSub<Context, Data>
@@ -25,10 +39,11 @@ impl<Host: 'static, Context: 'static + ?Sized, Data: 'static> EventSub<Context,
     fn is_healthy(&self) -> bool {
         self.host.strong_count() > 0
     }
-    fn invoke(&self, context: &Context, data: &mut Event<Data>) {
+    fn invoke(&self, context: &Context, data: Data) -> Option<Data> {
         self.host
             .upgrade()
-            .map(|h| (self.callback)(h.as_ref(), context, data));
+            .map(|h| self.callback.invoke(h.as_ref(), context, data))
+            .flatten()
     }
 }
 
@@ -54,45 +69,28 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
     }
 }
 
-/*pub struct PrioritySubscriptionBuilder<
-    'l,
-    Tag: 'static,
-    Context: 'static + ?Sized,
-    Data: 'static,
-    Priority: AbstractServicePriority,
-> {
-    parent: &'l PriorityChannel<Tag, Context, Data>,
-    _ghost: std::marker::PhantomData<Priority>,
-}
-
-impl<
-        'l,
-        Tag: 'static,
-        Context: 'static + ?Sized,
-        Data: 'static,
-        Priority: AbstractServicePriority,
-    > PrioritySubscriptionBuilder<'l, Tag, Context, Data, Priority>
-{
-    pub fn subscribe<
+impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Channel<Tag, Context, Data, NoPriorityTag> {
+    pub fn sub_ref<
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
-        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
+        CB: Fn(&Host, &Context, &mut Data) + 'static,
     >(
         &self,
         who: HC,
         cb: CB,
     ) {
-        self.parent.subscribe::<Priority, Host, HC, CB>(who, cb);
-    }
-}
-*/
+        let sub = Rc::new(ConcreteEventSub {
+            host: who.as_weak(),
+            callback: SubscriptionFunction::ByRef(Box::new(cb)),
+        });
 
+        self.subs.borrow_mut().push((NoPriorityTag {}, sub));
+    }
 
-impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Channel<Tag, Context, Data, NoPriorityTag> {
-    pub fn subscribe<
+    pub fn sub_opt<
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
-        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
+        CB: Fn(&Host, &Context, Data) -> Option<Data> + 'static,
     >(
         &self,
         who: HC,
@@ -100,7 +98,7 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Channel<Tag, Contex
     ) {
         let sub = Rc::new(ConcreteEventSub {
             host: who.as_weak(),
-            callback: Box::new(cb),
+            callback: SubscriptionFunction::ByValue(Box::new(cb)),
         });
 
         self.subs.borrow_mut().push((NoPriorityTag {}, sub));
@@ -108,10 +106,31 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static> Channel<Tag, Contex
 }
 
 impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOrd> Channel<Tag, Context, Data, Priority> {
-    pub fn subscribe<
+    pub fn sub_ref<
+        Host: 'static,
+        HC: crate::helper::IntoWeak<Host>,
+        CB: Fn(&Host, &Context, &mut Data) + 'static,
+    >(
+        &self,
+        p: Priority,
+        who: HC,
+        cb: CB,
+    ) {
+        let sub = Rc::new(ConcreteEventSub {
+            host: who.as_weak(),
+            callback: SubscriptionFunction::ByRef(Box::new(cb)),
+        });
+
+        let mut subs = self.subs.borrow_mut();
+        subs.push((p.into(), sub));
+        // XXX: what happens if we actually get an undefined order?...
+        subs.sort_by(|a,b| a.0.partial_cmp(&b.0).unwrap());
+    }
+
+    pub fn sub_opt<
         Host: 'static,
         HC: crate::helper::IntoWeak<Host>,
-        CB: Fn(&Host, &Context, &mut Event<Data>) + 'static,
+        CB: Fn(&Host, &Context, Data) -> Option<Data> + 'static,
     >(
         &self,
         p: Priority,
@@ -120,7 +139,7 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOr
     ) {
         let sub = Rc::new(ConcreteEventSub {
             host: who.as_weak(),
-            callback: Box::new(cb),
+            callback: SubscriptionFunction::ByValue(Box::new(cb)),
         });
 
         let mut subs = self.subs.borrow_mut();
@@ -139,16 +158,11 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
         let mut count = 0;
 
         while !self.queue.borrow().is_empty() {
-            let mut event = Event {
-                data: self.queue.borrow_mut().pop_front().unwrap(),
-                emptied: false,
-            };
+            let mut event = self.queue.borrow_mut().pop_front();
 
             for sub in self.subs.borrow().iter() {
-                sub.1.invoke(context, &mut event);
-                if event.emptied {
-                    break;
-                }
+                event = sub.1.invoke(context, event.unwrap());
+                if event.is_none() { break }
             }
 
             count += 1;
@@ -175,7 +189,7 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>
     }
 }
 
-pub trait ChannelSpec {
+pub trait ChannelSpec: 'static {
     type Tag: 'static;
     type Data: 'static;
     type Priority: 'static;
@@ -221,29 +235,6 @@ impl<Context: 'static + ?Sized> EventRoot<Context> {
         self.channels.insert(tid, ch);
     }
 
-    /*pub fn create_priority_channel<CS: ChannelSpec>(&mut self) {
-        let tid = std::any::TypeId::of::<PriorityChannel<CS::Tag, Context, CS::Data>>();
-
-        let ch = Rc::new(PriorityChannel::<CS::Tag, Context, CS::Data>::default());
-        self.metadata
-            .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
-        self.channels.insert(tid, ch);
-    }*/
-
-    /*pub fn channel<CS: ChannelSpec>(&self) -> &EventChannel<CS::Tag, Context, CS::Data> {
-        let tid = std::any::TypeId::of::<EventChannel<CS::Tag, Context, CS::Data>>();
-
-        match self.channels.get(&tid) {
-            Some(ch) => ch.downcast_ref().expect("internal inconsistency"),
-            None => {
-                panic!(
-                    "Asked for channel {} that has not been created!",
-                    std::any::type_name::<CS>()
-                )
-            }
-        }
-    }*/
-
     pub fn channel<CS: ChannelSpec>(
         &self,
     ) -> &Channel<CS::Tag, Context, CS::Data, CS::Priority> {
@@ -284,14 +275,14 @@ mod tests {
         rc::Rc,
     };
 
-    use super::{Event, EventRoot};
+    use super::EventRoot;
 
     struct Receiver {
         int_count: Cell<usize>,
     }
 
     impl Receiver {
-        fn receive_i32(&self, _ctx: &(), _val: &mut Event<i32>) {
+        fn receive_i32(&self, _ctx: &(), _val: &mut i32) {
             self.int_count.set(self.int_count.get() + 1);
         }
     }
@@ -302,7 +293,7 @@ mod tests {
     }
 
     impl OrderedReceiver {
-        fn receive_i32(&self, _ctx: &(), _val: &mut Event<i32>) {
+        fn receive_i32(&self, _ctx: &(), _val: &mut i32) {
             self.order.borrow_mut().push(self.id)
         }
     }
@@ -329,7 +320,7 @@ mod tests {
         });
 
         root.channel::<IntChannel>()
-            .subscribe(&recv, Receiver::receive_i32);
+            .sub_ref(&recv, Receiver::receive_i32);
         root.channel::<IntChannel>().queue(0i32);
         assert_eq!(recv.int_count.get(), 0);
         root.channel::<IntChannel>().do_fire_all(&());
@@ -358,11 +349,11 @@ mod tests {
         });
 
         root.channel::<IntPriorityChannel>()
-            .subscribe(IntPriority::Second, &recv2, OrderedReceiver::receive_i32);
+            .sub_ref(IntPriority::Second, &recv2, OrderedReceiver::receive_i32);
         root.channel::<IntPriorityChannel>()
-            .subscribe(IntPriority::First, &recv1, OrderedReceiver::receive_i32);
+            .sub_ref(IntPriority::First, &recv1, OrderedReceiver::receive_i32);
         root.channel::<IntPriorityChannel>()
-            .subscribe(IntPriority::Third, &recv3, OrderedReceiver::receive_i32);
+            .sub_ref(IntPriority::Third, &recv3, OrderedReceiver::receive_i32);
         root.channel::<IntPriorityChannel>().queue(0i32);
         assert_eq!(order.borrow().deref(), &vec![]);
         root.channel::<IntPriorityChannel>().do_fire_all(&());