changeset 8:207b826e2b34

Run pyfmt on server and add callback socket
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 01 Mar 2019 15:36:58 +0100
parents 9a701a41a1be
children 77ecf0bbe722
files server/server.py
diffstat 1 files changed, 131 insertions(+), 93 deletions(-) [+]
line wrap: on
line diff
--- a/server/server.py	Fri Mar 01 14:59:39 2019 +0100
+++ b/server/server.py	Fri Mar 01 15:36:58 2019 +0100
@@ -6,7 +6,7 @@
 import random
 import sys
 import time
- 
+    
 from PyQt5.QtWidgets import QApplication, QMainWindow, QMenu, QVBoxLayout, QSizePolicy, QMessageBox, QWidget, QPushButton
 from PyQt5.QtGui import QIcon
 
@@ -19,136 +19,174 @@
 _splitvalue = 20000
 _maxhistory = 100
 
-class Stock:
-    symbol = ''
-    # Stock value in cents
-    _current_value = 0
-    _last_values = []
-    
-    # Random walk coefficients
-    _stddev = 0
 
-    def name():
-        """Generates a stock-ticker-like name."""
-        return ''.join([chr(int(_random.random()*26)+0x41) for i in range(0, 4)])
+class Stock:
+        symbol = ''
+        # Stock value in cents
+        _current_value = 0
+        _last_values = []
+        
+        # Random walk coefficients
+        _stddev = 0
 
-    def __init__(self, name):
-        self.symbol = name
-        self._stddev = _random.random() / 10
-        self._current_value = _random.random() * _maxinitvalue
+        def name():
+            """Generates a stock-ticker-like name."""
+            return ''.join([chr(int(_random.random() * 26) + 0x41) for i in range(0, 4)])
+
+        def __init__(self, name):
+            self.symbol = name
+            self._stddev = _random.random() / 10
+            self._current_value = _random.random() * _maxinitvalue
 
-    def next_price(self):
-        """Calculates a (random) next price based on the current price and history. Returns a dict suitable for inclusion in a _stockdata object."""
-        dev = 0.02*self._current_value or 1
-        new_value = int(_random.normalvariate(self._current_value * 1.001, dev))
-        new_value = abs(new_value)
-        split = False
+        def next_price(self):
+            """Calculates a (random) next price based on the current price and history. Returns a dict suitable for inclusion in a _stockdata object."""
+            dev = 0.02 * self._current_value or 1
+            new_value = int(_random.normalvariate(self._current_value * 1.001, dev))
+            new_value = abs(new_value)
+            split = False
+
+            if new_value > _splitvalue:
+                new_value = new_value / 2
+                split = True
 
-        if new_value > _splitvalue:
-            new_value = new_value / 2
-            split = True
+            self._last_values.append(self._current_value)
+            self._current_value = new_value
+            if len(self._last_values) > _maxhistory:
+                self._last_values = self._last_values[1:]
 
-        self._last_values.append(self._current_value)
-        self._current_value = new_value
-        if len(self._last_values) > _maxhistory:
-            self._last_values = self._last_values[1:]
+            return {'price': new_value, 'split': split, '_stockupdate': True}
 
-        return {'price': new_value, 'split': split, '_stockupdate': True}
+        def current_value(self):
+            return self._current_value
 
-    def current_value(self):
-        return self._current_value
 
 class StockData:
-    _data = {}
+        _data = {}
 
-    def __init__(self, data):
-        self._data = data
-        self._data['_stockdata'] = True
+        def __init__(self, data):
+            self._data = data
+            self._data['_stockdata'] = True
 
-    def data(self):
-        return self._data
+        def data(self):
+            return self._data
 
-    def serialize(self):
-        return json.dumps(self._data)
+        def serialize(self):
+            return json.dumps(self._data)
 
-    def write(self, dst):
-        return json.dump(self._data, dst)
+        def write(self, dst):
+            return json.dump(self._data, dst)
 
-    def deserialize_from(jsondata):
-        """Parse StockData from JSON data. Raises an exception if JSON is invalid or the object is malformed."""
-        data = json.loads(jsondata)
-        if data is not dict or '_stockdata' not in data:
-            raise ValueError('JSON object is not a valid StockData serialization')
-        _data = data
+        def deserialize_from(jsondata):
+            """Parse StockData from JSON data. Raises an exception if JSON is invalid or the object is malformed."""
+            data = json.loads(jsondata)
+            if data is not dict or '_stockdata' not in data:
+                raise ValueError('JSON object is not a valid StockData serialization')
+            _data = data
+
 
 class Stocks:
-    _stocks = []
+        _stocks = []
 
-    def __init__(self, stocks=None):
-        """Takes [Stock]."""
-        self._stocks = stocks
+        def __init__(self, stocks=None):
+            """Takes [Stock]."""
+            self._stocks = stocks
 
