WebRTC Encoded Transform

警告

この機能は実験的機能のため、正式版では仕様が変更される可能性があります

概要

WebRTC Encoded Transform とは WebRTC の送信前や受信後のタイミングで、エンコードされた音声や映像フレームを直接書き換える、ブラウザ WebRTC API の拡張的な機能として提供されている仕組みです。

Sora Python SDK では WebRTC Encoded Transform を SDK の一部として提供しています。

音声や映像、データを送るだけの通常の利用では必要ありません。主に音声や映像と 同時に 何かしらのデータを送りたい場合などに利用することを目的としています。

WebRTC Encoded Transform に付いては W3C の仕様よりも MDN の記事 Using WebRTC Encoded Transforms がわかりやすいです。

取得できるフレーム

そもそも WebRTC では音声や映像は RTP というプロトコルを利用し、それを暗号化された SRTP というプロトコルを利用します。 さらに RTP は 1200 バイト程度までしか 1 つのパケットで送れないため、より大きなサイズの場合は分割して送信します。

WebRTC Encoded Transform ではこの送信前の 分割した RTP にしていない状態 と、受信後の RTP を結合した状態 のエンコード済みの音声や映像のフレームを取得することができます。

利用方法

送信時

送信する音声や映像のフレームを変換するには SoraAudioFrameTransformer または SoraVideoFrameTransformer を利用します。

import json
import os
import threading
import time
from threading import Event
from typing import Optional

import numpy
from sora_sdk import (
    Sora,
    SoraAudioFrameTransformer,
    SoraAudioSource,
    SoraTransformableAudioFrame,
    SoraTransformableVideoFrame,
    SoraVideoFrameTransformer,
    SoraVideoSource,
)


class SendonlyEncodedTransform:
    def __init__(
        self,
        signaling_urls: list[str],
        channel_id: str,
    ):
        self._signaling_urls: list[str] = signaling_urls
        self._channel_id: str = channel_id

        self._connection_id: str

        # 接続した
        self._connected: Event = Event()
        # 終了
        self._closed = Event()

        self._audio_channels: int = 1
        self._audio_sample_rate: int = 16000

        self._video_width: int = 960
        self._video_height: int = 540

        self._sora = Sora()

        self._fake_audio_thread: Optional[threading.Thread] = None
        self._fake_video_thread: Optional[threading.Thread] = None

        self._audio_source: Optional[SoraAudioSource] = None
        self._audio_source = self._sora.create_audio_source(
            self._audio_channels, self._audio_sample_rate
        )

        self._video_source: Optional[SoraVideoSource] = None
        self._video_source = self._sora.create_video_source()

        # Audio 向けの Encoded Transformer
        self._audio_transformer = SoraAudioFrameTransformer()
        # Audio のエンコードフレームを受け取るコールバック関数を on_transform に設定
        self._audio_transformer.on_transform = self._on_audio_transform

        # Video 向けの Encoded Transformer
        self._video_transformer = SoraVideoFrameTransformer()
        # Video のエンコードフレームを受け取るコールバック関数を on_transform に設定
        self._video_transformer.on_transform = self._on_video_transform

        self._connection = self._sora.create_connection(
            signaling_urls=signaling_urls,
            role="sendonly",
            channel_id=channel_id,
            audio=True,
            video=True,
            audio_source=self._audio_source,
            video_source=self._video_source,
            audio_frame_transformer=self._audio_transformer,
            video_frame_transformer=self._video_transformer,
        )

        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect

    def __enter__(self) -> "SendonlyEncodedTransform":
        return self.connect()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.disconnect()

    def connect(self):
        self._fake_audio_thread = threading.Thread(
            target=self._fake_audio_loop, daemon=True
        )
        self._fake_audio_thread.start()

        self._fake_video_thread = threading.Thread(
            target=self._fake_video_loop, daemon=True
        )
        self._fake_video_thread.start()

        try:
            self._connection.connect()

            # _connected が set されるまで 30 秒待つ
            assert self._connected.wait(30)
        except Exception as e:
            # connect を呼び出したら、例外があったとしても必ず disconnect を呼び出す
            self._connection.disconnect()
            raise e

        return self

    def disconnect(self):
        self._connection.disconnect()

    def _fake_audio_loop(self):
        while not self._closed.is_set():
            time.sleep(0.02)
            if self._audio_source is not None:
                self._audio_source.on_data(numpy.zeros((320, 1), dtype=numpy.int16))

    def _fake_video_loop(self):
        while not self._closed.is_set():
            time.sleep(1.0 / 30)
            if self._video_source is not None:
                self._video_source.on_captured(
                    numpy.zeros(
                        (self._video_height, self._video_width, 3), dtype=numpy.uint8
                    )
                )

    def _on_set_offer(self, raw_offer):
        offer = json.loads(raw_offer)
        if offer["type"] == "offer":
            self._connection_id = offer["connection_id"]
            print(f"Received 'Offer': connection_id={self._connection_id}")

    def _on_notify(self, raw_message):
        message = json.loads(raw_message)
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message["connection_id"] == self._connection_id
        ):
            print(f"Connected Sora: connection_id={self._connection_id}")
            self._connected.set()

    def _on_disconnect(self, error_code, message):
        print(f"Disconnected Sora: error_code='{error_code}' message='{message}'")
        self._closed.set()
        self._connected.clear()

        if self._fake_audio_thread is not None:
            self._fake_audio_thread.join(timeout=10)

        if self._fake_video_thread is not None:
            self._fake_video_thread.join(timeout=10)

    def _on_audio_transform(self, frame: SoraTransformableAudioFrame):
        # この実装が Encoded Transform を利用する上での基本形となる

        # frame からエンコードされたフレームデータを取得する
        # 戻り値は ArrayLike になっている
        new_data = frame.get_data()

        # ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する

        # 加工したフレームデータで frame の フレームデータを入れ替える
        frame.set_data(new_data)
        self._audio_transformer.enqueue(frame)

    def _on_video_transform(self, frame: SoraTransformableVideoFrame):
        # この実装が Encoded Transform を利用する上での基本形となる

        # frame からエンコードされたフレームデータを取得する
        # 戻り値は numpy.ndarray になっている
        new_data = frame.get_data()

        # ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する

        # 加工したフレームデータで frame の フレームデータを入れ替える
        frame.set_data(new_data)
        self._video_transformer.enqueue(frame)

    def run(self) -> None:
        """接続を維持し、必要に応じて切断する"""
        try:
            # 接続を維持
            while not self._closed:
                pass
        except KeyboardInterrupt:
            # キーボード割り込みの場合
            pass
        finally:
            # 接続の切断
            if self._connection:
                self._connection.disconnect()


