Mercurial > lbo > hg > syslog
changeset 10:0868a500a7f6 draft
Implement Stream for UDP sockets, and a Sink for UDP sockets
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 03 Dec 2016 11:18:08 +0100 |
parents | b299e00035bc |
children | 469c979e4d33 |
files | src/dgram_stream.rs |
diffstat | 1 files changed, 190 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/dgram_stream.rs Sat Dec 03 11:18:08 2016 +0100 @@ -0,0 +1,190 @@ +use std::io; +use std::mem; +use std::net::SocketAddr; + +use error; + +use futures::{Async, Poll, Stream, AsyncSink, StartSend}; +use futures::sink::{Send, Sink}; + +use tokio_core::net::UdpSocket; +use tokio_uds::UnixDatagram; + +fn would_block(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::WouldBlock +} + +pub struct UnixDgramStream { + sock: UnixDatagram, + buf: Vec<u8>, +} + +pub fn new_unix_dgram_stream(max_recv_len: usize, socket: UnixDatagram) -> UnixDgramStream { + if max_recv_len < 1 { + error::panic("max_recv_len is 0"); + } + let buf = vec![0; max_recv_len]; + + UnixDgramStream { + sock: socket, + buf: buf, + } +} + +impl Stream for UnixDgramStream { + type Item = Vec<u8>; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let len = try_nb!(self.sock.recv(&mut self.buf)); + + let copy = Vec::from(&self.buf[0..len]); + + Ok(Async::Ready(Some(copy))) + } +} + +pub struct UdpDgramStream { + sock: UdpSocket, + buf: Vec<u8>, +} + +pub fn new_udp_stream(max_recv_len: usize, socket: UdpSocket) -> UdpDgramStream { + if max_recv_len < 1 { + error::panic("max_recv_len is 0"); + } + + let buf = vec![0; max_recv_len]; + + UdpDgramStream { + sock: socket, + buf: buf, + } +} + +impl Stream for UdpDgramStream { + type Item = (Vec<u8>, String); + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let (len, from) = try_nb!(self.sock.recv_from(&mut self.buf)); + // TODO: Do reverse resolution + let sender = format!("{}:{}", from.ip(), from.port()); + + Ok(Async::Ready(Some((Vec::from(&self.buf[0..len]), sender)))) + } +} + + +/// A sink that sends items to another host. +pub struct UdpDgramSink { + sock: UdpSocket, + addr: SocketAddr, + + // LIFO buffer + buf: Vec<Vec<u8>>, + buffered: usize, +} + +pub fn new_udp_sink(dest: SocketAddr, sock: UdpSocket, bufsize: usize) -> UdpDgramSink { + UdpDgramSink { + sock: sock, + addr: dest, + buf: vec![vec![]; bufsize], + buffered: 0, + } +} + +impl Sink for UdpDgramSink { + type SinkItem = Vec<u8>; + type SinkError = io::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + // First try sending + match self.sock.send_to(&item, &self.addr) { + Ok(_) => { + println!("sent"); + Ok(AsyncSink::Ready) + } + Err(ref e) if would_block(e) => { + if self.buffered < self.buf.len() { + self.buffered += 1; + self.buf[self.buffered - 1] = item; + println!("queued"); + Ok(AsyncSink::Ready) + } else { + println!("not accepted"); + Ok(AsyncSink::NotReady(item)) + } + } + Err(e) => Err(e), + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + println!("polled"); + if self.buffered > 0 { + match self.sock.send_to(&self.buf[self.buffered - 1], &self.addr) { + Ok(_) => { + self.buffered -= 1; + let dgram = mem::replace(&mut self.buf[self.buffered], vec![]); + if self.buffered == 0 { + println!("done"); + Ok(Async::Ready(())) + } else { + println!("sent later"); + Ok(Async::NotReady) + } + } + Err(ref e) if would_block(e) => { + println!("couldn't send again"); + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } else { + Ok(Async::Ready(())) + } + } +} + +#[cfg(test)] +mod tests { + use std::net::ToSocketAddrs; + use std::cell::Cell; + use std::rc::Rc; + + use super::*; + + use futures::{self, Future, Stream, Sink}; + use tokio_core::reactor::Core; + use tokio_core::net::UdpSocket; + + #[test] + fn test_dgram_sink() { + let verifier = Rc::new(Cell::new(false)); + + let mut l = Core::new().unwrap(); + let handle = l.handle(); + let dest = "127.0.0.1:12345".to_socket_addrs().unwrap().next().unwrap(); + + let sock = UdpSocket::bind(&("127.0.0.1:11111".to_socket_addrs().unwrap().next().unwrap()), + &handle) + .unwrap(); + let recvsock = UdpSocket::bind(&dest, &handle).unwrap(); + + let msg = "HelloWorld".as_bytes().to_vec(); + let sender = new_udp_sink(dest, sock, 128); + + let receive_verify = verifier.clone(); + let mut buf = vec![0; 512]; + let receiver = recvsock.recv_dgram(&mut buf).and_then(|_| { + receive_verify.set(true); + futures::future::ok(()) + }); + + l.run(sender.send(msg).join(receiver).map(|_| ())).unwrap(); + + assert!(verifier.get()); + } +}