サンプル¶
- Python バージョン:
3.11 以降
音声と映像を送受信¶
import json
import os
import threading
import time
from threading import Event
from typing import Any, Dict, List
import numpy as np
from sora_sdk import (
Sora,
SoraAudioSink,
SoraAudioSource,
SoraConnection,
SoraSignalingErrorCode,
SoraTrackInterface,
SoraVideoSink,
SoraVideoSource,
)
class Sendrecv:
def __init__(self, signaling_urls: list[str], channel_id: str):
# シグナリング URL、ロール、チャネル ID を初期化
self._signaling_urls: List[str] = signaling_urls
self._channel_id: str = channel_id
self._role: str = "sendrecv"
self.connection_id: str
self._connected: Event = Event()
self._closed: bool = False
self._video_height: int = 480
self._video_width: int = 640
# Sora インスタンスの生成
self._sora: Sora = Sora()
self._audio_source: SoraAudioSource = self._sora.create_audio_source(
sample_rate=48000, channels=1
)
self._video_source: SoraVideoSource = self._sora.create_video_source()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role=self._role,
channel_id=self._channel_id,
# create_connection するタイミングで audio_source と video_source を指定する
audio_source=self._audio_source,
video_source=self._video_source,
)
# コールバックの登録
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
def connect(self):
# Sora へ接続
self._connection.connect()
self._audio_input_thread = threading.Thread(
target=self._audio_input_loop, daemon=True
)
self._audio_input_thread.start()
self._video_input_thread = threading.Thread(
target=self._video_input_loop, daemon=True
)
self._video_input_thread.start()
# 接続完了まで待機
assert self._connected.wait(10), "接続に失敗しました"
return self
# ダミー音声
def _audio_input_loop(self):
# パラメータ
sample_rate = 16000 # サンプリングレート (Hz)
freq = 440 # 周波数 (Hz)
duration = 0.02 # 時間 (秒)
amplitude = 0.25 # 振幅
# 時間配列を生成
t = np.arange(int(sample_rate * duration)) / sample_rate
while not self._closed:
# sin 波を生成
sin_wave = np.sin(2 * np.pi * freq * t)
# sin 波を 16 ビット整数に変換
sin_wave_int16 = np.int16(sin_wave * 32767 * amplitude)
# sin 波を送信
self._audio_source.on_data(sin_wave_int16.reshape(-1, 1))
# 次のサイクルのために時間を進める
t += duration
# ダミー映像
def _video_input_loop(self):
while not self._closed:
time.sleep(1.0 / 30)
self._video_source.on_captured(
np.zeros((self._video_height, self._video_width, 3), dtype=np.uint8)
)
def disconnect(self):
# Sora から切断
self._connection.disconnect()
# スレッドの終了を待機
self._audio_input_thread.join(10)
self._video_input_thread.join(10)
def _on_notify(self, raw_message: str):
# シグナリング通知のコールバック
message = json.loads(raw_message)
# event_type が connection.created で、
# connection_id が自分の connection_id と一致する場合、接続が成功
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message["connection_id"] == self.connection_id
):
print(f"Sora に接続しました: connection_id={self.connection_id}")
# 接続が成功したら connected をセット
self._connected.set()
def _on_set_offer(self, raw_message: str):
# シグナリング type: offer のコールバック
message = json.loads(raw_message)
# "type": "offer" に自分の connection_id が入ってくるので取得しておく
if message["type"] == "offer":
self.connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str):
# 切断時のコールバック
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
# 切断完了で connected をクリア
self._connected.clear()
def _on_track(self, track: SoraTrackInterface):
# トラック受信時のコールバック
if track.kind == "audio":
self._audio_sink = SoraAudioSink(
track=track, output_frequency=16000, output_channels=1
)
if track.kind == "video":
self._video_sink = SoraVideoSink(track=track)
def run(self):
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main():
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url = os.getenv("SORA_SIGNALING_URL")
channel_id = os.getenv("SORA_CHANNEL_ID")
# signaling_url はリストである必要があるため、リストに変換
signaling_urls = [signaling_url]
# Sendrecv インスタンスの生成
sample = Sendrecv(signaling_urls, channel_id)
# Sora へ接続
sample.connect()
time.sleep(3)
sample.disconnect()
# 接続の維持する場合は sample.disconnect() の代わりに sample.run() を呼ぶ
# sample.run()
if __name__ == "__main__":
main()
音声と映像を送信¶
import json
import os
import threading
import time
from threading import Event
import numpy as np
from sora_sdk import (
Sora,
SoraAudioSource,
SoraConnection,
SoraSignalingErrorCode,
SoraVideoSource,
)
class Sendonly:
def __init__(self, signaling_urls: list[str], channel_id: str):
# シグナリング URL とチャネル ID を初期化
self._signaling_urls = signaling_urls
self._channel_id = channel_id
self._connection_id: str
self._connected: Event = Event()
self._closed: bool = False
self._video_height: int = 480
self._video_width: int = 640
self._sora: Sora = Sora()
self._audio_source: SoraAudioSource = self._sora.create_audio_source(
sample_rate=48000, channels=1
)
self._video_source: SoraVideoSource = self._sora.create_video_source()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role="sendonly",
channel_id=self._channel_id,
audio=True,
audio_source=self._audio_source,
video=True,
video_source=self._video_source,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
def connect(self):
# Sora へ接続
self._connection.connect()
self._audio_input_thread: threading.Thread = threading.Thread(
target=self._audio_input_loop, daemon=True
)
self._audio_input_thread.start()
self._video_input_thread: threading.Thread = threading.Thread(
target=self._video_input_loop, daemon=True
)
self._video_input_thread.start()
# 接続が成功するまで待つ
assert self._connected.wait(10), "接続に失敗しました"
return self
# ダミー音声
def _audio_input_loop(self):
while not self._closed:
time.sleep(0.02)
self._audio_source.on_data(np.zeros((320, 1), dtype=np.int16))
# ダミー映像
def _video_input_loop(self):
while not self._closed:
time.sleep(1.0 / 30)
self._video_source.on_captured(
np.zeros((self._video_height, self._video_width, 3), dtype=np.uint8)
)
def disconnect(self):
self._connection.disconnect()
self._audio_input_thread.join(10)
self._video_input_thread.join(10)
def _on_notify(self, raw_message: str):
# シグナリング通知のコールバック
message = json.loads(raw_message)
# event_type が connection.created で、
# connection_id が自分の connection_id と一致する場合、接続が成功
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message["connection_id"] == self._connection_id
):
print(f"Sora に接続しました: connection_id={self._connection_id}")
# 接続が成功したら connected をセット
self._connected.set()
def _on_set_offer(self, raw_message: str):
# シグナリング type: offer のコールバック
message = json.loads(raw_message)
# "type": "offer" に自分の connection_id が入ってくるので取得しておく
if message["type"] == "offer":
self._connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str):
# 切断時のコールバック
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
# 切断完了で connected をクリア
self._connected.clear()
def run(self):
try:
# 接続を維持
while not self.shutdown:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self.connection:
self.connection.disconnect()
def main():
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url = os.getenv("SORA_SIGNALING_URL")
channel_id = os.getenv("SORA_CHANNEL_ID")
# signaling_url はリストである必要があるので、リストに変換
signaling_urls = [signaling_url]
sample = Sendonly(signaling_urls, channel_id)
# Sora へ接続
sample.connect()
time.sleep(3)
sample.disconnect()
# 接続の維持する場合は sample.disconnect() の代わりに sample.run() を呼ぶ
# sample.run()
if __name__ == "__main__":
main()
音声と映像を受信¶
import json
import os
import time
from threading import Event
from typing import Any, Dict, List
from sora_sdk import (
Sora,
SoraAudioSink,
SoraConnection,
SoraSignalingErrorCode,
SoraTrackInterface,
SoraVideoSink,
)
class Recvonly:
def __init__(self, signaling_urls: list[str], channel_id: str):
# シグナリング URL とチャネル ID を初期化
self._signaling_urls: List[str] = signaling_urls
self._channel_id: str = channel_id
self._connected: Event = Event()
self._closed: bool = False
self._audio_sink: SoraAudioSink
self._video_sink: SoraVideoSink
self._sora = Sora()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role="recvonly",
channel_id=self._channel_id,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
self._connection.on_track = self._on_track
def connect(self):
# Sora へ接続
self._connection.connect()
# 接続が成功するまで待つ
assert self._connected.wait(10), "接続に失敗しました"
return self
def disconnect(self):
self._connection.disconnect()
def _on_notify(self, raw_message: str):
# シグナリング通知のコールバック
message = json.loads(raw_message)
# event_type が connection.created で、
# connection_id が自分の connection_id と一致する場合、接続が成功
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message["connection_id"] == self._connection_id
):
print(f"Sora に接続しました: connection_id={self._connection_id}")
# 接続が成功したら connected をセット
self._connected.set()
def _on_set_offer(self, raw_message: str):
# シグナリング type: offer のコールバック
message: Dict[str, Any] = json.loads(raw_message)
# type: offer に自分の connection_id が入ってくるので取得しておく
if message["type"] == "offer":
self._connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str):
# 切断時のコールバック
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
# 切断完了で connected をクリア
self._connected.clear()
def _on_track(self, track: SoraTrackInterface):
# トラック受信時のコールバック
if track.kind == "audio":
self._audio_sink = SoraAudioSink(
track=track, output_frequency=16000, output_channels=1
)
if track.kind == "video":
self._video_sink = SoraVideoSink(track=track)
def run(self):
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main():
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url = os.getenv("SORA_SIGNALING_URL")
channel_id = os.getenv("SORA_CHANNEL_ID")
# signaling_url はリストである必要があるので、リストに変換
signaling_urls = [signaling_url]
# RecvonlySample インスタンスの生成
sample = Recvonly(signaling_urls, channel_id)
# Sora へ接続
sample.connect()
time.sleep(3)
sample.disconnect()
# 接続の維持する場合は sample.disconnect() の代わりに sample.run() を呼ぶ
# sample.run()
if __name__ == "__main__":
main()
受信した音声を VAD を利用して判定¶
import json
import os
import time
from threading import Event
from typing import Any, Dict, List
from sora_sdk import (
Sora,
SoraAudioFrame,
SoraAudioStreamSink,
SoraConnection,
SoraMediaTrack,
SoraSignalingErrorCode,
SoraVAD,
)
class RecvonlyVAD:
def __init__(self, signaling_urls: list[str], channel_id: str):
# VAD を初期化
self._vad: SoraVAD = SoraVAD()
# シグナリング URL とチャネル ID を初期化
self._signaling_urls: List[str] = signaling_urls
self._channel_id: str = channel_id
self._connection_id: str
self._connected: Event = Event()
self._closed: bool = False
self._audio_stream_sink: SoraAudioStreamSink
self._audio_output_frequency: int = 24000
self._audio_output_channels: int = 1
self._sora = Sora()
# Sora への接続設定
self._connection: SoraConnection = self._sora.create_connection(
signaling_urls=self._signaling_urls,
role="recvonly",
channel_id=self._channel_id,
audio=True,
video=False,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
self._connection.on_track = self._on_track
def connect(self):
# Sora へ接続
self._connection.connect()
# 接続が成功するまで待つ
assert self._connected.wait(10), "接続に失敗しました"
return self
def disconnect(self):
self._connection.disconnect()
def _on_notify(self, raw_message: str):
# シグナリング通知のコールバック
message: Dict[str, Any] = json.loads(raw_message)
# event_type が connection.created で、
# connection_id が自分の connection_id と一致する場合、接続が成功
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message["connection_id"] == self._connection_id
):
print(f"Sora に接続しました: connection_id={self._connection_id}")
# 接続が成功したら connected をセット
self._connected.set()
def _on_set_offer(self, raw_message: str):
# シグナリング type: offer のコールバック
message: Dict[str, Any] = json.loads(raw_message)
# type: offer に自分の connection_id が入ってくるので取得しておく
if message["type"] == "offer":
self._connection_id = message["connection_id"]
def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str):
# 切断時のコールバック
print(f"Sora から切断されました: error_code={error_code}, message={message}")
self._closed = True
# 切断完了で connected をクリア
self._connected.clear()
def _on_frame(self, frame: SoraAudioFrame):
# frame が音声である確率を求める
# vad を利用して解析
voice_probability = self._vad.analyze(frame)
if voice_probability > 0.95: # 0.95 は libwebrtc の判定値
print(f"Voice! voice_probability={voice_probability}")
else:
pass
def _on_track(self, track: SoraMediaTrack):
# トラック受信時のコールバック
if track.kind == "audio":
# 音声ストリームの受信
self._audio_stream_sink = SoraAudioStreamSink(
track, self._audio_output_frequency, self._audio_output_channels
)
self._audio_stream_sink.on_frame = self._on_frame
def run(self):
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main():
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url = os.getenv("SORA_SIGNALING_URL")
channel_id = os.getenv("SORA_CHANNEL_ID")
# signaling_url はリストである必要があるので、リストに変換
signaling_urls = [signaling_url]
# RecvonlySample インスタンスの生成
sample = RecvonlyVAD(signaling_urls, channel_id)
# Sora へ接続
sample.connect()
time.sleep(3)
sample.disconnect()
# 接続の維持する場合は sample.disconnect() の代わりに sample.run() を呼ぶ
# sample.run()
if __name__ == "__main__":
main()