How to Implement Structured Logging for Data Integration Pipelines

A practical guide to JSON logging, correlation IDs, and connecting logs to metrics and traces

Unstructured logs are a liability. When a data integration pipeline fails at 2am, you do not want to debug it by grepping through plain-text log files that concatenate messages from multiple stages, multiple workers, and multiple runs. Structured logging fixes this by making every log entry a queryable, filterable data record instead of a raw string.

This guide covers the logging schema design, implementation patterns in Python and Node.js, how to connect logs to metrics and traces, and the tooling layer that makes structured logs useful in practice.

Why Structured Logging Matters for Pipelines

Data integration pipelines have characteristics that make structured logging especially valuable compared to single-process applications:

Multiple stages with shared log streams. A pipeline may run extraction, transformation, and loading in separate workers that all write to the same log stream. Without a stage field in every log entry, distinguishing extraction failures from loading failures requires parsing message text.

Batch and run correlation. A pipeline may process millions of records across multiple runs. Without a run ID in every log entry, answering "which records failed during run X" requires timestamps to bound the search, which is fragile.

Variable throughput. High-volume runs produce dramatically more log entries than low-volume runs, and many log management platforms charge by ingested volume. Structured logging enables sampling strategies (log every Nth successful record, but log all errors) without losing the correlation between sampled and unsampled entries.

The Core Log Schema

Every structured log entry from a data integration pipeline should include these fields:

{
  "timestamp": "2026-05-08T06:30:15.123Z",
  "level": "INFO",
  "run_id": "run-2026-05-08-063000",
  "pipeline": "customer-sync",
  "stage": "extract",
  "event": "batch_processed",
  "records_in": 100,
  "records_out": 100,
  "records_error": 0,
  "duration_ms": 342,
  "source_id": "tenant-42"
}

The run_id is the critical correlation field. It ties every log entry from a single pipeline execution together, allowing you to reconstruct the full timeline of any run in any log aggregation tool. Prefer a format built from the scheduled time, as in the example above, over a random UUID so that run IDs are human-readable at a glance. If run IDs from several pipelines share one index, prefix the ID with the pipeline name as well.
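One possible way to generate such IDs is sketched below. The helper name make_run_id and the optional pipeline prefix are illustrative assumptions rather than part of any library; the sketch also assumes the scheduler passes a timezone-aware datetime.

```python
from datetime import datetime, timezone
from typing import Optional


def make_run_id(scheduled_at: datetime, pipeline: Optional[str] = None) -> str:
    """Build a human-readable run ID from the scheduled start time (in UTC)."""
    stamp = scheduled_at.astimezone(timezone.utc).strftime("%Y-%m-%d-%H%M%S")
    # Optionally embed the pipeline name so the ID is self-describing.
    return f"{pipeline}-run-{stamp}" if pipeline else f"run-{stamp}"


scheduled = datetime(2026, 5, 8, 6, 30, 0, tzinfo=timezone.utc)
print(make_run_id(scheduled))                   # run-2026-05-08-063000
print(make_run_id(scheduled, "customer-sync"))  # customer-sync-run-2026-05-08-063000
```

Deriving the ID from the scheduled time rather than the actual start time keeps it stable across retries of the same scheduled run, which may or may not be what you want.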

Implementation: Python

Python's built-in logging module supports structured output through custom formatters. For JSON logging, the python-json-logger library is the simplest approach:

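import logging
from pythonjsonlogger import jsonlogger


class RunContextAdapter(logging.LoggerAdapter):
    """Merge run-scoped fields with the per-call `extra` fields."""

    def process(self, msg, kwargs):
        # By default the stock LoggerAdapter replaces per-call `extra`,
        # which would silently drop the stage and record counts.
        kwargs['extra'] = {**self.extra, **(kwargs.get('extra') or {})}
        return msg, kwargs


def setup_logger(run_id, pipeline_name):
    logger = logging.getLogger(pipeline_name)
    if not logger.handlers:  # avoid duplicate output if called twice
        handler = logging.StreamHandler()
        formatter = jsonlogger.JsonFormatter(
            # Standard LogRecord attributes, renamed to match the schema
            # (rename_fields requires python-json-logger 2.x or later)
            fmt='%(asctime)s %(levelname)s %(name)s %(message)s',
            rename_fields={'asctime': 'timestamp', 'levelname': 'level'},
            datefmt='%Y-%m-%dT%H:%M:%S'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    # Inject run-scoped fields into all log entries
    extra = {'run_id': run_id, 'pipeline': pipeline_name}
    return RunContextAdapter(logger, extra)

logger = setup_logger('run-2026-05-08-063000', 'customer-sync')
logger.info('Record batch processed', extra={
    'stage': 'extract',
    'event': 'batch_processed',
    'records_in': 100,
    'records_out': 100,
    'records_error': 0,
    'duration_ms': 342
})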

OpenTelemetry provides a Python SDK that extends this pattern with distributed trace context, allowing log entries to be correlated with spans from multiple services when your pipeline spans more than one system.
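To illustrate the idea of attaching trace context to log entries, here is a minimal sketch that uses only the standard library. The current_trace_id context variable, the TraceContextFilter class, and the trace ID value are hypothetical stand-ins; the OpenTelemetry SDK manages context propagation itself and this is not its API.

```python
import contextvars
import io
import json
import logging

# Hypothetical stand-in for the active trace context.
current_trace_id = contextvars.ContextVar("current_trace_id", default=None)


class TraceContextFilter(logging.Filter):
    """Copy the active trace ID onto every record passing through the handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = current_trace_id.get()
        return True  # never drop records, only annotate them


class JsonLineFormatter(logging.Formatter):
    """Tiny JSON formatter so the sketch needs no third-party packages."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "trace_id": getattr(record, "trace_id", None),
        })


stream = io.StringIO()  # capture output in memory for the demonstration
handler = logging.StreamHandler(stream)
handler.addFilter(TraceContextFilter())
handler.setFormatter(JsonLineFormatter())

trace_logger = logging.getLogger("trace-demo")
trace_logger.addHandler(handler)
trace_logger.setLevel(logging.INFO)
trace_logger.propagate = False

current_trace_id.set("4bf92f3577b34da6a3ce929d0e0e4736")  # made-up example value
trace_logger.info("load stage started")
entry = json.loads(stream.getvalue())
print(entry)
```

Because the trace ID is attached by a filter rather than passed at each call site, pipeline code does not need to know whether tracing is enabled.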

Implementation: Node.js

In Node.js, the pino library is the standard choice for structured logging. It produces JSON by default and is significantly faster than alternatives for high-throughput pipelines:

const pino = require('pino');

const logger = pino({
  level: 'info',
  base: {
    run_id: 'run-2026-05-08-063000',
    pipeline: 'customer-sync'
  }
});

logger.info({
  stage: 'extract',
  event: 'batch_processed',
  records_in: 100,
  records_out: 100,
  records_error: 0,
  duration_ms: 342
});

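The base field injects run-scoped context into every log entry automatically, much like Python's LoggerAdapter pattern. Note that pino's default output uses a numeric level and an epoch-millisecond time field, so configure its formatter and timestamp options if entries need to match the schema above exactly. The Node.js documentation covers the async I/O patterns that make structured logging non-blocking in high-throughput contexts.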

Connecting Logs to Metrics

Logs alone are not enough for pipeline monitoring. Logs capture events; metrics track rates and trends. The most useful patterns combine both:

Log-derived metrics: Some observability platforms (like Datadog) can generate metrics from log entries in real time. A log-based metric on the records_error field produces an error rate chart without requiring separate metric instrumentation. Grafana Loki offers a similar capability through metric queries in LogQL, its log query language.

Explicit metric emission: For platforms that do not support log-derived metrics, emit explicit metrics at stage boundaries using a metrics library alongside the logging calls. Prometheus client libraries for Python and Node.js make this straightforward.

Distributed traces: When a pipeline spans multiple services, OpenTelemetry distributed traces connect log entries across service boundaries. Each service adds trace context to its log entries, and the observability backend correlates them into a single timeline. This is especially valuable for debugging failures that start in one service and manifest in another.
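To make the log-derived metrics pattern concrete, the sketch below computes an error rate per stage directly from JSON log lines, which is roughly what a log-based metric does inside an observability platform. The function name error_rate_by_stage and the sample log lines are made up for illustration.

```python
import json
from collections import defaultdict

# Made-up sample entries following the core log schema.
log_lines = [
    '{"run_id": "run-2026-05-08-063000", "stage": "extract", "records_in": 1000, "records_error": 2}',
    '{"run_id": "run-2026-05-08-063000", "stage": "load", "records_in": 998, "records_error": 0}',
    '{"run_id": "run-2026-05-08-063000", "stage": "extract", "records_in": 1000, "records_error": 8}',
]


def error_rate_by_stage(lines):
    """Aggregate records_error / records_in per stage from JSON log lines."""
    totals = defaultdict(lambda: {"records_in": 0, "records_error": 0})
    for line in lines:
        entry = json.loads(line)
        stage_totals = totals[entry["stage"]]
        stage_totals["records_in"] += entry.get("records_in", 0)
        stage_totals["records_error"] += entry.get("records_error", 0)
    return {
        name: (t["records_error"] / t["records_in"]) if t["records_in"] else 0.0
        for name, t in totals.items()
    }


rates = error_rate_by_stage(log_lines)
print(rates)
```

This only works because records_in and records_error are typed fields in every entry; with free-text messages the same calculation would need fragile regular expressions.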

Log Aggregation and Storage

Raw structured logs need an aggregation layer to be searchable. Common options:

Elasticsearch / OpenSearch with Kibana: the most full-featured option for complex log queries. High operational overhead for self-hosted deployments.

Grafana Loki: optimized for log aggregation with a lower storage cost than Elasticsearch. Well-suited for pipelines that already use Grafana for metrics.

Splunk: enterprise-grade with strong compliance features. Higher cost than open-source alternatives.

Fluentd: a CNCF-graduated log collector and router rather than a search backend. Fluentd collects logs from multiple sources, transforms them, and routes them to a wide range of destinations, including the platforms above. Useful when you need to aggregate logs from multiple pipeline workers across different machines.

Regardless of the aggregation platform, retain raw logs for at least 30 days. Pipeline failures discovered weeks after they occurred still require the original log context to diagnose.

Sampling Strategies for High-Volume Pipelines

For pipelines that process millions of records per run, logging every record event at INFO level is impractical. A pipeline processing 5 million records per hour generates 5 million INFO log entries per hour, which at 200 to 500 bytes per entry amounts to roughly 1 to 2.5 GB of log volume per hour. Storage and ingestion costs scale with that volume.
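As a rough check on the volume estimate, the following sketch multiplies the entry rate by a few assumed entry sizes. The sizes (200, 500, and 1,000 bytes) are illustrative assumptions, not measurements, and the helper name hourly_volume_gb is hypothetical.

```python
ENTRIES_PER_HOUR = 5_000_000  # one INFO entry per record, as in the text


def hourly_volume_gb(entries_per_hour: int, bytes_per_entry: int) -> float:
    """Return log volume per hour in decimal gigabytes (1 GB = 10**9 bytes)."""
    return entries_per_hour * bytes_per_entry / 10**9


# Assumed entry sizes in bytes; real sizes depend on the schema and payloads.
volumes = {size: hourly_volume_gb(ENTRIES_PER_HOUR, size) for size in (200, 500, 1000)}
for size, gb in volumes.items():
    print(f"{size} bytes/entry -> {gb} GB/hour")
```

Multiplying by 24 gives the daily figure, which is usually the number that matters for ingestion pricing.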

Structured logging supports sampling strategies that reduce volume without losing visibility:

Batch-level logging only. Instead of logging each record, log each batch. A batch of 1,000 records produces one log entry with records_in: 1000, records_out: 998, records_error: 2. Record-level detail is reserved for error events, which are always logged individually.

Error-always sampling. Log every error event at full detail regardless of volume. For success events, log every Nth batch at INFO level and the rest at DEBUG. This produces a representative sample of the success path while capturing complete data on every failure.

Level-based production filtering. Set production log level to INFO and capture summary-level events: batch boundaries, run start and end, stage transitions. When investigating a specific failure, temporarily lower the log level to DEBUG for the affected pipeline. Because every entry is structured, filtering by run_id or stage returns only relevant entries regardless of total log volume.

The key constraint: error events and DLQ writes always log at full detail and full verbosity. Sampling applies only to the success path where the primary signal is volume and rate, not individual record content. This ensures that failure investigation always has complete context, even when normal operation runs with reduced logging.
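The error-always strategy can be sketched as a small function that picks the log level for each batch summary. The function name batch_log_level, the default of every 10th batch, and the choice of ERROR for batches that contain failures are assumptions made for illustration.

```python
import logging


def batch_log_level(batch_index: int, records_error: int, sample_every: int = 10) -> int:
    """Pick a log level for a batch summary under error-always sampling."""
    if records_error > 0:
        return logging.ERROR   # failures are never sampled away
    if batch_index % sample_every == 0:
        return logging.INFO    # every Nth successful batch stays visible
    return logging.DEBUG       # remaining successes are filtered out at INFO level


levels = [batch_log_level(i, records_error=0) for i in range(20)]
print(levels.count(logging.INFO), levels.count(logging.DEBUG))  # 2 18
print(batch_log_level(7, records_error=3) == logging.ERROR)     # True
```

A caller would pass the result to logger.log(level, message, extra={...}), so a production handler set to INFO drops the unsampled successes while a temporary switch to DEBUG restores them.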

For a high-volume pipeline, combining batch-level logging with error-always sampling can reduce log ingestion volume, and therefore cost, by roughly 90-95% while maintaining complete observability of the failure path. The exact figure depends on batch size, sampling rate, and error rate.

What to Log at Each Stage

Extraction:

  • Run start/end, records fetched from source, pagination state, rate limit encounters

Transformation:

  • Records in, records out, records skipped (with skip reason), validation failures per field

Loading:

  • Records inserted/updated, upsert conflicts resolved, destination API errors

DLQ writes:

  • Full record payload, error, stage, retry count

Reconciliation:

  • Source count vs destination count, discrepancy details
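As one example from the list above, a DLQ write entry might be assembled as follows. This is a sketch under assumptions: the function name dlq_log_entry, the event name dlq_write, and the sample record are hypothetical, and the field names simply follow the core schema.

```python
import json
from datetime import datetime, timezone


def dlq_log_entry(run_id, stage, record, error, retry_count):
    """Build a DLQ log entry carrying the full payload and failure context."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
        "level": "ERROR",
        "run_id": run_id,
        "stage": stage,
        "event": "dlq_write",
        "error": f"{type(error).__name__}: {error}",
        "retry_count": retry_count,
        "record": record,  # full payload, so the record can be replayed later
    }


entry = dlq_log_entry(
    run_id="run-2026-05-08-063000",
    stage="transform",
    record={"customer_id": 17, "email": None},
    error=ValueError("email is required"),
    retry_count=3,
)
print(json.dumps(entry))
```

Keeping the error type and message in one field makes it easy to group DLQ entries by failure cause in the aggregation layer.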

The guide on monitoring and observing data integration pipelines covers how these log-level details connect to the higher-level monitoring and alerting layer.

The JSON specification and the syslog RFC 5424 both inform structured logging best practices, with RFC 5424 defining standard severity levels that most logging frameworks map to.

137Foundry designs observable data integration systems with structured logging as a first-class requirement, not an afterthought. The data integration and technical SEO services both rely on instrumented, observable pipelines in production.