← Back to all products

Data Observability Setup

$49

Pipeline monitoring dashboards, data freshness alerts, lineage tracking, cost monitoring, and incident response runbooks.

📁 17 files · 🏷 v1.0.0

Python · YAML · Markdown · JSON · Databricks · PySpark · Spark

📁 File Structure (17 files)

data-observability-setup/
├── LICENSE
├── README.md
├── configs/
│   ├── alert_rules.yaml
│   └── observability_config.yaml
├── guides/
│   └── data-observability-guide.md
├── notebooks/
│   ├── observability_dashboard.py
│   └── setup_observability.py
├── src/
│   ├── alert_manager.py
│   ├── anomaly_detector.py
│   ├── dashboard_data.py
│   ├── freshness_monitor.py
│   ├── lineage_tracker.py
│   └── metric_collector.py
└── tests/
    ├── conftest.py
    ├── test_anomaly_detector.py
    └── test_freshness_monitor.py

📖 Documentation Preview (README excerpt)

Data Observability Setup

Complete observability framework for Databricks data pipelines — lineage tracking, anomaly detection, SLA monitoring, and alerting.

By [Datanest Digital](https://datanest.dev) | Version 1.0.0 | $49

---

What You Get

  • Data Lineage Tracking — Trace every record from source through transformations to target, stored in Delta
  • Metric Collection — Automated capture of row counts, durations, data volumes, and error rates
  • Statistical Anomaly Detection — Z-score, IQR, and moving average methods to catch pipeline drift
  • Freshness Monitoring — SLA-aware data freshness checks with breach detection
  • Alert Routing — Multi-channel alerting via Slack, PagerDuty, and email with severity-based routing
  • Dashboard-Ready Data — Pre-aggregated health scores and metrics for observability dashboards
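To make the anomaly detection concrete, here is a minimal sketch of the z-score method listed above. The function name and example values are illustrative assumptions, not the package's actual API in `src/anomaly_detector.py`:

```python
# Hypothetical sketch of z-score anomaly detection over pipeline metrics.
# Function name and thresholds are illustrative, not the package's real API.
from statistics import mean, stdev
from typing import List


def zscore_anomalies(values: List[float], threshold: float = 3.0) -> List[int]:
    """Return indices of values whose z-score exceeds the threshold."""
    if len(values) < 2:
        return []
    mu = mean(values)
    sigma = stdev(values)
    if sigma == 0:
        return []
    return [i for i, v in enumerate(values) if abs(v - mu) / sigma > threshold]


# Example: a sudden drop in daily row counts stands out against the baseline.
row_counts = [10_000, 10_120, 9_980, 10_050, 200, 10_010]
print(zscore_anomalies(row_counts, threshold=2.0))  # the 200-row run is flagged
```

The IQR and moving-average methods mentioned above would follow the same shape: compute a baseline statistic from recent runs, then flag points that deviate beyond a configured bound.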

File Tree


data-observability-setup/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── lineage_tracker.py          # Source→transform→target lineage
│   ├── metric_collector.py         # Pipeline metric collection
│   ├── anomaly_detector.py         # Statistical anomaly detection
│   ├── freshness_monitor.py        # Data freshness & SLA monitoring
│   ├── alert_manager.py            # Multi-channel alert routing
│   └── dashboard_data.py           # Dashboard aggregation & health scores
├── configs/
│   ├── observability_config.yaml   # Main configuration
│   └── alert_rules.yaml            # Alert rule definitions
├── notebooks/
│   ├── observability_dashboard.py  # Health & lineage dashboard
│   └── setup_observability.py      # Initialize observability tables
├── tests/
│   ├── conftest.py                 # Shared fixtures
│   ├── test_anomaly_detector.py    # Anomaly detection tests
│   └── test_freshness_monitor.py   # SLA breach tests
└── guides/
    └── data-observability-guide.md # Observability strategy guide

Getting Started

1. Initialize Observability Tables

Run the setup notebook in your Databricks workspace to create the required Delta tables:


# In Databricks — run notebooks/setup_observability.py
# Creates: observability.lineage, observability.metrics,
#          observability.alerts, observability.anomalies
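The setup notebook itself is not shown in this preview, but it presumably issues `CREATE TABLE` DDL for the four Delta tables named above. A hedged sketch, with illustrative column schemas (the real schemas live in `notebooks/setup_observability.py`):

```python
# Hedged sketch: build CREATE TABLE DDL for an observability Delta table.
# Table names come from the comment above; the columns are assumptions.
from typing import Dict


def create_table_ddl(table: str, columns: Dict[str, str]) -> str:
    """Render a Delta CREATE TABLE statement from a column-name → type map."""
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in columns.items())
    return f"CREATE TABLE IF NOT EXISTS {table} (\n  {cols}\n) USING DELTA"


ddl = create_table_ddl(
    "observability.metrics",
    {
        "pipeline_id": "STRING",
        "metric_name": "STRING",
        "metric_value": "DOUBLE",
        "recorded_at": "TIMESTAMP",
    },
)
print(ddl)
# In a Databricks notebook you would then run: spark.sql(ddl)
```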

2. Track Lineage in Your Pipelines



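The lineage example is elided in this preview. As a rough illustration of the source → transform → target records the package stores in Delta, here is a hypothetical sketch; the actual API is in `src/lineage_tracker.py` and may differ:

```python
# Illustrative only — the real API lives in src/lineage_tracker.py.
# A minimal lineage record: one row per pipeline run, tracing source
# through transformations to target.
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List


@dataclass
class LineageRecord:
    pipeline_id: str
    source_table: str
    target_table: str
    transformations: List[str] = field(default_factory=list)
    run_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


record = LineageRecord(
    pipeline_id="daily_orders",
    source_table="raw.orders",
    target_table="gold.orders_summary",
    transformations=["dedupe", "aggregate_by_day"],
)
print(record.source_table, "->", record.target_table)
```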
*... continues with setup instructions, usage examples, and more.*

📄 Code Sample (.py preview)

src/alert_manager.py

"""
Alert Manager — Route alerts to Slack, PagerDuty, and email based on severity.

Supports severity-based routing, business-hours scheduling, and alert
deduplication via a cooldown window.

Author: Datanest Digital
"""
from __future__ import annotations

import json
import logging
import smtplib
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from typing import Any, Dict, List, Optional

import requests
import yaml

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------


@dataclass
class AlertChannel:
    """Configuration for a single alert channel."""

    name: str
    channel_type: str              # "slack", "pagerduty", "email"
    endpoint: str                  # Webhook URL or SMTP host
    min_severity: str = "low"
    schedule: str = "always"       # "always", "business_hours", "oncall"
    extra: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Alert:
    """An alert event to be routed."""

    alert_id: str
    severity: str
    title: str
    message: str

# ... 258 more lines ...