changeset 6:0e608c466a58

Implement ClientChannel/SocketWrapper and add logging and tests
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 24 Sep 2016 16:42:13 +0200
parents 117cb812e28a
children 0c24a0ee0c34
files src/main/java/net/borgac/clusterrpc/client/Client.java src/main/java/net/borgac/clusterrpc/client/ClientChannel.java src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java src/main/java/net/borgac/clusterrpc/client/Logger.java src/main/java/net/borgac/clusterrpc/client/PeerAddress.java src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java src/main/java/net/borgac/clusterrpc/client/StderrLogger.java src/test/java/net/borgac/clusterrpc/client/SocketWrapperTest.java
diffstat 8 files changed, 303 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/net/borgac/clusterrpc/client/Client.java	Fri Sep 23 22:01:17 2016 +0200
+++ b/src/main/java/net/borgac/clusterrpc/client/Client.java	Sat Sep 24 16:42:13 2016 +0200
@@ -6,5 +6,4 @@
  * @author lbo
  */
 public class Client {
-
 }
--- a/src/main/java/net/borgac/clusterrpc/client/ClientChannel.java	Fri Sep 23 22:01:17 2016 +0200
+++ b/src/main/java/net/borgac/clusterrpc/client/ClientChannel.java	Sat Sep 24 16:42:13 2016 +0200
@@ -1,10 +1,12 @@
 package net.borgac.clusterrpc.client;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.Closeable;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import proto.Rpc;
 
 /**
  * ClientChannel implements the transport of RPC calls to the server.
@@ -17,9 +19,11 @@
 public final class ClientChannel implements Closeable {
 
     // Simple seed is good enough here.
-    private static final Random CHANNEL_ID_GENERATOR = new Random(
+    private static final Random ID_GENERATOR = new Random(
             Instant.now().getEpochSecond());
 
+    private Logger logger;
+
     private SocketWrapper sock;
     private int peerCounter;
 
@@ -28,22 +32,30 @@
 
     /**
      * Initialize a ClientChannel with no connections.
+     *
+     * @param logger
      */
-    public ClientChannel() {
-        this.sock = new SocketWrapper();
+    public ClientChannel(Logger logger) {
+        this.logger = logger;
+        this.sock = new SocketWrapper(logger);
         this.peerCounter = 0;
         this.peers = new HashMap<>();
-        this.channelId = CHANNEL_ID_GENERATOR.nextLong();
+        this.channelId = ID_GENERATOR.nextLong();
     }
 
     /**
      * Initialize a ClientChannel, and connect to `address`.
      *
+     * @param logger
      * @param address Address of the first peer to connect to.
      */
+    public ClientChannel(Logger logger, PeerAddress address) {
+        this(logger);
+        connect(address);
+    }
+
     public ClientChannel(PeerAddress address) {
-        this();
-        connect(address);
+        this(new StderrLogger(), address);
     }
 
     /**
@@ -87,6 +99,27 @@
         return false;
     }
 
+    boolean send(Rpc.RPCRequest request) {
+        byte[] serialized = request.toByteArray();
+
+        return sock.send(serialized);
+    }
+
+    Rpc.RPCResponse receive() {
+        byte[] response = sock.recv();
+        Rpc.RPCResponse parsed = null;
+
+        try {
+            parsed = Rpc.RPCResponse.parseFrom(response);
+        } catch (InvalidProtocolBufferException e) {
+            logger.log(Logger.Loglevel.ERROR, "Exception parsing RPCResponse:", e.toString());
+        } catch (Exception e) {
+            logger.log(Logger.Loglevel.FATAL, "Unhandled exception; this is a bug:", e.toString());
+        }
+
+        return parsed;
+    }
+
     @Override
     public void close() {
         sock.close();
--- a/src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java	Fri Sep 23 22:01:17 2016 +0200
+++ b/src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java	Sat Sep 24 16:42:13 2016 +0200
@@ -11,11 +11,11 @@
 
     private static final ZMQ.Context context = ZMQ.context(1);
 
-    private static ZMQ.Socket socket(int type) {
+    static ZMQ.Socket socket(int type) {
         return context.socket(type);
     }
 
-    public static ZMQ.Socket clientSocket() {
+    static ZMQ.Socket clientSocket() {
         // We need to do everything ourselves, as JeroMQ doesn't yet support
         // all features used by clusterrpc servers.
         ZMQ.Socket sock = socket(ZMQ.DEALER);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/net/borgac/clusterrpc/client/Logger.java	Sat Sep 24 16:42:13 2016 +0200
@@ -0,0 +1,30 @@
+package net.borgac.clusterrpc.client;
+
+/**
+ * A simple logger interface.
+ *
+ * @author lbo
+ */
+public interface Logger {
+
+    public void log(Loglevel l, Object... o);
+
+    /**
+     * Set the loglevel. Any log messages with a loglevel smaller than this will
+     * be swallowed.
+     *
+     * @param l
+     */
+    public void setLoglevel(Loglevel l);
+
+    /**
+     * Loglevel designates how critical a log message is.
+     */
+    public static enum Loglevel implements Comparable<Loglevel> {
+        DEBUG,
+        INFO,
+        WARNING,
+        ERROR,
+        FATAL // FATAL is supposed to abort the program. It is used for non-recoverable errors.
+    }
+}
--- a/src/main/java/net/borgac/clusterrpc/client/PeerAddress.java	Fri Sep 23 22:01:17 2016 +0200
+++ b/src/main/java/net/borgac/clusterrpc/client/PeerAddress.java	Sat Sep 24 16:42:13 2016 +0200
@@ -34,7 +34,21 @@
         this.port = address.getPort();
     }
 
