音声デバイス

Sora Python SDK では音声デバイスを扱う機能を提供していません。

代わりに、 PortAudio の Python バインディングである sounddevice を使用することで、 音声デバイスのキャプチャや再生などを行うことができます。

sounddevice のインストール

pip

$ pip install sounddevice

rye

$ rye add sounddevice
$ rye sync

macOS

$ brew install portaudio

Ubuntu

$ sudo apt install libportaudio2

音声デバイス一覧を取得する

sounddevice.query_devices を利用する事で、音声デバイス一覧を取得することができます

from typing import Any, Dict, List

import sounddevice as sd


def get_available_audio_devices() -> None:
    """
    利用可能なオーディオデバイスの情報をシンプルなkey-value形式で表示する関数

    >>> get_available_audio_devices()
    利用可能なオーディオデバイス:
    1:
        name: DELL U4021QW
        host: Core Audio
        max_input_channels: 0
        max_output_channels: 2
        default_samplerate: 48000.0
        default_low_input_latency: 0.010000
        default_high_input_latency: 0.100000
    2:
        name: Logitech StreamCam
        host: Core Audio
        max_input_channels: 2
        max_output_channels: 0
        default_samplerate: 48000.0
        default_low_input_latency: 0.004583
        default_high_input_latency: 0.013917
    ...
    """
    devices: List[Dict[str, Any]] = sd.query_devices()

    print("利用可能なオーディオデバイス:")
    for i, device in enumerate(devices, 1):
        print(f"{i}:")
        print(f"    name: {device['name']}")

        host_api: Dict[str, Any] = sd.query_hostapis(device["hostapi"])
        print(f"    host: {host_api['name']}")

        print(f"    max_input_channels: {device['max_input_channels']}")
        print(f"    max_output_channels: {device['max_output_channels']}")
        print(f"    default_samplerate: {device['default_samplerate']}")
        print(
            f"    default_low_input_latency: {device['default_low_input_latency']:.6f}"
        )
        print(
            f"    default_high_input_latency: {device['default_high_input_latency']:.6f}"
        )
        print()  # デバイス間に空行を挿入


def main():
    get_available_audio_devices()


if __name__ == "__main__":
    main()

音声デバイスから音声キャプチャして再生する

sounddevice.play を利用する事で、音声デバイスから音声キャプチャして音声を表示することができます。

import time
from typing import Dict, List

import numpy as np
import sounddevice as sd


def list_audio_devices() -> List[Dict]:
    """
    利用可能なすべてのオーディオデバイスをリストアップし、表示します。

    Returns:
        List[Dict]: 利用可能なオーディオデバイスのリスト
    """
    devices: List[Dict] = sd.query_devices()
    print("利用可能なオーディオデバイス:")
    for i, device in enumerate(devices):
        print(
            f"{i}: {device['name']} (入力チャンネル: {device['max_input_channels']}, 出力チャンネル: {device['max_output_channels']})"
        )
    return devices


def get_valid_device_index(devices: List[Dict], is_input: bool = True) -> int:
    """
    ユーザーに有効なデバイスインデックスの入力を求めます。

    Args:
        devices (List[Dict]): 利用可能なオーディオデバイスのリスト
        is_input (bool): 入力デバイスを選択する場合は True 、出力デバイスの場合は False

    Returns:
        int: 選択されたデバイスのインデックス
    """
    while True:
        try:
            device_index: int = int(
                input(
                    f"{'入力' if is_input else '出力'}デバイスのインデックスを入力してください: "
                )
            )
            if 0 <= device_index < len(devices):
                if (is_input and devices[device_index]["max_input_channels"] > 0) or (
                    not is_input and devices[device_index]["max_output_channels"] > 0
                ):
                    return device_index
            print(
                f"無効なインデックスです。正しい{'入力' if is_input else '出力'}デバイスのインデックスを入力してください。"
            )
        except ValueError:
            print("数値を入力してください。")


