view src/test/java/net/borgac/clusterrpc/client/SocketWrapperTest.java @ 11:593822c857b7 default tip

Enable true correlation in SocketWrapper This allows applications to send a batch of messages at once and later still receive the correct responses to every request.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 25 Sep 2016 15:19:20 +0200
parents 0e608c466a58
children
line wrap: on
line source

package net.borgac.clusterrpc.client;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
 * Test SocketWrapper using inproc sockets.
 *
 * @author lbo
 */
public class SocketWrapperTest {

    private ZMQ.Socket server;
    private SocketWrapper sw;

    private static final PeerAddress ADDRESS = new PeerAddress("sock-wrapper-test");

    public SocketWrapperTest() {
    }

    @BeforeClass
    public static void setUpClass() {
    }

    @AfterClass
    public static void tearDownClass() {
    }

    @Before
    public void setUp() {
        server = GlobalContextProvider.socket(ZMQ.ROUTER);
        server.bind(ADDRESS.getConnectAddress());

        sw = new SocketWrapper(new StderrLogger(Logger.Loglevel.DEBUG));
        sw.connect(ADDRESS);
    }

    @After
    public void tearDown() {
        server.close();
        server = null;

        sw.close();
        sw = null;
    }

    @Test
    public void testClientServerCommunication() {
        sw.send("TestMessage");

        ZMsg msgs = ZMsg.recvMsg(server);

        Assert.assertEquals(4, msgs.size());

        byte[] clientId = msgs.pop().getData();

        Assert.assertArrayEquals(sw.getInner().getIdentity(), clientId);

        byte[] requestId = msgs.pop().getData();

        Assert.assertEquals(6, requestId.length);

        byte[] empty = msgs.pop().getData();

        Assert.assertEquals(0, empty.length);

        byte[] payload = msgs.pop().getData();

        Assert.assertArrayEquals("TestMessage".getBytes(), payload);
    }

    /**
     * Echoes responseMessage or the client's payload if responseMessage is
     * null.
     *
     * @param responseMessage
     */
    void serverEcho(String responseMessage) {
        ZMsg msgs = ZMsg.recvMsg(server);
        Assert.assertEquals(4, msgs.size());

        ZMsg response = new ZMsg();
        response.add(msgs.pop());
        response.add(msgs.pop());
        response.add(msgs.pop());

        if (responseMessage != null) {
            response.add(responseMessage);
        } else {
            response.add(msgs.pop());
        }

        response.send(server);
    }

    @Test
    public void testSendReceive() {
        SocketWrapper.RequestID id = sw.send("TestMessage");

        serverEcho("ServerResponse");

        byte[] fromServer = sw.recv(id);

        Assert.assertArrayEquals("ServerResponse".getBytes(), fromServer);
    }

    @Test
    public void testSendReceiveWithBadRequestId() {
        SocketWrapper.RequestID id = sw.send("TestMessage");

        byte[] clientId, requestId;

        ZMsg msgs = ZMsg.recvMsg(server);
        Assert.assertEquals(4, msgs.size());

        ZMsg response = new ZMsg();
        clientId = msgs.pop().getData();
        requestId = msgs.pop().getData();

        response.add(clientId);
        response.add("xyz1"); // respond with bad request id
        response.add(new byte[0]);
        response.add("BadServerResponse");

        response.send(server);

        response.destroy();
        response.add(clientId);
        response.add(requestId); // correct request id
        response.add(new byte[0]);
        response.add("GoodServerResponse");

        response.send(server);

        // Expectation: Client discards first response.
        byte[] fromServer = sw.recv(id);
        Assert.assertArrayEquals("GoodServerResponse".getBytes(), fromServer);
    }

    @Test
    public void testCorrelation() {
        SocketWrapper.RequestID id1 = sw.send("Test1");
        serverEcho(null);

        SocketWrapper.RequestID id2 = sw.send("Test2");
        serverEcho(null);

        SocketWrapper.RequestID id3 = sw.send("Test3");
        serverEcho(null);

        // Expected: Although sent in reverse order, the requests should be correlated correctly.
        byte[] response2 = sw.recv(id2);
        byte[] response3 = sw.recv(id3);
        byte[] response1 = sw.recv(id1);

        Assert.assertArrayEquals(response3, "Test3".getBytes());
        Assert.assertArrayEquals(response2, "Test2".getBytes());
        Assert.assertArrayEquals(response1, "Test1".getBytes());
    }
}