123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- use super::fleck_core;
- use std::{
- cell::RefCell,
- collections::VecDeque,
- io::{Read, Write},
- os::fd::AsRawFd,
- rc::Rc,
- };
- use mio::net::{UnixListener, UnixStream};
- use super::{FdHandler, IOService, InterestRegistration};
- #[derive(Default)]
- pub struct UnixSocketBuilder {
- path: Option<String>,
- connect: Option<bool>,
- }
- impl UnixSocketBuilder {
- pub fn set_path<S: Into<String>>(mut self, path: S) -> Self {
- self.path = Some(path.into());
- self
- }
- pub fn connect_mode(mut self) -> Self {
- self.connect = Some(true);
- self
- }
- pub fn serve_mode(mut self) -> Self {
- self.connect = Some(false);
- self
- }
- pub fn build(self, io: &IOService) -> UnixSocketRef {
- let sock = match self.connect.unwrap() {
- true => UnixSocket::Client(UnixClientSocket::connect(io, self.path.unwrap())),
- false => UnixSocket::Server(UnixServerSocket::build(io, self.path.unwrap())),
- };
- UnixSocketRef { sock }
- }
- }
- pub struct UnixSocketRef {
- sock: UnixSocket,
- }
- impl UnixSocketRef {
- pub fn connected_peer(&self) -> fleck_core::Peer {
- match &self.sock {
- UnixSocket::Client(client) => client.peer.get().unwrap().clone(),
- UnixSocket::Server(_) => {
- panic!("Can't get the connected peer for a UNIX server socket!")
- },
- }
- }
- }
- enum UnixSocket {
- Client(Rc<UnixClientSocket>),
- Server(Rc<UnixServerSocket>),
- }
- pub struct UnixClientSocket {
- api: std::rc::Rc<crate::API>,
- interest: InterestRegistration,
- socket: RefCell<mio::net::UnixStream>,
- peer: once_cell::unsync::OnceCell<fleck_core::Peer>,
- send_queue: RefCell<VecDeque<u8>>,
- recv_queue: RefCell<VecDeque<u8>>,
- }
- impl UnixClientSocket {
- fn connect(io: &IOService, path: String) -> Rc<Self> {
- let socket = UnixStream::connect(path).expect("couldn't connect to socket?");
- Self::build(io, socket)
- }
- fn build(io: &IOService, socket: UnixStream) -> Rc<Self> {
- let interest = io.register_interest(socket.as_raw_fd());
- let sock = Rc::new(Self {
- api: io.api.clone(),
- interest,
- socket: RefCell::new(socket),
- peer: Default::default(),
- recv_queue: Default::default(),
- send_queue: Default::default(),
- });
- let peer = fleck_core::Peer {
- data: Rc::new(fleck_core::peer::PeerData::new(
- sock.clone(),
- fleck_core::peer::PeerAddress::Stream,
- )),
- };
- sock.peer
- .set(peer)
- .expect("somehow couldn't initialize the once_cell?");
- io.api.queue::<super::NewHandlerChannel>(sock.clone());
- sock
- }
- }
- impl FdHandler for UnixClientSocket {
- fn interest(&self) -> &InterestRegistration {
- &self.interest
- }
- fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
- // read all available data
- let mut buf = [0u8; 2048];
- let mut rq = self.recv_queue.borrow_mut();
- while let Ok(bytes) = self.socket.borrow_mut().read(&mut buf) {
- rq.extend(&buf[0..bytes]);
- }
- // parse as many packets as possible
- while rq.len() >= 2 {
- let data = rq.make_contiguous();
- let packet_len = u16::from_be_bytes(data[0..2].try_into().unwrap()) as usize;
- if data.len() < (packet_len + 2) {
- break;
- }
- if let Ok(mut msg) =
- bincode::deserialize::<fleck_core::msg::Message>(&data[2..(packet_len + 2)])
- {
- msg.peer = self.peer.get().cloned();
- self.api.queue::<fleck_core::ReceivePacketChannel>(msg);
- }
- rq.drain(0..(packet_len + 2));
- }
- }
- fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
- let mut sq = self.send_queue.borrow_mut();
- while sq.len() > 0 {
- let data = sq.make_contiguous();
- if let Ok(bytes) = self.socket.borrow_mut().write(data) {
- sq.drain(0..bytes);
- } else {
- return;
- }
- }
- self.interest.update_interest(mio::Interest::READABLE);
- }
- fn dispatch(&self, msg: crate::fleck_core::msg::Message) {
- let mut sq = self.send_queue.borrow_mut();
- let data = bincode::serialize(&msg).expect("couldn't serialize message?");
- sq.extend(u16::to_be_bytes(data.len() as u16));
- sq.extend(data);
- self.interest
- .update_interest(mio::Interest::READABLE.add(mio::Interest::WRITABLE));
- }
- }
- pub struct UnixServerSocket {
- api: std::rc::Rc<crate::API>,
- interest: InterestRegistration,
- socket: UnixListener,
- }
- impl UnixServerSocket {
- fn build(io: &IOService, path: String) -> Rc<Self> {
- match std::fs::remove_file(&path) {
- Ok(_) => (),
- Err(e) => match e.kind() {
- std::io::ErrorKind::PermissionDenied => {
- panic!("couldn't remove existing socket file!")
- },
- std::io::ErrorKind::NotFound => {
- // this is perfectly okay!
- },
- k => {
- panic!("Unexpected error removing socket file: {:?}", k)
- },
- },
- }
- let socket = UnixListener::bind(path).expect("couldn't bind to given path");
- let interest = io.register_interest(socket.as_raw_fd());
- let sock = Rc::new(Self {
- api: io.api.clone(),
- interest,
- socket,
- });
- io.register_handler(sock.clone());
- sock
- }
- }
- impl FdHandler for UnixServerSocket {
- fn interest(&self) -> &InterestRegistration {
- &self.interest
- }
- fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
- loop {
- let client = self.socket.accept();
- if let Err(err) = client {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- break;
- } else {
- panic!("Unexpected error occurred while accepting new UNIX socket client");
- }
- }
- self.api.with_service(|io: &IOService| {
- UnixClientSocket::build(io, client.unwrap().0);
- });
- }
- }
- fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
- unreachable!();
- }
- fn dispatch(&self, _msg: fleck_core::msg::Message) {
- unreachable!();
- }
- }
|