How to Set Up a Python Data Pipeline That Runs on a Schedule Without Cron Headaches

Build a reliable scheduled data pipeline using Python and modern scheduling tools

Cron works. It has worked for decades. But anyone who has managed more than a handful of cron jobs on a production server knows the pain points: no built-in retry logic, no error notifications unless you configure them yourself, no dependency management between jobs, and logging that requires manual setup. When a cron job fails at 3 AM, you find out when someone notices the morning report is missing.

Python gives you better options for scheduling data pipelines. This guide walks through building a data pipeline that extracts, transforms, and loads data on a reliable schedule with proper error handling, logging, and monitoring, all without managing raw cron entries.

Step 1: Structure Your Pipeline as Composable Functions

Before adding scheduling, structure your pipeline code so each stage is a self-contained function that accepts input and returns output. This makes testing, debugging, and rerunning individual stages straightforward.

# pipeline/extract.py
import pandas as pd
import requests

def extract_sales_data(api_url: str, api_key: str, date: str) -> pd.DataFrame:
    """Extract sales data from the API for a given date, one page at a time."""
    headers = {'Authorization': f'Bearer {api_key}'}
    params = {'date': date, 'limit': 1000}

    all_records = []
    page = 1

    while True:
        params['page'] = page
        # A timeout keeps a stalled connection from hanging the whole run
        response = requests.get(api_url, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()

        # An empty page signals that every record has been fetched
        if not data['records']:
            break

        all_records.extend(data['records'])
        page += 1

    return pd.DataFrame(all_records)

# pipeline/transform.py
import pandas as pd

def clean_and_transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw sales data and compute derived metrics."""
    # Remove test transactions. The .copy() leaves the caller's DataFrame
    # untouched and avoids pandas' SettingWithCopyWarning on the assignments below.
    df = df[df['customer_id'] != 'TEST_ACCOUNT'].copy()

    # Standardize date format
    df['sale_date'] = pd.to_datetime(df['sale_date']).dt.date

    # Calculate revenue metrics
    df['revenue_usd'] = df['amount_cents'] / 100.0
    df['tax_amount'] = df['revenue_usd'] * df['tax_rate']
    df['net_revenue'] = df['revenue_usd'] - df['tax_amount']

    # Drop duplicates based on transaction ID
    df = df.drop_duplicates(subset=['transaction_id'], keep='first')

    return df

Separating extraction from transformation means you can rerun the transformation without hitting the API again. This saves API quota and speeds up development. Store each module in its own file so changes to transformation logic do not require touching extraction code.
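One way to make that rerun possible is to cache the raw extract on disk before transforming it. The following is a minimal sketch using only the standard library; the `save_raw` and `load_raw` helpers and the file naming scheme are hypothetical and not part of the pipeline code above.

```python
import json
import tempfile
from pathlib import Path


def save_raw(records: list[dict], path: Path) -> None:
    """Write raw extracted records to disk as JSON."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(records), encoding="utf-8")


def load_raw(path: Path) -> list[dict]:
    """Read previously extracted records back without calling the API."""
    return json.loads(path.read_text(encoding="utf-8"))


# Extract once, then iterate on the transformation from the cached file.
cache_dir = Path(tempfile.mkdtemp())
cache_file = cache_dir / "raw" / "sales_2024-01-15.json"  # hypothetical naming scheme
save_raw([{"transaction_id": "t1", "amount_cents": 1250}], cache_file)
cached = load_raw(cache_file)
```

In a pandas-based pipeline the cached records could then be passed to `pd.DataFrame(...)` and on to `clean_and_transform`.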

The Pandas documentation on DataFrame operations covers the full range of transformation methods available. For type safety and validation, consider adding Pandera schemas to validate your DataFrames at each pipeline stage.

Developer writing Python code on a laptop with a terminal window open Photo by Christina Morillo on Pexels

Step 2: Add Error Handling and Retries

Network requests fail. APIs return unexpected responses. Database connections time out. Your pipeline needs to handle these gracefully rather than crashing silently.

The tenacity library provides a clean decorator-based approach to retries:

# pipeline/extract.py (updated with retries)
import pandas as pd
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout))
)
def extract_sales_data(api_url: str, api_key: str, date: str) -> pd.DataFrame:
    # Single-page version for brevity; keep the pagination loop from Step 1 in real use
    headers = {'Authorization': f'Bearer {api_key}'}
    params = {'date': date, 'limit': 1000}

    # The timeout turns a stalled connection into a retryable requests.Timeout
    response = requests.get(api_url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    return pd.DataFrame(response.json()['records'])

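With these settings the function is attempted up to 3 times in total (the initial call plus two retries) when it hits a connection error or timeout. wait_exponential spaces the attempts with an exponentially growing delay clamped between 4 and 60 seconds; with only three attempts, both waits sit at the 4-second floor, so raise stop_after_attempt if you want the longer delays to come into play. It does not retry HTTP 400 or 404 errors, which raise_for_status() surfaces as requests.HTTPError and which indicate problems with your request rather than transient network issues. As written, 5xx responses are not retried either.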
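To make the retry behavior concrete, here is a standard-library sketch of the same idea: a bounded number of attempts, an exponential wait clamped to a minimum and maximum, and retries only for selected exception types. The names `backoff_wait` and `call_with_retries` are hypothetical, the wait formula is an assumption chosen for illustration rather than a copy of tenacity's internals, and Python's built-in `ConnectionError` and `TimeoutError` stand in for the requests exceptions.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_wait(attempt: int, min_wait: float = 4, max_wait: float = 60) -> float:
    """Exponential wait after a failed attempt (1-based), clamped to [min_wait, max_wait]."""
    return max(min_wait, min(2 ** (attempt - 1), max_wait))


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only on the listed transient exception types."""
    attempt = 1
    while True:
        try:
            return fn()
        except retry_on:
            if attempt >= attempts:
                raise  # out of attempts: surface the last error
            sleep(backoff_wait(attempt))
            attempt += 1


# Demo: fail twice with a transient error, then succeed on the third attempt.
calls = {"count": 0}
waits: list = []  # records the waits instead of actually sleeping


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


result = call_with_retries(flaky, sleep=waits.append)
```

Passing `sleep` as a parameter keeps the sketch testable without real delays. Any exception type not listed in `retry_on` propagates immediately on the first failure.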

Add validation between stages to catch data quality issues before they propagate. The Great Expectations library provides a comprehensive framework for data validation, but for simpler pipelines, basic assertions work fine:

def validate_sales_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate extracted data before transformation."""
    assert len(df) > 0, "No records extracted"
    assert df['transaction_id'].nunique() == len(df), "Duplicate transaction IDs found"
    assert df['amount_cents'].min() >= 0, "Negative amounts detected"
    assert df['sale_date'].notna().all(), "Null sale dates found"
    return df
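One caveat with assertions: Python removes `assert` statements when it runs with the `-O` flag, so checks that must always run are safer as explicit raises. The sketch below mirrors the checks above on a plain list of dictionaries so that it runs without pandas; `DataValidationError` and `validate_records` are hypothetical names.

```python
class DataValidationError(ValueError):
    """Raised when extracted records fail a quality check (hypothetical name)."""


def validate_records(records: list[dict]) -> list[dict]:
    """Same checks as the assertion-based version, but with explicit raises."""
    if not records:
        raise DataValidationError("No records extracted")
    ids = [r["transaction_id"] for r in records]
    if len(set(ids)) != len(ids):
        raise DataValidationError("Duplicate transaction IDs found")
    if any(r["amount_cents"] < 0 for r in records):
        raise DataValidationError("Negative amounts detected")
    if any(r.get("sale_date") is None for r in records):
        raise DataValidationError("Null sale dates found")
    return records


good = [{"transaction_id": "t1", "amount_cents": 500, "sale_date": "2024-01-15"}]
validated = validate_records(good)
```

A dedicated exception type also lets the scheduler or alerting code tell data quality failures apart from network failures.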

Step 3: Choose Your Scheduling Approach

This is where most developers reach for cron and later regret it. Here are three better options, each suited to different complexity levels.

Option A: APScheduler (Simplest)

APScheduler (Advanced Python Scheduler) runs inside your Python process. It needs no external services and no separate scheduler daemon. It supports cron-style schedules, interval-based schedules, and one-time execution.

# run_pipeline.py
import logging
import os

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

from pipeline.extract import extract_sales_data
from pipeline.transform import clean_and_transform

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_daily_pipeline():
    try:
        logger.info("Starting daily sales pipeline")
        raw = extract_sales_data(
            api_url=os.environ['SALES_API_URL'],
            api_key=os.environ['SALES_API_KEY'],
            # Assumes the API accepts the literal 'yesterday'; pass a concrete date otherwise
            date='yesterday'
        )
        logger.info(f"Extracted {len(raw)} records")

        cleaned = clean_and_transform(raw)
        logger.info(f"Transformed to {len(cleaned)} clean records")

        # Load step here
        logger.info("Pipeline completed successfully")
    except Exception:
        # logger.exception records the full traceback, not just the message
        logger.exception("Pipeline failed")
        # Send alert (email, Slack, PagerDuty)
        raise

# The guard lets other scripts import run_daily_pipeline without starting the scheduler
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # Runs daily at 06:30 in the scheduler's timezone (local time by default)
    scheduler.add_job(run_daily_pipeline, CronTrigger(hour=6, minute=30))
    scheduler.start()

APScheduler is ideal for single-machine deployments with a handful of pipelines. Run the script as a systemd service or in a Docker container with restart policies. The APScheduler user guide covers persistence options so scheduled jobs survive process restarts.
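The example passes the literal string 'yesterday' as the date. If the API expects a concrete date instead, the run date can be computed at call time. This is a small sketch; the `previous_day` helper is hypothetical, and the ISO 8601 format is an assumption about what the API accepts.

```python
from datetime import date, timedelta
from typing import Optional


def previous_day(today: Optional[date] = None) -> str:
    """Return the day before `today` as an ISO 8601 string (YYYY-MM-DD)."""
    today = today or date.today()
    return (today - timedelta(days=1)).isoformat()


# Passing an explicit date makes backfills and tests deterministic.
run_date = previous_day(date(2024, 3, 1))
```

Accepting `today` as an optional argument means the same function serves scheduled runs (no argument) and manual reruns for a specific day.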

Option B: Prefect (Medium Complexity)

For teams that need monitoring, a dashboard, and more sophisticated scheduling, Prefect wraps your existing Python functions with minimal code changes:

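import os
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

from pipeline.extract import extract_sales_data
from pipeline.transform import clean_and_transform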

@task(retries=3, retry_delay_seconds=[10, 30, 60],
      cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract(api_url, api_key, date):
    return extract_sales_data(api_url, api_key, date)

@task
def transform(raw_data):
    return clean_and_transform(raw_data)

@flow(name="daily-sales", log_prints=True)
def daily_sales_pipeline(date: str = 'yesterday'):
    raw = extract(os.environ['SALES_API_URL'], os.environ['SALES_API_KEY'], date)
    cleaned = transform(raw)
    print(f"Processed {len(cleaned)} records")

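Prefect provides a web UI for monitoring pipeline runs, built-in alerting, and deployment options that range from a local process to Kubernetes. The decorators above only define the flow; the schedule itself (for example, a cron expression) is attached when you create a deployment for it. The learning curve is gentle if you already write Python, and the Prefect tutorials walk through common patterns step by step.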

"Schedule reliability matters more than schedule precision. A pipeline that runs at 6:32 AM and never fails is more valuable than one that runs at exactly 6:00 AM and crashes twice a month." - Dennis Traina, 137Foundry

Data pipeline monitoring dashboard showing job status and run history Photo by Tima Miroshnichenko on Pexels

Option C: GitHub Actions (Zero Infrastructure)

For pipelines that do not require persistent state, GitHub Actions offers free scheduled execution with no infrastructure to manage:

# .github/workflows/daily-pipeline.yml
name: Daily Sales Pipeline
on:
  schedule:
    - cron: '30 6 * * *'
  workflow_dispatch:  # Allow manual trigger

jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: python run_pipeline.py
        env:
          SALES_API_URL: ${{ secrets.SALES_API_URL }}
          SALES_API_KEY: ${{ secrets.SALES_API_KEY }}

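GitHub Actions handles scheduling, secret management, and execution logging. The cron expression is evaluated in UTC, so '30 6 * * *' means 06:30 UTC. The workflow must invoke a script that runs the pipeline once and exits; the run_pipeline.py from Option A starts a blocking scheduler and would keep the job running until it times out, so have the script call run_daily_pipeline() directly instead. The free tier provides 2,000 minutes per month for private repositories, which is sufficient for most daily pipelines. The limitation is execution time: each job can run for a maximum of 6 hours, and the scheduling precision is approximate (jobs may start up to 15 minutes late during peak periods).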
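Because a workflow step succeeds or fails based on the process exit code, a script run from GitHub Actions should run the pipeline once and report failure with a non-zero exit. A minimal sketch of such an entry point follows; `run_once` and the two demo pipelines are hypothetical stand-ins for the real extract, transform, and load call.

```python
import logging
from typing import Callable

logger = logging.getLogger("run_once")


def run_once(pipeline: Callable[[], None]) -> int:
    """Run the pipeline a single time and map the outcome to a process exit code."""
    try:
        pipeline()
    except Exception:
        logger.exception("Pipeline failed")
        return 1  # non-zero exit marks the workflow step as failed
    return 0


def _demo_pipeline() -> None:
    """Stand-in for the real extract/transform/load call."""
    logger.info("pipeline ran")


def _failing_pipeline() -> None:
    """Stand-in for a run that hits an error."""
    raise RuntimeError("simulated failure")


ok_code = run_once(_demo_pipeline)
fail_code = run_once(_failing_pipeline)
# In a real script, finish with: sys.exit(run_once(your_pipeline_function))
```

Returning the code from a function, and calling `sys.exit` only at the very end of the script, keeps the logic easy to test.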

Step 4: Add Monitoring and Alerting

A scheduled pipeline without monitoring is a liability. At minimum, implement these three monitoring patterns:

  1. Heartbeat monitoring: Use a service like Healthchecks.io (free tier available). Your pipeline pings a URL when it completes successfully. If the ping does not arrive within the expected window, you get an alert.

  2. Data quality alerts: After each pipeline run, check that the output data meets basic quality thresholds. Row counts should be within historical ranges. Key metrics should not change by more than a reasonable percentage. The data automation team at 137Foundry builds validation checks into every pipeline stage to catch issues before they reach downstream reports and dashboards.

  3. Structured logging: Use Python's logging module with structured output (JSON format) so logs are searchable. Include pipeline name, run date, stage name, record counts, and execution time in every log entry.
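The structured logging pattern can be sketched with the standard library alone. The formatter below emits one JSON object per log line; the class name `JsonFormatter` and the context field names are hypothetical choices, picked to match the fields listed above.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    # Hypothetical field names; pass them via `extra=` when logging.
    CONTEXT_FIELDS = ("pipeline", "run_date", "stage", "record_count", "duration_s")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Copy over only the context fields that were actually supplied
        for field in self.CONTEXT_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("daily-sales")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info(
    "stage finished",
    extra={"pipeline": "daily-sales", "run_date": "2024-01-15",
           "stage": "extract", "record_count": 1000, "duration_s": 2.4},
)
```

Keeping the context in `extra` rather than in the message string means log search tools can filter on `stage` or `record_count` directly.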

Putting It All Together

The complete pipeline architecture looks like this: composable Python functions for each stage, retry logic on external calls, validation between stages, a scheduler that handles failures gracefully, and monitoring that alerts you before anyone notices stale data.
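As a rough illustration of how these pieces fit, the sketch below chains stages in order, calls a failure hook with the name of the stage that broke, and calls a success hook (where a heartbeat ping would go) only when every stage completes. `run_stages` and its hook parameters are hypothetical, and the demo stages are stand-ins for the real functions.

```python
from typing import Any, Callable, Iterable, Tuple

Stage = Tuple[str, Callable[[Any], Any]]


def run_stages(
    stages: Iterable[Stage],
    initial: Any = None,
    on_success: Callable[[], None] = lambda: None,
    on_failure: Callable[[str, Exception], None] = lambda stage, exc: None,
) -> Any:
    """Feed each stage's output into the next; report success or the failing stage."""
    data = initial
    for name, stage in stages:
        try:
            data = stage(data)
        except Exception as exc:
            on_failure(name, exc)  # e.g. send a Slack or email alert
            raise
    on_success()  # e.g. ping a heartbeat URL
    return data


def extract(_: Any) -> list:
    """Stand-in for the API extraction stage."""
    return [{"amount_cents": 1250}, {"amount_cents": 300}]


def transform(rows: list) -> list:
    """Stand-in for the transformation stage."""
    return [{**row, "revenue_usd": row["amount_cents"] / 100} for row in rows]


events: list = []
result = run_stages(
    [("extract", extract), ("transform", transform)],
    on_success=lambda: events.append("heartbeat"),
    on_failure=lambda stage, exc: events.append(f"alert:{stage}"),
)
```

Because the heartbeat fires only after the last stage, a missing ping covers crashes, hangs, and runs that never started.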

For a more comprehensive guide on designing data pipelines from scratch, including how to choose between extraction methods, design transformation logic, and decide when to use pre-built connectors versus custom code, this guide on building automated data pipelines covers the full decision framework from business requirements through deployment.

Further Reading

Start with APScheduler or GitHub Actions for your first pipeline. Move to Prefect or Airflow when you have enough pipelines that monitoring and dependency management become a real operational need. The scheduling infrastructure should grow with your pipeline complexity, not ahead of it.