Mercurial > lbo > hg > stex
changeset 10:a071454b1739
Implement callbacks to server in client
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 01 Mar 2019 16:33:03 +0100 |
parents | 77ecf0bbe722 |
children | 24971c82b835 |
files | client/client.py |
diffstat | 1 files changed, 97 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/client/client.py Fri Mar 01 16:32:42 2019 +0100 +++ b/client/client.py Fri Mar 01 16:33:03 2019 +0100 @@ -1,9 +1,11 @@ #!/usr/bin/env python3 import arguments +import json import os import os.path as path import sys +import urllib.parse as url import zmq import PyQt5.QtWidgets as wid @@ -157,6 +159,13 @@ value += stock.current_num * stock.current_price return value + def serialize(self): + s = {'cash': self.cash, 'stock': {}, '_stockdepot': True} + for sym, stock in self.stock.items(): + stock_sum = {'num': stock.current_num} + s['stock'][sym] = stock_sum + return json.dumps(s) + # A stock position in a depot. class DepotStock: sym = '' @@ -211,7 +220,6 @@ self.max = value previous, nxt = (self.current - 1) % self.MAX_LEN, (self.current + 1) % self.MAX_LEN - # Shift graph to the left self.series.replace(self.current, self.current, value) self.series.replace(nxt, nxt, 0) @@ -315,16 +323,17 @@ on_new_message = core.pyqtSignal(dict) - def __init__(self, creds): + def __init__(self, zctx, creds): """callback is a function taking received data dicts.""" super().__init__() - self.zctx = zmq.Context() + self.zctx = zctx self.sock = self.zctx.socket(zmq.SUB) self.sock.setsockopt(zmq.IPV6, 1) self.sock.subscribe('') + self.sock.setsockopt(zmq.RCVTIMEO, 0) + self.sock.connect('tcp://{}'.format(creds.addr)) - self.sock.setsockopt(zmq.RCVTIMEO, 0) fd = self.sock.getsockopt(zmq.FD) self.socknot = core.QSocketNotifier(fd, core.QSocketNotifier.Read) self.socknot.activated.connect(self.on_activated) @@ -337,9 +346,70 @@ if '_stockdata' not in msg: return self.on_new_message.emit(msg) - except Exception: + except Exception as e: return +# Manages communication back to the server, like depot settings etc. +class CallbackSocket(core.QObject): + creds = None + socket = None + waiting = False + queue = [] + + def __init__(self, zctx, creds): + super().__init__() + self.creds = creds + socket = zctx.socket(zmq.REQ) + socket.setsockopt(zmq.IPV6, 1) + socket.setsockopt(zmq.RCVTIMEO, 0) + socket.setsockopt(zmq.SNDTIMEO, 10) + + u = url.urlparse(creds.addr) + hostport = u.netloc or u.path + (host, _, port) = hostport.partition(':') + socket.connect('tcp://{}:{}'.format(host, int(port if port else '9988') + 1)) + self.socket = socket + + fd = self.socket.getsockopt(zmq.FD) + self.socknot = core.QSocketNotifier(fd, core.QSocketNotifier.Read) + self.socknot.activated.connect(self.on_reply) + + def login(self): + self.try_send({'_stocklogin': True}) + + def send_depot(self, depot): + summary = depot.serialize() + self.try_send(summary) + + def wrap(self, msg): + return json.dumps({'_stockcallback': True, 'user': self.creds.user, 'password': self.creds.password, 'msg': msg}) + + def try_send(self, msg): + msg = self.wrap(msg) + if self.waiting: + self.queue.append(msg) + else: + try: + self.socket.send_string(msg) + self.waiting = True + except Exception as e: + print ('DEBUG: Send failed on REQ socket: ', e) + self.queue.append(msg) + assert len(self.queue) < 5 + + @core.pyqtSlot(int) + def on_reply(self, _sock): + try: + msg = self.socket.recv_json() + print('DEBUG: Received response: {}'.format(msg)) + self.waiting = False + + # Try sending oldest message. + if len(self.queue) > 0: + self.try_send(self.queue.pop(0)) + except Exception as e: + print ('DEBUG: RECV failed on REQ socket: ', e) + class Client(arguments.BaseArguments, wid.QWidget): _doc = """ @@ -353,6 +423,9 @@ creds = Creds() depot = Depot() depot_widget = None + zctx = zmq.Context() + timer = None + callback_sock = None def __init__(self): super(wid.QWidget, self).__init__() @@ -366,17 +439,25 @@ ccd.accepted.connect(self.start_wait_window) ccd.show_dialog() + self.timer = core.QTimer(self) + self.timer.setInterval(1500) + self.timer.timeout.connect(self.on_periodic_timer) + self.timer.start() + def set_creds(self, creds): self.creds = creds def start_wait_window(self): - self.sock = ClientSocket(self.creds) - self.sock.on_new_message.connect(self.on_new_data) - - self.start_main_window() + self.mainvbox = wid.QVBoxLayout(self) self.waiting = wid.QLabel("Waiting for incoming stock data - hang tight!", self) self.mainvbox.addWidget(self.depot_widget) self.mainvbox.addWidget(self.waiting) + self.show() + + self.sock = ClientSocket(self.zctx, self.creds) + self.sock.on_new_message.connect(self.on_new_data) + self.callback_sock = CallbackSocket(self.zctx, self.creds) + self.callback_sock.login() stock_widgets = {} @@ -394,6 +475,13 @@ self.depot.add_stock(sym, depotstock) self.depot.priceUpdated.connect(sw.update) + @core.pyqtSlot() + def on_periodic_timer(self): + print('DEBUG: timer expired!') + if not self.callback_sock: + return + self.callback_sock.send_depot(self.depot) + mainvbox = None hboxes = [] widgets_per_hbox = 2 @@ -408,10 +496,6 @@ self.hboxes[-1].addWidget(sw) return - def start_main_window(self): - self.mainvbox = wid.QVBoxLayout(self) - self.show() - def main(): app = wid.QApplication(sys.argv)