
Spark ETL Framework

$59

Production PySpark ETL framework with schema validation, data quality checks, incremental processing, and medallion architecture.

📁 16 files · 🏷 v1.0.0
Python · YAML · Markdown · JSON · Databricks · PySpark · Spark · Delta Lake

📁 File Structure (16 files)

spark-etl-framework/
├── LICENSE
├── README.md
├── configs/
│   ├── pipeline_config.yaml
│   └── quality_rules.yaml
├── guides/
│   └── etl-patterns.md
├── notebooks/
│   ├── backfill.py
│   └── run_pipeline.py
├── src/
│   ├── bronze_loader.py
│   ├── config_manager.py
│   ├── etl_base.py
│   ├── gold_aggregator.py
│   ├── quality_gate.py
│   └── silver_transformer.py
└── tests/
    ├── conftest.py
    └── test_etl_base.py

📖 Documentation Preview (README excerpt)

Spark ETL Framework

Production-ready medallion architecture ETL framework for Databricks and Apache Spark.

Build reliable, observable, and maintainable data pipelines with a battle-tested extract-transform-load pattern that scales from prototype to petabyte.

---

What You Get

  • Abstract ETL base class with built-in logging, metrics collection, and error handling
  • Medallion architecture (Bronze / Silver / Gold) with production patterns baked in
  • Data quality gates between every layer — catch issues before they propagate
  • YAML-driven configuration with environment overrides and secret scope integration
  • Databricks notebooks for orchestration and date-range backfills
  • Comprehensive test suite with mock Spark sessions and fixture data
  • Architecture guide covering idempotency, partitioning, and error recovery
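The abstract base class ties these pieces together with a template method: one `run()` entry point that wraps the extract, transform, and load steps in logging and metrics collection. The framework's actual `etl_base.py` is not shown on this page, so the sketch below is illustrative only — the class, method, and metric names are assumptions, not the shipped API:

```python
import logging
import time
from abc import ABC, abstractmethod
from typing import Any, Dict


class ETLBase(ABC):
    """Template-method ETL job: run() orchestrates extract -> transform -> load
    with logging and simple duration/status metrics. Illustrative sketch —
    the framework's real etl_base.py may differ in names and detail."""

    def __init__(self, pipeline_name: str) -> None:
        self.pipeline_name = pipeline_name
        self.log = logging.getLogger(pipeline_name)
        self.metrics: Dict[str, Any] = {}

    @abstractmethod
    def extract(self) -> Any: ...

    @abstractmethod
    def transform(self, data: Any) -> Any: ...

    @abstractmethod
    def load(self, data: Any) -> None: ...

    def run(self) -> Dict[str, Any]:
        start = time.monotonic()
        try:
            data = self.extract()
            data = self.transform(data)
            self.load(data)
            self.metrics["status"] = "succeeded"
        except Exception:
            self.metrics["status"] = "failed"
            self.log.exception("pipeline %s failed", self.pipeline_name)
            raise
        finally:
            self.metrics["duration_s"] = time.monotonic() - start
        return self.metrics
```

Concrete jobs (Bronze, Silver, Gold) would subclass this and implement the three abstract steps; `run()` then gives every layer uniform error handling and metrics for free.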

File Tree


spark-etl-framework/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── etl_base.py              # Abstract base ETL class (~200 lines)
│   ├── bronze_loader.py         # Bronze layer ingestion (~150 lines)
│   ├── silver_transformer.py    # Silver layer transforms (~200 lines)
│   ├── gold_aggregator.py       # Gold layer aggregations (~150 lines)
│   ├── quality_gate.py          # Inter-layer quality checks (~120 lines)
│   └── config_manager.py        # YAML config + env overrides (~100 lines)
├── configs/
│   ├── pipeline_config.yaml     # Pipeline configuration
│   └── quality_rules.yaml       # Quality rule definitions
├── notebooks/
│   ├── run_pipeline.py          # Orchestration entry point
│   └── backfill.py              # Date-range backfill utility
├── tests/
│   ├── conftest.py              # Spark fixtures & sample data
│   └── test_etl_base.py         # Unit tests
└── guides/
    └── etl-patterns.md          # Architecture & patterns guide

Getting Started

1. Configure Your Pipeline

Edit configs/pipeline_config.yaml to define your source, destination, and quality rules:


pipeline:
  name: "customer_orders"
  schedule: "0 6 * * *"

source:
  format: "json"
  path: "/mnt/raw/customer_orders/"


*... continues with setup instructions, usage examples, and more.*
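The "environment overrides" mentioned above typically work by layering a per-environment dict over the base YAML: nested dicts merge key by key, while scalars are replaced. The `config_manager.py` API isn't shown here, so the following is a minimal sketch of that merge pattern with hypothetical names, not the framework's actual code:

```python
from copy import deepcopy
from typing import Any, Dict


def merge_config(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay `override` onto `base`, returning a new dict.
    Nested dicts merge recursively; scalars and lists are replaced outright."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Base config mirroring the pipeline_config.yaml excerpt above,
# with a hypothetical prod override swapping only the source path.
base = {
    "pipeline": {"name": "customer_orders", "schedule": "0 6 * * *"},
    "source": {"format": "json", "path": "/mnt/raw/customer_orders/"},
}
prod_overrides = {"source": {"path": "/mnt/prod/customer_orders/"}}

config = merge_config(base, prod_overrides)
```

Only the overridden key changes; sibling keys like `source.format` and the whole `pipeline` block carry through from the base config untouched.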

📄 Code Sample (.py preview)

src/bronze_loader.py """ Bronze Layer Loader — Spark ETL Framework ============================================ Ingests raw data into the Bronze (landing) layer of the medallion architecture. Automatically infers schema, appends audit metadata columns, and writes to Delta format. Metadata columns added: _source_file — originating file path _ingested_at — UTC ingestion timestamp _batch_id — unique batch identifier By Datanest Digital — https://datanest.dev """ from __future__ import annotations from datetime import datetime, timezone from typing import Any, Dict, List, Optional from pyspark.sql import DataFrame, SparkSession from pyspark.sql import functions as F from pyspark.sql.types import StringType, StructType, TimestampType from src.etl_base import ETLBase class BronzeLoader(ETLBase): """ Load raw files into the Bronze Delta table with metadata enrichment. Supports JSON, CSV, Parquet, and Avro source formats. Schema can be explicitly provided via config or auto-inferred from the source data. Config keys (under ``source``): format: str — file format (json | csv | parquet | avro) path: str — source directory path options: dict — extra reader options (e.g. ``{"header": "true"}``) schema: dict — optional explicit StructType definition Config keys (under ``destination``): database: str — target database / catalog table: str — target table name path: str — Delta table storage path (optional) """ def __init__( self, spark: SparkSession, pipeline_name: str, # ... 127 more lines ...
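The docstring above names three audit columns the Bronze layer stamps onto every row. In the framework this would be done with PySpark `withColumn` calls; the plain-Python sketch below (hypothetical function name, no Spark required) just shows the shape of the enrichment:

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List


def add_audit_columns(records: List[Dict[str, Any]], source_file: str) -> List[Dict[str, Any]]:
    """Append the Bronze layer's audit metadata to each record.
    Illustrative stand-in for the Spark version, which would use
    F.lit / F.input_file_name / F.current_timestamp on a DataFrame."""
    batch_id = uuid.uuid4().hex                           # one id per ingestion batch
    ingested_at = datetime.now(timezone.utc).isoformat()  # UTC ingestion timestamp
    return [
        {
            **row,
            "_source_file": source_file,
            "_ingested_at": ingested_at,
            "_batch_id": batch_id,
        }
        for row in records
    ]
```

Tagging every row with the same batch id is what makes batch-level reprocessing and lineage queries cheap later: a bad load can be identified and deleted by `_batch_id` alone.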