|
@@ -0,0 +1,245 @@
|
|
|
|
+use std::cell::{RefCell, Cell};
|
|
|
|
+use std::collections::{HashMap,VecDeque};
|
|
|
|
+use std::io::Read;
|
|
|
|
+use std::net::SocketAddr;
|
|
|
|
+use std::os::unix::io::RawFd;
|
|
|
|
+use std::os::unix::prelude::AsRawFd;
|
|
|
|
+
|
|
|
|
+use crate::io::Packet;
|
|
|
|
+
|
|
|
|
+use super::{IOChannel, IOFeedback};
|
|
|
|
+
|
|
|
|
+struct Poller {
|
|
|
|
+ epoll: RawFd,
|
|
|
|
+ epoll_events: std::cell::RefCell<Vec<epoll::Event>>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl Default for Poller {
|
|
|
|
+ fn default() -> Self {
|
|
|
|
+ Self {
|
|
|
|
+ epoll: epoll::create(false).expect("couldn't create poller"),
|
|
|
|
+ epoll_events: Default::default(),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl Drop for Poller {
|
|
|
|
+ fn drop(&mut self) {
|
|
|
|
+ epoll::close(self.epoll).expect("couldn't close epoll instance!");
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl Poller {
|
|
|
|
+ fn poll_event(&self) -> Option<epoll::Event> {
|
|
|
|
+ let mut evts = self.epoll_events.borrow_mut();
|
|
|
|
+ if !evts.is_empty() {
|
|
|
|
+ return evts.pop()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ let mut new_evts = [epoll::Event::new(epoll::Events::empty(), 0); 4];
|
|
|
|
+ match epoll::wait(self.epoll, -1, &mut new_evts) {
|
|
|
|
+ Ok(count) => {
|
|
|
|
+ evts.extend_from_slice(&new_evts[0..count]);
|
|
|
|
+ drop(evts);
|
|
|
|
+ self.poll_event()
|
|
|
|
+ },
|
|
|
|
+ Err(_) => {
|
|
|
|
+ None
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn write_interest(&self, fd: RawFd) {
|
|
|
|
+ epoll::ctl(self.epoll, epoll::ControlOptions::EPOLL_CTL_MOD, fd,
|
|
|
|
+ epoll::Event::new(
|
|
|
|
+ epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
|
|
|
|
+ fd as u64
|
|
|
|
+ )
|
|
|
|
+ ).expect("adding write interest failed?");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn write_disinterest(&self, fd: RawFd) {
|
|
|
|
+ epoll::ctl(self.epoll, epoll::ControlOptions::EPOLL_CTL_MOD, fd,
|
|
|
|
+ epoll::Event::new(
|
|
|
|
+ epoll::Events::EPOLLIN,
|
|
|
|
+ fd as u64
|
|
|
|
+ )
|
|
|
|
+ ).expect("removing write interest failed?");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn register_read(&self, fd: RawFd) {
|
|
|
|
+ epoll::ctl(self.epoll, epoll::ControlOptions::EPOLL_CTL_ADD, fd,
|
|
|
|
+ epoll::Event::new(
|
|
|
|
+ epoll::Events::EPOLLIN,
|
|
|
|
+ fd as u64
|
|
|
|
+ )
|
|
|
|
+ ).expect("adding read interest failed?");
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+struct Socket {
|
|
|
|
+ sock: std::net::UdpSocket,
|
|
|
|
+ poller: std::rc::Rc<Poller>,
|
|
|
|
+
|
|
|
|
+ send_queue: RefCell<VecDeque<super::Packet>>,
|
|
|
|
+ receive_buffer: RefCell<[u8; 2048]>,
|
|
|
|
+
|
|
|
|
+ is_writing: Cell<bool>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl Socket {
|
|
|
|
+ fn try_receive(&self) -> Option<Packet> {
|
|
|
|
+ let mut buf = self.receive_buffer.borrow_mut();
|
|
|
|
+ match self.sock.recv_from(buf.as_mut()) {
|
|
|
|
+ Ok((len, addr)) => {
|
|
|
|
+ println!("received packet {:?} from {:?}", &buf[0..len], addr);
|
|
|
|
+ Some(Packet {
|
|
|
|
+ addr: Some(addr),
|
|
|
|
+ data: Some(buf[0..len].into()),
|
|
|
|
+ channel: None,
|
|
|
|
+ msg: None,
|
|
|
|
+ })
|
|
|
|
+ },
|
|
|
|
+ Err(err) => {
|
|
|
|
+ None
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn queue_packet(&self, packet: super::Packet) {
|
|
|
|
+ self.send_queue.borrow_mut().push_back(packet);
|
|
|
|
+ if !self.is_writing.get() {
|
|
|
|
+ self.poller.write_interest(self.sock.as_raw_fd());
|
|
|
|
+ self.is_writing.set(true);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn try_send(&self) {
|
|
|
|
+ if !self.is_writing.get() {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ let mut sq = self.send_queue.borrow_mut();
|
|
|
|
+ while !sq.is_empty() {
|
|
|
|
+ let pkt = sq.front().unwrap();
|
|
|
|
+ match self.sock.send_to(pkt.data.as_ref().unwrap(), pkt.addr.as_ref().unwrap()) {
|
|
|
|
+ Ok(len) => {
|
|
|
|
+ if len != pkt.data.as_ref().unwrap().len() {
|
|
|
|
+ log::error!("Tried to send {}-byte packet, only sent {} bytes!", pkt.data.as_ref().unwrap().len(), len);
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ sq.pop_front();
|
|
|
|
+ }
|
|
|
|
+ },
|
|
|
|
+ Err(_) => {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ self.poller.write_disinterest(self.sock.as_raw_fd());
|
|
|
|
+ self.is_writing.set(false);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl super::IOChannel for Socket {
|
|
|
|
+ fn send_packet(&self, packet: &mut super::Packet) {
|
|
|
|
+ match (packet.addr.take(), packet.data.take()) {
|
|
|
|
+ (Some(addr), Some(data)) => {
|
|
|
|
+ self.queue_packet(super::Packet {
|
|
|
|
+ addr: Some(addr),
|
|
|
|
+ data: Some(data),
|
|
|
|
+ channel: None,
|
|
|
|
+ msg: None,
|
|
|
|
+ });
|
|
|
|
+ },
|
|
|
|
+ _ => {
|
|
|
|
+ println!("")
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+pub(crate) struct LinuxIO {
|
|
|
|
+ local_sockets: Vec<std::rc::Rc<Socket>>,
|
|
|
|
+ global_socket: std::rc::Rc<Socket>,
|
|
|
|
+ minor_timer: timerfd::TimerFd,
|
|
|
|
+ major_timer: timerfd::TimerFd,
|
|
|
|
+ poller: std::rc::Rc<Poller>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl Default for LinuxIO {
|
|
|
|
+ fn default() -> Self {
|
|
|
|
+ let poller : std::rc::Rc<Poller> = Default::default();
|
|
|
|
+ let global = std::net::UdpSocket::bind(SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), 3535)).expect("couldn't bind on localhost:3536");
|
|
|
|
+
|
|
|
|
+ global.set_nonblocking(true).expect("couldn't set nonblocking mode?");
|
|
|
|
+
|
|
|
|
+ poller.register_read(global.as_raw_fd());
|
|
|
|
+
|
|
|
|
+ // try joining the relevant multicast group
|
|
|
|
+ nix::sys::socket::setsockopt(
|
|
|
|
+ global.as_raw_fd(),
|
|
|
|
+ nix::sys::socket::sockopt::IpAddMembership,
|
|
|
|
+ &nix::sys::socket::IpMembershipRequest::new(
|
|
|
|
+ *super::MULTICAST_ADDRESS.ip(), None)
|
|
|
|
+ ).expect("couldn't request multicast group join?");
|
|
|
|
+
|
|
|
|
+ let mut minor = timerfd::TimerFd::new().unwrap();
|
|
|
|
+ minor.set_state(timerfd::TimerState::Periodic{ current: std::time::Duration::new(1, 0), interval: std::time::Duration::new(1, 0) }, timerfd::SetTimeFlags::Default);
|
|
|
|
+
|
|
|
|
+ let mut major = timerfd::TimerFd::new().unwrap();
|
|
|
|
+ major.set_state(timerfd::TimerState::Periodic{ current: std::time::Duration::new(2, 0), interval: std::time::Duration::new(15, 0) }, timerfd::SetTimeFlags::Default);
|
|
|
|
+
|
|
|
|
+ poller.register_read(minor.as_raw_fd());
|
|
|
|
+ poller.register_read(major.as_raw_fd());
|
|
|
|
+
|
|
|
|
+ Self {
|
|
|
|
+ local_sockets: vec![],
|
|
|
|
+ global_socket: Socket {
|
|
|
|
+ sock: global,
|
|
|
|
+ send_queue: Default::default(),
|
|
|
|
+ poller: poller.clone(),
|
|
|
|
+ receive_buffer: [0; 2048].into(),
|
|
|
|
+ is_writing: false.into()
|
|
|
|
+ }.into(),
|
|
|
|
+ minor_timer: minor,
|
|
|
|
+ major_timer: major,
|
|
|
|
+ poller,
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl super::FleckIO for LinuxIO {
|
|
|
|
+ fn poll(&self, f: &dyn IOFeedback) {
|
|
|
|
+ loop {
|
|
|
|
+ if let Some(evt) = self.poller.poll_event() {
|
|
|
|
+ let ready_fd = evt.data as i32;
|
|
|
|
+ if ready_fd == self.global_socket.sock.as_raw_fd() {
|
|
|
|
+ self.global_socket.try_receive().map(|p| f.packet(p));
|
|
|
|
+ self.global_socket.try_send();
|
|
|
|
+ }
|
|
|
|
+ if ready_fd == self.minor_timer.as_raw_fd() {
|
|
|
|
+ self.minor_timer.read();
|
|
|
|
+ f.minor_tick();
|
|
|
|
+ }
|
|
|
|
+ if ready_fd == self.major_timer.as_raw_fd() {
|
|
|
|
+ self.major_timer.read();
|
|
|
|
+ f.major_tick();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn local(&self) -> std::rc::Rc<dyn super::IOChannel> {
|
|
|
|
+ //let res : std::rc::Rc<dyn super::IOChannel> = self.
|
|
|
|
+ self.global()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn global(&self) -> std::rc::Rc<dyn super::IOChannel> {
|
|
|
|
+ let res : std::rc::Rc<dyn super::IOChannel> = self.global_socket.clone();
|
|
|
|
+ res
|
|
|
|
+ }
|
|
|
|
+}
|