Created
January 29, 2026 22:55
-
-
Save medaminebt/b25a40994c47104c91134ea89ef6847a to your computer and use it in GitHub Desktop.
New call.py file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Call routes | |
| """ | |
| # blob / text khalihom nafshom / | |
| # audio visualizer > page html , javascript ; | |
| # pylint: disable=logging-fstring-interpolation, broad-exception-caught | |
| import logging | |
| import json | |
| import os | |
| import datetime | |
| from io import StringIO | |
| from typing import List, Dict, Any, Optional | |
| from fastapi import APIRouter, Depends, Request, Form, HTTPException, Response | |
| from fastapi.responses import ( | |
| JSONResponse, | |
| FileResponse, | |
| ) | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.security import HTTPBearer | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy import func, desc, asc | |
| from app.db.database import get_db | |
| from app.db.models import ( | |
| Call, | |
| Recording, | |
| Transcript, | |
| Assistant, | |
| ChatMessage, | |
| WebhookLog, | |
| ) | |
| from app.services.assistant_service import AssistantService | |
| from app.services.auth_service import AuthService | |
| from app.utils.config import config | |
| # Create router without a prefix - web routes will be at the root level | |
| router = APIRouter(tags=["web"]) | |
| # Initialize Jinja2 templates (keeping for reference but not using for JSON responses) | |
| templates = Jinja2Templates(directory="app/templates") | |
| # Initialize logger | |
| logger = logging.getLogger(__name__) | |
| # Security | |
| security = HTTPBearer(auto_error=False) | |
| auth_service = AuthService() | |
| def get_template_context(request: Request, **extra_context) -> dict: | |
| """Get template context with session data and any extra context.""" | |
| context = { | |
| "request": request, | |
| "session": { | |
| "user_id": request.session.get("user_id"), | |
| "organization_id": request.session.get("organization_id"), | |
| "user_email": request.session.get("user_email", ""), | |
| "user_first_name": request.session.get("user_first_name", ""), | |
| "user_last_name": request.session.get("user_last_name", ""), | |
| "organization_name": request.session.get("organization_name", ""), | |
| "organization_slug": request.session.get("organization_slug", ""), | |
| "api_key_count": request.session.get("api_key_count", 0), | |
| }, | |
| } | |
| context.update(extra_context) | |
| return context | |
| @router.get("/calls/{call_id}/recording/{recording_id}") | |
| async def download_recording( | |
| request: Request, call_id: int, recording_id: int, db: Session = Depends(get_db) | |
| ): | |
| """Download or serve recording from S3.""" | |
| recording = ( | |
| db.query(Recording) | |
| .filter(Recording.id == recording_id, Recording.call_id == call_id) | |
| .first() | |
| ) | |
| if not recording: | |
| raise HTTPException(status_code=404, detail="Recording not found") | |
| # Log debug info | |
| logger.info( | |
| f"Recording {recording_id}: s3_key={recording.s3_key}, recording_source={recording.recording_source}, status={recording.status}" | |
| ) | |
| # Check if recording is still processing | |
| if recording.status == "processing": | |
| # Return a 202 Accepted status with helpful message | |
| return Response( | |
| content='{"status": "processing", "message": "Recording is still being processed. Please try again in a moment."}', | |
| status_code=202, | |
| media_type="application/json", | |
| headers={ | |
| "Retry-After": "10" # Suggest retry after 10 seconds | |
| } | |
| ) | |
| # Check if recording failed | |
| if recording.status == "failed": | |
| raise HTTPException(status_code=404, detail="Recording processing failed") | |
| # Check if this is an S3 recording | |
| if recording.recording_source == "s3" and recording.s3_key: | |
| try: | |
| from app.services.s3_service import S3Service | |
| s3_service = S3Service.create_default_instance() | |
| # Download the file from S3 | |
| audio_data = await s3_service.download_audio_file(recording.s3_key) | |
| if audio_data: | |
| # Determine content type based on format | |
| content_type = "audio/mpeg" if recording.format == "mp3" else "audio/wav" | |
| # Generate filename | |
| filename = f"recording_{recording.recording_type}_{recording.id}.{recording.format}" | |
| # Return the audio file for inline viewing/playing in browser | |
| return Response( | |
| content=audio_data, | |
| media_type=content_type, | |
| headers={ | |
| "Content-Disposition": f"inline; filename={filename}", | |
| "Content-Length": str(len(audio_data)), | |
| "Accept-Ranges": "bytes", | |
| "Cache-Control": "public, max-age=3600" | |
| } | |
| ) | |
| else: | |
| raise HTTPException(status_code=404, detail="Recording file not found in S3") | |
| except Exception as e: | |
| logger.error(f"Error downloading recording from S3 {recording_id}: {e}") | |
| raise HTTPException(status_code=500, detail="Error downloading recording file") | |
| else: | |
| # Legacy support for non-S3 recordings or missing S3 key | |
| logger.error( | |
| f"Recording not available: s3_key={recording.s3_key}, recording_source={recording.recording_source}, status={recording.status}" | |
| ) | |
| raise HTTPException(status_code=404, detail="Recording file not available") | |
| @router.get("/calls/{call_id}/recording/{recording_id}/play") | |
| async def play_recording( | |
| request: Request, call_id: int, recording_id: int, db: Session = Depends(get_db) | |
| ): | |
| """Serve recording for in-browser audio player from S3.""" | |
| recording = ( | |
| db.query(Recording) | |
| .filter(Recording.id == recording_id, Recording.call_id == call_id) | |
| .first() | |
| ) | |
| logger = logging.getLogger(__name__) | |
| if not recording: | |
| raise HTTPException(status_code=404, detail="Recording not found") | |
| # Check if this is an S3 recording | |
| if recording.recording_source == "s3" and recording.s3_key: | |
| try: | |
| from app.services.s3_service import S3Service | |
| s3_service = S3Service.create_default_instance() | |
| # Download the file from S3 | |
| audio_data = await s3_service.download_audio_file(recording.s3_key) | |
| if audio_data: | |
| # Determine content type based on format | |
| content_type = "audio/mpeg" if recording.format == "mp3" else "audio/wav" | |
| # Return the audio file for streaming/playing | |
| return Response( | |
| content=audio_data, | |
| media_type=content_type, | |
| headers={ | |
| "Content-Length": str(len(audio_data)), | |
| "Accept-Ranges": "bytes", | |
| "Cache-Control": "public, max-age=3600" | |
| } | |
| ) | |
| else: | |
| raise HTTPException(status_code=404, detail="Recording file not found in S3") | |
| except Exception as e: | |
| logger.error(f"Error downloading recording from S3 for audio player {recording_id}: {e}") | |
| raise HTTPException(status_code=500, detail="Error loading recording file") | |
| else: | |
| raise HTTPException( | |
| status_code=404, detail="S3 recording file not available" | |
| ) | |
| @router.get("/calls/{call_id}/transcripts/export") | |
| async def export_transcripts( | |
| request: Request, call_id: int, format: str = "txt", db: Session = Depends(get_db) | |
| ): | |
| """Export call transcripts in various formats.""" | |
| call = db.query(Call).filter(Call.id == call_id).first() | |
| if not call: | |
| raise HTTPException(status_code=404, detail="Call not found") | |
| transcripts = ( | |
| db.query(Transcript) | |
| .filter(Transcript.call_id == call_id) | |
| .order_by(Transcript.created_at) | |
| .all() | |
| ) | |
| if not transcripts: | |
| raise HTTPException(status_code=404, detail="No transcripts found") | |
| if format == "txt": | |
| # Create plain text format | |
| content = f"Call Transcript - {call.call_sid}\n" | |
| content += f"Started: {call.started_at}\n" | |
| content += f"From: {call.customer_phone_number}\n" | |
| content += f"To: {call.to_phone_number}\n" | |
| content += "=" * 50 + "\n\n" | |
| for transcript in transcripts: | |
| speaker = transcript.speaker.upper() if transcript.speaker else "UNKNOWN" | |
| timestamp = transcript.created_at.strftime("%H:%M:%S") | |
| content += f"[{timestamp}] {speaker}: {transcript.content}\n" | |
| return Response( | |
| content=content, | |
| media_type="text/plain", | |
| headers={ | |
| "Content-Disposition": f"attachment; filename=call-{call.call_sid}-transcript.txt" | |
| }, | |
| ) | |
| elif format == "json": | |
| # Create JSON format | |
| transcript_data = { | |
| "call_sid": call.call_sid, | |
| "started_at": call.started_at.isoformat() if call.started_at else None, | |
| "customer_phone_number": call.customer_phone_number, | |
| "to_phone_number": call.to_phone_number, | |
| "transcripts": [ | |
| { | |
| "speaker": t.speaker, | |
| "content": t.content, | |
| "timestamp": t.created_at.isoformat() if t.created_at else None, | |
| "confidence": t.confidence, | |
| "is_final": t.is_final, | |
| } | |
| for t in transcripts | |
| ], | |
| } | |
| return Response( | |
| content=json.dumps(transcript_data, indent=2), | |
| media_type="application/json", | |
| headers={ | |
| "Content-Disposition": f"attachment; filename=call-{call.call_sid}-transcript.json" | |
| }, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=400, detail="Unsupported format. Use 'txt' or 'json'." | |
| ) | |
| # ========== Calls Routes ========== | |
| @router.get("/calls", response_class=JSONResponse) | |
| async def list_calls( | |
| request: Request, | |
| page: int = 1, | |
| per_page: int = 10, | |
| search: str = None, | |
| status: str = None, | |
| assistant_id: int = None, | |
| date_range: str = None, | |
| sort_by: str = "started_at", | |
| sort_order: str = "desc", | |
| db: Session = Depends(get_db), | |
| ): | |
| """List calls with pagination, filtering, and sorting. Returns JSON.""" | |
| # Base query | |
| query = db.query(Call) | |
| # Apply search filter | |
| if search: | |
| search_term = f"%{search}%" | |
| query = query.filter( | |
| (Call.call_sid.ilike(search_term)) | |
| | (Call.customer_phone_number.ilike(search_term)) | |
| | (Call.to_phone_number.ilike(search_term)) | |
| ) | |
| # Apply status filter | |
| if status: | |
| if status == "active": | |
| query = query.filter(Call.status == "ongoing") | |
| elif status == "completed": | |
| query = query.filter(Call.status == "completed") | |
| elif status == "failed": | |
| query = query.filter( | |
| Call.status.in_(["failed", "no-answer", "busy", "canceled"]) | |
| ) | |
| else: | |
| query = query.filter(Call.status == status) | |
| # Apply assistant filter | |
| if assistant_id: | |
| query = query.filter(Call.assistant_id == assistant_id) | |
| # Apply date range filter | |
| if date_range: | |
| today = datetime.datetime.now().date() | |
| if date_range == "today": | |
| query = query.filter(func.date(Call.started_at) == today) | |
| elif date_range == "yesterday": | |
| yesterday = today - datetime.timedelta(days=1) | |
| query = query.filter(func.date(Call.started_at) == yesterday) | |
| elif date_range == "week": | |
| week_ago = today - datetime.timedelta(days=7) | |
| query = query.filter(Call.started_at >= week_ago) | |
| elif date_range == "month": | |
| month_ago = today - datetime.timedelta(days=30) | |
| query = query.filter(Call.started_at >= month_ago) | |
| # Get total count before pagination | |
| total_count = query.count() | |
| # Apply sorting | |
| if sort_by == "started_at": | |
| order_col = Call.started_at | |
| elif sort_by == "duration": | |
| order_col = Call.duration | |
| elif sort_by == "customer_phone": | |
| order_col = Call.customer_phone_number | |
| elif sort_by == "status": | |
| order_col = Call.status | |
| elif sort_by == "assistant": | |
| order_col = Assistant.name | |
| query = query.join(Assistant) | |
| else: | |
| order_col = Call.started_at | |
| if sort_order == "asc": | |
| query = query.order_by(asc(order_col)) | |
| else: | |
| query = query.order_by(desc(order_col)) | |
| # Apply pagination | |
| offset = (page - 1) * per_page | |
| calls = query.offset(offset).limit(per_page).all() | |
| # Calculate pagination info | |
| total_pages = (total_count + per_page - 1) // per_page | |
| has_prev = page > 1 | |
| has_next = page < total_pages | |
| # Calculate page range for pagination display | |
| page_range_start = max(1, page - 2) | |
| page_range_end = min(total_pages + 1, page + 3) | |
| page_numbers = list(range(page_range_start, page_range_end)) | |
| # Add additional data for each call | |
| calls_data = [] | |
| for call in calls: | |
| recording_count = ( | |
| db.query(Recording).filter(Recording.call_id == call.id).count() | |
| ) | |
| transcript_count = ( | |
| db.query(Transcript).filter(Transcript.call_id == call.id).count() | |
| ) | |
| # Calculate call quality from transcripts | |
| avg_confidence = ( | |
| db.query(func.avg(Transcript.confidence)) | |
| .filter(Transcript.call_id == call.id, Transcript.confidence.isnot(None)) | |
| .scalar() | |
| ) | |
| quality = int(avg_confidence * 100) if avg_confidence else None | |
| # Convert call to dict | |
| call_dict = { | |
| "id": call.id, | |
| "call_sid": call.call_sid, | |
| "assistant_id": call.assistant_id, | |
| "assistant_name": call.assistant.name if call.assistant else None, | |
| "customer_phone_number": call.customer_phone_number, | |
| "to_phone_number": call.to_phone_number, | |
| "status": call.status, | |
| "duration": call.duration, | |
| "started_at": call.started_at.isoformat() if call.started_at else None, | |
| "ended_at": call.ended_at.isoformat() if call.ended_at else None, | |
| "created_at": call.created_at.isoformat() if call.created_at else None, | |
| "updated_at": call.updated_at.isoformat() if call.updated_at else None, | |
| } | |
| calls_data.append( | |
| { | |
| "call": call_dict, | |
| "recording_count": recording_count, | |
| "transcript_count": transcript_count, | |
| "has_recording": recording_count > 0, | |
| "has_transcripts": transcript_count > 0, | |
| "quality": quality, | |
| } | |
| ) | |
| # Calculate overall statistics | |
| total_calls = db.query(Call).count() | |
| active_calls = db.query(Call).filter(Call.status == "ongoing").count() | |
| completed_calls = db.query(Call).filter(Call.status == "completed").count() | |
| failed_calls = ( | |
| db.query(Call) | |
| .filter(Call.status.in_(["failed", "no-answer", "busy", "canceled"])) | |
| .count() | |
| ) | |
| # Calculate average duration for completed calls | |
| avg_duration_result = ( | |
| db.query(func.avg(Call.duration)) | |
| .filter(Call.status == "completed", Call.duration.isnot(None)) | |
| .scalar() | |
| ) | |
| avg_duration = int(avg_duration_result) if avg_duration_result else 0 | |
| # Calculate success rate | |
| success_rate = (completed_calls / total_calls * 100) if total_calls > 0 else 0 | |
| # Get available assistants for filter dropdown | |
| organization_id = request.session.get("organization_id") | |
| if organization_id: | |
| assistants = await AssistantService.get_assistants(organization_id=organization_id, active_only=False) | |
| else: | |
| assistants = [] | |
| # Convert assistants to dict | |
| assistants_data = [] | |
| for assistant in assistants: | |
| assistants_data.append({ | |
| "id": assistant.id, | |
| "name": assistant.name, | |
| "description": assistant.description, | |
| "is_active": assistant.is_active, | |
| }) | |
| # Return JSON response | |
| return JSONResponse(content={ | |
| "calls_data": calls_data, | |
| "pagination": { | |
| "page": page, | |
| "per_page": per_page, | |
| "total_count": total_count, | |
| "total_pages": total_pages, | |
| "has_prev": has_prev, | |
| "has_next": has_next, | |
| "prev_page": page - 1 if has_prev else None, | |
| "next_page": page + 1 if has_next else None, | |
| "page_range_start": page_range_start, | |
| "page_range_end": page_range_end, | |
| "page_numbers": page_numbers, | |
| }, | |
| "filters": { | |
| "search": search or "", | |
| "status": status or "", | |
| "assistant_id": assistant_id or "", | |
| "date_range": date_range or "", | |
| "sort_by": sort_by, | |
| "sort_order": sort_order, | |
| }, | |
| "stats": { | |
| "total_calls": total_calls, | |
| "active_calls": active_calls, | |
| "completed_calls": completed_calls, | |
| "failed_calls": failed_calls, | |
| "success_rate": round(success_rate, 1), | |
| "avg_duration": avg_duration, | |
| }, | |
| "assistants": assistants_data, | |
| }) | |
| @router.get("/calls/{call_id}", response_class=JSONResponse) | |
| async def view_call(request: Request, call_id: int, db: Session = Depends(get_db)): | |
| """View a call with recordings, transcripts, chat messages, and webhook logs. Returns JSON.""" | |
| call = db.query(Call).filter(Call.id == call_id).first() | |
| if not call: | |
| raise HTTPException(status_code=404, detail="Call not found") | |
| # Get recordings for this call | |
| recordings = db.query(Recording).filter(Recording.call_id == call_id).all() | |
| # Get transcripts for this call, ordered by creation time | |
| transcripts = ( | |
| db.query(Transcript) | |
| .filter(Transcript.call_id == call_id) | |
| .order_by(Transcript.created_at) | |
| .all() | |
| ) | |
| # Get chat messages for this call, ordered by message index | |
| chat_messages = ( | |
| db.query(ChatMessage) | |
| .filter(ChatMessage.call_id == call_id) | |
| .order_by(ChatMessage.message_index) | |
| .all() | |
| ) | |
| # Get webhook logs for this call, ordered by attempt time | |
| webhook_logs = ( | |
| db.query(WebhookLog) | |
| .filter(WebhookLog.call_id == call_id) | |
| .order_by(WebhookLog.attempted_at) | |
| .all() | |
| ) | |
| # Calculate dynamic metrics from actual data | |
| metrics = calculate_call_metrics(call, transcripts) | |
| # Group transcripts by speaker for conversation view | |
| conversation = [] | |
| current_speaker = None | |
| current_messages = [] | |
| for transcript in transcripts: | |
| if transcript.speaker != current_speaker: | |
| if current_messages: | |
| conversation.append( | |
| {"speaker": current_speaker, "messages": current_messages} | |
| ) | |
| current_speaker = transcript.speaker | |
| current_messages = [transcript] | |
| else: | |
| current_messages.append(transcript) | |
| # Add the last group | |
| if current_messages: | |
| conversation.append({"speaker": current_speaker, "messages": current_messages}) | |
| # Calculate chat message statistics | |
| chat_stats = { | |
| "total_messages": len(chat_messages), | |
| "system_messages": len([msg for msg in chat_messages if msg.role == "system"]), | |
| "user_messages": len([msg for msg in chat_messages if msg.role == "user"]), | |
| "assistant_messages": len([msg for msg in chat_messages if msg.role == "assistant"]), | |
| "total_tokens": sum(msg.total_tokens for msg in chat_messages if msg.total_tokens), | |
| "total_cost": 0, # Can be calculated based on token usage and model pricing | |
| } | |
| # Calculate webhook statistics | |
| webhook_stats = { | |
| "total_attempts": len(webhook_logs), | |
| "successful_attempts": len([log for log in webhook_logs if log.success]), | |
| "failed_attempts": len([log for log in webhook_logs if not log.success]), | |
| "avg_response_time": ( | |
| sum(log.response_time_ms for log in webhook_logs if log.response_time_ms) / | |
| len([log for log in webhook_logs if log.response_time_ms]) | |
| ) if webhook_logs and any(log.response_time_ms for log in webhook_logs) else 0, | |
| "webhook_types": list(set(log.webhook_type for log in webhook_logs)), | |
| } | |
| # Convert data to dictionaries for JSON serialization | |
| call_dict = { | |
| "id": call.id, | |
| "call_sid": call.call_sid, | |
| "assistant_id": call.assistant_id, | |
| "assistant_name": call.assistant.name if call.assistant else None, | |
| "customer_phone_number": call.customer_phone_number, | |
| "to_phone_number": call.to_phone_number, | |
| "status": call.status, | |
| "duration": call.duration, | |
| "started_at": call.started_at.isoformat() if call.started_at else None, | |
| "ended_at": call.ended_at.isoformat() if call.ended_at else None, | |
| "created_at": call.created_at.isoformat() if call.created_at else None, | |
| "updated_at": call.updated_at.isoformat() if call.updated_at else None, | |
| "call_meta": call.call_meta, | |
| } | |
| recordings_data = [] | |
| for recording in recordings: | |
| recordings_data.append({ | |
| "id": recording.id, | |
| "recording_sid": recording.recording_sid, | |
| "call_id": recording.call_id, | |
| "file_path": recording.file_path, | |
| "recording_url": recording.recording_url, | |
| "duration": recording.duration, | |
| "format": recording.format, | |
| "recording_type": recording.recording_type, | |
| "recording_source": recording.recording_source, | |
| "status": recording.status, | |
| "s3_key": recording.s3_key, | |
| "created_at": recording.created_at.isoformat() if recording.created_at else None, | |
| }) | |
| transcripts_data = [] | |
| for transcript in transcripts: | |
| transcripts_data.append({ | |
| "id": transcript.id, | |
| "call_id": transcript.call_id, | |
| "content": transcript.content, | |
| "speaker": transcript.speaker, | |
| "segment_start": transcript.segment_start, | |
| "segment_end": transcript.segment_end, | |
| "confidence": transcript.confidence, | |
| "is_final": transcript.is_final, | |
| "created_at": transcript.created_at.isoformat() if transcript.created_at else None, | |
| }) | |
| chat_messages_data = [] | |
| for msg in chat_messages: | |
| chat_messages_data.append({ | |
| "id": msg.id, | |
| "call_id": msg.call_id, | |
| "message_index": msg.message_index, | |
| "role": msg.role, | |
| "content": msg.content, | |
| "total_tokens": msg.total_tokens, | |
| "prompt_tokens": msg.prompt_tokens, | |
| "completion_tokens": msg.completion_tokens, | |
| "created_at": msg.created_at.isoformat() if msg.created_at else None, | |
| }) | |
| webhook_logs_data = [] | |
| for log in webhook_logs: | |
| webhook_logs_data.append({ | |
| "id": log.id, | |
| "call_id": log.call_id, | |
| "webhook_type": log.webhook_type, | |
| "attempted_at": log.attempted_at.isoformat() if log.attempted_at else None, | |
| "success": log.success, | |
| "response_code": log.response_code, | |
| "response_time_ms": log.response_time_ms, | |
| "error_message": log.error_message, | |
| "request_body": log.request_body, | |
| "response_body": log.response_body, | |
| }) | |
| # Convert conversation data | |
| conversation_data = [] | |
| for conv in conversation: | |
| conversation_data.append({ | |
| "speaker": conv["speaker"], | |
| "messages": [ | |
| { | |
| "id": msg.id, | |
| "content": msg.content, | |
| "created_at": msg.created_at.isoformat() if msg.created_at else None, | |
| "confidence": msg.confidence, | |
| } | |
| for msg in conv["messages"] | |
| ] | |
| }) | |
| return JSONResponse(content={ | |
| "call": call_dict, | |
| "recordings": recordings_data, | |
| "transcripts": transcripts_data, | |
| "chat_messages": chat_messages_data, | |
| "webhook_logs": webhook_logs_data, | |
| "conversation": conversation_data, | |
| "metrics": metrics, | |
| "chat_stats": chat_stats, | |
| "webhook_stats": webhook_stats, | |
| }) | |
| def calculate_call_metrics(call, transcripts): | |
| """Calculate dynamic metrics from call and transcript data.""" | |
| logger.info(f"Calculating metrics for call {call.call_sid if call else 'None'} with {len(transcripts)} transcripts") | |
| if not transcripts: | |
| logger.info("No transcripts found, returning default metrics") | |
| return { | |
| "avg_response_time": 0, | |
| "fastest_response_time": 0, | |
| "response_consistency": 0, | |
| "conversation_flow_score": 0, | |
| "engagement_score": 0, | |
| "quality_score": 0, | |
| "user_percentage": 50, | |
| "ai_percentage": 50, | |
| "user_turns": 0, | |
| "ai_turns": 0, | |
| } | |
| # Separate transcripts by speaker | |
| user_transcripts = [t for t in transcripts if t.speaker == 'user'] | |
| ai_transcripts = [t for t in transcripts if t.speaker == 'assistant'] | |
| logger.info(f"Found {len(user_transcripts)} user transcripts and {len(ai_transcripts)} AI transcripts") | |
| # Sort all transcripts by timing for proper chronological order | |
| all_transcripts_with_timing = [ | |
| t for t in transcripts | |
| if t.segment_start is not None and t.segment_end is not None | |
| ] | |
| all_transcripts_with_timing.sort(key=lambda x: x.segment_start) | |
| logger.info(f"Found {len(all_transcripts_with_timing)} transcripts with timing data") | |
| # Log some sample timing data | |
| for i, t in enumerate(all_transcripts_with_timing[:3]): # Log first 3 | |
| logger.info(f"Sample transcript {i}: speaker={t.speaker}, start={t.segment_start}, end={t.segment_end}, content='{t.content[:50]}...'") | |
| # Calculate response times using chronological matching | |
| response_times = [] | |
| # Method 1: Use timing data if available | |
| if all_transcripts_with_timing: | |
| for i, transcript in enumerate(all_transcripts_with_timing): | |
| if transcript.speaker == 'user': | |
| # Find the next assistant response after this user message | |
| for j in range(i + 1, len(all_transcripts_with_timing)): | |
| next_transcript = all_transcripts_with_timing[j] | |
| if next_transcript.speaker == 'assistant': | |
| # Calculate response time | |
| response_time = next_transcript.segment_start - transcript.segment_end | |
| logger.info(f"Response time calculation: {next_transcript.segment_start} - {transcript.segment_end} = {response_time}") | |
| if response_time > 0 and response_time <= 30: # Reasonable response time | |
| response_times.append(response_time) | |
| logger.info(f"Added response time: {response_time}") | |
| else: | |
| logger.info(f"Rejected response time: {response_time} (out of bounds)") | |
| break # Only match with the first assistant response | |
| logger.info(f"Method 1 found {len(response_times)} response times: {response_times}") | |
| # Method 2: Fallback to timestamp differences if no timing data or no response times found | |
| if not response_times and user_transcripts and ai_transcripts: | |
| logger.info("Falling back to timestamp-based calculation") | |
| # Sort transcripts by creation time | |
| all_transcripts_by_time = sorted(transcripts, key=lambda x: x.created_at or datetime.datetime.min) | |
| for i, transcript in enumerate(all_transcripts_by_time): | |
| if transcript.speaker == 'user': | |
| # Find the next assistant response after this user message | |
| for j in range(i + 1, len(all_transcripts_by_time)): | |
| next_transcript = all_transcripts_by_time[j] | |
| if next_transcript.speaker == 'assistant': | |
| if transcript.created_at and next_transcript.created_at: | |
| response_time = (next_transcript.created_at - transcript.created_at).total_seconds() | |
| logger.info(f"Timestamp response time: {response_time}") | |
| # Only consider reasonable response times (0.1 to 30 seconds) | |
| if 0.1 <= response_time <= 30: | |
| response_times.append(response_time) | |
| logger.info(f"Added timestamp response time: {response_time}") | |
| break # Only match with the first assistant response | |
| logger.info(f"Final response times: {response_times}") | |
| # Calculate average and fastest response times | |
| avg_response_time = sum(response_times) / len(response_times) if response_times else 0 | |
| fastest_response_time = min(response_times) if response_times else 0 | |
| # Calculate response consistency (percentage of responses within 2s of average) | |
| if response_times: | |
| consistent_responses = sum(1 for rt in response_times if abs(rt - avg_response_time) <= 2.0) | |
| response_consistency = (consistent_responses / len(response_times)) * 100 | |
| else: | |
| response_consistency = 0 | |
| # Calculate speaking time distribution | |
| user_time = len(user_transcripts) * 3 # Approximate 3 seconds per segment | |
| ai_time = len(ai_transcripts) * 3 | |
| total_time = user_time + ai_time | |
| if total_time > 0: | |
| user_percentage = round((user_time / total_time) * 100) | |
| ai_percentage = round((ai_time / total_time) * 100) | |
| else: | |
| user_percentage = 50 | |
| ai_percentage = 50 | |
| # Calculate quality score from confidence levels | |
| confidences = [t.confidence for t in transcripts if t.confidence is not None] | |
| quality_score = round(sum(confidences) / len(confidences) * 100) if confidences else 0 | |
| # Calculate conversation flow score (based on turn-taking pattern) | |
| conversation_flow_score = min(95, max(50, 100 - abs(len(user_transcripts) - len(ai_transcripts)) * 5)) | |
| # Calculate engagement score (based on transcript length and frequency) | |
| avg_transcript_length = sum(len(t.content) for t in transcripts) / len(transcripts) if transcripts else 0 | |
| engagement_score = min(100, max(30, (avg_transcript_length / 50) * 100)) | |
| calculated_metrics = { | |
| "avg_response_time": round(avg_response_time, 2), | |
| "fastest_response_time": round(fastest_response_time, 2), | |
| "response_consistency": round(response_consistency), | |
| "conversation_flow_score": round(conversation_flow_score), | |
| "engagement_score": round(engagement_score), | |
| "quality_score": quality_score, | |
| "user_percentage": user_percentage, | |
| "ai_percentage": ai_percentage, | |
| "user_turns": len(user_transcripts), | |
| "ai_turns": len(ai_transcripts), | |
| } | |
| logger.info(f"Calculated metrics: {calculated_metrics}") | |
| return calculated_metrics | |
| @router.get("/calls/export", response_class=Response) | |
| async def export_calls( | |
| request: Request, | |
| format: str = "csv", | |
| search: str = None, | |
| status: str = None, | |
| assistant_id: int = None, | |
| date_range: str = None, | |
| db: Session = Depends(get_db), | |
| ): | |
| """Export calls data in CSV or JSON format.""" | |
| import csv | |
| # Get calls with same filtering as list view | |
| query = db.query(Call) | |
| # Apply search filter | |
| if search: | |
| search_term = f"%{search}%" | |
| query = query.filter( | |
| (Call.call_sid.ilike(search_term)) | |
| | (Call.customer_phone_number.ilike(search_term)) | |
| | (Call.to_phone_number.ilike(search_term)) | |
| ) | |
| # Apply status filter | |
| if status: | |
| if status == "active": | |
| query = query.filter(Call.status == "ongoing") | |
| elif status == "completed": | |
| query = query.filter(Call.status == "completed") | |
| elif status == "failed": | |
| query = query.filter( | |
| Call.status.in_(["failed", "no-answer", "busy", "canceled"]) | |
| ) | |
| else: | |
| query = query.filter(Call.status == status) | |
| # Apply assistant filter | |
| if assistant_id: | |
| query = query.filter(Call.assistant_id == assistant_id) | |
| # Apply date range filter | |
| if date_range: | |
| today = datetime.datetime.now().date() | |
| if date_range == "today": | |
| query = query.filter(func.date(Call.started_at) == today) | |
| elif date_range == "yesterday": | |
| yesterday = today - datetime.timedelta(days=1) | |
| query = query.filter(func.date(Call.started_at) == yesterday) | |
| elif date_range == "week": | |
| week_ago = today - datetime.timedelta(days=7) | |
| query = query.filter(Call.started_at >= week_ago) | |
| elif date_range == "month": | |
| month_ago = today - datetime.timedelta(days=30) | |
| query = query.filter(Call.started_at >= month_ago) | |
| calls = query.order_by(desc(Call.started_at)).all() | |
| # Prepare export data | |
| export_data = [] | |
| for call in calls: | |
| recording_count = ( | |
| db.query(Recording).filter(Recording.call_id == call.id).count() | |
| ) | |
| transcript_count = ( | |
| db.query(Transcript).filter(Transcript.call_id == call.id).count() | |
| ) | |
| # Calculate call quality from transcripts | |
| avg_confidence = ( | |
| db.query(func.avg(Transcript.confidence)) | |
| .filter(Transcript.call_id == call.id, Transcript.confidence.isnot(None)) | |
| .scalar() | |
| ) | |
| quality = int(avg_confidence * 100) if avg_confidence else None | |
| export_data.append( | |
| { | |
| "call_sid": call.call_sid, | |
| "assistant_name": call.assistant.name if call.assistant else "Unknown", | |
| "customer_phone": call.customer_phone_number, | |
| "to_phone": call.to_phone_number, | |
| "status": call.status.capitalize(), | |
| "duration": f"{call.duration}s" if call.duration else "N/A", | |
| "recording_count": recording_count, | |
| "transcript_count": transcript_count, | |
| "quality": f"{quality}%" if quality is not None else "N/A", | |
| "started_at": ( | |
| call.started_at.strftime("%Y-%m-%d %H:%M:%S") | |
| if call.started_at | |
| else "" | |
| ), | |
| "ended_at": ( | |
| call.ended_at.strftime("%Y-%m-%d %H:%M:%S") if call.ended_at else "" | |
| ), | |
| } | |
| ) | |
| if format.lower() == "json": | |
| # Export as JSON | |
| json_content = json.dumps(export_data, indent=2) | |
| return Response( | |
| content=json_content, | |
| media_type="application/json", | |
| headers={"Content-Disposition": "attachment; filename=calls.json"}, | |
| ) | |
| else: | |
| # Export as CSV (default) | |
| output = StringIO() | |
| writer = csv.DictWriter( | |
| output, | |
| fieldnames=[ | |
| "call_sid", | |
| "assistant_name", | |
| "customer_phone", | |
| "to_phone", | |
| "status", | |
| "duration", | |
| "recording_count", | |
| "transcript_count", | |
| "quality", | |
| "started_at", | |
| "ended_at", | |
| ], | |
| ) | |
| writer.writeheader() | |
| writer.writerows(export_data) | |
| return Response( | |
| content=output.getvalue(), | |
| media_type="text/csv", | |
| headers={"Content-Disposition": "attachment; filename=calls.csv"}, | |
| ) | |
| @router.post("/calls/bulk-action") | |
| async def bulk_action_calls( | |
| request: Request, | |
| action: str = Form(...), | |
| call_ids: str = Form(...), | |
| db: Session = Depends(get_db), | |
| ): | |
| """Perform bulk actions on calls.""" | |
| try: | |
| # Parse call IDs | |
| ids = [int(id.strip()) for id in call_ids.split(",") if id.strip()] | |
| if not ids: | |
| return {"success": False, "message": "No calls selected"} | |
| # Get calls | |
| calls = db.query(Call).filter(Call.id.in_(ids)).all() | |
| if action == "delete": | |
| # Delete related records first | |
| for call in calls: | |
| # Delete recordings (S3 files will be cleaned up separately if needed) | |
| recordings = ( | |
| db.query(Recording).filter(Recording.call_id == call.id).all() | |
| ) | |
| for recording in recordings: | |
| # For S3 recordings, optionally delete from S3 | |
| if recording.recording_source == "s3" and recording.s3_key: | |
| try: | |
| from app.services.s3_service import S3Service | |
| s3_service = S3Service.create_default_instance() | |
| await s3_service.delete_audio_file(recording.s3_key) | |
| logger.info(f"Deleted S3 file: {recording.s3_key}") | |
| except Exception as e: | |
| logger.warning( | |
| f"Could not delete S3 file {recording.s3_key}: {e}" | |
| ) | |
| db.delete(recording) | |
| # Delete transcripts | |
| transcripts = ( | |
| db.query(Transcript).filter(Transcript.call_id == call.id).all() | |
| ) | |
| for transcript in transcripts: | |
| db.delete(transcript) | |
| # Delete call | |
| db.delete(call) | |
| message = ( | |
| f"Deleted {len(calls)} calls with their recordings and transcripts" | |
| ) | |
| elif action == "download_recordings": | |
| # For S3 recordings, we'll provide presigned URLs instead of downloading files | |
| recording_urls = [] | |
| for call in calls: | |
| recordings = ( | |
| db.query(Recording).filter(Recording.call_id == call.id).all() | |
| ) | |
| for recording in recordings: | |
| if recording.recording_source == "s3" and recording.s3_key: | |
| try: | |
| from app.services.s3_service import S3Service | |
| s3_service = S3Service.create_default_instance() | |
| # Generate presigned URL for download | |
| presigned_url = await s3_service.generate_presigned_url( | |
| s3_key=recording.s3_key, | |
| expiration=3600, # 1 hour | |
| ) | |
| if presigned_url: | |
| recording_urls.append({ | |
| "call_sid": call.call_sid, | |
| "recording_type": recording.recording_type, | |
| "format": recording.format, | |
| "url": presigned_url, | |
| "filename": f"{call.call_sid}_{recording.recording_type}.{recording.format}" | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Could not generate presigned URL for recording {recording.id}: {e}") | |
| if recording_urls: | |
| # Return JSON with download URLs | |
| return { | |
| "success": True, | |
| "message": f"Generated download URLs for {len(recording_urls)} recordings", | |
| "recording_urls": recording_urls, | |
| "expires_in": "1 hour" | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "message": "No S3 recordings found for selected calls" | |
| } | |
| else: | |
| return {"success": False, "message": "Invalid action"} | |
| db.commit() | |
| return {"success": True, "message": message} | |
| except Exception as e: | |
| db.rollback() | |
| return {"success": False, "message": f"Error: {str(e)}"} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment