サンプル

Python バージョン:

3.10 以降

音声と映像を送受信

import json
import os
import threading
import time
from threading import Event
from typing import Any, Dict, List, Optional

import numpy as np
from sora_sdk import (
    Sora,
    SoraAudioSink,
    SoraAudioSource,
    SoraConnection,
    SoraSignalingErrorCode,
    SoraTrackInterface,
    SoraVideoSink,
    SoraVideoSource,
)


class Sendrecv:
    def __init__(self, signaling_urls: List[str], channel_id: str):
        """
        Sendrecv クラスのコンストラクタ

        :param signaling_urls: シグナリングサーバーの URL リスト
        :param channel_id: 接続するチャンネルの ID
        """
        # シグナリング URL、ロール、チャネル ID を初期化
        self._signaling_urls: List[str] = signaling_urls
        self._channel_id: str = channel_id

        self._role: str = "sendrecv"

        self._connection_id: Optional[str] = None

        self._connected: Event = Event()
        self._closed: bool = False

        self._video_height: int = 480
        self._video_width: int = 640

        self._audio_sink: Optional[SoraAudioSink] = None
        self._video_sink: Optional[SoraVideoSink] = None

        # Sora インスタンスの生成
        self._sora: Sora = Sora()

        self._audio_source: SoraAudioSource = self._sora.create_audio_source(
            sample_rate=48000, channels=1
        )
        self._video_source: SoraVideoSource = self._sora.create_video_source()

        # Sora への接続設定
        self._connection: SoraConnection = self._sora.create_connection(
            signaling_urls=self._signaling_urls,
            role=self._role,
            channel_id=self._channel_id,
            # create_connection するタイミングで audio_source と video_source を指定する
            audio_source=self._audio_source,
            video_source=self._video_source,
        )

        # コールバックの登録
        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect
        self._connection.on_track = self._on_track

    def connect(self) -> "Sendrecv":
        """
        Sora へ接続する

        :return: 接続が成功した場合、自身のインスタンス
        :raises AssertionError: 接続に失敗した場合
        """
        # Sora へ接続
        self._connection.connect()

        self._audio_input_thread = threading.Thread(
            target=self._audio_input_loop, daemon=True
        )
        self._audio_input_thread.start()

        self._video_input_thread = threading.Thread(
            target=self._video_input_loop, daemon=True
        )
        self._video_input_thread.start()

        # 接続完了まで待機
        assert self._connected.wait(10), "接続に失敗しました"

        # 統計情報の定期取得用スレッド
        self._stats_thread = threading.Thread(target=self._stats_loop, daemon=True)

        return self

    def _audio_input_loop(self) -> None:
        """ダミー音声を生成するループ"""
        # パラメータ
        sample_rate = 16000  # サンプリングレート (Hz)
        freq = 440  # 周波数 (Hz)
        duration = 0.02  # 時間 (秒)
        amplitude = 0.25  # 振幅

        # 時間配列を生成
        t = np.arange(int(sample_rate * duration)) / sample_rate

        while not self._closed:
            # sin 波を生成
            sin_wave = np.sin(2 * np.pi * freq * t)

            # sin 波を 16 ビット整数に変換
            sin_wave_int16 = np.int16(sin_wave * 32767 * amplitude)

            # sin 波を送信
            self._audio_source.on_data(sin_wave_int16.reshape(-1, 1))

            # 次のサイクルのために時間を進める
            t += duration

    def _video_input_loop(self) -> None:
        """ダミー映像を生成するループ"""
        while not self._closed:
            time.sleep(1.0 / 30)
            self._video_source.on_captured(
                np.zeros((self._video_height, self._video_width, 3), dtype=np.uint8)
            )

    def _stats_loop(self) -> None:
        """統計情報を取得するループ"""
        while not self._closed:
            # 取得する統計情報は文字列
            raw_json = self._connection.get_stats()
            # 変換する
            # 統計情報を表示したい場合は stats を表示する
            _stats = json.loads(raw_json)
            # webrtc stats の仕様そのまま
            # https://www.w3.org/TR/webrtc-stats/
            # [{"type": "inbound-rtp", ...}, ...]
            time.sleep(60)

    def disconnect(self) -> None:
        """Sora から切断する"""
        # Sora から切断
        self._connection.disconnect()
        # スレッドの終了を待機
        self._audio_input_thread.join(10)
        self._video_input_thread.join(10)

    def _on_notify(self, raw_message: str) -> None:
        """
        シグナリング通知のコールバック

        :param raw_message: 受信した JSON 形式のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        # event_type が connection.created で、
        # connection_id が自分の connection_id と一致する場合、接続が成功
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message.get("connection_id") == self._connection_id
        ):
            print(f"Sora に接続しました: connection_id={self._connection_id}")
            # 接続が成功したら connected をセット
            self._connected.set()

    def _on_set_offer(self, raw_message: str) -> None:
        """
        シグナリング type: offer のコールバック

        :param raw_message: 受信した JSON 形式のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        # "type": "offer" に自分の connection_id が入ってくるので取得しておく
        if message["type"] == "offer":
            self._connection_id = message["connection_id"]

    def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
        """
        切断時のコールバック

        :param error_code: 切断時のエラーコード
        :param message: エラーメッセージ
        """
        print(f"Sora から切断されました: error_code={error_code}, message={message}")
        self._closed = True
        # 切断完了で connected をクリア
        self._connected.clear()

    def _on_track(self, track: SoraTrackInterface) -> None:
        """
        トラック受信時のコールバック

        :param track: 受信したトラック
        """
        if track.kind == "audio":
            self._audio_sink = SoraAudioSink(
                track=track, output_frequency=16000, output_channels=1
            )

        if track.kind == "video":
            self._video_sink = SoraVideoSink(track=track)

    def run(self) -> None:
        """接続を維持し、必要に応じて切断する"""
        try:
            # 接続を維持
            while not self._closed:
                pass
        except KeyboardInterrupt:
            # キーボード割り込みの場合
            pass
        finally:
            # 接続の切断
            if self._connection:
                self._connection.disconnect()


