# -*- coding: utf-8 -*-
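"""Twilio voice-bot service.

Twilio posts incoming calls to /voice, streams the caller's audio to the /ws
websocket for Google Speech-to-Text transcription, and posts gathered speech
to /process_speech, where OpenAI generates the spoken reply.
"""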
from fastapi import FastAPI, Request, WebSocket, Response
from twilio.twiml.voice_response import VoiceResponse, Connect, Stream, Gather
from google.cloud import speech, texttospeech
import openai
import asyncio
from dotenv import load_dotenv
import os
import traceback
import json
import base64
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
# Point the Google Cloud clients at the service-account key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
    "/home/zrlhowsqpnco/codegama_bot/phone_call-ai/calling_agent/GOOGLE_APPLICATION_CREDENTIALS.json"
)
app = FastAPI()
speech_client = speech.SpeechClient()
tts_client = texttospeech.TextToSpeechClient()
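# Note: the TextToSpeechClient is initialized but not used below; replies are
# spoken with TwiML <Say> instead.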
conversation_context = []
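# Twilio Media Streams send JSON text frames over the websocket. A media frame
# looks roughly like this (field set hedged from Twilio's docs; the payload is
# base64-encoded 8 kHz mu-law audio):
#
#   {"event": "media",
#    "streamSid": "MZ...",
#    "media": {"track": "inbound", "chunk": "2", "timestamp": "5", "payload": "<base64>"}}
#
# Other events include "connected", "start", and "stop"; only "media" and
# "stop" are handled below.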
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
print("WebSocket connection accepted")
try:
while True:
message = await websocket.receive()
if message['type'] == 'websocket.receive':
data = json.loads(message['text'])
print(f"Received event: {data['event']}")
if data['event'] == 'media':
audio_chunk = base64.b64decode(data['media']['payload'])
transcript = await transcribe(audio_chunk) # Process each chunk as it comes
if transcript:
print(f"Real-time Transcript: {transcript}")
# Optionally, respond back in real-time (additional implementation needed)
elif data['event'] == 'stop':
print("Received stop event.")
elif message['type'] == 'websocket.disconnect':
print("WebSocket disconnected")
break
except Exception as e:
print(f"WebSocket error: {str(e)}")
finally:
await websocket.close()
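# Note: each "media" frame carries only about 20 ms of audio, so recognizing
# frames one at a time (as the loop above does) will rarely yield useful text.
# A production version would likely buffer chunks or use
# speech_client.streaming_recognize; the per-chunk call is kept here to match
# the original flow.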
async def transcribe(audio_chunk):
    # Twilio streams 8 kHz mu-law audio, so the recognizer config must match.
    audio = speech.RecognitionAudio(content=audio_chunk)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.MULAW,
        sample_rate_hertz=8000,
        language_code="en-US"
    )
    try:
        # recognize() is blocking; run it in a worker thread so the websocket
        # loop is not stalled while Google processes the audio.
        response = await asyncio.to_thread(speech_client.recognize, config=config, audio=audio)
        if response.results:
            return response.results[0].alternatives[0].transcript
        # Return None so the caller's `if transcript:` check skips empty results.
        return None
    except Exception as e:
        print(f"Error during transcription: {str(e)}")
        return None
@app.post("/voice")
async def handle_voice(request: Request):
body = await request.body()
print("Incoming request body:", body)
response = VoiceResponse()
connect = Connect()
stream = Stream(url="wss://replica-ai.botfingers.com:9010/ws")
connect.append(stream)
gather = Gather(input='speech dtmf', timeout=10, action='/process_speech', method='POST')
gather.say("Please start speaking after the beep, or press any key to continue.")
response.append(connect)
response.append(gather)
response.say("We did not receive any input. Goodbye!")
return Response(content=str(response), media_type="application/xml")
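# <Gather input="speech"> posts the recognized text back as the SpeechResult
# form field, which is what this handler reads.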
@app.post("/process_speech")
async def process_speech(request: Request):
data = await request.form()
user_speech = data.get('SpeechResult', '') # Safely get the transcribed text
print("Received speech to process:", user_speech)
if not user_speech:
response = VoiceResponse()
response.say("I did not catch that. Please say something.")
return Response(content=str(response), media_type="application/xml")
response_text = await generate_response(user_speech)
response = VoiceResponse()
response.say(response_text)
response.hangup()
return Response(content=str(response), media_type="application/xml")
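# conversation_context is a single module-level list, so history is shared
# across every call handled by this process; per-call sessions would need a
# keyed store (e.g. by CallSid), which is not implemented here.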
async def generate_response(transcript):
    global conversation_context
    conversation_context.append({"role": "user", "content": transcript})
    # The OpenAI client call is synchronous; run it in a worker thread so the
    # event loop stays responsive while waiting for the completion.
    response = await asyncio.to_thread(
        openai.chat.completions.create,
        model="gpt-3.5-turbo",
        messages=[{"role": "system", "content": "You are an AI assistant."}] + conversation_context
    )
    assistant_response = response.choices[0].message.content.strip()
    conversation_context.append({"role": "assistant", "content": assistant_response})
    return assistant_response
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=9010,ssl_keyfile="privkey.pem", ssl_certfile="fullchain.pem")