# Viewing File: /home/ubuntu/codegamaai-test/phone_call/main.py

# -*- coding: utf-8 -*-
from fastapi import FastAPI, Request, WebSocket, Response
from twilio.twiml.voice_response import VoiceResponse, Connect, Stream, Gather
from google.cloud import speech, texttospeech
import openai
import asyncio
from dotenv import load_dotenv
import os
import traceback
import json 
import base64

# Load variables from a local .env file into the process environment before
# the os.getenv lookup and client construction below run.
load_dotenv()

# os.getenv returns None when OPENAI_API_KEY is not set.
openai.api_key = os.getenv("OPENAI_API_KEY")
# NOTE(review): this unconditionally overwrites any GOOGLE_APPLICATION_CREDENTIALS
# value already in the environment (including one loaded by load_dotenv above)
# with a hard-coded absolute path — confirm the path exists on the deploy host.
os.environ[
    "GOOGLE_APPLICATION_CREDENTIALS"] = "/home/zrlhowsqpnco/codegama_bot/phone_call-ai/calling_agent/GOOGLE_APPLICATION_CREDENTIALS.json"

app = FastAPI()

# Created once at import time and shared by all requests. Presumably these read
# the credentials path assigned above, which is why that assignment comes
# first — TODO confirm.
speech_client = speech.SpeechClient()
# NOTE(review): tts_client is not referenced anywhere else in the visible code.
tts_client = texttospeech.TextToSpeechClient()

