A lot of my data analysis work is forensic (example here). I’ll pull repeated snapshots — content, traffic, SERPs, metadata — and look for patterns across hundreds of these ingestions. So naturally, bad batches skew results in ways just convincing enough that I don’t catch them. Disastrous.
You can add try-catches, but a 200 OK from your API with empty titles and duplicate rows is still considered a success on the wire unless you model validation as its own failure path. So is there a better way?
This is exactly the gap Great Expectations (GX Core) fills. It’s an open-source Python library for defining declarative quality rules on your data — “quality gates”, explicit rules your data must pass before it’s trusted — things like “this field must never be null”, “these IDs must be unique”, or “this value must fall within a known range”. You basically codify what “good” looks like, run it as a validation step in your pipeline, and get a clear pass/fail on every batch — before bad data ever touches your analysis.
Let’s turn that idea into code. We’ll fetch data at scale via Bright Data (a SERP API), run a GX suite over each batch, then store the results in DuckDB (a file-backed analytical SQL database that’s just a .duckdb file on disk): clean rows go to the main table, failed batches to quarantine with enough context to debug.
The Setup
Here’s what you need:
requests>=2.28.0
python-dotenv>=1.0.0
duckdb>=1.0.0
pandas>=2.0.0
psutil>=5.9.0
great-expectations>=1.0.0
Note that Python 3.10+ is the floor for Great Expectations 1.x with current pandas / duckdb wheels.
The important ones:
- great-expectations — the quality gates: expectations on each batch, clear pass/fail and reports. All we need is GX Core.
- duckdb — an embedded analytical database; no server to run, the whole store is a single file.
- requests — for HTTP calls to Bright Data’s POST /request endpoint (your zone must be a SERP zone).
Before running pip install -r requirements.txt, create a .env file with:
BRIGHT_DATA_API_KEY=your_api_key
BRIGHT_DATA_ZONE=serp # or your SERP zone name from the Bright Data dashboard
BRIGHT_DATA_COUNTRY=us # optional
The Bright Data client reads these when you run the pipeline. You need a SERP-capable zone — without one, POST /request will not match what this code expects. Proxy rotation and unblocking stay on Bright Data’s side.
How Does the Pipeline Actually Work?
The pipeline is four small modules:
bright_data.py # Bright Data API client
serp_expectations.py # Great Expectations validation suite
duckdb_store.py # DuckDB schema + insert/quarantine logic
ingest.py # Pipeline: fetch → validate → store or quarantine
The flow is straightforward: one query equals one batch. For each batch, we check things in a strict order before anything touches the database.
Step 1: Fetching Structured SERP Data with Bright Data
Our API client will just be a thin wrapper around Bright Data’s POST /request endpoint.
bright_data.py
import json
import os
from dataclasses import dataclass
from pathlib import Path
import requests
from dotenv import load_dotenv
_GX_ROOT = Path(__file__).resolve().parent
load_dotenv(_GX_ROOT / ".env")
def normalize_serp_payload(raw):
    """Bright Data may wrap JSON in a string body."""
    if isinstance(raw, dict) and "body" in raw:
        body = raw["body"]
        if isinstance(body, str):
            return json.loads(body)
        if isinstance(body, dict):
            return body
    return raw


@dataclass
class SerpApiResponse:
    """Bright Data POST /request: HTTP status plus parsed JSON body (if any)."""

    status_code: int
    data: dict


class BrightDataClient:
    """SERP API zone client; uses https://api.brightdata.com/request"""

    def __init__(self, api_key=None, zone=None, country=None):
        self.api_key = api_key or os.getenv("BRIGHT_DATA_API_KEY")
        self.zone = zone or os.getenv("BRIGHT_DATA_ZONE")
        self.country = country or os.getenv("BRIGHT_DATA_COUNTRY")
        self.api_endpoint = "https://api.brightdata.com/request"
        if not self.api_key:
            raise ValueError("BRIGHT_DATA_API_KEY must be set in the environment or constructor.")
        if not self.zone:
            raise ValueError("BRIGHT_DATA_ZONE must be set in the environment or constructor.")
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            }
        )

    def search(self, query, num_results=10, language=None, country=None):
        """Raises on non-200 or network error."""
        serp = self.search_with_status(query, num_results, language, country)
        if serp.status_code != 200:
            raise RuntimeError(
                f"Search request failed with HTTP {serp.status_code}: {serp.data!r}"[:500]
            )
        return normalize_serp_payload(serp.data)

    def search_with_status(self, query, num_results=10, language=None, country=None):
        """Does not raise on non-200 — for ingest + quarantine routing."""
        search_url = (
            f"https://www.google.com/search"
            f"?q={requests.utils.quote(query)}"
            f"&num={num_results}"
            f"&brd_json=1"
        )
        if language:
            search_url += f"&hl={language}&lr=lang_{language}"
        target_country = country or self.country
        payload = {"zone": self.zone, "url": search_url, "format": "json"}
        if target_country:
            payload["country"] = target_country
        try:
            response = self.session.post(self.api_endpoint, json=payload, timeout=30)
        except requests.exceptions.RequestException as e:
            return SerpApiResponse(status_code=0, data={"_error": str(e)})
        data = _parse_response_body(response)
        return SerpApiResponse(status_code=response.status_code, data=data)


def _parse_response_body(response):
    if not response.text:
        return {}
    try:
        return response.json()
    except ValueError:
        return {"_non_json_body": response.text[:8000]}
Some things to note here:
- search_with_status does not raise on non-200, so ingest can quarantine API failures.
- Transport errors are caught, so you get status_code=0 and the same api_error path as HTTP failures.
- search() raises on failure, for callers that want exceptions.
- Finally, normalize_serp_payload unwraps responses in case your API puts JSON under a string body.
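To see the unwrapping in action, here’s a standalone run of the same normalize_serp_payload logic (function body copied from above; the payloads are made-up examples):

```python
import json

def normalize_serp_payload(raw):
    """Same logic as in bright_data.py: unwrap a string or dict "body" key."""
    if isinstance(raw, dict) and "body" in raw:
        body = raw["body"]
        if isinstance(body, str):
            return json.loads(body)
        if isinstance(body, dict):
            return body
    return raw

# A payload whose JSON arrives as a string under "body"...
wrapped = {"status_code": 200, "body": '{"organic": [{"link": "https://example.com"}]}'}
print(normalize_serp_payload(wrapped))  # {'organic': [{'link': 'https://example.com'}]}

# ...and one that is already a plain dict passes through untouched.
plain = {"organic": []}
print(normalize_serp_payload(plain))    # {'organic': []}
```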
Step 2: What Happens to Data That Doesn’t Pass?
Every batch in this pipeline has two possible destinations in DuckDB:
- If it clears all our defined quality gates, it goes to serp_results.
- If it doesn’t — wrong HTTP status, empty organic list, or a failed GX expectation — it lands in serp_quarantine instead, with enough context to understand why.
Let's build that store before we wire up the gates.
duckdb_store.py
import hashlib
import json
import os
from datetime import datetime
import duckdb
import pandas as pd
import psutil
class SerpStore:
    """serp_results + serp_quarantine"""

    def __init__(self, db_path, memory_limit=None):
        parent = os.path.dirname(db_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        self.db_path = db_path
        self.conn = duckdb.connect(db_path)
        if memory_limit:
            self.conn.execute(f"SET memory_limit='{memory_limit}'")
        else:
            available_memory = psutil.virtual_memory().available
            memory_gb = int(available_memory / (1024**3) * 0.8)
            self.conn.execute(f"SET memory_limit='{memory_gb}GB'")
        self._create_schema()

    def _create_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS serp_results (
                id BIGINT PRIMARY KEY,
                query TEXT NOT NULL,
                timestamp TIMESTAMP NOT NULL,
                result_position INTEGER NOT NULL,
                title TEXT,
                url TEXT,
                snippet TEXT,
                domain TEXT,
                rank INTEGER,
                previous_rank INTEGER,
                rank_delta INTEGER
            )
        """)
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_query ON serp_results(query)")
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_domain ON serp_results(domain)")
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS serp_quarantine (
                query TEXT,
                timestamp TIMESTAMP,
                reason TEXT,
                organic_count INTEGER,
                payload_hash TEXT,
                http_status INTEGER,
                raw_json JSON
            )
        """)

    @staticmethod
    def _payload_hash(payload):
        canonical = json.dumps(payload, sort_keys=True, default=str)
        return hashlib.sha256(canonical.encode()).hexdigest()

    def insert_quarantine(
        self,
        query,
        reason,
        organic_count,
        raw_json,
        http_status,
        timestamp=None,
    ):
        if timestamp is None:
            timestamp = datetime.now()
        payload_hash = self._payload_hash(raw_json)
        self.conn.execute(
            """
            INSERT INTO serp_quarantine
            (query, timestamp, reason, organic_count, payload_hash, http_status, raw_json)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            [
                query,
                timestamp,
                reason,
                organic_count,
                payload_hash,
                http_status,
                json.dumps(raw_json, default=str),
            ],
        )

    def insert_batch(self, vdf, query, timestamp=None):
        """Insert from the GX validation frame (output of organic_to_validation_df).

        Store what we validated: normalized url, derived domain, snippet, and positional rank —
        not a second parse from raw organic dicts (avoids drift vs Great Expectations).
        """
        if timestamp is None:
            timestamp = datetime.now()
        if vdf.empty:
            return
        max_id_result = self.conn.execute("SELECT COALESCE(MAX(id), 0) FROM serp_results").fetchone()
        next_id = (max_id_result[0] if max_id_result else 0) + 1
        rows = []
        for idx, (_, row) in enumerate(vdf.iterrows()):
            title = "" if pd.isna(row["title"]) else str(row["title"])
            url = "" if pd.isna(row["url"]) else str(row["url"])
            snippet = "" if pd.isna(row["snippet"]) else str(row["snippet"])
            domain = "" if pd.isna(row["domain"]) else str(row["domain"])
            rank_val = int(row["rank"])
            rows.append(
                {
                    "id": next_id + idx,
                    "query": query,
                    "timestamp": timestamp,
                    "result_position": idx + 1,
                    "title": title,
                    "url": url,
                    "snippet": snippet,
                    "domain": domain,
                    "rank": rank_val,
                    "previous_rank": None,
                    "rank_delta": None,
                }
            )
        df = pd.DataFrame(rows)
        self.conn.execute("""
            INSERT INTO serp_results (id, query, timestamp, result_position, title, url, snippet, domain, rank, previous_rank, rank_delta)
            SELECT id, query, timestamp, result_position, title, url, snippet, domain, rank, previous_rank, rank_delta FROM df
        """)

    def get_row_count(self):
        result = self.conn.execute("SELECT COUNT(*) FROM serp_results").fetchone()
        return result[0] if result else 0

    def close(self):
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
What’s happening here?
- The reason field is an enum-like string: api_error, organic_empty, or validation_failed.
- The payload_hash is a SHA-256 of the canonical JSON — so you can tell if the same bad payload keeps coming back (very useful!).
- Each row stores a JSON blob in raw_json: for api_error and organic_empty it is the normalized SERP dict; for validation_failed it is {"serp": [normalized serp here], "gx_validation": [GX result dict here]}, so you still have both the SERP payload and the failing expectation details.
- Every bad batch lands in serp_quarantine with a paper trail instead of silently disappearing or, worse, silently making it into serp_results.
- When a batch passes all gates, insert_batch writes one row per organic result from the validated DataFrame — the same normalized fields GX validated.
- previous_rank / rank_delta are reserved for later rank-tracking; they stay null on first ingest.
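The canonical-hash trick deserves a short demonstration. This is the same scheme as _payload_hash above, shown standalone: key order doesn’t matter, because json.dumps with sort_keys=True canonicalizes the payload first.

```python
import hashlib
import json

def payload_hash(payload):
    # Canonical form: sorted keys, stringified unknown types, then SHA-256.
    canonical = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()

a = {"organic": [], "general": {"q": "x"}}
b = {"general": {"q": "x"}, "organic": []}  # same payload, different key order

assert payload_hash(a) == payload_hash(b)   # identical hash -> recurring bad payload
print(payload_hash(a)[:16])                 # a stable prefix you can GROUP BY in quarantine
```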
Step 3: The Gate Order — Why Sequence Matters
Bright Data's parsed SERP JSON path (request URL includes brd_json=1 or, in their docs, brd_json=json) returns the same kind of structured object you see in their examples and in real dumps — organic, general, input, and so on — not a page of HTML you parse yourself.
So when Google can't be reached or nothing usable is extracted, that tends to show up as a non-200 from POST /request, a network error (we treat as api_error), or an empty organic array —and not as a captcha HTML document mistaken for a normal organic list. So the gates to implement are exactly three:
- non-
200, - empty
organic, - then GX on whatever is left.
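Reduced to a sketch, the pre-GX routing is a tiny pure function. classify_batch is a name I’m using for illustration, not a function in the pipeline; ingest.py below does the same checks inline:

```python
def classify_batch(http_status, organic):
    """Return the quarantine reason for a batch, or None if it may proceed to GX."""
    if http_status != 200:   # gate 1: API/transport failure (status 0 covers network errors)
        return "api_error"
    if not organic:          # gate 2: 200 OK but nothing extracted
        return "organic_empty"
    return None              # gate 3 (GX validation) only runs past this point

print(classify_batch(502, []))                           # api_error
print(classify_batch(200, []))                           # organic_empty
print(classify_batch(200, [{"link": "https://a.com"}]))  # None -> proceed to GX
```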
ingest.py
import json
from datetime import datetime, timezone
from pathlib import Path
_ROOT = Path(__file__).resolve().parent
from bright_data import BrightDataClient, normalize_serp_payload
from duckdb_store import SerpStore
from serp_expectations import organic_to_validation_df, validate_organic_batch
def _write_validation_report(report_entries):
    reports_dir = _ROOT / "data" / "reports"
    reports_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    path = reports_dir / f"validation_{ts}.json"
    doc = {
        "generated_at_utc": ts,
        "batches": report_entries,
    }
    path.write_text(json.dumps(doc, indent=2, default=str), encoding="utf-8")
    return path
def ingest_live(
    queries,
    *,
    db_path=None,
    num_results=10,
    memory_limit="2GB",
    serp_json_out=None,
):
    """
    One batch = one query → one SERP response.
    Order: api_error → organic_empty → GX validate → insert or validation_failed quarantine.
    """
    path = db_path or str(_ROOT / "data" / "serp_gx.duckdb")
    client = BrightDataClient()
    total = 0
    batches_total = 0
    empty_batches = 0
    api_error_batches = 0
    validation_failed_batches = 0
    report_entries = []
    serp_dump_batches = []
    with SerpStore(path, memory_limit=memory_limit) as store:
        for q in queries:
            batches_total += 1
            serp = client.search_with_status(q, num_results=num_results)
            raw = normalize_serp_payload(serp.data)
            if serp_json_out:
                serp_dump_batches.append(
                    {
                        "query": q,
                        "http_status": serp.status_code,
                        "normalized_serp": raw,
                    }
                )
            organic = raw.get("organic") or []
            organic_count = len(organic)
            # Optional (not implemented): best-effort catch-all for an explicit blocked API response
            # (e.g. substring-match json.dumps(raw) for "captcha" / "unusual traffic") — just in case.
            # Not needed here; only include it if the API you're using can error out like that.
            if serp.status_code != 200:
                api_error_batches += 1
                store.insert_quarantine(q, "api_error", organic_count, raw, serp.status_code)
                report_entries.append(
                    {
                        "query": q,
                        "outcome": "api_error",
                        "http_status": serp.status_code,
                        "organic_count": organic_count,
                        "gx": None,
                    }
                )
                continue
            if not organic:
                empty_batches += 1
                store.insert_quarantine(q, "organic_empty", 0, raw, serp.status_code)
                report_entries.append(
                    {
                        "query": q,
                        "outcome": "organic_empty",
                        "http_status": serp.status_code,
                        "organic_count": 0,
                        "gx": None,
                    }
                )
                continue
            vdf = organic_to_validation_df(organic)
            gx_ok, gx_payload = validate_organic_batch(vdf, num_results=num_results)
            if not gx_ok:
                validation_failed_batches += 1
                store.insert_quarantine(
                    q,
                    "validation_failed",
                    organic_count,
                    {"serp": raw, "gx_validation": gx_payload},
                    serp.status_code,
                )
                report_entries.append(
                    {
                        "query": q,
                        "outcome": "validation_failed",
                        "http_status": serp.status_code,
                        "organic_count": organic_count,
                        "gx": gx_payload,
                    }
                )
                continue
            # Persist the validated DataFrame so DuckDB gets the same normalized url/domain/rank GX checked.
            store.insert_batch(vdf, q)
            total += len(vdf)
            report_entries.append(
                {
                    "query": q,
                    "outcome": "inserted",
                    "http_status": serp.status_code,
                    "organic_count": organic_count,
                    "gx": gx_payload,
                }
            )
        row_count = store.get_row_count()
    report_path = _write_validation_report(report_entries)
    stats = {
        "batches_total": batches_total,
        "empty_batches": empty_batches,
        "api_error_batches": api_error_batches,
        "validation_failed_batches": validation_failed_batches,
        "empty_rate": (empty_batches / batches_total) if batches_total else 0.0,
        "api_error_rate": (api_error_batches / batches_total) if batches_total else 0.0,
        "validation_failed_rate": (validation_failed_batches / batches_total)
        if batches_total
        else 0.0,
        "rows_ingested": total,
        "serp_results_row_count": row_count,
        "db_path": path,
        "validation_report_path": str(report_path),
    }
    if serp_json_out:
        out_path = Path(serp_json_out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        doc = {
            "generated_at_utc": ts,
            "num_results": num_results,
            "batches": serp_dump_batches,
        }
        out_path.write_text(json.dumps(doc, indent=2, default=str), encoding="utf-8")
        stats["serp_json_path"] = str(out_path.resolve())
    return stats
def main():
    import argparse

    p = argparse.ArgumentParser(
        description="gx: Bright Data SERP → Great Expectations → DuckDB (fail fast on GX failure)",
    )
    p.add_argument(
        "queries",
        nargs="*",
        default=[
            "inference engineering",
            "python duckdb",
            "Great Expectations data validation",
            "machine learning",
            "opentelemetry how to",
        ],
        help="Search queries (default: five example queries if none given)",
    )
    p.add_argument("--num-results", type=int, default=10)
    p.add_argument("--db", type=str, default=None, help="DuckDB file path")
    p.add_argument(
        "--save-serp-json",
        type=str,
        default=None,
        metavar="PATH",
        help="Write normalized SERP (+ query, http_status) per batch to this JSON file.",
    )
    args = p.parse_args()
    out = ingest_live(
        args.queries,
        db_path=args.db,
        num_results=args.num_results,
        serp_json_out=args.save_serp_json,
    )
    print(json.dumps(out, indent=2))


if __name__ == "__main__":
    main()
The main loop enforces that order we talked about: api_error and organic_empty are cheap pre-checks that catch the most common failure modes without spinning up GX at all.
In fact, GX only runs when there's actual data to validate. store.insert_batch(vdf, q) passes the validated DataFrame, not the original raw organic list — what goes into DuckDB is exactly what GX checked, with the same URL normalization applied.
Step 4: What Quality Gates Should You Put on SERP Data?
To reiterate, when we say “Quality gates”, we mean explicit Great Expectations rules on the DataFrame we validate: each “expectation” is simply one condition the batch must satisfy before you trust it for analytics.
For this pipeline we normalize the organic list in our API response into a small schema — url, title, snippet, domain, and rank — and run the suite on that frame only.
serp_expectations.py
import uuid
from urllib.parse import urlparse
import great_expectations as gx
import pandas as pd
from great_expectations.data_context.types.base import ProgressBarsConfig
# helpers
def _extract_domain(url):
    if not url:
        return ""
    return urlparse(url).netloc.replace("www.", "")


def _coerce_rank(raw_rank, fallback):
    if raw_rank is None:
        return fallback
    try:
        return int(float(raw_rank))
    except (TypeError, ValueError):
        return fallback


def _normalize_url(url):
    if not url:
        return url
    try:
        parsed = urlparse(url)
        normalised = parsed._replace(
            scheme=parsed.scheme.lower(),
            fragment="",
        )
        return normalised.geturl()
    except Exception:
        return url


# before anything, normalize the data
def organic_to_validation_df(organic):
    rows = []
    for i, r in enumerate(organic):
        raw_url = (r.get("url") or r.get("link") or "").strip()
        url = _normalize_url(raw_url)
        title = (r.get("title") or "").strip()
        snippet = (r.get("snippet") or r.get("description") or "").strip()
        domain = _extract_domain(url)
        raw_rank = r.get("rank")
        rank = _coerce_rank(raw_rank, fallback=i + 1)
        rows.append(
            {
                "url": url,
                "title": title,
                "snippet": snippet,
                "domain": domain,
                "rank": rank,
            }
        )
    return pd.DataFrame(rows)


# now add the actual expectations
def build_serp_expectation_suite(num_results):
    _meta = {"notes": "SERP organic batch gates for Bright Data brd_json=1 payloads."}
    return gx.ExpectationSuite(
        name="serp_organic_batch",
        expectations=[
            gx.expectations.ExpectColumnValuesToNotBeNull(
                column="url",
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValuesToNotBeNull(
                column="title",
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValuesToMatchRegex(
                column="url",
                regex=r"^https?://\S+",
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValuesToBeUnique(
                column="url",
                meta=_meta,
            ),
            gx.expectations.ExpectTableRowCountToBeBetween(
                min_value=1,
                max_value=num_results,
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValueLengthsToBeBetween(
                column="title",
                min_value=1,
                max_value=500,
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValuesToMatchRegex(
                column="title",
                regex=r"\S",
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValueLengthsToBeBetween(
                column="domain",
                min_value=1,
                max_value=253,
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="rank",
                min_value=1,
                max_value=float(num_results),
                meta=_meta,
            ),
            gx.expectations.ExpectColumnValuesToNotBeNull(
                column="snippet",
                mostly=0.8,
                meta=_meta,
            ),
        ],
    )


# gx in ephemeral mode
def validate_organic_batch(df, *, num_results):
    if df.empty:
        return False, {
            "success": False,
            "statistics": {
                "evaluated_expectations": 0,
                "successful_expectations": 0,
                "unsuccessful_expectations": 0,
            },
            "results": [],
            "exception_message": "validation_frame_empty",
        }
    context = gx.get_context(mode="ephemeral")
    context.variables.progress_bars = ProgressBarsConfig(
        globally=False,
        metric_calculations=False,
    )
    data_source = context.data_sources.add_pandas(f"serp_{uuid.uuid4().hex[:12]}")
    asset = data_source.add_dataframe_asset(name="organic_batch")
    batch_definition = asset.add_batch_definition_whole_dataframe("whole")
    batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
    suite = build_serp_expectation_suite(num_results=num_results)
    result = batch.validate(suite)
    payload = result.to_json_dict()
    return bool(result.success), payload
Let’s walk through the decisions we made here, because they’re not arbitrary.
Normalize Before You Validate — Never Inside the Validation Layer
Before anything in GX runs, organic_to_validation_df normalizes the data:
from typing import Any, Dict, List

def _normalize_url(url: str) -> str:
    """Lowercase scheme, strip fragment."""
    parsed = urlparse(url)
    return parsed._replace(scheme=parsed.scheme.lower(), fragment="").geturl()

def organic_to_validation_df(organic: List[Dict[str, Any]]) -> pd.DataFrame:
    rows = []
    for i, r in enumerate(organic):
        raw_url = (r.get("url") or r.get("link") or "").strip()
        url = _normalize_url(raw_url)
        title = (r.get("title") or "").strip()
        snippet = (r.get("snippet") or r.get("description") or "").strip()
        domain = _extract_domain(url)
        rank = _coerce_rank(r.get("rank"), fallback=i + 1)
        rows.append({"url": url, "title": title, "snippet": snippet, "domain": domain, "rank": rank})
    return pd.DataFrame(rows)
A few things happening here:
- Bright Data’s brd_json=1 responses use link as the key, not url. We normalize this before GX sees it — you’ve probably done such remappings plenty of times for other API responses.
- Fragments are stripped and the scheme is lowercased before the uniqueness check, so https://example.com/page#section and https://example.com/page don’t pass as different URLs.
- _coerce_rank handles the fact that rank values from APIs can come back as int, float, numpy scalars, or even strings. Coerce to a consistent type before GX, not inside an expectation.
Ephemeral mode
Passing mode="ephemeral" simply means no config files, no persisted context directory, and no great_expectations.yml — a pure in-process validator you spin up per batch. Progress bars are turned off in code when batching many runs. This keeps GX lightweight and easy to get started with.
Null Checks Must Accompany the Regex Gate
ExpectColumnValuesToNotBeNull on url and title is not redundant with the regex and length gates. GX's ExpectColumnValuesToMatchRegex silently skips null values by default — it only evaluates non-null rows. Without an explicit null check alongside it, a null URL would slip through the regex gate undetected.
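You can reproduce the analogous trap in plain pandas — this illustrates the null-skipping idea, not GX's internal implementation:

```python
import pandas as pd

urls = pd.Series(["https://a.com", None, "N/A"])
matches = urls.str.match(r"^https?://\S+")
print(matches.tolist())  # [True, nan, False] -- the null row is neither pass nor fail

# Selecting "rows that fail the regex" quietly drops the null:
failing = urls[matches == False]  # noqa: E712
print(failing.tolist())  # ['N/A'] -- the None never shows up as a failure
```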
Why the URL Regex Is Intentionally Loose
^https?://\S+ is deliberately not a strict RFC 3986 URL validator. Overly strict URL validation generates false positives on legitimately unusual URLs — CDN URLs, tracking URLs, URLs with encoded characters. The goal here is to catch the two actual failure modes: an empty string and a non-URL value like "N/A" or a relative path.
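A quick standalone check of what this regex does and doesn’t flag:

```python
import re

URL_RE = re.compile(r"^https?://\S+")

cases = {
    "https://example.com/p?q=1#frag": True,  # ordinary URL
    "http://cdn.example.net/a%20b": True,    # encoded characters are fine
    "N/A": False,                            # placeholder value
    "/relative/path": False,                 # relative path, no scheme
    "": False,                               # empty string
}
for value, expected in cases.items():
    assert bool(URL_RE.match(value)) is expected
print("all cases behave as expected")
```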
URL Uniqueness Catches Parse Artifacts
ExpectColumnValuesToBeUnique on url catches cases where the same URL appears twice in a single batch. That's a real thing that can happen with pagination artifacts or certain response formats. In a ranking pipeline, a duplicate URL means your rank counts are wrong.
Use a Fixed Ceiling for Rank and Row Count, Not a Self-Adjusting One
Notice that max_value=float(num_results) for the rank gate, and max_value=num_results for the row count, both use the same value — the same number we used for our SERP API. This is intentional and it matters.
A common mistake is to derive the ceiling from the batch itself:
# Don't do this
max_rank = max(num_results, int(float(vdf["rank"].max())))
Why? Well, if the API returns a result with rank=47 for a 10-result query, max_rank becomes 47 and the gate passes. You've made the gate self-adjust to whatever the data says, which means the gate can never actually fail on an out-of-range rank. Using a fixed num_results means an unexpected rank value will actually trigger a quarantine.
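Here’s the failure mode in miniature, in plain Python rather than GX:

```python
ranks = [1, 2, 3, 47]  # a rank that should never appear in a 10-result query
num_results = 10

# Self-adjusting ceiling: derived from the batch itself, so it can never flag the batch.
max_rank_bad = max(num_results, max(ranks))
print(all(r <= max_rank_bad for r in ranks))  # True -- the bad rank sails through

# Fixed ceiling: derived from what we *asked for*, so the anomaly trips the gate.
print(all(r <= num_results for r in ranks))   # False -- batch goes to quarantine
```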
Optional Fields Need Soft Gates, Not Hard Ones
snippet uses mostly=0.8 rather than a hard non-null gate. That's because Google legitimately suppresses snippets for some result types — videos, certain knowledge panel entries, sitelinks. A hard non-null gate on snippet would quarantine perfectly valid batches. The mostly parameter lets you say "80% of rows must pass this check" — if more than 20% of rows have no snippet, that's a signal the response structure has changed, not normal Google behaviour.
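Roughly, mostly reduces to a fraction check. A plain-Python approximation (not GX's actual implementation):

```python
def mostly_not_null(values, mostly=0.8):
    """Pass when at least `mostly` of the values are non-null (roughly GX's `mostly` semantics)."""
    if not values:
        return False
    non_null = sum(v is not None for v in values)
    return non_null / len(values) >= mostly

snippets = ["a", "b", "c", "d", None, "e", "f", "g", "h", "i"]  # 1 of 10 missing
print(mostly_not_null(snippets))        # True: 90% present -- normal Google behaviour

mostly_missing = ["a", "b", None, None, None, None, None, "c", None, None]
print(mostly_not_null(mostly_missing))  # False: 30% present -- structure likely changed
```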
What Does a Failed GX Validation Actually Look Like?
When a batch fails validation, the gx block in the report tells you exactly which expectation failed and what the unexpected values were. Here's a representative example — two organic rows end up with the same normalized URL (duplicate rows in the response, or two URLs that collapse to one after fragment stripping), so the uniqueness gate fires:
{
"query": "inference engineering",
"outcome": "validation_failed",
"http_status": 200,
"organic_count": 2,
"gx": {
"success": false,
"statistics": {
"evaluated_expectations": 10,
"successful_expectations": 9,
"unsuccessful_expectations": 1
},
"results": [
{
"success": false,
"expectation_config": {
"type": "expect_column_values_to_be_unique",
"kwargs": { "column": "url" }
},
"result": {
"element_count": 2,
"unexpected_count": 2,
"unexpected_percent": 100.0,
"partial_unexpected_list": [
"https://example.com/page?q=1",
"https://example.com/page?q=1"
]
}
}
]
}
}
The http_status is 200. Without the quality gate, this batch would have been inserted. The downstream join on URL would have silently doubled a result's apparent frequency in your analytics.
The Validation Report
After every run, we emit a timestamped JSON report with one entry per query:
{
"generated_at_utc": "20260326T102005Z",
"batches": [
{
"query": "inference engineering",
"outcome": "inserted",
"http_status": 200,
"organic_count": 8,
"gx": { "success": true, "statistics": { "evaluated_expectations": 10, ... } }
},
{
"query": "some broken query",
"outcome": "organic_empty",
"http_status": 200,
"organic_count": 0,
"gx": null
}
]
}
The possible outcome values are inserted, api_error, organic_empty, and validation_failed. Over time, watching the rate of each outcome per query is how you'd catch gradual degradation — a rising organic_empty rate is a signal before it becomes a crisis.
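Computing those per-outcome rates from a report is a few lines of stdlib. The report dict here is a made-up example matching the shape above; in real use you would json.load the report file first:

```python
from collections import Counter

report = {
    "generated_at_utc": "20260326T102005Z",
    "batches": [
        {"query": "a", "outcome": "inserted"},
        {"query": "b", "outcome": "organic_empty"},
        {"query": "c", "outcome": "inserted"},
        {"query": "d", "outcome": "validation_failed"},
    ],
}

counts = Counter(b["outcome"] for b in report["batches"])
total = len(report["batches"])
rates = {outcome: n / total for outcome, n in counts.items()}
print(rates)  # {'inserted': 0.5, 'organic_empty': 0.25, 'validation_failed': 0.25}
```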
How to Run the Pipeline
Put the four modules in one package or folder on your PYTHONPATH, install dependencies (pip install -r requirements.txt), and set Bright Data credentials (BRIGHT_DATA_API_KEY, BRIGHT_DATA_ZONE, plus optional BRIGHT_DATA_COUNTRY) in a .env file next to the code or in the environment.
Our orchestrator uses argparse, so here are some flags worth knowing, mostly for quality-of-life:
- --num-results (default 10),
- --db (override the DuckDB file path),
- and optionally --save-serp-json PATH, which dumps normalized SERP payloads — I’ve found this really helps while tuning your rules.
# One query
python ingest.py "inference engineering"
# Multiple queries (defaults to five example queries if you pass none)
python ingest.py
# Example: 20 results, custom DB path
python ingest.py "python asyncio" --num-results 20 --db ./data/my_run.duckdb
On success, the script prints a small JSON summary — batch counts, rows written, paths to the DuckDB file and the validation report:
{
"batches_total": 1,
"empty_batches": 0,
"api_error_batches": 0,
"validation_failed_batches": 0,
"empty_rate": 0.0,
"api_error_rate": 0.0,
"validation_failed_rate": 0.0,
"rows_ingested": 8,
"serp_results_row_count": 8,
"db_path": "./data/serp_gx.duckdb",
"validation_report_path": "./data/reports/validation_20260326T102005Z.json"
}
That’s pretty much everything. We’ve walked through each file; ingest.py is the entry point.
Frequently Asked Questions (FAQ)
Q: Why not just write the validation logic myself?
A: You can, and for one or two checks, you probably should! The case for GX is not that a check like if not url or not url.startswith("http") is hard to write — it's that twenty of those checks scattered across your pipeline become hard to read, hard to audit, and easy to accidentally skip when you're in a hurry. GX gives you a single place where all your rules live, a consistent result structure across every check, and a report that tells you exactly which rule failed and on which values. It saves a ton of trouble in the long run.
Q: Does running GX on every batch slow the pipeline down?
A: In practice, no — not at the batch sizes typical of API calls (say 10–50 rows per query). This is also why I run GX in ephemeral context mode (mode="ephemeral"): it avoids file I/O and context-persistence overhead. The validation itself is pandas operations under the hood anyway.
Q: What happens to bad data in a pipeline without validation?
A: It gets inserted and you don’t find out until something downstream breaks — or worse, until it produces wrong answers that are just plausible enough to go unnoticed. A 200 OK with duplicate rows, empty titles, or out-of-range values looks like a success on the wire. Without an explicit quality gate, that batch goes straight into your database and silently skews every query that touches it.
Q: How do I know what rules to write for my own data?
A: Sample first, then write rules. Pull a handful of real responses from your source API, inspect the fields and edge cases, then encode what “good” looks like. Rules written speculatively against an imagined schema generate false positives; rules written against real samples catch actual failure modes. Start with the fields you’d join on or aggregate in your analytics, and only add gates for everything else if you’ve seen it break.
What GX + Bright Data Gives You
So why does this pattern work reliably?
- Bright Data handles the upstream layer. Proxy rotation, bot detection, structured extraction — all abstracted behind a single API call that returns structured JSON.
- Great Expectations handles downstream trust. It doesn’t fix bad data. It measures whether incoming data meets your rules before you trust it with your analytics.
- The quarantine table (in DuckDB) gives you an audit trail. Not just “something went wrong,” but what the payload was, which expectations failed, and when. You can query the quarantine table to understand your pipeline’s health over time.
Web pipelines that don’t have explicit quality gates just gradually produce wrong answers. Catching that before it reaches your database is the whole point.