In [10]:
import numpy as np
import threading
import time, os
import cv2
import matplotlib.pyplot as plt
import multiprocessing as mp

import nbimporter
from utils import FPSTimes
from camera import WebcamStream

In [11]:
class PositionTracker(FPSTimes):
    """Track the largest foreground blob inside a circular arena.

    A worker thread repeatedly reads frames from ``video_stream``, blanks
    everything outside the arena circle, subtracts the stored background
    image and takes the centroid of the largest remaining contour as the
    current position (``self.x``, ``self.y``, ``self.contour``).

    While ``status.value == 2`` every processed frame appends one
    ``time,x,y`` row to the CSV log file.
    """

    default_cfg = {
        'background_file': 'background.png',
        'arena_x': 512,
        'arena_y': 384,
        'arena_radius': 350,
        'log_file': 'test_position_log.csv'
    }

    def __init__(self, status, video_stream, cfg=None):
        """Load the background, build the arena mask and reset the log file.

        Args:
            status: shared object exposing an integer ``.value``; the value 2
                enables logging.
            video_stream: object with a ``read()`` method returning a frame
                or ``None`` when no frame is available.
            cfg: optional dict overriding entries of ``default_cfg``.

        Raises:
            IOError: if the background image cannot be read.
        """
        super(PositionTracker, self).__init__()

        # Work on a private merged copy so neither the caller's dict nor the
        # class-level defaults are shared with (or required in full by) us.
        merged_cfg = dict(self.default_cfg)
        if cfg:
            merged_cfg.update(cfg)

        self.status = status
        self.cfg = merged_cfg
        self.video_stream = video_stream

        # cv2.imread signals failure by returning None instead of raising.
        self.background = cv2.imread(merged_cfg['background_file'], cv2.IMREAD_COLOR)
        if self.background is None:
            raise IOError("Could not read background image: {}".format(merged_cfg['background_file']))

        self.x, self.y = None, None
        self.contour = []
        self.stopped = False
        self._th = None  # worker thread, created by start()

        # White filled circle on black: bitwise_and with it keeps only the arena.
        self.mask = np.zeros(shape=self.background.shape, dtype="uint8")
        arena_center = (merged_cfg['arena_x'], merged_cfg['arena_y'])
        cv2.circle(self.mask, arena_center, merged_cfg['arena_radius'], (255, 255, 255), -1)

        # Truncate any previous log and write the CSV header.
        with open(merged_cfg['log_file'], 'w') as f:
            f.write("time,x,y\n")

    def start(self):
        """Start the tracking thread; a no-op if it is already running."""
        if self._th is not None and self._th.is_alive():
            return
        self.stopped = False
        self._th = threading.Thread(target=self.update, args=())
        self._th.start()

    def stop(self):
        """Ask the tracking thread to finish and wait for it to exit."""
        self.stopped = True
        # Safe to call even if start() was never invoked.
        if self._th is not None:
            self._th.join()
            self._th = None
        print('Position tracker stopped')

    def update(self):
        """Worker loop: read frames, detect the position, optionally log it."""
        while not self.stopped:
            frame = self.video_stream.read()
            if frame is None:
                # No frame yet: back off briefly instead of spinning.
                time.sleep(0.05)
                continue

            self.count()  # count FPS
            self.detect_position(frame)

            if self.status.value == 2:
                self._log_position()

    def _log_position(self):
        """Append the latest timestamp and position as one CSV row."""
        # frame_times is not defined in this class; presumably maintained by
        # FPSTimes.count() -- TODO confirm.
        row = (self.frame_times[-1], self.x, self.y)
        with open(self.cfg['log_file'], 'a') as f:
            f.write(",".join([str(value) for value in row]) + "\n")

    def detect_position(self, frame):
        """Update ``x``, ``y`` and ``contour`` from a single frame.

        Leaves the previous values untouched when no contour is found or the
        largest contour has zero area.
        NOTE(review): assumes ``frame`` has the same shape as the background
        image (the mask is built from it) -- confirm against the stream.
        """
        masked_frame = cv2.bitwise_and(src1=frame, src2=self.mask)

        # Subtracts background from current frame
        subject = cv2.subtract(masked_frame, self.background)

        # Converts subject to grey scale
        subject_gray = cv2.cvtColor(subject, cv2.COLOR_BGR2GRAY)

        # Applies blur and thresholding to the subject
        kernel_size = (25, 25)
        frame_blur = cv2.GaussianBlur(subject_gray, kernel_size, 0)
        _, thresh = cv2.threshold(frame_blur, 40, 255, cv2.THRESH_BINARY)

        # Finds contours and selects the contour with the largest area.
        # OpenCV 3.x returns (image, contours, hierarchy) while 4.x returns
        # (contours, hierarchy); the contours are second-to-last in both.
        found = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
        contours = found[-2]

        if len(contours) == 0:
            return
        contour = max(contours, key=cv2.contourArea)
        M = cv2.moments(contour)
        if M['m00'] == 0:
            return

        # Publish the contour before the coordinates: readers treat
        # "x is not None" as the ready flag, so the contour must already be
        # valid by the time x becomes visible to another thread.
        self.contour = contour
        self.x, self.y = int(M['m10'] / M['m00']), int(M['m01'] / M['m00'])

### Testing position detection

In [14]:
# controller status: 1 - detecting, 2 - detecting + logging
status = mp.Value('i', 1)

# let's use a webcam stream
vs = WebcamStream(WebcamStream.default_cfg)
vs.start()  # stream runs in a separate thread

pt = None
try:
    # init controller; copy the defaults so the class-level dict stays pristine
    pt_cfg = dict(PositionTracker.default_cfg)
    pt_cfg['background_file'] = os.path.join('..', 'assets', 'background.png')
    pt = PositionTracker(status, vs, pt_cfg)
    pt.start()

    # Preview loop: 'q' quits, 's' toggles logging on/off.
    while True:
        frame = vs.read()
        if frame is not None:
            masked_frame = cv2.bitwise_and(src1=frame, src2=pt.mask)

            # Snapshot once: the tracker thread updates these concurrently.
            pos_x, pos_y, contour = pt.x, pt.y, pt.contour
            if pos_x is not None and pos_y is not None:
                color = (127, 255, 0) if status.value == 1 else (0, 0, 255)
                cv2.circle(masked_frame, (pos_x, pos_y), 10, color, -1)
                if len(contour) > 0:
                    cv2.drawContours(masked_frame, [contour], 0, color, 1, cv2.LINE_AA)

            cv2.imshow('Webcam', masked_frame)

        # Poll the keyboard on every iteration so the loop neither spins nor
        # becomes unquittable while no frame is available.
        k = cv2.waitKey(33) & 0xFF
        if k == ord('q'):
            break

        if k == ord('s'):
            status.value = 2 if status.value == 1 else 1
finally:
    # Release every resource even if an earlier cleanup step fails.
    try:
        cv2.destroyAllWindows()
    finally:
        try:
            vs.stop()
        finally:
            if pt is not None:
                pt.stop()

Webcam stream 960.0:720.0 at 20.00 FPS started
Camera released
Position tracker stopped
