view src/dgram_stream.rs @ 10:0868a500a7f6 draft

Implement Stream for UDP sockets, and a Sink for UDP sockets
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 03 Dec 2016 11:18:08 +0100
parents src/unix_datagram_stream.rs@58aaac3848cb
children 4185ac5ed03d
line wrap: on
line source

use std::io;
use std::mem;
use std::net::SocketAddr;

use error;

use futures::{Async, Poll, Stream, AsyncSink, StartSend};
use futures::sink::{Send, Sink};

use tokio_core::net::UdpSocket;
use tokio_uds::UnixDatagram;

fn would_block(e: &io::Error) -> bool {
    e.kind() == io::ErrorKind::WouldBlock
}

pub struct UnixDgramStream {
    sock: UnixDatagram,
    buf: Vec<u8>,
}

pub fn new_unix_dgram_stream(max_recv_len: usize, socket: UnixDatagram) -> UnixDgramStream {
    if max_recv_len < 1 {
        error::panic("max_recv_len is 0");
    }
    let buf = vec![0; max_recv_len];

    UnixDgramStream {
        sock: socket,
        buf: buf,
    }
}

impl Stream for UnixDgramStream {
    type Item = Vec<u8>;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let len = try_nb!(self.sock.recv(&mut self.buf));

        let copy = Vec::from(&self.buf[0..len]);

        Ok(Async::Ready(Some(copy)))
    }
}

pub struct UdpDgramStream {
    sock: UdpSocket,
    buf: Vec<u8>,
}

pub fn new_udp_stream(max_recv_len: usize, socket: UdpSocket) -> UdpDgramStream {
    if max_recv_len < 1 {
        error::panic("max_recv_len is 0");
    }

    let buf = vec![0; max_recv_len];

    UdpDgramStream {
        sock: socket,
        buf: buf,
    }
}

impl Stream for UdpDgramStream {
    type Item = (Vec<u8>, String);
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let (len, from) = try_nb!(self.sock.recv_from(&mut self.buf));
        // TODO: Do reverse resolution
        let sender = format!("{}:{}", from.ip(), from.port());

        Ok(Async::Ready(Some((Vec::from(&self.buf[0..len]), sender))))
    }
}


/// A sink that sends items to another host.
pub struct UdpDgramSink {
    sock: UdpSocket,
    addr: SocketAddr,

    // LIFO buffer
    buf: Vec<Vec<u8>>,
    buffered: usize,
}

pub fn new_udp_sink(dest: SocketAddr, sock: UdpSocket, bufsize: usize) -> UdpDgramSink {
    UdpDgramSink {
        sock: sock,
        addr: dest,
        buf: vec![vec![]; bufsize],
        buffered: 0,
    }
}

impl Sink for UdpDgramSink {
    type SinkItem = Vec<u8>;
    type SinkError = io::Error;

    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        // First try sending
        match self.sock.send_to(&item, &self.addr) {
            Ok(_) => {
                println!("sent");
                Ok(AsyncSink::Ready)
            }
            Err(ref e) if would_block(e) => {
                if self.buffered < self.buf.len() {
                    self.buffered += 1;
                    self.buf[self.buffered - 1] = item;
                    println!("queued");
                    Ok(AsyncSink::Ready)
                } else {
                    println!("not accepted");
                    Ok(AsyncSink::NotReady(item))
                }
            }
            Err(e) => Err(e),
        }
    }

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        println!("polled");
        if self.buffered > 0 {
            match self.sock.send_to(&self.buf[self.buffered - 1], &self.addr) {
                Ok(_) => {
                    self.buffered -= 1;
                    let dgram = mem::replace(&mut self.buf[self.buffered], vec![]);
                    if self.buffered == 0 {
                        println!("done");
                        Ok(Async::Ready(()))
                    } else {
                        println!("sent later");
                        Ok(Async::NotReady)
                    }
                }
                Err(ref e) if would_block(e) => {
                    println!("couldn't send again");
                    Ok(Async::NotReady)
                }
                Err(e) => Err(e),
            }
        } else {
            Ok(Async::Ready(()))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::net::ToSocketAddrs;
    use std::cell::Cell;
    use std::rc::Rc;

    use super::*;

    use futures::{self, Future, Stream, Sink};
    use tokio_core::reactor::Core;
    use tokio_core::net::UdpSocket;

    #[test]
    fn test_dgram_sink() {
        let verifier = Rc::new(Cell::new(false));

        let mut l = Core::new().unwrap();
        let handle = l.handle();
        let dest = "127.0.0.1:12345".to_socket_addrs().unwrap().next().unwrap();

        let sock = UdpSocket::bind(&("127.0.0.1:11111".to_socket_addrs().unwrap().next().unwrap()),
                                   &handle)
            .unwrap();
        let recvsock = UdpSocket::bind(&dest, &handle).unwrap();

        let msg = "HelloWorld".as_bytes().to_vec();
        let sender = new_udp_sink(dest, sock, 128);

        let receive_verify = verifier.clone();
        let mut buf = vec![0; 512];
        let receiver = recvsock.recv_dgram(&mut buf).and_then(|_| {
            receive_verify.set(true);
            futures::future::ok(())
        });

        l.run(sender.send(msg).join(receiver).map(|_| ())).unwrap();

        assert!(verifier.get());
    }
}