unix.rs 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. use super::fleck_core;
  2. use std::{
  3. cell::RefCell,
  4. collections::VecDeque,
  5. io::{Read, Write},
  6. os::fd::AsRawFd,
  7. rc::Rc,
  8. };
  9. use mio::net::{UnixListener, UnixStream};
  10. use super::{FdHandler, IOService, InterestRegistration};
  11. #[derive(Default)]
  12. pub struct UnixSocketBuilder {
  13. path: Option<String>,
  14. connect: Option<bool>,
  15. }
  16. impl UnixSocketBuilder {
  17. pub fn set_path<S: Into<String>>(mut self, path: S) -> Self {
  18. self.path = Some(path.into());
  19. self
  20. }
  21. pub fn connect_mode(mut self) -> Self {
  22. self.connect = Some(true);
  23. self
  24. }
  25. pub fn serve_mode(mut self) -> Self {
  26. self.connect = Some(false);
  27. self
  28. }
  29. pub fn build(self, io: &IOService) -> UnixSocketRef {
  30. let sock = match self.connect.unwrap() {
  31. true => UnixSocket::Client(UnixClientSocket::connect(io, self.path.unwrap())),
  32. false => UnixSocket::Server(UnixServerSocket::build(io, self.path.unwrap())),
  33. };
  34. UnixSocketRef { sock }
  35. }
  36. }
  37. pub struct UnixSocketRef {
  38. sock: UnixSocket,
  39. }
  40. impl UnixSocketRef {
  41. pub fn connected_peer(&self) -> fleck_core::Peer {
  42. match &self.sock {
  43. UnixSocket::Client(client) => client.peer.get().unwrap().clone(),
  44. UnixSocket::Server(_) => {
  45. panic!("Can't get the connected peer for a UNIX server socket!")
  46. },
  47. }
  48. }
  49. }
  50. enum UnixSocket {
  51. Client(Rc<UnixClientSocket>),
  52. Server(Rc<UnixServerSocket>),
  53. }
  54. pub struct UnixClientSocket {
  55. api: std::rc::Rc<crate::API>,
  56. interest: InterestRegistration,
  57. socket: RefCell<mio::net::UnixStream>,
  58. peer: once_cell::unsync::OnceCell<fleck_core::Peer>,
  59. send_queue: RefCell<VecDeque<u8>>,
  60. recv_queue: RefCell<VecDeque<u8>>,
  61. }
  62. impl UnixClientSocket {
  63. fn connect(io: &IOService, path: String) -> Rc<Self> {
  64. let socket = UnixStream::connect(path).expect("couldn't connect to socket?");
  65. Self::build(io, socket)
  66. }
  67. fn build(io: &IOService, socket: UnixStream) -> Rc<Self> {
  68. let interest = io.register_interest(socket.as_raw_fd());
  69. let sock = Rc::new(Self {
  70. api: io.api.clone(),
  71. interest,
  72. socket: RefCell::new(socket),
  73. peer: Default::default(),
  74. recv_queue: Default::default(),
  75. send_queue: Default::default(),
  76. });
  77. let peer = fleck_core::Peer {
  78. data: Rc::new(fleck_core::peer::PeerData::new(
  79. sock.clone(),
  80. fleck_core::peer::PeerAddress::Stream,
  81. )),
  82. };
  83. sock.peer
  84. .set(peer)
  85. .expect("somehow couldn't initialize the once_cell?");
  86. io.api.queue::<super::NewHandlerChannel>(sock.clone());
  87. sock
  88. }
  89. }
  90. impl FdHandler for UnixClientSocket {
  91. fn interest(&self) -> &InterestRegistration {
  92. &self.interest
  93. }
  94. fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
  95. // read all available data
  96. let mut buf = [0u8; 2048];
  97. let mut rq = self.recv_queue.borrow_mut();
  98. while let Ok(bytes) = self.socket.borrow_mut().read(&mut buf) {
  99. rq.extend(&buf[0..bytes]);
  100. }
  101. // parse as many packets as possible
  102. while rq.len() >= 2 {
  103. let data = rq.make_contiguous();
  104. let packet_len = u16::from_be_bytes(data[0..2].try_into().unwrap()) as usize;
  105. if data.len() < (packet_len + 2) {
  106. break;
  107. }
  108. if let Ok(mut msg) =
  109. bincode::deserialize::<fleck_core::msg::Message>(&data[2..(packet_len + 2)])
  110. {
  111. msg.peer = self.peer.get().cloned();
  112. self.api.queue::<fleck_core::ReceivePacketChannel>(msg);
  113. }
  114. rq.drain(0..(packet_len + 2));
  115. }
  116. }
  117. fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
  118. let mut sq = self.send_queue.borrow_mut();
  119. while sq.len() > 0 {
  120. let data = sq.make_contiguous();
  121. if let Ok(bytes) = self.socket.borrow_mut().write(data) {
  122. sq.drain(0..bytes);
  123. } else {
  124. return;
  125. }
  126. }
  127. self.interest.update_interest(mio::Interest::READABLE);
  128. }
  129. fn dispatch(&self, msg: crate::fleck_core::msg::Message) {
  130. let mut sq = self.send_queue.borrow_mut();
  131. let data = bincode::serialize(&msg).expect("couldn't serialize message?");
  132. sq.extend(u16::to_be_bytes(data.len() as u16));
  133. sq.extend(data);
  134. self.interest
  135. .update_interest(mio::Interest::READABLE.add(mio::Interest::WRITABLE));
  136. }
  137. }
  138. pub struct UnixServerSocket {
  139. api: std::rc::Rc<crate::API>,
  140. interest: InterestRegistration,
  141. socket: UnixListener,
  142. }
  143. impl UnixServerSocket {
  144. fn build(io: &IOService, path: String) -> Rc<Self> {
  145. match std::fs::remove_file(&path) {
  146. Ok(_) => (),
  147. Err(e) => match e.kind() {
  148. std::io::ErrorKind::PermissionDenied => {
  149. panic!("couldn't remove existing socket file!")
  150. },
  151. std::io::ErrorKind::NotFound => {
  152. // this is perfectly okay!
  153. },
  154. k => {
  155. panic!("Unexpected error removing socket file: {:?}", k)
  156. },
  157. },
  158. }
  159. let socket = UnixListener::bind(path).expect("couldn't bind to given path");
  160. let interest = io.register_interest(socket.as_raw_fd());
  161. let sock = Rc::new(Self {
  162. api: io.api.clone(),
  163. interest,
  164. socket,
  165. });
  166. io.register_handler(sock.clone());
  167. sock
  168. }
  169. }
  170. impl FdHandler for UnixServerSocket {
  171. fn interest(&self) -> &InterestRegistration {
  172. &self.interest
  173. }
  174. fn ready_read(&self, _rc: &Rc<dyn FdHandler>) {
  175. loop {
  176. let client = self.socket.accept();
  177. if let Err(err) = client {
  178. if err.kind() == std::io::ErrorKind::WouldBlock {
  179. break;
  180. } else {
  181. panic!("Unexpected error occurred while accepting new UNIX socket client");
  182. }
  183. }
  184. self.api.with_service(|io: &IOService| {
  185. UnixClientSocket::build(io, client.unwrap().0);
  186. });
  187. }
  188. }
  189. fn ready_write(&self, _rc: &Rc<dyn FdHandler>) {
  190. unreachable!();
  191. }
  192. fn dispatch(&self, _msg: fleck_core::msg::Message) {
  193. unreachable!();
  194. }
  195. }