{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import threading\n", "import time, os, json\n", "import cv2\n", "import matplotlib.pyplot as plt\n", "import multiprocessing as mp\n", "\n", "from situtils import FPSTimes" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "class PositionTracker(FPSTimes):\n", "\n", " default_cfg = {\n", " 'background_file': 'background.png',\n", " 'arena_x': 512,\n", " 'arena_y': 384,\n", " 'arena_radius': 350,\n", " \"max_fps\": 50,\n", " 'file_path': 'test_position_log.csv'\n", " }\n", " \n", " def __init__(self, status, video_stream, cfg):\n", " super(PositionTracker, self).__init__()\n", " \n", " self.status = status\n", " self.cfg = cfg\n", " self.video_stream = video_stream\n", " self.background = cv2.imread(cfg['background_file'], 1)\n", " self.x, self.y = None, None\n", " self.contour = []\n", " self.stopped = False\n", "\n", " self.mask = np.zeros(shape=self.background.shape, dtype=\"uint8\")\n", " cv2.circle(self.mask, (cfg['arena_x'], cfg['arena_y']), cfg['arena_radius'], (255,255,255), -1)\n", "\n", " with open(cfg['file_path'], 'w') as f:\n", " f.write(\"time,x,y\\n\")\n", "\n", " def start(self):\n", " self._th = threading.Thread(target=self.update, args=())\n", " self._th.start()\n", " \n", " def stop(self):\n", " self.stopped = True\n", " self._th.join()\n", " print('Position tracker stopped')\n", " \n", " def update(self):\n", " next_frame = time.time() + 1.0/self.cfg['max_fps']\n", " \n", " while not self.stopped:\n", " frame = self.video_stream.read()\n", " if frame is None:\n", " time.sleep(0.05)\n", " continue\n", " \n", " if time.time() < next_frame:\n", " time.sleep(0.001)\n", " continue\n", " \n", " self.count() # count FPS\n", " self.detect_position(frame)\n", " next_frame += 1.0/self.cfg['max_fps']\n", "\n", " if self.status.value == 2 and self.x is not None:\n", " with open(self.cfg['file_path'], 'a') as f:\n", " f.write(\",\".join([str(x) for x in (self.frame_times[-1], self.x, self.y)]) + \"\\n\") \n", " \n", " def detect_position(self, frame):\n", " masked_frame = cv2.bitwise_and(src1=frame, src2=self.mask)\n", "\n", " # Substracts background from current frame\n", " subject = cv2.subtract(masked_frame, self.background)\n", "\n", " # Converts subject to grey scale\n", " subject_gray = cv2.cvtColor(subject, cv2.COLOR_BGR2GRAY)\n", "\n", " # Applies blur and thresholding to the subject\n", " kernel_size = (25,25)\n", " frame_blur = cv2.GaussianBlur(subject_gray, kernel_size, 0)\n", " _, thresh = cv2.threshold(frame_blur, 20, 255, cv2.THRESH_BINARY)\n", "\n", " # Finds contours and selects the contour with the largest area\n", " contours, hierarchy = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)\n", "\n", " if len(contours) == 0:\n", " return\n", " contour = contours[np.argmax(list(map(cv2.contourArea, contours)))]\n", " M = cv2.moments(contour)\n", " if (M['m00'] == 0):\n", " return\n", "\n", " self.x = int(M['m10'] / M['m00'])\n", " self.y = int(M['m01'] / M['m00'])\n", " self.contour = contour" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Testing position detection" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Webcam stream 1024.0:768.0 at 20.00 FPS started\n", "Camera released\n", "Position tracker stopped\n" ] } ], "source": [ "#import nbimporter\n", "#from camera import WebcamStream\n", "\n", "with open(os.path.join('..', 'settings_light.json')) as json_file:\n", " cfg = json.load(json_file)\n", "\n", "# controller status: 1 - detecting, 2 - detecting + logging\n", "status = mp.Value('i', 1)\n", "\n", "# let's use a webcam stream\n", "vs = WebcamStream(cfg['camera'])\n", "vs.start() # stream runs in a separate thread\n", "\n", "# init controller\n", "pt_cfg = cfg['position']\n", "pt_cfg['background_file'] = os.path.join('..', 'assets', 'background_light.png')\n", "pt = PositionTracker(status, vs, pt_cfg)\n", "pt.start()\n", "\n", "try:\n", " while True:\n", " frame = vs.read()\n", " if frame is not None:\n", " masked_frame = cv2.bitwise_and(src1=frame, src2=pt.mask)\n", " #masked_frame = cv2.subtract(masked_frame, pt.background)\n", " #masked_frame = cv2.cvtColor(masked_frame, cv2.COLOR_BGR2GRAY)\n", " \n", " if pt.x is not None:\n", " color = (127, 255, 0) if status.value == 1 else (0, 0, 255)\n", " cv2.circle(masked_frame, (pt.x, pt.y), 10, color, -1)\n", " cv2.drawContours(masked_frame, [pt.contour], 0, color, 1, cv2.LINE_AA)\n", " \n", " cv2.imshow('Webcam', masked_frame)\n", "\n", " k = cv2.waitKey(33)\n", " if k == ord('q'):\n", " break\n", " \n", " if k == ord('s'):\n", " status.value = 2 if status.value == 1 else 1\n", "finally:\n", " cv2.destroyAllWindows()\n", " vs.stop(), pt.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 4 }