Skip to content

Instantly share code, notes, and snippets.

@gkmngrgn
Created January 27, 2026 10:59
Show Gist options
  • Select an option

  • Save gkmngrgn/ba39f6e22072e848bd4a8a9685fee5dc to your computer and use it in GitHub Desktop.

Select an option

Save gkmngrgn/ba39f6e22072e848bd4a8a9685fee5dc to your computer and use it in GitHub Desktop.
import os
from loguru import logger
from pipecat.audio.filters.aic_filter import AICFilter
from pipecat.frames.frames import Frame, InputAudioRawFrame, OutputAudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.transports.base_transport import BaseTransport, TransportParams
# loopback processor
class AudioFrameConverter(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
output_frame = OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
await self.push_frame(output_frame, direction)
else:
await self.push_frame(frame, direction)
# bot logic
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Bot starting: Direct Audio Loopback with AIC Filter")
converter = AudioFrameConverter()
pipeline = Pipeline(
[
transport.input(),
converter,
transport.output(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("WebRTC Client Connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("WebRTC Client Disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
# initialize aic filter
aic_filter = AICFilter(
license_key=os.getenv("AICOUSTICS_LICENSE_KEY", ""),
model_id="quail-vf-l-16khz", # or "quail-vf-l-16khz", "sparrow-xxs-48khz"
)
transport_params = {
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_filter=aic_filter,
)
}
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment