Browse Source

Successful end-to-end packet serialization and deserialization.

Kestrel 2 years ago
parent
commit
69e923c4ab

+ 0 - 2
fleck/Cargo.toml

@@ -17,8 +17,6 @@ rudp = "0.2.1"
 topological-sort = "0.2.2"
 
 # linux dependencies
-# epoll = "4.3.1"
-# nix = { version = "0.25.0", default-features=false, features = ["socket", "net"] }
 timerfd = "1.3.0"
 
 # crypto dependencies

+ 49 - 1
fleck/src/crypto.rs

@@ -1,4 +1,6 @@
-pub use ed25519_dalek::{Keypair, PublicKey};
+pub use ed25519_dalek::{Keypair, PublicKey, Signature, Signer};
+
+use crate::prelude::*;
 
 pub enum TrustLevel {
     Unknown,
@@ -22,7 +24,53 @@ pub enum KeyExchange {
     Completed(SymmetricKey),
 }
 
+#[derive(serde::Serialize, serde::Deserialize, Default, Debug)]
+pub enum PacketHeader {
+    #[default]
+    NoCrypto,
+    Signed(Signature),
+    // u64 here is the nonce
+    Encrypted(u64),
+}
+
 pub struct SymmetricKey {
     key: [u8; 16],
     nonce: [u8; 16],
 }
+
+#[derive(Default)]
+pub(crate) struct SignPacket {}
+
+impl Service for std::rc::Rc<SignPacket> {
+    fn register_channels(&self, eroot: &mut EventRoot) {
+        eroot
+            .priority_channel::<fleck_core::SendPacketChannel>()
+            .with::<order::Between<order::Postprocessing, order::Last>>()
+            .subscribe(self, SignPacket::sign);
+    }
+}
+
+impl SignPacket {
+    fn sign(&self, api: &crate::Fleck, packet: &mut FleckEvent<FleckPacket>) {
+        let msg = packet
+            .data
+            .msg
+            .as_mut()
+            .expect("packet got to signing stage without having a Msg?");
+
+        let params = msg
+            .saved_params
+            .expect("packet got to signing stage without saved parameters?");
+
+        // don't do anything if the packet doesn't want a signature
+        if !params.signed {
+            return;
+        }
+
+        api.services().with_service(|ns: &fleck_core::Nodes| {
+            let keypair = ns.self_node().keypair().unwrap();
+            let sign_data = bincode::serialize(msg).unwrap();
+            msg.crypto_header = PacketHeader::Signed(keypair.sign(&sign_data));
+        });
+    }
+}

+ 9 - 0
fleck/src/io.rs

@@ -22,10 +22,19 @@ pub struct Packet {
     pub(crate) addr: Option<std::net::SocketAddr>,
     pub(crate) data: Option<Vec<u8>>,
     pub(crate) io_channel: Option<std::rc::Rc<dyn IOChannel>>,
+    pub node: Option<crate::service::core::Node>,
     pub msg: Option<crate::msg::Message>,
 }
 
 impl Packet {
+    pub fn assume_msg(&self) -> &crate::msg::Message {
+        self.msg.as_ref().expect("told we could assume a Message was present")
+    }
+
+    pub fn as_msg<T: crate::msg::MessageParams + serde::de::DeserializeOwned>(&self) -> Option<&T> {
+        self.msg.as_ref().and_then(|m| m.downcast())
+    }
+
     pub fn is_clear(&self) -> bool {
         self.addr.is_none()
             && self.data.is_none()

+ 14 - 3
fleck/src/lib.rs

@@ -2,7 +2,6 @@ use std::cell::RefCell;
 use std::rc::Rc;
 
 mod msg;
-mod node;
 
 mod crypto;
 mod helper;
@@ -11,8 +10,11 @@ pub mod service;
 
 pub mod prelude {
     pub use crate::helper::{AsAny, IntoWeak};
+    pub use crate::io::Packet as FleckPacket;
     pub use crate::msg::Message;
-    pub use crate::service::Service;
+    pub use crate::service::{
+        core as fleck_core, event::Event as FleckEvent, order, ChannelSpec, EventRoot, Service,
+    };
 }
 
 use prelude::*;
@@ -43,9 +45,11 @@ impl Fleck {
         // Local discovery
         svcs.give_service::<service::core::LocalDiscovery>();
         // Parsing incoming packets
-        svcs.give_service::<service::core::ParsePacket>();
+        svcs.give_service::<msg::ParsePacket>();
         // Actually sending packets
         svcs.give_service::<service::core::SendPacket>();
+        // Signing packets
+        svcs.give_service::<crypto::SignPacket>();
     }
 
     pub fn run(&self) {
@@ -57,6 +61,13 @@ impl Fleck {
     pub(crate) fn raw_io(&self) -> &io::FleckIO {
         &self.io
     }
+    /// Add a service to the Fleck instance. Must not be invoked from a service context.
+    pub fn add_service<S: Service + Default + 'static>(&self)
+    where
+        Rc<S>: Service,
+    {
+        self.services.borrow_mut().give_service::<S>();
+    }
     pub fn services(&self) -> std::cell::Ref<service::ServiceStack> {
         self.services.borrow()
     }

+ 94 - 22
fleck/src/msg.rs

@@ -1,41 +1,78 @@
 use std::{
     any::Any,
+    collections::HashMap,
     hash::{Hash, Hasher},
 };
 
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use std::collections::HashMap;
+
+use crate::prelude::*;
 
 const MESSAGE_MAGIC: u64 = 0x1234123412341234;
 
+pub trait MessageParams: 'static {
+    const NAME: &'static str;
+    const ENCRYPTED: bool = true;
+    const SIGNED: bool = false;
+}
+
+#[derive(Clone, Copy, Debug)]
+pub(crate) struct SavedMessageParams {
+    pub(crate) name: &'static str,
+    pub(crate) encrypted: bool,
+    pub(crate) signed: bool,
+}
+
+impl SavedMessageParams {
+    fn save<M: MessageParams>() -> Self {
+        Self {
+            name: M::NAME,
+            encrypted: M::ENCRYPTED,
+            signed: M::SIGNED,
+        }
+    }
+}
+
 fn message_type<M: MessageParams>() -> u16 {
     let mut hasher = std::collections::hash_map::DefaultHasher::new();
     M::NAME.hash(&mut hasher);
     (hasher.finish() & 0xffff) as u16
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Debug)]
 pub struct Message {
     magic: u64,
-    ty: u16,
-    data: Vec<u8>,
+    pub(crate) ty: u16,
+    pub(crate) crypto_header: crate::crypto::PacketHeader,
+    pub(crate) data: Vec<u8>,
+
+    #[serde(skip_serializing)]
+    #[serde(skip_deserializing)]
+    pub(crate) saved_params: Option<SavedMessageParams>,
+
+    #[serde(skip_serializing)]
+    #[serde(skip_deserializing)]
+    pub(crate) parsed: Option<Box<dyn Any>>,
 }
 
 impl Message {
-    pub fn build<M: MessageParams + Serialize>(from: M) -> Self {
+    pub fn build<M: 'static + MessageParams + Serialize>(from: M) -> Self {
         Self {
             magic: MESSAGE_MAGIC,
             ty: message_type::<M>(),
-            data: vec![],
+            crypto_header: Default::default(),
+            data: bincode::serialize(&from).expect("couldn't serialize message"),
+            saved_params: Some(SavedMessageParams::save::<M>()),
+            parsed: Some(Box::new(from)),
         }
     }
 
-    pub fn is_type<M: MessageParams>(&self) -> bool {
+    pub fn is_type<M: MessageParams + DeserializeOwned>(&self) -> bool {
         self.ty == message_type::<M>()
     }
 
-    pub fn downcast<M: MessageParams + DeserializeOwned>(&self) -> Option<M> {
-        todo!()
+    pub fn downcast<M: MessageParams + DeserializeOwned>(&self) -> Option<&M> {
+        self.parsed.as_ref()?.downcast_ref()
     }
 }
 
@@ -45,28 +82,63 @@ pub struct MessageRegistry {
     deser: HashMap<u16, Box<Deserializer>>,
 }
 
-pub trait MessageParams {
-    const NAME: &'static str;
-    const ENCRYPTED: bool = true;
-}
-
 impl MessageRegistry {
-    pub fn add_message_type<MT: MessageParams + Serialize + DeserializeOwned>(&mut self) {
+    pub fn add_message_type<MT: std::fmt::Debug + MessageParams + Serialize + DeserializeOwned>(&mut self) {
         let derived_typeid = message_type::<MT>();
         self.deser.insert(
             derived_typeid,
             Box::new(|data| {
-                let boxed: Box<dyn Any> = Box::new(bincode::deserialize(data).ok()?);
-                Some(boxed)
+                Some(Box::new(bincode::deserialize::<MT>(data).ok()?))
             }),
         );
     }
 
-    pub(crate) fn deserialize(&self, data: &[u8]) -> Option<Box<dyn std::any::Any>> {
-        let msg_type = u16::from_le_bytes(data[0..1].try_into().unwrap());
-        match self.deser.get(&msg_type) {
-            Some(f) => f(&data[2..]),
-            None => None,
+    pub(crate) fn deserialize(&self, message: &mut Message) -> Option<Box<dyn std::any::Any>> {
+        match message.crypto_header {
+            crate::crypto::PacketHeader::NoCrypto => {}
+            _ => {
+                log::error!("given packet with crypto still applied to deserialize!");
+                return None;
+            }
         }
+
+        (self.deser.get(&message.ty)?)(&message.data)
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct ParsePacket {}
+
+impl ParsePacket {}
+
+impl Service for std::rc::Rc<ParsePacket> {
+    fn register_channels(&self, eroot: &mut super::EventRoot) {
+        eroot
+            .priority_channel::<fleck_core::ReceivePacketChannel>()
+            .with::<order::Preprocessing>()
+            .subscribe(self, ParsePacket::process_incoming);
+    }
+}
+
+impl ParsePacket {
+    fn process_incoming(&self, api: &crate::Fleck, event: &mut FleckEvent<FleckPacket>) {
+        // try deserializing
+        let message: Option<crate::msg::Message> =
+            bincode::deserialize(event.data.data.as_ref().unwrap()).ok();
+        if message.is_none() {
+            return;
+        }
+        let mut message = message.unwrap();
+        // get message registry from api and add parsed Message
+        message.parsed = api.services().message_registry().deserialize(&mut message);
+        // if the parse failed, stop processing this packet.
+        if message.parsed.is_none() {
+            event.emptied = true;
+            return;
+        }
+        // remove raw data from packet
+        event.data.data.take();
+
+        event.data.msg = Some(message);
     }
 }

+ 18 - 24
fleck/src/service.rs

@@ -1,13 +1,11 @@
 use std::any::TypeId;
-use std::cell::RefCell;
 use std::collections::HashMap;
 use std::ops::Deref;
-use std::ops::DerefMut;
-use std::{borrow::BorrowMut, rc::Rc};
+use std::rc::Rc;
 
-use crate::io::{FleckIO, Packet};
+use crate::io::Packet;
 
-mod event;
+pub(crate) mod event;
 mod lowlevel;
 mod nodes;
 mod priority;
@@ -19,28 +17,23 @@ pub use event::ChannelSpec;
 use self::priority::AbstractServicePriority;
 
 pub mod order {
-    pub use super::priority::Never;
+    pub use super::priority::{Never, ServicePriority as Between};
 
     pub struct FirstTag;
     pub type First = super::priority::Fixpoint<FirstTag>;
 
-    pub struct PreprocessingTag;
-    pub type Preprocessing = super::priority::Fixpoint<PreprocessingTag>;
-
-    pub struct ProcessingTag;
-    pub type Processing = super::priority::Fixpoint<ProcessingTag>;
-
-    pub struct PostprocessingTag;
-    pub type Postprocessing = super::priority::Fixpoint<PostprocessingTag>;
+    pub type Preprocessing = Between<First, Processing>;
+    pub type Processing = Between<First, Last>;
+    pub type Postprocessing = Between<First, Processing>;
 
     pub struct LastTag;
-    pub type Last = super::priority::Fixpoint<FirstTag>;
+    pub type Last = super::priority::Fixpoint<LastTag>;
 }
 
 pub mod core {
-    pub(crate) use super::lowlevel::{LocalDiscovery, ParsePacket, SendPacket};
+    pub(crate) use super::lowlevel::{LocalDiscovery, SendPacket};
 
-    pub use super::nodes::Nodes;
+    pub use super::nodes::{Node, Nodes};
 
     pub mod channel_tags {
         #[derive(Default)]
@@ -61,11 +54,6 @@ pub type EventRoot = event::EventRoot<crate::Fleck>;
 
 #[allow(unused_variables)]
 pub trait Service: std::any::Any + crate::helper::AsAny {
-    /*fn process_incoming(&mut self, api: &crate::Fleck, packet: &mut Packet) {}
-    fn process_outgoing(&mut self, api: &crate::Fleck, packet: &mut Packet) {}
-    fn process_minor_tick(&mut self, api: &crate::Fleck) {}
-    fn process_major_tick(&mut self, api: &crate::Fleck) {}*/
-
     fn register_channels(&self, eroot: &mut EventRoot) {}
     fn register_msgs(&self, registry: &mut crate::msg::MessageRegistry) {}
 }
@@ -100,6 +88,8 @@ impl ServiceStack {
         srv.register_msgs(&mut self.message_registry);
         srv.register_channels(&mut self.eroot);
 
+        log::trace!("storing service {} as {:?}", std::any::type_name::<S>(), TypeId::of::<S>());
+
         self.service_map
             .insert(TypeId::of::<S>(), Box::new(srv.clone()));
         self.services.push(Box::new(srv));
@@ -116,14 +106,18 @@ impl ServiceStack {
             .expect("asked for service that doesn't exist!")
             .deref()
             .as_any()
-            .downcast_ref()
-            .map(f)
+            .downcast_ref::<Rc<S>>()
+            .map(|s| f(s.as_ref()))
             .unwrap()
     }
 
     pub fn event_root(&self) -> &EventRoot {
         &self.eroot
     }
+
+    pub fn message_registry(&self) -> &crate::msg::MessageRegistry {
+        &self.message_registry
+    }
 }
 
 impl ServiceStack {

+ 14 - 3
fleck/src/service/event.rs

@@ -290,7 +290,10 @@ impl<Context: 'static + ?Sized> EventRoot<Context> {
         match self.channels.get(&tid) {
             Some(ch) => ch.downcast_ref().expect("internal inconsistency"),
             None => {
-                panic!("Asked for channel that has not been created!")
+                panic!(
+                    "Asked for channel {} that has not been created!",
+                    std::any::type_name::<CS>()
+                )
             }
         }
     }
@@ -303,7 +306,16 @@ impl<Context: 'static + ?Sized> EventRoot<Context> {
         match self.channels.get(&tid) {
             Some(ch) => ch.downcast_ref().expect("internal inconsistency"),
             None => {
-                panic!("Asked for priority channel that has not been created!")
+                if self.channels.contains_key(&std::any::TypeId::of::<
+                    EventChannel<CS::Tag, Context, CS::Data>,
+                >()) {
+                    panic!("Asked for priority channel {}, which does not exist, but there is a non-priority channel!", std::any::type_name::<CS>())
+                } else {
+                    panic!(
+                        "Asked for priority channel {} that has not been created!",
+                        std::any::type_name::<CS>()
+                    )
+                }
             }
         }
     }
@@ -357,7 +369,6 @@ mod tests {
     }
 
     struct IntTag;
-
     type IntChannel = (IntTag, i32);
 
     #[test]

+ 24 - 25
fleck/src/service/lowlevel.rs

@@ -9,26 +9,6 @@ use crate::io::Packet;
 use crate as fleck;
 use crate::msg::MessageParams;
 
-#[derive(Default)]
-pub(crate) struct ParsePacket {}
-
-impl ParsePacket {}
-
-impl Service for Rc<ParsePacket> {
-    fn register_channels(&self, eroot: &mut super::EventRoot) {
-        // eroot.channel::<>().subscribe(self, Self::process_incoming);
-    }
-}
-
-impl ParsePacket {
-    fn process_incoming(&mut self, api: &crate::Fleck, _packet: &mut Packet) {
-        // try deserializing
-        // get message registry from api
-        // remove data from packet, add parsed Msg instead
-        println!("incoming packet!");
-    }
-}
-
 pub struct SendPacketTag {}
 pub type SendPacketChannel = (SendPacketTag, crate::io::Packet);
 
@@ -41,7 +21,7 @@ impl Service for Rc<SendPacket> {
         eroot
             .priority_channel::<SendPacketChannel>()
             .with::<order::Last>()
-            .subscribe(self, |s, api, packet| s.process_outgoing(api, packet));
+            .subscribe(self, SendPacket::process_outgoing);
     }
 }
 
@@ -69,7 +49,7 @@ impl SendPacket {
 #[derive(Default)]
 pub(crate) struct LocalDiscovery;
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Debug)]
 struct DiscoveryMsg {
     pkey: crate::crypto::PublicKey,
 }
@@ -85,21 +65,40 @@ impl Service for Rc<LocalDiscovery> {
     }
 
     fn register_channels(self: &Self, eroot: &mut super::EventRoot) {
-        log::info!("LocalDiscovery registering channels...");
         eroot
             .channel::<MajorTickChannel>()
             .subscribe(self, LocalDiscovery::process_major_tick);
+        eroot
+            .priority_channel::<super::core::ReceivePacketChannel>()
+            .with::<order::Processing>()
+            .subscribe(self, LocalDiscovery::packet);
     }
 }
 
 impl LocalDiscovery {
-    fn process_major_tick(&self, api: &crate::Fleck, _: &mut Event<()>) {
-        log::info!("LocalDiscovery major tick!");
+    fn packet(&self, api: &crate::Fleck, pkt: &mut Event<Packet>) {
+        if let Some(discovery) = pkt.data.as_msg::<DiscoveryMsg>() {
+            let own_msg = api.services().with_service(|n: &super::core::Nodes| {
+                Some(&discovery.pkey) == n.self_node().pubkey()
+                || discovery.pkey == Default::default()
+            });
+
+            // if is self-message, ignore
+            if own_msg {
+                pkt.emptied = true;
+                return
+            }
 
+            log::trace!("received discovery message");
+        }
+    }
+
+    fn process_major_tick(&self, api: &crate::Fleck, _: &mut Event<()>) {
         api.queue_priority::<SendPacketChannel>(Packet {
             addr: Some((*crate::io::MULTICAST_ADDRESS).into()),
             data: None,
             io_channel: Some(api.raw_io().local().clone()),
+            node: None,
             msg: Some(crate::msg::Message::build(DiscoveryMsg {
                 pkey: Default::default(),
             })),

+ 62 - 7
fleck/src/service/nodes.rs

@@ -1,24 +1,79 @@
-use std::rc::Rc;
+use std::{rc::Rc, collections::{HashMap, HashSet}, cell::{RefCell, Ref}};
 
 use super::Service;
 
 use crate::crypto::{Keypair, PublicKey};
 
+pub struct InnerRef<'l, T> {
+    r: Ref<'l, Option<T>>
+}
+
+impl<'l, T> InnerRef<'l, T> {
+    fn build(from: Ref<'l, Option<T>>) -> Option<Self> {
+        if from.is_none() {
+            None
+        }
+        else {
+            Some(Self {
+                r: from
+            })
+        }
+    }
+}
+
+impl<'l, T> AsRef<T> for InnerRef<'l, T> {
+    fn as_ref(&self) -> &T {
+        self.r.as_ref().unwrap()
+    }
+}
+
+impl<'l, T> std::ops::Deref for InnerRef<'l, T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        self.r.as_ref().unwrap()
+    }
+}
+
+#[derive(Default)]
 pub struct Node {
     addr: Option<std::net::SocketAddr>,
     pubkey: Option<PublicKey>,
-    keypair: Option<Keypair>,
+    keypair: RefCell<Option<Keypair>>,
 }
 
-#[derive(Default)]
-pub struct Nodes {}
+impl Node {
+    pub fn pubkey(&self) -> Option<&PublicKey> {
+        self.pubkey.as_ref()
+    }
+    pub(crate) fn keypair(&self) -> Option<InnerRef<'_, Keypair>> {
+        InnerRef::build(self.keypair.borrow())
+    }
+}
+
+pub struct Nodes {
+    all_nodes: Vec<Rc<Node>>,
+    self_node: Rc<Node>,
+}
+
+impl Default for Nodes {
+    fn default() -> Self {
+        let mut ret = Self {
+            all_nodes: Default::default(),
+            self_node: Default::default(),
+        };
+
+        ret.all_nodes.push(ret.self_node.clone());
+
+        ret
+    }
+}
 
 impl Nodes {
-    pub fn self_node(&self) -> &Node {
-        todo!()
+    pub fn self_node(&self) -> &Rc<Node> {
+        &self.self_node
     }
 
-    pub fn node_by_pubkey(&self, pubkey: ()) -> Option<&Node> {
+    pub fn node_by_pubkey(&self, pubkey: PublicKey) -> Option<Rc<Node>> {
         None
     }
 }