def main() -> None:
    """メイン関数: Sora への接続と切断を行う"""
    # 環境変数からシグナリング URL とチャネル ID を取得
    signaling_url = os.getenv("SORA_SIGNALING_URL")
    if not signaling_url:
        raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
    channel_id = os.getenv("SORA_CHANNEL_ID")
    if not channel_id:
        raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")

    # signaling_url はリストである必要があるので、リストに変換
    signaling_urls: list[str] = [signaling_url]

    sample = SendonlyEncodedTransform(
        signaling_urls,
        channel_id,
    )

    # Sora へ接続
    sample.connect()
    # 接続の維持する場合は sample.connect().run() を呼ぶ
    # sample.connect().run()

    time.sleep(3)

    sample.disconnect()


if __name__ == "__main__":
    main()

受信時

受信した音声や映像のフレームを変換するには SoraMediaTrackSoraMediaTrack.set_frame_transformer() を利用します。

import json
import os
import time
from threading import Event
from typing import Optional

import numpy
from sora_sdk import (
    Sora,
    SoraAudioFrameTransformer,
    SoraMediaTrack,
    SoraTransformableAudioFrame,
    SoraTransformableVideoFrame,
    SoraVideoFrameTransformer,
)


class RecvonlyEncodedTransform:
    def __init__(
        self,
        signaling_urls: list[str],
        channel_id: str,
    ):
        self._signaling_urls: list[str] = signaling_urls
        self._channel_id: str = channel_id

        self._connection_id: str

        # 接続した
        self._connected: Event = Event()
        # 終了
        self._closed = Event()

        self._audio_output_frequency: int = 24000
        self._audio_output_channels: int = 1

        self._sora = Sora()

        self._connection = self._sora.create_connection(
            signaling_urls=signaling_urls,
            role="recvonly",
            channel_id=channel_id,
            audio=True,
            video=True,
        )

        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect

        self._connection.on_track = self._on_track

    def __enter__(self) -> "RecvonlyEncodedTransform":
        return self.connect()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.disconnect()

    def connect(self):
        try:
            self._connection.connect()

            # _connected が set されるまで 30 秒待つ
            assert self._connected.wait(30)
        except Exception as e:
            # connect を呼び出したら、例外があったとしても必ず disconnect を呼び出す
            self._connection.disconnect()
            raise e

        return self

    def disconnect(self):
        self._connection.disconnect()

    def _on_set_offer(self, raw_offer):
        offer = json.loads(raw_offer)
        if offer["type"] == "offer":
            self._connection_id = offer["connection_id"]
            print(f"Received 'Offer': connection_id={self._connection_id}")

    def _on_notify(self, raw_message):
        message = json.loads(raw_message)
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message["connection_id"] == self._connection_id
        ):
            print(f"Connected Sora: connection_id={self._connection_id}")
            self._connected.set()

    def _on_disconnect(self, error_code, message):
        print(f"Disconnected Sora: error_code='{error_code}' message='{message}'")
        self._closed = True
        self._connected.clear()

    def _on_track(self, track: SoraMediaTrack) -> None:
        if track.kind == "audio":
            # Audio 向けの Encoded Transformer
            self._audio_transformer = SoraAudioFrameTransformer()
            # Audio のエンコードフレームを受け取るコールバック関数を on_transform に設定
            self._audio_transformer.on_transform = self._on_audio_transform
            # Encoded Transformer を RTPReceiver に設定する
            track.set_frame_transformer(self._audio_transformer)
        if track.kind == "video":
            # Video 向けの Encoded Transformer
            self._video_transformer = SoraVideoFrameTransformer()
            # Video のエンコードフレームを受け取るコールバック関数を on_transform に設定
            self._video_transformer.on_transform = self._on_video_transform
            # Encoded Transformer を SoraMediaTrack に設定する
            track.set_frame_transformer(self._video_transformer)

    def _on_audio_transform(self, frame: SoraTransformableAudioFrame):
        # この実装が Encoded Transform を利用する上での基本形となる

        # frame からエンコードされたフレームデータを取得する
        # 戻り値は ArrayLike になっている
        new_data = frame.get_data()

        # ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する

        # 加工したフレームデータで frame の フレームデータを入れ替える
        frame.set_data(new_data)
        self._audio_transformer.enqueue(frame)

    def _on_video_transform(self, frame: SoraTransformableVideoFrame):
        # この実装が Encoded Transform を利用する上での基本形となる
        # frame からエンコードされたフレームデータを取得する
        # 戻り値は ArrayLike になっている
        new_data = frame.get_data()

        # ここで new_data の末尾にデータをつける new_data を暗号化するなど任意の処理を実装する

        # 加工したフレームデータで frame の フレームデータを入れ替える
        frame.set_data(new_data)
        self._video_transformer.enqueue(frame)

    def run(self) -> None:
        """
        接続を維持し、必要に応じて切断する
        """
        try:
            # 接続を維持
            while not self._closed:
                pass
        except KeyboardInterrupt:
            # キーボード割り込みの場合
            pass
        finally:
            # 接続の切断
            if self._connection:
                self._connection.disconnect()


