""" Publisher Agent for CrewAI workflow. Outputs processed content to local files instead of external API. """ from crewai import Agent, Task import json import os from datetime import datetime from typing import Dict, Any from config import Config class PublisherAgent: """Agent responsible for outputting processed summaries to local files.""" def __init__(self, perplexity_llm): """ Initialize the publisher agent. Args: perplexity_llm: Configured LLM for CrewAI """ self.config = Config() self.output_dir = "output" self._ensure_output_dir() self.agent = self._create_agent(perplexity_llm) def _ensure_output_dir(self): """Ensure output directory exists.""" if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) def _create_agent(self, llm) -> Agent: """Create the CrewAI agent for publishing.""" return Agent( role='Content Publisher', goal='Successfully output processed content to local files with proper formatting and organization', backstory="""You are a skilled content manager with expertise in organizing and publishing processed content. You excel at creating well-structured output files, managing different content types, and ensuring reliable data storage. Your work is characterized by thoroughness and attention to detail in content organization.""", verbose=True, allow_delegation=False ) def create_publishing_task(self, summarized_text: str, metadata: Dict[str, Any]) -> Task: """ Create a publishing task for summarized text. Args: summarized_text: The summarized text to publish metadata: Additional metadata for the note Returns: CrewAI Task for publishing """ return Task( description=f""" Output the following summarized content to local files: Summarized Content: {summarized_text} Metadata: {json.dumps(metadata, indent=2)} Your task is to: 1. Format the content appropriately for local storage 2. Include all relevant metadata 3. Create well-organized output files 4. Provide clear status feedback Return the file path and confirmation of successful output. """, expected_output="File path and confirmation of successful local output", agent=self.agent ) def save_to_local_file(self, summarized_text: str, metadata_map: Dict[str, Any] = None) -> Dict[str, Any]: """ Save summarized text to local file with metadata. Args: summarized_text: Text to save metadata_map: Additional metadata for the note Returns: Save result dictionary """ if metadata_map is None: metadata_map = {} try: # Create filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") youtube_url = metadata_map.get("youtube_url", "unknown_video") video_id = youtube_url.split("v=")[-1].split("&")[0] if "youtube.com" in youtube_url else "unknown" target_language = metadata_map.get("target_language", "unknown") filename = f"{timestamp}_{video_id}_{target_language}.json" filepath = os.path.join(self.output_dir, filename) # Prepare the complete content content_data = { "summary": summarized_text, "metadata": metadata_map, "timestamp": timestamp, "type": "youtube_summary", "workflow_version": "1.0" } # Save to JSON file with open(filepath, 'w', encoding='utf-8') as f: json.dump(content_data, f, indent=2, ensure_ascii=False) # Also save a simple text version text_filename = filename.replace('.json', '.txt') text_filepath = os.path.join(self.output_dir, text_filename) with open(text_filepath, 'w', encoding='utf-8') as f: f.write(f"# YouTube Video Summary\n") f.write(f"**Video:** {metadata_map.get('youtube_url', 'Unknown')}\n") f.write(f"**Language:** {target_language}\n") f.write(f"**Generated:** {timestamp}\n") f.write(f"\n---\n\n") f.write(summarized_text) f.write(f"\n\n---\n**Metadata:**\n") f.write(json.dumps(metadata_map, indent=2, ensure_ascii=False)) return { "success": True, "message": f"Successfully saved summary to local files", "file_paths": { "json": filepath, "txt": text_filepath }, "filename": filename } except Exception as e: return { "success": False, "message": f"Error saving summary to local files: {str(e)}", "file_paths": None, "filename": None } def publish(self, summarized_text: str, metadata_map: Dict[str, Any] = None) -> Dict[str, Any]: """ Main publishing method that saves content locally. Args: summarized_text: Text to publish metadata_map: Additional metadata for the note Returns: Publishing result dictionary with file paths """ try: return self.save_to_local_file(summarized_text, metadata_map) except Exception as e: return { "success": False, "message": f"Error in publishing workflow: {str(e)}", "file_paths": None, "filename": None }