def main() -> None:
    """メイン関数: Sora への接続と切断を行う"""
    # 環境変数からシグナリング URL とチャネル ID を取得
    signaling_url = os.getenv("SORA_SIGNALING_URL")
    if not signaling_url:
        raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
    channel_id = os.getenv("SORA_CHANNEL_ID")
    if not channel_id:
        raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")

    # signaling_url はリストである必要があるため、リストに変換
    signaling_urls: List[str] = [signaling_url]

    sample: Sendrecv = Sendrecv(signaling_urls, channel_id)

    # Sora へ接続
    sample.connect()
    # 接続の維持する場合は sample.connect().run() を呼ぶ
    # sample.connect().run()

    time.sleep(3)

    sample.disconnect()


if __name__ == "__main__":
    main()

音声と映像を送信

import json
import os
import threading
import time
from threading import Event
from typing import Any, Dict, List, Optional

import numpy as np
from sora_sdk import (
    Sora,
    SoraAudioSource,
    SoraConnection,
    SoraSignalingErrorCode,
    SoraVideoSource,
)


class Sendonly:
    def __init__(self, signaling_urls: List[str], channel_id: str):
        """
        Sendonly クラスのコンストラクタ

        :param signaling_urls: シグナリングサーバーの URL リスト
        :param channel_id: 接続するチャンネルの ID
        """
        # シグナリング URL とチャネル ID を初期化
        self._signaling_urls: List[str] = signaling_urls
        self._channel_id: str = channel_id

        self._role: str = "sendonly"

        self._connection_id: Optional[str] = None

        self._connected: Event = Event()
        self._closed: bool = False

        self._video_height: int = 480
        self._video_width: int = 640

        self._sora: Sora = Sora()

        self._audio_source: SoraAudioSource = self._sora.create_audio_source(
            sample_rate=48000, channels=1
        )
        self._video_source: SoraVideoSource = self._sora.create_video_source()

        # Sora への接続設定
        self._connection: SoraConnection = self._sora.create_connection(
            signaling_urls=self._signaling_urls,
            role=self._role,
            channel_id=self._channel_id,
            audio=True,
            audio_source=self._audio_source,
            video=True,
            video_source=self._video_source,
        )

        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect

    def connect(self) -> "Sendonly":
        """
        Sora へ接続する

        :return: 接続が成功した場合、自身のインスタンス
        :raises AssertionError: 接続に失敗した場合
        """
        # Sora へ接続
        self._connection.connect()

        self._audio_input_thread = threading.Thread(
            target=self._audio_input_loop, daemon=True
        )
        self._audio_input_thread.start()

        self._video_input_thread = threading.Thread(
            target=self._video_input_loop, daemon=True
        )
        self._video_input_thread.start()

        # 接続が成功するまで待つ
        assert self._connected.wait(10), "接続に失敗しました"

        # 統計情報の定期取得用スレッド
        self._stats_thread = threading.Thread(target=self._stats_loop, daemon=True)

        return self

    def _audio_input_loop(self) -> None:
        """ダミー音声を生成するループ"""
        while not self._closed:
            time.sleep(0.02)
            self._audio_source.on_data(np.zeros((320, 1), dtype=np.int16))

    def _video_input_loop(self) -> None:
        """ダミー映像を生成するループ"""
        while not self._closed:
            time.sleep(1.0 / 30)
            self._video_source.on_captured(
                np.zeros((self._video_height, self._video_width, 3), dtype=np.uint8)
            )

    def _stats_loop(self) -> None:
        """統計情報を取得するループ"""
        while not self._closed:
            # 取得する統計情報は文字列
            raw_json = self._connection.get_stats()
            # 変換する
            # 統計情報を表示したい場合は stats を表示する
            _stats = json.loads(raw_json)
            # webrtc stats の仕様そのまま
            # https://www.w3.org/TR/webrtc-stats/
            # [{"type": "inbound-rtp", ...}, ...]
            time.sleep(60)

    def disconnect(self) -> None:
        """Sora から切断する"""
        self._connection.disconnect()
        self._audio_input_thread.join(10)
        self._video_input_thread.join(10)

    def _on_notify(self, raw_message: str) -> None:
        """
        シグナリング通知のコールバック

        :param raw_message: 受信した JSON 形式のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        # event_type が connection.created で、
        # connection_id が自分の connection_id と一致する場合、接続が成功
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message.get("connection_id") == self._connection_id
        ):
            print(f"Sora に接続しました: connection_id={self._connection_id}")
            # 接続が成功したら connected をセット
            self._connected.set()

    def _on_set_offer(self, raw_message: str) -> None:
        """
        シグナリング type: offer のコールバック

        :param raw_message: 受信した JSON 形式のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        # "type": "offer" に自分の connection_id が入ってくるので取得しておく
        if message["type"] == "offer":
            self._connection_id = message["connection_id"]

    def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
        """
        切断時のコールバック

        :param error_code: 切断時のエラーコード
        :param message: エラーメッセージ
        """
        print(f"Sora から切断されました: error_code={error_code}, message={message}")
        self._closed = True
        # 切断完了で connected をクリア
        self._connected.clear()

    def run(self) -> None:
        """接続を維持し、必要に応じて切断する"""
        try:
            # 接続を維持
            while not self._closed:
                pass
        except KeyboardInterrupt:
            # キーボード割り込みの場合
            pass
        finally:
            # 接続の切断
            if self._connection:
                self._connection.disconnect()


