{ "cells": [ { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "\n", "import numpy as np\n", "import threading\n", "import time, os, json\n", "import cv2\n", "import matplotlib.pyplot as plt\n", "import multiprocessing as mp\n", "\n", "from scipy import stats, signal\n", "from situtils import FPSTimes" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# TODO - write an Agent class and unify position/HD tracking for multiple agents" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class PositionTrackerBase(FPSTimes):\n", " \n", " default_cfg = {\n", " \"single_agent\": True,\n", " \"background_light\": \"background_light.png\",\n", " \"background_dark\": \"background_dark.png\",\n", " \"threshold_light\": 60,\n", " \"threshold_dark\": 30,\n", " \"min_blob_size\": 100,\n", " \"subtract\": 1,\n", " \"arena_x\": 522,\n", " \"arena_y\": 372,\n", " \"arena_radius\": 330,\n", " \"floor_radius\": 287,\n", " \"max_fps\": 50,\n", " \"file_path\": \"positions.csv\",\n", " \"contour_path\": \"contours.csv\",\n", " \"floor_r_in_meters\": 0.46,\n", " \"angle_compensation\": -90,\n", " \"flip_x\": True,\n", " \"flip_y\": False\n", " }\n", " \n", " def __init__(self, status, video_stream, cfg):\n", " super(PositionTrackerBase, self).__init__()\n", " \n", " self.status = status\n", " self.cfg = cfg\n", " self.video_stream = video_stream\n", " self.bg_light = cv2.imread(cfg['background_light'], 1)\n", " self.bg_dark = cv2.imread(cfg['background_dark'], 1)\n", " self.background = self.bg_light # light by default\n", " self.is_light = True\n", " self.pixel_size = cfg['floor_r_in_meters'] / float(cfg['floor_radius'])\n", " self.stopped = False\n", " self.mask = np.zeros(shape=self.background.shape, dtype=\"uint8\")\n", " cv2.circle(self.mask, (cfg['arena_x'], cfg['arena_y']), cfg['arena_radius'], (255,255,255), -1)\n", "\n", " def reload_background(self):\n", " self.bg_light = cv2.imread(self.cfg['background_light'], 1)\n", " self.bg_dark = cv2.imread(self.cfg['background_dark'], 1)\n", " self.background = self.bg_light if self.is_light else self.bg_dark\n", " print('Position tracker - background reloaded')\n", "\n", " def switch_background(self):\n", " self.background = self.bg_dark if self.is_light else self.bg_light \n", " self.is_light = not self.is_light\n", " \n", " def px_to_meters(self, x, y):\n", " x_m = float(self.cfg['arena_x'] - x) * self.pixel_size * (-1 if self.cfg['flip_x'] else 1)\n", " y_m = float(self.cfg['arena_y'] - y) * self.pixel_size * (-1 if self.cfg['flip_y'] else 1)\n", " return x_m, y_m\n", "\n", " def meters_to_px(self, x, y):\n", " x_m = self.cfg['arena_x'] - (x / self.pixel_size) * (-1 if self.cfg['flip_x'] else 1)\n", " y_m = self.cfg['arena_y'] - (y / self.pixel_size) * (-1 if self.cfg['flip_y'] else 1)\n", " return int(x_m), int(y_m)\n", "\n", " def correct_angle(self, phi):\n", " return (2*np.pi - phi) + np.deg2rad(self.cfg['angle_compensation'])\n", "\n", " def is_inside(self, x, y, r):\n", " for pos in self.positions_in_m:\n", " if (pos[0] - x)**2 + (pos[1] - y)**2 <= r**2:\n", " return True\n", " return False\n", " \n", " def start(self):\n", " self._th = threading.Thread(target=self.update, args=())\n", " self._th.start()\n", " \n", " def stop(self):\n", " self.stopped = True\n", " self._th.join()\n", " print('Position tracker stopped')\n", " \n", " def update(self):\n", " next_frame = time.time() + 1.0/self.cfg['max_fps']\n", " \n", " while not self.stopped:\n", " frame = self.video_stream.read()\n", " if frame is None:\n", " time.sleep(0.05)\n", " continue\n", " \n", " if time.time() < next_frame:\n", " time.sleep(0.001)\n", " continue\n", " \n", " self.count() # count FPS\n", " self.detect_position(frame)\n", " next_frame += 1.0/self.cfg['max_fps']\n", "\n", " if self.status.value == 2 and self.positions_in_px is not None:\n", " self.save_position()\n", " self.save_contours() \n", "\n", " # generic interface\n", "\n", " def detect_position(self, frame):\n", " return NotImplemented\n", "\n", " def save_position(self):\n", " return NotImplemented\n", " \n", " def save_contours(self):\n", " return NotImplemented\n", " \n", " @property\n", " def positions_in_px(self):\n", " # a list of pairs [(Xi, Yi), ...] of actual positions for each agent tracked - in pixels\n", " return NotImplemented\n", " \n", " @property\n", " def positions_in_m(self):\n", " # a list of pairs [(Xi, Yi), ...] of actual positions for each agent tracked - in meters\n", " return NotImplemented\n", " \n", " @property\n", " def contours(self):\n", " return NotImplemented\n", " \n", " @property\n", " def speeds(self):\n", " return NotImplemented\n", " \n", " @property\n", " def hds(self):\n", " return NotImplemented" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "class PositionTrackerSingle(PositionTrackerBase):\n", " \n", " def __init__(self, status, video_stream, cfg):\n", " super(PositionTrackerSingle, self).__init__(status, video_stream, cfg)\n", " \n", " # an array of [[t1, X1, Y1, speed1, HD1], [t2, X2, Y2, speed2, HD2], ...] of 'history_duration' length\n", " # X, Y - in pixels, Speed - in m/s, head direction (HD) - in rad\n", " self._positions_list = None \n", " self._contour = None\n", " self._lr = None # linear regression of the contour\n", " \n", " width = 50 # 50 points ~= 1 sec with at 50Hz - to smooth the trajectory\n", " self.kernel = signal.gaussian(width, std=(width) / 7.2)\n", " \n", " with open(cfg['file_path'], 'w') as f:\n", " f.write(\"time,x,y\\n\")\n", " with open(cfg['contour_path'], 'w') as f:\n", " f.write(\"x:y,...\\n\")\n", " \n", " def detect_position(self, frame):\n", " masked_frame = cv2.bitwise_and(src1=frame, src2=self.mask)\n", "\n", " # Substracts background from current frame or takes absdiff\n", " if 'subtract' in self.cfg:\n", " if self.cfg['subtract'] > 0:\n", " subject = cv2.subtract(self.background, masked_frame)\n", " else:\n", " subject = cv2.subtract(self.background, masked_frame)\n", " else:\n", " subject = cv2.absdiff(masked_frame, self.background)\n", "\n", " # Converts subject to grey scale\n", " subject_gray = cv2.cvtColor(subject, cv2.COLOR_BGR2GRAY)\n", "\n", " # Applies blur and thresholding to the subject\n", " kernel_size = (25,25)\n", " frame_blur = cv2.GaussianBlur(subject_gray, kernel_size, 0)\n", " th = self.cfg['threshold_light'] if self.is_light else self.cfg['threshold_dark']\n", " _, thresh = cv2.threshold(frame_blur, th, 255, cv2.THRESH_BINARY)\n", "\n", " # Finds contours and selects the contour with the largest area\n", " contours, hierarchy = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)\n", "\n", " if len(contours) == 0:\n", " return\n", " contour = contours[np.argmax(list(map(cv2.contourArea, contours)))]\n", " M = cv2.moments(contour)\n", " if (M['m00'] == 0):\n", " return\n", "\n", " x, y = M['m10'] / M['m00'], M['m01'] / M['m00']\n", " if self._positions_list is None:\n", " self._positions_list = np.array([[time.time(), x, y, 0, 0]])\n", " else:\n", " t1 = time.time() - self.cfg['history_duration']\n", " idx = np.argmin(np.abs(self._positions_list[:, 0] - t1))\n", " last_hd = self._positions_list[-1][4]\n", " self._positions_list = np.concatenate([self._positions_list[idx:], [np.array([time.time(), x, y, 0, last_hd])]])\n", "\n", " # speed\n", " if len(self._positions_list) > len(self.kernel):\n", " x = (-self._positions_list[:, 1] + self.cfg['arena_x']) * self.pixel_size * (-1 if self.cfg['flip_x'] else 1)\n", " y = (-self._positions_list[:, 2] + self.cfg['arena_y']) * self.pixel_size * (-1 if self.cfg['flip_y'] else 1)\n", " \n", " # to avoid boundary effects\n", " x = np.concatenate([np.ones(int(len(self.kernel)/2) - 1) * x[0], x, np.ones(int(len(self.kernel)/2)) * x[-1]])\n", " y = np.concatenate([np.ones(int(len(self.kernel)/2) - 1) * y[0], y, np.ones(int(len(self.kernel)/2)) * y[-1]])\n", " \n", " x_smooth = np.convolve(x, self.kernel, 'valid') / self.kernel.sum() # valid mode to avoid boundary effects\n", " y_smooth = np.convolve(y, self.kernel, 'valid') / self.kernel.sum()\n", "\n", " dx = np.sqrt(np.square(np.diff(x_smooth)) + np.square(np.diff(y_smooth)))\n", " dt = np.diff(self._positions_list[:, 0])\n", " speed = np.concatenate([dx/dt, [dx[-1]/dt[-1]]])\n", " self._positions_list[:, 3] = speed # in m/s\n", " \n", " # head direction\n", " recent_traj = self._positions_list[self._positions_list[:, 0] > time.time() - 0.25]\n", " avg_speed = recent_traj[:, 3].mean()\n", " if avg_speed > self.cfg['hd_update_speed'] and len(recent_traj) > 3: # if animal runs basically\n", " x, y = recent_traj[0][1], recent_traj[0][2]\n", " vectors = [np.array([a[1], a[2]]) - np.array([x, y]) for a in recent_traj[1:]]\n", " avg_direction = np.array(vectors).sum(axis=0) / len(vectors)\n", "\n", " avg_angle = -np.arctan2(avg_direction[1], avg_direction[0])\n", " self._positions_list[-1][4] = avg_angle # in radians\n", " \n", " self._contour = contour # in pixels\n", " \n", " #ctr_in_px = np.array([x for x in zip(contour[:, 0, 0], contour[:, 0, 1])])\n", " #if ctr_in_px[:, 0].max() - ctr_in_px[:, 0].min() > ctr_in_px[:, 1].max() - ctr_in_px[:, 1].min():\n", " # slope, intercept, r_value, p_value, std_err = stats.linregress(ctr_in_px[:, 0], ctr_in_px[:, 1])\n", " # self.lr = {'slope': slope, 'intercept': intercept, 'r_value': r_value, 'p_value': p_value, 'std_err': std_err}\n", " #else:\n", " # slope, intercept, r_value, p_value, std_err = stats.linregress(ctr_in_px[:, 1], ctr_in_px[:, 0])\n", " # self.lr = {'slope': 1 - slope, 'intercept': intercept, 'r_value': r_value, 'p_value': p_value, 'std_err': std_err}\n", "\n", " def save_position(self):\n", " if self.positions_in_px is None:\n", " return\n", " with open(self.cfg['file_path'], 'a') as f:\n", " f.write(\",\".join([str(x) for x in (self.frame_times[-1], \\\n", " round(self.positions_in_m[0][0], 4), round(self.positions_in_m[0][1], 4))]) + \"\\n\") \n", " \n", " def save_contours(self):\n", " if self.contours is None:\n", " return\n", " ctr_in_m = np.array([self.px_to_meters(x, y) for x, y in zip(self._contour[:, 0, 0], self._contour[:, 0, 1])])\n", " data = [\"%.4f:%.4f\" % (x[0], x[1]) for x in ctr_in_m]\n", " with open(self.cfg['contour_path'], 'a+') as f: \n", " f.write(\",\".join(data) + \"\\n\")\n", " \n", " @property\n", " def positions_in_px(self):\n", " return [self._positions_list[-1][1:].astype('int32')] if self._positions_list is not None else None\n", " \n", " @property\n", " def positions_in_m(self):\n", " if self._positions_list is None:\n", " return None\n", " x = (self.cfg['arena_x'] - self._positions_list[-1][1]) * self.pixel_size * (-1 if self.cfg['flip_x'] else 1)\n", " y = (self.cfg['arena_y'] - self._positions_list[-1][2]) * self.pixel_size * (-1 if self.cfg['flip_y'] else 1)\n", " return [np.array([x, y])]\n", " \n", " @property\n", " def contours(self):\n", " return [self._contour] if self._contour is not None else None\n", " \n", " @property\n", " def speeds(self):\n", " return [self._positions_list[-1][3]] if self._positions_list is not None else None\n", " \n", " @property\n", " def hds(self):\n", " return [np.degrees(self._positions_list[-1][4])] if self._positions_list is not None else None" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "class PositionTrackerDouble(PositionTrackerBase):\n", " \n", " def __init__(self, status, video_stream, cfg):\n", " super(PositionTrackerDouble, self).__init__(status, video_stream, cfg)\n", "\n", " self._dist_array = []\n", " self._positions_list_1, self._positions_list_2 = None, None\n", " self._contour1, self._contour2 = [], []\n", " \n", " with open(cfg['file_path'], 'w') as f:\n", " f.write(\"time,x1,y1,x2,y2\\n\")\n", " with open(cfg['contour_path'], 'w') as f:\n", " f.write(\"x:y,...\\n\")\n", "\n", " def detect_position(self, frame):\n", " masked_frame = cv2.bitwise_and(src1=frame, src2=self.mask)\n", "\n", " # Substracts background from current frame or takes absdiff\n", " if 'subtract' in self.cfg:\n", " if self.cfg['subtract'] > 0:\n", " subject = cv2.subtract(self.background, masked_frame)\n", " else:\n", " subject = cv2.subtract(self.background, masked_frame)\n", " else:\n", " subject = cv2.absdiff(masked_frame, self.background)\n", "\n", " # Converts subject to grey scale\n", " subject_gray = cv2.cvtColor(subject, cv2.COLOR_BGR2GRAY)\n", "\n", " # Applies blur and thresholding to the subject\n", " kernel_size = (25,25)\n", " frame_blur = cv2.GaussianBlur(subject_gray, kernel_size, 0)\n", " th = self.cfg['threshold_light'] if self.is_light else self.cfg['threshold_dark']\n", " _, thresh = cv2.threshold(frame_blur, th, 255, cv2.THRESH_BINARY)\n", "\n", " # Finds contours and selects the contour with the largest area\n", " contours, hierarchy = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)\n", "\n", " # filter by size\n", " blobs = [cnt for cnt in contours if cv2.contourArea(cnt) > self.cfg['min_blob_size']]\n", " if len(blobs) == 0: # do nothing if nothing is identified\n", " return\n", " \n", " sizes = [int(cv2.contourArea(blob)) for blob in blobs]\n", " blobs = [blobs[i] for i in np.argsort(sizes)][:2] # two largest blobs\n", " \n", " if len(blobs) == 1: # when two objects are together\n", " blobs.append(np.array(blobs[0]))\n", " \n", " #if np.random.rand() > 0.5: # randomize for testing\n", " # blobs[0], blobs[1] = blobs[1], blobs[0]\n", " \n", " M1 = cv2.moments(blobs[0])\n", " M2 = cv2.moments(blobs[1])\n", " if M1['m00'] == 0 or M2['m00'] == 0:\n", " return\n", " \n", " xy1 = np.array([M1['m10'] / M1['m00'], M1['m01'] / M1['m00']]) # single pair\n", " xy2 = np.array([M2['m10'] / M2['m00'], M2['m01'] / M2['m00']]) # single pair\n", "\n", " if self._positions_list_1 is None: # first detected coordinates\n", " self._positions_list_1 = np.array([xy1]) # array of XY pairs\n", " self._positions_list_2 = np.array([xy2]) # array of XY pairs\n", "\n", " else:\n", " idx = 0 if len(self._positions_list_1) < 20 else 1 # append or shift position list, collecting position history\n", "\n", " # compute distance matrix: current XY1 and XY2 to previous positions\n", " dist_array = []\n", " for point in (xy1, xy2):\n", " for p_list in (self._positions_list_1, self._positions_list_2):\n", " distance = np.sqrt( ((p_list[:, 0] - point[0]).mean())**2 + \\\n", " ((p_list[:, 1] - point[1]).mean())**2 )\n", " dist_array.append(distance)\n", "\n", " self._dist_array = dist_array\n", "\n", " if np.argmin(np.array(dist_array)) < 1 or np.argmin(np.array(dist_array)) > 2: # 1 goes to 1, 2 to 2\n", " self._positions_list_1 = np.concatenate([self._positions_list_1[idx:], [xy1]])\n", " self._positions_list_2 = np.concatenate([self._positions_list_2[idx:], [xy2]])\n", " self._contour1, self._contour2 = blobs[0], blobs[1]\n", " else: # swap\n", " self._positions_list_1 = np.concatenate([self._positions_list_1[idx:], [xy2]])\n", " self._positions_list_2 = np.concatenate([self._positions_list_2[idx:], [xy1]])\n", " self._contour1, self._contour2 = blobs[1], blobs[0]\n", "\n", " def save_position(self):\n", " if self.positions_in_px is None:\n", " return \n", " with open(self.cfg['file_path'], 'a') as f: # save position\n", " data = self.positions_in_m\n", " f.write(\",\".join([str(x) for x in (self.frame_times[-1], \\\n", " round(data[0][0], 4), round(data[0][1], 4), round(data[1][0], 4), round(data[1][1], 4))]) + \"\\n\") \n", " \n", " def save_contours(self):\n", " if self.contours is None:\n", " return\n", " ctr1_in_m = np.array([self.px_to_meters(x, y) for x, y in zip(self._contour1[:, 0, 0], self._contour1[:, 0, 1])])\n", " ctr2_in_m = np.array([self.px_to_meters(x, y) for x, y in zip(self._contour2[:, 0, 0], self._contour2[:, 0, 1])])\n", " data1 = [\"%.4f:%.4f\" % (x[0], x[1]) for x in ctr1_in_m]\n", " data2 = [\"%.4f:%.4f\" % (x[0], x[1]) for x in ctr2_in_m]\n", " with open(self.cfg['contour_path'], 'a+') as f: \n", " f.write(\",\".join(data1) + \";\" + \",\".join(data2) + \"\\n\") \n", " \n", " @property\n", " def positions_in_px(self):\n", " if self._positions_list_1 is None:\n", " return None\n", " x1, y1 = self._positions_list_1[-1].astype('int32')\n", " x2, y2 = self._positions_list_2[-1].astype('int32')\n", " return np.array([[x1, y1], [x2, y2]])\n", " \n", " @property\n", " def positions_in_m(self):\n", " if self._positions_list_1 is None:\n", " return None\n", " x1 = (self.cfg['arena_x'] - self._positions_list_1[-1][0]) * self.pixel_size * (-1 if self.cfg['flip_x'] else 1)\n", " y1 = (self.cfg['arena_y'] - self._positions_list_1[-1][1]) * self.pixel_size * (-1 if self.cfg['flip_y'] else 1)\n", " x2 = (self.cfg['arena_x'] - self._positions_list_2[-1][0]) * self.pixel_size * (-1 if self.cfg['flip_x'] else 1)\n", " y2 = (self.cfg['arena_y'] - self._positions_list_2[-1][1]) * self.pixel_size * (-1 if self.cfg['flip_y'] else 1)\n", " return np.array([[x1, y1], [x2, y2]])\n", " \n", " @property\n", " def contours(self): # always in pixels\n", " return [self._contour1, self._contour2] if self._contour1 is not None else None\n", " \n", " @property\n", " def speeds(self):\n", " # Not Implemented\n", " return [0, 0] if self.positions_in_px is not None else None\n", " \n", " @property\n", " def hds(self):\n", " # Not Implemented\n", " return [0, 0] if self.positions_in_px is not None else None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Testing position detection" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "with open(os.path.join('..', 'profiles', 'default.json')) as json_file:\n", " cfg = json.load(json_file)\n", "\n", "pt_cfg = cfg['position']\n", "pt_cfg['background_light'] = os.path.join('..', 'assets', 'background_light.png')\n", "pt_cfg['background_dark'] = os.path.join('..', 'assets', 'background_dark.png')" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Webcam stream 1024.0:768.0 at 30.00 FPS started\n", "Camera released\n", "Position tracker stopped\n" ] } ], "source": [ "#import nbimporter\n", "#from camera import WebcamStream\n", "\n", "# controller status: 1 - detecting, 2 - detecting + logging\n", "status = mp.Value('i', 1)\n", "\n", "# let's use a webcam stream\n", "vs = WebcamStream(cfg['camera'])\n", "vs.start() # stream runs in a separate thread\n", "\n", "# init controller\n", "#pt = DoublePositionTracker(status, vs, pt_cfg)\n", "pt = PositionTrackerSingle(status, vs, pt_cfg)\n", "pt.start()\n", "kernel_size = (25,25)\n", "\n", "try:\n", " while True:\n", " frame = vs.read()\n", " if frame is not None:\n", " masked_frame = cv2.bitwise_and(src1=frame, src2=pt.mask)\n", " frame_to_save = masked_frame.copy()\n", " #masked_frame = cv2.absdiff(masked_frame, pt.background)\n", " #masked_frame = cv2.subtract(pt.background, masked_frame)\n", " #masked_frame = cv2.subtract(masked_frame, pt.background)\n", " #masked_frame = cv2.cvtColor(masked_frame, cv2.COLOR_BGR2GRAY)\n", " #masked_frame = cv2.GaussianBlur(masked_frame, kernel_size, 0)\n", " \n", " cv2.putText(masked_frame, 'Position: %.2f FPS' % pt.get_avg_fps(), \n", " (10, 80), cv2.FONT_HERSHEY_DUPLEX, .5, (255, 255, 255)) \n", " \n", " if pt.positions_in_px is not None:\n", " color1 = (127, 255, 0)\n", " #color2 = (0, 0, 255)\n", " cv2.circle(masked_frame, (pt.positions_in_px[0][0], pt.positions_in_px[0][1]), 2, color1, -1)\n", " #cv2.circle(masked_frame, (pt.xy2_in_px[0], pt.xy2_in_px[1]), 2, color2, -1)\n", " \n", " cv2.drawContours(masked_frame, [pt.contours[0]], 0, color1, 1, cv2.LINE_AA)\n", " #cv2.drawContours(masked_frame, [pt.contour2], 0, color2, 1, cv2.LINE_AA)\n", " \n", " cv2.putText(masked_frame, 'Animal1: %.3f %.3f %d' % (pt.positions_in_m[0][0], pt.positions_in_m[0][1], \\\n", " cv2.contourArea(pt.contours[0])), (10, 40), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))\n", " cv2.putText(masked_frame, 'History: %d' % (len(pt._positions_list)), \\\n", " (10, 60), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))\n", " \n", " # regression line\n", " #if pt.lr is not None:\n", " # x1, x2 = pt.contour[:, 0, 0].min() - 10, pt.contour[:, 0, 0].max() + 10\n", " # y1, y2 = int(x1 * pt.lr['slope'] + pt.lr['intercept']), int(x2 * pt.lr['slope'] + pt.lr['intercept'])\n", " # cv2.line(frame, (x1, y1), (x2, y2), color, 1)\n", " \n", " cv2.putText(masked_frame, 'L' if pt.is_light else 'D', (10, 20), cv2.FONT_HERSHEY_DUPLEX, .5, (255, 255, 255)) \n", " cv2.imshow('Webcam', masked_frame)\n", "\n", " k = cv2.waitKey(33)\n", " if k == ord('q'):\n", " break\n", " \n", " if k == ord('s'):\n", " status.value = 2 if status.value == 1 else 1\n", "\n", " if k == ord('d'):\n", " pt.switch_background()\n", " \n", " if k == ord('r'):\n", " cv2.imwrite('frame.jpg', frame_to_save)\n", " \n", "finally:\n", " cv2.destroyAllWindows()\n", " vs.stop(), pt.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Image sandbox" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "masked_frame = cv2.imread('frame.jpg')\n", "bg_light = cv2.imread(cfg['position']['background_light'], 1)\n", "subjects = cv2.subtract(bg_light, masked_frame)\n", "subjects_gray = cv2.cvtColor(subjects, cv2.COLOR_BGR2GRAY)\n", "\n", "kernel_size = (25,25)\n", "subjects_blur = cv2.GaussianBlur(subjects_gray, kernel_size, 0)\n", "th = cfg['position']['threshold_light'] # in light\n", "_, thresh = cv2.threshold(subjects_blur, th, 255, cv2.THRESH_BINARY)\n", "\n", "contours, hierarchy = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)\n", "\n", "contour = contours[np.argmax(list(map(cv2.contourArea, contours)))]" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "plt.imshow(masked_frame)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "plt.imshow(bg_light)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "plt.imshow(subjects)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "410.0" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cv2.contourArea(contours[1])" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "2" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "a1 = np.array([12, 56, 5, 5])\n", "np.argmin(a1)" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 40, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "plt.scatter(contours[0][:, 0][:, 0], contours[0][:, 0][:, 1])" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([3, 1, 2], dtype=int64)" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "x.astype('int64')" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([1, 2, 0], dtype=int64)" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "x = np.array([3, 1, 2])\n", "idxs = np.argsort(x)\n", "idxs" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([1, 2, 3])" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "x[idxs]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "image = cv2.imread('frame.jpg')\n", "#plt.imshow(image)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([0, 0, 0], dtype=uint8)" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "image[0][0]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "color = ('b','g','r')\n", "for channel, col in enumerate(color):\n", " histr = cv2.calcHist([image],[channel],None,[256],[0,256])\n", " plt.plot(histr, color = col)\n", " plt.xlim([0,256])\n", " plt.ylim([0, 1000])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Contour test" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x, y = contour[0][0][0], contour[0][0][1]\n", "pt.px_to_meters(x, y), len(contour)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ctr_in_m = np.array([pt.px_to_meters(x, y) for x, y in zip(contour[:, 0, 0], contour[:, 0, 1])])\n", "ctr_in_px = np.array([x for x in zip(contour[:, 0, 0], contour[:, 0, 1])])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "from scipy import stats\n", "\n", "x = ctr_in_px[:, 0]\n", "y = ctr_in_px[:, 1]\n", "\n", "slope, intercept, r_value, p_value, std_err = stats.linregress(x,y)\n", "x_line = np.linspace(x.min(), x.max(), 10)\n", "y_line = x_line * slope + intercept\n", "\n", "plt.scatter(x, y)\n", "plt.plot(x_line, y_line)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "filename = 'D:\\\\runSIT\\\\sessions\\\\52_aSIT_2021-11-15_11-12-00\\\\contours.csv'\n", "with open(filename) as ff:\n", " data = ff.readlines()\n", "\n", "headers = data[0] # skip headers line\n", "contours = [[(float(x.split(':')[0]), float(x.split(':')[1])) for x in contour.split(',')] for contour in data[1:]]\n", "contours = [np.array(contour) for contour in contours]" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "plt.plot(contours[0][:, 0], contours[0][:, 1])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 4 }