166 lines
6.1 KiB
Python
166 lines
6.1 KiB
Python
"""
|
|
Publisher Agent for CrewAI workflow.
|
|
Outputs processed content to local files instead of external API.
|
|
"""
|
|
from crewai import Agent, Task
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
from config import Config
|
|
|
|
class PublisherAgent:
|
|
"""Agent responsible for outputting processed summaries to local files."""
|
|
|
|
def __init__(self, perplexity_llm):
|
|
"""
|
|
Initialize the publisher agent.
|
|
|
|
Args:
|
|
perplexity_llm: Configured LLM for CrewAI
|
|
"""
|
|
self.config = Config()
|
|
self.output_dir = "output"
|
|
self._ensure_output_dir()
|
|
self.agent = self._create_agent(perplexity_llm)
|
|
|
|
def _ensure_output_dir(self):
|
|
"""Ensure output directory exists."""
|
|
if not os.path.exists(self.output_dir):
|
|
os.makedirs(self.output_dir)
|
|
|
|
def _create_agent(self, llm) -> Agent:
|
|
"""Create the CrewAI agent for publishing."""
|
|
return Agent(
|
|
role='Content Publisher',
|
|
goal='Successfully output processed content to local files with proper formatting and organization',
|
|
backstory="""You are a skilled content manager with expertise in organizing
|
|
and publishing processed content. You excel at creating well-structured output
|
|
files, managing different content types, and ensuring reliable data storage.
|
|
Your work is characterized by thoroughness and attention to detail in content
|
|
organization.""",
|
|
verbose=True,
|
|
allow_delegation=False
|
|
)
|
|
|
|
def create_publishing_task(self, summarized_text: str, metadata: Dict[str, Any]) -> Task:
|
|
"""
|
|
Create a publishing task for summarized text.
|
|
|
|
Args:
|
|
summarized_text: The summarized text to publish
|
|
metadata: Additional metadata for the note
|
|
|
|
Returns:
|
|
CrewAI Task for publishing
|
|
"""
|
|
return Task(
|
|
description=f"""
|
|
Output the following summarized content to local files:
|
|
|
|
Summarized Content:
|
|
{summarized_text}
|
|
|
|
Metadata:
|
|
{json.dumps(metadata, indent=2)}
|
|
|
|
Your task is to:
|
|
1. Format the content appropriately for local storage
|
|
2. Include all relevant metadata
|
|
3. Create well-organized output files
|
|
4. Provide clear status feedback
|
|
|
|
Return the file path and confirmation of successful output.
|
|
""",
|
|
expected_output="File path and confirmation of successful local output",
|
|
agent=self.agent
|
|
)
|
|
|
|
def save_to_local_file(self, summarized_text: str, metadata_map: Dict[str, Any] = None) -> Dict[str, Any]:
|
|
"""
|
|
Save summarized text to local file with metadata.
|
|
|
|
Args:
|
|
summarized_text: Text to save
|
|
metadata_map: Additional metadata for the note
|
|
|
|
Returns:
|
|
Save result dictionary
|
|
"""
|
|
if metadata_map is None:
|
|
metadata_map = {}
|
|
|
|
try:
|
|
# Create filename with timestamp
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
youtube_url = metadata_map.get("youtube_url", "unknown_video")
|
|
video_id = youtube_url.split("v=")[-1].split("&")[0] if "youtube.com" in youtube_url else "unknown"
|
|
target_language = metadata_map.get("target_language", "unknown")
|
|
|
|
filename = f"{timestamp}_{video_id}_{target_language}.json"
|
|
filepath = os.path.join(self.output_dir, filename)
|
|
|
|
# Prepare the complete content
|
|
content_data = {
|
|
"summary": summarized_text,
|
|
"metadata": metadata_map,
|
|
"timestamp": timestamp,
|
|
"type": "youtube_summary",
|
|
"workflow_version": "1.0"
|
|
}
|
|
|
|
# Save to JSON file
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
json.dump(content_data, f, indent=2, ensure_ascii=False)
|
|
|
|
# Also save a simple text version
|
|
text_filename = filename.replace('.json', '.txt')
|
|
text_filepath = os.path.join(self.output_dir, text_filename)
|
|
with open(text_filepath, 'w', encoding='utf-8') as f:
|
|
f.write(f"# YouTube Video Summary\n")
|
|
f.write(f"**Video:** {metadata_map.get('youtube_url', 'Unknown')}\n")
|
|
f.write(f"**Language:** {target_language}\n")
|
|
f.write(f"**Generated:** {timestamp}\n")
|
|
f.write(f"\n---\n\n")
|
|
f.write(summarized_text)
|
|
f.write(f"\n\n---\n**Metadata:**\n")
|
|
f.write(json.dumps(metadata_map, indent=2, ensure_ascii=False))
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Successfully saved summary to local files",
|
|
"file_paths": {
|
|
"json": filepath,
|
|
"txt": text_filepath
|
|
},
|
|
"filename": filename
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"message": f"Error saving summary to local files: {str(e)}",
|
|
"file_paths": None,
|
|
"filename": None
|
|
}
|
|
|
|
def publish(self, summarized_text: str, metadata_map: Dict[str, Any] = None) -> Dict[str, Any]:
|
|
"""
|
|
Main publishing method that saves content locally.
|
|
|
|
Args:
|
|
summarized_text: Text to publish
|
|
metadata_map: Additional metadata for the note
|
|
|
|
Returns:
|
|
Publishing result dictionary with file paths
|
|
"""
|
|
try:
|
|
return self.save_to_local_file(summarized_text, metadata_map)
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"message": f"Error in publishing workflow: {str(e)}",
|
|
"file_paths": None,
|
|
"filename": None
|
|
} |