changeset 10:0868a500a7f6 draft

Implement Stream for UDP sockets, and a Sink for UDP sockets
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 03 Dec 2016 11:18:08 +0100
parents b299e00035bc
children 469c979e4d33
files src/dgram_stream.rs
diffstat 1 files changed, 190 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/dgram_stream.rs	Sat Dec 03 11:18:08 2016 +0100
@@ -0,0 +1,190 @@
+use std::io;
+use std::mem;
+use std::net::SocketAddr;
+
+use error;
+
+use futures::{Async, Poll, Stream, AsyncSink, StartSend};
+use futures::sink::{Send, Sink};
+
+use tokio_core::net::UdpSocket;
+use tokio_uds::UnixDatagram;
+
+fn would_block(e: &io::Error) -> bool {
+    e.kind() == io::ErrorKind::WouldBlock
+}
+
+pub struct UnixDgramStream {
+    sock: UnixDatagram,
+    buf: Vec<u8>,
+}
+
+pub fn new_unix_dgram_stream(max_recv_len: usize, socket: UnixDatagram) -> UnixDgramStream {
+    if max_recv_len < 1 {
+        error::panic("max_recv_len is 0");
+    }
+    let buf = vec![0; max_recv_len];
+
+    UnixDgramStream {
+        sock: socket,
+        buf: buf,
+    }
+}
+
+impl Stream for UnixDgramStream {
+    type Item = Vec<u8>;
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        let len = try_nb!(self.sock.recv(&mut self.buf));
+
+        let copy = Vec::from(&self.buf[0..len]);
+
+        Ok(Async::Ready(Some(copy)))
+    }
+}
+
+pub struct UdpDgramStream {
+    sock: UdpSocket,
+    buf: Vec<u8>,
+}
+
+pub fn new_udp_stream(max_recv_len: usize, socket: UdpSocket) -> UdpDgramStream {
+    if max_recv_len < 1 {
+        error::panic("max_recv_len is 0");
+    }
+
+    let buf = vec![0; max_recv_len];
+
+    UdpDgramStream {
+        sock: socket,
+        buf: buf,
+    }
+}
+
+impl Stream for UdpDgramStream {
+    type Item = (Vec<u8>, String);
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        let (len, from) = try_nb!(self.sock.recv_from(&mut self.buf));
+        // TODO: Do reverse resolution
+        let sender = format!("{}:{}", from.ip(), from.port());
+
+        Ok(Async::Ready(Some((Vec::from(&self.buf[0..len]), sender))))
+    }
+}
+
+
+/// A sink that sends items to another host.
+pub struct UdpDgramSink {
+    sock: UdpSocket,
+    addr: SocketAddr,
+
+    // LIFO buffer
+    buf: Vec<Vec<u8>>,
+    buffered: usize,
+}
+
+pub fn new_udp_sink(dest: SocketAddr, sock: UdpSocket, bufsize: usize) -> UdpDgramSink {
+    UdpDgramSink {
+        sock: sock,
+        addr: dest,
+        buf: vec![vec![]; bufsize],
+        buffered: 0,
+    }
+}
+
+impl Sink for UdpDgramSink {
+    type SinkItem = Vec<u8>;
+    type SinkError = io::Error;
+
+    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+        // First try sending
+        match self.sock.send_to(&item, &self.addr) {
+            Ok(_) => {
+                println!("sent");
+                Ok(AsyncSink::Ready)
+            }
+            Err(ref e) if would_block(e) => {
+                if self.buffered < self.buf.len() {
+                    self.buffered += 1;
+                    self.buf[self.buffered - 1] = item;
+                    println!("queued");
+                    Ok(AsyncSink::Ready)
+                } else {
+                    println!("not accepted");
+                    Ok(AsyncSink::NotReady(item))
+                }
+            }
+            Err(e) => Err(e),
+        }
+    }
+
+    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+        println!("polled");
+        if self.buffered > 0 {
+            match self.sock.send_to(&self.buf[self.buffered - 1], &self.addr) {
+                Ok(_) => {
+                    self.buffered -= 1;
+                    let dgram = mem::replace(&mut self.buf[self.buffered], vec![]);
+                    if self.buffered == 0 {
+                        println!("done");
+                        Ok(Async::Ready(()))
+                    } else {
+                        println!("sent later");
+                        Ok(Async::NotReady)
+                    }
+                }
+                Err(ref e) if would_block(e) => {
+                    println!("couldn't send again");
+                    Ok(Async::NotReady)
+                }
+                Err(e) => Err(e),
+            }
+        } else {
+            Ok(Async::Ready(()))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::net::ToSocketAddrs;
+    use std::cell::Cell;
+    use std::rc::Rc;
+
+    use super::*;
+
+    use futures::{self, Future, Stream, Sink};
+    use tokio_core::reactor::Core;
+    use tokio_core::net::UdpSocket;
+
+    #[test]
+    fn test_dgram_sink() {
+        let verifier = Rc::new(Cell::new(false));
+
+        let mut l = Core::new().unwrap();
+        let handle = l.handle();
+        let dest = "127.0.0.1:12345".to_socket_addrs().unwrap().next().unwrap();
+
+        let sock = UdpSocket::bind(&("127.0.0.1:11111".to_socket_addrs().unwrap().next().unwrap()),
+                                   &handle)
+            .unwrap();
+        let recvsock = UdpSocket::bind(&dest, &handle).unwrap();
+
+        let msg = "HelloWorld".as_bytes().to_vec();
+        let sender = new_udp_sink(dest, sock, 128);
+
+        let receive_verify = verifier.clone();
+        let mut buf = vec![0; 512];
+        let receiver = recvsock.recv_dgram(&mut buf).and_then(|_| {
+            receive_verify.set(true);
+            futures::future::ok(())
+        });
+
+        l.run(sender.send(msg).join(receiver).map(|_| ())).unwrap();
+
+        assert!(verifier.get());
+    }
+}