-    def generate(self):
-        next = {}
-        for s in self._stocks:
-            nextprice = s.next_price()
-            next[s.symbol] = nextprice
-        return StockData(next)
+        def generate(self):
+            next = {}
+            for s in self._stocks:
+                nextprice = s.next_price()
+                next[s.symbol] = nextprice
+            return StockData(next)
 
 
 class Server(arguments.BaseArguments):
-    _doc = """
+        _doc = """
     Usage:
         stex-server [options]
 
     Options:
         -a --address=<address>  Listen on address.
-        -p --port=<port>        Listen on port.
+        -p --port=<port>        Listen on port (the port directly above will also be used)
         --stocks=<stocks>       Number of stocks to generate.
         --stocklist=<stocks>    List of ticker symbols to generate stocks for.
         --interval=<interval>   Interval in ms to publish stock data (default 500)
         --help                  Print help.
     """
 
-    _stocks = Stocks(None)
+        _stocks = Stocks(None)
+
+        def __init__(self, zctx, callback=None):
+            """callback is called with a StockData object every time new data are available."""
+            super(arguments.BaseArguments, self).__init__(doc=self._doc)
+            if self.help or None:
+                print(self._doc)
+                sys.exit(0)
+
+            port = self.port or '9988'
 
-    def __init__(self, zctx, callback=None):
-        """callback is called with a StockData object every time new data are available."""
-        super(arguments.BaseArguments, self).__init__(doc=self._doc)
-        if self.help or None:
-            print(self._doc)
-            sys.exit(0)
+            interactivesocket = zctx.socket(zmq.ROUTER)
+            interactivesocket.setsockopt(zmq.IPV6, 1)
+            interactivesocket.bind('tcp://{}:{}'.format(self.address or '[::]', int(port) + 1 if self.port else '9989'))
+            interactivesocket.setsockopt(zmq.RCVTIMEO, 0)
+            self.interactivesocket = interactivesocket
 
-        socket = zctx.socket(zmq.PUB)
-        socket.setsockopt(zmq.IPV6, 1)
-        socket.bind('tcp://{}:{}'.format(self.address or '[::]', self.port or '9988'))
-        self._socket = socket
-        self.init_stocks()
+            pubsocket = zctx.socket(zmq.PUB)
+            pubsocket.setsockopt(zmq.IPV6, 1)
+            pubsocket.bind('tcp://{}:{}'.format(self.address or '[::]', port))
+            self.pubsocket = pubsocket
+            self.init_stocks()
+
+        def init_stocks(self):
+            stocklist = []
+            if self.stocklist:
+                stocklist = self.stocklist.split(',')
+            elif self.stocks and int(self.stocks) > 0:
+                stocklist = [Stock.name() for _ in range(0, self.stocks)]
+            else:
+                stocklist = [Stock.name() for _ in range(0, 10)]
 
-    def init_stocks(self):
-        stocklist = []
-        if self.stocklist:
-            stocklist = self.stocklist.split(',')
-        elif self.stocks and int(self.stocks) > 0:
-            stocklist = [Stock.name() for _ in range(0, self.stocks)]
-        else:
-            stocklist = [Stock.name() for _ in range(0, 10)]
+            stocklist = [Stock(name=s) for s in stocklist]
+            self._stocks = Stocks(stocklist)
+
+        def run(self):
+            interval = int(self.interval or 500)
+            newinterval = interval
+
+            p = zmq.Poller()
+            p.register(self.interactivesocket, zmq.POLLIN)
+            while True:
+                before = time.clock_gettime_ns(time.CLOCK_MONOTONIC)
+                events = p.poll(newinterval)
 
-        stocklist = [Stock(name=s) for s in stocklist]
-        self._stocks = Stocks(stocklist)
+                if len(events) > 0:
+                    self.handle_calls(events)
+                    diff = time.clock_gettime_ns(time.CLOCK_MONOTONIC) - before
+                    newinterval = interval - diff if diff < newinterval else interval
+                    newinterval /= 1e6
+                else:  # Timeout
+                    nextdata = self._stocks.generate()
+                    print("DEBUG: {}".format(nextdata))
+                    self.pubsocket.send_string(nextdata.serialize())
+                    newinterval = interval
 
-    def run(self):
-        interval = int(self.interval or 500)
-        while True:
-            time.sleep(interval / 1000.)
-            nextdata = self._stocks.generate()
-            print("DEBUG: {}".format(nextdata))
-            self._socket.send_string(nextdata.serialize())
+        # Handle callbacks from clients.
+        def handle_calls(self, events):
+            # TODO
+            for (sock, ev) in events:
+                if not (ev | zmq.POLLIN):
+                    continue
+                try:
+                    msgs = sock.recv_multipart()
+                    print ('Client {:x}: {}', msgs[0], msgs[1])
+                except:
+                    continue
+
 
 def main():
-    ctx = zmq.Context()
-    s = Server(ctx)
-    s.run()
+        ctx = zmq.Context()
+        s = Server(ctx)
+        s.run()
+
 
 if __name__ == "__main__":
-    main()
+        main()