class WebcamStream(FPSTimes):
    """Webcam reader that grabs frames on a background thread.

    A ``cv2.VideoCapture`` is opened and configured in ``__init__``. After
    ``start()`` a worker thread keeps overwriting ``self.frame`` with the most
    recent result of ``stream.read()``; ``read()`` simply returns that
    attribute, so callers never block on the camera. ``stop()`` ends the
    worker and releases the device.

    Config keys (see ``default_cfg``): ``source``, ``frame_width``,
    ``frame_height``, ``fps``, ``api`` and ``verbose``. Keys missing from the
    dict passed to the constructor fall back to ``default_cfg``.
    """
    # check camera output formats with FFMPEG
    # https://stackoverflow.com/questions/15301608/how-to-query-a-webcams-output-formats
    #
    # cd D:\software\ffmpeg
    # ffmpeg -list_devices true -f dshow -i dummy
    # ffmpeg -list_options true -f dshow -i video="HD USB Camera"
    #
    # Also
    # https://stackoverflow.com/questions/39308664/opencv-cant-set-mjpg-compression-for-usb-camera

    default_cfg = {
        'source': 0,
        'frame_width': 1024,
        'frame_height': 768,
        'fps': 25,
        'api': '',  # 700 -> cv2.CAP_DSHOW
        'verbose': True,
    }

    def __init__(self, cfg=None):
        """Open the capture device and apply the requested settings.

        Args:
            cfg: dict of settings; any key not supplied is taken from
                ``default_cfg``. ``None`` means "use the defaults".
        """
        super(WebcamStream, self).__init__()

        # Merge into a fresh dict: partial configs work and the class-level
        # default_cfg is never aliased (and so never mutated) by an instance.
        self.cfg = {**self.default_cfg, **(cfg or {})}
        cfg = self.cfg

        self.frame = None
        self.frame_with_infos = None
        self.grabbed = False   # result flag of the most recent stream.read()
        self.stopped = False
        self._th = None        # worker thread, created by start()

        # preparing an MJPG video stream; a falsy 'api' lets OpenCV pick the backend
        self.stream = cv2.VideoCapture(cfg['source'], cfg['api']) if cfg['api'] else cv2.VideoCapture(cfg['source'])
        self.stream.set(cv2.CAP_PROP_FPS, cfg['fps'])
        time.sleep(1.5)  # this helps to keep the FPS stable
        #self.stream.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        self.stream.set(cv2.CAP_PROP_FRAME_WIDTH, cfg['frame_width'])
        self.stream.set(cv2.CAP_PROP_FRAME_HEIGHT, cfg['frame_height'])
        self.stream.set(cv2.CAP_PROP_FPS, cfg['fps'])
        self.stream.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))  # cv2.VideoWriter_fourcc('H', '2', '6', '4')

    def start(self):
        """Launch the frame-grabbing thread (no-op if it is already running)."""
        if self._th is not None and self._th.is_alive():
            return  # a second reader on the same capture would race on read()
        self._th = threading.Thread(target=self.update, args=())
        self._th.start()

    def stop(self):
        """Stop grabbing and release the camera.

        Safe to call even when ``start()` was never called: in that case the
        device opened in ``__init__`` is released directly.
        """
        self.stopped = True
        worker = self._th
        if worker is None:
            # No worker exists to release the device, so do it here.
            self.stream.release()
        else:
            time.sleep(0.3)  # wait until device is released
            worker.join()
        print('Camera released')

    def update(self):
        """Worker loop: read frames until ``stopped`` is set, then release.

        Every read result is stored in ``self.grabbed`` / ``self.frame``.
        Only successful grabs are counted towards the FPS statistics; after a
        failed grab the loop backs off briefly instead of spinning.
        """
        try:
            x_res = self.stream.get(cv2.CAP_PROP_FRAME_WIDTH)
            y_res = self.stream.get(cv2.CAP_PROP_FRAME_HEIGHT)
            fps = self.stream.get(cv2.CAP_PROP_FPS)
            print('Webcam stream %s:%s at %.2f FPS started' % (x_res, y_res, fps))

            while not self.stopped:
                (self.grabbed, self.frame) = self.stream.read()
                if self.grabbed:
                    self.count()  # count FPS
                else:
                    time.sleep(0.01)  # nothing delivered; avoid a busy loop
                #cv2.waitKey(50)
        finally:
            # Release on every exit path, including an exception in the loop.
            self.stream.release()

    def read(self):
        """Return the most recently grabbed frame (``None`` if there is none)."""
        return self.frame
# Live preview of the threaded webcam stream.
# Press 'q' in the preview window (or interrupt the kernel) to stop.
vs = WebcamStream(WebcamStream.default_cfg)
vs.start()  # stream runs in a separate thread

try:
    while True:
        frame = vs.read()
        if frame is not None:
            # Draw on a copy: the stream object still holds this array, and the
            # same frame can be read again before the camera delivers a new
            # one, which would stack several FPS overlays on top of each other.
            preview = cv2.putText(frame.copy(), '%.2f FPS' % vs.get_avg_fps(), (10, 20),
                                  cv2.FONT_HERSHEY_DUPLEX, .5, (255, 255, 255))
            cv2.imshow('Webcam', preview)

        # Mask to the low byte so the comparison also works on platforms where
        # waitKey returns extra high bits together with the key code.
        k = cv2.waitKey(33) & 0xFF
        if k == ord('q'):
            break

except KeyboardInterrupt:
    # Interrupting the kernel is a normal way to end the preview; clean-up
    # below still runs and the cell finishes without a traceback.
    pass

finally:
    try:
        cv2.destroyAllWindows()
    finally:
        # Always release the camera, even if closing the windows failed.
        vs.stop()