def main() -> None:
    """
    メイン関数: Sora への接続と切断を行う
    """
    # 環境変数からシグナリング URL とチャネル ID を取得
    signaling_url: Optional[str] = os.getenv("SORA_SIGNALING_URL")
    if not signaling_url:
        raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
    channel_id: Optional[str] = os.getenv("SORA_CHANNEL_ID")
    if not channel_id:
        raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")

    # signaling_url はリストである必要があるので、リストに変換
    signaling_urls: list[str] = [signaling_url]

    sample = RecvonlyEncodedTransform(
        signaling_urls,
        channel_id,
    )

    # Sora へ接続
    with sample.connect():
        time.sleep(5)
    # 接続の維持する場合は sample.connect().run() を呼ぶ
    # sample.connect().run()


if __name__ == "__main__":
    main()

利用例

H.264 NAL ユニットの追加

  • これは H.264 NAL ユニットで SEI を追加する例です

  • 利用コーデックが H.264 の時のみ利用できます

import numpy as np

import (
   SoraTransformableVideoFrame
)

class SoraClient:
   # 色々省略

   def _on_video_transform(self, frame: SoraTransformableVideoFrame) :
      # データを取り出す
      new_data = frame.get_data()

      # UUID (16バイトの仮のUUID)
      uuid = np.array([
         0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0,
         0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0
      ], dtype=np.uint8)

      # 自由に追加するメタデータ (仮のメタデータ)
      metadata = np.array([0x01, 0x02, 0x03, 0x04], dtype=np.uint8)  # 4バイトのメタデータ

      # 仮のNALユニット (NALヘッダー + SEIペイロードタイプ + SEIペイロードサイズ + UUID + ペイロード)
      # NALユニットのタイプ: SEI
      nal_header = np.array([0x06], dtype=np.uint8)
      # SEIペイロードタイプ: ユーザーデータ未登録型
      sei_payload_type = np.array([5], dtype=np.uint8)
      # SEIペイロードのサイズ (UUIDの長さ + メタデータサイズ)
      sei_payload_size = np.array([len(uuid) + len(metdata)], dtype=np.uint8)

      # NALユニット全体を ndarray として結合して new_data の末尾に追加する
      new_data = np.concatenate((new_data, nal_header, sei_payload_type, sei_payload_size, uuid, metadata))

      # frame の data を上書きする
      frame.set_data(new_data)
      self._video_transformer.enqueue(frame)
© Copyright 2025, Shiguredo Inc. Created using Sphinx 8.2.3