Browse Source

Moved IO support to be implemented as a service.

Kestrel 2 years ago
parent
commit
4bb5680128

+ 8 - 4
fleck/examples/simple_node.rs

@@ -1,3 +1,5 @@
+use std::net::{SocketAddr, IpAddr};
+
 use fleck::{prelude::*, fleck_core};
 use fleck::{prelude::*, fleck_core};
 
 
 fn main() {
 fn main() {
@@ -5,10 +7,12 @@ fn main() {
 
 
     let fleck = fleck::Fleck::new();
     let fleck = fleck::Fleck::new();
 
 
-    struct UDPSource;
-    struct UDPSink;
-    fleck.add_io()
-        .sink_with::<fleck_core::SendPacketChannel>(fleck_core::SendOrder::Send);
+    fleck.with_service(|io: &fleck_core::io::IOService| {
+        fleck_core::io::UdpSocketBuilder::default()
+            .bind_to(SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), 3535))
+            .join_multicast("239.0.239.0".parse().unwrap(), "0.0.0.0".parse().unwrap())
+            .build(io);
+    });
 
 
     fleck.run();
     fleck.run();
 }
 }

+ 21 - 0
fleck/src/fleck_core.rs

@@ -3,6 +3,7 @@
 // pub mod vchannel;
 // pub mod vchannel;
 pub mod peer;
 pub mod peer;
 pub mod msg;
 pub mod msg;
+pub mod io;
 
 
 // pub use nodes::Node;
 // pub use nodes::Node;
 // pub use nodes::Nodes;
 // pub use nodes::Nodes;
