Просмотр исходного кода

Fleshed out event system some more.

Kestrel 2 лет назад
Родитель
Сommit
3b1eef5b18

+ 98 - 29
Cargo.lock

@@ -38,10 +38,24 @@ dependencies = [
 ]
 
 [[package]]
-name = "autocfg"
-version = "1.1.0"
+name = "aho-corasick"
+version = "0.7.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "atty"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "winapi",
+]
 
 [[package]]
 name = "bincode"
@@ -183,13 +197,16 @@ dependencies = [
 ]
 
 [[package]]
-name = "epoll"
-version = "4.3.1"
+name = "env_logger"
+version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "20df693c700404f7e19d4d6fae6b15215d2913c27955d2b9d6f2c0f537511cd0"
+checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
 dependencies = [
- "bitflags",
- "libc",
+ "atty",
+ "humantime",
+ "log",
+ "regex",
+ "termcolor",
 ]
 
 [[package]]
@@ -220,11 +237,10 @@ dependencies = [
  "aes-gcm",
  "bincode",
  "ed25519-dalek",
- "epoll",
  "lazy_static",
  "log",
  "mio",
- "nix",
+ "pretty_env_logger",
  "rand 0.8.5",
  "rudp",
  "serde",
@@ -275,6 +291,24 @@ dependencies = [
  "polyval",
 ]
 
+[[package]]
+name = "hermit-abi"
+version = "0.1.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "humantime"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
+dependencies = [
+ "quick-error",
+]
+
 [[package]]
 name = "inout"
 version = "0.1.3"
@@ -318,13 +352,10 @@ dependencies = [
 ]
 
 [[package]]
-name = "memoffset"
-version = "0.6.5"
+name = "memchr"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
-dependencies = [
- "autocfg",
-]
+checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
 [[package]]
 name = "metashell"
@@ -345,19 +376,6 @@ dependencies = [
  "windows-sys",
 ]
 
-[[package]]
-name = "nix"
-version = "0.25.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e322c04a9e3440c327fca7b6c8a63e6890a32fa2ad689db972425f07e0d22abb"
-dependencies = [
- "autocfg",
- "bitflags",
- "cfg-if",
- "libc",
- "memoffset",
-]
-
 [[package]]
 name = "opaque-debug"
 version = "0.3.0"
@@ -382,6 +400,16 @@ version = "0.2.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
+[[package]]
+name = "pretty_env_logger"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d"
+dependencies = [
+ "env_logger",
+ "log",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "0.4.30"
@@ -400,6 +428,12 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "quick-error"
+version = "1.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
+
 [[package]]
 name = "quote"
 version = "0.6.13"
@@ -489,6 +523,23 @@ dependencies = [
  "rand_core 0.5.1",
 ]
 
+[[package]]
+name = "regex"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.6.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
+
 [[package]]
 name = "rudp"
 version = "0.2.1"
@@ -601,6 +652,15 @@ dependencies = [
  "unicode-xid 0.2.4",
 ]
 
+[[package]]
+name = "termcolor"
+version = "1.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
+dependencies = [
+ "winapi-util",
+]
+
 [[package]]
 name = "timerfd"
 version = "1.3.0"
@@ -684,6 +744,15 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
 
+[[package]]
+name = "winapi-util"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
+dependencies = [
+ "winapi",
+]
+
 [[package]]
 name = "winapi-x86_64-pc-windows-gnu"
 version = "0.4.0"

+ 6 - 3
fleck/Cargo.toml

@@ -12,13 +12,13 @@ serde = { version = "1.0", features = ["derive"] }
 bincode = "1.3.3"
 log = "0.4.17"
 lazy_static = "1.4.0"
-mio = "0.8.5"
+mio = { version = "0.8.5", features = ["os-poll", "os-ext", "net"] }
 rudp = "0.2.1"
 topological-sort = "0.2.2"
 
 # linux dependencies
-epoll = "4.3.1"
-nix = { version = "0.25.0", default-features=false, features = ["socket", "net"] }
+# epoll = "4.3.1"
+# nix = { version = "0.25.0", default-features=false, features = ["socket", "net"] }
 timerfd = "1.3.0"
 
 # crypto dependencies
@@ -26,3 +26,6 @@ aes-gcm = "0.10.1"
 ed25519-dalek = { version = "1.0.1", features = ["serde"] }
 x25519-dalek = { version = "1.2.0" }
 rand = "0.8"
+
+[dev-dependencies]
+pretty_env_logger = "0.4.0"

+ 3 - 1
fleck/examples/simple_node.rs

@@ -1,5 +1,7 @@
 fn main() {
-    let mut fleck = fleck::Fleck::new();
+    pretty_env_logger::init_timed();
+
+    let fleck = fleck::Fleck::new();
 
     fleck.run();
 }

+ 1 - 1
fleck/src/crypto.rs

@@ -1,4 +1,4 @@
-use ed25519_dalek::{Keypair, PublicKey};
+pub use ed25519_dalek::{Keypair, PublicKey};
 
 pub enum TrustLevel {
     Unknown,

+ 8 - 0
fleck/src/helper.rs

@@ -0,0 +1,8 @@
+use std::any::Any;
+
+pub trait AsAny {
+    fn as_any(&self) -> &dyn Any;
+}
+impl<T: Any> AsAny for T {
+    fn as_any(&self) -> &dyn Any { self }
+}

+ 230 - 20
fleck/src/io.rs

@@ -1,4 +1,11 @@
-// pub mod linux;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::ops::DerefMut;
+use std::os::unix::prelude::AsRawFd;
+use std::rc::Rc;
+
+use mio::unix::SourceFd;
+use mio::{Events, Interest, Poll, Token};
 
 lazy_static::lazy_static! {
     pub static ref MULTICAST_ADDRESS : std::net::SocketAddrV4 =
@@ -8,13 +15,32 @@ lazy_static::lazy_static! {
         );
 }
 
+pub const DEFAULT_PORT: u16 = 3535;
+
+#[derive(Default)]
 pub struct Packet {
     pub(crate) addr: Option<std::net::SocketAddr>,
     pub(crate) data: Option<Vec<u8>>,
-    pub(crate) channel: Option<std::rc::Rc<dyn IOChannel>>,
+    pub(crate) io_channel: Option<std::rc::Rc<dyn IOChannel>>,
     pub msg: Option<crate::msg::Message>,
 }
 
+impl Packet {
+    pub fn is_clear(&self) -> bool {
+        self.addr.is_none()
+            && self.data.is_none()
+            && self.io_channel.is_none()
+            && self.msg.is_none()
+    }
+
+    pub fn clear(&mut self) {
+        self.addr.take();
+        self.data.take();
+        self.io_channel.take();
+        self.msg.take();
+    }
+}
+
 pub(crate) trait IOChannel {
     fn send_packet(&self, packet: &mut Packet);
 }
@@ -25,36 +51,220 @@ pub(crate) trait IOFeedback {
     fn major_tick(&self);
 }
 
-/*pub(crate) trait FleckIO {
-    fn poll<'a>(&self, f: &dyn IOFeedback);
+pub struct FleckIO {
+    poll: Rc<RefCell<mio::Poll>>,
+    local: Vec<std::rc::Rc<dyn IOChannel>>,
+    global: std::rc::Rc<SocketWrapperChannel>,
 
-    fn local(&self) -> std::rc::Rc<dyn IOChannel>;
-    fn global(&self) -> std::rc::Rc<dyn IOChannel>;
+    minor: timerfd::TimerFd,
+    major: timerfd::TimerFd,
 }
 
-pub(crate) fn platform() -> std::rc::Rc<dyn FleckIO> {
-    std::rc::Rc::new(linux::LinuxIO::default())
-}*/
-
-pub struct FleckIO {
-    local: Vec<std::rc::Rc<dyn IOChannel>>,
-    global: std::rc::Rc<dyn IOChannel>,
+struct SocketWrapperChannel {
+    poll: Rc<RefCell<mio::Poll>>,
+    socket: RefCell<mio::net::UdpSocket>,
+    queue: RefCell<VecDeque<(Vec<u8>, std::net::SocketAddr)>>,
+    write_registered: Cell<bool>,
 }
 
-impl FleckIO {
-    pub(crate) fn poll(&self, f: &dyn IOFeedback) {}
+impl SocketWrapperChannel {
+    fn new(poll: Rc<RefCell<mio::Poll>>, socket: mio::net::UdpSocket) -> Self {
+        Self {
+            poll,
+            socket: RefCell::new(socket),
+            queue: Default::default(),
+            write_registered: Cell::new(false),
+        }
+    }
+
+    fn read_packets(&self, feedback: &dyn IOFeedback) {
+        let mut buffer = [0u8; 2048];
 
-    pub(crate) fn local(&self) -> &std::rc::Rc<dyn IOChannel> {
-        todo!()
+        loop {
+            match self.socket.borrow().recv_from(&mut buffer) {
+                Ok((len, addr)) => {
+                    feedback.packet(Packet {
+                        addr: Some(addr),
+                        data: Some(buffer[..len].into()),
+                        ..Default::default()
+                    });
+                }
+                Err(_) => break,
+            }
+        }
     }
 
-    pub(crate) fn global(&self) -> &std::rc::Rc<dyn IOChannel> {
-        &self.global
+    fn write_packets(&self) {
+        let mut queue = self.queue.borrow_mut();
+        while !queue.is_empty() {
+            let pkt = queue.front().unwrap();
+            match self.socket.borrow().send_to(&pkt.0, pkt.1) {
+                Ok(len) => {
+                    if len != pkt.0.len() {
+                        log::warn!("Packet of {} bytes truncated to {} bytes when sending", pkt.0.len(), len);
+                    }
+                    queue.pop_front();
+                },
+                Err(e) => {
+                    log::info!("Error while sending packet: {:?}", e);
+                    break
+                }
+            }
+        }
+
+        if queue.is_empty() {
+            self.poll
+                .borrow()
+                .registry()
+                .reregister(
+                    self.socket.borrow_mut().deref_mut(),
+                    FleckIO::GLOBAL,
+                    Interest::READABLE,
+                )
+                .expect("couldn't update interest?");
+            self.write_registered.set(false);
+        }
+    }
+}
+
+impl IOChannel for SocketWrapperChannel {
+    fn send_packet(&self, packet: &mut Packet) {
+        assert!(packet.data.is_some());
+        assert!(packet.addr.is_some());
+
+        self.queue
+            .borrow_mut()
+            .push_back((packet.data.clone().unwrap(), packet.addr.unwrap()));
+        if !self.write_registered.get() {
+            self.write_registered.set(true);
+            self.poll
+                .borrow()
+                .registry()
+                .reregister(
+                    self.socket.borrow_mut().deref_mut(),
+                    FleckIO::GLOBAL,
+                    Interest::READABLE.add(Interest::WRITABLE),
+                )
+                .expect("couldn't update interest?");
+        }
     }
 }
 
 impl Default for FleckIO {
     fn default() -> Self {
-        todo!()
+        let poll = Poll::new().expect("couldn't create mio::Poll");
+
+        // minor tick registration
+        let mut minor = timerfd::TimerFd::new().unwrap();
+        minor.set_state(
+            timerfd::TimerState::Periodic {
+                current: std::time::Duration::new(1, 0),
+                interval: std::time::Duration::new(1, 0),
+            },
+            timerfd::SetTimeFlags::Default,
+        );
+        poll.registry()
+            .register(
+                &mut SourceFd(&minor.as_raw_fd()),
+                Self::MINOR_TICK,
+                Interest::READABLE,
+            )
+            .expect("couldn't register read interest for minor timer?");
+
+        // major tick registration
+        let mut major = timerfd::TimerFd::new().unwrap();
+        major.set_state(
+            timerfd::TimerState::Periodic {
+                current: std::time::Duration::new(1, 0),
+                interval: std::time::Duration::new(15, 0),
+            },
+            timerfd::SetTimeFlags::Default,
+        );
+        poll.registry()
+            .register(
+                &mut SourceFd(&major.as_raw_fd()),
+                Self::MAJOR_TICK,
+                Interest::READABLE,
+            )
+            .expect("couldn't register read interest for major timer?");
+
+        // global socket
+        let mut global_socket = mio::net::UdpSocket::bind(
+            std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, DEFAULT_PORT).into(),
+        )
+        .expect("couldn't listen on UDP port?");
+        global_socket
+            .join_multicast_v4(&MULTICAST_ADDRESS.ip(), &std::net::Ipv4Addr::UNSPECIFIED)
+            .expect("couldn't join multicast group?");
+
+        poll.registry()
+            .register(&mut global_socket, Self::GLOBAL, Interest::READABLE)
+            .expect("couldn't register read interest in UDP port?");
+
+        let poll = Rc::new(RefCell::new(poll));
+
+        Self {
+            global: Rc::new(SocketWrapperChannel::new(poll.clone(), global_socket)),
+            local: Default::default(),
+            poll,
+            major,
+            minor
+        }
     }
 }
+
+impl FleckIO {
+    fn handle_event(&self, feedback: &dyn IOFeedback, event: &mio::event::Event) {
+        match event.token() {
+            Self::GLOBAL => {
+                if event.is_readable() {
+                    self.global.read_packets(feedback);
+                }
+                if event.is_writable() {
+                    self.global.write_packets();
+                }
+            }
+            Self::MINOR_TICK => {
+                assert!(event.is_readable());
+                self.minor.read();
+                feedback.minor_tick();
+            }
+            Self::MAJOR_TICK => {
+                assert!(event.is_readable());
+                self.major.read();
+                feedback.major_tick();
+            }
+            _ => {
+                println!("unknown token!");
+            }
+        }
+    }
+
+    pub(crate) fn poll(&self, feedback: &dyn IOFeedback) {
+        let mut events = Events::with_capacity(128);
+        loop {
+            let pr = self.poll.borrow_mut().poll(&mut events, None);
+            match pr {
+                Ok(()) => {
+                    for evt in &events {
+                        self.handle_event(feedback, evt);
+                    }
+                }
+                Err(_e) => {}
+            }
+        }
+    }
+
+    pub(crate) fn local(&self) -> std::rc::Rc<dyn IOChannel> {
+        // HACK: for now we just use the global
+        self.global.clone()
+    }
+
+    pub(crate) fn global(&self) -> std::rc::Rc<dyn IOChannel> {
+        self.global.clone()
+    }
+
+    const GLOBAL: Token = Token(0);
+    const MAJOR_TICK: Token = Token(1);
+    const MINOR_TICK: Token = Token(2);
+}

+ 17 - 17
fleck/src/lib.rs

@@ -6,18 +6,17 @@ mod node;
 
 mod crypto;
 mod io;
+mod helper;
 pub mod service;
 
 pub mod prelude {
     pub use crate::msg::Message;
-    pub use crate::service::{Service, API as ServiceAPI};
+    pub use crate::service::{Service, ServiceAPI};
 }
 
-use crate as fleck;
-
 use prelude::*;
 
-use io::{FleckIO, IOChannel};
+use io::FleckIO;
 
 pub struct Fleck {
     io: Rc<FleckIO>,
@@ -29,7 +28,7 @@ pub struct Fleck {
 
 impl Fleck {
     pub fn new() -> Rc<Self> {
-        let mut res = Rc::new(Self {
+        let res = Rc::new(Self {
             io: Rc::new(FleckIO::default()),
             services: RefCell::new(service::ServiceStack::new()),
             message_registry: msg::MessageRegistry::new(),
@@ -43,12 +42,13 @@ impl Fleck {
 
     fn register_core_services(&self) {
         let mut svcs = self.services.borrow_mut();
-        // Inserted in send-processing order
-
+        // Node registration
+        svcs.give_service(service::core::Nodes::default());
         // Local discovery
-        svcs.give_service(service::core::LocalDiscovery::new());
-
-        // Finally, send the packet
+        svcs.give_service(service::core::LocalDiscovery::default());
+        // Parsing incoming packets
+        svcs.give_service(service::core::ParsePacket::default());
+        // Actually sending packets
         svcs.give_service(service::core::SendPacket::default());
     }
 
@@ -57,7 +57,7 @@ impl Fleck {
     }
 
     fn send_queued_packets(&self) {
-        let mut svcs = self.services.borrow_mut();
+        let svcs = self.services.borrow();
         for mut pkt in self.queued_packets.borrow_mut().drain(..) {
             svcs.process_outgoing(self, &mut pkt);
         }
@@ -71,24 +71,24 @@ impl ServiceAPI for Fleck {
     fn send_packet(&self, pkt: io::Packet) {
         self.queued_packets.borrow_mut().push(pkt);
     }
+    fn services(&self) -> std::cell::Ref<service::ServiceStack> {
+        self.services.borrow()
+    }
 }
 
 impl io::IOFeedback for Fleck {
     fn packet(&self, mut packet: io::Packet) {
-        let mut svcs = self.services.borrow_mut();
-        svcs.process_incoming(self, &mut packet);
+        self.services.borrow().process_incoming(self, &mut packet);
         self.send_queued_packets();
     }
 
     fn minor_tick(&self) {
-        let mut svcs = self.services.borrow_mut();
-        svcs.process_minor_tick(self);
+        self.services.borrow().process_minor_tick(self);
         self.send_queued_packets();
     }
 
     fn major_tick(&self) {
-        let mut svcs = self.services.borrow_mut();
-        svcs.process_major_tick(self);
+        self.services.borrow().process_major_tick(self);
         self.send_queued_packets();
     }
 }

+ 15 - 15
fleck/src/msg.rs

@@ -8,37 +8,39 @@ use std::collections::HashMap;
 
 const MESSAGE_MAGIC: u64 = 0x1234123412341234;
 
-#[derive(Serialize, Deserialize)]
-#[repr(u16)]
-pub enum HeaderType {
-    Hello,
-    KeepAlive,
-    UserType(u8),
+fn message_type<M: MessageParams>() -> u16 {
+    let mut hasher = std::collections::hash_map::DefaultHasher::new();
+    M::NAME.hash(&mut hasher);
+    (hasher.finish() & 0xffff) as u16
 }
 
 #[derive(Serialize, Deserialize)]
 pub struct Message {
-    ht: HeaderType,
     magic: u64,
+    ty: u16,
+    data: Vec<u8>,
 }
 
+pub struct MessageBuilder {}
+
 impl Message {
-    pub fn build(ht: HeaderType) -> Self {
+    pub fn build<M: MessageParams + Serialize>(from: M) -> Self {
         Self {
-            ht,
             magic: MESSAGE_MAGIC,
+            ty: message_type::<M>(),
+            data: vec![]
         }
     }
 }
 
 type Deserializer = dyn Fn(&[u8]) -> Option<Box<dyn std::any::Any>>;
-pub(crate) struct MessageRegistry {
+pub struct MessageRegistry {
     deser: HashMap<u16, Box<Deserializer>>,
 }
 
 pub trait MessageParams {
     const NAME: &'static str;
-    const ENCRYPTED: bool;
+    const ENCRYPTED: bool = true;
 }
 
 impl MessageRegistry {
@@ -48,10 +50,8 @@ impl MessageRegistry {
         }
     }
 
-    pub(crate) fn add_message_type<MT: MessageParams + Serialize + DeserializeOwned>(&mut self) {
-        let mut hasher = std::collections::hash_map::DefaultHasher::new();
-        MT::NAME.hash(&mut hasher);
-        let derived_typeid = (hasher.finish() & 0xffff) as u16;
+    pub fn add_message_type<MT: MessageParams + Serialize + DeserializeOwned>(&mut self) {
+        let derived_typeid = message_type::<MT>();
         self.deser.insert(
             derived_typeid,
             Box::new(|data| {

+ 51 - 32
fleck/src/service.rs

@@ -1,11 +1,15 @@
+use std::any::TypeId;
 use std::cell::RefCell;
+use std::collections::HashMap;
 use std::ops::DerefMut;
+use std::ops::Deref;
 use std::{borrow::BorrowMut, rc::Rc};
 
 use crate::io::{FleckIO, Packet};
 
 mod lowlevel;
 mod priority;
+mod nodes;
 
 pub use priority::{Never, ServicePriority as Priority};
 
@@ -30,20 +34,25 @@ pub mod order {
     pub type Last = super::priority::Fixpoint<FirstTag>;
 }
 
-pub(crate) mod core {
-    pub(crate) use super::lowlevel::{LocalDiscovery, SendPacket};
+pub mod core {
+    pub(crate) use super::lowlevel::{LocalDiscovery, SendPacket, ParsePacket};
+
+    pub use super::nodes::Nodes;
 }
 
-pub trait API {
+pub trait ServiceAPI {
     fn raw_io(&self) -> &FleckIO;
     fn send_packet(&self, pkt: Packet);
+    fn services(&self) -> std::cell::Ref<ServiceStack>;
 }
 
-pub trait Service {
-    fn process_incoming(&mut self, _api: &dyn API, _packet: &mut Packet) {}
-    fn process_outgoing(&mut self, _api: &dyn API, _packet: &mut Packet) {}
-    fn process_minor_tick(&mut self, _api: &dyn API) {}
-    fn process_major_tick(&mut self, _api: &dyn API) {}
+pub trait Service : std::any::Any + crate::helper::AsAny {
+    fn process_incoming(&mut self, _api: &dyn ServiceAPI, _packet: &mut Packet) {}
+    fn process_outgoing(&mut self, _api: &dyn ServiceAPI, _packet: &mut Packet) {}
+    fn process_minor_tick(&mut self, _api: &dyn ServiceAPI) {}
+    fn process_major_tick(&mut self, _api: &dyn ServiceAPI) {}
+
+    fn register_msgs(&self, _registry: &mut crate::msg::MessageRegistry) {}
 }
 
 pub trait IncomingPriority {
@@ -94,23 +103,18 @@ macro_rules! service_priority {
 pub struct ServiceStack {
     services: Vec<Rc<RefCell<dyn Service>>>,
 
+    service_map: HashMap<TypeId, Rc<RefCell<dyn Service>>>,
+
     incoming_order: priority::TotalOrder<Rc<RefCell<dyn Service>>>,
     outgoing_order: priority::TotalOrder<Rc<RefCell<dyn Service>>>,
 
-    incoming_cache: Option<Vec<Rc<RefCell<dyn Service>>>>,
-    outgoing_cache: Option<Vec<Rc<RefCell<dyn Service>>>>,
+    incoming_cache: RefCell<Option<Vec<Rc<RefCell<dyn Service>>>>>,
+    outgoing_cache: RefCell<Option<Vec<Rc<RefCell<dyn Service>>>>>,
 }
 
 impl ServiceStack {
     pub(crate) fn new() -> Self {
         Self::default()
-        /*Self {
-            api: None,
-            services: Vec::new(),
-
-            incoming_order: Default::default(),
-            outgoing_order: Default::default(),
-        }*/
     }
 
     pub fn give_service<S: 'static + Service + IncomingPriority + OutgoingPriority>(
@@ -119,28 +123,43 @@ impl ServiceStack {
     ) {
         let srv = Rc::new(RefCell::new(srv));
 
-        self.incoming_cache = None;
-        self.outgoing_cache = None;
+        self.incoming_cache = RefCell::new(None);
+        self.outgoing_cache = RefCell::new(None);
 
         if !<<S as IncomingPriority>::Priority as AbstractServicePriority>::NEVER {
             self.incoming_order
                 .add_priority::<<S as IncomingPriority>::Priority>(srv.clone());
         }
         if !<<S as OutgoingPriority>::Priority as AbstractServicePriority>::NEVER {
-            self.incoming_order
+            self.outgoing_order
                 .add_priority::<<S as OutgoingPriority>::Priority>(srv.clone());
         }
+        self.service_map.insert(TypeId::of::<S>(), srv.clone());
         self.services.push(srv);
     }
+
+    pub fn with_service<S: Service + 'static, R, F: FnOnce(&S) -> R>(&self, f: F) -> R {
+        let id = TypeId::of::<S>();
+
+        self.service_map
+            .get(&id)
+            .expect("asked for service that doesn't exist!")
+            .borrow()
+            .deref()
+            .as_any()
+            .downcast_ref()
+            .map(f)
+            .unwrap()
+    }
 }
 
-impl Service for ServiceStack {
-    fn process_incoming(&mut self, api: &dyn API, packet: &mut Packet) {
-        let services = self
-            .incoming_cache
-            .get_or_insert_with(|| self.incoming_order.clone().order());
+impl ServiceStack {
+    pub(crate) fn process_incoming(&self, api: &dyn ServiceAPI, packet: &mut Packet) {
+        let mut cache = self.incoming_cache.borrow_mut();
+        let services = cache.get_or_insert_with(|| self.incoming_order.clone().order());
 
         for srv in services {
+            if packet.is_clear() { break }
             srv.as_ref()
                 .borrow_mut()
                 .deref_mut()
@@ -148,20 +167,20 @@ impl Service for ServiceStack {
         }
     }
 
-    fn process_outgoing(&mut self, api: &dyn API, packet: &mut Packet) {
-        let services = self
-            .outgoing_cache
-            .get_or_insert_with(|| self.outgoing_order.clone().order());
+    pub(crate) fn process_outgoing(&self, api: &dyn ServiceAPI, packet: &mut Packet) {
+        let mut cache = self.outgoing_cache.borrow_mut();
+        let services = cache.get_or_insert_with(|| self.outgoing_order.clone().order());
 
         for srv in services {
+            if packet.is_clear() { break }
             srv.as_ref()
                 .borrow_mut()
                 .deref_mut()
-                .process_incoming(api, packet);
+                .process_outgoing(api, packet);
         }
     }
 
-    fn process_minor_tick(&mut self, api: &dyn API) {
+    pub(crate) fn process_minor_tick(&self, api: &dyn ServiceAPI) {
         for srv in &self.services {
             srv.as_ref()
                 .borrow_mut()
@@ -170,7 +189,7 @@ impl Service for ServiceStack {
         }
     }
 
-    fn process_major_tick(&mut self, api: &dyn API) {
+    pub(crate) fn process_major_tick(&self, api: &dyn ServiceAPI) {
         for srv in &self.services {
             srv.as_ref()
                 .borrow_mut()

+ 51 - 15
fleck/src/service/lowlevel.rs

@@ -1,21 +1,39 @@
-use super::{order, Priority, Service, API};
+use serde::{Serialize, Deserialize};
+
+use super::{order, Priority, Service, ServiceAPI};
 use crate::io::Packet;
 
 use crate as fleck;
+use crate::msg::MessageParams;
+
+crate::service_priority!(
+    ParsePacket,
+    in: Priority<order::First, order::Preprocessing>
+);
+#[derive(Default)]
+pub(crate) struct ParsePacket {}
+
+impl Service for ParsePacket {
+    fn process_incoming(&mut self, api: &dyn ServiceAPI, _packet: &mut Packet) {
+        // try deserializing
+        // get message registry from api
+        // remove data from packet, add parsed Msg instead
+        println!("incoming packet!");
+    }
+}
 
 crate::service_priority!(
     SendPacket,
     out: Priority<order::Postprocessing, order::Last>
 );
-
 #[derive(Default)]
 pub(crate) struct SendPacket {}
 
 impl Service for SendPacket {
-    fn process_outgoing(&mut self, api: &dyn API, packet: &mut crate::io::Packet) {
+    fn process_outgoing(&mut self, api: &dyn ServiceAPI, packet: &mut crate::io::Packet) {
         // use default channel if not specified
-        if packet.channel.is_none() {
-            packet.channel = Some(api.raw_io().global().clone());
+        if packet.io_channel.is_none() {
+            packet.io_channel = Some(api.raw_io().global().clone());
         }
 
         // serialize if needed
@@ -23,7 +41,7 @@ impl Service for SendPacket {
             packet.data = Some(bincode::serialize(&msg).expect("failed to serialize message"));
         }
 
-        match &mut packet.channel {
+        match &mut packet.io_channel {
             Some(ch) => ch.clone().send_packet(packet),
             _ => {
                 println!("tried to send packet with no channel?");
@@ -32,22 +50,40 @@ impl Service for SendPacket {
     }
 }
 
-crate::service_priority!(LocalDiscovery);
-pub(crate) struct LocalDiscovery {}
+crate::service_priority!(LocalDiscovery, out: Priority<order::Processing, order::Postprocessing>);
+#[derive(Default)]
+pub(crate) struct LocalDiscovery;
 
-impl LocalDiscovery {
-    pub(crate) fn new() -> Self {
-        Self {}
-    }
+
+#[derive(Serialize, Deserialize)]
+struct DiscoveryMsg {
+    pkey: crate::crypto::PublicKey
+}
+
+impl MessageParams for DiscoveryMsg {
+    const NAME: &'static str = "DiscoveryMsg";
+    const ENCRYPTED: bool = false;
 }
 
 impl Service for LocalDiscovery {
-    fn process_major_tick(&mut self, api: &dyn API) {
+    fn register_msgs(&self, registry: &mut fleck::msg::MessageRegistry) {
+        registry.add_message_type::<DiscoveryMsg>();
+    }
+
+    fn process_incoming(&mut self, api: &dyn ServiceAPI, packet: &mut Packet) {
+
+    }
+
+    fn process_major_tick(&mut self, api: &dyn ServiceAPI) {
+        api.services().with_service(|n: &super::nodes::Nodes| {
+            
+        });
+
         api.send_packet(Packet {
             addr: Some((*crate::io::MULTICAST_ADDRESS).into()),
             data: None,
-            channel: Some(api.raw_io().local().clone()),
-            msg: Some(crate::msg::Message::build(crate::msg::HeaderType::Hello)),
+            io_channel: Some(api.raw_io().local().clone()),
+            msg: Some(crate::msg::Message::build(DiscoveryMsg { pkey: Default::default() })),
         });
     }
 }

+ 30 - 0
fleck/src/service/nodes.rs

@@ -0,0 +1,30 @@
+use super::Service;
+use crate as fleck;
+
+use crate::crypto::{PublicKey,Keypair};
+
+pub struct Node {
+    addr: Option<std::net::SocketAddr>,
+    pubkey: Option<PublicKey>,
+    keypair: Option<Keypair>,
+}
+
+crate::service_priority!(Nodes);
+#[derive(Default)]
+pub struct Nodes {
+    
+}
+
+impl Nodes {
+    pub fn self_node(&self) -> &Node {
+        todo!()
+    }
+
+    pub fn node_by_pubkey(&self, pubkey: ()) -> Option<&Node> {
+        None
+    }
+}
+
+impl Service for Nodes {
+
+}

+ 1 - 5
fleck/src/service/priority.rs

@@ -137,10 +137,6 @@ impl<Assoc: Clone> TotalOrder<Assoc> {
         self.walk_priorities::<SP>(&mut visited, &assoc);
     }
 
-    pub(crate) fn add_placeholder_priority<SP: AbstractServicePriority>(&mut self) {
-        self.add_priority_internal::<SP>(None);
-    }
-
     pub(crate) fn add_priority<SP: AbstractServicePriority>(&mut self, assoc: Assoc) {
         self.add_priority_internal::<SP>(Some(assoc));
     }
@@ -164,7 +160,7 @@ impl<Assoc: Clone> TotalOrder<Assoc> {
         let mut tid_order = vec![];
         while tsort.pop().map(|tid| tid_order.push(tid)).is_some() {}
         // we expect there to be two items left in the tsort: MaxPriority and Never
-        if !tsort.is_empty()  {
+        if !tsort.is_empty() {
             panic!("Circular dependency detected! tsort: {:?}", tsort);
         }