|
@@ -0,0 +1,134 @@
|
|
|
+import time
|
|
|
+import numpy as np
|
|
|
+import uuid
|
|
|
+import json
|
|
|
+import zmq
|
|
|
+import serial
|
|
|
+import sys
|
|
|
+from collections import OrderedDict
|
|
|
+
|
|
|
+sys.path.append('../')
|
|
|
+from openephys import OpenEphysEvent, OpenEphysSpikeEvent
|
|
|
+
|
|
|
+context = zmq.Context()
|
|
|
+
|
|
|
+def send_heartbeat():
|
|
|
+ global data_socket, event_socket, poller
|
|
|
+ global socket_waits_reply
|
|
|
+ global last_heartbeat_time
|
|
|
+
|
|
|
+ d = {'application': 'latency_test', 'uuid': uuid, 'type': 'heartbeat'}
|
|
|
+ j_msg = json.dumps(d)
|
|
|
+ event_socket.send(j_msg.encode('utf-8'))
|
|
|
+ last_heartbeat_time = time.time()
|
|
|
+ socket_waits_reply = True
|
|
|
+
|
|
|
+def fdump(fhnd, ts, data=None):
|
|
|
+ if data is not None:
|
|
|
+ fhnd.write("%d, %.5f" % (ts, data))
|
|
|
+ else:
|
|
|
+ fhnd.write("%d" % ts)
|
|
|
+ fhnd.flush()
|
|
|
+
|
|
|
+def connect():
|
|
|
+ global data_socket, event_socket, poller
|
|
|
+ global ser
|
|
|
+
|
|
|
+ print("init socket")
|
|
|
+ data_socket = context.socket(zmq.SUB)
|
|
|
+ data_socket.connect("tcp://localhost:%d" % dataport)
|
|
|
+
|
|
|
+ event_socket = context.socket(zmq.REQ)
|
|
|
+ event_socket.connect("tcp://localhost:%d" % eventport)
|
|
|
+
|
|
|
+ data_socket.setsockopt(zmq.SUBSCRIBE, b'')
|
|
|
+ poller.register(data_socket, zmq.POLLIN)
|
|
|
+ poller.register(event_socket, zmq.POLLIN)
|
|
|
+
|
|
|
+ ser = serial.Serial('COM6', 2000000, timeout=.1)
|
|
|
+
|
|
|
+
|
|
|
+def dump_event(header, event):
|
|
|
+ global timestamp
|
|
|
+ if event.type == 'TIMESTAMP':
|
|
|
+ timestamp = event.timestamp
|
|
|
+ elif event.type == 'TTL' and event.event_id == 1:
|
|
|
+ fdump(fttl, event.sample_num + timestamp)
|
|
|
+ print("Event:", header)
|
|
|
+ print(event)
|
|
|
+
|
|
|
+def dump_data(header, content, data):
|
|
|
+ if timestamp == -1:
|
|
|
+ print("Dropping data - arrived before timestamp")
|
|
|
+ print("Data:", content)
|
|
|
+ print(header)
|
|
|
+
|
|
|
+def send_pps():
|
|
|
+ ser.write(b'x')
|
|
|
+ print("Pulse triggered")
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ dataport=5556
|
|
|
+ eventport=5557
|
|
|
+ data_socket = None
|
|
|
+ event_socket = None
|
|
|
+ poller = zmq.Poller()
|
|
|
+ message_no = -1
|
|
|
+ socket_waits_reply = False
|
|
|
+ app_name = 'Dumper Process'
|
|
|
+ uuid = str(uuid.uuid4())
|
|
|
+ last_heartbeat_time = 0
|
|
|
+ last_reply_time = time.time()
|
|
|
+ timestamp = -1
|
|
|
+
|
|
|
+ connect()
|
|
|
+
|
|
|
+ next = time.perf_counter() + 1
|
|
|
+ last_sent = 0
|
|
|
+
|
|
|
+ measurements = []
|
|
|
+
|
|
|
+ print("Waiting 20 seconds")
|
|
|
+ #time.sleep(20)
|
|
|
+
|
|
|
+ limit = 500
|
|
|
+ evtcount = 0
|
|
|
+
|
|
|
+ while evtcount < limit:
|
|
|
+
|
|
|
+ socks = dict(poller.poll(1))
|
|
|
+
|
|
|
+ if not socks:
|
|
|
+ # print("poll exits")
|
|
|
+ continue
|
|
|
+ if data_socket in socks:
|
|
|
+ print(".", end='', flush=True)
|
|
|
+ try:
|
|
|
+ # drop data immediately
|
|
|
+ message = data_socket.recv_multipart(zmq.NOBLOCK)
|
|
|
+ except zmq.ZMQError as err:
|
|
|
+ logger.error("Got error: {0}".format(err))
|
|
|
+ break
|
|
|
+
|
|
|
+ if message:
|
|
|
+ if len(message) < 2:
|
|
|
+ logger.info("No frames for message: ", message[0])
|
|
|
+ else:
|
|
|
+ try:
|
|
|
+ header = json.loads(message[1].decode('utf-8'))
|
|
|
+ #print(header)
|
|
|
+ if header['type'] == 'event':
|
|
|
+ if header['data_size'] > 0:
|
|
|
+ event = OpenEphysEvent(header['content'], message[2])
|
|
|
+ else:
|
|
|
+ event = OpenEphysEvent(header['content'])
|
|
|
+ if event.event_id == 1:
|
|
|
+ send_pps()
|
|
|
+
|
|
|
+ except ValueError as e:
|
|
|
+ logger.error("ValueError: ", e)
|
|
|
+ logger.info(message[1])
|
|
|
+ elif event_socket in socks and socket_waits_reply:
|
|
|
+ evtcount += 1
|
|
|
+ message = event_socket.recv()
|
|
|
+ print("+")
|