# --- Cell: imports ---------------------------------------------------------
import cv2
import time, os
import threading
import numpy as np
import multiprocessing as mp
import matplotlib.pyplot as plt

from situtils import FPSTimes


# --- Cell: VideoWriter -----------------------------------------------------
class VideoWriter(FPSTimes):
    """Record frames from a running video stream into a video file.

    A background thread polls a shared ``status`` value; while it equals 2
    ("recording") the thread reads ``video_stream.frame_with_infos`` once per
    frame slot (``1 / cfg['fps']`` seconds) and appends it to the output file.
    Any other status value means "idle": nothing is written.

    Parameters
    ----------
    status : object with an integer ``.value`` attribute
        Shared recording state (the demo below uses ``mp.Value('i', 1)``;
        1 - idle, 2 - recording).
    video_stream : object
        Must expose ``.stream.get(prop_id)`` for the frame width/height and a
        ``.frame_with_infos`` attribute holding the frame to write (or None).
    cfg : dict
        Needs the keys ``'fps'`` and ``'file_path'``; see ``default_cfg``.
    """
    # precise timing https://stackoverflow.com/questions/42565297/precise-loop-timing-in-python

    default_cfg = {
        'fps': 20,
        'file_path': 'test_video.avi',
    }

    # Longest single sleep of the worker loop, in seconds. Keeps stop()
    # responsive even when the frame interval is long.
    _POLL_INTERVAL = 0.05

    def __init__(self, status, video_stream, cfg):
        super(VideoWriter, self).__init__()

        self.cfg = cfg
        self.video_stream = video_stream  # cv2 video stream
        self.stopped = False
        self.status = status
        self._th = None  # worker thread; created by start()

        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        # Named property ids instead of the bare 3 / 4 (same numeric values).
        res_x = int(video_stream.stream.get(cv2.CAP_PROP_FRAME_WIDTH))
        res_y = int(video_stream.stream.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.out = cv2.VideoWriter(cfg['file_path'], fourcc, cfg['fps'], (res_x, res_y))

    @property
    def latency(self):
        """Duration of one frame slot in seconds (``1 / fps``)."""
        return 1.0/self.cfg['fps']

    def start(self):
        """Launch the background thread that runs ``update()``."""
        self._th = threading.Thread(target=self.update, args=())
        self._th.start()

    def stop(self):
        """Ask the worker to finish and wait until the output file is released."""
        self.stopped = True
        if self._th is not None:
            # update() releases the file in its ``finally`` before the thread
            # exits, so joining is all the waiting that is needed.
            self._th.join()
        else:
            # start() was never called: release the file opened in __init__.
            self.out.release()
        print('Video writer stopped')

    def update(self):
        """Worker loop: write one frame per slot while status is "recording".

        Runs until ``self.stopped`` becomes true. The schedule is kept on an
        absolute timeline (``next_frame``), so a late wake-up does not
        accumulate drift; slots that were missed entirely are skipped.
        """
        # Monotonic clock: unaffected by wall-clock adjustments.
        next_frame = time.monotonic() + self.latency

        try:
            while not self.stopped:
                if self.status.value != 2:  # anything but "recording": idle
                    time.sleep(self._POLL_INTERVAL)
                    continue

                remaining = next_frame - time.monotonic()
                if remaining > 0:
                    # Sleep instead of spinning; capped so a stop request is
                    # noticed within _POLL_INTERVAL.
                    time.sleep(min(remaining, self._POLL_INTERVAL))
                    continue

                #frame = self.video_stream.read()
                frame = self.video_stream.frame_with_infos
                if frame is not None:
                    self.count()  # count FPS
                    self.out.write(frame)

                # Skip any whole slots that elapsed while we were busy, then
                # move on to the next one.
                frames_missed = np.floor((time.monotonic() - next_frame) * self.cfg['fps'])
                next_frame += self.latency * frames_missed + self.latency
        finally:
            # Always finalize the file, even if reading/writing a frame raised.
            self.out.release()


# --- Cell (markdown): ### Testing video recording --------------------------

# --- Cell: demo ------------------------------------------------------------
# NOTE(review): WebcamStream is not imported anywhere in this notebook; the two
# lines below are commented out in the original, so this cell raises NameError
# on a fresh kernel. Presumably they need to be re-enabled - confirm that the
# `camera` notebook and `nbimporter` are available.
#import nbimporter
#from camera import WebcamStream

# let's use a webcam stream
vs = WebcamStream(WebcamStream.default_cfg)
vs.start()  # stream runs in a separate thread

# recording status: 1 - idle, 2 - recording
status = mp.Value('i', 1)

# start/stop recording by pressing 's'
vw = VideoWriter(status, vs, VideoWriter.default_cfg)
vw.start()

try:
    while True:
        frame = vs.read()
        if frame is not None:
            color = (127, 255, 0) if status.value == 1 else (0, 0, 255)
            frame = cv2.circle(frame, (20, 20), 15, color, -1)
            cv2.imshow('Webcam', frame)

        k = cv2.waitKey(33)
        if k == ord('q'):
            break

        if k == ord('s'):
            status.value = 2 if status.value == 1 else 1
finally:
    cv2.destroyAllWindows()
    # Stop the writer before the stream it reads from, and make sure the
    # stream is stopped even if stopping the writer fails.
    try:
        vw.stop()
    finally:
        vs.stop()