def display_volume(volume: float) -> str:
    """
    音量レベルを視覚的に表示するための文字列を生成します。

    Args:
        volume (float): 0.0から1.0の範囲の音量レベル

    Returns:
        str: 音量レベルを表す文字列(バーグラフ付き)
    """
    amplification: int = 5  # 音量の増幅係数(表示を強調するため)
    max_bar_length: int = 50  # バーの最大長さ
    bar_length: int = int(min(volume * amplification * max_bar_length, max_bar_length))
    return f"Volume: [{'|' * bar_length}{' ' * (max_bar_length - bar_length)}] {volume:.4f}"


def capture_audio(input_device_index: int, duration: int = 5) -> np.ndarray:
    """
    指定されたデバイスから音声をキャプチャし、リアルタイムで音量を表示します。

    Args:
        input_device_index (int): 入力デバイスのインデックス
        duration (int): 録音時間(秒)

    Returns:
        np.ndarray: キャプチャされた音声データ
    """
    sample_rate: int = 48000  # サンプリングレート(Hz)
    channels: int = 1  # モノラル録音
    block_size: int = 1024  # 一度に処理するサンプル数

    print(
        f"デバイス '{sd.query_devices(device=input_device_index)['name']}' からの音声をキャプチャしています..."
    )
    print(f"サンプリングレート: {sample_rate} Hz")

    audio_data: List[np.ndarray] = []
    start_time: float = time.time()

    def audio_callback(
        indata: np.ndarray, frames: int, time_info: Dict, status: sd.CallbackFlags
    ) -> None:
        """
        音声データが利用可能になるたびに呼び出されるコールバック関数。

        Args:
            indata (np.ndarray): 入力音声データ
            frames (int): フレーム数
            time_info (Dict): タイムスタンプ情報
            status (sd.CallbackFlags): ステータスフラグ
        """
        if status:
            print(status)  # エラーがあれば表示
        audio_data.append(indata.copy())  # 音声データをリストに追加
        volume: float = np.sqrt(np.mean(indata**2))  # RMS音量を計算
        elapsed_time: int = int(time.time() - start_time)
        print(
            f"\r{display_volume(volume)} 録音中: {elapsed_time}/{duration} 秒", end=""
        )

    # InputStream を使用して音声をキャプチャ
    with sd.InputStream(
        device=input_device_index,
        channels=channels,
        samplerate=sample_rate,
        callback=audio_callback,
        blocksize=block_size,
    ):
        sd.sleep(duration * 1000)  # ミリ秒単位で待機

    print("\n録音完了")
    return np.concatenate(audio_data)  # 全ての音声データを1つの配列に結合


def play_audio(output_device_index: int, audio_data: np.ndarray) -> None:
    """
    キャプチャした音声データを再生します。

    Args:
        output_device_index (int): 出力デバイスのインデックス
        audio_data (np.ndarray): 再生する音声データ
    """
    sample_rate: int = 48000  # キャプチャ時と同じサンプリングレート
    print(
        f"デバイス '{sd.query_devices(device=output_device_index)['name']}' で音声を再生しています..."
    )
    print(f"サンプリングレート: {sample_rate} Hz")
    sd.play(audio_data, samplerate=sample_rate, device=output_device_index)
    sd.wait()  # 再生が完了するまで待機
    print("再生完了")


def main():
    # メインの実行フロー
    devices: List[Dict] = list_audio_devices()
    input_device_index: int = get_valid_device_index(devices, is_input=True)
    output_device_index: int = get_valid_device_index(devices, is_input=False)

    audio_data: np.ndarray = capture_audio(input_device_index)
    play_audio(output_device_index, audio_data)


if __name__ == "__main__":
    main()

音声デバイスをキャプチャして送信する

Sora.create_audio_source() を利用する事で、音声デバイスからキャプチャした音声を Sora に送信することができます。

import json
import os
import time
from threading import Event
from typing import Any, Dict, List, Optional

import numpy as np
import sounddevice as sd
from sora_sdk import Sora, SoraAudioSource, SoraConnection, SoraSignalingErrorCode


