How to Design Idempotent Data Loads That Survive Re-Runs
A practical guide to upsert patterns, checkpoint management, and safe re-runs in data pipelines
A pipeline that can't be safely re-run is a liability. Failures happen - network timeouts, destination API limits, deployment interruptions. When they do, you need to be able to re-run the pipeline from a checkpoint and know with confidence that the result will be correct: no duplicate records, no missing records, no orphaned partial writes.
This property is called idempotency. An idempotent operation produces the same result regardless of how many times it runs on the same input. For data pipelines, it's not optional infrastructure - it's the property that makes pipelines trustworthy in production.
This guide covers how to implement idempotent load patterns in practice, how to combine them with checkpoint management, and how to test them correctly.
Why Insert-Only Loads Break on Re-Run
The simplest load implementation is a direct insert: for each record in the batch, write a new row to the destination.
def load_simple(records, session):
    for record in records:
        session.add(Contact(**record))
    session.commit()
This works on the first run. On a re-run after a partial failure, it creates duplicates for every record that was already successfully written. You now have two copies of the same customer, the same order, or the same event.
Detecting and cleaning duplicates after the fact is expensive and error-prone. The correct fix is to design the load step to handle re-runs correctly from the start.
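The failure mode is easy to reproduce. The sketch below is a minimal illustration, assuming an in-memory SQLite table as a stand-in for the real destination; the table layout and the load_insert_only helper are hypothetical and not part of the pipeline above.

```python
import sqlite3

def load_insert_only(conn, records):
    # Plain INSERT with no unique key: every call appends new rows.
    conn.executemany(
        "INSERT INTO contacts (source_id, email) VALUES (:source_id, :email)",
        records,
    )
    conn.commit()

# Assumption: SQLite stands in for the destination so the example is self-contained.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (source_id TEXT, email TEXT)")

batch = [
    {"source_id": "c-1", "email": "ana@example.com"},
    {"source_id": "c-2", "email": "ben@example.com"},
]

load_insert_only(conn, batch)  # first run
load_insert_only(conn, batch)  # re-run of the same batch after a simulated failure

row_count = conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]
distinct_count = conn.execute(
    "SELECT COUNT(DISTINCT source_id) FROM contacts"
).fetchone()[0]
print(row_count, distinct_count)
```

The destination ends up with more rows than there are distinct contacts, which is exactly the state a re-run should never produce.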
Upsert: Insert or Update Based on a Unique Key
An upsert writes a record if it doesn't exist and updates it if it does, based on a unique identifier. Running an upsert twice on the same record produces one record in the destination - not two.
In PostgreSQL, the INSERT ... ON CONFLICT syntax implements this:
INSERT INTO contacts (id, email, company, updated_at)
VALUES (:id, :email, :company, :updated_at)
ON CONFLICT (id)
DO UPDATE SET
    email = EXCLUDED.email,
    company = EXCLUDED.company,
    updated_at = EXCLUDED.updated_at;
The same thing in Python with SQLAlchemy:
from sqlalchemy.dialects.postgresql import insert

def upsert_batch(session, records):
    # One multi-row INSERT for the whole batch
    stmt = insert(Contact).values(records)
    # Columns to overwrite when a row with the same id already exists
    update_cols = {
        "email": stmt.excluded.email,
        "company": stmt.excluded.company,
        "updated_at": stmt.excluded.updated_at,
    }
    stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_=update_cols,
    )
    session.execute(stmt)
    session.commit()
The EXCLUDED reference in the update clause means "the value that would have been inserted," which is the new incoming value. This ensures that re-running with updated data produces the correct latest state.
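To see the behavior without a PostgreSQL instance, the following sketch runs the same statement shape against an in-memory SQLite database. This is a substitution for illustration only: SQLite (3.24 or later) accepts the same ON CONFLICT ... DO UPDATE clause, and the upsert_contacts helper and sample values are hypothetical.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO contacts (id, email, company, updated_at)
VALUES (:id, :email, :company, :updated_at)
ON CONFLICT (id) DO UPDATE SET
    email = excluded.email,
    company = excluded.company,
    updated_at = excluded.updated_at
"""

def upsert_contacts(conn, records):
    conn.executemany(UPSERT_SQL, records)
    conn.commit()

# Assumption: SQLite stands in for PostgreSQL here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE contacts ("
    "id TEXT PRIMARY KEY, email TEXT, company TEXT, updated_at TEXT)"
)

first = [{"id": "c-1", "email": "ana@example.com",
          "company": "Acme", "updated_at": "2024-01-01T00:00:00Z"}]
upsert_contacts(conn, first)
upsert_contacts(conn, first)  # identical re-run: no second row is created

changed = [{"id": "c-1", "email": "ana@example.com",
            "company": "Globex", "updated_at": "2024-02-01T00:00:00Z"}]
upsert_contacts(conn, changed)  # same key, newer data: the row is updated in place

rows = conn.execute("SELECT id, company FROM contacts").fetchall()
print(rows)
```

Three executions against the same key leave a single row holding the most recently written values.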
Choosing the Right Unique Key
The unique key determines idempotency. If two records have the same key, the upsert treats them as the same record. If the key isn't truly unique, you'll get incorrect merges.
For most business data:
- CRM contacts: The source system's record ID (contact_id from Salesforce, id from HubSpot) is reliable and stable.
- E-commerce orders: Order ID from the source system.
- Events and logs: A composite key of source, event type, and timestamp - or a hash of the record content.
Avoid using natural keys (like email address) as the sole unique key unless you're confident they're stable and unique in the source system. Email addresses change. Source system IDs generally don't.
For sources that don't provide a stable ID, compute a deterministic hash of the record's defining fields and use that as the key:
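import hashlib
import json

def record_key(record):
    # Pass only the fields that define the record's identity:
    # hashing a mutable field changes the key whenever that field changes.
    # sort_keys=True fixes the field order; default=str covers values such as datetimes.
    canonical = json.dumps(record, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()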
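One way to restrict the hash to the defining fields is to pass them explicitly. The sketch below is an illustration under assumptions: the record_key_from_fields helper, the field names, and the sample event are hypothetical, chosen to match the composite key suggested for events above.

```python
import hashlib
import json

def record_key_from_fields(record, key_fields):
    # Only the listed fields define identity; all other fields may change freely.
    identity = {field: record[field] for field in key_fields}
    canonical = json.dumps(identity, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Hypothetical event record with a mutable payload
event_v1 = {
    "source": "web",
    "event_type": "signup",
    "occurred_at": "2024-03-01T12:00:00Z",
    "payload": {"plan": "free"},
}
event_v2 = {**event_v1, "payload": {"plan": "pro"}}            # same event, payload corrected
event_other = {**event_v1, "occurred_at": "2024-03-01T12:00:01Z"}  # a different event

fields = ["source", "event_type", "occurred_at"]
key_v1 = record_key_from_fields(event_v1, fields)
key_v2 = record_key_from_fields(event_v2, fields)
key_other = record_key_from_fields(event_other, fields)
```

Because the payload is excluded from the key, a corrected payload updates the existing row instead of creating a second one, while a different timestamp still yields a different key.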
Checkpoint Management
Idempotent loads handle the "re-run produces correct results" property. Checkpoint management handles the "re-run starts from the right place" property. Together they make re-runs both safe and efficient.
A checkpoint stores the progress of the last successful run. The simplest form is a single row in a database table:
CREATE TABLE pipeline_checkpoints (
    pipeline_name TEXT PRIMARY KEY,
    last_processed_at TIMESTAMPTZ NOT NULL,
    last_run_id TEXT,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
from sqlalchemy import func

def get_checkpoint(pipeline_name, session):
    row = session.query(Checkpoint).filter_by(pipeline_name=pipeline_name).first()
    return row.last_processed_at if row else None

def save_checkpoint(pipeline_name, high_water_mark, run_id, session):
    # insert is the PostgreSQL dialect insert imported for upsert_batch
    stmt = insert(Checkpoint).values(
        pipeline_name=pipeline_name,
        last_processed_at=high_water_mark,
        last_run_id=run_id,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=["pipeline_name"],
        set_={
            "last_processed_at": stmt.excluded.last_processed_at,
            "last_run_id": stmt.excluded.last_run_id,
            # DEFAULT NOW() applies only on insert, so refresh updated_at explicitly
            "updated_at": func.now(),
        },
    )
    session.execute(stmt)
    session.commit()
The checkpoint is saved after - not before - confirming that the batch was successfully loaded. This ordering matters: if you save the checkpoint before the load succeeds, a load failure will advance the checkpoint past records that were never written.
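The effect of the ordering can be shown with a small simulation. This is a sketch under assumptions: the checkpoint is a plain dictionary entry, the load always fails, and every name here (LoadError, failing_load, run_once, attempt) is hypothetical.

```python
class LoadError(Exception):
    """Stand-in for a destination failure such as a timeout."""

def failing_load(records):
    raise LoadError("destination unavailable")

def run_once(state, records, load, checkpoint_first):
    high_water = max(r["updated_at"] for r in records)
    if checkpoint_first:
        state["checkpoint"] = high_water  # wrong order: advances before the load is confirmed
        load(records)
    else:
        load(records)
        state["checkpoint"] = high_water  # right order: advances only after a successful load

def attempt(records, checkpoint_first):
    state = {"checkpoint": 0}
    try:
        run_once(state, records, failing_load, checkpoint_first)
    except LoadError:
        pass  # the run failed; inspect where the checkpoint ended up
    return state["checkpoint"]

records = [{"id": 1, "updated_at": 10}, {"id": 2, "updated_at": 20}]
unsafe_checkpoint = attempt(records, checkpoint_first=True)
safe_checkpoint = attempt(records, checkpoint_first=False)
print(unsafe_checkpoint, safe_checkpoint)
```

With the checkpoint saved first, the failed run still moves the high-water mark past both records, so the next run never extracts them. With the checkpoint saved last, it stays where it was and the next run picks them up again.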
"Checkpoint management and idempotent loads together solve the re-run problem completely. With both in place, you can restart a failed pipeline at any point without losing data or creating duplicates. Without either one, every pipeline failure becomes a data quality investigation." - Dennis Traina, founder of 137Foundry
Batch Size and Atomicity
Upserts should be batched rather than executed one record at a time. Single-record upserts generate one round trip per record, which becomes slow at scale. Batched upserts send N records to the database in one round trip.
The batch size affects failure recovery: if a batch of 1,000 records fails halfway through, you re-process the entire batch on re-run. Since upserts are idempotent, this is safe - but smaller batches mean less re-work after a failure, at the cost of more database round trips.
A practical batch size is 500-1,000 records for typical OLTP destinations. For bulk analytics destinations that support batch imports, use the destination's recommended batch size.
import logging

log = logging.getLogger(__name__)

def load_in_batches(session, records, batch_size=500):
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        upsert_batch(session, batch)  # commits once per batch
        log.info("Loaded batch %d, records %d to %d", i // batch_size + 1, i, i + len(batch))
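Slicing requires the full record list in memory. When the extract step yields records lazily, a generator-based variant can produce the same batches. The sketch below is one possible approach, not part of the original pipeline; iter_batches is a hypothetical helper.

```python
from itertools import islice

def iter_batches(records, batch_size=500):
    """Yield lists of up to batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # source exhausted
        yield batch

# 1,250 records split into batches of 500
sizes = [len(batch) for batch in iter_batches(range(1250), batch_size=500)]
print(sizes)
```

Each yielded batch can be passed to upsert_batch in the same way as the slices above; the final batch is simply shorter.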
Testing Idempotency
An idempotent load should be verified with an explicit test, not assumed. The test is straightforward:
1. Run the pipeline once. Record the destination state (count, sample record values).
2. Run the pipeline again with the same source data and same checkpoint.
3. Verify that the destination state is identical to step 1: same count, same values.
If the count is different after the second run, the load is not idempotent. If values changed when they shouldn't have, the upsert update logic is incorrect.
This test should be part of your pipeline's automated test suite and run against a real destination, not a mock. Mock databases can't detect constraint violations or the subtle differences between database-level upsert implementations.
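The three steps can be sketched as a reusable check. To stay self-contained, the example below points it at an in-memory SQLite table; that is an assumption made for illustration only, and in a real suite the same assertions would run against the actual destination. The snapshot, upsert_load, and check_idempotent helpers are hypothetical.

```python
import sqlite3

def snapshot(conn):
    # Full, ordered copy of the destination table for comparison
    return conn.execute("SELECT id, email FROM contacts ORDER BY id").fetchall()

def upsert_load(conn, records):
    conn.executemany(
        "INSERT INTO contacts (id, email) VALUES (:id, :email) "
        "ON CONFLICT (id) DO UPDATE SET email = excluded.email",
        records,
    )
    conn.commit()

def check_idempotent(conn, load_fn, records):
    load_fn(conn, records)   # step 1: first run
    before = snapshot(conn)
    load_fn(conn, records)   # step 2: same data, same starting point
    after = snapshot(conn)
    # step 3: destination state must be unchanged
    assert len(after) == len(before), "row count changed on re-run"
    assert after == before, "values changed on re-run"
    return after

# Assumption: SQLite stands in for the real destination in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (id TEXT PRIMARY KEY, email TEXT)")
final_state = check_idempotent(
    conn,
    upsert_load,
    [{"id": "c-2", "email": "ben@example.com"},
     {"id": "c-1", "email": "ana@example.com"}],
)
```

Comparing a full ordered snapshot is practical for small fixtures; for large tables, a count plus a sample of rows, as described in the steps, is the cheaper check.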
Managing Overlapping Windows
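Incremental extraction typically uses a time-based high-water mark: "give me all records updated since timestamp X." If the comparison is inclusive (>=), records updated at exactly the boundary timestamp appear in both the previous run and the current run. If it is strict (>), records that share the boundary timestamp but were written after the previous extraction can be missed. The inclusive form is the safer choice, but without idempotency it creates duplicates.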
With upsert semantics, the duplicate records from the overlap window are simply updated to their current values - which are the same values that were written in the previous run. The result is correct.
Deliberately using an overlap window - re-processing the last N minutes of records on every run as a safety margin - is a common pattern for handling records that may arrive slightly out of order. Idempotent loads make this safe without any additional logic.
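A minimal sketch of the overlap calculation follows. The five-minute default, the helper names, and the sample timestamps are assumptions for illustration; the right margin depends on how late records can arrive in the source system.

```python
from datetime import datetime, timedelta, timezone

def extraction_start(checkpoint, overlap=timedelta(minutes=5)):
    """Return the lower bound for the next extraction, or None on the first run."""
    if checkpoint is None:
        return None
    return checkpoint - overlap

def select_since(records, since):
    # Inclusive comparison: boundary records are re-read rather than skipped.
    if since is None:
        return list(records)
    return [r for r in records if r["updated_at"] >= since]

checkpoint = datetime(2024, 3, 1, 12, 0, tzinfo=timezone.utc)
source = [
    {"id": "a", "updated_at": datetime(2024, 3, 1, 11, 58, tzinfo=timezone.utc)},  # arrived late
    {"id": "b", "updated_at": datetime(2024, 3, 1, 12, 0, tzinfo=timezone.utc)},   # on the boundary
    {"id": "c", "updated_at": datetime(2024, 3, 1, 12, 3, tzinfo=timezone.utc)},   # new
]

without_overlap = [r["id"] for r in select_since(source, checkpoint)]
with_overlap = [r["id"] for r in select_since(source, extraction_start(checkpoint))]
print(without_overlap, with_overlap)
```

The late-arriving record is only picked up when the window is widened, and the boundary record is re-read in both cases; the upsert absorbs the repeated rows.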
For Node.js pipelines using a document database like MongoDB rather than a relational database, the equivalent of an upsert is updateOne(filter, {$set: update}, {upsert: true}). The same principles apply: filter on a stable unique key, set the updated fields.
The Complete Load Pattern
Combining checkpointing, batched upserts, and logging gives you a load step that is safe to re-run, efficient at scale, and observable:
def run_pipeline(pipeline_name):
    with get_session() as session:
        checkpoint = get_checkpoint(pipeline_name, session)
        records = extract(since=checkpoint)
        transformed = [transform(r) for r in records if r is not None]
        if transformed:
            load_in_batches(session, transformed)
            high_water = max(r["updated_at"] for r in transformed)
            save_checkpoint(pipeline_name, high_water, current_run_id(), session)
        log_run_metrics(pipeline_name, len(records), len(transformed))
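The recovery behavior of this pattern can be exercised end to end with a simulated failure. The sketch below is an illustration under assumptions, not the pipeline above: SQLite stands in for the destination and the checkpoint store, timestamps are plain integers, and fail_on_batch is a hypothetical switch that injects a failure.

```python
import sqlite3

class TransientError(Exception):
    """Stand-in for a timeout or rate limit at the destination."""

def read_checkpoint(conn, pipeline):
    row = conn.execute(
        "SELECT last_processed_at FROM checkpoints WHERE pipeline = ?", (pipeline,)
    ).fetchone()
    return row[0] if row else None

def run(conn, pipeline, source, batch_size=2, fail_on_batch=None):
    since = read_checkpoint(conn, pipeline)
    records = [r for r in source if since is None or r["updated_at"] > since]
    if not records:
        return
    for n, start in enumerate(range(0, len(records), batch_size)):
        if n == fail_on_batch:
            raise TransientError(f"batch {n} failed")  # injected failure
        conn.executemany(
            "INSERT INTO items (id, value, updated_at) "
            "VALUES (:id, :value, :updated_at) "
            "ON CONFLICT (id) DO UPDATE SET "
            "value = excluded.value, updated_at = excluded.updated_at",
            records[start:start + batch_size],
        )
        conn.commit()
    # Checkpoint advances only after every batch has loaded.
    conn.execute(
        "INSERT INTO checkpoints (pipeline, last_processed_at) VALUES (?, ?) "
        "ON CONFLICT (pipeline) DO UPDATE SET "
        "last_processed_at = excluded.last_processed_at",
        (pipeline, max(r["updated_at"] for r in records)),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, value TEXT, updated_at INTEGER)")
conn.execute("CREATE TABLE checkpoints (pipeline TEXT PRIMARY KEY, last_processed_at INTEGER)")
source = [{"id": i, "value": f"v{i}", "updated_at": i * 10} for i in range(1, 6)]

try:
    run(conn, "demo", source, fail_on_batch=1)  # first batch lands, second fails
except TransientError:
    pass
partial_rows = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
checkpoint_after_failure = read_checkpoint(conn, "demo")

run(conn, "demo", source)  # re-run from the unchanged checkpoint
final_rows = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
final_checkpoint = read_checkpoint(conn, "demo")
```

After the failed run, the destination holds a partial write and the checkpoint has not moved. The re-run rewrites the rows that already landed, loads the rest, and only then advances the checkpoint.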
For the complete pipeline architecture that this load pattern fits into - incremental extraction, error categorization, and run monitoring - How to Build an ETL Pipeline for Business Data Syncing covers each component in sequence.
137Foundry (https://137foundry.com) works with businesses on data pipeline architecture and implementation. Its AI automation and data integration services at 137foundry.com/services/ai-automation include idempotent pipeline design as a core part of the engagement, along with the monitoring setup that makes the idempotency guarantees verifiable in production.
Additional reference for database-level upsert patterns: PostgreSQL's documentation on INSERT ... ON CONFLICT covers the full syntax and edge cases. SQLAlchemy's documentation covers the Python ORM implementation across different database backends. Apache Airflow's documentation covers checkpoint patterns for orchestrated pipelines.
For pipelines that process streaming data rather than batched records, Redis's atomic operations (for example, SET with the NX option) and distributed locks can serve as building blocks for deduplication, so that repeated deliveries of the same message are processed only once. Pandas handles in-memory deduplication for smaller batch pipelines as a complement to database-level upserts.
For alternative orchestration options, Prefect and Dagster both support native checkpoint and retry patterns with lighter infrastructure requirements than Airflow. Airbyte manages incremental sync state for supported connectors, handling the checkpoint layer automatically for pipelines built on its connector library. dbt handles the transformation layer on top of idempotently loaded raw data, enabling incremental SQL models that only process new records.

