映像デバイス

Sora Python SDK では映像デバイスを扱う機能を提供していません。

代わりに、OpenCV の Python バインディングを使用することで、 映像デバイスのキャプチャや表示などを行うことができます。

OpenCV のインストール

pip

$ pip install opencv-python

rye

$ rye add opencv-python
$ rye sync

利用可能な映像デバイス一覧を取得する

利用可能な映像デバイス一覧を取得するには、 cv2.VideoCapture クラスを使用します。

import cv2


def get_available_video_devices(max_video_devices: int = 10):
    available_video_devices: list[int] = []
    for i in range(max_video_devices):
        cap = cv2.VideoCapture(i)
        if cap is None or not cap.isOpened():
            print("カメラが利用できません:", i)
        else:
            print("カメラが利用できます:", i)
            available_video_devices.append(i)
        cap.release()
    return available_video_devices


if __name__ == "__main__":
    get_available_video_devices()

OpenCV ではカメラデバイス名を取得することができません。 カメラデバイス名まで取得したい場合は FFmpeg の記事が参考になりますので、ご参考ください。

Capture/Webcam - FFmpeg

映像デバイスをキャプチャして表示する

cv2.imshow を利用する事で、映像デバイスからキャプチャした映像を表示することができます。

import cv2


def capture_and_display(camera_id: int, width: int, height: int) -> None:
    # カメラデバイスを開く
    cap = cv2.VideoCapture(camera_id)

    if not cap.isOpened():
        print(f"Cannot open camera {camera_id}")
        return

    # 解像度を設定
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

    try:
        while True:
            # フレームをキャプチャ
            ret, frame = cap.read()

            # 正しくフレームが読み込まれた場合のみ表示
            if ret:
                cv2.imshow("Camera Feed", frame)

            # 'q' キーが押されたらループを抜ける
            if cv2.waitKey(1) == ord("q"):
                break
    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        # キャプチャの後始末とウィンドウをすべて閉じる
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    # 例: カメラデバイス ID 0 を使用し、解像度を 640x480 に設定
    capture_and_display(0, 640, 480)

映像デバイスをキャプチャして送信する

create_video_source を利用する事で、映像デバイスからキャプチャした映像を Sora に送信することができます。

import json
import os
import threading
from threading import Event
from typing import Any, Dict, List

import cv2
from sora_sdk import Sora, SoraConnection, SoraSignalingErrorCode, SoraVideoSource


class Sendonly:
    def __init__(self, signaling_urls: List[str], channel_id: str, camera_id: int):
        self._signaling_urls: List[str] = signaling_urls
        self._channel_id: str = channel_id

        self.connection_id: str = ""

        self._connected: Event = Event()
        self._closed: bool = False

        self._video_height: int = 480
        self._video_width: int = 640

        self._sora: Sora = Sora()
        self._video_source: SoraVideoSource = self._sora.create_video_source()
        self._video_capture = cv2.VideoCapture(camera_id)
        if not self._video_capture.isOpened():
            raise RuntimeError(f"カメラが開けません: camera_id={camera_id}")

        # Sora への接続設定
        self._connection: SoraConnection = self._sora.create_connection(
            signaling_urls=signaling_urls,
            role="sendonly",
            channel_id=channel_id,
            audio=False,
            video=True,
            video_source=self._video_source,
        )

        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect

    def connect(self):
        # Sora へ接続
        self._connection.connect()

        self._video_input_thread = threading.Thread(
            target=self._video_input_loop, daemon=True
        )
        self._video_input_thread.start()

        # 接続が成功するまで待つ
        assert self._connected.wait(10), "接続に失敗しました"

        return self

    def _video_input_loop(self):
        while not self._closed:
            ret, frame = self._video_capture.read()
            if ret:
                self._video_source.on_captured(frame)

            if cv2.waitKey(1) == ord("q"):
                break

    def disconnect(self):
        self._connection.disconnect()
        self._video_input_thread.join(10)

        # キャプチャの後始末とウィンドウを全て閉じる
        self._video_capture.release()
        cv2.destroyAllWindows()

    def _on_notify(self, raw_message: str):
        # シグナリング通知のコールバック
        message = json.loads(raw_message)
        # event_type が connection.created で、
        # connection_id が自分の connection_id と一致する場合、接続が成功
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message["connection_id"] == self.connection_id
        ):
            print(f"Sora に接続しました: connection_id={self.connection_id}")
            # 接続が成功したら connected をセット
            self._connected.set()

    def _on_set_offer(self, raw_message: str):
        # シグナリング type: offer のコールバック
        message: Dict[str, Any] = json.loads(raw_message)
        # "type": offer に自分の connection_id が入ってくるので取得しておく
        if message["type"] == "offer":
            self.connection_id = message["connection_id"]

    def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str):
        # 切断時のコールバック
        print(f"Sora から切断されました: error_code={error_code}, message={message}")
        self._closed = True
        # 切断完了で connected をクリア
        self._connected.clear()

    def run(self):
        try:
            # 接続を維持
            while not self._closed:
                pass
        except KeyboardInterrupt:
            # キーボード割り込みの場合
            pass
        finally:
            # 接続の切断
            if self._connection:
                self.disconnect()


def main():
    # 環境変数からシグナリング URL とチャネル ID を取得
    signaling_url = os.getenv("SORA_SIGNALING_URL")
    channel_id = os.getenv("SORA_CHANNEL_ID")

    # signaling_url はリストである必要があるので、リストに変換
    signaling_urls = [signaling_url]

    # camera_id は 0 で固定
    sample = Sendonly(signaling_urls, channel_id, 0)

    # Sora へ接続
    sample.connect()

    sample.run()


if __name__ == "__main__":
    main()
© Copyright 2024, Shiguredo Inc. Created using Sphinx 7.3.7