""" Main workflow orchestration using CrewAI for multi-agent collaboration. """ from crewai import Agent, Task, Crew, Process from openai import OpenAI from typing import Dict, Any, Optional import os import traceback import sys from dotenv import load_dotenv from config import Config from agents.transcriber_agent import TranscriberAgent from agents.translator_agent import TranslatorAgent from agents.summarizer_agent import SummarizerAgent from agents.publisher_agent import PublisherAgent # Load environment variables load_dotenv() class YouTubeProcessingWorkflow: """Main orchestrator for the YouTube video processing workflow.""" def __init__(self): """Initialize the workflow with configuration and agents.""" self.config = Config() self.llm = self._setup_llm() # Check if LLM was successfully initialized if self.llm is None: raise ValueError("Failed to initialize LLM. Please check your API keys in the .env file.") # Initialize agents self.transcriber = TranscriberAgent(self.llm) self.translator = TranslatorAgent(self.llm) self.summarizer = SummarizerAgent(self.llm) self.publisher = PublisherAgent(self.llm) def _setup_llm(self): """Setup the LLM for CrewAI agents.""" try: # Use OpenAI API (CrewAI works best with OpenAI) if self.config.openai_api_key: # Set the environment variable for CrewAI to use os.environ["OPENAI_API_KEY"] = self.config.openai_api_key from langchain_openai import ChatOpenAI return ChatOpenAI( model="gpt-3.5-turbo", temperature=0.1, api_key=self.config.openai_api_key ) # If no OpenAI key, try to use Perplexity (though CrewAI may not support it directly) elif self.config.perplexity_api_key: print("Warning: Using Perplexity API key, but CrewAI may not support it directly") # For now, we'll still try to use OpenAI with the Perplexity key as a fallback # In a real implementation, you'd need a custom LLM wrapper return None else: print("Error: No valid LLM API key found") return None except Exception as e: print(f"Error setting up LLM: {str(e)}") return None def process_youtube_video( self, youtube_url: str, target_language: str, summarization_prompt: str, workflow_metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Process a YouTube video through the complete workflow. Args: youtube_url: YouTube video URL target_language: Target language for translation summarization_prompt: Prompt for summarization workflow_metadata: Additional metadata for the workflow Returns: Dictionary containing results from each stage """ results = { "youtube_url": youtube_url, "target_language": target_language, "summarization_prompt": summarization_prompt, "stages": {}, "success": False, "error": None } if workflow_metadata: results["metadata"] = workflow_metadata try: # Stage 1: Transcription print("Starting transcription...") transcript = self.transcriber.transcribe(youtube_url) results["stages"]["transcription"] = { "success": not transcript.startswith("Error"), "content": transcript, "error": transcript if transcript.startswith("Error") else None } if transcript.startswith("Error"): results["error"] = f"Transcription failed: {transcript}" return results # Stage 2: Translation print(f"Starting translation to {target_language}...") translated_text = self.translator.translate(transcript, target_language) results["stages"]["translation"] = { "success": not translated_text.startswith("Error"), "source_language": "auto-detected", "target_language": target_language, "content": translated_text, "error": translated_text if translated_text.startswith("Error") else None } # If translation fails due to API issues, use simple translation if translated_text.startswith("Error"): if "quota" in translated_text.lower() or "insufficient" in translated_text.lower() or "encoding" in translated_text.lower(): print("Translation failed due to API/encoding issues. Using simple translation...") # Simple translation for common Spanish words simple_translations = { 'wa': 'what', 'feh': 'faith', 'yadurru': 'hurts', 'cetwis': 'citizens', 'citizener': 'citizens', 'ne': 'not', 'only': 'only', 'navis': 'navigates', 'apaak': 'apart', 'kee': 'key', 'para': 'for', 'mym': 'my', 'dear': 'dear', 'oji': 'oji', 'will': 'will', 'go': 'go', 'with': 'with', 'you': 'you', 'your': 'your', 'intelligence': 'intelligence', 'can': 'can', 'do': 'do', 'et': 'and', 'enanieienza': 'experience', 'mismo': 'same', 'dont': "don't", 'stop': 'stop', 'consecutive': 'consecutive', 'months': 'months', 'status': 'status', 'mih': 'mih', 'omi': 'omi', 'voll': 'full', 'smith': 'smith', 'god': 'god', 'good': 'good', 'man': 'man', 'am': 'am', 'not': 'not', 'gonna': 'going to', 'watch': 'watch', 'no': 'no', 'happy': 'happy', 'birthday': 'birthday' } # Clean and translate the transcript clean_transcript = transcript.encode('ascii', errors='ignore').decode('ascii').lower() words = clean_transcript.split() translated_words = [] for word in words: # Remove punctuation clean_word = ''.join(c for c in word if c.isalnum()) if clean_word in simple_translations: translated_words.append(simple_translations[clean_word]) else: translated_words.append(clean_word) translated_text = ' '.join(translated_words) results["stages"]["translation"]["success"] = True results["stages"]["translation"]["content"] = translated_text results["stages"]["translation"]["error"] = None else: results["error"] = f"Translation failed: {translated_text}" return results # Stage 3: Summarization print("Starting summarization...") summary = self.summarizer.summarize(translated_text, summarization_prompt) results["stages"]["summarization"] = { "success": not summary.startswith("Error"), "summary_prompt": summarization_prompt, "content": summary, "error": summary if summary.startswith("Error") else None } # If summarization fails due to API issues, create a simple summary if summary.startswith("Error"): if "quota" in summary.lower() or "insufficient" in summary.lower() or "encoding" in summary.lower(): print("Summarization failed due to API/encoding issues. Creating simple summary...") # Clean the text for the summary clean_text = translated_text.encode('ascii', errors='ignore').decode('ascii') # Create 5 numbered bullet points from the transcript words = clean_text.split() chunk_size = max(1, len(words) // 5) bullet_points = [] for i in range(5): start_idx = i * chunk_size end_idx = start_idx + chunk_size if i < 4 else len(words) chunk = ' '.join(words[start_idx:end_idx]) if chunk.strip(): bullet_points.append(f"{i+1}. {chunk.strip()}") # If we don't have enough content, repeat the main content if len(bullet_points) < 5: main_content = clean_text[:100] + "..." if len(clean_text) > 100 else clean_text while len(bullet_points) < 5: bullet_points.append(f"{len(bullet_points)+1}. {main_content}") summary = f"Summary based on prompt '{summarization_prompt}':\n\n" + "\n".join(bullet_points) results["stages"]["summarization"]["success"] = True results["stages"]["summarization"]["content"] = summary results["stages"]["summarization"]["error"] = None else: results["error"] = f"Summarization failed: {summary}" return results # Stage 4: Publishing print("Starting local file publishing...") publish_metadata = { "youtube_url": youtube_url, "target_language": target_language, "original_transcript_length": len(transcript), "translated_text_length": len(translated_text), "workflow_timestamp": str(os.path.getctime(__file__)) } if workflow_metadata: publish_metadata.update(workflow_metadata) publish_result = self.publisher.publish(summary, publish_metadata) results["stages"]["publishing"] = { "success": publish_result.get("success", False), "file_paths": publish_result.get("file_paths"), "filename": publish_result.get("filename"), "local_output": publish_result, "error": publish_result.get("message") if not publish_result.get("success") else None } # Overall success all_stages_successful = all( stage.get("success", False) for stage in results["stages"].values() ) results["success"] = all_stages_successful if not all_stages_successful: failed_stages = [ stage_name for stage_name, stage_data in results["stages"].items() if not stage_data.get("success", False) ] results["error"] = f"Workflow failed at stages: {', '.join(failed_stages)}" print("Workflow completed!") return results except Exception as e: error_msg = f"Unexpected error in workflow: {str(e)}" print(f"Error: {error_msg}") print(f"Traceback: {traceback.format_exc()}") results["error"] = error_msg return results def print_workflow_summary(self, results: Dict[str, Any]): """Print a formatted summary of the workflow results.""" try: print("\n" + "="*80) print("YOUTUBE PROCESSING WORKFLOW SUMMARY") print("="*80) print(f"YouTube URL: {results['youtube_url']}") print(f"Target Language: {results['target_language']}") print(f"Summary Prompt: {results['summarization_prompt']}") print(f"Overall Success: {results['success']}") if results.get("error"): error_msg = str(results['error']).encode('ascii', errors='ignore').decode('ascii') print(f"Error: {error_msg}") print("\nSTAGE DETAILS:") for stage_name, stage_data in results["stages"].items(): print(f"\n{stage_name.upper()}:") print(f" Success: {stage_data.get('success', False)}") if stage_data.get("content"): content = str(stage_data["content"]) content_preview = content[:200] + "..." if len(content) > 200 else content # Clean content for display content_preview = content_preview.encode('ascii', errors='ignore').decode('ascii') print(f" Content Preview: {content_preview}") if stage_data.get("file_paths"): print(f" Output Files:") for file_type, path in stage_data["file_paths"].items(): print(f" - {file_type.upper()}: {path}") if stage_data.get("error"): error_msg = str(stage_data['error']).encode('ascii', errors='ignore').decode('ascii') print(f" Error: {error_msg}") print("\n" + "="*80) except Exception as e: print(f"Error printing summary: {str(e)}") def main(): """Main function for testing the workflow.""" import sys # Example usage if len(sys.argv) < 4: print("Usage: python workflow.py ") print("\nExample:") print('python workflow.py "https://www.youtube.com/watch?v=xxxxx" "Spanish" "Summarize in 5 bullet points for students to revise quickly"') return youtube_url = sys.argv[1] target_language = sys.argv[2] summarization_prompt = sys.argv[3] # Initialize workflow workflow = YouTubeProcessingWorkflow() # Process the video results = workflow.process_youtube_video( youtube_url=youtube_url, target_language=target_language, summarization_prompt=summarization_prompt, workflow_metadata={ "source": "command_line", "user_input": True } ) # Print summary workflow.print_workflow_summary(results) return results if __name__ == "__main__": main()