|
@@ -1,459 +0,0 @@
|
|
|
-use std::{cell::{RefCell, Cell}, os::unix::prelude::{OwnedFd, AsRawFd, FromRawFd, RawFd}, rc::Rc, collections::VecDeque, io::{Read, Write}, ops::DerefMut};
|
|
|
-
|
|
|
-use mio::{Token, unix::SourceFd};
|
|
|
-
|
|
|
-use crate::{prelude::{ChannelSpec, Service}, service::event::NoPriorityTag};
|
|
|
-use super::AbstractFdService;
|
|
|
-
|
|
|
-pub type NoChannel = ((), ());
|
|
|
-
|
|
|
-pub struct MulticastSpec {
|
|
|
- group: std::net::IpAddr,
|
|
|
- iface: std::net::IpAddr,
|
|
|
-}
|
|
|
-
|
|
|
-pub struct NoData;
|
|
|
-
|
|
|
-pub struct UdpData {
|
|
|
- bind_addr: Option<std::net::SocketAddr>,
|
|
|
- join_multicast: Option<MulticastSpec>,
|
|
|
-}
|
|
|
-
|
|
|
-pub struct StreamData {
|
|
|
- fd: Option<RawFd>,
|
|
|
-}
|
|
|
-
|
|
|
-pub struct TimerData {
|
|
|
- interval: Option<std::time::Duration>,
|
|
|
-}
|
|
|
-
|
|
|
-struct InterestRegistration {
|
|
|
- poll: Rc<RefCell<mio::Poll>>,
|
|
|
- write: Cell<bool>,
|
|
|
- token: Token,
|
|
|
-}
|
|
|
-
|
|
|
-impl InterestRegistration {
|
|
|
- fn subscribe_read(&self, source: &mut dyn mio::event::Source) {
|
|
|
- self.poll.borrow_mut().registry().register(
|
|
|
- source, self.token, mio::Interest::READABLE,
|
|
|
- ).expect("couldn't subscribe?")
|
|
|
- }
|
|
|
-
|
|
|
- fn subscribe_write(&self, source: &mut dyn mio::event::Source) {
|
|
|
- if self.write.get() { return }
|
|
|
- self.poll.borrow_mut().registry().reregister(
|
|
|
- source, self.token, mio::Interest::READABLE.add(mio::Interest::WRITABLE),
|
|
|
- ).expect("couldn't subscribe?");
|
|
|
- self.write.set(true);
|
|
|
- }
|
|
|
-
|
|
|
- fn unsubscribe_write(&self, source: &mut dyn mio::event::Source) {
|
|
|
- self.poll.borrow_mut().registry().reregister(
|
|
|
- source, self.token, mio::Interest::READABLE,
|
|
|
- ).expect("couldn't subscribe?");
|
|
|
- self.write.set(false);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-pub type DefaultServiceBuilder<'l> = ServiceBuilder<'l, NoChannel, NoChannel, NoData>;
|
|
|
-
|
|
|
-// Source: where the service writes packets to (recv)
|
|
|
-// Sink: where the service reads packets from (send)
|
|
|
-pub struct ServiceBuilder<'l, SourceChannel: ChannelSpec, SinkChannel: ChannelSpec, Data> {
|
|
|
- fleck: &'l crate::Fleck,
|
|
|
- poll: Rc<RefCell<mio::Poll>>,
|
|
|
- token: Token,
|
|
|
-
|
|
|
- sink_priority: Option<SinkChannel::Priority>,
|
|
|
-
|
|
|
- data: Data,
|
|
|
-
|
|
|
- _ghost: std::marker::PhantomData<(SourceChannel, SinkChannel)>,
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l, SourceChannel: ChannelSpec, SinkChannel: ChannelSpec>
|
|
|
- ServiceBuilder<'l, SourceChannel, SinkChannel, NoData>
|
|
|
-{
|
|
|
- pub(super) fn new(fleck: &'l crate::Fleck, poll: Rc<RefCell<mio::Poll>>, token: Token) -> Self {
|
|
|
- Self {
|
|
|
- fleck,
|
|
|
- poll,
|
|
|
- token,
|
|
|
-
|
|
|
- sink_priority: None,
|
|
|
-
|
|
|
- data: NoData,
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l, SinkChannel: ChannelSpec, Data> ServiceBuilder<'l, NoChannel, SinkChannel, Data> {
|
|
|
- pub fn source<SourceChannel: ChannelSpec<Priority = NoPriorityTag>>(
|
|
|
- self,
|
|
|
- ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
|
|
|
- ServiceBuilder {
|
|
|
- fleck: self.fleck,
|
|
|
- poll: self.poll,
|
|
|
- token: self.token,
|
|
|
-
|
|
|
- sink_priority: self.sink_priority,
|
|
|
-
|
|
|
- data: self.data,
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- pub fn source_with<SourceChannel: ChannelSpec>(
|
|
|
- self,
|
|
|
- p: SourceChannel::Priority,
|
|
|
- ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data>
|
|
|
- where SourceChannel::Priority: PartialOrd
|
|
|
- {
|
|
|
- ServiceBuilder {
|
|
|
- fleck: self.fleck,
|
|
|
- poll: self.poll,
|
|
|
- token: self.token,
|
|
|
-
|
|
|
- sink_priority: self.sink_priority,
|
|
|
-
|
|
|
- data: self.data,
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l, SourceChannel: ChannelSpec, Data> ServiceBuilder<'l, SourceChannel, NoChannel, Data> {
|
|
|
- pub fn sink<SinkChannel: ChannelSpec<Priority = NoPriorityTag>>(
|
|
|
- self,
|
|
|
- ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
|
|
|
- ServiceBuilder {
|
|
|
- fleck: self.fleck,
|
|
|
- poll: self.poll,
|
|
|
- token: self.token,
|
|
|
-
|
|
|
- sink_priority: Some(NoPriorityTag),
|
|
|
-
|
|
|
- data: self.data,
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- pub fn sink_with<SinkChannel: ChannelSpec>(
|
|
|
- self,
|
|
|
- p: SinkChannel::Priority,
|
|
|
- ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data>
|
|
|
- where SinkChannel::Priority: PartialOrd {
|
|
|
- ServiceBuilder {
|
|
|
- fleck: self.fleck,
|
|
|
- poll: self.poll,
|
|
|
- token: self.token,
|
|
|
-
|
|
|
- sink_priority: Some(p),
|
|
|
-
|
|
|
- data: self.data,
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l,
|
|
|
- SourceChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
- SinkChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
- > ServiceBuilder<'l, SourceChannel, SinkChannel, NoData>
|
|
|
-{
|
|
|
- pub fn udp(self) -> ServiceBuilder<'l, SourceChannel, SinkChannel, UdpData> {
|
|
|
- ServiceBuilder {
|
|
|
- fleck: self.fleck,
|
|
|
- poll: self.poll,
|
|
|
- token: self.token,
|
|
|
-
|
|
|
- sink_priority: self.sink_priority,
|
|
|
-
|
|
|
- data: UdpData {
|
|
|
- bind_addr: None,
|
|
|
- join_multicast: None,
|
|
|
- },
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l,
|
|
|
- SourceChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
- SinkChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
- > ServiceBuilder<'l, SourceChannel, SinkChannel, UdpData>
|
|
|
-{
|
|
|
- pub fn set_bind_address(mut self, bind_addr: std::net::SocketAddr) -> Self {
|
|
|
- self.data.bind_addr = Some(bind_addr);
|
|
|
- self
|
|
|
- }
|
|
|
-
|
|
|
- pub fn set_multicast_group(
|
|
|
- mut self,
|
|
|
- group_addr: std::net::IpAddr,
|
|
|
- interface_addr: std::net::IpAddr,
|
|
|
- ) -> Self {
|
|
|
- self.data.join_multicast = Some(MulticastSpec {
|
|
|
- group: group_addr,
|
|
|
- iface: interface_addr,
|
|
|
- });
|
|
|
- self
|
|
|
- }
|
|
|
-
|
|
|
- pub fn build(self) {
|
|
|
- let socket = mio::net::UdpSocket::bind(
|
|
|
- self.data.bind_addr.expect("UDP socket not given a bind address!")
|
|
|
- ).expect("couldn't bind to UDP socket");
|
|
|
-
|
|
|
- if let Some(mcast) = self.data.join_multicast {
|
|
|
- use std::net::IpAddr;
|
|
|
- match (mcast.group, mcast.iface) {
|
|
|
- (IpAddr::V4(group), IpAddr::V4(iface)) => {
|
|
|
- socket.join_multicast_v4(&group, &iface).expect("couldn't join multicast group");
|
|
|
- },
|
|
|
- (IpAddr::V6(_group), IpAddr::V6(_iface)) => { todo!() },
|
|
|
- _ => panic!("Multicast specification mixes ipv4 and ipv6 addresses!"),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- let svc = Rc::new(UdpService::<SourceChannel, SinkChannel> {
|
|
|
- interest: InterestRegistration { poll: self.poll, write: Cell::new(false), token: self.token },
|
|
|
- queue: Default::default(),
|
|
|
- // file: RefCell::new(unsafe { std::fs::File::from_raw_fd(self.data.fd.expect("Building stream IO service with no FD?")) }),
|
|
|
- socket: RefCell::new(socket),
|
|
|
- sink_priority: RefCell::new(Some(self.sink_priority.expect("no sink priority given!"))),
|
|
|
- _ghost: Default::default(),
|
|
|
- });
|
|
|
-
|
|
|
- svc.interest.subscribe_read(svc.socket.borrow_mut().deref_mut());
|
|
|
-
|
|
|
- self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, svc.clone());
|
|
|
- self.fleck.services.borrow_mut().add_service(svc.clone());
|
|
|
-
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-struct UdpService<SourceChannel: ChannelSpec, SinkChannel: ChannelSpec> {
|
|
|
- interest: InterestRegistration,
|
|
|
- queue: VecDeque<(std::net::SocketAddr, Vec<u8>)>,
|
|
|
- socket: RefCell<mio::net::UdpSocket>,
|
|
|
- sink_priority: RefCell<Option<SinkChannel::Priority>>,
|
|
|
- _ghost: std::marker::PhantomData<(SourceChannel, SinkChannel)>,
|
|
|
-}
|
|
|
-
|
|
|
-impl<SourceChannel: ChannelSpec, SinkChannel: ChannelSpec> Service for Rc<UdpService<SourceChannel, SinkChannel>> {
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-impl<SourceChannel: ChannelSpec, SinkChannel: ChannelSpec> AbstractFdService for UdpService<SourceChannel, SinkChannel> {
|
|
|
- fn ready_read(&self, api: &crate::Fleck) {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- fn ready_write(&self, api: &crate::Fleck) {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- fn token(&self) -> Token {
|
|
|
- self.interest.token
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-/* timerfd support */
|
|
|
-impl<'l, SourceChannel: ChannelSpec<Data = ()>>
|
|
|
- ServiceBuilder<'l, SourceChannel, ((), ()), NoData>
|
|
|
-{
|
|
|
- pub fn timer(self) -> ServiceBuilder<'l, SourceChannel, ((), ()), TimerData> {
|
|
|
- ServiceBuilder {
|
|
|
- fleck: self.fleck,
|
|
|
- poll: self.poll,
|
|
|
- token: self.token,
|
|
|
-
|
|
|
- sink_priority: self.sink_priority,
|
|
|
-
|
|
|
- data: TimerData { interval: None },
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-struct Timer<Tag: 'static> {
|
|
|
- token: Token,
|
|
|
- timer: timerfd::TimerFd,
|
|
|
- _ghost: std::marker::PhantomData<Tag>,
|
|
|
-}
|
|
|
-
|
|
|
-impl<Tag: 'static> Service for Rc<Timer<Tag>> {}
|
|
|
-impl<Tag: 'static> AbstractFdService for Timer<Tag> {
|
|
|
- fn ready_read(&self, api: &crate::Fleck) {
|
|
|
- self.timer.read();
|
|
|
- api.queue::<(Tag, ())>(());
|
|
|
- }
|
|
|
-
|
|
|
- fn ready_write(&self, _api: &crate::Fleck) {
|
|
|
- unreachable!();
|
|
|
- }
|
|
|
-
|
|
|
- fn token(&self) -> Token {
|
|
|
- self.token
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l, SourceChannel: ChannelSpec<Data = ()>>
|
|
|
- ServiceBuilder<'l, SourceChannel, ((), ()), TimerData>
|
|
|
-{
|
|
|
- pub fn interval(mut self, time: std::time::Duration) -> Self {
|
|
|
- self.data.interval = Some(time);
|
|
|
- self
|
|
|
- }
|
|
|
-
|
|
|
- pub fn build(self) {
|
|
|
- let mut raw_timer = timerfd::TimerFd::new_custom(timerfd::ClockId::Monotonic, true, false).expect("couldn't create timer");
|
|
|
- raw_timer.set_state(timerfd::TimerState::Periodic {
|
|
|
- current: std::time::Duration::from_millis(1),
|
|
|
- interval: self.data.interval.expect("no interval set for timer"),
|
|
|
- }, timerfd::SetTimeFlags::Default);
|
|
|
-
|
|
|
- let timer = Rc::new(Timer::<SourceChannel::Tag> { token: self.token, timer: raw_timer, _ghost: Default::default() });
|
|
|
-
|
|
|
- self.poll.borrow().registry().register(
|
|
|
- &mut SourceFd(&timer.timer.as_raw_fd()),
|
|
|
- self.token,
|
|
|
- mio::Interest::READABLE
|
|
|
- ).expect("couldn't register timer read interest?");
|
|
|
-
|
|
|
- self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, timer);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-/* stream support */
|
|
|
-impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>>
|
|
|
- ServiceBuilder<'l, SourceChannel, SinkChannel, NoData>
|
|
|
-{
|
|
|
- pub fn stream(self) -> ServiceBuilder<'l, SourceChannel, SinkChannel, StreamData> {
|
|
|
- ServiceBuilder {
|
|
|
- fleck: self.fleck,
|
|
|
- poll: self.poll,
|
|
|
- token: self.token,
|
|
|
-
|
|
|
- sink_priority: self.sink_priority,
|
|
|
-
|
|
|
- data: StreamData { fd: None },
|
|
|
-
|
|
|
- _ghost: Default::default(),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>>
|
|
|
- ServiceBuilder<'l, SourceChannel, SinkChannel, StreamData>
|
|
|
-{
|
|
|
- pub fn fd(mut self, fd: RawFd) -> Self {
|
|
|
- self.data.fd = Some(fd);
|
|
|
- self
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static, SinkPriority: 'static + Clone + PartialOrd>
|
|
|
- ServiceBuilder<'l, SourceChannel, (SinkTag, Vec<u8>, SinkPriority), StreamData>
|
|
|
-{
|
|
|
- pub fn build(self) {
|
|
|
- let svc = Rc::new(StreamService::<SourceChannel, (SinkTag, Vec<u8>, SinkPriority)> {
|
|
|
- interest: InterestRegistration { poll: self.poll, write: Cell::new(false), token: self.token },
|
|
|
- queue: Default::default(),
|
|
|
- file: RefCell::new(unsafe { std::fs::File::from_raw_fd(self.data.fd.expect("Building stream IO service with no FD?")) }),
|
|
|
- sink_priority: RefCell::new(Some(self.sink_priority.expect("no sink priority given!"))),
|
|
|
- _ghost: Default::default(),
|
|
|
- });
|
|
|
-
|
|
|
- svc.interest.subscribe_read(&mut SourceFd(&svc.file.borrow().as_raw_fd()));
|
|
|
-
|
|
|
- self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, svc.clone());
|
|
|
- self.fleck.services.borrow_mut().add_service(svc.clone());
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static>
|
|
|
- ServiceBuilder<'l, SourceChannel, (SinkTag, Vec<u8>), StreamData>
|
|
|
-{
|
|
|
- pub fn build(self) {
|
|
|
- let svc = Rc::new(StreamService::<SourceChannel, (SinkTag, Vec<u8>)> {
|
|
|
- interest: InterestRegistration { poll: self.poll, write: Cell::new(false), token: self.token },
|
|
|
- queue: Default::default(),
|
|
|
- file: RefCell::new(unsafe { std::fs::File::from_raw_fd(self.data.fd.expect("Building stream IO service with no FD?")) }),
|
|
|
- sink_priority: RefCell::new(Some(self.sink_priority.expect("no sink priority given!"))),
|
|
|
- _ghost: Default::default(),
|
|
|
- });
|
|
|
-
|
|
|
- svc.interest.subscribe_read(&mut SourceFd(&svc.file.borrow().as_raw_fd()));
|
|
|
-
|
|
|
- self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, svc.clone());
|
|
|
- self.fleck.services.borrow_mut().add_service(svc.clone());
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-pub struct StreamService<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>> {
|
|
|
- interest: InterestRegistration,
|
|
|
- queue: RefCell<VecDeque<u8>>,
|
|
|
- file: RefCell<std::fs::File>,
|
|
|
- sink_priority: RefCell<Option<SinkChannel::Priority>>,
|
|
|
- _ghost: std::marker::PhantomData<(SourceChannel, SinkChannel)>,
|
|
|
-}
|
|
|
-
|
|
|
-impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static, SinkPriority: 'static + Clone + PartialOrd> Service for Rc<StreamService<SourceChannel, (SinkTag, Vec<u8>, SinkPriority)>> {
|
|
|
- fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
|
|
|
- eroot.channel::<(SinkTag, Vec<u8>, SinkPriority)>().sub_opt(self.sink_priority.borrow_mut().take().expect("registering channel with no priority?"), self, StreamService::sink_data);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static> Service for Rc<StreamService<SourceChannel, (SinkTag, Vec<u8>)>> {
|
|
|
- fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
|
|
|
- eroot.channel::<(SinkTag, Vec<u8>)>().sub_opt(self, StreamService::sink_data);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>> StreamService<SourceChannel, SinkChannel> {
|
|
|
- fn sink_data(&self, _api: &crate::Fleck, mut data: Vec<u8>) -> Option<Vec<u8>> {
|
|
|
- self.queue.borrow_mut().extend(data.drain(..));
|
|
|
- self.interest.subscribe_write(&mut SourceFd(&self.file.borrow().as_raw_fd()));
|
|
|
- None
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>> AbstractFdService for StreamService<SourceChannel, SinkChannel> {
|
|
|
- fn ready_read(&self, api: &crate::Fleck) {
|
|
|
- let mut buf = [0u8; 1024];
|
|
|
- while let Ok(bytes) = self.file.borrow_mut().read(&mut buf) {
|
|
|
- api.queue::<SourceChannel>(Vec::from(&buf[..bytes]));
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- fn ready_write(&self, _api: &crate::Fleck) {
|
|
|
- let mut queue = self.queue.borrow_mut();
|
|
|
- while !queue.is_empty() {
|
|
|
- let (data, _) = queue.as_slices();
|
|
|
- match self.file.borrow_mut().write(data) {
|
|
|
- Ok(len) => {
|
|
|
- queue.drain(..len);
|
|
|
- },
|
|
|
- Err(_) => {
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- self.interest.unsubscribe_write(&mut SourceFd(&self.file.borrow().as_raw_fd()));
|
|
|
- }
|
|
|
- fn token(&self) -> Token {
|
|
|
- self.interest.token
|
|
|
- }
|
|
|
-}
|