|
@@ -1,4 +1,4 @@
|
|
|
-use std::{cell::{RefCell, Cell}, os::unix::prelude::{OwnedFd, AsRawFd, FromRawFd, RawFd}, rc::Rc, collections::VecDeque, io::{Read, Write}};
|
|
|
+use std::{cell::{RefCell, Cell}, os::unix::prelude::{OwnedFd, AsRawFd, FromRawFd, RawFd}, rc::Rc, collections::VecDeque, io::{Read, Write}, ops::DerefMut};
|
|
|
|
|
|
use mio::{Token, unix::SourceFd};
|
|
|
|
|
@@ -24,7 +24,7 @@ pub struct StreamData {
|
|
|
}
|
|
|
|
|
|
pub struct TimerData {
|
|
|
- timer: Option<timerfd::TimerState>,
|
|
|
+ interval: Option<std::time::Duration>,
|
|
|
}
|
|
|
|
|
|
struct InterestRegistration {
|
|
@@ -65,7 +65,6 @@ pub struct ServiceBuilder<'l, SourceChannel: ChannelSpec, SinkChannel: ChannelSp
|
|
|
poll: Rc<RefCell<mio::Poll>>,
|
|
|
token: Token,
|
|
|
|
|
|
- source_priority: Option<SourceChannel::Priority>,
|
|
|
sink_priority: Option<SinkChannel::Priority>,
|
|
|
|
|
|
data: Data,
|
|
@@ -82,7 +81,6 @@ impl<'l, SourceChannel: ChannelSpec, SinkChannel: ChannelSpec>
|
|
|
poll,
|
|
|
token,
|
|
|
|
|
|
- source_priority: None,
|
|
|
sink_priority: None,
|
|
|
|
|
|
data: NoData,
|
|
@@ -101,7 +99,6 @@ impl<'l, SinkChannel: ChannelSpec, Data> ServiceBuilder<'l, NoChannel, SinkChann
|
|
|
poll: self.poll,
|
|
|
token: self.token,
|
|
|
|
|
|
- source_priority: Some(NoPriorityTag),
|
|
|
sink_priority: self.sink_priority,
|
|
|
|
|
|
data: self.data,
|
|
@@ -110,16 +107,17 @@ impl<'l, SinkChannel: ChannelSpec, Data> ServiceBuilder<'l, NoChannel, SinkChann
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn source_with<Priority: PartialOrd, SourceChannel: ChannelSpec<Priority = Priority>>(
|
|
|
+ pub fn source_with<SourceChannel: ChannelSpec>(
|
|
|
self,
|
|
|
- p: Priority,
|
|
|
- ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
|
|
|
+ p: SourceChannel::Priority,
|
|
|
+ ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data>
|
|
|
+ where SourceChannel::Priority: PartialOrd
|
|
|
+ {
|
|
|
ServiceBuilder {
|
|
|
fleck: self.fleck,
|
|
|
poll: self.poll,
|
|
|
token: self.token,
|
|
|
|
|
|
- source_priority: Some(p),
|
|
|
sink_priority: self.sink_priority,
|
|
|
|
|
|
data: self.data,
|
|
@@ -138,7 +136,6 @@ impl<'l, SourceChannel: ChannelSpec, Data> ServiceBuilder<'l, SourceChannel, NoC
|
|
|
poll: self.poll,
|
|
|
token: self.token,
|
|
|
|
|
|
- source_priority: self.source_priority,
|
|
|
sink_priority: Some(NoPriorityTag),
|
|
|
|
|
|
data: self.data,
|
|
@@ -147,16 +144,16 @@ impl<'l, SourceChannel: ChannelSpec, Data> ServiceBuilder<'l, SourceChannel, NoC
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn sink_with<Priority: PartialOrd, SinkChannel: ChannelSpec<Priority = Priority>>(
|
|
|
+ pub fn sink_with<SinkChannel: ChannelSpec>(
|
|
|
self,
|
|
|
- p: Priority,
|
|
|
- ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data> {
|
|
|
+ p: SinkChannel::Priority,
|
|
|
+ ) -> ServiceBuilder<'l, SourceChannel, SinkChannel, Data>
|
|
|
+ where SinkChannel::Priority: PartialOrd {
|
|
|
ServiceBuilder {
|
|
|
fleck: self.fleck,
|
|
|
poll: self.poll,
|
|
|
token: self.token,
|
|
|
|
|
|
- source_priority: self.source_priority,
|
|
|
sink_priority: Some(p),
|
|
|
|
|
|
data: self.data,
|
|
@@ -167,9 +164,8 @@ impl<'l, SourceChannel: ChannelSpec, Data> ServiceBuilder<'l, SourceChannel, NoC
|
|
|
}
|
|
|
|
|
|
impl<'l,
|
|
|
- PacketType: 'static + From<(std::net::SocketAddr, Vec<u8>)> + Into<(std::net::SocketAddr, Vec<u8>)>,
|
|
|
- SourceChannel: ChannelSpec<Data = PacketType>,
|
|
|
- SinkChannel: ChannelSpec<Data = PacketType>,
|
|
|
+ SourceChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
+ SinkChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
> ServiceBuilder<'l, SourceChannel, SinkChannel, NoData>
|
|
|
{
|
|
|
pub fn udp(self) -> ServiceBuilder<'l, SourceChannel, SinkChannel, UdpData> {
|
|
@@ -178,7 +174,6 @@ impl<'l,
|
|
|
poll: self.poll,
|
|
|
token: self.token,
|
|
|
|
|
|
- source_priority: self.source_priority,
|
|
|
sink_priority: self.sink_priority,
|
|
|
|
|
|
data: UdpData {
|
|
@@ -192,9 +187,8 @@ impl<'l,
|
|
|
}
|
|
|
|
|
|
impl<'l,
|
|
|
- PacketType: 'static + From<(std::net::SocketAddr, Vec<u8>)> + Into<(std::net::SocketAddr, Vec<u8>)>,
|
|
|
- SourceChannel: ChannelSpec<Data = PacketType>,
|
|
|
- SinkChannel: ChannelSpec<Data = PacketType>,
|
|
|
+ SourceChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
+ SinkChannel: ChannelSpec<Data = (std::net::SocketAddr, Vec<u8>)>,
|
|
|
> ServiceBuilder<'l, SourceChannel, SinkChannel, UdpData>
|
|
|
{
|
|
|
pub fn set_bind_address(mut self, bind_addr: std::net::SocketAddr) -> Self {
|
|
@@ -215,8 +209,62 @@ impl<'l,
|
|
|
}
|
|
|
|
|
|
pub fn build(self) {
|
|
|
+ let socket = mio::net::UdpSocket::bind(
|
|
|
+ self.data.bind_addr.expect("UDP socket not given a bind address!")
|
|
|
+ ).expect("couldn't bind to UDP socket");
|
|
|
+
|
|
|
+ if let Some(mcast) = self.data.join_multicast {
|
|
|
+ use std::net::IpAddr;
|
|
|
+ match (mcast.group, mcast.iface) {
|
|
|
+ (IpAddr::V4(group), IpAddr::V4(iface)) => {
|
|
|
+ socket.join_multicast_v4(&group, &iface).expect("couldn't join multicast group");
|
|
|
+ },
|
|
|
+ (IpAddr::V6(_group), IpAddr::V6(_iface)) => { todo!() },
|
|
|
+ _ => panic!("Multicast specification mixes ipv4 and ipv6 addresses!"),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let svc = Rc::new(UdpService::<SourceChannel, SinkChannel> {
|
|
|
+ interest: InterestRegistration { poll: self.poll, write: Cell::new(false), token: self.token },
|
|
|
+ queue: Default::default(),
|
|
|
+ // file: RefCell::new(unsafe { std::fs::File::from_raw_fd(self.data.fd.expect("Building stream IO service with no FD?")) }),
|
|
|
+ socket: RefCell::new(socket),
|
|
|
+ sink_priority: RefCell::new(Some(self.sink_priority.expect("no sink priority given!"))),
|
|
|
+ _ghost: Default::default(),
|
|
|
+ });
|
|
|
+
|
|
|
+ svc.interest.subscribe_read(svc.socket.borrow_mut().deref_mut());
|
|
|
+
|
|
|
+ self.fleck.io.borrow_mut().services.borrow_mut().insert(self.token, svc.clone());
|
|
|
+ self.fleck.services.borrow_mut().add_service(svc.clone());
|
|
|
+
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+struct UdpService<SourceChannel: ChannelSpec, SinkChannel: ChannelSpec> {
|
|
|
+ interest: InterestRegistration,
|
|
|
+ queue: VecDeque<(std::net::SocketAddr, Vec<u8>)>,
|
|
|
+ socket: RefCell<mio::net::UdpSocket>,
|
|
|
+ sink_priority: RefCell<Option<SinkChannel::Priority>>,
|
|
|
+ _ghost: std::marker::PhantomData<(SourceChannel, SinkChannel)>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<SourceChannel: ChannelSpec, SinkChannel: ChannelSpec> Service for Rc<UdpService<SourceChannel, SinkChannel>> {
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+impl<SourceChannel: ChannelSpec, SinkChannel: ChannelSpec> AbstractFdService for UdpService<SourceChannel, SinkChannel> {
|
|
|
+ fn ready_read(&self, api: &crate::Fleck) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ fn ready_write(&self, api: &crate::Fleck) {
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ fn token(&self) -> Token {
|
|
|
+ self.interest.token
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/* timerfd support */
|
|
@@ -229,10 +277,9 @@ impl<'l, SourceChannel: ChannelSpec<Data = ()>>
|
|
|
poll: self.poll,
|
|
|
token: self.token,
|
|
|
|
|
|
- source_priority: self.source_priority,
|
|
|
sink_priority: self.sink_priority,
|
|
|
|
|
|
- data: TimerData { timer: None },
|
|
|
+ data: TimerData { interval: None },
|
|
|
|
|
|
_ghost: Default::default(),
|
|
|
}
|
|
@@ -252,7 +299,7 @@ impl<Tag: 'static> AbstractFdService for Timer<Tag> {
|
|
|
api.queue::<(Tag, ())>(());
|
|
|
}
|
|
|
|
|
|
- fn ready_write(&self, api: &crate::Fleck) {
|
|
|
+ fn ready_write(&self, _api: &crate::Fleck) {
|
|
|
unreachable!();
|
|
|
}
|
|
|
|
|
@@ -264,16 +311,19 @@ impl<Tag: 'static> AbstractFdService for Timer<Tag> {
|
|
|
impl<'l, SourceChannel: ChannelSpec<Data = ()>>
|
|
|
ServiceBuilder<'l, SourceChannel, ((), ()), TimerData>
|
|
|
{
|
|
|
- pub fn setup(mut self, state: timerfd::TimerState) -> Self {
|
|
|
- self.data.timer = Some(state);
|
|
|
+ pub fn interval(mut self, time: std::time::Duration) -> Self {
|
|
|
+ self.data.interval = Some(time);
|
|
|
self
|
|
|
}
|
|
|
|
|
|
pub fn build(self) {
|
|
|
- let mut timer = timerfd::TimerFd::new_custom(timerfd::ClockId::Monotonic, true, false).expect("couldn't create timer");
|
|
|
- timer.set_state(self.data.timer.expect("no setup done for timer?"), timerfd::SetTimeFlags::Default);
|
|
|
+ let mut raw_timer = timerfd::TimerFd::new_custom(timerfd::ClockId::Monotonic, true, false).expect("couldn't create timer");
|
|
|
+ raw_timer.set_state(timerfd::TimerState::Periodic {
|
|
|
+ current: std::time::Duration::from_millis(1),
|
|
|
+ interval: self.data.interval.expect("no interval set for timer"),
|
|
|
+ }, timerfd::SetTimeFlags::Default);
|
|
|
|
|
|
- let timer = Rc::new(Timer::<SourceChannel::Tag> { token: self.token, timer, _ghost: Default::default() });
|
|
|
+ let timer = Rc::new(Timer::<SourceChannel::Tag> { token: self.token, timer: raw_timer, _ghost: Default::default() });
|
|
|
|
|
|
self.poll.borrow().registry().register(
|
|
|
&mut SourceFd(&timer.timer.as_raw_fd()),
|
|
@@ -295,7 +345,6 @@ impl<'l, SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Da
|
|
|
poll: self.poll,
|
|
|
token: self.token,
|
|
|
|
|
|
- source_priority: self.source_priority,
|
|
|
sink_priority: self.sink_priority,
|
|
|
|
|
|
data: StreamData { fd: None },
|
|
@@ -360,12 +409,6 @@ pub struct StreamService<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel
|
|
|
_ghost: std::marker::PhantomData<(SourceChannel, SinkChannel)>,
|
|
|
}
|
|
|
|
|
|
-/*impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>, Priority = SinkPriority>, SinkPriority: 'static + PartialOrd + Clone> Service for Rc<StreamService<SourceChannel, SinkChannel>> {
|
|
|
- fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
|
|
|
- eroot.channel::<SinkChannel>().sub_opt(self.sink_priority.clone(), self, StreamService::sink_data);
|
|
|
- }
|
|
|
-}*/
|
|
|
-
|
|
|
impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static, SinkPriority: 'static + Clone + PartialOrd> Service for Rc<StreamService<SourceChannel, (SinkTag, Vec<u8>, SinkPriority)>> {
|
|
|
fn register_channels(&self, eroot: &mut crate::prelude::EventRoot) {
|
|
|
eroot.channel::<(SinkTag, Vec<u8>, SinkPriority)>().sub_opt(self.sink_priority.borrow_mut().take().expect("registering channel with no priority?"), self, StreamService::sink_data);
|
|
@@ -379,7 +422,7 @@ impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkTag: 'static> Service for R
|
|
|
}
|
|
|
|
|
|
impl<SourceChannel: ChannelSpec<Data = Vec<u8>>, SinkChannel: ChannelSpec<Data = Vec<u8>>> StreamService<SourceChannel, SinkChannel> {
|
|
|
- fn sink_data(&self, api: &crate::Fleck, mut data: Vec<u8>) -> Option<Vec<u8>> {
|
|
|
+ fn sink_data(&self, _api: &crate::Fleck, mut data: Vec<u8>) -> Option<Vec<u8>> {
|
|
|
self.queue.borrow_mut().extend(data.drain(..));
|
|
|
self.interest.subscribe_write(&mut SourceFd(&self.file.borrow().as_raw_fd()));
|
|
|
None
|