ソースを参照

Add support for communicating across UNIX stream sockets.

Kestrel 2 年 前
コミット
483b8fd3b9

+ 142 - 4
Cargo.lock

@@ -52,7 +52,7 @@ version = "0.2.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
 dependencies = [
- "hermit-abi",
+ "hermit-abi 0.1.19",
  "libc",
  "winapi",
 ]
@@ -109,6 +109,43 @@ dependencies = [
  "inout",
 ]
 
+[[package]]
+name = "clap"
+version = "4.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c3d7ae14b20b94cb02149ed21a86c423859cbe18dc7ed69845cace50e52b40a5"
+dependencies = [
+ "bitflags",
+ "clap_derive",
+ "clap_lex",
+ "is-terminal",
+ "once_cell",
+ "strsim",
+ "termcolor",
+]
+
+[[package]]
+name = "clap_derive"
+version = "4.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44bec8e5c9d09e439c4335b1af0abaab56dcf3b94999a936e1bb47b9134288f0"
+dependencies = [
+ "heck",
+ "proc-macro-error",
+ "proc-macro2 1.0.47",
+ "quote 1.0.21",
+ "syn 1.0.103",
+]
+
+[[package]]
+name = "clap_lex"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "350b9cf31731f9957399229e9b2adc51eeabdfbe9d71d9a0552275fd12710d09"
+dependencies = [
+ "os_str_bytes",
+]
+
 [[package]]
 name = "cpufeatures"
 version = "0.2.5"
