Mercurial > lbo > hg > clusterrpc-java
view src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java @ 11:593822c857b7 default tip
Enable true correlation in SocketWrapper
This allows applications to send a batch of messages at once and later
still receive the correct responses to every request.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 25 Sep 2016 15:19:20 +0200 |
parents | 0e608c466a58 |
children |
line wrap: on
line source
package net.borgac.clusterrpc.client; import java.io.Closeable; import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.Random; import org.zeromq.ZMQ; import org.zeromq.ZMsg; /** * SocketWrapper is responsible for framing and lower-level error handling. * * We emulate part of the ZeroMQ functionality here; JeroMQ doesn't implement * ZMTP 3 (yet), so we handle the REQ_RELAXED/REQ_CORRELATE functionality * ourselves. Thus, the inner socket is a DEALER socket, and not a REQ socket. * * This class is not threadsafe. * * @author lbo */ class SocketWrapper implements Closeable { private static final int EXPECTED_RESPONSE_SIZE = 3; private static final int EXPECTED_REQUEST_SIZE = 3; private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond()); private final ZMQ.Socket sock; // For correlation of requests private final HashMap<RequestID, byte[]> outstandingRequests; private final Logger logger; SocketWrapper(Logger l) { this.sock = GlobalContextProvider.clientSocket(); this.outstandingRequests = new HashMap<>(); this.logger = l; byte[] clientId = new byte[5]; ID_GENERATOR.nextBytes(clientId); sock.setIdentity(clientId); } // visible for testing ZMQ.Socket getInner() { return sock; } void connect(PeerAddress address) { sock.connect(address.getConnectAddress()); } void disconnect(PeerAddress address) { sock.disconnect(address.getConnectAddress()); } @Override public void close() { sock.close(); } /** * Returns an (opaque) Request ID that should be used in recv() * * @param payload * @return */ RequestID send(String payload) { return send(payload.getBytes()); } RequestID send(byte[] payload) { ZMsg message = new ZMsg(); RequestID id = new RequestID(); // The wire format is // [request ID, empty frame, payload] // ...simulating what a REQ socket would send. message.add(id.getSerializedId()); message.add(new byte[0]); message.add(payload); assert message.size() == EXPECTED_REQUEST_SIZE; if (message.send(sock, true)) { return id; } else { return null; } } byte[] recv(RequestID id) { do { // Check if our response is already here. if (outstandingRequests.containsKey(id)) { return outstandingRequests.remove(id); } ZMsg message = ZMsg.recvMsg(sock); if (message.size() != EXPECTED_RESPONSE_SIZE) { logger.log(Logger.Loglevel.ERROR, "Received response with bad length:", message.size()); } // Check if the response is to our last request; otherwise throw away byte[] requestId = message.pop().getData(); // empty frame message.pop().getData(); byte[] response = message.pop().getData(); if (Arrays.equals(requestId, id.getSerializedId())) { return response; } else { outstandingRequests.put(new RequestID(requestId), response); } } while (true); } static class RequestID { private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond() + Instant.now().getNano()); private final byte[] id; RequestID() { this.id = new byte[6]; ID_GENERATOR.nextBytes(this.id); } RequestID(byte[] id) { this.id = id; } byte[] getSerializedId() { return id; } @Override public int hashCode() { return id.hashCode(); } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } final RequestID other = (RequestID) obj; if (!Arrays.equals(this.id, other.id)) { return false; } return true; } } }