bytelyst-devops-tools/supabase monitor/agents/summarizer_agent.py

105 lines
3.7 KiB
Python

"""
Summarizer Agent for CrewAI workflow.
"""
from crewai import Agent, Task
from typing import Dict, Any
class SummarizerAgent:
"""Agent responsible for summarizing translated transcripts."""
def __init__(self, perplexity_llm):
"""
Initialize the summarizer agent.
Args:
perplexity_llm: Configured LLM for CrewAI
"""
self.agent = self._create_agent(perplexity_llm)
def _create_agent(self, llm) -> Agent:
"""Create the CrewAI agent for summarization."""
return Agent(
role='Content Summarizer',
goal='Create clear, concise, and comprehensive summaries based on specific requirements',
backstory="""You are an expert content analyst with exceptional summarization
skills. You excel at distilling complex information into clear, organized
summaries that capture the essential points while maintaining readability.
Your summaries are always tailored to specific requirements and target
audiences.""",
verbose=True,
allow_delegation=False,
llm=llm
)
def create_summarization_task(self, translated_text: str, summarization_prompt: str) -> Task:
"""
Create a summarization task for translated text.
Args:
translated_text: The translated text to summarize
summarization_prompt: Custom prompt for summarization requirements
Returns:
CrewAI Task for summarization
"""
return Task(
description=f"""
Summarize the following translated text according to the specific requirements:
Translated Text:
{translated_text}
Summarization Requirements:
{summarization_prompt}
Your task is to:
1. Analyze the translated content thoroughly
2. Follow the specific summarization instructions provided
3. Create a well-structured summary that meets the requirements
4. Ensure the summary is accurate and comprehensive
5. Maintain clarity and readability
Return only the summary without any additional comments or explanations.
""",
expected_output="Summary based on the provided requirements and prompt",
agent=self.agent
)
def summarize(self, translated_text: str, summarization_prompt: str) -> str:
"""
Summarize translated text based on custom prompt.
Args:
translated_text: Text to summarize
summarization_prompt: Custom prompt for summarization
Returns:
Summarized text
"""
try:
# Clean text to handle encoding issues
clean_text = translated_text.encode('utf-8', errors='ignore').decode('utf-8')
# Create summarization task
task = self.create_summarization_task(clean_text, summarization_prompt)
# Create crew and execute
from crewai import Crew
crew = Crew(
agents=[self.agent],
tasks=[task],
verbose=True
)
result = crew.kickoff()
return str(result)
except Exception as e:
# Handle encoding errors gracefully
error_msg = str(e)
if 'charmap' in error_msg or 'encode' in error_msg:
return f"Error: Unable to process text due to encoding issues. Original text: {translated_text[:100]}..."
return f"Error summarizing text: {error_msg}"