@@ -237,10 +274,12 @@ version = "0.1.0"
 dependencies = [
  "aes-gcm",
  "bincode",
+ "clap",
  "ed25519-dalek",
  "lazy_static",
  "log",
  "mio",
+ "once_cell",
  "pretty_env_logger",
  "rand",
  "rudp",
@@ -292,6 +331,12 @@ dependencies = [
  "polyval",
 ]
 
+[[package]]
+name = "heck"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
+
 [[package]]
 name = "hermit-abi"
 version = "0.1.19"
@@ -301,6 +346,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "hermit-abi"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "humantime"
 version = "1.3.0"
@@ -325,6 +379,28 @@ version = "0.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9448015e586b611e5d322f6703812bbca2f1e709d5773ecd38ddb4e3bb649504"
 
+[[package]]
+name = "io-lifetimes"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e"
+dependencies = [
+ "libc",
+ "windows-sys",
+]
+
+[[package]]
+name = "is-terminal"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189"
+dependencies = [
+ "hermit-abi 0.2.6",
+ "io-lifetimes 1.0.4",
+ "rustix 0.36.7",
+ "windows-sys",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.4.0"
@@ -343,6 +419,12 @@ version = "0.0.46"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d"
 
+[[package]]
+name = "linux-raw-sys"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4"
+
 [[package]]
 name = "log"
 version = "0.4.17"
@@ -377,12 +459,24 @@ dependencies = [
  "windows-sys",
 ]
 
+[[package]]
+name = "once_cell"
+version = "1.17.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
+
 [[package]]
 name = "opaque-debug"
 version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
 
+[[package]]
+name = "os_str_bytes"
+version = "6.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee"
+
 [[package]]
 name = "polyval"
 version = "0.6.0"
@@ -411,6 +505,30 @@ dependencies = [
  "log",
 ]
 
+[[package]]
+name = "proc-macro-error"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
+dependencies = [
+ "proc-macro-error-attr",
+ "proc-macro2 1.0.47",
+ "quote 1.0.21",
+ "syn 1.0.103",
+ "version_check",
+]
+
+[[package]]
+name = "proc-macro-error-attr"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
+dependencies = [
+ "proc-macro2 1.0.47",
+ "quote 1.0.21",
+ "version_check",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "0.4.30"
@@ -538,12 +656,26 @@ checksum = "2079c267b8394eb529872c3cf92e181c378b41fea36e68130357b52493701d2e"
 dependencies = [
  "bitflags",
  "errno",
- "io-lifetimes",
+ "io-lifetimes 0.6.1",
  "libc",
- "linux-raw-sys",
+ "linux-raw-sys 0.0.46",
  "winapi",
 ]
 
+[[package]]
+name = "rustix"
+version = "0.36.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03"
+dependencies = [
+ "bitflags",
+ "errno",
+ "io-lifetimes 1.0.4",
+ "libc",
+ "linux-raw-sys 0.1.4",
+ "windows-sys",
+]
+
 [[package]]
 name = "serde"
 version = "1.0.147"
@@ -592,6 +724,12 @@ version = "1.6.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
 
+[[package]]
+name = "strsim"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+
 [[package]]
 name = "subtle"
 version = "2.4.1"
@@ -647,7 +785,7 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "29f85a7c965b8e7136952f59f2a359694c78f105b2d2ff99cf6c2c404bf7e33f"
 dependencies = [
- "rustix",
+ "rustix 0.34.8",
 ]
 
 [[package]]

+ 2 - 0
fleck/Cargo.toml

@@ -15,6 +15,7 @@ lazy_static = "1.4.0"
 mio = { version = "0.8.5", features = ["os-poll", "os-ext", "net"] }
 rudp = "0.2.1"
 topological-sort = "0.2.2"
+once_cell = "1.17.1"
 
 # linux dependencies
 timerfd = "1.3.0"
@@ -27,3 +28,4 @@ rand = { version = "0.7", features = ["getrandom"] }
 
 [dev-dependencies]
 pretty_env_logger = "0.4.0"
+clap = { version = "4.1.8", features = ["derive"] }

+ 52 - 0
fleck/examples/unix_socket.rs

@@ -0,0 +1,52 @@
+use fleck::fleck_core;
+
+use clap::Parser;
+
+#[derive(Parser, Debug)]
+struct SocketArgs {
+    #[arg(short, long)]
+    server: bool,
+
+    filename: String
+}
+
+fn main() {
+    pretty_env_logger::init_timed();
+
+    let args = SocketArgs::parse();
+
+    let fleck = fleck::API::new();
+
+    // generate ephemeral keypair
+    fleck.with_service(|ns: &fleck_core::NodeService| {
+        ns.build_ephemeral_self_node();
+    });
+
+    let sockref = fleck.with_service(|io: &fleck_core::io::IOService| {
+        let builder = fleck_core::io::UnixSocketBuilder::default()
+            .set_path(args.filename);
+        if args.server {
+            builder.serve_mode().build(io)
+        }
+        else {
+            builder.connect_mode().build(io)
+        }
+    });
+
+    if !args.server {
+        fleck.with_service(|ds: &fleck_core::discovery::PeerDiscovery| {
+            ds.new_peer(sockref.connected_peer());
+        });
+    }
+
+    // enable local discovery
+    /*fleck.with_service(|ld: &fleck_core::discovery::LocalDiscovery| {
+        for peer in sockref.multicast_peers() {
+            ld.add_multicast_peer(peer);
+        }
+    });*/
+
+    // run and perform all automatic services, but don't do anything interesting...
+    fleck.run();
+}
+

+ 6 - 5
fleck/src/fleck_core/discovery.rs

@@ -127,18 +127,19 @@ impl PeerDiscovery {
         Some(msg)
     }
 
-    fn new_peer(&self, peer: fleck_core::Peer) {
+    pub fn new_peer(&self, peer: fleck_core::Peer) {
         let mut known = self.known.borrow_mut();
-        known.insert(peer);
+        known.insert(peer.clone());
+
+        log::trace!("given new peer!");
 
         self.api.queue::<fleck_core::SendPacketChannel>(
-            DiscoveryMsg {
+            fleck_core::msg::Message::build(DiscoveryMsg {
                 pkey: self
                     .api
                     .with_service(|ns: &fleck_core::NodeService| *ns.self_node().pubkey()),
                 ttl: 2,
-            }
-            .into(),
+            }).with_peer(peer)
         );
     }
 }

+ 32 - 10
fleck/src/fleck_core/io.rs

@@ -7,10 +7,12 @@ use std::{
 
 use crate::prelude::*;
 
-mod udp;
-pub use udp::UdpSocketBuilder;
 mod timerfd;
 pub use self::timerfd::TimerFdBuilder;
+mod udp;
+pub use udp::UdpSocketBuilder;
+mod unix;
+pub use unix::UnixSocketBuilder;
 
 pub struct InterestRegistration {
     poll: Rc<RefCell<mio::Poll>>,
@@ -33,11 +35,15 @@ impl InterestRegistration {
 }
 
 pub trait FdHandler {
+    fn interest(&self) -> &InterestRegistration;
     fn ready_read(&self, rc: &Rc<dyn FdHandler>);
     fn ready_write(&self, rc: &Rc<dyn FdHandler>);
     fn dispatch(&self, msg: super::msg::Message);
 }
 
+pub struct HandlerTag;
+pub type NewHandlerChannel = (HandlerTag, Rc<dyn FdHandler>);
+
 pub struct IOService {
     api: Rc<crate::API>,
     poll: Rc<RefCell<mio::Poll>>,
@@ -59,12 +65,14 @@ impl Service for IOService {
     fn setup(self: &Rc<Self>) {
         self.api.create_channel::<super::ReceivePacketChannel>();
         self.api.create_channel::<super::SendPacketChannel>();
+        self.api.create_channel::<NewHandlerChannel>();
+        self.api.channel::<NewHandlerChannel>().sub_eat(self, &Self::register_handler);
     }
 }
 
 impl IOService {
-    pub fn register_handler(&self, reg: &InterestRegistration, handler: Rc<dyn FdHandler>) {
-        self.handlers.borrow_mut().insert(reg.token, handler);
+    pub fn register_handler(&self, handler: Rc<dyn FdHandler>) {
+        self.handlers.borrow_mut().insert(handler.interest().token, handler);
     }
 
     pub fn register_interest(&self, fd: RawFd) -> InterestRegistration {
@@ -89,7 +97,14 @@ impl IOService {
         }
     }
 
+    pub fn unregister_interest(&self, reg: InterestRegistration) {
+        reg.poll.borrow().registry().deregister(&mut mio::unix::SourceFd(&reg.fd)).expect("Couldn't deregister file descriptor?");
+    }
+
     pub(crate) fn run(&self) {
+        // flush any events generated during startup
+        self.api.services.borrow().event_root().fire_all();
+
         let mut events = mio::Events::with_capacity(128);
         loop {
             let pr = self.poll.borrow_mut().poll(&mut events, None);
@@ -98,14 +113,21 @@ impl IOService {
             }
             let handlers = self.handlers.borrow();
             for evt in &events {
-                let h = handlers.get(&evt.token()).unwrap();
-                if evt.is_readable() {
-                    h.ready_read(h);
-                }
-                if evt.is_writable() {
-                    h.ready_write(h);
+                match handlers.get(&evt.token()) {
+                    Some(h) => {
+                        if evt.is_readable() {
+                            h.ready_read(h);
+                        }
+                        if evt.is_writable() {
+                            h.ready_write(h);
+                        }
+                    }
+                    None => {
+                        log::trace!("Received event with valid token but no handler!");
+                    }
                 }
             }
+            drop(handlers);
 
             self.api.services.borrow().event_root().fire_all();
         }

+ 4 - 1
fleck/src/fleck_core/io/timerfd.rs

@@ -39,7 +39,7 @@ impl TimerFdBuilder {
             trigger: self.trigger.expect("building timerfd with no trigger?"),
         });
 
-        io.register_handler(&sock.interest, sock.clone());
+        io.api.queue::<super::NewHandlerChannel>(sock.clone());
     }
 }
 
@@ -50,6 +50,9 @@ struct TimerFd {
 }
 
 impl FdHandler for TimerFd {
+    fn interest(&self) -> &InterestRegistration {
+        &self.interest
+    }
     fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
         self.timer.read();
         (self.trigger)();

+ 6 - 2
fleck/src/fleck_core/io/udp.rs

@@ -3,7 +3,7 @@ use fleck_core::peer::{Peer, PeerAddress, PeerData};
 use std::{
     cell::RefCell,
     collections::{HashMap, VecDeque},
-    os::unix::prelude::{AsRawFd, RawFd},
+    os::unix::prelude::AsRawFd,
     rc::Rc,
 };
 
@@ -54,7 +54,7 @@ impl UdpSocketBuilder {
             multicast_joined: self.multicast.iter().map(|e| e.0).collect(),
         });
 
-        io.register_handler(&sock.interest, sock.clone());
+        io.api.queue::<super::NewHandlerChannel>(sock.clone());
 
         UdpSocketRef { sock }
     }
@@ -104,6 +104,10 @@ impl UdpSocket {
 }
 
 impl FdHandler for UdpSocket {
+    fn interest(&self) -> &InterestRegistration {
+        &self.interest
+    }
+
     fn ready_read(&self, rc: &Rc<dyn FdHandler>) {
         let mut buf = [0u8; 2048];
         while let Ok((bytes, addr)) = self.socket.recv_from(&mut buf) {

+ 227 - 0
fleck/src/fleck_core/io/unix.rs

@@ -0,0 +1,227 @@
+use std::{os::fd::AsRawFd, rc::Rc, cell::RefCell, collections::{VecDeque, HashMap}, io::{Read, Write}};
+use super::fleck_core;
+
+use mio::net::{UnixDatagram, UnixListener, UnixStream};
+
+use super::{IOService, InterestRegistration, FdHandler};
+
+#[derive(Default)]
+pub struct UnixSocketBuilder {
+    path: Option<String>,
+    connect: Option<bool>,
+}
+
+impl UnixSocketBuilder {
+    pub fn set_path<S: Into<String>>(mut self, path: S) -> Self {
+        self.path = Some(path.into());
+        self
+    }
+
+    pub fn connect_mode(mut self) -> Self {
+        self.connect = Some(true);
+        self
+    }
+
+    pub fn serve_mode(mut self) -> Self {
+        self.connect = Some(false);
+        self
+    }
+
+    pub fn build(self, io: &IOService) -> UnixSocketRef {
+        let sock = match self.connect.unwrap() {
+            true => UnixSocket::Client(UnixClientSocket::connect(io, self.path.unwrap())),
+            false => UnixSocket::Server(UnixServerSocket::build(io, self.path.unwrap())),
+        };
+
+        UnixSocketRef {
+            sock
+        }
+    }
+}
+
+pub struct UnixSocketRef {
+    sock: UnixSocket,
+}
+
+impl UnixSocketRef {
+    pub fn connected_peer(&self) -> fleck_core::Peer {
+        match &self.sock {
+            UnixSocket::Client(client) => {
+                client.peer.get().unwrap().clone()
+            },
+            UnixSocket::Server(_) => {
+                panic!("Can't get the connected peer for a UNIX server socket!")
+            }
+        }
+    }
+}
+
+enum UnixSocket {
+    Client(Rc<UnixClientSocket>),
+    Server(Rc<UnixServerSocket>),
+}
+
+pub struct UnixClientSocket {
+    api: std::rc::Rc<crate::API>,
+    interest: InterestRegistration,
+    socket: RefCell<mio::net::UnixStream>,
+    peer: once_cell::unsync::OnceCell<fleck_core::Peer>,
+    send_queue: RefCell<VecDeque<u8>>,
+    recv_queue: RefCell<VecDeque<u8>>,
+}
+
+impl UnixClientSocket {
+    fn connect(io: &IOService, path: String) -> Rc<Self> {
+        let socket = UnixStream::connect(path).expect("couldn't connect to socket?");
+        Self::build(io, socket)
+    }
+
+    fn build(io: &IOService, socket: UnixStream) -> Rc<Self> {
+        let interest = io.register_interest(socket.as_raw_fd());
+
+        let sock = Rc::new(Self {
+            api: io.api.clone(),
+            interest,
+            socket: RefCell::new(socket),
+            peer: Default::default(),
+            recv_queue: Default::default(),
+            send_queue: Default::default(),
+        });
+
+        let peer = fleck_core::Peer {
+            data: Rc::new(fleck_core::peer::PeerData::new(sock.clone(), fleck_core::peer::PeerAddress::Stream)),
+        };
+
+        sock.peer.set(peer).expect("somehow couldn't initialize the once_cell?");
+
+        io.api.queue::<super::NewHandlerChannel>(sock.clone());
+
+        sock
+        
+    }
+}
+
+impl FdHandler for UnixClientSocket {
+    fn interest(&self) -> &InterestRegistration {
+        &self.interest
+    }
+
+    fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
+        // read all available data
+        let mut buf = [0u8; 2048];
+        let mut rq = self.recv_queue.borrow_mut();
+        while let Ok(bytes) = self.socket.borrow_mut().read(&mut buf) {
+            rq.extend(&buf[0..bytes]);
+        }
+
+        // parse as many packets as possible
+        while rq.len() >= 2 {
+            let data = rq.make_contiguous();
+            let packet_len = u16::from_be_bytes(data[0..2].try_into().unwrap()) as usize;
+            if data.len() < (packet_len + 2) {
+                break
+            }
+            if let Ok(mut msg) = bincode::deserialize::<fleck_core::msg::Message>(&data[2..(packet_len + 2)]) {
+                msg.peer = self.peer.get().cloned();
+                self.api.queue::<fleck_core::ReceivePacketChannel>(msg);
+            }
+            rq.drain(0..(packet_len + 2));
+        }
+    }
+
+    fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
+        let mut sq = self.send_queue.borrow_mut();
+
+        while sq.len() > 0 {
+            let data = sq.make_contiguous();
+            if let Ok(bytes) = self.socket.borrow_mut().write(&data) {
+                sq.drain(0..bytes);
+            }
+            else {
+                return
+            }
+        }
+
+        self.interest.update_interest(mio::Interest::READABLE);
+    }
+
+    fn dispatch(&self, msg: crate::fleck_core::msg::Message) {
+        let mut sq = self.send_queue.borrow_mut();
+        let data = bincode::serialize(&msg).expect("couldn't serialize message?");
+        sq.extend(u16::to_be_bytes(data.len() as u16));
+        sq.extend(data);
+        self.interest
+            .update_interest(mio::Interest::READABLE.add(mio::Interest::WRITABLE));
+    }
+}
+
+pub struct UnixServerSocket {
+    api: std::rc::Rc<crate::API>,
+    interest: InterestRegistration,
+    socket: UnixListener,
+}
+
+impl UnixServerSocket {
+    fn build(io: &IOService, path: String) -> Rc<Self> {
+
+        match std::fs::remove_file(&path) {
+            Ok(_) => (),
+            Err(e) => match e.kind() {
+                std::io::ErrorKind::PermissionDenied => {
+                    panic!("couldn't remove existing socket file!")
+                },
+                std::io::ErrorKind::NotFound => {
+                    // this is perfectly okay!
+                },
+                k => { panic!("Unexpected error removing socket file: {:?}", k) }
+            }
+        }
+
+        let socket = UnixListener::bind(path).expect("couldn't bind to given path");
+        let interest = io.register_interest(socket.as_raw_fd());
+
+        let sock = Rc::new(Self {
+            api: io.api.clone(),
+            interest,
+            socket,
+        });
+
+        io.register_handler(sock.clone());
+
+        sock
+    }
+
+
+}
+
+impl FdHandler for UnixServerSocket {
+    fn interest(&self) -> &InterestRegistration {
+        &self.interest
+    }
+
+    fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
+        loop {
+            let client = self.socket.accept();
+            if let Err(err) = client {
+                if err.kind() == std::io::ErrorKind::WouldBlock {
+                    break;
+                }
+                else {
+                    panic!("Unexpected error occurred while accepting new UNIX socket client");
+                }
+            }
+
+            self.api.with_service(|io: &IOService| {
+                UnixClientSocket::build(io, client.unwrap().0);
+            });
+        }
+    }
+
+    fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
+        unreachable!();
+    }
+
+    fn dispatch(&self, _msg: fleck_core::msg::Message) {
+        unreachable!();
+    }
+}