class SendonlyAudio:
    def __init__(self, signaling_urls: List[str], channel_id: str, device_id: int):
        """
        SendonlyAudio クラスのコンストラクタ

        :param signaling_urls: Sora シグナリングサーバーの URL リスト
        :param channel_id: 接続するチャンネルの ID
        :param device_id: 使用するオーディオデバイスの ID(デフォルト: None)
        """
        self._signaling_urls: List[str] = signaling_urls
        self._channel_id: str = channel_id

        self._connection_id: Optional[str] = None

        self._connected: Event = Event()
        self._closed: bool = False

        # オーディオ設定
        self._sample_rate: int = 48000
        self._channels: int = 1
        self._device_id: int = device_id

        # Sora SDK の初期化
        self._sora: Sora = Sora()
        self._audio_source: SoraAudioSource = self._sora.create_audio_source(
            channels=self._channels, sample_rate=self._sample_rate
        )

        # Sora への接続設定
        self._connection: SoraConnection = self._sora.create_connection(
            signaling_urls=signaling_urls,
            role="sendonly",
            channel_id=channel_id,
            audio=True,
            video=False,
            audio_source=self._audio_source,
        )

        # コールバック関数の設定
        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect

    def connect(self) -> "SendonlyAudio":
        """
        Sora サーバーに接続し、オーディオ入力ストリームを開始する

        :return: self (メソッドチェーン用)
        """
        # Sora へ接続
        self._connection.connect()

        # オーディオ入力ストリームを開始
        self._audio_stream = sd.InputStream(
            samplerate=self._sample_rate,
            channels=self._channels,
            dtype="int16",
            device=self._device_id,
            callback=self._audio_callback,
        )
        self._audio_stream.start()

        # 接続が成功するまで待つ
        assert self._connected.wait(10), "接続に失敗しました"
        return self

    def _audio_callback(
        self, indata: np.ndarray, frames: int, time: Any, status: Any
    ) -> None:
        """
        オーディオ入力コールバック関数

        :param indata: 入力オーディオデータ
        :param frames: フレーム数
        :param time: タイムスタンプ
        :param status: ストリームのステータス
        """
        self._audio_source.on_data(indata)

    def disconnect(self) -> None:
        """
        Sora サーバーから切断し、リソースを解放する
        """
        self._connection.disconnect()
        if hasattr(self, "_audio_stream"):
            self._audio_stream.stop()
            self._audio_stream.close()

    def _on_notify(self, raw_message: str) -> None:
        """
        シグナリング通知のコールバック

        :param raw_message: サーバーからの生の通知メッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message["connection_id"] == self._connection_id
        ):
            print(f"Sora に接続しました: connection_id={self._connection_id}")
            self._connected.set()

    def _on_set_offer(self, raw_message: str) -> None:
        """
        シグナリング type: offer のコールバック

        :param raw_message: サーバーからのオファーメッセージ
        """
        message: Dict[str, Any] = json.loads(raw_message)
        if message["type"] == "offer":
            self._connection_id = message["connection_id"]

    def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str) -> None:
        """
        切断時のコールバック

        :param error_code: 切断理由を示すエラーコード
        :param message: 切断に関する追加メッセージ
        """
        print(f"Sora から切断されました: error_code={error_code}, message={message}")
        self._closed = True
        self._connected.clear()

    def run(self) -> None:
        """
        メインループ: 接続を維持し、必要に応じて切断処理を行う
        """
        try:
            while not self._closed:
                pass
        except KeyboardInterrupt:
            pass
        finally:
            if self._connection:
                self.disconnect()


def main() -> None:
    """
    メイン関数: SendonlyAudio インスタンスを作成し、実行する
    """
    signaling_url = os.getenv("SORA_SIGNALING_URL")
    if not signaling_url:
        raise ValueError("環境変数 SORA_SIGNALING_URL が設定されていません")
    channel_id = os.getenv("SORA_CHANNEL_ID")
    if not channel_id:
        raise ValueError("環境変数 SORA_CHANNEL_ID が設定されていません")

    signaling_urls: List[str] = [signaling_url]

    sample: SendonlyAudio = SendonlyAudio(signaling_urls, channel_id, 1)

    # Sora へ接続
    sample.connect()
    # 接続の維持する場合は sample.connect() の代わりに sample.connect().run() を呼ぶ
    # sample.connect().run()

    time.sleep(3)

    sample.disconnect()


if __name__ == "__main__":
    main()
© Copyright 2024, Shiguredo Inc. Created using Sphinx 8.0.2