# Chat history read and appended to by generate_response(). It is module-level,
# so every request handled by this process shares the same list.
conversation_context = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Receive a JSON event stream over a WebSocket and transcribe its audio.

    Each text frame is parsed as JSON. ``media`` events carry a base64
    payload that is decoded and passed to ``transcribe``; ``stop`` events are
    only logged. The loop ends when the peer disconnects or an unexpected
    error occurs.

    Args:
        websocket: The connection FastAPI hands over for the ``/ws`` route.
    """
    await websocket.accept()
    print("WebSocket connection accepted")
    peer_disconnected = False
    try:
        while True:
            message = await websocket.receive()
            if message['type'] == 'websocket.disconnect':
                print("WebSocket disconnected")
                peer_disconnected = True
                break
            if message['type'] != 'websocket.receive':
                continue

            # Binary frames carry no usable 'text' entry; indexing it directly
            # would raise and end the whole stream.
            text = message.get('text')
            if text is None:
                continue

            # A single malformed frame should be skipped, not end the stream.
            try:
                data = json.loads(text)
            except json.JSONDecodeError as e:
                print(f"Ignoring malformed WebSocket message: {str(e)}")
                continue
            if not isinstance(data, dict):
                continue

            event = data.get('event')
            print(f"Received event: {event}")
            if event == 'media':
                audio_chunk = base64.b64decode(data['media']['payload'])
                transcript = await transcribe(audio_chunk)  # Process each chunk as it comes
                if transcript:
                    print(f"Real-time Transcript: {transcript}")
                    # Optionally, respond back in real-time (additional implementation needed)
            elif event == 'stop':
                print("Received stop event.")
    except Exception as e:
        print(f"WebSocket error: {str(e)}")
    finally:
        # After the peer has disconnected the connection is already closed;
        # sending another close message can raise RuntimeError, which would
        # otherwise escape from this handler.
        if not peer_disconnected:
            try:
                await websocket.close()
            except RuntimeError as e:
                print(f"WebSocket already closed: {str(e)}")

async def transcribe(audio_chunk):
    """Transcribe one chunk of 8 kHz mu-law, en-US audio.

    Args:
        audio_chunk: Raw audio bytes, sent as the ``RecognitionAudio`` content.

    Returns:
        The transcript of the first alternative of the first result,
        ``"No transcription results."`` when nothing was recognised, or
        ``"Error during transcription."`` when the request raised.
    """
    audio = speech.RecognitionAudio(content=audio_chunk)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.MULAW,
        sample_rate_hertz=8000,
        language_code="en-US"
    )
    try:
        # recognize() is a synchronous call. Running it directly inside this
        # coroutine would stall the event loop (and every other connection)
        # for the duration of each request, so hand it to the default executor.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: speech_client.recognize(config=config, audio=audio)
        )
        # Guard both levels so an empty alternatives list is reported as
        # "no results" rather than as an IndexError-driven error.
        if response.results and response.results[0].alternatives:
            return response.results[0].alternatives[0].transcript
        return "No transcription results."
    except Exception as e:
        print(f"Error during transcription: {str(e)}")
        return "Error during transcription."

@app.post("/voice")
async def handle_voice(request: Request):
    """Answer the voice webhook with TwiML that streams audio and gathers input.

    The returned document starts a media stream to the ``/ws`` WebSocket,
    prompts the caller inside a ``<Gather>`` that posts to
    ``/process_speech``, and says goodbye if the gather ends without input.

    Args:
        request: The incoming webhook request; its raw body is only logged.

    Returns:
        An ``application/xml`` response containing the TwiML.
    """
    body = await request.body()
    print("Incoming request body:", body)
    response = VoiceResponse()
    # Use <Start><Stream> rather than <Connect><Stream>: Connect holds TwiML
    # execution until the WebSocket is closed, and the /ws handler never closes
    # it, so the <Gather> below would never run. Start forks the audio to the
    # WebSocket and continues with the next verb immediately.
    start = response.start()
    start.stream(url="wss://replica-ai.botfingers.com:9010/ws")
    gather = Gather(input='speech dtmf', timeout=10, action='/process_speech', method='POST')
    gather.say("Please start speaking after the beep, or press any key to continue.")
    response.append(gather)
    # Reached only when the gather finishes without submitting to its action.
    response.say("We did not receive any input. Goodbye!")
    return Response(content=str(response), media_type="application/xml")

@app.post("/process_speech")
async def process_speech(request: Request):
    """Turn the caller's gathered speech into a spoken assistant reply.

    Reads ``SpeechResult`` from the posted form. When it is empty the caller
    is re-prompted; otherwise the text is passed to ``generate_response`` and
    the reply is spoken before hanging up.

    Args:
        request: The webhook request carrying the gather result as form data.

    Returns:
        An ``application/xml`` response containing the TwiML.
    """
    data = await request.form()
    user_speech = data.get('SpeechResult', '')  # Safely get the transcribed text
    print("Received speech to process:", user_speech)

    response = VoiceResponse()
    if not user_speech:
        # Asking the caller to speak is only useful if we then listen: without
        # a <Gather> the document ends after the prompt and the call is over.
        gather = Gather(input='speech dtmf', timeout=10, action='/process_speech', method='POST')
        gather.say("I did not catch that. Please say something.")
        response.append(gather)
        response.say("We did not receive any input. Goodbye!")
        return Response(content=str(response), media_type="application/xml")

    try:
        response_text = await generate_response(user_speech)
    except Exception as e:
        # Webhook boundary: an unhandled error would surface as an HTTP 500,
        # so log it and give the caller a spoken fallback instead.
        print(f"Error generating response: {str(e)}")
        traceback.print_exc()
        response_text = "Sorry, I am unable to respond right now."

    response.say(response_text)
    response.hangup()
    return Response(content=str(response), media_type="application/xml")

# Upper bound on stored chat messages so the history (and therefore each
# prompt) cannot grow without limit in a long-running process.
_MAX_CONTEXT_MESSAGES = 20


async def generate_response(transcript):
    """Ask the chat model for a reply to ``transcript`` and record the exchange.

    The prompt is a fixed system message, the stored ``conversation_context``
    and the new user turn. The user and assistant turns are added to
    ``conversation_context`` only after the model call succeeds, so a failed
    call leaves the history unchanged.

    NOTE(review): ``conversation_context`` is a single module-level list, so
    all callers share one history; separating it per call would need a
    caller identifier that this function does not receive.

    Args:
        transcript: The user's utterance as text.

    Returns:
        The assistant's reply with surrounding whitespace stripped (an empty
        string when the model returns no content).

    Raises:
        Exception: Whatever the OpenAI client raises is propagated unchanged.
    """
    user_turn = {"role": "user", "content": transcript}
    messages = (
        [{"role": "system", "content": "You are an AI assistant."}]
        + conversation_context
        + [user_turn]
    )

    # The client call is synchronous; run it in the default executor so the
    # event loop is not blocked while waiting on the network.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        lambda: openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
        ),
    )

    # content can be None, in which case .strip() would fail.
    assistant_response = (response.choices[0].message.content or "").strip()

    # Mutate the shared list in place (no rebinding), adding both turns
    # together so a question is never stored without its answer.
    conversation_context.extend(
        [user_turn, {"role": "assistant", "content": assistant_response}]
    )
    del conversation_context[:-_MAX_CONTEXT_MESSAGES]
    return assistant_response

if __name__ == "__main__":
    # Script entry point: serve the app over TLS on all interfaces, using the
    # key and certificate files given by the relative paths below.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=9010,
        ssl_keyfile="privkey.pem",
        ssl_certfile="fullchain.pem",
    )
# Back to Directory File Manager