
Anyone who’s built sentiment pipelines in Python knows the drill. You grab some articles, maybe pull in Google search results manually, clean up the text, then run them through TextBlob, VADER, or a Hugging Face model fine-tuned on tweets or Yelp reviews. For one-off experiments, that’s fine. You’ll get a few polarity scores, plot a bar chart, and call it insight.
But we both know that the moment you try to do this at scale — across multiple countries, in different languages, with real-time data, and a nuanced topic like politics or privacy — the cracks will show, and fast.
Hardcoded search queries alone don’t translate — literally or culturally. You’ll get irrelevant data by the thousands. Your scrapers stall on regional domains like .de and .in, and need geo-awareness. Your sentiment model, trained on restaurant reviews, doesn’t know what to make of legal commentary or state-run news, and labels everything “neutral”.
This approach is brittle, doesn’t scale, and doesn’t adapt — no planning, no contextual reasoning, no bias detection.
An actually usable media intelligence pipeline needs to be able to:
- First, given a topic, plan out what to actually search for, in the right language, with the right cultural nuance, and adapt searches based on what was asked for vs. what we really want.
- Then, based on that plan, gather high-quality, geo-specific data from the actual web, at scale.
- With that real-world data, reason about what it means, in context, instead of just counting positive or negative words.
So I went with the AI agent approach instead: composable, interpretable, and capable of working like a human analyst on real data.
I built a minimal system using Bright Data’s SERP API for live Google results, OpenAI for language and reasoning, and DSPy to avoid the utterly inexact science that is prompt engineering. Let’s talk about it.
Being Realistic About Agent Architectures
The biggest issue I faced on this project was that most starter guides out there fall into one of two extremes: either toy-level demos or overengineered science projects that no one is going to use in the real world.
I needed something that made sense, was realistic, and got as close to consistency, determinism, and observability as possible in one weekend.
So instead of ReAct, AutoGPT, or LangChain chains, I built an agent that follows a Perception–Reasoning–Action (PRA) loop. This is a structured architecture where each phase is handled by a dedicated, optimized module representing a prompting technique, rather than by inline prompts chained together.
Perception: Gathering Live Data
First, we need to have our agent dynamically plan the right angles to search for — meaning, a) it needs to intelligently analyze whether a topic can be adequately covered with a single search, or if it needs multiple search angles, and b) it needs to localize — both linguistically and contextually.
Here’s the core loop, simplified as pseudocode. You’ll see the actual code shortly.
for angle in planner.decompose(topic):
query = optimizer.generate(angle=angle, country=country)
results = searcher.fetch(query, country=country)
dataset.append(results)
For each search angle, we create optimized and localized queries, execute multiple geo-targeted searches through Bright Data’s SERP API, and merge the results intelligently to avoid duplicates.
This is overkill for surface-level queries like “iPhone vs Samsung”, but absolutely necessary for culturally sensitive or politically loaded topics like “data privacy vs censorship”.
Reasoning: Structured Sentiment Analysis
Once search results are gathered, the agent runs a multi-step reasoning phase. Beyond basic polarity scoring, we extract themes, emotional tone, bias indicators, intensity, and more.
Sentiment isn’t universal. What reads as “neutral” in English might come off as highly critical in another language or culture. By reasoning over real-world, geo-targeted data with a generalist LLM, we allow the model to consider not just lexical sentiment, but the underlying intent behind the words.
Basically, we’re trying to mimic how a human analyst might assess sentiment in context. This demands more tokens and more inference depth, so we’re using higher temperatures, longer context windows, and more capable reasoning models this time (think OpenAI’s o-series, Claude 4, etc.).
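As a preview, here’s roughly how that configuration looks in DSPy (a minimal sketch; the same values reappear in the SentimentReasoner module later):
import dspy

# DSPy expects temperature=1.0 and a generous max_tokens budget for
# OpenAI's o-series reasoning models, since chain-of-thought output eats tokens fast
reasoning_lm = dspy.LM(
    model="openai/o4-mini",
    temperature=1.0,
    max_tokens=20000,
)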
Action: Reports, Outputs, Anything Else
Finally, the agent writes out structured outputs and summaries. We can simply print out the results as plaintext, sure, but I’ve kept the structured output so I can easily plug the data into dashboards, feed it back into further decision logic, or trigger downstream automation in the future.
All stages that need LLM calls run through DSPy, which treats each module as a first-class, optimizable component. For a pipeline this complex, I’m not relying on hand-tuned prompts and guesswork.
This is what an actually useful AI agent in production looks like — a systematic, engineered approach that reasons on accurate data, not a hallucinating, pre-biased GPT wrapper.
Coding the Agent
Here’s the basics of what I used to build this AI agent that analyzes global sentiment via live Google data:
- DSPy — A framework for declaratively building AI systems using chains, modules, and signatures. It abstracts prompt logic into reusable components and lets you optimize agents using examples — no more prompt spaghetti.
- OpenAI — Used via DSPy’s LiteLLM integration. I use a multi-model approach — fast models (gpt-4o-mini for planning/search optimization) and slower reasoning models (o-series, for deeper sentiment analysis) together.
- Bright Data SERP API — My source of structured, real-time Google search results, localized by country. It returns normalized JSON so I don’t have to scrape or parse raw HTML.
- pydantic — Pydantic is the Zod equivalent for the Python ecosystem, and here it’ll validate and parse our AI outputs into a structured, typed schema. Eliminates string-parsing headaches entirely.
- requests — For making calls to the Bright Data SERP API via their proxy. You could use their API approach too, if you want.
- python-dotenv — Manages API keys and config in a .env file — standard stuff.
- urllib3 — Used just to suppress SSL certificate warnings (not in production, though) when working with proxy networks like Bright Data.
Use your Python package manager of choice to install these (preferably pinned in a requirements.txt file).
# 1. Basics
# Environment management
python-dotenv>=1.0.0
# HTTP requests and proxies
requests>=2.28.0
# Async support
aiohttp>=3.8.0
# 2. DSPy
# Core DSPy Framework
dspy>=2.6.16
# Data processing and models
pydantic>=2.0.0
pandas>=1.5.0
# For structured outputs and typing
typing-extensions>=4.5.0
I recommend setting up a venv for this, obviously.
Why OpenAI and not Claude or Gemini? DSPy supports a whole bunch of LLMs via LiteLLM, but OpenAI is the only one I have a subscription for. Swap it out for whichever you use, even local ones.
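Swapping is typically a one-line change. A quick sketch using LiteLLM-style model strings (the identifiers below are illustrative, so double-check the exact names for your provider):
import dspy

# Any LiteLLM-supported provider works here
lm = dspy.LM("openai/gpt-4o-mini")  # what this project uses
# lm = dspy.LM("anthropic/claude-3-5-sonnet-20240620")  # an Anthropic model
# lm = dspy.LM("ollama_chat/llama3")  # a local model via Ollama
dspy.configure(lm=lm)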
Why Bright Data? Google is still the best proxy for global sentiment — people search their feelings. But running queries through regular scraping means rate limits, bans, layout breakage, and country-specific quirks. Bright Data’s SERP API handles all of that, gives me localized results per country, and normalizes the output structure so I can trust my inputs.
The Bright Data SERP Primer
If you don’t have a Bright Data account yet, you can sign up for free here. Adding a payment method will grant you a $5 credit to get started — no charges upfront.
1. Sign in to Bright Data
Log in to your Bright Data account.
2. Creating a Proxy Zone
- On the My Zones page, find the SERP API section and click Get Started. This creates a proxy for you automatically.
- If you were already using an active Bright Data proxy, though, just click Add in the top-right corner of this page.
3. Assign a Name to SERP API
- Choose a meaningful name, as it cannot be changed once created.
4. Click “Add” and Verify Your Account
- If you haven’t verified your account yet, you’ll be prompted to add a payment method at this stage.
- First-time users receive a $5 bonus credit, so you can test the service without any upfront costs.
There are additional ways to configure the SERP API you just created, but we won’t have to think about those just yet.
Once that’s done, this is what you’ll need to copy from the “Overview” tab into your .env file.
# Bright Data SERP API Configuration
BRIGHT_DATA_CUSTOMER_ID=hl_xxxxxxx
BRIGHT_DATA_ZONE=xxxxxx
BRIGHT_DATA_PASSWORD=xxxxxxxxxxxxx
# OpenAI API
OPENAI_API_KEY="sk-proj-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
Throw in your OpenAI (or whatever LLM-as-a-service you’re using) key here too, while we have the env file open. DSPy will detect and use it implicitly.
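For reference, here’s a minimal sketch of how these get read at runtime (the CONFIG class later does exactly this with python-dotenv):
import os
from dotenv import load_dotenv

# Pull the .env file into the process environment
load_dotenv()

customer_id = os.getenv("BRIGHT_DATA_CUSTOMER_ID")
zone = os.getenv("BRIGHT_DATA_ZONE")
password = os.getenv("BRIGHT_DATA_PASSWORD")
# No need to read OPENAI_API_KEY manually; DSPy/LiteLLM finds it in the environment on its own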
The DSPy Primer
If you’ve ever tried prompt engineering — and let’s be honest, this means writing a brittle chain of prompts inline and attempting to parse GPT outputs with split("\n") — DSPy will feel like a breath of fresh air.
Instead of inline prompts, you define Signatures — structured input/output schemas. Think of them as functions/methods in programming.
Then you build Modules around those signatures, where you wire in reasoning logic with ChainOfThought (or ReAct, or any of these other prompting strategies), use multiple models, swap out components during optimization, go nuts. It’s all declarative.
Here’s a real example from our project:
class TopicDecompositionSignature(dspy.Signature):
"""
Is this topic complex enough to need multiple search angles for full coverage?
"""
topic: str = dspy.InputField(description="The sentiment analysis topic")
country: str = dspy.InputField(description="Target country for analysis")
language: str = dspy.InputField(description="Country's primary language")
# Use basic types for DSPy signatures (no Pydantic needed here)
needs_breakdown: bool = dspy.OutputField(
description="Whether topic needs multiple search angles"
)
search_angles: List[str] = dspy.OutputField(
description="List of search angles to pursue"
)
reasoning: str = dspy.OutputField(
description="Reasoning for the decomposition decision"
)
You’ll read more on this later, but for now, know that this Signature is the equivalent of a regular prompt that decides whether a given topic (like “TikTok regulation”) needs to be decomposed into sub-angles for better sentiment granularity.
Notice that there’s no prompt in sight. You don’t actually hand-craft a prompt with DSPy; it generates an optimized one behind the scenes for you, based on your Signature (inputs, outputs, and their descriptions). So focus on getting those right.
You define your LM logic using DSPy modules that are abstractions of prompting techniques like ChainOfThought, ReAct, RAG, or even custom prompting — passing in a Signature like this.
# Define a reasoning chain using your signature
chain = dspy.ChainOfThought(TopicDecompositionSignature)
# Run that inside the context of your chosen language model
with dspy.context(lm=self.some_lm_you_have):
result = chain(
topic=topic,
country=country.name,
language=country.language
)
We use two more like it in this project, as you’ll see: QueryOptimizationSignature (transforms vague search phrases into region- and language-aware queries) and SentimentAnalysisSignature (pulls structured insights from raw Google search data: emotions, themes, bias signals, confidence scoring, and more), both using Chain of Thought.
Is DSPy a black box, then? Not at all — you can always see the optimized prompts DSPy actually generates by turning on logging (or using lm.inspect_history, though that’s going to be a long read since it includes the intermediate prompts DSPy used to get to the final optimized one), and you can tweak them further yourself with DSPy’s few-shot optimizers or MIPRO.
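For example, after running any module, a one-liner dumps the most recent LM call (a quick sketch, assuming you’ve already configured a default LM with dspy.configure):
chain = dspy.ChainOfThought(TopicDecompositionSignature)
result = chain(topic="TikTok regulation", country="Germany", language="German")

# Print the most recent prompt/completion pair DSPy actually sent to the LM
dspy.inspect_history(n=1)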
DSPy gives us the control of typed systems, the flexibility of LLMs, and the ability to compose them into reasoning chains without ever touching raw prompt strings.
Coding Our PRA Architecture
Let’s look at how this actually runs, starting from the top: the main() function. It does three things:
- Configures DSPy with a default model (we’ll just use the fast one here)
- Creates the agent
- Runs sentiment analysis across countries in parallel
Here’s a simplified version:
async def main():
dspy.configure(lm=dspy.LM(model=CONFIG.fast_model)) # set up a default LM for DSPy, we can override this later for individual tasks
agent = ParallelSentimentAnalyzer() # runs PRA loop for all countries
results = agent(topic=CONFIG.topic, countries=CONFIG.countries)
# Basic result handling…
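One thing the snippet above omits is the entrypoint. Since main() is async, the usual boilerplate applies (assuming import asyncio at the top, as in the full code):
if __name__ == "__main__":
    asyncio.run(main())  # spin up an event loop and run the pipeline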
Behind the scenes, this ParallelSentimentAnalyzer (you guessed it) runs SentimentAgent in parallel for each country — executing the full Perception → Reasoning → Action pipeline per region.
We should start with SentimentAgent — a module whose forward() method runs the full PRA loop for one country and returns a clean dictionary of insights, ready for downstream use.
To make each part of this agent loop modular (so we can swap out our libraries/subscriptions if needed), we’ll break it out into four modules.
class SentimentAgent(dspy.Module):
"""
1. orchestrates the complete sentiment analysis pipeline
2. coordinates multiple DSPy + regular python modules for the same
"""
def __init__(self):
super().__init__()
# first, let's initialize the modules we need for our PRA loop
# 1. for Perception phase
self.planner = PlanDecomposer() # for intelligent search planning (DSPy)
self.optimizer = QueryOptimizer() # for query optimization (DSPy)
self.searcher = BrightDataSearcher() # for executing google search based on that plan
# 2. for Reasoning phase
self.reasoner = SentimentReasoner() # for deep sentiment analysis (DSPy)
# 3. for Action phase
# We don't really need a module for this; it's just a single output function (reports/logging)
def perceive(self, topic: str, country: CountryConfig) -> tuple[PerceptionData, SearchData]:
"""
PERCEPTION PHASE
1. determines search strategy, executes optimized queries, and structures results
2. returns both execution metadata and processed search data
"""
# Implementation details in Step 1...
def reason(self, topic: str, country: str, search_data: SearchData) -> SentimentAnalysis:
"""
REASONING PHASE
1. AI-powered sentiment analysis
2. uses advanced reasoning model to analyze search data and extract comprehensive insights
3. produces structured sentiment analysis with confidence + quality metrics
"""
# Implementation details in Step 2...
    def act(self, analysis_results: List[Dict[str, Any]], topic: str) -> Dict[str, Any]:
        """
        ACTION PHASE
        1. takes the analysis results from every country and produces actionable outputs (reports, summaries)
        2. invoked once by the orchestrator after all countries complete
        """
        # Implementation details in Step 3...
def forward(self, topic: str, country: CountryConfig) -> Dict[str, Any]:
"""
Executes the sentiment analysis pipeline above for a single country
"""
start_time = time.time()
try:
# PERCEPTION PHASE
perception, search_data = self.perceive(topic, country)
# REASONING PHASE
analysis = self.reason(topic, country.name, search_data)
            # Collect this country's results; the ACTION PHASE (self.act)
            # runs once at the orchestrator level after every country finishes
            base_result = {
                "country": country.name,
                "topic": topic,
                "sentiment": analysis.sentiment
                # more fields here
            }
            return base_result
except Exception as e:
# Error handling and fallback result...
The forward method demonstrates our complete PRA cycle:
- Perception Phase: The agent first calls perceive() to gather and structure real-time search data from the target country, which uses PlanDecomposer(), QueryOptimizer(), and BrightDataSearcher() — 2 DSPy modules and 1 regular Python class under the hood.
- Reasoning Phase: It then calls reason() to perform sentiment analysis on that data using advanced AI models — via the SentimentReasoner() DSPy module.
- Action Phase: Finally, act() generates actionable insights and reports. That one’s just a simple, single function (invoked once by the orchestrator after every country finishes), so there’s no need for a module there.
This sums up a complete media intelligence pipeline that goes from raw web data to actionable insights.
💡 What’s forward()? It could just as easily be called execute() or similar, but this is a convention borrowed from deep learning ecosystems (which use PyTorch extensively). Like PyTorch, DSPy modules also use a forward() method to define their primary computation flow.
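One practical consequence: you never call forward() directly. DSPy modules are callable, and __call__ dispatches to forward() with DSPy’s bookkeeping around it. A quick sketch (CountryConfig is the small dataclass from the full code; the values are illustrative):
agent = SentimentAgent()

# Equivalent to agent.forward(...), but routed through DSPy's __call__ wrapper
result = agent(
    topic="TikTok Bans and Data Privacy Concerns",
    country=CountryConfig("Germany", "TikTok Verbot OR Datenschutz site:.de", "de"),
)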
Step 1: Perception (Planning and Search)
The perception phase is where the agent decides what to search for and how to get the data.
def perceive(self, topic: str, country: CountryConfig) -> tuple[PerceptionData, SearchData]:
"""
Perception Phase: Intelligent data gathering with adaptive strategy
1. Plans search strategy (single vs multi-angle approach)
2. Creates optimized queries and executes them
3. Structures raw data for analysis
"""
# STEP 1: Determine search strategy
decomposition = self.planner(topic=topic, country=country)
# STEP 2: Execute search based on strategy
if decomposition.needs_breakdown:
# Multi-angle approach for complex topics
all_results = []
for angle in decomposition.search_angles:
query = self.optimizer(topic=topic, country=country, search_angle=angle)
results = self.searcher.search(query, country)
all_results.append(results)
combined_data = self._merge_search_results(all_results)
strategy = "multi-angle"
else:
# Single optimized search
query = self.optimizer(topic=topic, country=country)
combined_data = self.searcher.search(query, country)
strategy = "single"
# STEP 3: Structure the data
search_data = extract_search_data(combined_data)
# Create perception metadata
perception = PerceptionData(
country=country.name,
topic=topic,
search_strategy=strategy,
sources_count=len(search_data.organic),
data_quality="high" if len(search_data.organic) > 10 else "medium"
)
return perception, search_data
Our first step is to see if the topic needs a single search, or multiple decomposed searches. That uses the PlanDecomposer module.
class PlanDecomposer(dspy.Module):
"""
DSPy Module: Intelligent topic decomposition planning
Analyzes topics to determine if multiple search angles are needed for comprehensive coverage
Uses fast model for quick strategic decisions
"""
def __init__(self):
super().__init__()
# Using fast model for quick decomposition decisions
self.fast_lm = dspy.LM(model=CONFIG.fast_model)
self.decompose = dspy.ChainOfThought(TopicDecompositionSignature)
def forward(self, topic: str, country: CountryConfig) -> DecompositionPlan:
"""
Main execution method: Analyzes topic complexity and determines search strategy
Returns structured plan with reasoning for single vs multi-angle approach
"""
try:
# Use fast model for quick decomposition decisions
with dspy.context(lm=self.fast_lm):
result = self.decompose(
topic=topic,
country=country.name,
language=country.language
)
# Convert DSPy result to Pydantic model
return DecompositionPlan(
needs_breakdown=result.needs_breakdown,
search_angles=result.search_angles or [],
reasoning=result.reasoning
)
except Exception as e:
logging.error(f"Decomposition failed for {country.name}: {e}")
return DecompositionPlan(
needs_breakdown=False,
search_angles=[],
reasoning=f"Decomposition failed: {str(e)}"
)
This module uses a fast OpenAI model to decide whether a topic needs to be broken down into multiple angles (e.g. “TikTok ban” into “TikTok privacy” and “TikTok national security”). It returns a DecompositionPlan, our Pydantic model that captures the reasoning and search angles.
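For reference, here’s that model (it reappears in the full code at the end):
class DecompositionPlan(BaseModel):
    """Strategy for breaking down complex topics into search angles"""
    needs_breakdown: bool = Field(description="Whether topic needs multiple search angles")
    search_angles: List[str] = Field(default=[], description="List of search angles to pursue")
    reasoning: str = Field(description="Reasoning for the decomposition decision")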
The DSPy signature that defines this interaction looks like:
class TopicDecompositionSignature(dspy.Signature):
"""
DSPy Signature: Defines the input/output interface for topic decomposition
Analyzes whether complex topics need multiple search angles for comprehensive coverage
"""
topic: str = dspy.InputField(description="The sentiment analysis topic")
country: str = dspy.InputField(description="Target country for analysis")
language: str = dspy.InputField(description="Country's primary language")
needs_breakdown: bool = dspy.OutputField(description="Whether topic needs multiple search angles")
search_angles: List[str] = dspy.OutputField(description="List of search angles to pursue")
reasoning: str = dspy.OutputField(description="Reasoning for the decomposition decision")
Note how we never had to write a long, descriptive prompt for this. The DSPy module applies Chain of Thought reasoning (more on that later) to turn our topic into a prompt that yields better sentiment analysis data.
Okay, so we’ve decomposed the search. Now, let’s optimize the actual search terms. QueryOptimizer does exactly that — generating optimal, localized queries using those decomposed angles.
For example, for a topic like “TikTok Bans and Data Privacy Concerns” this’ll give us “TikTok ban controversy United States Congress legislation data privacy concerns social media regulation site:.us OR site:.gov” for US regions, and “TikTok Verbot Deutschland Regierung Politik Datenschutz DSGVO Bundesregierung site:.de OR site:.bundestag.de” for Germany.
class QueryOptimizer(dspy.Module):
"""
DSPy Module: Intelligent search query optimization
Transforms basic search terms into optimized queries tailored for each country and search angle
Uses fast model for efficient query generation
"""
def __init__(self):
super().__init__()
# Using fast model for quick query optimization
self.fast_lm = dspy.LM(model=CONFIG.fast_model)
self.optimize = dspy.ChainOfThought(QueryOptimizationSignature)
def forward(self, topic: str, country: CountryConfig, search_angle: str = "general") -> str:
"""
Main execution method: Generates optimized search queries
Considers topic, country context, language, and specific search angle to create effective queries
"""
try:
# Use fast model for quick query optimization
with dspy.context(lm=self.fast_lm):
result = self.optimize(
topic=topic,
country=country.name,
language=country.language,
search_angle=search_angle,
original_terms=country.search_terms
)
return result.optimized_query
except Exception as e:
logging.error(f"Query optimization failed for {country.name}: {e}")
return f"{topic} {country.name}"
Our optimizer makes sure we:
- Use localized terms for localized searches
- Use culturally relevant terms like “Congress”, “legislation”, “DSGVO”
- Only use authoritative domains like .gov, .bundestag.de
- Tailor queries based on specific angles (for “tiktok ban”, this would be policy, public opinion, etc.)
- Add related terms that improve search coverage
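Calling the optimizer directly looks like this (a usage sketch based on the forward() signature above; the country values and angle string are illustrative):
optimizer = QueryOptimizer()

# One optimized, localized query per (topic, country, angle) combination
query = optimizer(
    topic="TikTok Bans and Data Privacy Concerns",
    country=CountryConfig("Germany", "TikTok Verbot OR Datenschutz site:.de", "de"),
    search_angle="public opinion",
)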
With our optimized search plan in hand, we now send those queries through our main data gathering module — BrightDataSearcher — which uses Bright Data’s SERP API to gather country-specific Google results.
Again, for clarity: this one has nothing to do with DSPy. It’s just a regular Python class.
class BrightDataSearcher:
"""
Search engine client: Executes Google searches through Bright Data proxy
Handles geo-localized searches with proxy rotation and SSL configuration
Manages rate limiting and error handling for reliable data collection
"""
def __init__(self):
self.session = requests.Session()
self.proxy_url = f"http://brd-customer-{CONFIG.customer_id}-zone-{CONFIG.zone}:{CONFIG.password}@{CONFIG.proxy_host}:{CONFIG.proxy_port}"
self.session.proxies = {
'http': self.proxy_url,
'https': self.proxy_url
}
# Disable SSL certificate verification (equivalent to NODE_TLS_REJECT_UNAUTHORIZED=0)
self.session.verify = False
def search(self, query: str, country: CountryConfig) -> Dict[str, Any]:
"""
Core search method: Executes geo-localized Google searches
Configures language, region, and proxy settings for country-specific results
Returns structured JSON data from search results
"""
try:
search_url = (
f"https://www.google.com/search"
f"?q={requests.utils.quote(query)}"
f"&num={CONFIG.max_results}"
f"&brd_json=1"
)
if country.language != 'en':
search_url += f"&hl={country.language}&lr=lang_{country.language}"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json, text/html, */*',
'Accept-Language': f"{country.language};q=0.9,en;q=0.8"
}
response = self.session.get(search_url, headers=headers, timeout=30)
response.raise_for_status()
return response.json()
except Exception as e:
logging.error(f"Search failed for {country.name}: {e}")
raise
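If you want to sanity-check the searcher in isolation, something like this works (a sketch, assuming your .env credentials are in place; the query and country are illustrative):
searcher = BrightDataSearcher()

# Raw, geo-localized Google results as JSON (brd_json=1 means no HTML parsing)
raw = searcher.search(
    "TikTok Verbot Datenschutz site:.de",
    CountryConfig("Germany", "TikTok Verbot OR Datenschutz site:.de", "de"),
)
print(len(raw.get("organic", [])), "organic results")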
Bright Data’s SERP API makes our Perception phase both resilient and scalable. It abstracts away the complexity of scraping and delivers clean, structured data from Google — exactly what our reasoning modules need.
Under the hood, Bright Data’s proxy infrastructure:
- Routes each search through a residential IP from the correct geo (e.g., Germany, France, India),
- Ensures we get a structured JSON response (not brittle HTML),
- Auto-bypasses CAPTCHA, rate limits, and country restrictions,
- Lets us run multi-angle, multi-region sentiment analysis without worrying about IP bans or being shadow-blocked by Google after only a request or two.
We don’t just want to know what the internet at large thinks — we want to know what Germany thinks, or France, or India. Bright Data makes that possible by turning Google (a normally defensive and hostile environment for scraping) into a structured API. Without it, our multi-country sentiment pipeline would be inconsistent at best, and completely blocked at worst.
Finally, this response is structured into a SearchData object using the helper function extract_search_data(). It’s a straightforward utility that maps the raw JSON into a typed schema for consistency downstream — nothing fancy, just structure and validation. (You’ll see it in the full codebase, but I’ve skipped it here for brevity.)
We end up with organic search results, news articles, and “People Also Ask” questions — a rich dataset that captures the current conversation around our topic in each country.
Step 2: Reasoning (Sentiment Analysis)
The reasoning phase is where the agent makes sense of the data it collected — analyzing sentiment, identifying themes, and evaluating bias.
def reason(
self, topic: str, country: str, search_data: SearchData
) -> SentimentAnalysis:
"""
Reasoning Phase: Deep AI-powered sentiment analysis
1. Analyzes structured search data using advanced reasoning model
2. Extracts sentiment, emotions, themes, and confidence metrics
3. Detects bias indicators and assesses data quality
"""
logging.info(f"REASONING: Analyzing sentiment for {country}")
return self.reasoner(topic=topic, country=country, search_data=search_data)
This is where we make use of our SentimentReasoner DSPy module, built on DSPy’s module for the Chain of Thought (CoT) prompting technique.
By using CoT reasoning, the model is actively unpacking its reasoning steps (as opposed to just reacting to data), which improves both interpretability and reliability in ambiguous cases. For nuanced topics (let’s say, censorship vs. security), this gives us more trustworthy sentiment classification than raw logits or classifier outputs.
class SentimentReasoner(dspy.Module):
"""
DSPy Module: Advanced sentiment analysis with Chain of Thought reasoning
Uses powerful reasoning model (o4-mini) for comprehensive sentiment analysis
Produces structured insights with confidence metrics and bias detection
"""
def __init__(self):
super().__init__()
# Using the reasoning model (o4-mini) here
self.reasoning_lm = dspy.LM(
model=CONFIG.reasoning_model,
temperature=1.0,
max_tokens=20000
)
self.analyze = dspy.ChainOfThought(SentimentAnalysisSignature)
def forward(self, topic: str, country: str, search_data: SearchData) -> SentimentAnalysis:
"""
Main execution method: Performs deep sentiment analysis using Chain of Thought
Analyzes search results to extract sentiment, emotions, themes, and quality metrics
"""
try:
# Use reasoning model for complex sentiment analysis
            formatted_data = json.dumps(search_data.model_dump(), indent=2)
with dspy.context(lm=self.reasoning_lm):
result = self.analyze(
topic=topic,
country=country,
search_data=formatted_data
)
# Convert DSPy result to Pydantic model
return SentimentAnalysis(
sentiment=result.sentiment,
emotions=result.emotions or [],
themes=result.themes or [],
intensity=result.intensity,
confidence=float(result.confidence) if result.confidence else 0.0,
data_quality=result.data_quality,
bias_indicators=result.bias_indicators or [],
summary=result.summary
)
except Exception as e:
logging.error(f"Sentiment analysis failed for {country}: {e}")
return SentimentAnalysis(
# some default SentimentAnalysis object
)
This reasoning module is intentionally designed to be more expensive (token- and compute-wise), because it’s the most critical link in the chain.
By leveraging CoT reasoning, a larger model (o-series), and structured outputs, we make this step both interpretable and reliable. And because it returns typed data as a SentimentAnalysis Pydantic object, it can plug cleanly into future downstream systems like scoring layers or dashboards (or just be serialized for APIs/logs/database storage).
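Because the result is typed, the downstream handoff is trivial. A Pydantic v2 sketch (topic and search_data come from the perception phase):
reasoner = SentimentReasoner()
analysis = reasoner(topic=topic, country="Germany", search_data=search_data)

# A plain dict for dashboards/scoring layers, a JSON string for APIs, logs, or storage
as_dict = analysis.model_dump()
as_json = analysis.model_dump_json(indent=2)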
Step 3: Action (Final Output, Saving Results)
The action phase is where the agent turns all that analysis into output. We don’t really need a separate module for this since all we’re doing right now is building a markdown report (and logging), but consider it if your use case is more complex.
def act(self, analysis_results: List[Dict[str, Any]], topic: str) -> Dict[str, Any]:
"""
Action Phase: Deliver actionable outputs
1. Compiles analysis results into a markdown report
2. Saves structured data and formatted report to filesystem
3. Returns metadata with paths and status
"""
try:
# Generate markdown report
markdown_report = self._generate_markdown_report(analysis_results, topic)
# Save it
report_path = save_results(
markdown_report, "sentiment_analysis_dspy_report.md", "Report"
)
# Save raw data for audit
raw_data_path = save_results(
analysis_results, "sentiment_data_raw.json", "Raw data"
)
return {
"action_type": "report_generation",
"markdown_report": markdown_report,
"report_path": str(report_path) if report_path else None,
"raw_data_path": str(raw_data_path) if raw_data_path else None,
"countries_analyzed": len(analysis_results),
"action_timestamp": datetime.now().isoformat(),
"action_complete": True,
}
except Exception as e:
return {
"action_type": "report_generation",
"action_complete": False,
"error": str(e),
"action_timestamp": datetime.now().isoformat(),
}
Note that this uses a utility function, _generate_markdown_report, which just formats available data into a markdown report. Again, I won’t include that part here for brevity (it’s just Python), but you can see it in the full code.
And that’s it! We’re done.
Bringing It All Together
Here’s the full code.
#!/usr/bin/env python3
"""
Advanced AI Sentiment Analysis Agent with DSPy Framework (no more prompt engineering!) and multi-model approach
Architecture: Perception-Reasoning-Action (PRA) Pattern with Bright Data SERP + DSPy Modules
OpenAI usage per country analysis:
1. PlanDecomposer: 1 call to fast model
2. QueryOptimizer: 1-4 calls to fast model (per search angle)
3. SentimentReasoner: 1 call to reasoning model (single reasoning pass)
"""
# Standard library
import os
import json
import time
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Literal, Any
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
# Third-party
import dspy
import requests
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import urllib3
# Disable SSL warnings and certificate verification (not recommended for production)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Get env vars
load_dotenv()
# ============================================================================
# CONFIGURATION AND DATA MODELS
# ============================================================================
@dataclass
class CountryConfig:
"""
Configuration class: Defines country-specific search parameters
Contains localized search terms and language settings for each target country
"""
name: str
search_terms: str
language: str
class CONFIG:
"""
Global configuration class: Contains all system settings and parameters
Centralizes API keys, model settings, search parameters, and debug flags
"""
# Bright Data Configuration
customer_id = os.getenv("BRIGHT_DATA_CUSTOMER_ID")
zone = os.getenv("BRIGHT_DATA_ZONE")
password = os.getenv("BRIGHT_DATA_PASSWORD")
proxy_host = "brd.superproxy.io"
proxy_port = 33335
# Analysis Configuration
topic = "TikTok Bans and Data Privacy Concerns"
countries = [
CountryConfig("US", "TikTok ban OR data privacy site:.us", "en"),
CountryConfig("India", "TikTok ban OR data privacy site:.in", "en"),
CountryConfig("Germany", "TikTok Verbot OR Datenschutz site:.de", "de"),
CountryConfig("France", "TikTok interdiction OR vie privée site:.fr", "fr"),
]
max_results = 25
delay_between_requests = 1000
data_dir = Path("./data")
# DSPy Configuration with Multiple Models
fast_model = "openai/gpt-4o-mini" # For decomposition & query optimization (fast, simple tasks)
reasoning_model = "openai/o4-mini" # For sentiment analysis (complex reasoning)
max_threads = 4
# ============================================================================
# DSPy SIGNATURES
# ============================================================================
class TopicDecompositionSignature(dspy.Signature):
"""Analyzes whether complex topics need multiple search angles for comprehensive coverage"""
topic: str = dspy.InputField(description="The sentiment analysis topic")
country: str = dspy.InputField(description="Target country for analysis")
language: str = dspy.InputField(description="Country's primary language")
# Use basic types for DSPy signatures (no Pydantic needed)
needs_breakdown: bool = dspy.OutputField(
description="Whether topic needs multiple search angles"
)
search_angles: List[str] = dspy.OutputField(
description="List of search angles to pursue"
)
reasoning: str = dspy.OutputField(
description="Reasoning for the decomposition decision"
)
class QueryOptimizationSignature(dspy.Signature):
"""Transforms basic search terms into optimized queries for better sentiment data collection"""
topic: str = dspy.InputField(description="Base topic for sentiment analysis")
country: str = dspy.InputField(description="Target country")
language: str = dspy.InputField(description="Target language")
search_angle: str = dspy.InputField(
description="Specific search angle or 'general'"
)
original_terms: str = dspy.InputField(description="Original search terms")
optimized_query: str = dspy.OutputField(
description="Optimized search query for sentiment analysis"
)
class SentimentAnalysisSignature(dspy.Signature):
"""Takes search results and produces structured sentiment insights with confidence metrics"""
topic: str = dspy.InputField(description="The topic being analyzed")
country: str = dspy.InputField(description="Country context")
search_data: str = dspy.InputField(description="JSON-formatted search results")
# basic types and Literal types that DSPy can handle
sentiment: Literal["Supportive", "Critical", "Mixed", "Neutral"] = dspy.OutputField(
description="Overall sentiment towards the topic"
)
emotions: List[str] = dspy.OutputField(description="Emotional undertones detected")
themes: List[str] = dspy.OutputField(
description="Key themes mentioned (2-4 short phrases)"
)
intensity: Literal["mild", "moderate", "strong"] = dspy.OutputField(
description="Strength of sentiment expression"
)
confidence: float = dspy.OutputField(description="Confidence score from 0-100")
data_quality: Literal["high", "medium", "low"] = dspy.OutputField(
description="Quality of source data"
)
bias_indicators: List[str] = dspy.OutputField(
description="Potential bias indicators"
)
summary: str = dspy.OutputField(
description="Comprehensive 3-4 sentence analysis referencing specific search results"
)
# ============================================================================
# PYDANTIC MODELS FOR DATA VALIDATION (SEPARATE FROM DSPy SIGNATURES)
# ============================================================================
class DecompositionPlan(BaseModel):
"""Strategy for breaking down complex topics into search angles"""
needs_breakdown: bool = Field(
description="Whether topic needs multiple search angles"
)
search_angles: List[str] = Field(
default=[], description="List of search angles to pursue"
)
reasoning: str = Field(description="Reasoning for the decomposition decision")
class SentimentAnalysis(BaseModel):
"""Complete sentiment analysis results with validation"""
sentiment: Literal["Supportive", "Critical", "Mixed", "Neutral"] = Field(
description="Overall sentiment towards the topic"
)
emotions: List[str] = Field(description="Emotional undertones detected")
themes: List[str] = Field(description="Key themes mentioned (2-4 short phrases)")
intensity: Literal["mild", "moderate", "strong"] = Field(
description="Strength of sentiment expression"
)
confidence: float = Field(ge=0, le=100, description="Confidence score from 0-100")
data_quality: Literal["high", "medium", "low"] = Field(
description="Quality of source data"
)
bias_indicators: List[str] = Field(description="Potential bias indicators")
summary: str = Field(
description="Comprehensive 3-4 sentence analysis referencing specific search results"
)
class SearchData(BaseModel):
"""Organizes raw search results from multiple sources"""
organic: List[Dict[str, Any]] = Field(default=[])
news: List[Dict[str, Any]] = Field(default=[])
people_also_ask: List[Dict[str, Any]] = Field(default=[])
class PerceptionData(BaseModel):
"""Captures metadata from the perception phase"""
timestamp: str
country: str
topic: str
search_strategy: str
search_angles: Optional[List[str]]
sources_count: int
data_quality: str
# ============================================================================
# DSPy MODULES
# ============================================================================
class PlanDecomposer(dspy.Module):
"""Analyzes topics to determine if multiple search angles are needed
Uses fast model for quick strategic decisions"""
def __init__(self):
super().__init__()
# Using fast model for quick decomposition decisions
self.fast_lm = dspy.LM(model=CONFIG.fast_model)
self.decompose = dspy.ChainOfThought(TopicDecompositionSignature)
def forward(self, topic: str, country: CountryConfig) -> DecompositionPlan:
"""Analyzes topic complexity and determines search strategy"""
try:
# Use fast model for quick decomposition decisions
with dspy.context(lm=self.fast_lm):
result = self.decompose(
topic=topic, country=country.name, language=country.language
)
            # Convert DSPy result to Pydantic model
return DecompositionPlan(
needs_breakdown=result.needs_breakdown,
search_angles=result.search_angles or [],
reasoning=result.reasoning,
)
except Exception as e:
logging.error(f"Decomposition failed for {country.name}: {e}")
return DecompositionPlan(
needs_breakdown=False,
search_angles=[],
reasoning=f"Decomposition failed: {str(e)}",
)
class QueryOptimizer(dspy.Module):
"""Transforms basic search terms into optimized queries tailored for each country
Uses fast model for efficient query generation"""
def __init__(self):
super().__init__()
# Using fast model for quick query optimization
self.fast_lm = dspy.LM(model=CONFIG.fast_model)
self.optimize = dspy.ChainOfThought(QueryOptimizationSignature)
def forward(
self, topic: str, country: CountryConfig, search_angle: str = "general"
) -> str:
"""Generates optimized search queries for topic, country, and search angle"""
try:
# Use fast model for quick query optimization
with dspy.context(lm=self.fast_lm):
result = self.optimize(
topic=topic,
country=country.name,
language=country.language,
search_angle=search_angle,
original_terms=country.search_terms,
)
return result.optimized_query
except Exception as e:
logging.error(f"Query optimization failed for {country.name}: {e}")
return f"{topic} {country.name}"
class SentimentReasoner(dspy.Module):
"""Advanced sentiment analysis with Chain of Thought reasoning
Uses powerful reasoning model (o4-mini) for comprehensive sentiment analysis"""
def __init__(self):
super().__init__()
# Using reasoning model for complex sentiment analysis with required parameters
self.reasoning_lm = dspy.LM(
model=CONFIG.reasoning_model, temperature=1.0, max_tokens=20000
)
self.analyze = dspy.ChainOfThought(SentimentAnalysisSignature)
def forward(
self, topic: str, country: str, search_data: SearchData
) -> SentimentAnalysis:
"""Performs deep sentiment analysis using Chain of Thought"""
try:
# Use reasoning model for complex sentiment analysis
            formatted_data = json.dumps(search_data.model_dump(), indent=2)
with dspy.context(lm=self.reasoning_lm):
result = self.analyze(
topic=topic, country=country, search_data=formatted_data
)
            # Convert DSPy result to Pydantic model
return SentimentAnalysis(
sentiment=result.sentiment,
emotions=result.emotions or [],
themes=result.themes or [],
intensity=result.intensity,
confidence=float(result.confidence) if result.confidence else 0.0,
data_quality=result.data_quality,
bias_indicators=result.bias_indicators or [],
summary=result.summary,
)
except Exception as e:
logging.error(f"Sentiment analysis failed for {country}: {e}")
return SentimentAnalysis(
sentiment="Neutral",
emotions=["error"],
themes=["Analysis failed"],
intensity="mild",
confidence=0.0,
data_quality="low",
bias_indicators=["analysis_error"],
summary=f"AI reasoning failed: {str(e)}",
)
# ============================================================================
# SEARCH AND DATA GATHERING
# ============================================================================
class BrightDataSearcher:
"""Search engine client for geo-localized Google searches through Bright Data proxy
Handles proxy rotation and SSL configuration for reliable data collection"""
def __init__(self):
self.session = requests.Session()
self.proxy_url = f"http://brd-customer-{CONFIG.customer_id}-zone-{CONFIG.zone}:{CONFIG.password}@{CONFIG.proxy_host}:{CONFIG.proxy_port}"
self.session.proxies = {"http": self.proxy_url, "https": self.proxy_url}
# Disable SSL certificate verification (equivalent to NODE_TLS_REJECT_UNAUTHORIZED=0)
self.session.verify = False
def search(self, query: str, country: CountryConfig) -> Dict[str, Any]:
"""Executes geo-localized Google searches with language and region settings"""
try:
search_url = (
f"https://www.google.com/search"
f"?q={requests.utils.quote(query)}"
f"&num={CONFIG.max_results}"
f"&brd_json=1"
)
if country.language != "en":
search_url += f"&hl={country.language}&lr=lang_{country.language}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "application/json, text/html, */*",
"Accept-Language": f"{country.language};q=0.9,en;q=0.8",
}
response = self.session.get(search_url, headers=headers, timeout=30)
response.raise_for_status()
return response.json()
except Exception as e:
logging.error(f"Search failed for {country.name}: {e}")
raise
def extract_search_data(raw_results: Dict[str, Any]) -> SearchData:
"""Transforms raw search API responses into structured data
Extracts organic results, news articles, and related questions"""
return SearchData(
organic=[
{
"title": result.get("title", ""),
"description": result.get("description", ""),
"link": result.get("link", ""),
"display_link": result.get("display_link", ""),
}
for result in raw_results.get("organic", [])[: CONFIG.max_results]
],
news=[
{
"title": news.get("title", ""),
"description": news.get("description", ""),
"source": news.get("source", ""),
}
for news in raw_results.get("news", [])[:5]
],
people_also_ask=[
{
"question": paa.get("question", ""),
"answers": [
{"text": answer.get("value", {}).get("text", "")}
for answer in paa.get("answers", [])[:1]
],
}
for paa in raw_results.get("people_also_ask", [])[:3]
],
)
# ============================================================================
# MAIN AGENT ARCHITECTURE: PERCEPTION-REASONING-ACTION
# ============================================================================
class SentimentAgent(dspy.Module):
"""Main AI Agent implementing Perception-Reasoning-Action (PRA) architecture
Orchestrates the complete sentiment analysis pipeline from data gathering to insights"""
def __init__(self):
super().__init__()
# PRA Module initialization
self.planner = PlanDecomposer()
self.optimizer = QueryOptimizer()
self.reasoner = SentimentReasoner()
self.searcher = BrightDataSearcher()
def perceive(
self, topic: str, country: CountryConfig
) -> tuple[PerceptionData, SearchData]:
"""
Perception Phase: Intelligent data gathering with adaptive strategy
1. Plans search strategy (single vs multi-angle approach)
2. Creates optimized queries and executes them
3. Structures raw data for analysis
"""
logging.info(f"PERCEPTION: Starting data gathering for {country.name}")
try:
# STEP 1: Plan decomposition strategy
decomposition = self.planner(topic=topic, country=country)
# STEP 2: Execute search strategy
if decomposition.needs_breakdown and decomposition.search_angles:
# Multi-angle approach
logging.info(
f"Using decomposed search with {len(decomposition.search_angles)} angles"
)
all_results = []
for angle in decomposition.search_angles:
optimized_query = self.optimizer(
topic=topic, country=country, search_angle=angle
)
raw_data = self.searcher.search(optimized_query, country)
all_results.append(raw_data)
# Merge results
combined_data = self._merge_search_results(all_results)
search_strategy = "decomposed"
search_angles = decomposition.search_angles
else:
# Single search approach
logging.info("Using single search strategy")
optimized_query = self.optimizer(topic=topic, country=country)
raw_data = self.searcher.search(optimized_query, country)
combined_data = raw_data
search_strategy = "single"
search_angles = None
# STEP 3: Structure the data
search_data = extract_search_data(combined_data)
perception = PerceptionData(
timestamp=datetime.now().isoformat(),
country=country.name,
topic=topic,
search_strategy=search_strategy,
search_angles=search_angles,
sources_count=len(search_data.organic),
data_quality="high" if len(search_data.organic) > 10 else "medium",
)
logging.info(
f"PERCEPTION SUCCESS: {perception.sources_count} sources via {search_strategy}"
)
return perception, search_data
except Exception as e:
logging.error(f"PERCEPTION FAILED for {country.name}: {e}")
raise
def reason(
self, topic: str, country: str, search_data: SearchData
) -> SentimentAnalysis:
"""
Reasoning Phase: Deep AI-powered sentiment analysis with Chain of Thought
1. Analyzes structured search data using advanced reasoning model
2. Extracts sentiment, emotions, themes, and confidence metrics
3. Detects bias indicators and assesses data quality
"""
logging.info(f"REASONING: Analyzing sentiment for {country}")
return self.reasoner(topic=topic, country=country, search_data=search_data)
def act(self, analysis_results: List[Dict[str, Any]], topic: str) -> Dict[str, Any]:
"""
Action Phase: Generate insights and deliver actionable outputs
1. Compiles analysis results into comprehensive markdown report
2. Saves structured data and formatted reports to filesystem
3. Returns action metadata with paths and completion status
"""
logging.info(
f"ACTION: Generating markdown report for {len(analysis_results)} countries"
)
try:
# Generate markdown report
markdown_report = self._generate_markdown_report(analysis_results, topic)
# Save the report
report_path = save_results(
markdown_report, "sentiment_analysis_dspy_report.md", "Report"
)
# Save raw data
raw_data_path = save_results(
analysis_results, "sentiment_data_raw.json", "Raw data"
)
logging.info(f"ACTION SUCCESS: Report saved to {report_path}")
logging.info(f"ACTION SUCCESS: Raw data saved to {raw_data_path}")
return {
"action_type": "report_generation",
"markdown_report": markdown_report,
"report_path": str(report_path) if report_path else None,
"raw_data_path": str(raw_data_path) if raw_data_path else None,
"countries_analyzed": len(analysis_results),
"action_timestamp": datetime.now().isoformat(),
"action_complete": True,
}
except Exception as e:
logging.error(f"ACTION FAILED: Report generation failed - {e}")
return {
"action_type": "report_generation",
"action_complete": False,
"error": str(e),
"action_timestamp": datetime.now().isoformat(),
}
def _generate_markdown_report(
self, results: List[Dict[str, Any]], topic: str
) -> str:
"""
Generate comprehensive markdown analysis report
Formats sentiment analysis results into readable business intelligence format
"""
timestamp = datetime.now().strftime("%Y-%m-%d at %H:%M:%S")
markdown = f'# Global Sentiment Analysis: "{topic}"\n\n'
markdown += f"*Generated on: {timestamp}*\n"
markdown += (
f"*Powered by: DSPy Framework + Bright Data (Multi-Model Version)*\n"
)
markdown += f"*Fast Model (Decomposition/Optimization): {CONFIG.fast_model}*\n"
markdown += (
f"*Reasoning Model (Sentiment Analysis): {CONFIG.reasoning_model}*\n\n"
)
# Summary table
markdown += "## Executive Summary\n\n"
markdown += (
"| Country | Sentiment | Intensity | Confidence | Strategy | Key Themes |\n"
)
markdown += (
"|---------|-----------|-----------|------------|----------|------------|\n"
)
for result in results:
themes = ", ".join(result.get("themes", [])[:2])
markdown += (
f"| {result['country']} | {result['sentiment']} | "
f"{result['intensity']} | {result['confidence']:.1f}% | "
f"{result['search_strategy']} | {themes} |\n"
)
# Detailed analysis
markdown += "\n## Detailed Country Analysis\n\n"
for i, result in enumerate(results, 1):
markdown += f"### {i}. {result['country'].upper()}\n\n"
# Core Analysis
markdown += f"- **Sentiment**: {result['sentiment']}\n"
markdown += f"- **Confidence**: {result['confidence']:.1f}%\n"
markdown += f"- **Intensity**: {result['intensity']}\n"
markdown += f"- **Data Quality**: {result['data_quality']}\n"
markdown += f"- **Search Strategy**: {result['search_strategy']}\n"
markdown += f"- **Sources**: {result['sources_analyzed']}\n"
markdown += f"- **Emotions**: {', '.join(result['emotions'])}\n"
markdown += f"- **Themes**: {', '.join(result['themes'])}\n"
if result.get("bias_indicators"):
markdown += (
f"- **Bias Indicators**: {', '.join(result['bias_indicators'])}\n"
)
markdown += f"- **Summary**: {result['summary']}\n"
markdown += "\n"
# Global insights
markdown += "## Global Insights\n\n"
sentiments = [r["sentiment"] for r in results]
sentiment_counts = {s: sentiments.count(s) for s in set(sentiments)}
markdown += "### Sentiment Distribution\n\n"
for sentiment, count in sentiment_counts.items():
markdown += f"- **{sentiment}**: {count} countries\n"
avg_confidence = sum(r["confidence"] for r in results) / len(results)
markdown += f"\n### Analysis Quality\n\n"
markdown += f"- **Average Confidence**: {avg_confidence:.1f}%\n"
high_quality = sum(1 for r in results if r["data_quality"] == "high")
markdown += f"- **High Quality Analyses**: {high_quality}/{len(results)}\n"
markdown += "\n---\n*Analysis powered by DSPy Framework with optimized prompts (Multi-Model Version)*\n"
markdown += f"*Models: {CONFIG.fast_model} (fast tasks) + {CONFIG.reasoning_model} (reasoning)*\n"
return markdown
def forward(self, topic: str, country: CountryConfig) -> Dict[str, Any]:
"""
Main agent execution: Complete Perception-Reasoning-Action cycle
Executes full PRA sentiment analysis pipeline for a single country
Returns comprehensive results with action insights and recommendations
"""
start_time = time.time()
try:
# PERCEPTION PHASE
perception, search_data = self.perceive(topic, country)
# REASONING PHASE
analysis = self.reason(topic, country.name, search_data)
execution_time = (time.time() - start_time) * 1000
# Prepare base result
base_result = {
"country": country.name,
"topic": topic,
"sentiment": analysis.sentiment,
"emotions": analysis.emotions,
"themes": analysis.themes,
"intensity": analysis.intensity,
"confidence": analysis.confidence,
"data_quality": analysis.data_quality,
"bias_indicators": analysis.bias_indicators,
"summary": analysis.summary,
"sources_analyzed": perception.sources_count,
"search_strategy": perception.search_strategy,
"execution_time_ms": execution_time,
"timestamp": perception.timestamp,
"fast_model": CONFIG.fast_model,
"reasoning_model": CONFIG.reasoning_model,
}
# Mark PRA cycle as complete
base_result.update(
{
"pra_cycle_complete": True,
}
)
return base_result
except Exception as e:
execution_time = (time.time() - start_time) * 1000
logging.error(f"PRA CYCLE FAILED for {country.name}: {e}")
return {
"country": country.name,
"topic": topic,
"sentiment": "Neutral",
"emotions": ["error"],
"themes": ["Analysis failed"],
"intensity": "mild",
"confidence": 0.0,
"data_quality": "low",
"bias_indicators": ["agent_error"],
"summary": f"PRA cycle failed: {str(e)}",
"sources_analyzed": 0,
"search_strategy": "failed",
"execution_time_ms": execution_time,
"fast_model": CONFIG.fast_model,
"reasoning_model": "error-fallback",
"pra_cycle_complete": False,
"error": str(e),
}
def _merge_search_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Data aggregation utility: Combines multiple search result sets
Merges results from different search angles while removing duplicates
Ensures comprehensive data coverage for sentiment analysis
"""
merged = {"organic": [], "news": [], "people_also_ask": []}
for result in results:
if "organic" in result:
merged["organic"].extend(result["organic"])
if "news" in result:
merged["news"].extend(result["news"])
if "people_also_ask" in result:
merged["people_also_ask"].extend(result["people_also_ask"])
# Remove duplicates based on links
seen_links = set()
unique_organic = []
for item in merged["organic"]:
link = item.get("link", "")
if link not in seen_links:
seen_links.add(link)
unique_organic.append(item)
merged["organic"] = unique_organic
return merged
# ============================================================================
# PARALLEL EXECUTION
# ============================================================================
class ParallelSentimentAnalyzer(dspy.Module):
"""
System orchestrator: Manages parallel sentiment analysis across multiple countries
Coordinates concurrent execution using ThreadPoolExecutor for performance
Handles error recovery and result aggregation for production reliability
"""
def __init__(self):
super().__init__()
self.agent = SentimentAgent()
    # forward() comes from PyTorch's design pattern; you could technically name it
    # anything, but we keep it for ML/AI ecosystem consistency and familiarity
def forward(
self, topic: str, countries: List[CountryConfig]
) -> List[Dict[str, Any]]:
"""
Parallel execution manager: Runs sentiment analysis across all target countries
Uses ThreadPoolExecutor for concurrent processing and graceful error handling
Returns aggregated results from all successful country analyses
"""
logging.info(f"PARALLEL EXECUTION: Processing {len(countries)} countries")
results = []
with ThreadPoolExecutor(max_workers=CONFIG.max_threads) as executor:
# Submit all tasks
future_to_country = {
executor.submit(self.agent, topic, country): country
for country in countries
}
# Collect results as they complete
for future in as_completed(future_to_country):
country = future_to_country[future]
try:
result = future.result()
results.append(result)
logging.info(f"Completed: {country.name}")
except Exception as e:
logging.error(f"Failed: {country.name} - {e}")
# Add fallback result
results.append(
{
"country": country.name,
"topic": topic,
"sentiment": "Neutral",
"confidence": 0.0,
"error": str(e),
}
)
# ACTION PHASE: Generate report with all results
if results:
action_result = self.agent.act(results, topic)
logging.info(
f"ACTION PHASE: {action_result.get('action_type', 'unknown')} - {'Complete' if action_result.get('action_complete') else 'Failed'}"
)
return results
# ============================================================================
# ACTION PHASE: REPORTING AND OUTPUT
# ============================================================================
def generate_markdown_report(results: List[Dict[str, Any]], topic: str) -> str:
"""
Report generator: Creates comprehensive markdown analysis report
Formats sentiment analysis results into readable business intelligence format
Includes executive summary, detailed country analysis, and global insights
"""
timestamp = datetime.now().strftime("%Y-%m-%d at %H:%M:%S")
markdown = f'# Global Sentiment Analysis: "{topic}"\n\n'
markdown += f"*Generated on: {timestamp}*\n"
markdown += f"*Powered by: DSPy Framework + Bright Data (Multi-Model Version)*\n"
markdown += f"*Fast Model (Decomposition/Optimization): {CONFIG.fast_model}*\n"
markdown += f"*Reasoning Model (Sentiment Analysis): {CONFIG.reasoning_model}*\n\n"
# Summary table
markdown += "## Executive Summary\n\n"
markdown += (
"| Country | Sentiment | Intensity | Confidence | Strategy | Key Themes |\n"
)
markdown += (
"|---------|-----------|-----------|------------|----------|------------|\n"
)
for result in results:
themes = ", ".join(result.get("themes", [])[:2])
markdown += (
f"| {result['country']} | {result['sentiment']} | "
f"{result['intensity']} | {result['confidence']:.1f}% | "
f"{result['search_strategy']} | {themes} |\n"
)
# Detailed analysis
markdown += "\n## Detailed Country Analysis\n\n"
for i, result in enumerate(results, 1):
markdown += f"### {i}. {result['country'].upper()}\n\n"
# Core Analysis
markdown += f"- **Sentiment**: {result['sentiment']}\n"
markdown += f"- **Confidence**: {result['confidence']:.1f}%\n"
markdown += f"- **Intensity**: {result['intensity']}\n"
markdown += f"- **Data Quality**: {result['data_quality']}\n"
markdown += f"- **Search Strategy**: {result['search_strategy']}\n"
markdown += f"- **Sources**: {result['sources_analyzed']}\n"
markdown += f"- **Emotions**: {', '.join(result['emotions'])}\n"
markdown += f"- **Themes**: {', '.join(result['themes'])}\n"
if result.get("bias_indicators"):
markdown += (
f"- **Bias Indicators**: {', '.join(result['bias_indicators'])}\n"
)
markdown += f"- **Summary**: {result['summary']}\n"
markdown += "\n"
# Global insights
markdown += "## Global Insights\n\n"
sentiments = [r["sentiment"] for r in results]
sentiment_counts = {s: sentiments.count(s) for s in set(sentiments)}
markdown += "### Sentiment Distribution\n\n"
for sentiment, count in sentiment_counts.items():
markdown += f"- **{sentiment}**: {count} countries\n"
avg_confidence = sum(r["confidence"] for r in results) / len(results)
markdown += f"\n### Analysis Quality\n\n"
markdown += f"- **Average Confidence**: {avg_confidence:.1f}%\n"
high_quality = sum(1 for r in results if r["data_quality"] == "high")
markdown += f"- **High Quality Analyses**: {high_quality}/{len(results)}\n"
markdown += "\n---\n*Analysis powered by DSPy Framework with optimized prompts (Multi-Model Version)*\n"
markdown += f"*Models: {CONFIG.fast_model} (fast tasks) + {CONFIG.reasoning_model} (reasoning)*\n"
return markdown
def save_results(data: Any, filename: str, data_type: str = "file") -> Optional[Path]:
"""
File management utility: Saves analysis results with timestamped filenames
Handles both JSON data and text content with proper encoding
Creates organized data directory structure for result tracking
"""
try:
CONFIG.data_dir.mkdir(exist_ok=True)
        # Local-time timestamp keeps filenames unique and sortable
        # (no "Z" suffix, since that would wrongly imply UTC)
        timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S-%f")[:-3]
full_filename = f"{timestamp}_{filename}"
filepath = CONFIG.data_dir / full_filename
content = (
data if isinstance(data, str) else json.dumps(data, indent=2, default=str)
)
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
logging.info(f"{data_type} saved to: {filepath}")
return filepath
except Exception as e:
logging.error(f"Failed to save {data_type}: {e}")
return None
def display_results(results: List[Dict[str, Any]], topic: str):
"""
Creates readable table format with key metrics and insights
Provides immediate feedback during development and testing
"""
print("\n" + "=" * 80)
print(f'GLOBAL SENTIMENT ANALYSIS: "{topic}" (PRA DSPy VERSION)')
print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(
f"Fast Model: {CONFIG.fast_model} | Reasoning Model: {CONFIG.reasoning_model}"
)
print("=" * 80)
# Summary table
print("\nCOUNTRY COMPARISON:")
print("-" * 80)
print(
f"{'Country':<12} {'Sentiment':<12} {'Intensity':<10} {'Confidence':<11} {'Sources':<8}"
)
print("-" * 80)
for result in results:
sources = result.get("sources_analyzed", 0)
print(
f"{result['country']:<12} {result['sentiment']:<12} {result['intensity']:<10} "
f"{result['confidence']:>6.1f}% {sources:<8}"
)
print("-" * 80)
# Analysis summary
pra_complete = sum(1 for r in results if r.get("pra_cycle_complete", False))
print(f"\nANALYSIS RESULTS:")
print(f"Complete Analyses: {pra_complete}/{len(results)}")
print(
f"Average Confidence: {sum(r['confidence'] for r in results) / len(results):.1f}%"
)
print("=" * 80)
# ============================================================================
# MAIN EXECUTION FUNCTION
# ============================================================================
async def main():
"""
Application entry point: Orchestrates complete sentiment analysis pipeline
Configures DSPy framework, initializes multi-model setup, and executes analysis
Handles logging, optimization, and result generation for production deployment
"""
# Setup logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
print("Advanced AI Sentiment Analysis Agent with DSPy (PRA ARCHITECTURE)")
print("Architecture: Complete Perception-Reasoning-Action (PRA) + DSPy")
print(
"Strategy: Fast model for planning, Reasoning model for analysis, Action for insights"
)
print("=" * 80)
try:
# DSPy configuration using official patterns with multiple models
if not os.getenv("OPENAI_API_KEY"):
raise ValueError("OPENAI_API_KEY environment variable required")
# Use official DSPy LM configuration pattern with fast model as default
default_lm = dspy.LM(model=CONFIG.fast_model) # Default to fast model
dspy.configure(lm=default_lm) # Official configuration method
print(f"Topic: {CONFIG.topic}")
print(f"Countries: {', '.join(c.name for c in CONFIG.countries)}")
print(f"Fast Model (Decomposition/Optimization): {CONFIG.fast_model}")
print(f"Reasoning Model (Sentiment Analysis): {CONFIG.reasoning_model}")
print("-" * 80)
start_time = time.time()
# Create agent (no optimization)
agent = ParallelSentimentAnalyzer()
# Kick off agent flow
results = agent(topic=CONFIG.topic, countries=CONFIG.countries)
# Filter successful results
successful_results = [r for r in results if "error" not in r]
total_time = time.time() - start_time
if successful_results:
# Display results
display_results(successful_results, CONFIG.topic)
print(f"\nSUCCESS: Analysis completed with multi-model DSPy patterns!")
print(f"Total time: {total_time:.2f}s")
print(f"Success rate: {len(successful_results)}/{len(CONFIG.countries)}")
print(
f"Average confidence: {sum(r['confidence'] for r in successful_results)/len(successful_results):.1f}%"
)
else:
print("No successful analyses completed")
except Exception as e:
logging.error(f"FATAL ERROR: {e}")
print(f"\nFatal error: {e}")
return 1
return 0
if __name__ == "__main__":
import sys
# Allow command line topic override
if len(sys.argv) > 1:
CONFIG.topic = sys.argv[1]
# Run async main
exit_code = asyncio.run(main())
sys.exit(exit_code)
Across all countries, one pattern held true: sentiment isn't binary. It's driven by cultural context, legal frameworks, and media ecosystems. This is another benefit of our agent's architecture: the structured, schema-constrained analysis surfaced these nuances without flattening them.
Lessons Learned: What I’d Do Differently
In the end, this agent gave me a structured, scalable way to compare public sentiment across countries, grounded in real-time web data rather than guesswork or model hallucination.
DSPy gave me traceable reasoning steps without prompt spaghetti. And Bright Data’s SERP API handled the messy stuff — rotating IPs, geo-targeting, pagination — so I could focus on building a solid AI agent pipeline, not debugging Google’s latest experiment.
Still, a few things stood out during implementation. Here's what I'd improve if I rebuilt it tomorrow (or had more time on my hands 😅), with a rough sketch after each idea:
1. Make decomposition use a public knowledge graph (e.g., ConceptNet or Wikidata)
Right now, topic decomposition relies entirely on LLM-generated search angles. I’d like to experiment with a structured knowledge base like ConceptNet or Wikidata to find real-world links around the topic — like related entities, concepts, or controversies. For example, if the topic is “TikTok”, it might surface terms like “ByteDance”, “data collection”, or “youth culture”. We can feed these into the LLM to steer it away from hallucinations and toward grounded, even more country-aware search angles.
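Here's a minimal sketch of that grounding step against ConceptNet's public REST API. The related_concepts helper and the way its output gets spliced into the planner are both hypothetical; treat this as one possible wiring, not the implementation:
import requests

def related_concepts(topic: str, limit: int = 20) -> list:
    # Query ConceptNet's public API for edges connected to the topic
    url = f"https://api.conceptnet.io/c/en/{topic.lower().replace(' ', '_')}"
    edges = requests.get(url, params={"limit": limit}, timeout=10).json().get("edges", [])
    terms = set()
    for edge in edges:
        for node in (edge.get("start", {}), edge.get("end", {})):
            label = node.get("label", "")
            # Keep whichever end of the edge isn't the topic itself
            if label and label.lower() != topic.lower():
                terms.add(label)
    return sorted(terms)

# Hypothetical wiring: hand the grounded terms to the planner as extra context
grounded = related_concepts("TikTok")[:8]
angles = planner.decompose(f"TikTok (related: {', '.join(grounded)})")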
2. Cache + score all search data with retrieval metrics
Right now, the agent throws away unused SERP results after inference. That keeps analyses under the sub-30-second target, but it's wasteful. I'd cache them and add a scoring pass that measures novelty (vs. existing country-topic runs), source diversity, and redundancy.
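A rough sketch of what that cache-and-score pass could look like. The metric definitions here (novelty, diversity, redundancy) are my own back-of-napkin versions, not from any retrieval library:
from urllib.parse import urlparse

class SerpCache:
    """Stores every fetched SERP batch and scores new ones against history."""

    def __init__(self):
        # (country, topic) -> set of links we've already seen
        self.seen = {}

    def score_and_store(self, country: str, topic: str, results: list) -> dict:
        history = self.seen.setdefault((country, topic), set())
        links = [r.get("link", "") for r in results if r.get("link")]
        domains = {urlparse(link).netloc for link in links}
        novelty = sum(link not in history for link in links) / max(len(links), 1)
        diversity = len(domains) / max(len(links), 1)
        history.update(links)
        return {"novelty": novelty, "source_diversity": diversity, "redundancy": 1.0 - novelty}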
3. Run offline DSPy optimizations
DSPy lets you optimize your prompts with battle-tested algorithms, but I didn’t use them because a) this is only a tutorial, b) my Signatures are already well-designed, c) dspy.ChainOfThought() already provides reasoning, and d) my literal types and detailed descriptions provide strong output constraints. But for production use, I’d build a dataset of labeled search results and sentiment annotations (even via weak supervision), then run offline DSPy optimization passes (BootstrapFewShot, MIPRO, etc.) with real eval feedback. That should theoretically harden the agent against prompt collapse or overfitting on small inputs.
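With a labeled dataset in hand, the optimization pass itself is short. This sketch uses DSPy's BootstrapFewShot; the example field names assume the agent's Signature exposes topic/country inputs and a sentiment output, so match them to your actual Signatures:
from dspy.teleprompt import BootstrapFewShot

# Labeled runs; the field names here are assumptions
trainset = [
    dspy.Example(topic="data privacy vs censorship", country="Germany",
                 sentiment="Negative").with_inputs("topic", "country"),
    # ...more examples, possibly produced via weak supervision
]

def sentiment_match(example, pred, trace=None):
    # Exact-match metric on the sentiment label; swap in something softer later
    return example.sentiment == pred.sentiment

optimizer = BootstrapFewShot(metric=sentiment_match, max_bootstrapped_demos=4)
compiled_agent = optimizer.compile(SentimentAgent(), trainset=trainset)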
4. Post-process bias detection using rules + embeddings
Right now, bias_indicators is LLM-generated. In production, I’d back it with rule-based heuristics: e.g., flag outlets with extreme sentiment lexicons, or use sentence-level stance detection to tag articles with unbalanced coverage. Could also use retrieval-based checks, now that I think about it. Something like: “Does this article cite opposing views?” — and if not, mark it. But perhaps this requires more scraping, not just SERP results.
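As a sketch, the rule-based layer could be as simple as a loaded-term density check plus an opposing-views probe. The lexicon and threshold below are toy placeholders; a real version would use curated lexicons and a stance-detection model:
LOADED_TERMS = {"outrageous", "shameful", "disaster", "betrayal", "heroic"}
OPPOSING_MARKERS = ("critics say", "however", "on the other hand", "opponents argue")

def rule_based_bias_flags(text: str) -> list:
    flags = []
    words = [w.strip(".,!?\"'") for w in text.lower().split()]
    loaded_density = sum(w in LOADED_TERMS for w in words) / max(len(words), 1)
    if loaded_density > 0.02:  # arbitrary threshold, tune on real data
        flags.append("extreme_sentiment_lexicon")
    if not any(marker in text.lower() for marker in OPPOSING_MARKERS):
        flags.append("no_opposing_views_cited")
    return flags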
5. Add fallback paths for weak data quality
Some countries had low-quality data (e.g., limited coverage, low diversity). Rather than fail silently, I’d implement an agent fallback: switch to multilingual queries, broaden time ranges, or pull from secondary sources (Reddit, YouTube transcripts, etc.). That way, coverage doesn’t collapse just because Google is shallow in a region.
This will blow up your SERP calls, though, so bear that in mind.
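A hypothetical wrapper around the existing agent shows the shape of that fallback. It assumes the result dict carries the data_quality field you saw in the report code:
def analyze_with_fallback(agent, topic: str, country) -> dict:
    # First pass: the normal PRA run
    result = agent(topic, country)
    if result.get("data_quality") != "low":
        return result
    # Fallback: broaden the query and retry once before reaching for
    # secondary sources (Reddit, YouTube transcripts, etc.)
    retry = agent(f"{topic} debate OR {topic} opinion", country)
    if retry.get("data_quality") != "low":
        retry["search_strategy"] = "fallback_broadened"
        return retry
    result["needs_secondary_sources"] = True
    return result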
This setup is a great base to work with. It’s fast, structured, already production-minded — and costs less than $2 to run the whole thing. It was very fun to build.