Browse Source

Reorganized slightly, discovery multicast messages delivered once again.

Kestrel 2 năm trước cách đây
mục cha
commit
9139d81619

+ 17 - 4
fleck/examples/simple_node.rs

@@ -1,20 +1,33 @@
-use std::net::{IpAddr, SocketAddr};
+use std::net::SocketAddr;
 
-use fleck::{fleck_core, prelude::*};
+use fleck::fleck_core;
 
 fn main() {
     pretty_env_logger::init_timed();
 
     let fleck = fleck::Fleck::new();
 
-    fleck.with_service(|io: &fleck_core::io::IOService| {
+    // generate ephemeral keypair
+    fleck.with_service(|ns: &fleck_core::NodeService| {
+        ns.self_node().gen_keypair();
+    });
+
+    // create UDP socket for communication with a local multicast group
+    let sockref = fleck.with_service(|io: &fleck_core::io::IOService| {
         fleck_core::io::UdpSocketBuilder::default()
             .bind_to(SocketAddr::new(
                 std::net::Ipv4Addr::UNSPECIFIED.into(),
                 3535,
             ))
             .join_multicast("239.0.239.0".parse().unwrap(), "0.0.0.0".parse().unwrap())
-            .build(io);
+            .build(io)
+    });
+
+    // enable local discovery
+    fleck.with_service(|ld: &fleck_core::discovery::LocalDiscovery| {
+        for peer in sockref.multicast_peers() {
+            ld.add_multicast_peer(peer);
+        }
     });
 
     fleck.run();

+ 7 - 4
fleck/src/fleck_core.rs

@@ -1,13 +1,13 @@
-// pub(crate) mod lowlevel;
 pub mod crypto;
-pub mod nodes;
+pub mod discovery;
 pub mod io;
 pub mod msg;
+pub mod node;
 pub mod peer;
 
 // pub use nodes::Node;
-pub use nodes::NodeService;
 pub use msg::MessageService;
+pub use node::NodeService;
 
 pub mod channel_tags {
     pub struct SendPacketTag {}
@@ -57,6 +57,9 @@ impl crate::service::Service for std::rc::Rc<CoreInitService> {
                 .build(io);
         });
 
-        api.add_service::<nodes::NodeService>();
+        api.add_service::<msg::MessageService>();
+        api.add_service::<node::NodeService>();
+        api.add_service::<discovery::LocalDiscovery>();
+        api.add_service::<peer::PeerService>();
     }
 }

+ 20 - 11
fleck/src/fleck_core/crypto.rs

@@ -43,28 +43,35 @@ pub(crate) struct VerifyPacketSignature {}
 
 impl Service for std::rc::Rc<VerifyPacketSignature> {
     fn setup(&self, api: &crate::Fleck) {
-        api
-            .channel::<fleck_core::ReceivePacketChannel>()
-            .sub_opt(fleck_core::ReceiveOrder::Verify, self, VerifyPacketSignature::check);
+        api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(
+            fleck_core::ReceiveOrder::Verify,
+            self,
+            VerifyPacketSignature::check,
+        );
     }
 }
 
 impl VerifyPacketSignature {
-    fn check(&self, api: &crate::Fleck, mut message: fleck_core::msg::Message) -> Option<fleck_core::msg::Message> {
+    fn check(
+        &self,
+        api: &crate::Fleck,
+        mut message: fleck_core::msg::Message,
+    ) -> Option<fleck_core::msg::Message> {
         match message.crypto_header {
             PacketHeader::Signed(signature) => {
                 message.crypto_header = PacketHeader::NoCrypto;
-                let to_verify = bincode::serialize(&message).expect("assume that we can re-serialize the message");
+                let to_verify = bincode::serialize(&message)
+                    .expect("assume that we can re-serialize the message");
 
                 // signature.verify(to_verify, signature);
-                
+
                 // let pubkey = api.services().with_service(|ns: &fleck_core::Nodes| { ns.self_node().pubkey() });
 
                 // match pubkey {
-                    // msg.crypto_header = PacketHeader::Signed(keypair.sign(&sign_data));
+                // msg.crypto_header = PacketHeader::Signed(keypair.sign(&sign_data));
                 // }
             }
-            _ => {},
+            _ => {}
         }
 
         Some(message)
@@ -76,9 +83,11 @@ pub(crate) struct SignPacket {}
 
 impl Service for std::rc::Rc<SignPacket> {
     fn setup(&self, api: &crate::Fleck) {
-        api
-            .channel::<fleck_core::SendPacketChannel>()
-            .sub_ref(fleck_core::SendOrder::Sign, self, SignPacket::sign);
+        api.channel::<fleck_core::SendPacketChannel>().sub_ref(
+            fleck_core::SendOrder::Sign,
+            self,
+            SignPacket::sign,
+        );
     }
 }
 

+ 88 - 0
fleck/src/fleck_core/discovery.rs

@@ -0,0 +1,88 @@
+use crate::prelude::*;
+
+use serde::{Deserialize, Serialize};
+use std::{cell::RefCell, rc::Rc};
+
+#[derive(Default)]
+pub struct LocalDiscovery {
+    targets: RefCell<Vec<fleck_core::peer::Peer>>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct DiscoveryMsg {
+    pkey: fleck_core::crypto::PublicKey,
+}
+
+impl fleck_core::msg::MessageParams for DiscoveryMsg {
+    const NAME: &'static str = "DiscoveryMsg";
+    const ENCRYPTED: bool = false;
+}
+
+impl Service for Rc<LocalDiscovery> {
+    fn setup(&self, api: &crate::Fleck) {
+        api.with_service(|m: &fleck_core::MessageService| {
+            m.add_message_type::<DiscoveryMsg>(api);
+        });
+        api.channel::<fleck_core::MajorTickChannel>()
+            .sub_ref(self, LocalDiscovery::process_major_tick);
+        api.channel::<fleck_core::msg::MessageChannel<DiscoveryMsg>>()
+            .sub_eat(self, LocalDiscovery::discovery);
+    }
+}
+
+impl LocalDiscovery {
+    pub fn add_multicast_peer(&self, peer: fleck_core::peer::Peer) {
+        self.targets.borrow_mut().push(peer);
+    }
+
+    fn discovery(&self, api: &crate::Fleck, msg: (fleck_core::msg::Metadata, DiscoveryMsg)) {
+        let discovery = msg.1;
+        log::trace!("received discovery message");
+
+        // this will automatically ignore self-messages because we already know the pubkey
+        api.with_service(|ns: &fleck_core::NodeService| {
+            if ns.node_by_pubkey(&discovery.pkey).is_none() {
+                // let node = fleck_core::Node::build_with_peer(.data.addr.unwrap());
+                // node.set_pubkey(discovery.pkey);
+                // nodes.inform_of(node);
+            }
+        });
+    }
+
+    /*fn packet(&self, api: &crate::Fleck, pkt: fleck_core::msg::Message) -> Option<fleck_core::msg::Message> {
+        /*if let Some(discovery) = pkt.data.as_msg::<DiscoveryMsg>() {
+            log::trace!("received discovery message");
+
+            // this will automatically ignore self-messages because we already know the pubkey
+            api.with_service(|nodes: &fleck_core::Nodes| {
+                if nodes.node_by_pubkey(&discovery.pkey).is_none() {
+                    let node = fleck_core::Node::build_with_addr(pkt.data.addr.unwrap());
+                    node.set_pubkey(discovery.pkey);
+                    nodes.inform_of(node);
+                }
+            });
+        }*/
+    }*/
+
+    fn process_major_tick(&self, api: &crate::Fleck, _: &mut ()) {
+        for peer in self.targets.borrow().iter() {
+            let msg = fleck_core::msg::Message::build(DiscoveryMsg {
+                pkey: api.with_service(|ns: &fleck_core::NodeService| {
+                    ns.self_node().pubkey().unwrap().to_owned()
+                }),
+            })
+            .with_peer(peer.clone());
+
+            api.queue::<fleck_core::SendPacketChannel>(msg);
+        }
+        /*api.queue::<fleck_core::SendPacketChannel>(FleckPacket {
+            addr: Some((*crate::io::MULTICAST_ADDRESS).into()),
+            data: None,
+            io_channel: Some(api.raw_io().local().clone()),
+            node: None,
+            msg: Some(crate::msg::Message::build(DiscoveryMsg {
+                pkey: api.with_service(|nodes: &fleck_core::Nodes| nodes.self_node().pubkey().unwrap().to_owned())
+            })),
+        });*/
+    }
+}

+ 32 - 5
fleck/src/fleck_core/io.rs

@@ -125,13 +125,13 @@ impl UdpSocketBuilder {
         self
     }
 
-    pub fn build(self, io: &IOService) {
+    pub fn build(self, io: &IOService) -> UdpSocketRef {
         let socket =
             mio::net::UdpSocket::bind(self.bind.expect("no bind address given to UDP socket!"))
                 .expect("couldn't bind UDP socket");
         let interest = io.register_interest(socket.as_raw_fd());
 
-        for mcast in self.multicast {
+        for mcast in &self.multicast {
             use std::net::IpAddr;
             match (mcast.0, mcast.1) {
                 (IpAddr::V4(group), IpAddr::V4(iface)) => {
@@ -151,9 +151,31 @@ impl UdpSocketBuilder {
             socket,
             queue: Default::default(),
             peers: Default::default(),
+            multicast_joined: self.multicast.iter().map(|e| e.0).collect(),
         });
 
         io.register_handler(&sock.interest, sock.clone());
+
+        UdpSocketRef { sock }
+    }
+}
+
+pub struct UdpSocketRef {
+    sock: Rc<UdpSocket>,
+}
+
+impl UdpSocketRef {
+    pub fn multicast_peers(&self) -> Vec<fleck_core::peer::Peer> {
+        let port = self.sock.socket.local_addr().unwrap().port();
+        let rh: Rc<dyn FdHandler> = self.sock.clone();
+        self.sock
+            .multicast_joined
+            .iter()
+            .map(|ip| {
+                self.sock
+                    .peer_for(&rh, &std::net::SocketAddr::new(*ip, port))
+            })
+            .collect()
     }
 }
 
@@ -162,11 +184,13 @@ struct UdpSocket {
     socket: mio::net::UdpSocket,
     queue: RefCell<VecDeque<(std::net::SocketAddr, Vec<u8>)>>,
     peers: RefCell<HashMap<std::net::SocketAddr, super::peer::Peer>>,
+    multicast_joined: Vec<std::net::IpAddr>,
 }
 
 impl UdpSocket {
     fn peer_for(&self, rc: &Rc<dyn FdHandler>, addr: &std::net::SocketAddr) -> super::peer::Peer {
-        if let Some(peer) = self.peers.borrow().get(addr) {
+        let mut peers = self.peers.borrow_mut();
+        if let Some(peer) = peers.get(addr) {
             peer.clone()
         } else {
             let peer = super::peer::Peer {
@@ -175,7 +199,7 @@ impl UdpSocket {
                     io: rc.clone(),
                 }),
             };
-            self.peers.borrow_mut().insert(*addr, peer.clone());
+            peers.insert(*addr, peer.clone());
             peer
         }
     }
@@ -193,7 +217,7 @@ impl FdHandler for UdpSocket {
     }
 
     fn ready_write(&self, _rc: &Rc<dyn FdHandler>, _api: &crate::Fleck) {
-        let queue = self.queue.borrow_mut();
+        let mut queue = self.queue.borrow_mut();
         while !queue.is_empty() {
             let packet = queue.front().unwrap();
 
@@ -205,6 +229,9 @@ impl FdHandler for UdpSocket {
                         packet.1.len()
                     );
                 }
+
+                drop(packet);
+                queue.pop_front();
             } else {
                 return;
             }

+ 0 - 94
fleck/src/fleck_core/lowlevel.rs

@@ -1,94 +0,0 @@
-use crate::prelude::*;
-
-use serde::{Deserialize, Serialize};
-use std::rc::Rc;
-
-#[derive(Default)]
-pub(crate) struct SendPacket {}
-
-impl Service for Rc<SendPacket> {
-    fn register_channels(&self, eroot: &mut EventRoot) {
-        eroot
-            .channel::<fleck_core::SendPacketChannel>()
-            // .with::<order::Last>()
-            .subscribe(fleck_core::SendOrder::Send, self, SendPacket::process_outgoing);
-    }
-}
-
-impl SendPacket {
-    fn process_outgoing(&self, api: &crate::Fleck, packet: &mut FleckEvent<FleckPacket>) {
-        // use default channel if not specified
-        if packet.data.io_channel.is_none() {
-            packet.data.io_channel = Some(api.raw_io().global().clone());
-        }
-
-        // serialize if needed
-        if let (false, Some(msg)) = (packet.data.data.as_ref().is_some(), packet.data.msg.take()) {
-            packet.data.data = Some(bincode::serialize(&msg).expect("failed to serialize message"));
-        }
-
-        match &mut packet.data.io_channel {
-            Some(ch) => ch.clone().send_packet(&mut packet.data),
-            _ => {
-                println!("tried to send packet with no channel?");
-            }
-        }
-    }
-}
-
-#[derive(Default)]
-pub(crate) struct LocalDiscovery;
-
-#[derive(Serialize, Deserialize, Debug)]
-struct DiscoveryMsg {
-    pkey: crate::crypto::PublicKey,
-}
-
-impl MessageParams for DiscoveryMsg {
-    const NAME: &'static str = "DiscoveryMsg";
-    const ENCRYPTED: bool = false;
-}
-
-impl Service for Rc<LocalDiscovery> {
-    fn register_msgs(&self, registry: &mut crate::msg::MessageRegistry) {
-        registry.add_message_type::<DiscoveryMsg>();
-    }
-
-    fn register_channels(self: &Self, eroot: &mut EventRoot) {
-        eroot
-            .channel::<fleck_core::MajorTickChannel>()
-            .subscribe(self, LocalDiscovery::process_major_tick);
-        eroot
-            .channel::<fleck_core::ReceivePacketChannel>()
-            .subscribe(fleck_core::ReceiveOrder::Dispatch, self, LocalDiscovery::packet);
-    }
-}
-
-impl LocalDiscovery {
-    fn packet(&self, api: &crate::Fleck, pkt: &mut FleckEvent<FleckPacket>) {
-        if let Some(discovery) = pkt.data.as_msg::<DiscoveryMsg>() {
-            log::trace!("received discovery message");
-
-            // this will automatically ignore self-messages because we already know the pubkey
-            api.with_service(|nodes: &fleck_core::Nodes| {
-                if nodes.node_by_pubkey(&discovery.pkey).is_none() {
-                    let node = fleck_core::Node::build_with_addr(pkt.data.addr.unwrap());
-                    node.set_pubkey(discovery.pkey);
-                    nodes.inform_of(node);
-                }
-            });
-        }
-    }
-
-    fn process_major_tick(&self, api: &crate::Fleck, _: &mut FleckEvent<()>) {
-        api.queue::<fleck_core::SendPacketChannel>(FleckPacket {
-            addr: Some((*crate::io::MULTICAST_ADDRESS).into()),
-            data: None,
-            io_channel: Some(api.raw_io().local().clone()),
-            node: None,
-            msg: Some(crate::msg::Message::build(DiscoveryMsg {
-                pkey: api.with_service(|nodes: &fleck_core::Nodes| nodes.self_node().pubkey().unwrap().to_owned())
-            })),
-        });
-    }
-}

+ 38 - 53
fleck/src/fleck_core/msg.rs

@@ -1,7 +1,9 @@
 use std::{
     any::Any,
+    cell::RefCell,
     collections::HashMap,
-    hash::{Hash, Hasher}, cell::RefCell, rc::Rc,
+    hash::{Hash, Hasher},
+    rc::Rc,
 };
 
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
@@ -64,6 +66,12 @@ pub struct Message {
     pub(crate) peer: Option<super::peer::Peer>,
 }
 
+#[derive(Default)]
+pub struct Metadata {
+    pub peer: Option<fleck_core::peer::Peer>,
+    pub node: Option<fleck_core::node::Node>,
+}
+
 impl Message {
     pub fn build<M: 'static + MessageParams + Serialize>(from: M) -> Self {
         Self {
@@ -79,19 +87,23 @@ impl Message {
         }
     }
 
-    pub fn is_type<M: MessageParams + DeserializeOwned>(&self) -> bool {
-        self.content.ty == message_type::<M>()
+    fn metadata(&self) -> Metadata {
+        Metadata {
+            peer: self.peer.clone(),
+            node: None,
+        }
     }
 
-    pub fn downcast<M: MessageParams + DeserializeOwned>(&self) -> Option<&M> {
-        self.parsed.as_ref()?.downcast_ref()
+    pub fn with_peer(mut self, peer: fleck_core::peer::Peer) -> Self {
+        self.peer = Some(peer);
+        self
     }
 }
 
 pub struct MessageChannelTag;
-pub type MessageChannel<M> = (MessageChannelTag, M);
+pub type MessageChannel<M> = (MessageChannelTag, (Metadata, M));
 
-type Deserializer = dyn Fn(&[u8]) -> Option<Box<dyn std::any::Any>>;
+type Deserializer = dyn Fn(&crate::Fleck, &Message) -> ();
 #[derive(Default)]
 pub struct MessageService {
     deser: RefCell<HashMap<u16, Box<Deserializer>>>,
@@ -99,67 +111,40 @@ pub struct MessageService {
 
 impl Service for Rc<MessageService> {
     fn setup(&self, api: &crate::Fleck) {
-        api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(fleck_core::ReceiveOrder::Parse, self, MessageService::parse);
+        api.channel::<fleck_core::ReceivePacketChannel>().sub_opt(
+            fleck_core::ReceiveOrder::Parse,
+            self,
+            MessageService::parse,
+        );
     }
 }
 
 impl MessageService {
     pub fn add_message_type<MT: std::fmt::Debug + MessageParams + Serialize + DeserializeOwned>(
-        &self, api: &crate::Fleck
+        &self,
+        api: &crate::Fleck,
     ) {
         let derived_typeid = message_type::<MT>();
         self.deser.borrow_mut().insert(
             derived_typeid,
-            Box::new(|data| Some(Box::new(bincode::deserialize::<MT>(data).ok()?))),
+            Box::new(
+                |api, message| match bincode::deserialize::<MT>(&message.content.data) {
+                    Ok(content) => {
+                        api.queue::<MessageChannel<MT>>((message.metadata(), content));
+                    }
+                    Err(_) => {
+                        log::trace!("Packet failed deserialization step?");
+                    }
+                },
+            ),
         );
 
         api.create_channel::<MessageChannel<MT>>();
     }
 
-    fn parse(&self, api: &crate::Fleck, mut msg: Message) -> Option<Message> {
+    fn parse(&self, api: &crate::Fleck, msg: Message) -> Option<Message> {
         // try deserializing
-        msg.parsed = Some(self.deserialize(&mut msg)?);
+        (self.deser.borrow().get(&msg.content.ty)?)(api, &msg);
         None
     }
-
-    fn deserialize(&self, message: &mut Message) -> Option<Box<dyn std::any::Any>> {
-        (self.deser.borrow().get(&message.content.ty)?)(&message.content.data)
-    }
-}
-
-#[derive(Default)]
-pub(crate) struct ParsePacket {}
-
-impl ParsePacket {}
-
-impl Service for std::rc::Rc<ParsePacket> {
-    fn setup(&self, api: &crate::Fleck) {
-        /*eroot
-        .channel::<fleck_core::ReceivePacketChannel>()
-        // .with::<order::Preprocessing>()
-        .subscribe(fleck_core::ReceiveOrder::Parse, self, ParsePacket::process_incoming);*/
-    }
-}
-
-impl ParsePacket {
-    /*fn process_incoming(&self, api: &crate::Fleck, event: &mut FleckEvent<FleckPacket>) {
-        // try deserializing
-        let message: Option<crate::msg::Message> =
-            bincode::deserialize(event.data.data.as_ref().unwrap()).ok();
-        if message.is_none() {
-            return;
-        }
-        let mut message = message.unwrap();
-        // get message registry from api and add parsed Message
-        message.parsed = api.services().message_registry().deserialize(&mut message);
-        // if the parse failed, stop processing this packet.
-        if message.parsed.is_none() {
-            event.emptied = true;
-            return;
-        }
-        // remove raw data from packet
-        event.data.data.take();
-
-        event.data.msg = Some(message);
-    }*/
 }

+ 19 - 9
fleck/src/fleck_core/nodes.rs → fleck/src/fleck_core/node.rs

@@ -1,24 +1,25 @@
 use rand::prelude::*;
 
-use std::{rc::Rc, collections::{HashMap, HashSet}, cell::{RefCell, Ref}};
+use std::{
+    cell::{Ref, RefCell},
+    collections::{HashMap, HashSet},
+    rc::Rc,
+};
 
 use crate::prelude::*;
 
 use fleck_core::crypto::{Keypair, PublicKey};
 
 pub struct InnerRef<'l, T> {
-    r: Ref<'l, Option<T>>
+    r: Ref<'l, Option<T>>,
 }
 
 impl<'l, T> InnerRef<'l, T> {
     fn build(from: Ref<'l, Option<T>>) -> Option<Self> {
         if from.is_none() {
             None
-        }
-        else {
-            Some(Self {
-                r: from
-            })
+        } else {
+            Some(Self { r: from })
         }
     }
 }
@@ -121,12 +122,21 @@ impl NodeService {
     }
 
     pub fn node_by_pubkey(&self, pubkey: &PublicKey) -> Option<Rc<Node>> {
-        self.all_nodes.borrow().iter().find(|n| n.pubkey().as_deref() == Some(pubkey)).map(|r| r.to_owned())
+        self.all_nodes
+            .borrow()
+            .iter()
+            .find(|n| n.pubkey().as_deref() == Some(pubkey))
+            .map(|r| r.to_owned())
     }
 
     // nodes we know the address of
     pub fn direct_neighbours(&self) -> Vec<Rc<Node>> {
-        self.all_nodes.borrow().iter().filter(|n| n.addr.is_some()).map(|r| r.to_owned()).collect()
+        self.all_nodes
+            .borrow()
+            .iter()
+            .filter(|n| n.addr.is_some())
+            .map(|r| r.to_owned())
+            .collect()
     }
 
     pub fn inform_of(&self, node: Node) {

+ 1 - 0
fleck/src/fleck_core/peer.rs

@@ -47,6 +47,7 @@ impl PeerService {
             log::error!("Message got to dispatch with no peer selected?");
             return None;
         }
+        log::trace!("dispatching message to peer");
         let peer = msg.peer.clone().unwrap();
         peer.data.io.dispatch(api, msg);
 

+ 40 - 0
fleck/src/service/event.rs

@@ -7,6 +7,7 @@ use std::{
 pub enum SubscriptionFunction<Host: 'static, Context: 'static + ?Sized, Data: 'static> {
     ByRef(Box<dyn Fn(&Host, &Context, &mut Data)>),
     ByValue(Box<dyn Fn(&Host, &Context, Data) -> Option<Data>>),
+    Consume(Box<dyn Fn(&Host, &Context, Data) -> ()>),
 }
 
 impl<Host: 'static, Context: 'static + ?Sized, Data: 'static>
@@ -19,6 +20,10 @@ impl<Host: 'static, Context: 'static + ?Sized, Data: 'static>
                 Some(data)
             }
             Self::ByValue(f) => f(host, context, data),
+            Self::Consume(f) => {
+                f(host, context, data);
+                None
+            }
         }
     }
 }
@@ -103,6 +108,23 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static>
 
         self.subs.borrow_mut().push((NoPriorityTag {}, sub));
     }
+
+    pub fn sub_eat<
+        Host: 'static,
+        HC: crate::helper::IntoWeak<Host>,
+        CB: Fn(&Host, &Context, Data) + 'static,
+    >(
+        &self,
+        who: HC,
+        cb: CB,
+    ) {
+        let sub = Rc::new(ConcreteEventSub {
+            host: who.as_weak(),
+            callback: SubscriptionFunction::Consume(Box::new(cb)),
+        });
+
+        self.subs.borrow_mut().push((NoPriorityTag {}, sub));
+    }
 }
 
 impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOrd>
@@ -149,6 +171,24 @@ impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: PartialOr
         // XXX: what happens if we actually get an undefined order?...
         subs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
     }
+
+    pub fn sub_eat<
+        Host: 'static,
+        HC: crate::helper::IntoWeak<Host>,
+        CB: Fn(&Host, &Context, Data) + 'static,
+    >(
+        &self,
+        p: Priority,
+        who: HC,
+        cb: CB,
+    ) {
+        let sub = Rc::new(ConcreteEventSub {
+            host: who.as_weak(),
+            callback: SubscriptionFunction::Consume(Box::new(cb)),
+        });
+
+        self.subs.borrow_mut().push((p, sub));
+    }
 }
 
 impl<Tag: 'static, Context: 'static + ?Sized, Data: 'static, Priority: 'static>