Mercurial > lbo > hg > clusterrpc-java
changeset 6:0e608c466a58
Implement ClientChannel/SocketWrapper and add logging and tests
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 24 Sep 2016 16:42:13 +0200 |
parents | 117cb812e28a |
children | 0c24a0ee0c34 |
files | src/main/java/net/borgac/clusterrpc/client/Client.java src/main/java/net/borgac/clusterrpc/client/ClientChannel.java src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java src/main/java/net/borgac/clusterrpc/client/Logger.java src/main/java/net/borgac/clusterrpc/client/PeerAddress.java src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java src/main/java/net/borgac/clusterrpc/client/StderrLogger.java src/test/java/net/borgac/clusterrpc/client/SocketWrapperTest.java |
diffstat | 8 files changed, 303 insertions(+), 27 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/net/borgac/clusterrpc/client/Client.java Fri Sep 23 22:01:17 2016 +0200 +++ b/src/main/java/net/borgac/clusterrpc/client/Client.java Sat Sep 24 16:42:13 2016 +0200 @@ -6,5 +6,4 @@ * @author lbo */ public class Client { - }
--- a/src/main/java/net/borgac/clusterrpc/client/ClientChannel.java Fri Sep 23 22:01:17 2016 +0200 +++ b/src/main/java/net/borgac/clusterrpc/client/ClientChannel.java Sat Sep 24 16:42:13 2016 +0200 @@ -1,10 +1,12 @@ package net.borgac.clusterrpc.client; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.Closeable; import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.Random; +import proto.Rpc; /** * ClientChannel implements the transport of RPC calls to the server. @@ -17,9 +19,11 @@ public final class ClientChannel implements Closeable { // Simple seed is good enough here. - private static final Random CHANNEL_ID_GENERATOR = new Random( + private static final Random ID_GENERATOR = new Random( Instant.now().getEpochSecond()); + private Logger logger; + private SocketWrapper sock; private int peerCounter; @@ -28,22 +32,30 @@ /** * Initialize a ClientChannel with no connections. + * + * @param logger */ - public ClientChannel() { - this.sock = new SocketWrapper(); + public ClientChannel(Logger logger) { + this.logger = logger; + this.sock = new SocketWrapper(logger); this.peerCounter = 0; this.peers = new HashMap<>(); - this.channelId = CHANNEL_ID_GENERATOR.nextLong(); + this.channelId = ID_GENERATOR.nextLong(); } /** * Initialize a ClientChannel, and connect to `address`. * + * @param logger * @param address Address of the first peer to connect to. */ + public ClientChannel(Logger logger, PeerAddress address) { + this(logger); + connect(address); + } + public ClientChannel(PeerAddress address) { - this(); - connect(address); + this(new StderrLogger(), address); } /** @@ -87,6 +99,27 @@ return false; } + boolean send(Rpc.RPCRequest request) { + byte[] serialized = request.toByteArray(); + + return sock.send(serialized); + } + + Rpc.RPCResponse receive() { + byte[] response = sock.recv(); + Rpc.RPCResponse parsed = null; + + try { + parsed = Rpc.RPCResponse.parseFrom(response); + } catch (InvalidProtocolBufferException e) { + logger.log(Logger.Loglevel.ERROR, "Exception parsing RPCResponse:", e.toString()); + } catch (Exception e) { + logger.log(Logger.Loglevel.FATAL, "Unhandled exception; this is a bug:", e.toString()); + } + + return parsed; + } + @Override public void close() { sock.close();
--- a/src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java Fri Sep 23 22:01:17 2016 +0200 +++ b/src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java Sat Sep 24 16:42:13 2016 +0200 @@ -11,11 +11,11 @@ private static final ZMQ.Context context = ZMQ.context(1); - private static ZMQ.Socket socket(int type) { + static ZMQ.Socket socket(int type) { return context.socket(type); } - public static ZMQ.Socket clientSocket() { + static ZMQ.Socket clientSocket() { // We need to do everything ourselves, as JeroMQ doesn't yet support // all features used by clusterrpc servers. ZMQ.Socket sock = socket(ZMQ.DEALER);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/net/borgac/clusterrpc/client/Logger.java Sat Sep 24 16:42:13 2016 +0200 @@ -0,0 +1,30 @@ +package net.borgac.clusterrpc.client; + +/** + * A simple logger interface. + * + * @author lbo + */ +public interface Logger { + + public void log(Loglevel l, Object... o); + + /** + * Set the loglevel. Any log messages with a loglevel smaller than this will + * be swallowed. + * + * @param l + */ + public void setLoglevel(Loglevel l); + + /** + * Loglevel designates how critical a log message is. + */ + public static enum Loglevel implements Comparable<Loglevel> { + DEBUG, + INFO, + WARNING, + ERROR, + FATAL // FATAL is supposed to abort the program. It is used for non-recoverable errors. + } +}
--- a/src/main/java/net/borgac/clusterrpc/client/PeerAddress.java Fri Sep 23 22:01:17 2016 +0200 +++ b/src/main/java/net/borgac/clusterrpc/client/PeerAddress.java Sat Sep 24 16:42:13 2016 +0200 @@ -34,7 +34,21 @@ this.port = address.getPort(); } + /** + * For hermetic testing. + * + * @param inmemoryAddr An arbitrary string. + */ + PeerAddress(String inmemoryAddr) { + this.host = inmemoryAddr; + this.port = 0; + } + String getConnectAddress() { - return String.format("tcp://%s:%d", host, port); + if (port > 0) { + return String.format("tcp://%s:%d", host, port); + } else { + return String.format("inproc://%s", host); + } } }
--- a/src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java Fri Sep 23 22:01:17 2016 +0200 +++ b/src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java Sat Sep 24 16:42:13 2016 +0200 @@ -10,25 +10,36 @@ /** * SocketWrapper is responsible for framing and lower-level error handling. * + * We emulate part of the ZeroMQ functionality here; JeroMQ doesn't implement + * ZMTP 3 (yet), so we handle the REQ_RELAXED/REQ_CORRELATE functionality + * ourselves. Thus, the inner socket is a DEALER socket, and not a REQ socket. + * * @author lbo */ class SocketWrapper implements Closeable { - private static final int EXPECTED_RESPONSE_SIZE = 4; - private static final int EXPECTED_REQUEST_SIZE = 4; + private static final int EXPECTED_RESPONSE_SIZE = 3; + private static final int EXPECTED_REQUEST_SIZE = 3; private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond()); private final ZMQ.Socket sock; - private final byte[] clientId; // For correlation of requests private byte[] outstandingRequestId; + private final Logger logger; - SocketWrapper() { + SocketWrapper(Logger l) { this.sock = GlobalContextProvider.clientSocket(); - this.clientId = new byte[5]; this.outstandingRequestId = null; + this.logger = l; + byte[] clientId = new byte[5]; ID_GENERATOR.nextBytes(clientId); + sock.setIdentity(clientId); + } + + // visible for testing + ZMQ.Socket getInner() { + return sock; } void connect(PeerAddress address) { @@ -59,10 +70,9 @@ outstandingRequestId = requestId; // The wire format is - // [request ID, client ID, empty frame, payload] + // [request ID, empty frame, payload] // ...simulating what a REQ socket would send. message.add(requestId); - message.add(clientId); message.add(new byte[0]); message.add(payload); @@ -73,21 +83,30 @@ byte[] recv() { ZMsg message; + byte[] response = null; do { message = ZMsg.recvMsg(sock); - // Check if the response is to our last request; otherwise throw away - } while (message.size() != EXPECTED_RESPONSE_SIZE - || !Arrays.equals(message.getFirst().getData(), outstandingRequestId)); + + if (message.size() != EXPECTED_RESPONSE_SIZE) { + logger.log(Logger.Loglevel.ERROR, "Received response with bad length:", message.size()); + } - byte[] requestId = message.pop().getData(); - byte[] clientId = message.pop().getData(); - byte[] empty = message.pop().getData(); - byte[] response = message.pop().getData(); + // Check if the response is to our last request; otherwise throw away + byte[] requestId = message.pop().getData(); + // empty frame + message.pop().getData(); + response = message.pop().getData(); - if (!Arrays.equals(clientId, this.clientId)) { - return null; - } + if (Arrays.equals(requestId, outstandingRequestId)) { + break; + } else { + logger.log(Logger.Loglevel.WARNING, "Received response with unknown request ID:", + Arrays.toString(requestId), "vs", Arrays.toString(outstandingRequestId)); + } + } while (true); + + outstandingRequestId = null; return response; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/net/borgac/clusterrpc/client/StderrLogger.java Sat Sep 24 16:42:13 2016 +0200 @@ -0,0 +1,51 @@ +package net.borgac.clusterrpc.client; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * + * @author lbo + */ +public class StderrLogger implements Logger { + + private Loglevel loglevel; + private final SimpleDateFormat formatter; + + public StderrLogger() { + this.loglevel = Loglevel.ERROR; + this.formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + } + + public StderrLogger(Loglevel l) { + this(); + this.loglevel = l; + } + + @Override + public void log(Loglevel l, Object... os) { + if (l.compareTo(loglevel) >= 0) { + StringBuilder line = new StringBuilder(); + + line.append(formatter.format(new Date())); + line.append(' '); + line.append(loglevel.toString()); + line.append(' '); + + for (Object o : os) { + line.append(o.toString()); + } + System.err.println(line.toString()); + } + + if (loglevel == Loglevel.FATAL) { + System.err.println("Logged FATAL exception; aborting..."); + System.exit(1); + } + } + + @Override + public void setLoglevel(Loglevel l) { + this.loglevel = l; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/java/net/borgac/clusterrpc/client/SocketWrapperTest.java Sat Sep 24 16:42:13 2016 +0200 @@ -0,0 +1,130 @@ +package net.borgac.clusterrpc.client; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.zeromq.ZMQ; +import org.zeromq.ZMsg; + +/** + * Test SocketWrapper using inproc sockets. + * + * @author lbo + */ +public class SocketWrapperTest { + + private ZMQ.Socket server; + private SocketWrapper sw; + + private static final PeerAddress ADDRESS = new PeerAddress("sock-wrapper-test"); + + public SocketWrapperTest() { + } + + @BeforeClass + public static void setUpClass() { + } + + @AfterClass + public static void tearDownClass() { + } + + @Before + public void setUp() { + server = GlobalContextProvider.socket(ZMQ.ROUTER); + server.bind(ADDRESS.getConnectAddress()); + + sw = new SocketWrapper(new StderrLogger(Logger.Loglevel.DEBUG)); + sw.connect(ADDRESS); + } + + @After + public void tearDown() { + server.close(); + server = null; + + sw.close(); + sw = null; + } + + @Test + public void testClientServerCommunication() { + sw.send("TestMessage"); + + ZMsg msgs = ZMsg.recvMsg(server); + + Assert.assertEquals(4, msgs.size()); + + byte[] clientId = msgs.pop().getData(); + + Assert.assertArrayEquals(sw.getInner().getIdentity(), clientId); + + byte[] requestId = msgs.pop().getData(); + + Assert.assertEquals(5, requestId.length); + + byte[] empty = msgs.pop().getData(); + + Assert.assertEquals(0, empty.length); + + byte[] payload = msgs.pop().getData(); + + Assert.assertArrayEquals("TestMessage".getBytes(), payload); + } + + @Test + public void testSendReceive() { + sw.send("TestMessage"); + + ZMsg msgs = ZMsg.recvMsg(server); + Assert.assertEquals(4, msgs.size()); + + ZMsg response = new ZMsg(); + response.add(msgs.pop()); + response.add(msgs.pop()); + response.add(msgs.pop()); + response.add("ServerResponse"); + + response.send(server); + + byte[] fromServer = sw.recv(); + + Assert.assertArrayEquals("ServerResponse".getBytes(), fromServer); + } + + @Test + public void testSendReceiveWithBadRequestId() { + sw.send("TestMessage"); + + byte[] clientId, requestId; + + ZMsg msgs = ZMsg.recvMsg(server); + Assert.assertEquals(4, msgs.size()); + + ZMsg response = new ZMsg(); + clientId = msgs.pop().getData(); + requestId = msgs.pop().getData(); + + response.add(clientId); + response.add("xyz1"); // respond with bad request id + response.add(new byte[0]); + response.add("BadServerResponse"); + + response.send(server); + + response.destroy(); + response.add(clientId); + response.add(requestId); // correct request id + response.add(new byte[0]); + response.add("GoodServerResponse"); + + response.send(server); + + // Expectation: Client discards first response. + byte[] fromServer = sw.recv(); + Assert.assertArrayEquals("GoodServerResponse".getBytes(), fromServer); + } +}