WebRTC Encoded Transform¶
警告
この機能は実験的機能のため、正式版では仕様が変更される可能性があります
概要¶
WebRTC Encoded Transform とは WebRTC の送信前や受信後のタイミングで、エンコードされた音声や映像フレームを直接書き換える、ブラウザ WebRTC API の拡張的な機能として提供されている仕組みです。
Sora Python SDK では WebRTC Encoded Transform を SDK の一部として提供しています。
音声や映像、データを送るだけの通常の利用では必要ありません。主に音声や映像と 同時に 何かしらのデータを送りたい場合などに利用することを目的としています。
WebRTC Encoded Transform に付いては W3C の仕様よりも MDN の記事 Using WebRTC Encoded Transforms がわかりやすいです。
取得できるフレーム¶
そもそも WebRTC では音声や映像は RTP というプロトコルを利用し、それを暗号化された SRTP というプロトコルを利用します。 さらに RTP は 1200 バイト程度までしか 1 つのパケットで送れないため、より大きなサイズの場合は分割して送信します。
WebRTC Encoded Transform ではこの送信前の 分割した RTP にしていない状態 と、受信後の RTP を結合した状態 のエンコード済みの音声や映像のフレームを取得することができます。
利用方法¶
送信時¶
送信する音声や映像のフレームを変換するには SoraAudioFrameTransformer
または SoraVideoFrameTransformer
を利用します。
import json
import os
import threading
import time
from threading import Event
from typing import Optional
import numpy
from sora_sdk import (
Sora,
SoraAudioFrameTransformer,
SoraAudioSource,
SoraTransformableAudioFrame,
SoraTransformableVideoFrame,
SoraVideoFrameTransformer,
SoraVideoSource,
)
class SendonlyEncodedTransform:
def __init__(
self,
signaling_urls: list[str],
channel_id: str,
):
self._signaling_urls: list[str] = signaling_urls
self._channel_id: str = channel_id
self._connection_id: str
# 接続した
self._connected: Event = Event()
# 終了
self._closed = Event()
self._audio_channels: int = 1
self._audio_sample_rate: int = 16000
self._video_width: int = 960
self._video_height: int = 540
self._sora = Sora()
self._fake_audio_thread: Optional[threading.Thread] = None
self._fake_video_thread: Optional[threading.Thread] = None
self._audio_source: Optional[SoraAudioSource] = None
self._audio_source = self._sora.create_audio_source(
self._audio_channels, self._audio_sample_rate
)
self._video_source: Optional[SoraVideoSource] = None
self._video_source = self._sora.create_video_source()
# Audio 向けの Encoded Transformer
self._audio_transformer = SoraAudioFrameTransformer()
# Audio のエンコードフレームを受け取るコールバック関数を on_transform に設定
self._audio_transformer.on_transform = self._on_audio_transform
# Video 向けの Encoded Transformer
self._video_transformer = SoraVideoFrameTransformer()
# Video のエンコードフレームを受け取るコールバック関数を on_transform に設定
self._video_transformer.on_transform = self._on_video_transform
self._connection = self._sora.create_connection(
signaling_urls=signaling_urls,
role="sendonly",
channel_id=channel_id,
audio=True,
video=True,
audio_source=self._audio_source,
video_source=self._video_source,
audio_frame_transformer=self._audio_transformer,
video_frame_transformer=self._video_transformer,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
def __enter__(self) -> "SendonlyEncodedTransform":
return self.connect()
def __exit__(self, exc_type, exc_value, traceback) -> None:
self.disconnect()
def connect(self):
self._fake_audio_thread = threading.Thread(
target=self._fake_audio_loop, daemon=True
)
self._fake_audio_thread.start()
self._fake_video_thread = threading.Thread(
target=self._fake_video_loop, daemon=True
)
self._fake_video_thread.start()
try:
self._connection.connect()
# _connected が set されるまで 30 秒待つ
assert self._connected.wait(30)
except Exception as e:
# connect を呼び出したら、例外があったとしても必ず disconnect を呼び出す
self._connection.disconnect()
raise e
return self
def disconnect(self):
self._connection.disconnect()
def _fake_audio_loop(self):
while not self._closed.is_set():
time.sleep(0.02)
if self._audio_source is not None:
self._audio_source.on_data(numpy.zeros((320, 1), dtype=numpy.int16))
def _fake_video_loop(self):
while not self._closed.is_set():
time.sleep(1.0 / 30)
if self._video_source is not None:
self._video_source.on_captured(
numpy.zeros(
(self._video_height, self._video_width, 3), dtype=numpy.uint8
)
)
def _on_set_offer(self, raw_offer):
offer = json.loads(raw_offer)
if offer["type"] == "offer":
self._connection_id = offer["connection_id"]
print(f"Received 'Offer': connection_id={self._connection_id}")
def _on_notify(self, raw_message):
message = json.loads(raw_message)
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message["connection_id"] == self._connection_id
):
print(f"Connected Sora: connection_id={self._connection_id}")
self._connected.set()
def _on_disconnect(self, error_code, message):
print(f"Disconnected Sora: error_code='{error_code}' message='{message}'")
self._closed.set()
self._connected.clear()
if self._fake_audio_thread is not None:
self._fake_audio_thread.join(timeout=10)
if self._fake_video_thread is not None:
self._fake_video_thread.join(timeout=10)
def _on_audio_transform(self, frame: SoraTransformableAudioFrame):
# この実装が Encoded Transform を利用する上での基本形となる
# frame からエンコードされたフレームデータを取得する
# 戻り値は ArrayLike になっている
new_data = frame.get_data()
# ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する
# 加工したフレームデータで frame の フレームデータを入れ替える
frame.set_data(new_data)
self._audio_transformer.enqueue(frame)
def _on_video_transform(self, frame: SoraTransformableVideoFrame):
# この実装が Encoded Transform を利用する上での基本形となる
# frame からエンコードされたフレームデータを取得する
# 戻り値は numpy.ndarray になっている
new_data = frame.get_data()
# ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する
# 加工したフレームデータで frame の フレームデータを入れ替える
frame.set_data(new_data)
self._video_transformer.enqueue(frame)
def run(self) -> None:
"""接続を維持し、必要に応じて切断する"""
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main() -> None:
"""メイン関数: Sora への接続と切断を行う"""
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url = os.getenv("SORA_SIGNALING_URL")
if not signaling_url:
raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
channel_id = os.getenv("SORA_CHANNEL_ID")
if not channel_id:
raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")
# signaling_url はリストである必要があるので、リストに変換
signaling_urls: list[str] = [signaling_url]
sample = SendonlyEncodedTransform(
signaling_urls,
channel_id,
)
# Sora へ接続
sample.connect()
# 接続の維持する場合は sample.connect().run() を呼ぶ
# sample.connect().run()
time.sleep(3)
sample.disconnect()
if __name__ == "__main__":
main()
受信時¶
受信した音声や映像のフレームを変換するには SoraMediaTrack
の SoraMediaTrack.set_frame_transformer()
を利用します。
import json
import os
import time
from threading import Event
from typing import Optional
import numpy
from sora_sdk import (
Sora,
SoraAudioFrameTransformer,
SoraMediaTrack,
SoraTransformableAudioFrame,
SoraTransformableVideoFrame,
SoraVideoFrameTransformer,
)
class RecvonlyEncodedTransform:
def __init__(
self,
signaling_urls: list[str],
channel_id: str,
):
self._signaling_urls: list[str] = signaling_urls
self._channel_id: str = channel_id
self._connection_id: str
# 接続した
self._connected: Event = Event()
# 終了
self._closed = Event()
self._audio_output_frequency: int = 24000
self._audio_output_channels: int = 1
self._sora = Sora()
self._connection = self._sora.create_connection(
signaling_urls=signaling_urls,
role="recvonly",
channel_id=channel_id,
audio=True,
video=True,
)
self._connection.on_set_offer = self._on_set_offer
self._connection.on_notify = self._on_notify
self._connection.on_disconnect = self._on_disconnect
self._connection.on_track = self._on_track
def __enter__(self) -> "RecvonlyEncodedTransform":
return self.connect()
def __exit__(self, exc_type, exc_value, traceback) -> None:
self.disconnect()
def connect(self):
try:
self._connection.connect()
# _connected が set されるまで 30 秒待つ
assert self._connected.wait(30)
except Exception as e:
# connect を呼び出したら、例外があったとしても必ず disconnect を呼び出す
self._connection.disconnect()
raise e
return self
def disconnect(self):
self._connection.disconnect()
def _on_set_offer(self, raw_offer):
offer = json.loads(raw_offer)
if offer["type"] == "offer":
self._connection_id = offer["connection_id"]
print(f"Received 'Offer': connection_id={self._connection_id}")
def _on_notify(self, raw_message):
message = json.loads(raw_message)
if (
message["type"] == "notify"
and message["event_type"] == "connection.created"
and message["connection_id"] == self._connection_id
):
print(f"Connected Sora: connection_id={self._connection_id}")
self._connected.set()
def _on_disconnect(self, error_code, message):
print(f"Disconnected Sora: error_code='{error_code}' message='{message}'")
self._closed = True
self._connected.clear()
def _on_track(self, track: SoraMediaTrack) -> None:
if track.kind == "audio":
# Audio 向けの Encoded Transformer
self._audio_transformer = SoraAudioFrameTransformer()
# Audio のエンコードフレームを受け取るコールバック関数を on_transform に設定
self._audio_transformer.on_transform = self._on_audio_transform
# Encoded Transformer を RTPReceiver に設定する
track.set_frame_transformer(self._audio_transformer)
if track.kind == "video":
# Video 向けの Encoded Transformer
self._video_transformer = SoraVideoFrameTransformer()
# Video のエンコードフレームを受け取るコールバック関数を on_transform に設定
self._video_transformer.on_transform = self._on_video_transform
# Encoded Transformer を SoraMediaTrack に設定する
track.set_frame_transformer(self._video_transformer)
def _on_audio_transform(self, frame: SoraTransformableAudioFrame):
# この実装が Encoded Transform を利用する上での基本形となる
# frame からエンコードされたフレームデータを取得する
# 戻り値は ArrayLike になっている
new_data = frame.get_data()
# ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する
# 加工したフレームデータで frame の フレームデータを入れ替える
frame.set_data(new_data)
self._audio_transformer.enqueue(frame)
def _on_video_transform(self, frame: SoraTransformableVideoFrame):
# この実装が Encoded Transform を利用する上での基本形となる
# frame からエンコードされたフレームデータを取得する
# 戻り値は ArrayLike になっている
new_data = frame.get_data()
# ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する
# 加工したフレームデータで frame の フレームデータを入れ替える
frame.set_data(new_data)
self._video_transformer.enqueue(frame)
def run(self) -> None:
"""
接続を維持し、必要に応じて切断する
"""
try:
# 接続を維持
while not self._closed:
pass
except KeyboardInterrupt:
# キーボード割り込みの場合
pass
finally:
# 接続の切断
if self._connection:
self._connection.disconnect()
def main() -> None:
"""
メイン関数: Sora への接続と切断を行う
"""
# 環境変数からシグナリング URL とチャネル ID を取得
signaling_url: Optional[str] = os.getenv("SORA_SIGNALING_URL")
if not signaling_url:
raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
channel_id: Optional[str] = os.getenv("SORA_CHANNEL_ID")
if not channel_id:
raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")
# signaling_url はリストである必要があるので、リストに変換
signaling_urls: list[str] = [signaling_url]
sample = RecvonlyEncodedTransform(
signaling_urls,
channel_id,
)
# Sora へ接続
with sample.connect():
time.sleep(5)
# 接続の維持する場合は sample.connect().run() を呼ぶ
# sample.connect().run()
if __name__ == "__main__":
main()
利用例¶
H.264 NAL ユニットの追加¶
これは H.264 NAL ユニットで SEI を追加する例です
利用コーデックが H.264 の時のみ利用できます
import numpy as np
import (
SoraTransformableVideoFrame
)
class SoraClient:
# 色々省略
def _on_video_transform(self, frame: SoraTransformableVideoFrame) :
# データを取り出す
new_data = frame.get_data()
# UUID (16バイトの仮のUUID)
uuid = np.array([
0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0,
0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0
], dtype=np.uint8)
# 自由に追加するメタデータ (仮のメタデータ)
metadata = np.array([0x01, 0x02, 0x03, 0x04], dtype=np.uint8) # 4バイトのメタデータ
# 仮のNALユニット (NALヘッダー + SEIペイロードタイプ + SEIペイロードサイズ + UUID + ペイロード)
# NALユニットのタイプ: SEI
nal_header = np.array([0x06], dtype=np.uint8)
# SEIペイロードタイプ: ユーザーデータ未登録型
sei_payload_type = np.array([5], dtype=np.uint8)
# SEIペイロードのサイズ (UUIDの長さ + メタデータサイズ)
sei_payload_size = np.array([len(uuid) + len(metdata)], dtype=np.uint8)
# NALユニット全体を ndarray として結合して new_data の末尾に追加する
new_data = np.concatenate((new_data, nal_header, sei_payload_type, sei_payload_size, uuid, metadata))
# frame の data を上書きする
frame.set_data(new_data)
self._video_transformer.enqueue(frame)