Pārlūkot izejas kodu

Begin adding EncryptionService.

Kestrel 2 gadi atpakaļ
vecāks
revīzija
48eb14cb7a

+ 6 - 3
fleck/src/fleck_core.rs

@@ -5,9 +5,9 @@ pub mod msg;
 pub mod node;
 pub mod peer;
 
-// pub use nodes::Node;
-pub use msg::MessageService;
-pub use node::NodeService;
+pub use msg::{Message,MessageService,MessageParams,MessageChannel};
+pub use node::{Node,NodeService,NodeRegistrationChannel};
+pub use peer::{Peer,PeerService};
 
 pub mod channel_tags {
     pub struct SendPacketTag {}
@@ -27,6 +27,8 @@ pub type SendPacketChannel = (channel_tags::SendPacketTag, msg::Message, SendOrd
 
 #[derive(PartialEq, PartialOrd, Clone)]
 pub enum ReceiveOrder {
+    /// Annotate with various properties based on message source
+    Annotate,
     Decryption,
     Verify,
     Parse,
@@ -65,5 +67,6 @@ impl crate::service::DefaultService for CoreInitService {
         api.add_service::<discovery::LocalDiscovery>();
         api.add_service::<peer::PeerService>();
         api.add_service::<crypto::SignPacket>();
+        api.add_service::<crypto::EncryptionService>();
     }
 }

+ 61 - 20
fleck/src/fleck_core/crypto.rs

@@ -29,12 +29,16 @@ pub struct SymmetricKey {
     nonce: [u8; 16],
 }
 
-#[derive(Default)]
-pub(crate) struct VerifyPacketSignature {}
+pub(crate) struct VerifyPacketSignature {
+    api: std::rc::Rc<crate::API>,
+}
 
-impl DefaultService for VerifyPacketSignature {
-    fn setup(self: &std::rc::Rc<Self>, api: std::rc::Rc<crate::API>) {
-        api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(
+impl Service for VerifyPacketSignature {
+    fn new(api: std::rc::Rc<crate::API>) -> Self {
+        Self { api }
+    }
+    fn setup(self: &std::rc::Rc<Self>) {
+        self.api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(
             fleck_core::ReceiveOrder::Verify,
             self,
             Self::check,
@@ -49,17 +53,19 @@ impl VerifyPacketSignature {
     ) -> Option<fleck_core::msg::Message> {
         match message.crypto_header {
             PacketHeader::Signed(signature) => {
+                // check that we have a pubkey for the node, reject if not
+                let pubkey = message.node.as_ref().and_then(|n| Some(n.pubkey()));
+                if pubkey.is_none() {
+                    return None
+                }
+                let pubkey = pubkey.unwrap();
+
                 message.crypto_header = PacketHeader::NoCrypto;
                 let to_verify = bincode::serialize(&message)
                     .expect("assume that we can re-serialize the message");
 
-                // signature.verify(to_verify, signature);
-
-                // let pubkey = api.services().with_service(|ns: &fleck_core::Nodes| { ns.self_node().pubkey() });
-
-                // match pubkey {
-                // msg.crypto_header = PacketHeader::Signed(keypair.sign(&sign_data));
-                // }
+                use ed25519_dalek::Verifier;
+                pubkey.verify(&to_verify, &signature).ok()?;
             }
             _ => {}
         }
@@ -97,14 +103,15 @@ impl SignPacket {
         }
 
         self.api.with_service(|ns: &fleck_core::NodeService| {
-            let keypair = ns.self_node().keypair().unwrap();
+            let ns = ns.self_node();
+            let keypair = ns.keypair().unwrap();
             let sign_data = bincode::serialize(&message.content).unwrap();
             message.crypto_header = PacketHeader::Signed(keypair.sign(&sign_data));
         });
     }
 }
 
-#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(serde::Serialize, serde::Deserialize, Debug)]
 pub struct KeyExchange {
     key: x25519_dalek::PublicKey,
 }
@@ -122,6 +129,25 @@ pub enum KeyExchangeState {
     Completed(x25519_dalek::PublicKey, SymmetricKey),
 }
 
+impl std::fmt::Debug for KeyExchangeState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::NoState => {
+                f.write_str("KeyExchangeState::NoState")
+            },
+            Self::Given(pkey) => {
+                f.write_fmt(format_args!("KeyExchangeState::Given {{ pkey: {:?} }}", pkey))
+            },
+            Self::WaitForResponse(_) => {
+                f.write_str("KeyExchangeState::WaitForResponse { redacted }")
+            },
+            Self::Completed(pkey, _) => {
+                f.write_fmt(format_args!("KeyExchangeState::Completed {{ pkey: {:?}, redacted }}", pkey))
+            },
+        }
+    }
+}
+
 pub struct EncryptionService {
     api: std::rc::Rc<crate::API>,
     state: RefCell<HashMap<fleck_core::peer::Peer, KeyExchangeState>>,
@@ -133,6 +159,9 @@ impl Service for EncryptionService {
     }
 
     fn setup(self: &std::rc::Rc<Self>) {
+        self.api.with_service(|msg: &fleck_core::MessageService| {
+            msg.add_message_type::<KeyExchange>();
+        });
         self.api.channel::<fleck_core::node::NodeRegistrationChannel>().sub_ref(self, Self::new_node);
         self.api.channel::<fleck_core::msg::MessageChannel<KeyExchange>>().sub_eat(self, Self::incoming);
     }
@@ -152,19 +181,24 @@ impl EncryptionService {
         let secret = x25519_dalek::EphemeralSecret::new(&mut csprng);
 
         // send notification with new secret
-        self.api.queue::<fleck_core::SendPacketChannel>(fleck_core::msg::Message::build(
-            KeyExchange {
-                key: x25519_dalek::PublicKey::from(&secret)
-            }
-        ));
+        let send = || {
+            log::trace!("sending KeyExchange message");
+            self.api.queue::<fleck_core::SendPacketChannel>(fleck_core::msg::Message::build(
+                KeyExchange {
+                    key: x25519_dalek::PublicKey::from(&secret)
+                }
+            ).with_peer(with.clone()));
+        };
 
         let mut states = self.state.borrow_mut();
         let state = states.remove(&with).unwrap_or(KeyExchangeState::NoState);
         match state {
             KeyExchangeState::NoState => {
+                send();
                 states.insert(with.clone(), KeyExchangeState::WaitForResponse(secret));
             },
             KeyExchangeState::Given(pubkey) => {
+                send();
                 let result = secret.diffie_hellman(&pubkey);
 
                 let sk = SymmetricKey {
@@ -172,13 +206,16 @@ impl EncryptionService {
                     nonce: result.as_bytes()[16..32].try_into().unwrap(),
                 };
 
+                log::trace!("Completed key negotiation with peer!");
                 states.insert(with.clone(), KeyExchangeState::Completed(pubkey, sk));
             },
             KeyExchangeState::WaitForResponse(secret) => {
+                send();
                 states.insert(with.clone(), KeyExchangeState::WaitForResponse(secret));
             },
             KeyExchangeState::Completed(_pubkey, _symmetric_key) => {
                 log::error!("asked to begin handshake when already have a good encryption key?");
+                return
             },
         }
     }
@@ -186,8 +223,9 @@ impl EncryptionService {
     fn incoming(&self, msg: (fleck_core::msg::Metadata, KeyExchange)) {
         let mut states = self.state.borrow_mut();
         let peer = msg.0.peer.expect("incoming KeyExchange should have a peer?");
-        // let state = states.entry(peer.clone()).or_insert(KeyExchangeState::NoState);
+
         let state = states.remove(&peer).unwrap_or(KeyExchangeState::NoState);
+        log::trace!("incoming key exchange message: {:?}, state: {:?}", msg.1, &state);
         match state {
             KeyExchangeState::NoState => {
                 states.insert(peer.clone(), KeyExchangeState::Given(msg.1.key));
@@ -214,6 +252,7 @@ impl EncryptionService {
                 };
 
                 states.insert(peer.clone(), KeyExchangeState::Completed(msg.1.key, sk));
+                drop(states);
             },
             KeyExchangeState::Completed(peer_pubkey, _symmetric_key) => {
                 if peer_pubkey == msg.1.key {
@@ -226,5 +265,7 @@ impl EncryptionService {
                 self.begin_handshake(&peer);
             },
         }
+        log::trace!("after processing incoming key exchange message, state: {:?}", self.state.borrow().get(&peer));
+
     }
 }

+ 87 - 24
fleck/src/fleck_core/discovery.rs

@@ -1,21 +1,54 @@
 use crate::prelude::*;
 
 use serde::{Deserialize, Serialize};
-use std::{cell::RefCell, rc::Rc};
-
-pub struct LocalDiscovery {
-    api: Rc<crate::API>,
-    targets: RefCell<Vec<fleck_core::peer::Peer>>,
-}
+use std::{cell::RefCell, rc::Rc, collections::HashSet};
 
 #[derive(Serialize, Deserialize, Debug)]
 struct DiscoveryMsg {
     pkey: fleck_core::crypto::PublicKey,
+    ttl: u8
 }
 
 impl fleck_core::msg::MessageParams for DiscoveryMsg {
     const NAME: &'static str = "DiscoveryMsg";
     const ENCRYPTED: bool = false;
+    const SIGNED: bool = false;
+}
+
+/// Handles incoming DiscoveryMsgs
+pub struct DiscoveryService {
+    api: Rc<crate::API>,
+}
+
+impl Service for DiscoveryService {
+    fn new(api: Rc<crate::API>) -> Self {
+        Self { api }
+    }
+
+    fn setup(self: &Rc<Self>) {
+        self.api.channel::<fleck_core::msg::MessageChannel<DiscoveryMsg>>()
+            .sub_eat(self, Self::discovery);
+    }
+}
+
+impl DiscoveryService {
+    fn discovery(&self, msg: (fleck_core::msg::Metadata, DiscoveryMsg)) {
+        let discovery = msg.1;
+        log::trace!("received discovery message");
+
+        // this will automatically ignore self-messages because we already know our own pubkey
+        self.api.with_service(|ns: &fleck_core::NodeService| {
+            if ns.node_by_pubkey(&discovery.pkey).is_none() {
+                // XXX ns.inform_of_peer(discovery.pkey, msg.0.peer.unwrap());
+            }
+        });
+    }
+}
+
+/// Periodically sends out multicast messages to look for peers on local network(s)
+pub struct LocalDiscovery {
+    api: Rc<crate::API>,
+    targets: RefCell<Vec<fleck_core::peer::Peer>>,
 }
 
 impl Service for LocalDiscovery {
@@ -31,8 +64,6 @@ impl Service for LocalDiscovery {
         });
         self.api.channel::<fleck_core::MajorTickChannel>()
             .sub_ref(self, LocalDiscovery::process_major_tick);
-        self.api.channel::<fleck_core::msg::MessageChannel<DiscoveryMsg>>()
-            .sub_eat(self, LocalDiscovery::discovery);
     }
 }
 
@@ -41,27 +72,13 @@ impl LocalDiscovery {
         self.targets.borrow_mut().push(peer);
     }
 
-    fn discovery(&self, msg: (fleck_core::msg::Metadata, DiscoveryMsg)) {
-        let discovery = msg.1;
-        log::trace!("received discovery message");
-
-        // this will automatically ignore self-messages because we already know the pubkey
-        self.api.with_service(|ns: &fleck_core::NodeService| {
-            if ns.node_by_pubkey(&discovery.pkey).is_none() {
-                ns.inform_of_peer(msg.0.peer.unwrap());
-                // let node = fleck_core::node::Node::build_with_peer(msg.0.peer);
-                // node.set_pubkey(discovery.pkey);
-                // nodes.inform_of(node);
-            }
-        });
-    }
-
     fn process_major_tick(&self, _: &mut ()) {
         for peer in self.targets.borrow().iter() {
             let msg = fleck_core::msg::Message::build(DiscoveryMsg {
                 pkey: self.api.with_service(|ns: &fleck_core::NodeService| {
-                    ns.self_node().pubkey().unwrap().to_owned()
+                    ns.self_node().pubkey().to_owned()
                 }),
+                ttl: 1,
             })
             .with_peer(peer.clone());
 
@@ -69,3 +86,49 @@ impl LocalDiscovery {
         }
     }
 }
+
+/// Queries peers that have sent us a message for their pubkeys if not already known;
+/// can also be used to introduce a remote peer by address.
+pub struct PeerDiscovery {
+    api: Rc<crate::API>,
+    known: RefCell<HashSet<fleck_core::peer::Peer>>,
+}
+
+impl Service for PeerDiscovery {
+    fn new(api: Rc<crate::API>) -> Self {
+        Self {
+            api,
+            known: Default::default(),
+        }
+    }
+
+    fn setup(self: &Rc<Self>) {
+        self.api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(fleck_core::ReceiveOrder::Annotate, self, Self::discover);
+    }
+}
+
+impl PeerDiscovery {
+    fn discover(&self, msg: fleck_core::Message) -> Option<fleck_core::Message> {
+        let peer = match &msg.peer {
+            Some(peer) => peer,
+            // don't touch if it doesn't have a peer
+            None => { return Some(msg) }
+        };
+        // check to see if we know this peer yet
+        if self.known.borrow().contains(&peer) {
+            self.new_peer(peer.clone());
+        }
+
+        Some(msg)
+    }
+
+    fn new_peer(&self, peer: fleck_core::Peer) {
+        let mut known = self.known.borrow_mut();
+        known.insert(peer.clone());
+
+        self.api.queue::<fleck_core::SendPacketChannel>(DiscoveryMsg {
+            pkey: self.api.with_service(|ns: &fleck_core::NodeService| ns.self_node().pubkey().clone()),
+            ttl: 2,
+        }.into());
+    }
+}

+ 17 - 1
fleck/src/fleck_core/msg.rs

@@ -64,12 +64,16 @@ pub struct Message {
     #[serde(skip_serializing)]
     #[serde(skip_deserializing)]
     pub(crate) peer: Option<super::peer::Peer>,
+
+    #[serde(skip_serializing)]
+    #[serde(skip_deserializing)]
+    pub(crate) node: Option<Rc<fleck_core::Node>>,
 }
 
 #[derive(Default)]
 pub struct Metadata {
     pub peer: Option<fleck_core::peer::Peer>,
-    pub node: Option<fleck_core::node::Node>,
+    pub node: Option<Rc<fleck_core::node::Node>>,
 }
 
 impl Message {
@@ -84,6 +88,7 @@ impl Message {
             saved_params: Some(SavedMessageParams::save::<M>()),
             parsed: Some(Box::new(from)),
             peer: None,
+            node: None,
         }
     }
 
@@ -98,6 +103,17 @@ impl Message {
         self.peer = Some(peer);
         self
     }
+
+    pub fn with_node(mut self, node: Rc<fleck_core::Node>) -> Self {
+        self.node = Some(node);
+        self
+    }
+}
+
+impl<T: 'static + Serialize + MessageParams> From<T> for Message {
+    fn from(m: T) -> Self {
+        Self::build(m)
+    }
 }
 
 pub struct MessageChannelTag;

+ 79 - 31
fleck/src/fleck_core/node.rs

@@ -3,7 +3,7 @@ use rand::prelude::*;
 use std::{
     cell::{Ref, RefCell},
     collections::{HashMap, HashSet},
-    rc::Rc,
+    rc::{Weak, Rc},
 };
 
 use crate::prelude::*;
@@ -37,10 +37,10 @@ impl<'l, T> std::ops::Deref for InnerRef<'l, T> {
     }
 }
 
-#[derive(Default)]
+#[derive(Debug,Default)]
 pub struct Node {
+    pubkey: PublicKey,
     peer: Option<fleck_core::peer::Peer>,
-    pubkey: RefCell<Option<PublicKey>>,
     keypair: RefCell<Option<Keypair>>,
 }
 
@@ -56,14 +56,14 @@ impl Node {
         self.peer.clone()
     }
 
-    pub fn pubkey(&self) -> Option<InnerRef<'_, PublicKey>> {
-        InnerRef::build(self.pubkey.borrow())
+    pub fn pubkey(&self) -> &PublicKey {
+        &self.pubkey
     }
     pub(crate) fn keypair(&self) -> Option<InnerRef<'_, Keypair>> {
         InnerRef::build(self.keypair.borrow())
     }
 
-    pub fn set_pubkey(&self, pk: PublicKey) {
+    /*pub fn set_pubkey(&self, pk: PublicKey) {
         let mut pkey = self.pubkey.borrow_mut();
         if pkey.is_some() {
             panic!("Tried to set public key on node that already has a public key!")
@@ -92,40 +92,34 @@ impl Node {
         let kp = Keypair::generate(&mut csprng);
         *pkey = Some(kp.public);
         *keyp = Some(kp);
-    }
+    }*/
 }
 
 pub struct RegTag {}
 pub type NodeRegistrationChannel = (RegTag, Rc<Node>);
 
 pub struct NodeService {
+    api: Rc<crate::API>,
     all_nodes: RefCell<Vec<Rc<Node>>>,
-    self_node: Rc<Node>,
+    // node_by_pubkey: RefCell<HashMap<fleck_core::crypto::PublicKey, Rc<Node>>>,
+    node_by_peer: RefCell<HashMap<fleck_core::Peer, Rc<Node>>>,
+    self_node: RefCell<Weak<Node>>,
 }
 
-impl Default for NodeService {
-    fn default() -> Self {
-        let ret = Self {
-            all_nodes: Default::default(),
-            self_node: Default::default(),
-        };
-
-        ret.all_nodes.borrow_mut().push(ret.self_node.clone());
-
-        ret
+impl NodeService {
+    pub fn self_node(&self) -> Rc<Node> {
+        self.self_node.borrow().upgrade().expect("asking for self node before it's been initialized!")
     }
-}
 
-impl NodeService {
-    pub fn self_node(&self) -> &Rc<Node> {
-        &self.self_node
+    pub fn use_ephemeral(&self) {
+        
     }
 
     pub fn node_by_pubkey(&self, pubkey: &PublicKey) -> Option<Rc<Node>> {
         self.all_nodes
             .borrow()
             .iter()
-            .find(|n| n.pubkey().as_deref() == Some(pubkey))
+            .find(|n| n.pubkey() == pubkey)
             .map(|r| r.to_owned())
     }
 
@@ -139,18 +133,72 @@ impl NodeService {
             .collect()
     }
 
-    pub fn inform_of(&self, node: Node) {
-        // TODO: check for duplicates here
-        self.all_nodes.borrow_mut().push(Rc::new(node));
+    // inform of new nodes 
+    /*
+    pub fn inform_of_peer(&self, peer: fleck_core::peer::Peer) -> Rc<Node> {
+        let mut map = self.node_by_peer.borrow_mut();
+        if !map.contains_key(&peer) {
+            let node = Rc::new(Node::build_with_peer(peer.clone()));
+            map.insert(peer, node.clone());
+            self.all_nodes.borrow_mut().push(node.clone());
+
+            self.api.queue::<NodeRegistrationChannel>(node);
+        }
+        
+        map.get(&peer).unwrap().clone()
+    }
+    */
+
+    /*pub fn inform_of_peer_pubkey(&self, peer: fleck_core::peer::Peer, pkey: fleck_core::crypto::PublicKey) -> Rc<Node> {
+        log::trace!("informed of peer {:?} / {:?}", pkey, peer);
+        let ret = self.inform_of_peer(peer);
+        match ret.pubkey() {
+            Some(existing_key) => {
+                if *existing_key != pkey {
+                    log::warn!("Public key of node has changed!");
+                }
+            },
+            None => ret.set_pubkey(pkey),
+        }
+        ret
+    }*/
+}
+
+impl Service for NodeService {
+    fn new(api: Rc<crate::API>) -> Self {
+        let ret = Self {
+            api,
+            all_nodes: Default::default(),
+            // node_by_pubkey: Default::default(),
+            node_by_peer: Default::default(),
+            self_node: Default::default(),
+        };
+
+        ret
     }
 
-    pub fn inform_of_peer(&self, peer: fleck_core::peer::Peer) {
-        todo!()
+    fn setup(self: &Rc<Self>) {
+        self.api.create_channel::<NodeRegistrationChannel>();
+
+        self.api.channel::<fleck_core::ReceivePacketChannel>()
+            .sub_ref(fleck_core::ReceiveOrder::Annotate, self, Self::annotate);
     }
 }
 
-impl DefaultService for NodeService {
-    fn setup(self: &Rc<Self>, api: Rc<crate::API>) {
-        api.create_channel::<NodeRegistrationChannel>();
+impl NodeService {
+    fn annotate(&self, msg: &mut fleck_core::Message) {
+        // annotate node based on peer
+        if let Some(peer) = &msg.peer {
+            let map = self.node_by_peer.borrow();
+            match map.get(&peer) {
+                Some(node) => {
+                    msg.node = Some(node.clone());
+                }
+                None => {
+                    // the interesting case, it's a new node!
+                    // TODO: send them a request for their public key
+                },
+            }
+        }
     }
 }

+ 0 - 1
fleck/src/fleck_core/peer.rs

@@ -69,7 +69,6 @@ pub struct PeerService {}
 impl DefaultService for PeerService {
     fn setup(self: &Rc<Self>, api: Rc<crate::API>) {
         api.channel::<fleck_core::SendPacketChannel>()
-            .as_ref()
             .sub_opt(fleck_core::SendOrder::Dispatch, self, Self::dispatch);
     }
 }

+ 6 - 0
fleck/src/lib.rs

@@ -31,6 +31,12 @@ impl API {
     }
 
     pub fn run(self: &Rc<Self>) {
+        // check state
+        self.with_service(|node: &fleck_core::NodeService| {
+            // check that we have a self node
+            node.self_node();
+        });
+
         self.with_service(|io: &fleck_core::io::IOService| {
             io.run();
         });