$49
CDC Replication Toolkit
Change data capture patterns with Debezium, database replication scripts, event sourcing integration, and consistency checks.
Python · YAML · Markdown · JSON · Databricks · PostgreSQL
📁 File Structure 19 files
cdc-replication-toolkit/
├── LICENSE
├── README.md
├── configs/
│   ├── replication_config.yaml
│   └── source_mappings/
│       ├── mysql_to_delta.yaml
│       ├── postgres_to_delta.yaml
│       └── sqlserver_to_delta.yaml
├── guides/
│   └── cdc-replication-guide.md
├── notebooks/
│   ├── replication_status.py
│   └── start_replication.py
├── src/
│   ├── cdc_processor.py
│   ├── debezium_parser.py
│   ├── merge_applier.py
│   ├── offset_manager.py
│   ├── replication_monitor.py
│   └── schema_mapper.py
└── tests/
    ├── conftest.py
    ├── test_cdc_processor.py
    └── test_debezium_parser.py
📖 Documentation Preview README excerpt
CDC Replication Toolkit
Production-ready change data capture pipeline for Databricks — Debezium parsing, MERGE INTO application, offset management, and replication monitoring.
By [Datanest Digital](https://datanest.dev) | Version 1.0.0 | $49
---
What You Get
- CDC Event Processor — Process insert/update/delete events and apply to target Delta tables
- Debezium Parser — Parse Debezium JSON envelope format with before/after images
- MERGE Applier — Apply CDC changes via MERGE INTO with full operation type support
- Offset Manager — Track Kafka offsets and LSN positions with checkpoint management
- Schema Mapper — Map source database schemas to Delta targets with type conversion
- Replication Monitor — Track replication lag, throughput, and error rates
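To make the processor/applier split above concrete, here is a minimal sketch of MERGE-style semantics over CDC events, modeled on an in-memory dict keyed by primary key rather than a Delta table. This is illustrative only, not the toolkit's actual merge_applier code; the `apply_events` helper and its event shape are assumptions, though the operation codes follow Debezium's conventions.

```python
def apply_events(table: dict, events: list[dict], key: str = "id") -> dict:
    """Apply CDC events to a dict keyed by primary key (MERGE-like semantics).

    Operation codes follow Debezium: "c" (create), "u" (update),
    "d" (delete), "r" (snapshot read).
    """
    for ev in events:
        op = ev["op"]
        if op in ("c", "u", "r"):
            row = ev["after"]           # upserts use the after-image
            table[row[key]] = row
        elif op == "d":
            row = ev["before"]          # deletes match on the before-image key
            table.pop(row[key], None)
    return table


events = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "alice"}},
    {"op": "u", "before": {"id": 1, "name": "alice"},
     "after": {"id": 1, "name": "alicia"}},
    {"op": "d", "before": {"id": 1, "name": "alicia"}, "after": None},
]
```

Replaying all three events nets out to an empty table, which is exactly the idempotent replay behavior a MERGE INTO applier needs.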
File Tree
cdc-replication-toolkit/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── cdc_processor.py            # CDC event processing engine
│   ├── debezium_parser.py          # Debezium JSON format parser
│   ├── merge_applier.py            # MERGE INTO change application
│   ├── offset_manager.py           # Kafka offset & LSN tracking
│   ├── schema_mapper.py            # Source→target schema mapping
│   └── replication_monitor.py      # Lag, throughput, error monitoring
├── configs/
│   ├── replication_config.yaml     # Main replication configuration
│   └── source_mappings/
│       ├── postgres_to_delta.yaml  # PostgreSQL type mappings
│       ├── mysql_to_delta.yaml     # MySQL type mappings
│       └── sqlserver_to_delta.yaml # SQL Server type mappings
├── notebooks/
│   ├── start_replication.py        # Start CDC streaming pipeline
│   └── replication_status.py       # Replication status dashboard
├── tests/
│   ├── conftest.py                 # Shared fixtures
│   ├── test_cdc_processor.py       # CDC processing tests
│   └── test_debezium_parser.py     # Debezium parsing tests
└── guides/
    └── cdc-replication-guide.md    # CDC patterns & Debezium guide
Getting Started
1. Configure Source Mappings
Edit configs/replication_config.yaml with your source database and Kafka settings:
source:
  type: postgres
  kafka_bootstrap_servers: "broker1:9092,broker2:9092"
  topic_prefix: "dbserver1"
  tables:
*... continues with setup instructions, usage examples, and more.*
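One note on the `topic_prefix` setting above: by default, Debezium names each table's Kafka topic by joining the server/prefix, schema, and table name with dots, so the prefix determines which topics the pipeline subscribes to. A small sketch (the helper name is ours, not part of the toolkit):

```python
def debezium_topic(prefix: str, schema: str, table: str) -> str:
    """Debezium's default per-table topic name: <prefix>.<schema>.<table>."""
    return f"{prefix}.{schema}.{table}"


# With the config above, the public.orders table would land on:
topic = debezium_topic("dbserver1", "public", "orders")
# → "dbserver1.public.orders"
```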
📄 Code Sample .py preview
src/cdc_processor.py
"""
CDC Processor — Process CDC events from Kafka and apply to target Delta tables.
Orchestrates the end-to-end CDC pipeline: read from Kafka, parse Debezium
envelopes, map schemas, apply changes via MERGE INTO, and manage offsets.
Author: Datanest Digital
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
import yaml
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.types import StringType, StructField, StructType
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
class OperationType(str, Enum):
    """CDC operation types aligned with Debezium conventions."""

    CREATE = "c"
    UPDATE = "u"
    DELETE = "d"
    READ = "r"  # Snapshot read


@dataclass
class CDCEvent:
    """Parsed CDC event with before/after images."""

    operation: OperationType
    table: str
    before: Optional[Dict[str, Any]] = None
    after: Optional[Dict[str, Any]] = None
    source_ts_ms: Optional[int] = None
    lsn: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
# ... 201 more lines ...
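To show how the data models above line up with Debezium's envelope format (a payload with `op`, `before`, `after`, and a `source` block), here is a self-contained sketch that repeats the two models and populates them from an envelope. The `parse_envelope` helper is hypothetical, not necessarily the API exposed by `debezium_parser.py`.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class OperationType(str, Enum):
    """CDC operation types aligned with Debezium conventions."""

    CREATE = "c"
    UPDATE = "u"
    DELETE = "d"
    READ = "r"  # Snapshot read


@dataclass
class CDCEvent:
    """Parsed CDC event with before/after images."""

    operation: OperationType
    table: str
    before: Optional[Dict[str, Any]] = None
    after: Optional[Dict[str, Any]] = None
    source_ts_ms: Optional[int] = None
    lsn: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


def parse_envelope(envelope: Dict[str, Any]) -> CDCEvent:
    """Map a Debezium payload envelope onto CDCEvent (hypothetical helper)."""
    src = envelope.get("source", {})
    return CDCEvent(
        operation=OperationType(envelope["op"]),
        table=src.get("table", ""),
        before=envelope.get("before"),
        after=envelope.get("after"),
        source_ts_ms=src.get("ts_ms"),
        lsn=str(src["lsn"]) if "lsn" in src else None,
    )


event = parse_envelope({
    "op": "u",
    "before": {"id": 1, "name": "alice"},
    "after": {"id": 1, "name": "alicia"},
    "source": {"table": "orders", "ts_ms": 1700000000000, "lsn": 12345},
})
```

Because `OperationType` subclasses `str`, the enum round-trips cleanly through JSON and through Debezium's single-character operation codes.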