|
@@ -1,245 +0,0 @@
|
|
-use std::cell::{RefCell, Cell};
|
|
|
|
-use std::collections::{HashMap,VecDeque};
|
|
|
|
-use std::io::Read;
|
|
|
|
-use std::net::SocketAddr;
|
|
|
|
-use std::os::unix::io::RawFd;
|
|
|
|
-use std::os::unix::prelude::AsRawFd;
|
|
|
|
-
|
|
|
|
-use crate::io::Packet;
|
|
|
|
-
|
|
|
|
-use super::{IOChannel, IOFeedback};
|
|
|
|
-
|
|
|
|
-struct Poller {
|
|
|
|
- epoll: RawFd,
|
|
|
|
- epoll_events: std::cell::RefCell<Vec<epoll::Event>>,
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-impl Default for Poller {
|
|
|
|
- fn default() -> Self {
|
|
|
|
- Self {
|
|
|
|
- epoll: epoll::create(false).expect("couldn't create poller"),
|
|
|
|
- epoll_events: Default::default(),
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-impl Drop for Poller {
|
|
|
|
- fn drop(&mut self) {
|
|
|
|
- epoll::close(self.epoll).expect("couldn't close epoll instance!");
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-impl Poller {
|
|
|
|
- fn poll_event(&self) -> Option<epoll::Event> {
|
|
|
|
- let mut evts = self.epoll_events.borrow_mut();
|
|
|
|
- if !evts.is_empty() {
|
|
|
|
- return evts.pop()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- let mut new_evts = [epoll::Event::new(epoll::Events::empty(), 0); 4];
|
|
|
|
- match epoll::wait(self.epoll, -1, &mut new_evts) {
|
|
|
|
- Ok(count) => {
|
|
|
|
- evts.extend_from_slice(&new_evts[0..count]);
|
|
|
|
- drop(evts);
|
|
|
|
- self.poll_event()
|
|
|
|
- },
|
|
|
|
- Err(_) => {
|
|
|
|
- None
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fn write_interest(&self, fd: RawFd) {
|
|
|
|
- epoll::ctl(self.epoll, epoll::ControlOptions::EPOLL_CTL_MOD, fd,
|
|
|
|
- epoll::Event::new(
|
|
|
|
- epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
|
|
|
|
- fd as u64
|
|
|
|
- )
|
|
|
|
- ).expect("adding write interest failed?");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fn write_disinterest(&self, fd: RawFd) {
|
|
|
|
- epoll::ctl(self.epoll, epoll::ControlOptions::EPOLL_CTL_MOD, fd,
|
|
|
|
- epoll::Event::new(
|
|
|
|
- epoll::Events::EPOLLIN,
|
|
|
|
- fd as u64
|
|
|
|
- )
|
|
|
|
- ).expect("removing write interest failed?");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fn register_read(&self, fd: RawFd) {
|
|
|
|
- epoll::ctl(self.epoll, epoll::ControlOptions::EPOLL_CTL_ADD, fd,
|
|
|
|
- epoll::Event::new(
|
|
|
|
- epoll::Events::EPOLLIN,
|
|
|
|
- fd as u64
|
|
|
|
- )
|
|
|
|
- ).expect("adding read interest failed?");
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-struct Socket {
|
|
|
|
- sock: std::net::UdpSocket,
|
|
|
|
- poller: std::rc::Rc<Poller>,
|
|
|
|
-
|
|
|
|
- send_queue: RefCell<VecDeque<super::Packet>>,
|
|
|
|
- receive_buffer: RefCell<[u8; 2048]>,
|
|
|
|
-
|
|
|
|
- is_writing: Cell<bool>,
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-impl Socket {
|
|
|
|
- fn try_receive(&self) -> Option<Packet> {
|
|
|
|
- let mut buf = self.receive_buffer.borrow_mut();
|
|
|
|
- match self.sock.recv_from(buf.as_mut()) {
|
|
|
|
- Ok((len, addr)) => {
|
|
|
|
- println!("received packet {:?} from {:?}", &buf[0..len], addr);
|
|
|
|
- Some(Packet {
|
|
|
|
- addr: Some(addr),
|
|
|
|
- data: Some(buf[0..len].into()),
|
|
|
|
- channel: None,
|
|
|
|
- msg: None,
|
|
|
|
- })
|
|
|
|
- },
|
|
|
|
- Err(err) => {
|
|
|
|
- None
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fn queue_packet(&self, packet: super::Packet) {
|
|
|
|
- self.send_queue.borrow_mut().push_back(packet);
|
|
|
|
- if !self.is_writing.get() {
|
|
|
|
- self.poller.write_interest(self.sock.as_raw_fd());
|
|
|
|
- self.is_writing.set(true);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fn try_send(&self) {
|
|
|
|
- if !self.is_writing.get() {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- let mut sq = self.send_queue.borrow_mut();
|
|
|
|
- while !sq.is_empty() {
|
|
|
|
- let pkt = sq.front().unwrap();
|
|
|
|
- match self.sock.send_to(pkt.data.as_ref().unwrap(), pkt.addr.as_ref().unwrap()) {
|
|
|
|
- Ok(len) => {
|
|
|
|
- if len != pkt.data.as_ref().unwrap().len() {
|
|
|
|
- log::error!("Tried to send {}-byte packet, only sent {} bytes!", pkt.data.as_ref().unwrap().len(), len);
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- else {
|
|
|
|
- sq.pop_front();
|
|
|
|
- }
|
|
|
|
- },
|
|
|
|
- Err(_) => {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- self.poller.write_disinterest(self.sock.as_raw_fd());
|
|
|
|
- self.is_writing.set(false);
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-impl super::IOChannel for Socket {
|
|
|
|
- fn send_packet(&self, packet: &mut super::Packet) {
|
|
|
|
- match (packet.addr.take(), packet.data.take()) {
|
|
|
|
- (Some(addr), Some(data)) => {
|
|
|
|
- self.queue_packet(super::Packet {
|
|
|
|
- addr: Some(addr),
|
|
|
|
- data: Some(data),
|
|
|
|
- channel: None,
|
|
|
|
- msg: None,
|
|
|
|
- });
|
|
|
|
- },
|
|
|
|
- _ => {
|
|
|
|
- println!("")
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-pub(crate) struct LinuxIO {
|
|
|
|
- local_sockets: Vec<std::rc::Rc<Socket>>,
|
|
|
|
- global_socket: std::rc::Rc<Socket>,
|
|
|
|
- minor_timer: timerfd::TimerFd,
|
|
|
|
- major_timer: timerfd::TimerFd,
|
|
|
|
- poller: std::rc::Rc<Poller>,
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-impl Default for LinuxIO {
|
|
|
|
- fn default() -> Self {
|
|
|
|
- let poller : std::rc::Rc<Poller> = Default::default();
|
|
|
|
- let global = std::net::UdpSocket::bind(SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), 3535)).expect("couldn't bind on localhost:3536");
|
|
|
|
-
|
|
|
|
- global.set_nonblocking(true).expect("couldn't set nonblocking mode?");
|
|
|
|
-
|
|
|
|
- poller.register_read(global.as_raw_fd());
|
|
|
|
-
|
|
|
|
- // try joining the relevant multicast group
|
|
|
|
- nix::sys::socket::setsockopt(
|
|
|
|
- global.as_raw_fd(),
|
|
|
|
- nix::sys::socket::sockopt::IpAddMembership,
|
|
|
|
- &nix::sys::socket::IpMembershipRequest::new(
|
|
|
|
- *super::MULTICAST_ADDRESS.ip(), None)
|
|
|
|
- ).expect("couldn't request multicast group join?");
|
|
|
|
-
|
|
|
|
- let mut minor = timerfd::TimerFd::new().unwrap();
|
|
|
|
- minor.set_state(timerfd::TimerState::Periodic{ current: std::time::Duration::new(1, 0), interval: std::time::Duration::new(1, 0) }, timerfd::SetTimeFlags::Default);
|
|
|
|
-
|
|
|
|
- let mut major = timerfd::TimerFd::new().unwrap();
|
|
|
|
- major.set_state(timerfd::TimerState::Periodic{ current: std::time::Duration::new(1, 0), interval: std::time::Duration::new(15, 0) }, timerfd::SetTimeFlags::Default);
|
|
|
|
-
|
|
|
|
- poller.register_read(minor.as_raw_fd());
|
|
|
|
- poller.register_read(major.as_raw_fd());
|
|
|
|
-
|
|
|
|
- Self {
|
|
|
|
- local_sockets: vec![],
|
|
|
|
- global_socket: Socket {
|
|
|
|
- sock: global,
|
|
|
|
- send_queue: Default::default(),
|
|
|
|
- poller: poller.clone(),
|
|
|
|
- receive_buffer: [0; 2048].into(),
|
|
|
|
- is_writing: false.into()
|
|
|
|
- }.into(),
|
|
|
|
- minor_timer: minor,
|
|
|
|
- major_timer: major,
|
|
|
|
- poller,
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-impl super::FleckIO for LinuxIO {
|
|
|
|
- fn poll(&self, f: &dyn IOFeedback) {
|
|
|
|
- loop {
|
|
|
|
- if let Some(evt) = self.poller.poll_event() {
|
|
|
|
- let ready_fd = evt.data as i32;
|
|
|
|
- if ready_fd == self.global_socket.sock.as_raw_fd() {
|
|
|
|
- self.global_socket.try_receive().map(|p| f.packet(p));
|
|
|
|
- self.global_socket.try_send();
|
|
|
|
- }
|
|
|
|
- if ready_fd == self.minor_timer.as_raw_fd() {
|
|
|
|
- self.minor_timer.read();
|
|
|
|
- f.minor_tick();
|
|
|
|
- }
|
|
|
|
- if ready_fd == self.major_timer.as_raw_fd() {
|
|
|
|
- self.major_timer.read();
|
|
|
|
- f.major_tick();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fn local(&self) -> std::rc::Rc<dyn super::IOChannel> {
|
|
|
|
- //let res : std::rc::Rc<dyn super::IOChannel> = self.
|
|
|
|
- self.global()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fn global(&self) -> std::rc::Rc<dyn super::IOChannel> {
|
|
|
|
- let res : std::rc::Rc<dyn super::IOChannel> = self.global_socket.clone();
|
|
|
|
- res
|
|
|
|
- }
|
|
|
|
-}
|
|
|