use super::fleck_core; use std::{ cell::RefCell, collections::VecDeque, io::{Read, Write}, os::fd::AsRawFd, rc::Rc, }; use mio::net::{UnixListener, UnixStream}; use super::{FdHandler, IOService, InterestRegistration}; #[derive(Default)] pub struct UnixSocketBuilder { path: Option, connect: Option, } impl UnixSocketBuilder { pub fn set_path>(mut self, path: S) -> Self { self.path = Some(path.into()); self } pub fn connect_mode(mut self) -> Self { self.connect = Some(true); self } pub fn serve_mode(mut self) -> Self { self.connect = Some(false); self } pub fn build(self, io: &IOService) -> UnixSocketRef { let sock = match self.connect.unwrap() { true => UnixSocket::Client(UnixClientSocket::connect(io, self.path.unwrap())), false => UnixSocket::Server(UnixServerSocket::build(io, self.path.unwrap())), }; UnixSocketRef { sock } } } pub struct UnixSocketRef { sock: UnixSocket, } impl UnixSocketRef { pub fn connected_peer(&self) -> fleck_core::Peer { match &self.sock { UnixSocket::Client(client) => client.peer.get().unwrap().clone(), UnixSocket::Server(_) => { panic!("Can't get the connected peer for a UNIX server socket!") }, } } } enum UnixSocket { Client(Rc), Server(Rc), } pub struct UnixClientSocket { api: std::rc::Rc, interest: InterestRegistration, socket: RefCell, peer: once_cell::unsync::OnceCell, send_queue: RefCell>, recv_queue: RefCell>, } impl UnixClientSocket { fn connect(io: &IOService, path: String) -> Rc { let socket = UnixStream::connect(path).expect("couldn't connect to socket?"); Self::build(io, socket) } fn build(io: &IOService, socket: UnixStream) -> Rc { let interest = io.register_interest(socket.as_raw_fd()); let sock = Rc::new(Self { api: io.api.clone(), interest, socket: RefCell::new(socket), peer: Default::default(), recv_queue: Default::default(), send_queue: Default::default(), }); let peer = fleck_core::Peer { data: Rc::new(fleck_core::peer::PeerData::new( sock.clone(), fleck_core::peer::PeerAddress::Stream, )), }; sock.peer .set(peer) .expect("somehow couldn't initialize the once_cell?"); io.api.queue::(sock.clone()); sock } } impl FdHandler for UnixClientSocket { fn interest(&self) -> &InterestRegistration { &self.interest } fn ready_read(&self, _rc: &Rc) { // read all available data let mut buf = [0u8; 2048]; let mut rq = self.recv_queue.borrow_mut(); while let Ok(bytes) = self.socket.borrow_mut().read(&mut buf) { rq.extend(&buf[0..bytes]); } // parse as many packets as possible while rq.len() >= 2 { let data = rq.make_contiguous(); let packet_len = u16::from_be_bytes(data[0..2].try_into().unwrap()) as usize; if data.len() < (packet_len + 2) { break; } if let Ok(mut msg) = bincode::deserialize::(&data[2..(packet_len + 2)]) { msg.peer = self.peer.get().cloned(); self.api.queue::(msg); } rq.drain(0..(packet_len + 2)); } } fn ready_write(&self, _rc: &Rc) { let mut sq = self.send_queue.borrow_mut(); while sq.len() > 0 { let data = sq.make_contiguous(); if let Ok(bytes) = self.socket.borrow_mut().write(data) { sq.drain(0..bytes); } else { return; } } self.interest.update_interest(mio::Interest::READABLE); } fn dispatch(&self, msg: crate::fleck_core::msg::Message) { let mut sq = self.send_queue.borrow_mut(); let data = bincode::serialize(&msg).expect("couldn't serialize message?"); sq.extend(u16::to_be_bytes(data.len() as u16)); sq.extend(data); self.interest .update_interest(mio::Interest::READABLE.add(mio::Interest::WRITABLE)); } } pub struct UnixServerSocket { api: std::rc::Rc, interest: InterestRegistration, socket: UnixListener, } impl UnixServerSocket { fn build(io: &IOService, path: String) -> Rc { match std::fs::remove_file(&path) { Ok(_) => (), Err(e) => match e.kind() { std::io::ErrorKind::PermissionDenied => { panic!("couldn't remove existing socket file!") }, std::io::ErrorKind::NotFound => { // this is perfectly okay! }, k => { panic!("Unexpected error removing socket file: {:?}", k) }, }, } let socket = UnixListener::bind(path).expect("couldn't bind to given path"); let interest = io.register_interest(socket.as_raw_fd()); let sock = Rc::new(Self { api: io.api.clone(), interest, socket, }); io.register_handler(sock.clone()); sock } } impl FdHandler for UnixServerSocket { fn interest(&self) -> &InterestRegistration { &self.interest } fn ready_read(&self, _rc: &Rc) { loop { let client = self.socket.accept(); if let Err(err) = client { if err.kind() == std::io::ErrorKind::WouldBlock { break; } else { panic!("Unexpected error occurred while accepting new UNIX socket client"); } } self.api.with_service(|io: &IOService| { UnixClientSocket::build(io, client.unwrap().0); }); } } fn ready_write(&self, _rc: &Rc) { unreachable!(); } fn dispatch(&self, _msg: fleck_core::msg::Message) { unreachable!(); } }