Notes
Notes - notes.io |
Main orchestrator for AI Magazine Generator
Coordinates all agents to create the complete magazine
"""
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
from openai import OpenAI
from agents.data_collector import DataCollectorAgent
from agents.content_validator import ContentValidatorAgent
from agents.editorial_agent import EditorialAgent
from agents.content_formatter import ContentFormatterAgent
from agents.design_agent import DesignAgent
from agents.publisher_agent import PublisherAgent
from config.settings import settings
from utils.models import AgentResponse, Magazine
from utils.helpers import setup_logging, Timer
class MagazineOrchestrator:
"""Main orchestrator that coordinates all agents"""
def __init__(self, config_override: Optional[Dict[str, Any]] = None):
"""Initialize the orchestrator with all agents"""
# Setup logging
self.logger = setup_logging("magazine_generator.log")
# Validate configuration
config_issues = settings.validate()
if config_issues:
for issue in config_issues:
self.logger.warning(f"Configuration issue: {issue}")
# Apply any configuration overrides
if config_override:
for key, value in config_override.items():
if hasattr(settings, key):
setattr(settings, key, value)
# Initialize OpenAI client
try:
self.openai_client = OpenAI(api_key=settings.openai_api_key)
self.logger.info("OpenAI client initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize OpenAI client: {e}")
raise
# Initialize all agents
self.agents = self._initialize_agents()
# Generation statistics
self.stats = {
'generation_start': None,
'generation_end': None,
'total_time': 0,
'agent_times': {},
'articles_processed': 0,
'sections_created': 0,
'errors': []
}
def _initialize_agents(self) -> Dict[str, Any]:
"""Initialize all agents with their configurations"""
self.logger.info("🤖 Initializing AI agents...")
agents = {
'collector': DataCollectorAgent(
config=settings.collector_agent,
openai_client=self.openai_client,
news_sources=settings.news_sources,
ai_keywords=settings.ai_keywords
),
'validator': ContentValidatorAgent(
config=settings.validator_agent,
openai_client=self.openai_client
),
'editor': EditorialAgent(
config=settings.editorial_agent,
openai_client=self.openai_client
),
'formatter': ContentFormatterAgent(
config=settings.formatter_agent,
openai_client=self.openai_client
),
'designer': DesignAgent(
config=settings.designer_agent,
openai_client=self.openai_client
),
'publisher': PublisherAgent(
config=settings.publisher_agent,
openai_client=self.openai_client,
output_dir=settings.output_dir
)
}
self.logger.info(f"✅ Initialized {len(agents)} agents successfully")
return agents
async def generate_magazine(self,
days_back: int = None,
max_articles: int = None,
magazine_title: str = None) -> Dict[str, Any]:
"""Generate complete AI magazine"""
self.logger.info("🚀 Starting AI Magazine Generation Pipeline")
self.logger.info("=" * 60)
# Initialize generation stats
self.stats['generation_start'] = datetime.now()
# Use defaults if not provided
days_back = days_back or settings.days_back
max_articles = max_articles or 100
try:
with Timer("Complete Magazine Generation") as total_timer:
# Phase 1: Data Collection
self.logger.info("📡 PHASE 1: Data Collection")
articles_response = await self._execute_agent_with_stats(
'collector',
days_back=days_back,
max_articles=max_articles
)
if not articles_response.success or not articles_response.data:
return self._create_error_response("Data collection failed", articles_response.errors)
articles = articles_response.data
self.stats['articles_processed'] = len(articles)
self.logger.info(f"📊 Collected {len(articles)} articles")
# Phase 2: Content Validation
self.logger.info("✅ PHASE 2: Content Validation")
validation_response = await self._execute_agent_with_stats(
'validator',
articles=articles
)
if not validation_response.success:
return self._create_error_response("Content validation failed", validation_response.errors)
validated_articles = validation_response.data
self.logger.info(f"📊 Validated {len(validated_articles)} articles")
# Phase 3: Editorial Curation
self.logger.info("📝 PHASE 3: Editorial Curation")
editorial_response = await self._execute_agent_with_stats(
'editor',
articles=validated_articles
)
if not editorial_response.success:
return self._create_error_response("Editorial curation failed", editorial_response.errors)
sections = editorial_response.data
self.stats['sections_created'] = len(sections)
self.logger.info(f"📊 Created {len(sections)} magazine sections")
# Phase 4: Content Formatting
self.logger.info("📋 PHASE 4: Content Formatting")
formatting_response = await self._execute_agent_with_stats(
'formatter',
sections=sections
)
if not formatting_response.success:
return self._create_error_response("Content formatting failed", formatting_response.errors)
formatted_sections = formatting_response.data
self.logger.info(f"📊 Formatted {len(formatted_sections)} sections")
# Phase 5: Design & Layout
self.logger.info("🎨 PHASE 5: Design & Layout")
design_response = await self._execute_agent_with_stats(
'designer',
sections=formatted_sections,
magazine_title=magazine_title
)
if not design_response.success:
return self._create_error_response("Design generation failed", design_response.errors)
magazine = design_response.data
self.logger.info(f"🎨 Created magazine: {magazine.title}")
# Phase 6: Publishing
self.logger.info("📖 PHASE 6: Publishing")
publishing_response = await self._execute_agent_with_stats(
'publisher',
magazine=magazine
)
if not publishing_response.success:
return self._create_error_response("Publishing failed", publishing_response.errors)
publication_data = publishing_response.data
# Finalize stats
self.stats['generation_end'] = datetime.now()
self.stats['total_time'] = total_timer.elapsed
self.logger.info("✅ MAGAZINE GENERATION COMPLETED SUCCESSFULLY!")
self.logger.info("=" * 60)
self._log_final_stats(publication_data)
return {
'success': True,
'magazine': publication_data['magazine'],
'files': publication_data['files'],
'metadata': publication_data['metadata'],
'quality_check': publication_data['quality_check'],
'generation_stats': self.stats,
'message': f"Magazine '{magazine.title}' generated successfully!"
}
except Exception as e:
self.logger.error(f"❌ Magazine generation failed: {str(e)}")
self.stats['errors'].append(str(e))
return self._create_error_response("Magazine generation failed", [str(e)])
async def _execute_agent_with_stats(self, agent_name: str, **kwargs) -> AgentResponse:
"""Execute an agent and track statistics"""
agent = self.agents[agent_name]
with Timer(f"{agent_name} Agent") as timer:
response = await agent.execute(**kwargs)
# Track timing
self.stats['agent_times'][agent_name] = timer.elapsed
# Track errors
if response.errors:
self.stats['errors'].extend(response.errors)
return response
def _create_error_response(self, message: str, errors: list) -> Dict[str, Any]:
"""Create standardized error response"""
self.stats['generation_end'] = datetime.now()
return {
'success': False,
'message': message,
'errors': errors,
'generation_stats': self.stats
}
def _log_final_stats(self, publication_data: Dict[str, Any]):
"""Log final generation statistics"""
magazine = publication_data['magazine']
files = publication_data['files']
quality_check = publication_data['quality_check']
self.logger.info("📊 GENERATION STATISTICS")
self.logger.info("-" * 40)
self.logger.info(f"Total Time: {self.stats['total_time']:.2f} seconds")
self.logger.info(f"Articles Processed: {self.stats['articles_processed']}")
self.logger.info(f"Sections Created: {self.stats['sections_created']}")
self.logger.info(f"Final Articles: {sum(len(section.articles) for section in magazine.sections)}")
self.logger.info("\nAgent Performance:")
for agent_name, time_taken in self.stats['agent_times'].items():
self.logger.info(f" {agent_name}: {time_taken:.2f}s")
self.logger.info("\nOutput Files:")
self.logger.info(f" HTML: {files['html_filename']} ({files['html_size']})")
self.logger.info(f" JSON: {files['json_filename']} ({files['json_size']})")
self.logger.info(f" README: {files['readme_filename']}")
self.logger.info(f"\nQuality Score: {quality_check.get('overall_score', 'N/A')}/10")
self.logger.info(f"Publication Ready: {quality_check.get('publication_ready', 'Unknown')}")
if self.stats['errors']:
self.logger.warning(f"\nErrors Encountered: {len(self.stats['errors'])}")
for error in self.stats['errors'][:5]: # Show first 5 errors
self.logger.warning(f" - {error}")
def get_agent_stats(self) -> Dict[str, Dict[str, Any]]:
"""Get statistics from all agents"""
stats = {}
for name, agent in self.agents.items():
stats[name] = agent.get_stats()
return stats
def reset_agent_stats(self):
"""Reset statistics for all agents"""
for agent in self.agents.values():
agent.reset_stats()
self.stats = {
'generation_start': None,
'generation_end': None,
'total_time': 0,
'agent_times': {},
'articles_processed': 0,
'sections_created': 0,
'errors': []
}
async def test_agents(self) -> Dict[str, bool]:
"""Test all agents to ensure they're working"""
self.logger.info("🧪 Testing all agents...")
test_results = {}
# Test each agent with minimal data
try:
# Test collector
test_response = await self.agents['collector'].execute(days_back=1, max_articles=5)
test_results['collector'] = test_response.success
if test_response.success and test_response.data:
# Test validator with collected data
test_response = await self.agents['validator'].execute(articles=test_response.data[:2])
test_results['validator'] = test_response.success
if test_response.success and test_response.data:
# Test editorial with validated data
test_response = await self.agents['editor'].execute(articles=test_response.data[:1])
test_results['editor'] = test_response.success
if test_response.success and test_response.data:
# Test formatter
test_response = await self.agents['formatter'].execute(sections=test_response.data[:1])
test_results['formatter'] = test_response.success
if test_response.success and test_response.data:
# Test designer
test_response = await self.agents['designer'].execute(
sections=test_response.data[:1],
magazine_title="Test Magazine"
)
test_results['designer'] = test_response.success
if test_response.success and test_response.data:
# Test publisher
test_response = await self.agents['publisher'].execute(magazine=test_response.data)
test_results['publisher'] = test_response.success
except Exception as e:
self.logger.error(f"Agent testing failed: {e}")
# Fill in any missing test results
for agent_name in self.agents.keys():
if agent_name not in test_results:
test_results[agent_name] = False
success_count = sum(test_results.values())
self.logger.info(f"🧪 Agent Testing: {success_count}/{len(test_results)} agents passed")
return test_results
# Convenience function for direct usage
async def generate_ai_magazine(
days_back: int = 7,
max_articles: int = 100,
magazine_title: str = None,
config_override: Dict[str, Any] = None
) -> Dict[str, Any]:
"""
Generate AI magazine with default settings
Args:
days_back: Number of days to look back for news
max_articles: Maximum articles to collect
magazine_title: Custom magazine title
config_override: Configuration overrides
Returns:
Dictionary with generation results
"""
orchestrator = MagazineOrchestrator(config_override)
return await orchestrator.generate_magazine(
days_back=days_back,
max_articles=max_articles,
magazine_title=magazine_title
)
![]() |
Notes is a web-based application for online taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000+ notes created and continuing...
With notes.io;
- * You can take a note from anywhere and any device with internet connection.
- * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
- * You can quickly share your contents without website, blog and e-mail.
- * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
- * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.
Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.
Easy: Notes.io doesn’t require installation. Just write and share note!
Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )
Free: Notes.io works for 14 years and has been free since the day it was started.
You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;
Email: [email protected]
Twitter: http://twitter.com/notesio
Instagram: http://instagram.com/notes.io
Facebook: http://facebook.com/notesio
Regards;
Notes.io Team