def main() -> None:
    """メイン関数: Sora への接続と切断を行う"""
    # 環境変数からシグナリング URL とチャネル ID を取得
    signaling_url = os.getenv("SORA_SIGNALING_URL")
    if not signaling_url:
        raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
    channel_id = os.getenv("SORA_CHANNEL_ID")
    if not channel_id:
        raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")

    # signaling_url はリストである必要があるので、リストに変換
    signaling_urls: List[str] = [signaling_url]

    sample: Sendonly = Sendonly(signaling_urls, channel_id)

    # Sora へ接続
    sample.connect()
    # 接続の維持する場合は sample.connect().run() を呼ぶ
    # sample.connect().run()

    time.sleep(3)

    sample.disconnect()


if __name__ == "__main__":
    main()

音声と映像を受信

import json
import os
import threading
import time
from threading import Event
from typing import Any, Dict, List, Optional

from sora_sdk import (
    Sora,
    SoraAudioSink,
    SoraConnection,
    SoraSignalingErrorCode,
    SoraTrackInterface,
    SoraVideoSink,
)


class Recvonly:
    def __init__(self, signaling_urls: List[str], channel_id: str):
        """
        Recvonly クラスのコンストラクタ

        :param signaling_urls: シグナリングサーバーの URL リスト
        :param channel_id: 接続するチャンネルの ID
        """
        # シグナリング URL とチャネル ID を初期化
        self._signaling_urls: List[str] = signaling_urls
        self._channel_id: str = channel_id

        self._role: str = "recvonly"

        self._connection_id: Optional[str] = None

        self._connected: Event = Event()
        self._closed: bool = False

        self._audio_sink: Optional[SoraAudioSink] = None
        self._video_sink: Optional[SoraVideoSink] = None

        self._sora: Sora = Sora()
        # Sora への接続設定
        self._connection: SoraConnection = self._sora.create_connection(
            signaling_urls=self._signaling_urls,
            role=self._role,
            channel_id=self._channel_id,
        )

        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect
        self._connection.on_track = self._on_track

    def connect(self) -> "Recvonly":
        """
        Sora へ接続する

        :return: 接続が成功した場合、自身のインスタンス
        :raises AssertionError: 接続に失敗した場合
        """
        # Sora へ接続
        self._connection.connect()

        # 接続が成功するまで待つ
        assert self._connected.wait(10), "接続に失敗しました"

        # 統計情報の定期取得用スレッド
        self._stats_thread = threading.Thread(target=self._stats_loop, daemon=True)

        return self

    def _stats_loop(self) -> None:
        """統計情報を取得するループ"""
        while not self._closed:
            # 取得する統計情報は文字列
            raw_json = self._connection.get_stats()
            # 変換する
            # 統計情報を表示したい場合は stats を表示する
            _stats = json.loads(raw_json)
            # webrtc stats の仕様そのまま
            # https://www.w3.org/TR/webrtc-stats/
            # [{"type": "inbound-rtp", ...}, ...]
            time.sleep(60)

    def disconnect(self) -> None:
        """
        Sora から切断する
        """
        self._connection.disconnect()

    def _on_notify(self, raw_message: str) -> None:
        """
        シグナリング通知のコールバック

        :param raw_message: 受信した JSON 形式のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        # event_type が connection.created で、
        # connection_id が自分の connection_id と一致する場合、接続が成功
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message.get("connection_id") == self._connection_id
        ):
            print(f"Sora に接続しました: connection_id={self._connection_id}")
            # 接続が成功したら connected をセット
            self._connected.set()

    def _on_set_offer(self, raw_message: str) -> None:
        """
        シグナリング type: offer のコールバック

        :param raw_message: 受信した JSON 形式のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        # "type": "offer" に自分の connection_id が入ってくるので取得しておく
        if message["type"] == "offer":
            self._connection_id = message["connection_id"]

    def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
        """
        切断時のコールバック

        :param error_code: 切断時のエラーコード
        :param message: エラーメッセージ
        """
        print(f"Sora から切断されました: error_code={error_code}, message={message}")
        self._closed = True
        # 切断完了で connected をクリア
        self._connected.clear()

    def _on_track(self, track: SoraTrackInterface) -> None:
        """
        トラック受信時のコールバック

        :param track: 受信したトラック
        """
        if track.kind == "audio":
            self._audio_sink = SoraAudioSink(
                track=track, output_frequency=16000, output_channels=1
            )

        if track.kind == "video":
            self._video_sink = SoraVideoSink(track=track)

    def run(self) -> None:
        """
        接続を維持し、必要に応じて切断する
        """
        try:
            # 接続を維持
            while not self._closed:
                pass
        except KeyboardInterrupt:
            # キーボード割り込みの場合
            pass
        finally:
            # 接続の切断
            if self._connection:
                self._connection.disconnect()


