1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
| from multiprocessing import Process import os import json import asyncio from aiohttp import web, WSMsgType import zmq
def sst_read(zmq_host='192.168.1.108', zmq_port=5556, aio_host='0.0.0.0', aio_port=8080, verbose=False):
    """
    Relay Sound Source Tracking (SST) frames from a zmq publisher to a browser.

    The function is meant to be run as a process. It subscribes to the zmq
    socket on tcp://zmq_host:zmq_port and, at the same time, runs an aiohttp
    server that exposes a websocket endpoint on ws://aio_host:aio_port/ws.

    Protocol on the websocket:

    * One frame is sent as soon as the connection is established.
    * Every text message from the client other than 'close' (the client is
      expected to send 'received') means "last frame received, please send
      the next one" and triggers exactly one more frame.
    * The text message 'close' closes the websocket.

    :param zmq_host: host of the zmq publisher.
    :param zmq_port: port of the zmq publisher.
    :param aio_host: interface the aiohttp server binds to.
    :param aio_port: port the aiohttp server listens on.
    :param verbose: when True, print a running count of the frames forwarded
        in response to client messages (the initial frame is not counted).

    :output: Sound Source Tracking output like this:
        {
            "timeStamp": 45602,
            "src": [
                { "id": 43, "tag": "dynamic", "x": 0.025, "y": 0.128, "z": 0.991, "activity": 1.000 },
                { "id": 0, "tag": "", "x": 0.000, "y": 0.000, "z": 0.000, "activity": 0.000 },
                { "id": 0, "tag": "", "x": 0.000, "y": 0.000, "z": 0.000, "activity": 0.000 },
                { "id": 0, "tag": "", "x": 0.000, "y": 0.000, "z": 0.000, "activity": 0.000 }
            ]
        }

    :return: None (blocks inside web.run_app until the server is stopped)
    """
    read_pid = os.getpid()
    print('#Read Process# %s start' % read_pid)
    routes = web.RouteTableDef()

    context = zmq.Context()
    zmq_socket = context.socket(zmq.SUB)

    print("Collecting updates from odas publisher...")
    zmq_socket.connect(f"tcp://{zmq_host}:{zmq_port}")
    # Only messages whose payload starts with '{' are delivered to this socket.
    zmq_socket.setsockopt_string(zmq.SUBSCRIBE, '{')

    @routes.get('/ws')
    async def websocket_handler(request):
        print('websocket connection open')

        ws = web.WebSocketResponse()
        await ws.prepare(request)

        # Number of frames forwarded in response to client messages.
        frames_sent = 0

        # NOTE(review): recv_string() is a synchronous call; while it waits
        # for the publisher the event loop cannot serve other connections.
        await ws.send_str(zmq_socket.recv_string())

        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                if msg.data == 'close':
                    print('websocket connection close')
                    await ws.close()
                else:
                    frame = zmq_socket.recv_string()
                    await ws.send_str(frame)
                    # Count each forwarded frame exactly once, so the verbose
                    # counter matches the number of frames actually sent.
                    frames_sent += 1
                    if verbose:
                        print(f'\r{frames_sent}', end='', flush=True)
            elif msg.type == WSMsgType.ERROR:
                print('ws connection closed with exception %s' % ws.exception())
        return ws

    app = web.Application()
    app.add_routes(routes)
    web.run_app(app, host=aio_host, port=aio_port)
def ssl_read(zmq_host='192.168.1.108', zmq_port=5557, aio_host='0.0.0.0', aio_port=8081, verbose=False):
    """
    Forward Sound Source Localization (SSL) frames from zmq to a websocket.

    Intended to run as a process: it subscribes to the zmq socket on
    tcp://zmq_host:zmq_port and serves an aiohttp websocket endpoint on
    ws://aio_host:aio_port/ws. One frame is pushed right after the websocket
    handshake; afterwards each text message from the browser other than
    'close' (normally 'received', meaning "last message received, please send
    next") is answered with the next frame. The text message 'close' closes
    the websocket. With verbose=True a running count of the answered
    requests is printed.

    :output: Sound Source Localization output like this:
        {
            "timeStamp": 45608,
            "src": [
                { "x": 0.132, "y": 0.181, "z": 0.975, "E": 0.557 },
                { "x": 0.198, "y": 0.342, "z": 0.918, "E": 0.130 },
                { "x": 0.000, "y": 0.273, "z": 0.962, "E": 0.018 },
                { "x": 0.000, "y": 0.339, "z": 0.941, "E": 0.006 }
            ]
        }

    :return: None
    """
    print('#Read Process# %s start' % os.getpid())

    zmq_context = zmq.Context()
    subscriber = zmq_context.socket(zmq.SUB)

    print("Collecting updates from odas publisher...")
    subscriber.connect(f"tcp://{zmq_host}:{zmq_port}")
    # Subscription prefix: only payloads beginning with '{' are received.
    subscriber.setsockopt_string(zmq.SUBSCRIBE, '{')

    async def websocket_handler(request):
        print('websocket connection open')

        socket_response = web.WebSocketResponse()
        await socket_response.prepare(request)

        # The first frame goes out without waiting for a client message.
        await socket_response.send_str(subscriber.recv_string())

        forwarded = 0
        async for incoming in socket_response:
            if incoming.type == WSMsgType.ERROR:
                print('ws connection closed with exception %s' % socket_response.exception())
                continue
            if incoming.type != WSMsgType.TEXT:
                # Other message types are ignored.
                continue
            if incoming.data == 'close':
                print('websocket connection close')
                await socket_response.close()
                continue

            # Any other text message is a request for the next frame.
            await socket_response.send_str(subscriber.recv_string())
            if verbose:
                forwarded += 1
                print(f'\r{forwarded}', end='', flush=True)
        return socket_response

    app = web.Application()
    app.add_routes([web.get('/ws', websocket_handler)])
    web.run_app(app, host=aio_host, port=aio_port)
if __name__ == '__main__':
    # Launch one relay process per data stream: SSL (verbose) and SST.
    p_ssl_r = Process(target=ssl_read, kwargs={"verbose": True})
    p_sst_r = Process(target=sst_read)

    # Track what was actually started so cleanup never touches a process
    # that does not exist yet (join() on an unstarted Process raises).
    started = []
    try:
        for proc in (p_ssl_r, p_sst_r):
            proc.start()
            started.append(proc)
        # Block here so that Ctrl-C is delivered while we are inside the
        # try block; start() alone returns immediately.
        for proc in started:
            proc.join()
    except KeyboardInterrupt:
        for proc in started:
            if proc.is_alive():
                proc.terminate()
        # Reap the children so no zombie processes are left behind.
        for proc in started:
            proc.join()
        print('KeyboardInterrupt pw pr Terminated')
|