view src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java @ 5:117cb812e28a

Implement several lower-level classes (networking/framing)
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 23 Sep 2016 22:01:17 +0200
parents
children 0e608c466a58
line wrap: on
line source

package net.borgac.clusterrpc.client;

import java.io.Closeable;
import java.time.Instant;
import java.util.Arrays;
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
 * SocketWrapper is responsible for framing and lower-level error handling.
 *
 * <p>
 * Every request is sent as four frames,
 * {@code [request ID, client ID, empty frame, payload]}. A response is only
 * accepted if it consists of four frames, its first frame equals the ID of
 * the most recently sent request and its second frame equals this wrapper's
 * client ID.
 *
 * @author lbo
 */
class SocketWrapper implements Closeable {

    private static final int EXPECTED_RESPONSE_SIZE = 4;
    private static final int EXPECTED_REQUEST_SIZE = 4;
    // Length in bytes of both the client ID and the per-request ID.
    private static final int ID_LENGTH = 5;
    // Deliberately NOT seeded with the wall clock: a seed with one-second
    // resolution gives every client started within the same second the same
    // client ID and the same sequence of request IDs. The no-argument
    // constructor picks a seed that is very likely to be distinct per JVM.
    private static final Random ID_GENERATOR = new Random();

    private final ZMQ.Socket sock;
    private final byte[] clientId;
    // For correlation of requests: the ID of the most recently sent request,
    // or null if nothing has been sent yet.
    private byte[] outstandingRequestId;

    /**
     * Obtains a client socket from {@link GlobalContextProvider} and assigns
     * this wrapper a random client ID.
     */
    SocketWrapper() {
        this.sock = GlobalContextProvider.clientSocket();
        this.clientId = new byte[ID_LENGTH];
        this.outstandingRequestId = null;

        ID_GENERATOR.nextBytes(clientId);
    }

    /**
     * Connects the underlying socket to the given peer.
     *
     * @param address the peer whose connect address is used
     */
    void connect(PeerAddress address) {
        sock.connect(address.getConnectAddress());
    }

    /**
     * Disconnects the underlying socket from the given peer.
     *
     * @param address the peer whose connect address is used
     */
    void disconnect(PeerAddress address) {
        sock.disconnect(address.getConnectAddress());
    }

    /**
     * Closes the underlying socket.
     */
    @Override
    public void close() {
        sock.close();
    }

    /**
     * Sends a text payload, encoded as UTF-8, as a new request.
     *
     * @param payload the request body
     * @return the result of {@link #send(byte[])}
     */
    boolean send(String payload) {
        // Pin the charset so the bytes on the wire do not depend on the
        // platform default encoding of the machine running the client.
        return send(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Sends a binary payload as a new request.
     *
     * <p>
     * A fresh random request ID is generated and remembered; from then on
     * {@link #recv()} only accepts a response carrying that ID.
     *
     * @param payload the request body
     * @return whatever {@code ZMsg.send} reports for the assembled message
     */
    boolean send(byte[] payload) {
        ZMsg message = new ZMsg();

        // If an earlier request has not been answered yet, we simulate
        // REQ_RELAXED/REQ_CORRELATE: the new request ID replaces the old one,
        // so a late response to the earlier request is ignored by recv().
        byte[] requestId = new byte[ID_LENGTH];
        ID_GENERATOR.nextBytes(requestId);

        outstandingRequestId = requestId;

        // The wire format is
        // [request ID, client ID, empty frame, payload]
        // ...simulating what a REQ socket would send.
        message.add(requestId);
        message.add(clientId);
        message.add(new byte[0]);
        message.add(payload);

        assert message.size() == EXPECTED_REQUEST_SIZE;

        // Second argument: let ZMsg destroy its frames once they are sent.
        return message.send(sock, true);
    }

    /**
     * Receives the response to the most recently sent request.
     *
     * <p>
     * Messages that do not have the expected number of frames, or whose
     * first frame is not the outstanding request ID, are thrown away and the
     * next message is awaited.
     *
     * @return the payload frame of the response, or {@code null} if no
     * message could be received (e.g. the receive was interrupted) or if the
     * response carries a client ID other than ours
     */
    byte[] recv() {
        ZMsg message;

        do {
            message = ZMsg.recvMsg(sock);

            if (message == null) {
                // recvMsg yields null when the receive is interrupted;
                // report "no response" instead of failing with a
                // NullPointerException in the loop condition.
                return null;
            }
            // Check if the response is to our last request; otherwise throw away
        } while (message.size() != EXPECTED_RESPONSE_SIZE
                || !Arrays.equals(message.getFirst().getData(), outstandingRequestId));

        // Frame 1: request ID, already verified by the loop condition.
        message.pop();
        // Frame 2: ID of the client the response is addressed to.
        byte[] responseClientId = message.pop().getData();
        // Frame 3: empty delimiter frame.
        message.pop();
        // Frame 4: the actual response payload.
        byte[] response = message.pop().getData();

        if (!Arrays.equals(responseClientId, this.clientId)) {
            return null;
        }

        return response;
    }
}