# zmq_socket.py (3.3 KB)

  1. import zmq
  2. import msgpack as serializer
  3. print('Opening ports')
  4. class ZMQsocket:
  5. def __init__(self, remote_ip, port):
  6. """
  7. Setup zmq socket, context, and remote helper.
  8. Parameters:
  9. port (float): Specified port to connect to. Defaults to 50020.
  10. """
  11. self.ip = remote_ip
  12. self.port = port
  13. self.ctx = zmq.Context()
  14. self.socket = zmq.Socket(self.ctx, zmq.REQ) # this is pub socket
  15. def connect(self):
  16. """
  17. Connects to defined zmq socket.
  18. """
  19. self.socket.connect(f'tcp://{self.ip}:{self.port}')
  20. self.socket.send_string('PUB_PORT')
  21. self.pub_port = self.socket.recv_string()
  22. self.pub_socket = zmq.Socket(self.ctx, zmq.PUB)
  23. self.pub_socket.connect(f"tcp://{self.ip}:{self.pub_port}")
  24. def start_calibration(self):
  25. """
  26. Starts pupil calibration procedure.
  27. """
  28. self.socket.send_string('C')
  29. return self.socket.recv_string()
  30. def stop_calibration(self):
  31. """
  32. Stops pupil calibration procedure.
  33. """
  34. self.socket.send_string('c')
  35. return self.socket.recv_string()
  36. def start_recording(self, dir_name=None):
  37. """
  38. Starts pupil recording.
  39. Parameters:
  40. dir_name (str): Optional name for directory
  41. """
  42. if dir_name:
  43. self.socket.send_string(f'R {dir_name}')
  44. else:
  45. self.socket.send_string('R')
  46. return self.socket.recv_string()
  47. def stop_recording(self):
  48. """
  49. Stops pupil recording.
  50. """
  51. self.socket.send_string('r')
  52. return self.socket.recv_string()
  53. def set_time(self, time_fn):
  54. """
  55. Sets the time in pupil.
  56. Parameters:
  57. time (float): Time to set to.
  58. """
  59. self.time_fn = time_fn
  60. self.socket.send_string(f'T {time_fn()}')
  61. return self.socket.recv_string()
  62. # send notification:
  63. def notify(self, notification):
  64. """Sends ``notification`` to Pupil Remote"""
  65. topic = 'notify.' + notification['subject']
  66. payload = serializer.dumps(notification, use_bin_type=True)
  67. self.socket.send_string(topic, flags=zmq.SNDMORE)
  68. self.socket.send(payload)
  69. return self.socket.recv_string()
  70. #test notification, note that you need to listen on the IPC to receive notifications!
  71. #notify({'subject':"calibration.should_start"})
  72. #notify({'subject':"calibration.should_stop"})
  73. # TODO fix sending annotations
  74. def send_trigger(self, trigger):
  75. """
  76. Sends a trigger object pub_socket
  77. """
  78. payload = serializer.dumps(trigger, use_bin_type=True)
  79. self.pub_socket.send_string(trigger['topic'], flags=zmq.SNDMORE)
  80. self.pub_socket.send(payload)
  81. def new_trigger(self, topic, label, duration):
  82. """
  83. Creates a trigger dictionary object (make sure set_time() has been invoked)
  84. """
  85. return {
  86. "topic": topic,
  87. "label": label,
  88. "timestamp": self.time_fn(),
  89. "duration": duration,
  90. }
  91. def annotation(self, label, duration):
  92. """
  93. Shortcut to sending an annotation to pupil remote (make sure set_time() has been invoked)
  94. """
  95. self.send_trigger(self.new_trigger('annotation', label, duration))
  96. print('Done!')