import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.agenthuman.api import NewSessionRequest
from pipecat.services.agenthuman.video import AgentHumanVideoService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams, DailyTransport

load_dotenv(override=True)
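
# API keys are read from the environment (loaded from .env):
# DEEPGRAM_API_KEY, ELEVENLABS_API_KEY, GOOGLE_API_KEY, AGENTHUMAN_API_KEY.
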
# Lambdas defer transport params until the runner selects Daily vs WebRTC at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_is_live=True,
video_out_width=1280,
video_out_height=960,
        video_out_bitrate=2_000_000,
        vad_analyzer=SileroVADAnalyzer(),
    ),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_is_live=True,
video_out_width=1280,
        video_out_height=960,
        vad_analyzer=SileroVADAnalyzer(),
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
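    """Assemble and run the voice + avatar pipeline for a single client session."""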
logger.info("Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
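
    # Text-to-speech: ElevenLabs renders the LLM's responses as audio.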
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="cgSgspJ2msm6clMCkdW9",
)
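
    # LLM: Gemini generates the assistant's replies; the system instruction
    # keeps the output speakable.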
    llm = GoogleLLMService(
        api_key=os.getenv("GOOGLE_API_KEY"),
        system_instruction=(
            "You are a helpful assistant. Your output will be spoken aloud, so avoid "
            "special characters that can't easily be spoken, such as emojis or bullet points. "
            "Be succinct and respond to what the user said in a creative and helpful way."
        ),
    )
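
    # Avatar: AgentHuman renders avatar video driven by the TTS audio stream.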
    agent_human = AgentHumanVideoService(
        api_key=os.getenv("AGENTHUMAN_API_KEY"),
        session_request=NewSessionRequest(avatar="avat_01KMZHXFPBVCXA5ATK85HCP8G1"),
        transport=transport,
    )
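
    # Conversation context plus aggregators that append user and assistant turns.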
    context = LLMContext()
    context_aggregator = LLMContextAggregatorPair(context)
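
    # Frames flow top to bottom: audio in -> STT -> LLM -> TTS -> avatar video -> out.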
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            agent_human,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )
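
    # Wrap the pipeline in a task so we can queue frames, collect metrics, and
    # cancel it when the client leaves.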
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)

    @transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
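
        # On Daily, allow adaptive simulcast layers so video quality can scale
        # with available bandwidth.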
if isinstance(transport, DailyTransport):
await transport.update_publishing(
publishing_settings={
"camera": {
"sendSettings": {
"allowAdaptiveLayers": True,
}
}
}
)
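
        # Kick off the conversation: seed a developer message and run the LLM.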
        context.add_message(
            {
                "role": "developer",
                "content": "Start by saying 'Hello', then give a short greeting.",
            }
        )
await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
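
    # The runner drives the task; SIGINT handling is delegated to the runner args
    # so this works both locally and on Pipecat Cloud.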
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)


async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud and the local runner."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)


if __name__ == "__main__":
from pipecat.runner.run import main
main()