Mercurial > lbo > hg > clusterrpc-java
view src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java @ 5:117cb812e28a
Implement several lower-level classes (networking/framing)
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 23 Sep 2016 22:01:17 +0200 |
parents | |
children | 0e608c466a58 |
line wrap: on
line source
package net.borgac.clusterrpc.client; import java.io.Closeable; import java.time.Instant; import java.util.Arrays; import java.util.Random; import org.zeromq.ZMQ; import org.zeromq.ZMsg; /** * SocketWrapper is responsible for framing and lower-level error handling. * * @author lbo */ class SocketWrapper implements Closeable { private static final int EXPECTED_RESPONSE_SIZE = 4; private static final int EXPECTED_REQUEST_SIZE = 4; private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond()); private final ZMQ.Socket sock; private final byte[] clientId; // For correlation of requests private byte[] outstandingRequestId; SocketWrapper() { this.sock = GlobalContextProvider.clientSocket(); this.clientId = new byte[5]; this.outstandingRequestId = null; ID_GENERATOR.nextBytes(clientId); } void connect(PeerAddress address) { sock.connect(address.getConnectAddress()); } void disconnect(PeerAddress address) { sock.disconnect(address.getConnectAddress()); } @Override public void close() { sock.close(); } boolean send(String payload) { return send(payload.getBytes()); } boolean send(byte[] payload) { ZMsg message = new ZMsg(); // If a request that we haven't received yet, we will simulate // REQ_RELAXED/REQ_CORRELATE and will just ignore the previous request. byte[] requestId = new byte[5]; ID_GENERATOR.nextBytes(requestId); outstandingRequestId = requestId; // The wire format is // [request ID, client ID, empty frame, payload] // ...simulating what a REQ socket would send. message.add(requestId); message.add(clientId); message.add(new byte[0]); message.add(payload); assert message.size() == EXPECTED_REQUEST_SIZE; return message.send(sock, true); } byte[] recv() { ZMsg message; do { message = ZMsg.recvMsg(sock); // Check if the response is to our last request; otherwise throw away } while (message.size() != EXPECTED_RESPONSE_SIZE || !Arrays.equals(message.getFirst().getData(), outstandingRequestId)); byte[] requestId = message.pop().getData(); byte[] clientId = message.pop().getData(); byte[] empty = message.pop().getData(); byte[] response = message.pop().getData(); if (!Arrays.equals(clientId, this.clientId)) { return null; } return response; } }