def main() -> None:
    """
    メイン関数: Sora への接続と切断を行う
    """
    # 環境変数からシグナリング URL とチャネル ID を取得
    signaling_url: Optional[str] = os.getenv("SORA_SIGNALING_URL")
    if not signaling_url:
        raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
    channel_id: Optional[str] = os.getenv("SORA_CHANNEL_ID")
    if not channel_id:
        raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")

    # signaling_url はリストである必要があるので、リストに変換
    signaling_urls: List[str] = [signaling_url]

    sample: Recvonly = Recvonly(signaling_urls, channel_id)

    # Sora へ接続
    sample.connect()
    # 接続の維持する場合は sample.connect().run() を呼ぶ
    # sample.connect().run()

    time.sleep(3)

    sample.disconnect()


if __name__ == "__main__":
    main()

受信した音声を VAD を利用して判定

import json
import os
import time
from threading import Event
from typing import Any, Dict, List, Optional

from sora_sdk import (
    Sora,
    SoraAudioFrame,
    SoraAudioStreamSink,
    SoraConnection,
    SoraMediaTrack,
    SoraSignalingErrorCode,
    SoraVAD,
)


class RecvonlyVAD:
    def __init__(self, signaling_urls: List[str], channel_id: str):
        """
        RecvonlyVAD クラスの初期化

        :param signaling_urls: Sora シグナリングサーバーの URL リスト
        :param channel_id: 接続するチャンネルの ID
        """
        self._vad: SoraVAD = SoraVAD()
        self._signaling_urls: List[str] = signaling_urls
        self._channel_id: str = channel_id

        self._connection_id: Optional[str] = None

        self._connected: Event = Event()
        self._closed: bool = False

        self._audio_stream_sink: SoraAudioStreamSink
        self._audio_output_frequency: int = 24000
        self._audio_output_channels: int = 1

        self._sora: Sora = Sora()

        # Sora への接続設定
        self._connection: SoraConnection = self._sora.create_connection(
            signaling_urls=self._signaling_urls,
            role="recvonly",
            channel_id=self._channel_id,
            audio=True,
            video=False,
        )

        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect
        self._connection.on_track = self._on_track

    def connect(self) -> "RecvonlyVAD":
        """
        Sora に接続する
        """
        self._connection.connect()

        # 接続が成功するまで待つ
        assert self._connected.wait(10), "接続に失敗しました"

        return self

    def disconnect(self) -> None:
        """
        Sora との接続を切断する
        """
        self._connection.disconnect()

    def _on_notify(self, raw_message: str) -> None:
        """
        シグナリング通知のコールバック

        :param raw_message: JSON 形式の生のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message["connection_id"] == self._connection_id
        ):
            print(f"Sora に接続しました: connection_id={self._connection_id}")
            self._connected.set()

    def _on_set_offer(self, raw_message: str) -> None:
        """
        シグナリング type: offer のコールバック

        :param raw_message: JSON 形式の生のメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        if message["type"] == "offer":
            self._connection_id = message["connection_id"]

    def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
        """
        切断時のコールバック

        :param error_code: 切断の理由を示すエラーコード
        :param message: エラーメッセージ
        """
        print(f"Sora から切断されました: error_code={error_code}, message={message}")
        self._closed = True
        self._connected.clear()

    def _on_frame(self, frame: SoraAudioFrame) -> None:
        """
        音声フレーム受信時のコールバック

        :param frame: 受信した音声フレーム
        """
        voice_probability: float = self._vad.analyze(frame)
        if voice_probability > 0.95:  # 0.95 は libwebrtc の判定値
            print(f"Voice! voice_probability={voice_probability}")

    def _on_track(self, track: SoraMediaTrack) -> None:
        """
        トラック受信時のコールバック

        :param track: 受信したメディアトラック
        """
        if track.kind == "audio":
            self._audio_stream_sink = SoraAudioStreamSink(
                track, self._audio_output_frequency, self._audio_output_channels
            )
            self._audio_stream_sink.on_frame = self._on_frame

    def run(self) -> None:
        """
        メインループ。接続を維持し、キーボード割り込みを処理する
        """
        try:
            while not self._closed:
                pass
        except KeyboardInterrupt:
            pass
        finally:
            if self._connection:
                self._connection.disconnect()


def main() -> None:
    """
    メイン関数。RecvonlyVAD インスタンスを作成し、Sora に接続する
    """
    signaling_url: str | None = os.getenv("SORA_SIGNALING_URL")
    if not signaling_url:
        raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
    channel_id: str | None = os.getenv("SORA_CHANNEL_ID")
    if not channel_id:
        raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")

    signaling_urls: List[str] = [signaling_url]

    sample: RecvonlyVAD = RecvonlyVAD(signaling_urls, channel_id)

    sample.connect()
    # 接続を維持する場合は sample.connect().run() を呼ぶ
    # sample.connect().run()

    time.sleep(3)

    sample.disconnect()


if __name__ == "__main__":
    main()
© Copyright 2024, Shiguredo Inc. Created using Sphinx 8.0.2