+    /**
+     * For hermetic testing.
+     *
+     * @param inmemoryAddr An arbitrary string.
+     */
+    PeerAddress(String inmemoryAddr) {
+        this.host = inmemoryAddr;
+        this.port = 0;
+    }
+
     String getConnectAddress() {
-        return String.format("tcp://%s:%d", host, port);
+        if (port > 0) {
+            return String.format("tcp://%s:%d", host, port);
+        } else {
+            return String.format("inproc://%s", host);
+        }
     }
 }
--- a/src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java	Fri Sep 23 22:01:17 2016 +0200
+++ b/src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java	Sat Sep 24 16:42:13 2016 +0200
@@ -10,25 +10,36 @@
 /**
  * SocketWrapper is responsible for framing and lower-level error handling.
  *
+ * We emulate part of the ZeroMQ functionality here; JeroMQ doesn't implement
+ * ZMTP 3 (yet), so we handle the REQ_RELAXED/REQ_CORRELATE functionality
+ * ourselves. Thus, the inner socket is a DEALER socket, and not a REQ socket.
+ *
  * @author lbo
  */
 class SocketWrapper implements Closeable {
 
-    private static final int EXPECTED_RESPONSE_SIZE = 4;
-    private static final int EXPECTED_REQUEST_SIZE = 4;
+    private static final int EXPECTED_RESPONSE_SIZE = 3;
+    private static final int EXPECTED_REQUEST_SIZE = 3;
     private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond());
 
     private final ZMQ.Socket sock;
-    private final byte[] clientId;
     // For correlation of requests
     private byte[] outstandingRequestId;
+    private final Logger logger;
 
