pyvirtualcam を使って流星検出しながらのライブ配信

こちらも過去に X (旧Twitter)に投げた内容の書き直しです。

AtomCam といったリモートの動画像ストリームを入力として流星検出処理を行い pyvirtualcam を利用して仮想カメラに出力します。 これによって、流星検出フィルタ処理を行った結果を仮想カメラの出力となり、その仮想カメラの出力を OBS といった動画配信ツールの入力として扱う事ができます。 今回は検出処理のハフ変換で得られる直線成分の二点を用いて OpenCV の rectangle で四角形を描画し、流星をくくるような描画を行っています。

実装は先日の流星検出を実装を連続するストリームに対して動作するように書き換え、結果を pyvirtualcam の Camera コンテキストに送っているだけです。 入力される動画像を buffer に溜めながら差分比較明合成画像を作成し、buffer に fps の数の画像、つまり 1秒間の画像が溜まったら差分比較明合成画像に対して直線成分検出を行い、検出されれば四角形のマーカーを描画してカメラに送ると言った処理になります。

import pyvirtualcam
import cv2

capture = cv2.VideoCapture("rtsp://atomcam.local:8554/video0_unicast")
width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(capture.get(cv2.CAP_PROP_FPS))
camera = pyvirtualcam.Camera(width, height, fps, fmt=pyvirtualcam.PixelFormat.BGR, device="/dev/video2")

s, cache = capture.read()
lighten = cv2.UMat(height, width, type=cv2.CV_8UC3, s=(0))
buffer = []
while s:
    s, frame = capture.read()
    if s:
        buffer.append(frame)
        difference = cv2.subtract(frame, cache)
        lighten = cv2.max(lighten, difference)
        cache = frame
        if len(buffer) > fps:
            g = cv2.GaussianBlur(lighten, (5, 5), 0)
            c = cv2.Canny(g, 100, 200, 3)
            r = cv2.HoughLinesP(c, 1, 3.141592653/180, 25, minLineLength=30, maxLineGap=5).get()

            for f in buffer:
                if r is not None:
                    for p in r:
                        p = p[0]
                        cv2.rectangle(f, pt1=(p[0], p[1]), pt2=(p[2], p[3]), color=(0, 255, 0)) 
                camera.send(f)
                camera.sleep_until_next_frame()

            lighten = cv2.UMat(height, width, type=cv2.CV_8UC3, s=(0))
            buffer.clear()

camera.close()
capture.release()

ここでは Linux 環境で v4l2loopback を利用しています。ですので pyvirtualcam.Camera の引数に device="/dev/video2" を指定しています。 WindowsMac では OBS の仮想カメラ機能を利用して pyvirtualcam を利用することができるようです。