@@ -36,3 +37,23 @@ pub type ReceivePacketChannel = (channel_tags::ReceivePacketTag, msg::Message, R
 pub type MinorTickChannel = (channel_tags::MinorTickTag, ());
 pub type MinorTickChannel = (channel_tags::MinorTickTag, ());
 /// Major ticks (several times per minute): for retries, cleanup tasks, periodic queries, etc
 /// Major ticks (several times per minute): for retries, cleanup tasks, periodic queries, etc
 pub type MajorTickChannel = (channel_tags::MajorTickTag, ());
 pub type MajorTickChannel = (channel_tags::MajorTickTag, ());
+
+#[derive(Default)]
+pub struct CoreInitService {}
+
+impl crate::service::Service for std::rc::Rc<CoreInitService> {
+    fn setup(&self, api: &crate::Fleck) {
+        api.create_channel::<MinorTickChannel>();
+        api.create_channel::<MajorTickChannel>();
+        api.with_service(|io: &io::IOService| {
+            io::TimerFdBuilder::default()
+                .interval(std::time::Duration::from_secs(1))
+                .trigger(|api| api.queue::<MinorTickChannel>(()))
+                .build(io);
+            io::TimerFdBuilder::default()
+                .interval(std::time::Duration::from_secs(15))
+                .trigger(|api| api.queue::<MajorTickChannel>(()))
+                .build(io);
+        });
+    }
+}

+ 271 - 0
fleck/src/fleck_core/io.rs

@@ -0,0 +1,271 @@
+use std::{rc::Rc, cell::{RefCell, Cell}, os::unix::prelude::{RawFd, AsRawFd}, collections::{HashMap, VecDeque}};
+
+use crate::prelude::*;
+
+pub struct InterestRegistration {
+    poll: Rc<RefCell<mio::Poll>>,
+    token: mio::Token,
+    fd: RawFd,
+    interest: Cell<mio::Interest>,
+}
+
+impl InterestRegistration {
+    pub fn update_interest(&self, to: mio::Interest) {
+        if self.interest.get() != to {
+            self.poll.borrow().registry().reregister(
+                &mut mio::unix::SourceFd(&self.fd),
+                self.token,
+                to
+            ).expect("couldn't update FD interest");
+            self.interest.set(to);
+        }
+    }
+}
+
+pub trait FdHandler {
+    fn ready_read(&self, rc: &Rc<dyn FdHandler>, api: &crate::Fleck);
+    fn ready_write(&self, rc: &Rc<dyn FdHandler>, api: &crate::Fleck);
+    fn dispatch(&self, api: &crate::Fleck, msg: super::msg::Message);
+}
+
+/*pub(crate) struct FdHandler {
+    pub(crate) ready_read: Rc<dyn Fn(&crate::Fleck)>,
+    pub(crate) ready_write: Rc<dyn Fn(&crate::Fleck)>,
+    pub(crate) dispatch: Rc<dyn Fn(&crate::Fleck, super::msg::Message)>,
+    owned: Rc<dyn std::any::Any>,
+}*/
+
+pub struct IOService {
+    poll: Rc<RefCell<mio::Poll>>,
+    next_token: Cell<mio::Token>,
+    handlers: RefCell<HashMap<mio::Token, Rc<dyn FdHandler>>>,
+}
+
+impl Default for IOService {
+    fn default() -> Self {
+        Self {
+            poll: Rc::new(RefCell::new(mio::Poll::new().expect("couldn't create poll?"))),
+            next_token: Cell::new(mio::Token(1)),
+            handlers: Default::default(),
+        }
+    }
+}
+
+impl Service for Rc<IOService> { }
+
+impl IOService {
+    pub fn register_handler(&self, reg: &InterestRegistration, handler: Rc<dyn FdHandler>) {
+        self.handlers.borrow_mut().insert(reg.token, handler);
+    }
+
+    pub fn register_interest(&self, fd: RawFd) -> InterestRegistration {
+        let token = self.next_token.get();
+        self.next_token.set(mio::Token(token.0 + 1));
+
+        self.poll.borrow().registry().register(
+            &mut mio::unix::SourceFd(&fd),
+            token,
+            mio::Interest::READABLE,
+        ).expect("couldn't register initial readable interest?");
+
+        InterestRegistration {
+            poll: self.poll.clone(),
+            token,
+            fd,
+            interest: Cell::new(mio::Interest::READABLE),
+        }
+    }
+
+    pub(crate) fn run(&self, api: &crate::Fleck) {
+        let mut events = mio::Events::with_capacity(128);
+        loop {
+            let pr = self.poll.borrow_mut().poll(&mut events, None);
+            if pr.is_err() {
+                continue;
+            }
+            let handlers = self.handlers.borrow();
+            for evt in &events {
+                let h = handlers.get(&evt.token()).unwrap();
+                if evt.is_readable() {
+                    h.ready_read(h, api);
+                }
+                if evt.is_writable() {
+                    h.ready_write(h, api);
+                }
+            }
+
+            api.services.event_root().fire_all(api);
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct UdpSocketBuilder {
+    bind: Option<std::net::SocketAddr>,
+    multicast: Vec<(std::net::IpAddr, std::net::IpAddr)>,
+}
+
+impl UdpSocketBuilder {
+    pub fn bind_to(mut self, addr: std::net::SocketAddr) -> Self {
+        self.bind = Some(addr);
+        self
+    }
+
+    pub fn join_multicast(mut self, group: std::net::IpAddr, iface: std::net::IpAddr) -> Self {
+        self.multicast.push((group, iface));
+        self
+    }
+
+    pub fn build(self, io: &IOService) {
+        let socket = mio::net::UdpSocket::bind(self.bind.expect("no bind address given to UDP socket!")).expect("couldn't bind UDP socket");
+        let interest = io.register_interest(socket.as_raw_fd());
+
+        for mcast in self.multicast {
+            use std::net::IpAddr;
+            match (mcast.0, mcast.1) {
+                (IpAddr::V4(group), IpAddr::V4(iface)) => {
+                    socket.join_multicast_v4(&group, &iface).expect("couldn't join multicast group");
+                },
+                (IpAddr::V6(_group), IpAddr::V6(_iface)) => {
+                    todo!()
+                },
+                _ => panic!("Multicast specification mixes ipv4 and ipv6 addresses!"),
+            }
+        }
+
+        let sock = Rc::new(UdpSocket {
+            interest,
+            socket,
+            queue: Default::default(),
+            peers: Default::default(),
+        });
+
+        io.register_handler(&sock.interest, sock.clone());
+    }
+}
+
+struct UdpSocket {
+    interest: InterestRegistration,
+    socket: mio::net::UdpSocket,
+    queue: RefCell<VecDeque<(std::net::SocketAddr, Vec<u8>)>>,
+    peers: RefCell<HashMap<std::net::SocketAddr, super::peer::Peer>>,
+}
+
+impl UdpSocket {
+    fn peer_for(&self, rc: &Rc<dyn FdHandler>, addr: &std::net::SocketAddr) -> super::peer::Peer {
+        if let Some(peer) = self.peers.borrow().get(addr) {
+            peer.clone()
+        }
+        else {
+            let peer = super::peer::Peer {
+                data: Rc::new(super::peer::PeerData {
+                    address_info: super::peer::PeerAddress::Udp(*addr),
+                    io: rc.clone(),
+                })
+            };
+            self.peers.borrow_mut().insert(*addr, peer.clone());
+            peer
+        }
+    }
+}
+
+impl FdHandler for UdpSocket {
+    fn ready_read(&self, rc: &Rc<dyn FdHandler>, api: &crate::Fleck) {
+        let mut buf = [0u8; 2048];
+        while let Ok((bytes, addr)) = self.socket.recv_from(&mut buf) {
+            if let Ok(mut msg) = bincode::deserialize::<super::msg::Message>(&buf[..bytes]) {
+                msg.peer = Some(self.peer_for(rc, &addr));
+                api.queue::<super::ReceivePacketChannel>(msg);
+            }
+        }
+    }
+
+    fn ready_write(&self, _rc: &Rc<dyn FdHandler>, _api: &crate::Fleck) {
+        let queue = self.queue.borrow_mut();
+        while !queue.is_empty() {
+            let packet = queue.front().unwrap();
+
+            if let Ok(len) = self.socket.send_to(&packet.1, packet.0) {
+                if len != packet.1.len() {
+                    log::error!("Sent packet truncated to {} of {} bytes", len, packet.1.len());
+                }
+            }
+            else {
+                return
+            }
+        }
+
+        self.interest.update_interest(mio::Interest::READABLE);
+    }
+
+    fn dispatch(&self, _api: &crate::Fleck, msg: fleck_core::msg::Message) {
+        let peer = msg.peer.clone().unwrap(); // if we got this far, the peer points to us
+
+        if let super::peer::PeerAddress::Udp(addr) = peer.data.address_info {
+            self.queue.borrow_mut().push_back(
+                (addr, bincode::serialize(&msg).expect("couldn't serialize message?")));
+            self.interest.update_interest(mio::Interest::READABLE.add(mio::Interest::WRITABLE));
+        }
+        else {
+            log::error!("packet dispatched to UdpSocket without UDP address!");
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct TimerFdBuilder {
+    interval: Option<std::time::Duration>,
+    trigger: Option<Box<dyn Fn(&crate::Fleck)>>,
+}
+
+impl TimerFdBuilder {
+    pub fn interval(mut self, interval: std::time::Duration) -> Self {
+        self.interval = Some(interval);
+        self
+    }
+
+    pub fn trigger<F: 'static + Fn(&crate::Fleck)>(mut self, f: F) -> Self {
+        self.trigger = Some(Box::new(f));
+        self
+    }
+
+    pub fn build(self, io: &IOService) {
+        let mut timer = timerfd::TimerFd::new().expect("couldn't create new timerfd?");
+
+        timer.set_state(timerfd::TimerState::Periodic {
+            current: std::time::Duration::from_millis(1),
+            interval: self.interval.expect("building timerfd with no interval set?"),
+        }, timerfd::SetTimeFlags::Default);
+
+        let interest = io.register_interest(timer.as_raw_fd());
+
+        let sock = Rc::new(TimerFd {
+            interest,
+            timer,
+            trigger: self.trigger.expect("building timerfd with no trigger?"),
+        });
+
+        io.register_handler(&sock.interest, sock.clone());
+    }
+}
+
+struct TimerFd {
+    interest: InterestRegistration,
+    timer: timerfd::TimerFd,
+    trigger: Box<dyn Fn(&crate::Fleck)>,
+}
+
+impl FdHandler for TimerFd {
+    fn ready_read(&self, _rc: &Rc<dyn FdHandler>, api: &crate::Fleck) {
+        self.timer.read();
+        (self.trigger)(api);
+    }
+
+    fn ready_write(&self, _rc: &Rc<dyn FdHandler>, _api: &crate::Fleck) {
+        unreachable!()
+    }
+
+    fn dispatch(&self, _api: &crate::Fleck, _msg: fleck_core::msg::Message) {
+        unreachable!()
+    }
+}

+ 1 - 1
fleck/src/fleck_core/msg.rs

@@ -117,7 +117,7 @@ pub(crate) struct ParsePacket {}
 impl ParsePacket {}
 impl ParsePacket {}
 
 
 impl Service for std::rc::Rc<ParsePacket> {
 impl Service for std::rc::Rc<ParsePacket> {
-    fn register_channels(&self, eroot: &mut crate::EventRoot) {
+    fn setup(&self, api: &crate::Fleck) {
         /*eroot
         /*eroot
         .channel::<fleck_core::ReceivePacketChannel>()
         .channel::<fleck_core::ReceivePacketChannel>()
         // .with::<order::Preprocessing>()
         // .with::<order::Preprocessing>()

+ 23 - 59
fleck/src/fleck_core/peer.rs

@@ -6,85 +6,49 @@ use crate::prelude::*;
 
 
 use super::msg::Message;
 use super::msg::Message;
 
 
-
-#[derive(Clone)]
-pub struct Peer {
-    inner: Rc<dyn InnerPeer>
-}
-
-impl std::fmt::Debug for Peer {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.write_str("Peer")?;
-        Ok(())
-    }
+pub(crate) enum PeerAddress {
+    Stream,
+    Udp(std::net::SocketAddr),
+    Virtual
 }
 }
 
 
-trait InnerPeer {
-    fn dispatch(&self, api: &crate::Fleck, msg: super::msg::Message);
-}
-
-struct UdpPeer<SourceTag: 'static, SinkTag: 'static> {
-    _ghost: std::marker::PhantomData<(SourceTag, SinkTag)>,
-}
-
-impl<SourceTag: 'static, SinkTag: 'static> UdpPeer<SourceTag, SinkTag> {
-    fn forward(&self, api: crate::Fleck, msg: (std::net::SocketAddr, Vec<u8>)) -> Option<(std::net::SocketAddr, Vec<u8>)> {
-        None
-    }
+pub(crate) struct PeerData {
+    pub(crate) io: Rc<dyn super::io::FdHandler>,
+    pub(crate) address_info: PeerAddress,
 }
 }
 
 
-impl<SourceTag: 'static, SinkTag: 'static> InnerPeer for UdpPeer<SourceTag, SinkTag> {
-    fn dispatch(&self, api: &crate::Fleck, msg: fleck_core::msg::Message) {
-        
-        api.queue::<(SinkTag, (std::net::SocketAddr, Vec<u8>))>(todo!());
+impl std::fmt::Debug for PeerData {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("PeerData {}")
     }
     }
 }
 }
 
 
-struct UdpSocket {
-    peers: HashMap<std::net::SocketAddr, Rc<Peer>>,
-}
-
-impl UdpSocket {
-    fn handle_incoming(&self, ) {
-        
-    }
-
-    fn handle_outgoing(&self, ) {
-        
-    }
+#[derive(Clone,Debug)]
+pub struct Peer {
+    pub(crate) data: Rc<PeerData>
 }
 }
 
 
 #[derive(Default)]
 #[derive(Default)]
 pub struct PeerService {
 pub struct PeerService {
-    udp: RefCell<Vec<Rc<UdpSocket>>>,
+    // udp: RefCell<Vec<Rc<UdpSocket>>>,
+    // stream: RefCell<Vec<Rc<
 }
 }
 
 
 impl Service for std::rc::Rc<PeerService> {
 impl Service for std::rc::Rc<PeerService> {
-    fn register_channels(&self, eroot: &mut EventRoot) {
-        eroot.channel::<fleck_core::SendPacketChannel>().sub_opt(fleck_core::SendOrder::Send, self, PeerService::dispatch);
+    fn setup(&self, api: &crate::Fleck) {
+        api.channel::<fleck_core::SendPacketChannel>().as_ref().sub_opt(fleck_core::SendOrder::Send, self, PeerService::dispatch);
     }
     }
 }
 }
 
 
 impl PeerService {
 impl PeerService {
     fn dispatch(&self, api: &crate::Fleck, msg: super::msg::Message) -> Option<super::msg::Message> {
     fn dispatch(&self, api: &crate::Fleck, msg: super::msg::Message) -> Option<super::msg::Message> {
-        /*let peer = match &msg.peer {
-            Some(peer) => peer.clone(),
-            None => {
-                log::error!("Packet got to peer dispatch with no peer?");
-                return None
-            }
-        };
-
-        peer.inner.dispatch(api, msg);*/
+        if msg.peer.is_none() {
+            log::error!("Message got to dispatch with no peer selected?");
+            return None
+        }
+        let peer = msg.peer.clone().unwrap();
+        peer.data.io.dispatch(api, msg);
 
 
         None
         None
     }
     }
-
-    pub fn link_udp_socket<SourceTag: 'static, SinkTag: 'static>(&self) {
-        self.udp.borrow_mut().push(
-            Rc::new(UdpSocket {
-                peers: Default::default()
-            })
-        );
-    }
 }
 }

+ 41 - 0
fleck/src/helper.rs

@@ -37,3 +37,44 @@ impl<'a, T> IntoWeak<T> for &'a std::rc::Weak<T> {
         self.clone()
         self.clone()
     }
     }
 }
 }
+
+/*pub struct ClosureRef<'l, Target: 'l, Closure: Fn() -> &'l Target> {
+    closure: Closure,
+}
+
+impl<'l, Target: 'l, Closure: Fn() -> &'l Target> ClosureRef<'l, Target, Closure> {
+    pub fn new(c: Closure) -> Self {
+        Self { closure: c }
+    }
+}
+
+impl<'l, Target: 'l, Closure: Fn() -> &'l Target> AsRef<Target> for ClosureRef<'l, Target, Closure> {
+    fn as_ref(&self) -> &Target {
+        (self.closure)()
+    }
+}
+
+pub struct OwnedRef<'base, Base: 'base, Contained> {
+    own: Base,
+    get: Box<dyn Fn(&Base) -> &Contained>,
+    _ghost: std::marker::PhantomData<&'base ()>,
+}
+
+impl<'base, Base, Contained> OwnedRef<'base, Base, Contained> {
+    pub fn new<F: 'static + Fn(&Base) -> &Contained>(own: Base, get: F) -> Self {
+        Self { own, get: Box::new(get), _ghost: Default::default() }
+    }
+}
+
+impl<'base, Base, Contained> AsRef<Contained> for OwnedRef<'base, Base, Contained> {
+    fn as_ref(&self) -> &Contained {
+        (self.get)(&self.own)
+    }
+}
+
+impl<'base, Base, Contained> std::ops::Deref for OwnedRef<'base, Base, Contained> {
+    type Target = Contained;
+    fn deref(&self) -> &Self::Target {
+        (self.get)(&self.own)
+    }
+}*/

+ 25 - 57
fleck/src/lib.rs

@@ -6,7 +6,7 @@ use std::rc::Rc;
 // mod crypto;
 // mod crypto;
 pub mod fleck_core;
 pub mod fleck_core;
 mod helper;
 mod helper;
-mod io;
+// mod io;
 pub mod service;
 pub mod service;
 
 
 pub mod prelude {
 pub mod prelude {
@@ -19,24 +19,26 @@ pub mod prelude {
 use prelude::*;
 use prelude::*;
 
 
 pub struct Fleck {
 pub struct Fleck {
-    io: RefCell<io::IO>,
-    services: RefCell<service::ServiceStack>,
+    services: service::ServiceStack,
 }
 }
 
 
 impl Fleck {
 impl Fleck {
     pub fn new() -> Rc<Self> {
     pub fn new() -> Rc<Self> {
         let res = Rc::new(Self {
         let res = Rc::new(Self {
-            io: RefCell::new(io::IO::new()),
+            // io: RefCell::new(io::IO::new()),
             services: Default::default(),
             services: Default::default(),
         });
         });
 
 
-        res.timer_setup();
-        res.register_core_services();
+        res.add_service::<fleck_core::io::IOService>();
+        res.add_service::<fleck_core::CoreInitService>();
+
+        // res.register_core_services();
+        // res.timer_setup();
 
 
         res
         res
     }
     }
 
 
-    fn timer_setup(&self) {
+    /*fn timer_setup(&self) {
         // minor tick registration
         // minor tick registration
         self.add_io()
         self.add_io()
             .source::<(fleck_core::MinorTickChannel, ())>()
             .source::<(fleck_core::MinorTickChannel, ())>()
@@ -49,9 +51,10 @@ impl Fleck {
             .timer()
             .timer()
             .interval(std::time::Duration::new(15, 0))
             .interval(std::time::Duration::new(15, 0))
             .build();
             .build();
-    }
+    }*/
 
 
     fn register_core_services(&self) {
     fn register_core_services(&self) {
+        // core I/O service
         // initial outgoing/incoming/minor/major channels
         // initial outgoing/incoming/minor/major channels
         // self.services.borrow_mut().create_io_channels();
         // self.services.borrow_mut().create_io_channels();
 
 
@@ -68,73 +71,38 @@ impl Fleck {
     }
     }
 
 
     pub fn run(&self) {
     pub fn run(&self) {
-        self.io.borrow().run(self);
+        self.with_service(|io: &fleck_core::io::IOService| {
+            io.run(self);
+        });
     }
     }
 }
 }
 
 
 impl Fleck {
 impl Fleck {
-    pub fn add_io(&self) -> io::builder::DefaultServiceBuilder<'_> {
-        self.io.borrow().add_service(self)
-    }
-    /*
-    pub fn bind_udp_socket(&self, port: u16, multicast: Option<std::net::Ipv4Addr>) {
-        let socket = mio::net::UdpSocket::bind(
-            std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, port).into(),
-        )
-        .expect("couldn't listen on UDP port?");
-
-        if let Some(mcast) = multicast {
-            socket
-                .join_multicast_v4(&mcast, &std::net::Ipv4Addr::UNSPECIFIED)
-                .expect("couldn't join multicast group?");
-        }
-
-        /*self.io.borrow_mut().add_udp_service::<(SockIncomingTag, (std::net::SocketAddr, Vec<u8>)),(SockOutgoingTag, (std::net::SocketAddr, Vec<u8>)),_>(socket);*/
-    }
-
-    /// Add a UDP socket to the Fleck polling loop.
-    pub fn add_udp_socket(&self, socket: mio::net::UdpSocket) {}
-    /// Add a file descriptor (stream socket, pipe, etc) to the Fleck polling loop.
-    pub fn add_raw_fd(&self, socket: OwnedFd) {
-        //self.io.borrow_mut().add_service(io::
-    }
-    */
-
-    /// Add a service to the Fleck instance. Must not be invoked from a service context.
+    /// Add a service to the Fleck instance.
     pub fn add_service<S: Default + 'static>(&self)
     pub fn add_service<S: Default + 'static>(&self)
     where
     where
         Rc<S>: Service,
         Rc<S>: Service,
     {
     {
-        self.services.borrow_mut().give_service::<S>();
-    }
-    pub fn services(&self) -> std::cell::Ref<service::ServiceStack> {
-        self.services.borrow()
+        self.services.give_service::<S>(self);
     }
     }
     pub fn with_service<S: 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R
     pub fn with_service<S: 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R
     where
     where
         Rc<S>: Service,
         Rc<S>: Service,
     {
     {
-        self.services.borrow().with_service(f)
+        self.services.with_service(f)
+    }
+    pub fn create_channel<CS: service::ChannelSpec>(&self) -> Rc<service::event::Channel<CS::Tag, Self, CS::Data, CS::Priority>> {
+        self.services.event_root().create_channel::<CS>();
+        self.channel::<CS>()
+    }
+    pub fn channel<CS: service::ChannelSpec>(&self) -> Rc<service::event::Channel<CS::Tag, Self, CS::Data, CS::Priority>> {
+        self.services.event_root().channel::<CS>()
     }
     }
     pub fn queue<CS: service::ChannelSpec>(&self, data: CS::Data) {
     pub fn queue<CS: service::ChannelSpec>(&self, data: CS::Data) {
         self.services
         self.services
-            .borrow()
             .event_root()
             .event_root()
             .channel::<CS>()
             .channel::<CS>()
+            .as_ref()
             .queue(data);
             .queue(data);
     }
     }
 }
 }
-
-/*impl io::IOFeedback for Fleck {
-    fn packet(&self, packet: io::Packet) {
-        self.services.borrow().process_incoming(self, packet);
-    }
-
-    fn minor_tick(&self) {
-        self.services.borrow().process_minor_tick(self);
-    }
-
-    fn major_tick(&self) {
-        self.services.borrow().process_major_tick(self);
-    }
-}*/

+ 17 - 19
fleck/src/service.rs

@@ -2,6 +2,7 @@ use std::any::TypeId;
 use std::collections::HashMap;
 use std::collections::HashMap;
 use std::ops::Deref;
 use std::ops::Deref;
 use std::rc::Rc;
 use std::rc::Rc;
+use std::cell::RefCell;
 
 
 pub(crate) mod event;
 pub(crate) mod event;
 
 
@@ -13,8 +14,7 @@ pub type EventRoot = event::EventRoot<crate::Fleck>;
 
 
 #[allow(unused_variables)]
 #[allow(unused_variables)]
 pub trait Service: std::any::Any + crate::helper::AsAny {
 pub trait Service: std::any::Any + crate::helper::AsAny {
-    fn register_channels(&self, eroot: &mut EventRoot) {}
-    fn register_msgs(&self, registry: &mut fleck_core::msg::MessageRegistry) {}
+    fn setup(&self, api: &crate::Fleck) {}
 }
 }
 
 
 trait ServiceExt {
 trait ServiceExt {
@@ -29,32 +29,31 @@ impl<T: Service + Default> ServiceExt for T {
 
 
 #[derive(Default)]
 #[derive(Default)]
 pub struct ServiceStack {
 pub struct ServiceStack {
-    services: Vec<Box<dyn Service>>,
+    services: RefCell<Vec<Box<dyn Service>>>,
 
 
-    service_map: HashMap<TypeId, Box<dyn Service>>,
+    service_map: RefCell<HashMap<TypeId, Box<dyn Service>>>,
 
 
     eroot: EventRoot,
     eroot: EventRoot,
     message_registry: fleck_core::msg::MessageRegistry,
     message_registry: fleck_core::msg::MessageRegistry,
 }
 }
 
 
 impl ServiceStack {
 impl ServiceStack {
-    pub fn give_service<S: 'static + Default>(&mut self)
+    pub(crate) fn give_service<S: 'static + Default>(&self, api: &crate::Fleck)
     where
     where
         Rc<S>: Service,
         Rc<S>: Service,
     {
     {
-        self.add_service(Rc::new(S::default()));
+        self.add_service(api, Rc::new(S::default()));
     }
     }
 
 
-    pub fn add_service<S: 'static>(&mut self, srv: Rc<S>)
+    pub(crate) fn add_service<S: 'static>(&self, api: &crate::Fleck, srv: Rc<S>)
     where
     where
         Rc<S>: Service,
         Rc<S>: Service,
     {
     {
-        srv.register_msgs(&mut self.message_registry);
-        srv.register_channels(&mut self.eroot);
+        srv.setup(api);
 
 
-        self.service_map
+        self.service_map.borrow_mut()
             .insert(TypeId::of::<S>(), Box::new(srv.clone()));
             .insert(TypeId::of::<S>(), Box::new(srv.clone()));
-        self.services.push(Box::new(srv));
+        self.services.borrow_mut().push(Box::new(srv));
     }
     }
 
 
     pub fn with_service<S: 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R
     pub fn with_service<S: 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R
@@ -63,14 +62,13 @@ impl ServiceStack {
     {
     {
         let id = TypeId::of::<S>();
         let id = TypeId::of::<S>();
 
 
-        self.service_map
-            .get(&id)
-            .expect("asked for service that doesn't exist!")
-            .deref()
-            .as_any()
-            .downcast_ref::<Rc<S>>()
-            .map(|s| f(s.as_ref()))
-            .unwrap()
+        let svc = {
+            let svcs = self.service_map.borrow();
+            svcs.get(&id).expect("asked for service that doesn't exist!")
+                .deref().as_any().downcast_ref::<Rc<S>>().unwrap().clone()
+        };
+
+        f(svc.as_ref())
     }
     }
 
 
     pub fn event_root(&self) -> &EventRoot {
     pub fn event_root(&self) -> &EventRoot {

+ 22 - 16
fleck/src/service/event.rs

@@ -221,8 +221,8 @@ impl<Tag: 'static, Data: 'static> ChannelSpec for (Tag, Data) {
 }
 }
 
 
 pub struct EventRoot<Context: 'static + ?Sized> {
 pub struct EventRoot<Context: 'static + ?Sized> {
-    channels: HashMap<std::any::TypeId, Rc<dyn std::any::Any>>,
-    metadata: HashMap<std::any::TypeId, (&'static str, Rc<dyn ChannelMetadata<Context>>)>,
+    pub(crate) channels: RefCell<HashMap<std::any::TypeId, Rc<dyn std::any::Any>>>,
+    metadata: RefCell<HashMap<std::any::TypeId, (&'static str, Rc<dyn ChannelMetadata<Context>>)>>,
     _ghost: std::marker::PhantomData<Context>,
     _ghost: std::marker::PhantomData<Context>,
 }
 }
 
 
@@ -236,27 +236,33 @@ impl<Context: 'static + ?Sized> Default for EventRoot<Context> {
     }
     }
 }
 }
 
 
+// pub type ChannelRef<'a, CS: ChannelSpec, Context> = crate::helper::OwnedRef<'a, HashMap<std::any::TypeId, Rc<dyn std::any::Any>>, Channel<CS::Tag, Context, CS::Data, CS::Priority>>;
+
 impl<Context: 'static + ?Sized> EventRoot<Context> {
 impl<Context: 'static + ?Sized> EventRoot<Context> {
-    pub fn create_channel<CS: ChannelSpec>(&mut self) {
+    pub fn create_channel<CS: ChannelSpec>(&self) {
         let tid = std::any::TypeId::of::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>();
         let tid = std::any::TypeId::of::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>();
 
 
         let ch = Rc::new(Channel::<CS::Tag, Context, CS::Data, CS::Priority>::default());
         let ch = Rc::new(Channel::<CS::Tag, Context, CS::Data, CS::Priority>::default());
-        self.metadata
+        self.metadata.borrow_mut()
             .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
             .insert(tid, (std::any::type_name::<CS>(), ch.clone()));
-        self.channels.insert(tid, ch);
+        self.channels.borrow_mut().insert(tid, ch);
     }
     }
 
 
-    pub fn channel<CS: ChannelSpec>(&self) -> &Channel<CS::Tag, Context, CS::Data, CS::Priority> {
+    pub fn channel<'l, 's: 'l, CS: ChannelSpec>(&'s self) -> Rc<Channel<CS::Tag, Context, CS::Data, CS::Priority>> {
         let tid = std::any::TypeId::of::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>();
         let tid = std::any::TypeId::of::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>();
 
 
-        match self.channels.get(&tid) {
-            Some(ch) => ch.downcast_ref().expect("internal inconsistency"),
-            None => {
-                panic!(
-                    "Asked for channel {} that has not been created!",
-                    std::any::type_name::<CS>()
-                )
-            }
+        let own = self.channels.borrow();
+
+        if own.contains_key(&tid) {
+            let entry = own.get(&tid).unwrap().clone();
+
+            entry.downcast::<Channel<CS::Tag, Context, CS::Data, CS::Priority>>().expect("internal inconsistency?")
+        }
+        else {
+            panic!(
+                "Asked for channel {} that has not been created!",
+                std::any::type_name::<CS>()
+            )
         }
         }
     }
     }
 
 
@@ -265,7 +271,7 @@ impl<Context: 'static + ?Sized> EventRoot<Context> {
         while any {
         while any {
             any = false;
             any = false;
 
 
-            for ch in &self.metadata {
+            for ch in self.metadata.borrow().iter() {
                 let count = ch.1 .1.fire_all(context);
                 let count = ch.1 .1.fire_all(context);
                 if count > 0 {
                 if count > 0 {
                     log::trace!("Queue {} processed {} event(s)", ch.1 .0, count);
                     log::trace!("Queue {} processed {} event(s)", ch.1 .0, count);
@@ -310,7 +316,7 @@ mod tests {
     struct IntTag;
     struct IntTag;
     type IntChannel = (IntTag, i32);
     type IntChannel = (IntTag, i32);
 
 
-    #[derive(PartialEq, PartialOrd)]
+    #[derive(PartialEq, PartialOrd, Clone)]
     enum IntPriority {
     enum IntPriority {
         First,
         First,
         Second,
         Second,