bytelyst-devops-tools/supabase monitor/agents/translator_agent.py

101 lines
3.5 KiB
Python

"""
Translator Agent for CrewAI workflow.
"""
from crewai import Agent, Task
from typing import Dict, Any
class TranslatorAgent:
"""Agent responsible for translating transcripts."""
def __init__(self, perplexity_llm):
"""
Initialize the translator agent.
Args:
perplexity_llm: Configured LLM for CrewAI
"""
self.agent = self._create_agent(perplexity_llm)
def _create_agent(self, llm) -> Agent:
"""Create the CrewAI agent for translation."""
return Agent(
role='Language Translator',
goal='Accurately translate text between languages while preserving meaning and context',
backstory="""You are a professional translator with expertise in multiple
languages and cultural contexts. You excel at translating text while
maintaining the original meaning, tone, and cultural nuances. Your
translations are always contextually appropriate and linguistically accurate.""",
verbose=True,
allow_delegation=False,
llm=llm
)
def create_translation_task(self, transcript: str, target_language: str) -> Task:
"""
Create a translation task for transcript.
Args:
transcript: The transcript text to translate
target_language: Target language for translation
Returns:
CrewAI Task for translation
"""
return Task(
description=f"""
Translate the following transcript to {target_language}:
Transcript:
{transcript}
Your task is to:
1. Translate the entire transcript to {target_language}
2. Maintain the original meaning and context
3. Preserve the conversational tone
4. Ensure grammatical accuracy in the target language
5. Keep the structure and formatting of the original text
Return only the translated text without any additional comments or explanations.
""",
expected_output=f"Complete transcript translated to {target_language}",
agent=self.agent
)
def translate(self, transcript: str, target_language: str) -> str:
"""
Translate transcript to target language using LLM.
Args:
transcript: Text to translate
target_language: Target language
Returns:
Translated text
"""
try:
# Clean transcript to handle encoding issues
clean_transcript = transcript.encode('utf-8', errors='ignore').decode('utf-8')
# Create translation task
task = self.create_translation_task(clean_transcript, target_language)
# Create crew and execute
from crewai import Crew
crew = Crew(
agents=[self.agent],
tasks=[task],
verbose=True
)
result = crew.kickoff()
return str(result)
except Exception as e:
# Handle encoding errors gracefully
error_msg = str(e)
if 'charmap' in error_msg or 'encode' in error_msg:
return f"Error: Unable to process text due to encoding issues. Original text: {transcript[:100]}..."
return f"Error translating text: {error_msg}"