Data Pipeline Testing Kit

$39

PySpark unit testing framework, mock data generators, integration test patterns, data contract validation, and CI/CD templates.

📁 17 files · 🏷 v1.0.0

Python · YAML · TOML · JSON · Markdown · Databricks · PySpark · Spark

📁 File Structure (17 files)

data-pipeline-testing/
├── LICENSE
├── README.md
├── manifest.json
├── configs/
│   └── test_config.yaml
├── fixtures/
│   ├── expected_outputs/
│   │   └── customer_summary.json
│   ├── sample_customers.json
│   └── sample_orders.json
├── guides/
│   └── testing-data-pipelines.md
├── src/
│   ├── assertions.py
│   ├── data_generators.py
│   ├── mock_utils.py
│   ├── snapshot_testing.py
│   └── test_framework.py
└── tests/
    ├── conftest.py
    ├── test_bronze_pipeline.py
    ├── test_gold_pipeline.py
    └── test_silver_pipeline.py

📖 Documentation Preview (README excerpt)

Data Pipeline Testing Kit

Comprehensive testing framework for PySpark data pipelines — from unit tests to integration validation.

By [Datanest Digital](https://datanest.dev) | Version 1.0.0 | $39

---

What You Get

A complete testing toolkit for data pipelines running on Databricks and PySpark, including:

  • Test Framework — base classes and runners for PySpark unit/integration tests
  • Data Generators — realistic synthetic data factories for customers, orders, events
  • Custom Assertions — DataFrame-level assertions for schema, row count, nulls, uniqueness
  • Mock Utilities — helpers for mocking spark, dbutils, Delta tables, and external APIs
  • Snapshot Testing — golden-file comparison for pipeline output validation
  • Sample Fixtures — ready-to-use JSON test data (customers, orders, expected outputs)
  • Pipeline Tests — complete examples testing bronze, silver, and gold layers
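The snapshot-testing item above can be sketched without a live cluster: serialize pipeline output (e.g. `df.collect()` converted to dicts) to JSON and diff it against a stored golden file. This is a minimal illustration of the pattern; the function name and baseline-on-first-run convention are assumptions, not the kit's actual API.

```python
import json
import tempfile
from pathlib import Path


def assert_matches_snapshot(rows: list, snapshot_path: Path) -> None:
    """Compare output rows against a golden file (hypothetical helper).

    If the snapshot does not exist yet, write it so the first run
    establishes the baseline -- a common snapshot-testing convention.
    """
    if not snapshot_path.exists():
        snapshot_path.write_text(json.dumps(rows, indent=2, sort_keys=True))
        return
    expected = json.loads(snapshot_path.read_text())
    if rows != expected:
        raise AssertionError(
            f"Output differs from snapshot {snapshot_path.name}: "
            f"{len(rows)} rows vs {len(expected)} expected"
        )


# Demo: rows shaped like a collected gold-layer aggregate
rows = [{"customer_id": 1, "total": 120.5}, {"customer_id": 2, "total": 80.0}]
with tempfile.TemporaryDirectory() as d:
    snap = Path(d) / "customer_summary.json"
    assert_matches_snapshot(rows, snap)  # first run: writes the baseline
    assert_matches_snapshot(rows, snap)  # second run: compares, passes
```

Sorting keys during serialization keeps the golden file stable across runs, so diffs only appear when the data actually changes.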

File Tree


data-pipeline-testing/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── test_framework.py        # Base test classes and PySpark test runner
│   ├── data_generators.py       # Synthetic data factories
│   ├── assertions.py            # DataFrame assertion library
│   ├── mock_utils.py            # Spark/dbutils/Delta mocking helpers
│   └── snapshot_testing.py      # Golden-file snapshot comparison
├── fixtures/
│   ├── sample_customers.json    # 50 customer records
│   ├── sample_orders.json       # 100 order records
│   └── expected_outputs/
│       └── customer_summary.json
├── tests/
│   ├── conftest.py              # Shared pytest fixtures with SparkSession
│   ├── test_bronze_pipeline.py  # Bronze layer ingestion tests
│   ├── test_silver_pipeline.py  # Silver layer transformation tests
│   └── test_gold_pipeline.py    # Gold layer aggregation tests
├── configs/
│   └── test_config.yaml         # Test environment configuration
└── guides/
    └── testing-data-pipelines.md

Getting Started

1. Install Dependencies


pip install pyspark delta-spark pytest pyyaml

2. Use the Test Framework



*... continues with setup instructions, usage examples, and more.*

📄 Code Sample (.py preview)

src/assertions.py

"""
Data Pipeline Testing Kit — Assertions Library
By Datanest Digital (https://datanest.dev) | Version 1.0.0

Custom assertion functions for validating PySpark DataFrames in tests.
Covers schema validation, null checks, uniqueness, value ranges,
referential integrity, and data quality rules.

Usage:
    from assertions import DataFrameAssertions

    assertions = DataFrameAssertions(spark)
    assertions.assert_schema_matches(df, expected_schema)
    assertions.assert_no_nulls(df, ["id", "name"])
    assertions.assert_unique(df, ["id"])
"""

from __future__ import annotations

from typing import Any, Dict, List, Optional, Set, Union

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField


class AssertionError(Exception):
    """Raised when a DataFrame assertion fails."""
    pass


class DataFrameAssertions:
    """Library of DataFrame-level assertions for pipeline testing.

    Args:
        spark: Active SparkSession (used for creating comparison DataFrames).
    """

    def __init__(self, spark: SparkSession) -> None:
        self.spark = spark

    # =========================================================================
    # Schema Assertions
    # =========================================================================

    def assert_schema_matches(
        self,
        df: DataFrame,
        expected: StructType,
        check_nullable: bool = False,

# ... 281 more lines ...