Mercurial > lbo > hg > syslog
view src/dgram_stream.rs @ 10:0868a500a7f6 draft
Implement Stream for UDP sockets, and a Sink for UDP sockets
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 03 Dec 2016 11:18:08 +0100 |
parents | src/unix_datagram_stream.rs@58aaac3848cb |
children | 4185ac5ed03d |
line wrap: on
line source
use std::io; use std::mem; use std::net::SocketAddr; use error; use futures::{Async, Poll, Stream, AsyncSink, StartSend}; use futures::sink::{Send, Sink}; use tokio_core::net::UdpSocket; use tokio_uds::UnixDatagram; fn would_block(e: &io::Error) -> bool { e.kind() == io::ErrorKind::WouldBlock } pub struct UnixDgramStream { sock: UnixDatagram, buf: Vec<u8>, } pub fn new_unix_dgram_stream(max_recv_len: usize, socket: UnixDatagram) -> UnixDgramStream { if max_recv_len < 1 { error::panic("max_recv_len is 0"); } let buf = vec![0; max_recv_len]; UnixDgramStream { sock: socket, buf: buf, } } impl Stream for UnixDgramStream { type Item = Vec<u8>; type Error = io::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { let len = try_nb!(self.sock.recv(&mut self.buf)); let copy = Vec::from(&self.buf[0..len]); Ok(Async::Ready(Some(copy))) } } pub struct UdpDgramStream { sock: UdpSocket, buf: Vec<u8>, } pub fn new_udp_stream(max_recv_len: usize, socket: UdpSocket) -> UdpDgramStream { if max_recv_len < 1 { error::panic("max_recv_len is 0"); } let buf = vec![0; max_recv_len]; UdpDgramStream { sock: socket, buf: buf, } } impl Stream for UdpDgramStream { type Item = (Vec<u8>, String); type Error = io::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { let (len, from) = try_nb!(self.sock.recv_from(&mut self.buf)); // TODO: Do reverse resolution let sender = format!("{}:{}", from.ip(), from.port()); Ok(Async::Ready(Some((Vec::from(&self.buf[0..len]), sender)))) } } /// A sink that sends items to another host. pub struct UdpDgramSink { sock: UdpSocket, addr: SocketAddr, // LIFO buffer buf: Vec<Vec<u8>>, buffered: usize, } pub fn new_udp_sink(dest: SocketAddr, sock: UdpSocket, bufsize: usize) -> UdpDgramSink { UdpDgramSink { sock: sock, addr: dest, buf: vec![vec![]; bufsize], buffered: 0, } } impl Sink for UdpDgramSink { type SinkItem = Vec<u8>; type SinkError = io::Error; fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { // First try sending match self.sock.send_to(&item, &self.addr) { Ok(_) => { println!("sent"); Ok(AsyncSink::Ready) } Err(ref e) if would_block(e) => { if self.buffered < self.buf.len() { self.buffered += 1; self.buf[self.buffered - 1] = item; println!("queued"); Ok(AsyncSink::Ready) } else { println!("not accepted"); Ok(AsyncSink::NotReady(item)) } } Err(e) => Err(e), } } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { println!("polled"); if self.buffered > 0 { match self.sock.send_to(&self.buf[self.buffered - 1], &self.addr) { Ok(_) => { self.buffered -= 1; let dgram = mem::replace(&mut self.buf[self.buffered], vec![]); if self.buffered == 0 { println!("done"); Ok(Async::Ready(())) } else { println!("sent later"); Ok(Async::NotReady) } } Err(ref e) if would_block(e) => { println!("couldn't send again"); Ok(Async::NotReady) } Err(e) => Err(e), } } else { Ok(Async::Ready(())) } } } #[cfg(test)] mod tests { use std::net::ToSocketAddrs; use std::cell::Cell; use std::rc::Rc; use super::*; use futures::{self, Future, Stream, Sink}; use tokio_core::reactor::Core; use tokio_core::net::UdpSocket; #[test] fn test_dgram_sink() { let verifier = Rc::new(Cell::new(false)); let mut l = Core::new().unwrap(); let handle = l.handle(); let dest = "127.0.0.1:12345".to_socket_addrs().unwrap().next().unwrap(); let sock = UdpSocket::bind(&("127.0.0.1:11111".to_socket_addrs().unwrap().next().unwrap()), &handle) .unwrap(); let recvsock = UdpSocket::bind(&dest, &handle).unwrap(); let msg = "HelloWorld".as_bytes().to_vec(); let sender = new_udp_sink(dest, sock, 128); let receive_verify = verifier.clone(); let mut buf = vec![0; 512]; let receiver = recvsock.recv_dgram(&mut buf).and_then(|_| { receive_verify.set(true); futures::future::ok(()) }); l.run(sender.send(msg).join(receiver).map(|_| ())).unwrap(); assert!(verifier.get()); } }