-    SocketWrapper() {
+    SocketWrapper(Logger l) {
         this.sock = GlobalContextProvider.clientSocket();
-        this.clientId = new byte[5];
         this.outstandingRequestId = null;
+        this.logger = l;
 
+        byte[] clientId = new byte[5];
         ID_GENERATOR.nextBytes(clientId);
+        sock.setIdentity(clientId);
+    }
+
+    // visible for testing
+    ZMQ.Socket getInner() {
+        return sock;
     }
 
     void connect(PeerAddress address) {
@@ -59,10 +70,9 @@
         outstandingRequestId = requestId;
 
         // The wire format is
-        // [request ID, client ID, empty frame, payload]
+        // [request ID, empty frame, payload]
         // ...simulating what a REQ socket would send.
         message.add(requestId);
-        message.add(clientId);
         message.add(new byte[0]);
         message.add(payload);
 
@@ -73,21 +83,30 @@
 
     byte[] recv() {
         ZMsg message;
+        byte[] response = null;
 
         do {
             message = ZMsg.recvMsg(sock);
-            // Check if the response is to our last request; otherwise throw away
-        } while (message.size() != EXPECTED_RESPONSE_SIZE
-                || !Arrays.equals(message.getFirst().getData(), outstandingRequestId));
+
+            if (message.size() != EXPECTED_RESPONSE_SIZE) {
+                logger.log(Logger.Loglevel.ERROR, "Received response with bad length:", message.size());
+            }
 
-        byte[] requestId = message.pop().getData();
-        byte[] clientId = message.pop().getData();
-        byte[] empty = message.pop().getData();
-        byte[] response = message.pop().getData();
+            // Check if the response is to our last request; otherwise throw away
+            byte[] requestId = message.pop().getData();
+            // empty frame
+            message.pop().getData();
+            response = message.pop().getData();
 
-        if (!Arrays.equals(clientId, this.clientId)) {
-            return null;
-        }
+            if (Arrays.equals(requestId, outstandingRequestId)) {
+                break;
+            } else {
+                logger.log(Logger.Loglevel.WARNING, "Received response with unknown request ID:",
+                        Arrays.toString(requestId), "vs", Arrays.toString(outstandingRequestId));
+            }
+        } while (true);
+
+        outstandingRequestId = null;
 
         return response;
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/net/borgac/clusterrpc/client/StderrLogger.java	Sat Sep 24 16:42:13 2016 +0200
@@ -0,0 +1,51 @@
+package net.borgac.clusterrpc.client;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ *
+ * @author lbo
+ */
+public class StderrLogger implements Logger {
+
+    private Loglevel loglevel;
+    private final SimpleDateFormat formatter;
+
+    public StderrLogger() {
+        this.loglevel = Loglevel.ERROR;
+        this.formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+    }
+
+    public StderrLogger(Loglevel l) {
+        this();
+        this.loglevel = l;
+    }
+
+    @Override
+    public void log(Loglevel l, Object... os) {
+        if (l.compareTo(loglevel) >= 0) {
+            StringBuilder line = new StringBuilder();
+
+            line.append(formatter.format(new Date()));
+            line.append(' ');
+            line.append(loglevel.toString());
+            line.append(' ');
+
+            for (Object o : os) {
+                line.append(o.toString());
+            }
+            System.err.println(line.toString());
+        }
+
+        if (loglevel == Loglevel.FATAL) {
+            System.err.println("Logged FATAL exception; aborting...");
+            System.exit(1);
+        }
+    }
+
+    @Override
+    public void setLoglevel(Loglevel l) {
+        this.loglevel = l;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/test/java/net/borgac/clusterrpc/client/SocketWrapperTest.java	Sat Sep 24 16:42:13 2016 +0200
@@ -0,0 +1,130 @@
+package net.borgac.clusterrpc.client;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMsg;
+
+/**
+ * Test SocketWrapper using inproc sockets.
+ *
+ * @author lbo
+ */
+public class SocketWrapperTest {
+
+    private ZMQ.Socket server;
+    private SocketWrapper sw;
+
+    private static final PeerAddress ADDRESS = new PeerAddress("sock-wrapper-test");
+
+    public SocketWrapperTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() {
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+    }
+
+    @Before
+    public void setUp() {
+        server = GlobalContextProvider.socket(ZMQ.ROUTER);
+        server.bind(ADDRESS.getConnectAddress());
+
+        sw = new SocketWrapper(new StderrLogger(Logger.Loglevel.DEBUG));
+        sw.connect(ADDRESS);
+    }
+
+    @After
+    public void tearDown() {
+        server.close();
+        server = null;
+
+        sw.close();
+        sw = null;
+    }
+
+    @Test
+    public void testClientServerCommunication() {
+        sw.send("TestMessage");
+
+        ZMsg msgs = ZMsg.recvMsg(server);
+
+        Assert.assertEquals(4, msgs.size());
+
+        byte[] clientId = msgs.pop().getData();
+
+        Assert.assertArrayEquals(sw.getInner().getIdentity(), clientId);
+
+        byte[] requestId = msgs.pop().getData();
+
+        Assert.assertEquals(5, requestId.length);
+
+        byte[] empty = msgs.pop().getData();
+
+        Assert.assertEquals(0, empty.length);
+
+        byte[] payload = msgs.pop().getData();
+
+        Assert.assertArrayEquals("TestMessage".getBytes(), payload);
+    }
+
+    @Test
+    public void testSendReceive() {
+        sw.send("TestMessage");
+
+        ZMsg msgs = ZMsg.recvMsg(server);
+        Assert.assertEquals(4, msgs.size());
+
+        ZMsg response = new ZMsg();
+        response.add(msgs.pop());
+        response.add(msgs.pop());
+        response.add(msgs.pop());
+        response.add("ServerResponse");
+
+        response.send(server);
+
+        byte[] fromServer = sw.recv();
+
+        Assert.assertArrayEquals("ServerResponse".getBytes(), fromServer);
+    }
+
+    @Test
+    public void testSendReceiveWithBadRequestId() {
+        sw.send("TestMessage");
+
+        byte[] clientId, requestId;
+
+        ZMsg msgs = ZMsg.recvMsg(server);
+        Assert.assertEquals(4, msgs.size());
+
+        ZMsg response = new ZMsg();
+        clientId = msgs.pop().getData();
+        requestId = msgs.pop().getData();
+
+        response.add(clientId);
+        response.add("xyz1"); // respond with bad request id
+        response.add(new byte[0]);
+        response.add("BadServerResponse");
+
+        response.send(server);
+
+        response.destroy();
+        response.add(clientId);
+        response.add(requestId); // correct request id
+        response.add(new byte[0]);
+        response.add("GoodServerResponse");
+
+        response.send(server);
+
+        // Expectation: Client discards first response.
+        byte[] fromServer = sw.recv();
+        Assert.assertArrayEquals("GoodServerResponse".getBytes(), fromServer);
+    }
+}