サンプル¶
- Python バージョン:
3.10 以降
音声と映像を送受信¶
import json
import os
import threading
import time
from threading import Event
from typing import Any, Dict, List, Optional
import numpy as np
from sora_sdk import (
Sora,
SoraAudioSink,
SoraAudioSource,
SoraConnection,
SoraSignalingErrorCode,
SoraTrackInterface,
SoraVideoSink,
SoraVideoSource,
)
class Sendrecv:
def __init__(self, signaling_urls: List[str], channel_id: str):
"""
Sendrecv クラスのコンストラクタ
:param signaling_urls: シグナリングサーバーの URL リスト
:param channel_id: 接続するチャンネルの ID
"""
# シグナリング URL、ロール、チャネル ID を初期化
self._signaling_urls: List[str] = signaling_urls
self._channel_id: str = channel_id
self._role: str = "sendrecv"
self._connection_id: Optional[str] = None
self._connected: Event = Event()
self._closed: bool = False
self._video_height: int = 480
self._video_width: int = 640
self._audio_sink: Optional[SoraAudioSink] = None
self._video_sink: Optional[SoraVideoSink] = None
# Sora インスタンスの生成
self._sora: Sora = Sora()
self._audio_source: SoraAudioSource = self._sora.create_audio_source(
sample_rate=48000, channels=1
)
self._video_source: SoraVideoSource = self._sora.create_video_source()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role=self._role,
channel_id=self._channel_id,
# create_connection するタイミングで audio_source と video_source を指定する
audio_source=self._audio_source,
video_source=self._video_source,
)
# コールバックの登録
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
self._connection.on_track = self._on_track
def connect(self) -> "Sendrecv":
"""
Sora へ接続する
:return: 接続が成功した場合、自身のインスタンス
:raises AssertionError: 接続に失敗した場合
"""
# Sora へ接続
self._connection.connect()
self._audio_input_thread = threading.Thread(
target=self._audio_input_loop, daemon=True
)
self._audio_input_thread.start()
self._video_input_thread = threading.Thread(
target=self._video_input_loop, daemon=True
)
self._video_input_thread.start()
# 接続完了まで待機
assert self._connected.wait(10), "接続に失敗しました"
# 統計情報の定期取得用スレッド
self._stats_thread = threading.Thread(target=self._stats_loop, daemon=True)
return self
def _audio_input_loop(self) -> None:
"""ダミー音声を生成するループ"""
# パラメータ
sample_rate = 16000 # サンプリングレート (Hz)
freq = 440 # 周波数 (Hz)
duration = 0.02 # 時間 (秒)
amplitude = 0.25 # 振幅
# 時間配列を生成
t = np.arange(int(sample_rate * duration)) / sample_rate
while not self._closed:
# sin 波を生成
sin_wave = np.sin(2 * np.pi * freq * t)
# sin 波を 16 ビット整数に変換
sin_wave_int16 = np.int16(sin_wave * 32767 * amplitude)
# sin 波を送信
self._audio_source.on_data(sin_wave_int16.reshape(-1, 1))
# 次のサイクルのために時間を進める
t += duration
def _video_input_loop(self) -> None:
"""ダミー映像を生成するループ"""
while not self._closed:
time.sleep(1.0 / 30)
self._video_source.on_captured(
np.zeros((self._video_height, self._video_width, 3), dtype=np.uint8)
)
def _stats_loop(self) -> None:
"""統計情報を取得するループ"""
while not self._closed:
# 取得する統計情報は文字列
raw_json = self._connection.get_stats()
# 変換する
# 統計情報を表示したい場合は stats を表示する
_stats = json.loads(raw_json)
# webrtc stats の仕様そのまま
# https://www.w3.org/TR/webrtc-stats/
# [{"type": "inbound-rtp", ...}, ...]
time.sleep(60)
def disconnect(self) -> None:
"""Sora から切断する"""
# Sora から切断
self._connection.disconnect()
# スレッドの終了を待機
self._audio_input_thread.join(10)
self._video_input_thread.join(10)
def _on_notify(self, raw_message: str) -> None:
"""
シグナリング通知のコールバック
:param raw_message: 受信した JSON 形式のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
# event_type が connection.created で、
# connection_id が自分の connection_id と一致する場合、接続が成功
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message.get("connection_id") == self._connection_id
):
print(f"Sora に接続しました: connection_id={self._connection_id}")
# 接続が成功したら connected をセット
self._connected.set()
def _on_set_offer(self, raw_message: str) -> None:
"""
シグナリング type: offer のコールバック
:param raw_message: 受信した JSON 形式のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
# "type": "offer" に自分の connection_id が入ってくるので取得しておく
if message["type"] == "offer":
self._connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
"""
切断時のコールバック
:param error_code: 切断時のエラーコード
:param message: エラーメッセージ
"""
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
# 切断完了で connected をクリア
self._connected.clear()
def _on_track(self, track: SoraTrackInterface) -> None:
"""
トラック受信時のコールバック
:param track: 受信したトラック
"""
if track.kind == "audio":
self._audio_sink = SoraAudioSink(
track=track, output_frequency=16000, output_channels=1
)
if track.kind == "video":
self._video_sink = SoraVideoSink(track=track)
def run(self) -> None:
"""接続を維持し、必要に応じて切断する"""
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main() -> None:
"""メイン関数: Sora への接続と切断を行う"""
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url = os.getenv("SORA_SIGNALING_URL")
if not signaling_url:
raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
channel_id = os.getenv("SORA_CHANNEL_ID")
if not channel_id:
raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")
# signaling_url はリストである必要があるため、リストに変換
signaling_urls: List[str] = [signaling_url]
sample: Sendrecv = Sendrecv(signaling_urls, channel_id)
# Sora へ接続
sample.connect()
# 接続の維持する場合は sample.connect().run() を呼ぶ
# sample.connect().run()
time.sleep(3)
sample.disconnect()
if __name__ == "__main__":
main()
音声と映像を送信¶
import json
import os
import threading
import time
from threading import Event
from typing import Any, Dict, List, Optional
import numpy as np
from sora_sdk import (
Sora,
SoraAudioSource,
SoraConnection,
SoraSignalingErrorCode,
SoraVideoSource,
)
class Sendonly:
def __init__(self, signaling_urls: List[str], channel_id: str):
"""
Sendonly クラスのコンストラクタ
:param signaling_urls: シグナリングサーバーの URL リスト
:param channel_id: 接続するチャンネルの ID
"""
# シグナリング URL とチャネル ID を初期化
self._signaling_urls: List[str] = signaling_urls
self._channel_id: str = channel_id
self._role: str = "sendonly"
self._connection_id: Optional[str] = None
self._connected: Event = Event()
self._closed: bool = False
self._video_height: int = 480
self._video_width: int = 640
self._sora: Sora = Sora()
self._audio_source: SoraAudioSource = self._sora.create_audio_source(
sample_rate=48000, channels=1
)
self._video_source: SoraVideoSource = self._sora.create_video_source()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role=self._role,
channel_id=self._channel_id,
audio=True,
audio_source=self._audio_source,
video=True,
video_source=self._video_source,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
def connect(self) -> "Sendonly":
"""
Sora へ接続する
:return: 接続が成功した場合、自身のインスタンス
:raises AssertionError: 接続に失敗した場合
"""
# Sora へ接続
self._connection.connect()
self._audio_input_thread = threading.Thread(
target=self._audio_input_loop, daemon=True
)
self._audio_input_thread.start()
self._video_input_thread = threading.Thread(
target=self._video_input_loop, daemon=True
)
self._video_input_thread.start()
# 接続が成功するまで待つ
assert self._connected.wait(10), "接続に失敗しました"
# 統計情報の定期取得用スレッド
self._stats_thread = threading.Thread(target=self._stats_loop, daemon=True)
return self
def _audio_input_loop(self) -> None:
"""ダミー音声を生成するループ"""
while not self._closed:
time.sleep(0.02)
self._audio_source.on_data(np.zeros((320, 1), dtype=np.int16))
def _video_input_loop(self) -> None:
"""ダミー映像を生成するループ"""
while not self._closed:
time.sleep(1.0 / 30)
self._video_source.on_captured(
np.zeros((self._video_height, self._video_width, 3), dtype=np.uint8)
)
def _stats_loop(self) -> None:
"""統計情報を取得するループ"""
while not self._closed:
# 取得する統計情報は文字列
raw_json = self._connection.get_stats()
# 変換する
# 統計情報を表示したい場合は stats を表示する
_stats = json.loads(raw_json)
# webrtc stats の仕様そのまま
# https://www.w3.org/TR/webrtc-stats/
# [{"type": "inbound-rtp", ...}, ...]
time.sleep(60)
def disconnect(self) -> None:
"""Sora から切断する"""
self._connection.disconnect()
self._audio_input_thread.join(10)
self._video_input_thread.join(10)
def _on_notify(self, raw_message: str) -> None:
"""
シグナリング通知のコールバック
:param raw_message: 受信した JSON 形式のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
# event_type が connection.created で、
# connection_id が自分の connection_id と一致する場合、接続が成功
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message.get("connection_id") == self._connection_id
):
print(f"Sora に接続しました: connection_id={self._connection_id}")
# 接続が成功したら connected をセット
self._connected.set()
def _on_set_offer(self, raw_message: str) -> None:
"""
シグナリング type: offer のコールバック
:param raw_message: 受信した JSON 形式のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
# "type": "offer" に自分の connection_id が入ってくるので取得しておく
if message["type"] == "offer":
self._connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
"""
切断時のコールバック
:param error_code: 切断時のエラーコード
:param message: エラーメッセージ
"""
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
# 切断完了で connected をクリア
self._connected.clear()
def run(self) -> None:
"""接続を維持し、必要に応じて切断する"""
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main() -> None:
"""メイン関数: Sora への接続と切断を行う"""
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url = os.getenv("SORA_SIGNALING_URL")
if not signaling_url:
raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
channel_id = os.getenv("SORA_CHANNEL_ID")
if not channel_id:
raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")
# signaling_url はリストである必要があるので、リストに変換
signaling_urls: List[str] = [signaling_url]
sample: Sendonly = Sendonly(signaling_urls, channel_id)
# Sora へ接続
sample.connect()
# 接続の維持する場合は sample.connect().run() を呼ぶ
# sample.connect().run()
time.sleep(3)
sample.disconnect()
if __name__ == "__main__":
main()
音声と映像を受信¶
import json
import os
import threading
import time
from threading import Event
from typing import Any, Dict, List, Optional
from sora_sdk import (
Sora,
SoraAudioSink,
SoraConnection,
SoraSignalingErrorCode,
SoraTrackInterface,
SoraVideoSink,
)
class Recvonly:
def __init__(self, signaling_urls: List[str], channel_id: str):
"""
Recvonly クラスのコンストラクタ
:param signaling_urls: シグナリングサーバーの URL リスト
:param channel_id: 接続するチャンネルの ID
"""
# シグナリング URL とチャネル ID を初期化
self._signaling_urls: List[str] = signaling_urls
self._channel_id: str = channel_id
self._role: str = "recvonly"
self._connection_id: Optional[str] = None
self._connected: Event = Event()
self._closed: bool = False
self._audio_sink: Optional[SoraAudioSink] = None
self._video_sink: Optional[SoraVideoSink] = None
self._sora: Sora = Sora()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role=self._role,
channel_id=self._channel_id,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
self._connection.on_track = self._on_track
def connect(self) -> "Recvonly":
"""
Sora へ接続する
:return: 接続が成功した場合、自身のインスタンス
:raises AssertionError: 接続に失敗した場合
"""
# Sora へ接続
self._connection.connect()
# 接続が成功するまで待つ
assert self._connected.wait(10), "接続に失敗しました"
# 統計情報の定期取得用スレッド
self._stats_thread = threading.Thread(target=self._stats_loop, daemon=True)
return self
def _stats_loop(self) -> None:
"""統計情報を取得するループ"""
while not self._closed:
# 取得する統計情報は文字列
raw_json = self._connection.get_stats()
# 変換する
# 統計情報を表示したい場合は stats を表示する
_stats = json.loads(raw_json)
# webrtc stats の仕様そのまま
# https://www.w3.org/TR/webrtc-stats/
# [{"type": "inbound-rtp", ...}, ...]
time.sleep(60)
def disconnect(self) -> None:
"""
Sora から切断する
"""
self._connection.disconnect()
def _on_notify(self, raw_message: str) -> None:
"""
シグナリング通知のコールバック
:param raw_message: 受信した JSON 形式のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
# event_type が connection.created で、
# connection_id が自分の connection_id と一致する場合、接続が成功
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message.get("connection_id") == self._connection_id
):
print(f"Sora に接続しました: connection_id={self._connection_id}")
# 接続が成功したら connected をセット
self._connected.set()
def _on_set_offer(self, raw_message: str) -> None:
"""
シグナリング type: offer のコールバック
:param raw_message: 受信した JSON 形式のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
# "type": "offer" に自分の connection_id が入ってくるので取得しておく
if message["type"] == "offer":
self._connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
"""
切断時のコールバック
:param error_code: 切断時のエラーコード
:param message: エラーメッセージ
"""
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
# 切断完了で connected をクリア
self._connected.clear()
def _on_track(self, track: SoraTrackInterface) -> None:
"""
トラック受信時のコールバック
:param track: 受信したトラック
"""
if track.kind == "audio":
self._audio_sink = SoraAudioSink(
track=track, output_frequency=16000, output_channels=1
)
if track.kind == "video":
self._video_sink = SoraVideoSink(track=track)
def run(self) -> None:
"""
接続を維持し、必要に応じて切断する
"""
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main() -> None:
"""
メイン関数: Sora への接続と切断を行う
"""
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url: Optional[str] = os.getenv("SORA_SIGNALING_URL")
if not signaling_url:
raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
channel_id: Optional[str] = os.getenv("SORA_CHANNEL_ID")
if not channel_id:
raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")
# signaling_url はリストである必要があるので、リストに変換
signaling_urls: List[str] = [signaling_url]
sample: Recvonly = Recvonly(signaling_urls, channel_id)
# Sora へ接続
sample.connect()
# 接続の維持する場合は sample.connect().run() を呼ぶ
# sample.connect().run()
time.sleep(3)
sample.disconnect()
if __name__ == "__main__":
main()
受信した音声を VAD を利用して判定¶
import json
import os
import time
from threading import Event
from typing import Any, Dict, List, Optional
from sora_sdk import (
Sora,
SoraAudioFrame,
SoraAudioStreamSink,
SoraConnection,
SoraMediaTrack,
SoraSignalingErrorCode,
SoraVAD,
)
class RecvonlyVAD:
def __init__(self, signaling_urls: List[str], channel_id: str):
"""
RecvonlyVAD クラスの初期化
:param signaling_urls: Sora シグナリングサーバーの URL リスト
:param channel_id: 接続するチャンネルの ID
"""
self._vad: SoraVAD = SoraVAD()
self._signaling_urls: List[str] = signaling_urls
self._channel_id: str = channel_id
self._connection_id: Optional[str] = None
self._connected: Event = Event()
self._closed: bool = False
self._audio_stream_sink: SoraAudioStreamSink
self._audio_output_frequency: int = 24000
self._audio_output_channels: int = 1
self._sora: Sora = Sora()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role="recvonly",
channel_id=self._channel_id,
audio=True,
video=False,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
self._connection.on_track = self._on_track
def connect(self) -> "RecvonlyVAD":
"""
Sora に接続する
"""
self._connection.connect()
# 接続が成功するまで待つ
assert self._connected.wait(10), "接続に失敗しました"
return self
def disconnect(self) -> None:
"""
Sora との接続を切断する
"""
self._connection.disconnect()
def _on_notify(self, raw_message: str) -> None:
"""
シグナリング通知のコールバック
:param raw_message: JSON 形式の生のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message["connection_id"] == self._connection_id
):
print(f"Sora に接続しました: connection_id={self._connection_id}")
self._connected.set()
def _on_set_offer(self, raw_message: str) -> None:
"""
シグナリング type: offer のコールバック
:param raw_message: JSON 形式の生のメッセージ
"""
message: Dict[str, Any] = json.loads(raw_message)
if message["type"] == "offer":
self._connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
"""
切断時のコールバック
:param error_code: 切断の理由を示すエラーコード
:param message: エラーメッセージ
"""
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
self._connected.clear()
def _on_frame(self, frame: SoraAudioFrame) -> None:
"""
音声フレーム受信時のコールバック
:param frame: 受信した音声フレーム
"""
voice_probability: float = self._vad.analyze(frame)
if voice_probability > 0.95: # 0.95 は libwebrtc の判定値
print(f"Voice! voice_probability={voice_probability}")
def _on_track(self, track: SoraMediaTrack) -> None:
"""
トラック受信時のコールバック
:param track: 受信したメディアトラック
"""
if track.kind == "audio":
self._audio_stream_sink = SoraAudioStreamSink(
track, self._audio_output_frequency, self._audio_output_channels
)
self._audio_stream_sink.on_frame = self._on_frame
def run(self) -> None:
"""
メインループ。接続を維持し、キーボード割り込みを処理する
"""
try:
while not self._closed:
pass
except KeyboardInterrupt:
pass
finally:
if self._connection:
self._connection.disconnect()
def main() -> None:
"""
メイン関数。RecvonlyVAD インスタンスを作成し、Sora に接続する
"""
signaling_url: str | None = os.getenv("SORA_SIGNALING_URL")
if not signaling_url:
raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
channel_id: str | None = os.getenv("SORA_CHANNEL_ID")
if not channel_id:
raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")
signaling_urls: List[str] = [signaling_url]
sample: RecvonlyVAD = RecvonlyVAD(signaling_urls, channel_id)
sample.connect()
# 接続を維持する場合は sample.connect().run() を呼ぶ
# sample.connect().run()
time.sleep(3)
sample.disconnect()
if __name__ == "__main__":
main()