Mercurial > lbo > hg > clusterrpc-java
view src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java @ 6:0e608c466a58
Implement ClientChannel/SocketWrapper and add logging and tests
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 24 Sep 2016 16:42:13 +0200 |
parents | 117cb812e28a |
children | 593822c857b7 |
line wrap: on
line source
package net.borgac.clusterrpc.client; import java.io.Closeable; import java.time.Instant; import java.util.Arrays; import java.util.Random; import org.zeromq.ZMQ; import org.zeromq.ZMsg; /** * SocketWrapper is responsible for framing and lower-level error handling. * * We emulate part of the ZeroMQ functionality here; JeroMQ doesn't implement * ZMTP 3 (yet), so we handle the REQ_RELAXED/REQ_CORRELATE functionality * ourselves. Thus, the inner socket is a DEALER socket, and not a REQ socket. * * @author lbo */ class SocketWrapper implements Closeable { private static final int EXPECTED_RESPONSE_SIZE = 3; private static final int EXPECTED_REQUEST_SIZE = 3; private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond()); private final ZMQ.Socket sock; // For correlation of requests private byte[] outstandingRequestId; private final Logger logger; SocketWrapper(Logger l) { this.sock = GlobalContextProvider.clientSocket(); this.outstandingRequestId = null; this.logger = l; byte[] clientId = new byte[5]; ID_GENERATOR.nextBytes(clientId); sock.setIdentity(clientId); } // visible for testing ZMQ.Socket getInner() { return sock; } void connect(PeerAddress address) { sock.connect(address.getConnectAddress()); } void disconnect(PeerAddress address) { sock.disconnect(address.getConnectAddress()); } @Override public void close() { sock.close(); } boolean send(String payload) { return send(payload.getBytes()); } boolean send(byte[] payload) { ZMsg message = new ZMsg(); // If a request that we haven't received yet, we will simulate // REQ_RELAXED/REQ_CORRELATE and will just ignore the previous request. byte[] requestId = new byte[5]; ID_GENERATOR.nextBytes(requestId); outstandingRequestId = requestId; // The wire format is // [request ID, empty frame, payload] // ...simulating what a REQ socket would send. message.add(requestId); message.add(new byte[0]); message.add(payload); assert message.size() == EXPECTED_REQUEST_SIZE; return message.send(sock, true); } byte[] recv() { ZMsg message; byte[] response = null; do { message = ZMsg.recvMsg(sock); if (message.size() != EXPECTED_RESPONSE_SIZE) { logger.log(Logger.Loglevel.ERROR, "Received response with bad length:", message.size()); } // Check if the response is to our last request; otherwise throw away byte[] requestId = message.pop().getData(); // empty frame message.pop().getData(); response = message.pop().getData(); if (Arrays.equals(requestId, outstandingRequestId)) { break; } else { logger.log(Logger.Loglevel.WARNING, "Received response with unknown request ID:", Arrays.toString(requestId), "vs", Arrays.toString(outstandingRequestId)); } } while (true); outstandingRequestId = null; return response; } }