Building a Real-time Voice AI Agent with Cerebrium and PipeCat

In this tutorial, we’ll create a real-time voice AI agent that can respond to queries via speech in approximately 500ms. We’ll use Cerebrium for deployment and scaling, and PipeCat as our framework for handling voice and multimodal conversational AI. This implementation is flexible, allowing you to swap in different Large Language Models (LLMs), Text-to-Speech (TTS), and Speech-to-Text (STT) models as needed.

Credit: This tutorial is based on an example provided by Cerebrium (https://www.cerebrium.ai/).

Cerebrium Setup

First, sign up for a Cerebrium account at https://www.cerebrium.ai/ and follow their documentation to set up your environment.

Create a starter project using the command:

cerebrium init voice-agent

This creates two files:

  • main.py: Your entrypoint file
  • cerebrium.toml: Configuration file for build and environment settings

Add the following to your cerebrium.toml:

[cerebrium.deployment]
# existing values...
docker_base_image_url = "registry.cerebrium.ai/daily:latest"

[cerebrium.hardware]
region = "us-east-1"
provider = "aws"
gpu = "AMPERE_A10"
cpu = 4
memory = 18.0
gpu_count = 1

[cerebrium.dependencies.pip]
torch = ">=2.0.0"
"pipecat-ai[silero, daily, openai, deepgram]" = "latest"
aiohttp = "latest"
torchaudio = "latest"
vllm = "latest"
huggingface_hub = "latest"

PipeCat Setup

We’ll use Llama 3 8B as our LLM, served via vLLM. Authenticate with Hugging Face and accept the model permissions for Llama 3 8B.

Add your Hugging Face token as a secret in your Cerebrium dashboard. Name it “HF_TOKEN”.
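
Note that the snippets in this tutorial call a get_secret helper to read secrets at runtime. If your Cerebrium runtime doesn’t provide one directly, here is a minimal fallback sketch, assuming secrets are injected into the container as environment variables (check the current Cerebrium docs for the canonical approach in your version):

import os

def get_secret(name: str) -> str:
    # Fallback sketch (an assumption, not Cerebrium's official API):
    # read a secret that Cerebrium injected as an environment variable.
    value = os.environ.get(name)
    if value is None:
        raise RuntimeError(f"Secret '{name}' is not set")
    return value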

Add the following code to your main.py:

from huggingface_hub import login
import subprocess
import os
from multiprocessing import Process
from loguru import logger
import time

os.environ['OUTLINES_CACHE_DIR'] = '/tmp/.outlines'

login(token=get_secret('HF_TOKEN'))

# Run the vLLM server in a background process
def start_server():
    while True:
        process = subprocess.Popen(
            f"python -m vllm.entrypoints.openai.api_server --port 5000 --model NousResearch/Meta-Llama-3-8B-Instruct --dtype bfloat16 --api-key {get_secret('HF_TOKEN')} --download-dir /persistent-storage/",
            shell=True
        )
        process.wait()  # Wait for the process to complete
        logger.error("Server process ended unexpectedly. Restarting in 5 seconds...")
        time.sleep(5)  # Wait before restarting

# Start the server in a separate process
server_process = Process(target=start_server, daemon=True)
server_process.start()
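
Once the server is up (the model can take a while to load into VRAM), you can sanity-check it with the OpenAI-compatible client — a quick sketch, assuming the openai package is available (the pipecat-ai openai extra pulls it in):

from openai import OpenAI

# Optional sanity check: vLLM exposes an OpenAI-compatible API on port 5000.
client = OpenAI(
    base_url="http://127.0.0.1:5000/v1",
    api_key=get_secret("HF_TOKEN"),  # must match the --api-key passed to vLLM above
)
completion = client.chat.completions.create(
    model="NousResearch/Meta-Llama-3-8B-Instruct",
    messages=[{"role": "user", "content": "Say hello in one sentence."}],
)
print(completion.choices[0].message.content)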

Implementing PipeCat Framework

Now we implement the PipeCat framework by instantiating its various components. Create a function called main with the following code:

import aiohttp
import sys
import asyncio
from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator, LLMUserResponseAggregator
)
from helpers import (
    ClearableDeepgramTTSService,
    AudioVolumeTimer,
    TranscriptionTimingLogger
)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

deepgram_voice: str = "aura-asteria-en"

async def main(room_url: str, token: str = None):
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token if token else get_secret("DAILY_TOKEN"),
            "Respond bots",
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(
                    stop_secs=0.2
                )),
                vad_audio_passthrough=True
            )
        )

        stt = DeepgramSTTService(
            name="STT",
            api_key=None,
            url='ws://127.0.0.1:8082/v1/listen'
        )

        tts = ClearableDeepgramTTSService(
            name="Voice",
            aiohttp_session=session,
            api_key=None,
            voice=deepgram_voice,
            base_url="http://127.0.0.1:8082/v1/speak"
        )

        llm = OpenAILLMService(
            name="LLM",
            api_key=get_secret("HF_TOKEN"),
            model="NousResearch/Meta-Llama-3-8B-Instruct",
            base_url="http://0.0.0.0:5000/v1"
        )

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        avt = AudioVolumeTimer()
        tl = TranscriptionTimingLogger(avt)

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            avt,                 # Audio volume timer
            stt,                 # Speech-to-text
            tl,                  # Transcription timing logger
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # TTS
            transport.output(),  # Transport bot output
            tma_out,             # Assistant spoken responses
        ])

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                report_only_initial_ttfb=True
            ))

Let’s break down what this code does:

  1. Main function initialization:
    • Sets up Daily transport layer for audio/video data
    • Connects to a specified room URL with authentication token
    • Configures VAD (Voice Activity Detection) with a 200ms pause threshold
  2. Local model connections:
    • Connects to Deepgram models running on port 8082 (specified in Docker base image)
    • Uses PipeCat to convert audio to text and vice versa
    • Connects to the locally running LLM model via vLLM server
  3. PipelineTask creation:
    • Combines all components into a PipelineTask
    • Customizable and supports various use cases, including Image and Vision
    • Allows easy handling of interruptions and model swapping
  4. Helper functions:
    • Additional helper functions are imported from a separate file
    • These assist with timing, logging, and clearing TTS output
    • The helper file (helpers.py) can be copied from the provided GitHub repository; a simplified sketch of one such processor follows the summary below

This setup creates a flexible and customizable voice AI agent using PipeCat and Cerebrium, with easy integration of different models and handling of various scenarios.
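
To illustrate what the helpers do — this is a simplified sketch, not the repository’s actual code — here is a timing processor in the style of TranscriptionTimingLogger, assuming the pipecat version used above exposes FrameProcessor and TranscriptionFrame at these paths:

import time
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class SimpleTimingLogger(FrameProcessor):
    """Logs how long after startup each transcription arrives."""

    def __init__(self):
        super().__init__()
        self._start = time.monotonic()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame):
            elapsed = time.monotonic() - self._start
            logger.debug(f"Transcription after {elapsed:.3f}s: {frame.text}")
        # Always forward the frame so the pipeline keeps flowing
        await self.push_frame(frame, direction)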

Daily Event Webhooks

Add the following code (still inside main) to handle Daily events:

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        messages.append(
            {"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frame(LLMMessagesFrame(messages))

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        await task.queue_frame(EndFrame())

    @transport.event_handler("on_call_state_updated")
    async def on_call_state_updated(transport, state):
        if state == "left":
            await task.queue_frame(EndFrame())

    runner = PipelineRunner()
    await runner.run(task)
    await session.close()

Here’s what this code does:

  1. Event handling:
    • Bot introduction: When the first participant joins, the bot introduces itself by adding a message to the conversation.
    • Multiple participant support: The system allows multiple participants to join, listen to, and interact with the bot.
    • Bot termination: When a participant leaves or the call ends, the bot terminates itself.
  2. Event attachment:
    • Events are attached to the “Transport” object, which represents the communication method (in this case, the meeting room).
  3. Pipeline execution:
    • The defined Pipeline task is passed to a pipeline runner.
    • The runner executes the task indefinitely until signaled to exit.
    • Exiting occurs when the call ends.

This structure allows for dynamic interaction with the bot based on participant actions and call status, while maintaining a continuous execution flow throughout the call. PipeCat manages these events and runs the pipeline efficiently. To read further about the PipeCat framework, see the PipeCat documentation.

Starting the Bot

  1. Instance management:
    • Set a minimum number of instances using “min_replicas” in cerebrium.toml (a config sketch follows this list)
    • This keeps the bot responsive for users while still allowing autoscaling
  2. vLLM server check:
    • Before the bot joins a meeting, a local GET request is made to ensure the vLLM server is live
    • Models typically take about 40 seconds to load into VRAM from disk
  3. Separate execution environment:
    • The main code runs in a separate execution environment to prevent multiple PipeCat instances
    • This is achieved by running the code as a background process
  4. REST API endpoint:
    • The background process serves as the entry point for the REST API to start the PipeCat bot
    • When the call ends (i.e., the PipeCat bot returns), a response is sent back to the API endpoint
  5. Function creation:
    • A function is created to handle this process:
      • Checking the vLLM server status
      • Starting the bot in a separate process
      • Waiting for the process to complete
      • Returning a response indicating the session has finished

This approach ensures that the bot is ready to join meetings, prevents conflicts between multiple instances, and provides a clean API interface for starting and managing bot sessions.
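
For reference, here is a sketch of the scaling settings in cerebrium.toml (section and parameter names follow Cerebrium’s configuration conventions; check the current docs for your version):

[cerebrium.scaling]
min_replicas = 1   # keep at least one warm instance so calls connect quickly
max_replicas = 5   # allow scale-out under load

The code below implements the vLLM server check and bot startup described above: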

import requests

def check_vllm_model_status():
    url = "http://0.0.0.0:5000/v1/models"
    headers = {
        "Authorization": f"Bearer {get_secret('HF_TOKEN')}"
    }
    # Poll for up to ~2 minutes (8 retries x 15s) while the model loads into VRAM
    max_retries = 8
    for _ in range(max_retries):
        try:
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                return True
        except requests.exceptions.ConnectionError:
            pass  # Server is not accepting connections yet
        time.sleep(15)
    return False

def start_bot(room_url: str, token: str = None):
    def target():
        asyncio.run(main(room_url, token))

    if not check_vllm_model_status():
        return {"message": "vLLM server failed to start", "status_code": 500}

    process = Process(target=target)
    process.start()
    process.join()  # Wait for the bot process to complete (i.e., the call ends)
    return {"message": "session finished"}

Creating a Meeting Room

  1. Versatility of Cerebrium:
    • Cerebrium can run any Python code, not just AI workloads.
  2. Demo functions:
    • Two functions are defined for the demo:
      a. Creating a room to join programmatically
      b. Generating a temporary token
    • Both the room and token are usable for only 5 minutes
  3. Implementation:
    • These functions use the Daily REST API
  4. Daily developer token:
    • Required for API access
    • Can be obtained from your Daily profile
    • If you don’t have an account, you can sign up here: https://www.daily.co/ (They offer a generous free tier)
  5. Getting the API key:
    • Go to the “developers” tab in your Daily profile
    • Fetch your API key
    • Add this key to your Cerebrium Secrets for secure access

This setup allows for programmatic creation of temporary rooms and tokens, integrating Daily’s API with your Cerebrium-deployed application.

def create_room():
    url = "https://api.daily.co/v1/rooms/"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_secret('DAILY_TOKEN')}"
    }
    data = {
        "properties": {
            "exp": int(time.time()) + 60*5 ##5 mins
        }
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        room_info = response.json()
        token = create_token(room_info['name'])
        if token and 'token' in token:
            room_info['token'] = token['token']
        else:
            logger.error("Failed to create token")
            return {"message": 'There was an error creating your room', "status_code": 500}
        return room_info
    else:
        logger.error(f"Failed to create room: {response.status_code}")
        return {"message": 'There was an error creating your room', "status_code": 500}

def create_token(room_name: str):
    url = "https://api.daily.co/v1/meeting-tokens"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_secret('DAILY_TOKEN')}"
    }
    data = {
        "properties": {
            "room_name": room_name
        }
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        token_info = response.json()
        return token_info
    else:
        logger.error(f"Failed to create token: {response.status_code}")
        return None
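
For a quick end-to-end test on the instance itself, you can chain these helpers together. Daily’s rooms API returns the room’s url and name in its response, which is what start_bot expects — a sketch:

# Sketch: create a temporary room and have the bot join it.
room = create_room()
if "status_code" not in room:  # create_room returns an error dict on failure
    start_bot(room["url"], room["token"])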

Deployment to Cerebrium

Deploy your application to Cerebrium using the command:
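
cerebrium deploy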

Connecting the Frontend

Clone the PipeCat frontend repository, then follow the instructions in its README.md and populate the following variables in your .env.development.local file:

VITE_SERVER_URL=https://api.cortex.cerebrium.ai/v4/p-xxxxx/<APP_NAME>
VITE_SERVER_AUTH=<Your Cerebrium JWT token>

Run yarn dev and navigate to http://localhost:5173/ to test your application.

Conclusion

This tutorial provides a comprehensive guide to creating a real-time voice AI agent using Cerebrium and PipeCat. By leveraging these tools, you can build powerful voice-based AI solutions with optimal performance and scalability.
