
Airflow DAG Templates

$49

30+ production Airflow DAGs with error handling, retry logic, SLA monitoring, alerting, and dynamic task generation patterns.

📁 17 files · 🏷 v1.0.0
Python · YAML · Markdown · JSON · Databricks · Spark · Airflow

📁 File Structure · 17 files

airflow-dag-templates/
├── LICENSE
├── README.md
├── configs/
│   ├── connections.yaml
│   └── variables.yaml
├── dags/
│   ├── api_ingestion_dag.py
│   ├── backfill_dag.py
│   ├── cdc_streaming_dag.py
│   ├── data_quality_dag.py
│   ├── dbt_orchestration_dag.py
│   └── etl_pipeline_dag.py
├── guides/
│   └── airflow-best-practices.md
├── plugins/
│   ├── hooks/
│   │   └── slack_webhook_hook.py
│   ├── operators/
│   │   ├── data_quality_operator.py
│   │   └── spark_submit_operator.py
│   └── sensors/
│       └── s3_key_sensor_extended.py
└── tests/
    └── test_dags.py

📖 Documentation Preview · README excerpt

Airflow DAG Templates

Production-ready Apache Airflow DAG templates for modern data pipelines.

Skip the boilerplate. Start with battle-tested DAGs for ETL, data quality, dbt orchestration, API ingestion, CDC streaming, and backfills.

---

What You Get

  • 6 production DAG templates covering the most common data pipeline patterns
  • Custom operators for Spark submit (Databricks), data quality, and more
  • Custom hooks & sensors — Slack notifications, extended S3 sensors
  • Connection & variable configs — ready-to-import YAML for all environments
  • DAG integrity test suite — catch import errors and cycles before deployment
  • Best practices guide — idempotency, XCom, dynamic DAGs, Airflow 2.x patterns
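The bundled tests/test_dags.py is not reproduced on this page; as a stdlib-only illustration of the cycle check such a suite performs (a real Airflow suite would load the DagBag, but the underlying idea is a depth-first search over task dependencies), here is a minimal sketch, with the DAG assumed to be given as a task → downstream-tasks dict:

```python
from typing import Dict, List


def has_cycle(dag: Dict[str, List[str]]) -> bool:
    """Detect a dependency cycle via DFS with three-state marking."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / in progress / finished
    color = {task: WHITE for task in dag}

    def visit(task: str) -> bool:
        color[task] = GRAY
        for downstream in dag.get(task, []):
            if color.get(downstream, WHITE) == GRAY:  # back edge => cycle
                return True
            if color.get(downstream, WHITE) == WHITE and visit(downstream):
                return True
        color[task] = BLACK
        return False

    return any(color[t] == WHITE and visit(t) for t in list(dag))
```

A linear pipeline like `{"extract": ["transform"], "transform": ["load"], "load": []}` passes, while `{"a": ["b"], "b": ["a"]}` is flagged, so the check fails fast before a broken DAG ever reaches the scheduler.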

File Tree


airflow-dag-templates/
├── README.md
├── manifest.json
├── LICENSE
├── dags/
│   ├── etl_pipeline_dag.py         # Full ETL with retries & SLA
│   ├── data_quality_dag.py         # Quality checks + Slack alerts
│   ├── dbt_orchestration_dag.py    # dbt run → test → docs
│   ├── api_ingestion_dag.py        # REST API → S3 → Spark
│   ├── cdc_streaming_dag.py        # Debezium → Kafka → Delta
│   └── backfill_dag.py             # Parameterized backfill
├── plugins/
│   ├── operators/
│   │   ├── spark_submit_operator.py    # Databricks Spark submit
│   │   └── data_quality_operator.py    # Data quality checks
│   ├── hooks/
│   │   └── slack_webhook_hook.py       # Slack notifications
│   └── sensors/
│       └── s3_key_sensor_extended.py   # S3 sensor with extras
├── configs/
│   ├── connections.yaml            # Connection templates
│   └── variables.yaml              # Variable templates
├── tests/
│   └── test_dags.py                # DAG integrity tests
└── guides/
    └── airflow-best-practices.md   # Patterns & anti-patterns

Getting Started

1. Copy DAGs to your Airflow home


cp dags/*.py $AIRFLOW_HOME/dags/
cp -r plugins/. $AIRFLOW_HOME/plugins/

2. Configure connections

Use the YAML templates in configs/ to set up connections via the Airflow CLI:
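The exact contents of configs/connections.yaml are not shown in this preview; a hypothetical entry in the dict format accepted by `airflow connections import` in recent Airflow 2.x releases (the `api_auth` conn_id matches the code sample's API_CONFIG, while the host and credentials are placeholders) might look like:

```yaml
api_auth:
  conn_type: http
  host: https://api.example.com
  login: svc_ingest
  password: "change-me"
```

which would then be loaded with `airflow connections import configs/connections.yaml`.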

... continues with setup instructions, usage examples, and more.

📄 Code Sample · .py preview

dags/api_ingestion_dag.py

"""
API Ingestion DAG — Airflow DAG Templates
============================================

Ingests data from a paginated REST API, stages to S3, and triggers
Spark processing. Handles rate limiting and pagination automatically.

By Datanest Digital — https://datanest.dev
"""
from __future__ import annotations

import json
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

DEFAULT_ARGS: Dict[str, Any] = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "execution_timeout": timedelta(hours=2),
}

API_CONFIG: Dict[str, Any] = {
    "base_url": "https://api.example.com/v2",
    "endpoint": "/events",
    "auth_conn_id": "api_auth",
    "page_size": 100,
    "max_pages": 1000,
    "rate_limit_delay": 0.5,
    "s3_bucket": "my-data-lake",
    "s3_prefix": "raw/api_events/",
}

# ---------------------------------------------------------------------------
# Task callables
# ---------------------------------------------------------------------------

def fetch_api_pages(**context: Any) -> Dict[str, Any]:
    """
    Fetch all pages from the REST API and return metadata.

# ... 110 more lines ...
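The preview cuts off inside fetch_api_pages, but the pagination-plus-rate-limiting pattern its docstring promises can be sketched independently of Airflow. In this sketch, `fetch_page` is a hypothetical stand-in for the real HTTP call, and the stop condition (a short page marks the end) is an assumption about the API:

```python
import time
from typing import Any, Callable, Dict, List


def fetch_all_pages(
    fetch_page: Callable[[int], List[Dict[str, Any]]],
    page_size: int = 100,
    max_pages: int = 1000,
    rate_limit_delay: float = 0.0,
) -> List[Dict[str, Any]]:
    """Collect records page by page until a short page signals the end."""
    records: List[Dict[str, Any]] = []
    for page in range(max_pages):
        batch = fetch_page(page)
        records.extend(batch)
        if len(batch) < page_size:  # short page => no more data
            break
        time.sleep(rate_limit_delay)  # crude rate limiting between calls
    return records
```

The `max_pages` bound mirrors API_CONFIG above: it caps runaway loops against an API that never returns a short page, which is the kind of defensive limit a production DAG needs under an `execution_timeout`.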