In financial services, retail, and enterprise operations, the integrity of transactional data is not just a technical concern; it is a regulatory, operational, and strategic one. Organizations processing millions of transactions daily face mounting challenges: detecting anomalies before they become crises, maintaining data lineage for audit readiness, enforcing quality standards across international data feeds, and ensuring pipelines meet strict SLA requirements.
This project presents a production-grade data engineering platform built to address these challenges at scale. Designed around a synthetic financial payments dataset of over 10.5 million records, the system demonstrates end-to-end data pipeline architecture, from raw data ingestion and quality validation through automated anomaly detection, governance documentation, and Snowflake-compatible reporting exports. Every component is containerized, reproducible, and domain agnostic, making it applicable to healthcare, manufacturing, retail, and any industry where data quality and pipeline reliability are critical.
Build an enterprise-scale ETL pipeline capable of processing 10M+ records with full ACID compliance.
Implement automated anomaly detection across 7 distinct operational scenario types.
Enforce data governance through lineage documentation, quality rules, and remediation workflows.
Track pipeline SLA performance and detect schema drift automatically across every batch.
Export Snowflake-compatible, Power BI-ready reporting datasets for downstream consumption.
Demonstrate production-grade engineering practices: rollback safety procedures, audit trails, observability.
The full pipeline includes the following stages:
Synthetic Data Generation - 10.5M realistic payment records across 50K accounts and 5K merchants.
Schema Initialization - PostgreSQL tables, triggers, stored procedures, views, and governance seed data.
ETL Pipeline - Validated batch loading with schema drift detection, SLA tracking, and rollback safety.
Anomaly Detection - 7 real-time monitoring scenarios triggered automatically.
Governance Layer - data lineage, quality rules, and remediation workflow logging.
KPI Snapshot Generation - daily aggregated metrics for executive reporting.
Snowflake-Compatible Export - 9 CSV report types ready for cloud warehouse ingestion or BI tools.
Each component is modular: the domain-specific data layer can be replaced with healthcare, manufacturing, or retail data without changing a single line of pipeline code.
1. Synthetic Data Generation
The system generates 10.5 million realistic financial transaction records in configurable batch sizes using NumPy and Pandas. The generation engine simulates realistic distributions, including log-normal transaction amounts, country-weighted account geography, a 5% geo-mismatch injection for fraud simulation, and a 7% failure-rate injection for anomaly testing.
Python:
amounts = np.random.lognormal(mean=4.5, sigma=1.8, size=size).round(2)
amounts = np.clip(amounts, 0.50, 500_000)
Dimension tables for 50,000 accounts and 5,000 merchants are generated separately, with realistic categorical distributions across account types, risk tiers, merchant categories, and geographic regions.
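The injection steps can be sketched as follows. This is a minimal illustration of the technique, not the project's actual generator: the column names, country list, and the "XX" placeholder country are assumptions made for the example.

```python
import numpy as np

rng = np.random.default_rng(42)
size = 10_000

# Log-normal amounts as above, clipped to a realistic range
amounts = np.clip(rng.lognormal(mean=4.5, sigma=1.8, size=size).round(2), 0.50, 500_000)

# Inject a ~7% failure rate: mark a random subset of rows as FAILED
status = np.where(rng.random(size) < 0.07, "FAILED", "SUCCESS")

# Country-weighted account geography (weights are illustrative)
account_country = rng.choice(["US", "GB", "DE", "IN"], size=size, p=[0.5, 0.2, 0.15, 0.15])

# Inject a ~5% geo mismatch: transaction country differs from account country
txn_country = account_country.copy()
mismatch = rng.random(size) < 0.05
txn_country[mismatch] = "XX"  # hypothetical foreign country code for flagged rows
```

Because the injection rates are probabilistic per row, a downstream anomaly detector sees realistic, slightly varying proportions from batch to batch rather than exact quotas.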
2. Database Schema Design
The schema is structured across five layers: dimensions, facts, audit, governance, and KPI, with full referential integrity enforced via foreign keys and PostgreSQL constraints.
SQL:
CREATE TABLE fact_transactions (
    transaction_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    account_id VARCHAR(20) NOT NULL REFERENCES dim_accounts(account_id),
    merchant_id VARCHAR(20) NOT NULL REFERENCES dim_merchants(merchant_id),
    amount NUMERIC(12,2) NOT NULL,
    status VARCHAR(20) NOT NULL
);
Seven automated triggers fire on every insert or update, detecting anomalies in real time without any manual intervention.
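One such trigger might look like the sketch below. The function name, threshold, and audit table columns are illustrative assumptions, not the project's actual definitions; the pattern shown (an AFTER INSERT row-level trigger writing to an audit table) is the standard PostgreSQL mechanism the text describes.

SQL:

```sql
-- Hypothetical high-value anomaly trigger (names and threshold are illustrative)
CREATE OR REPLACE FUNCTION flag_high_value_txn() RETURNS trigger AS $$
BEGIN
    IF NEW.amount > 100000 THEN
        INSERT INTO audit_anomaly_log (transaction_id, anomaly_type, detected_at)
        VALUES (NEW.transaction_id, 'HIGH_VALUE', now());
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_high_value
AFTER INSERT ON fact_transactions
FOR EACH ROW EXECUTE FUNCTION flag_high_value_txn();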
3. ETL Pipeline with Validation and SLA Tracking
The ETL pipeline uses PostgreSQL's COPY command for bulk loading, approximately 3x faster than row-by-row INSERT, while validating every batch against configurable quality rules before load.
Python:
cur.copy_expert(
    f"COPY {table_name} ({cols}) FROM STDIN WITH CSV NULL '\\N'",
    buffer,
)
Every pipeline run is logged with records processed, records failed, schema violations, actual elapsed time, and SLA compliance status. Invalid rows are quarantined to separate CSV files and never silently dropped.
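The validate-then-quarantine step can be sketched in plain Python. The rule set below is hypothetical (the project defines its rules in a governance table, not in code like this); the sketch only shows the partitioning logic that keeps invalid rows out of the load without dropping them.

```python
# Hypothetical quality rules: column name -> validator function
RULES = {
    "amount": lambda v: 0.50 <= float(v) <= 500_000,
    "status": lambda v: v in {"SUCCESS", "FAILED", "PENDING"},
}

def split_batch(rows):
    """Partition a batch into loadable rows and quarantined rows."""
    good, quarantined = [], []
    for row in rows:
        ok = all(check(row[col]) for col, check in RULES.items())
        (good if ok else quarantined).append(row)
    return good, quarantined

batch = [
    {"amount": "120.50", "status": "SUCCESS"},
    {"amount": "-5.00", "status": "SUCCESS"},   # out of range -> quarantine
    {"amount": "42.00", "status": "UNKNOWN"},   # invalid status -> quarantine
]
good, quarantined = split_batch(batch)
```

Only the `good` partition reaches COPY; the `quarantined` partition is written to a CSV file with the failure reason, preserving an audit trail for every rejected row.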
4. Seven Automated Trigger Scenarios
Seven PostgreSQL triggers fire automatically on every transaction insert, requiring zero manual monitoring:
5. Schema Drift Detection
Each ETL batch is automatically compared against the expected schema definition. Missing columns, unexpected columns, and type deviations are logged to a dedicated drift table and surfaced in the monitoring dashboard.
Python:
missing = expected - actual
for col in missing:
    cur.execute("""
        INSERT INTO audit_schema_drift_log
            (table_name, column_name, drift_type, ...)
        VALUES (%s, %s, 'MISSING_COLUMN', ...)
    """, (table_name, col))
6. Governance Layer
Full data governance is implemented across three tables. The lineage table documents every data source, transformation type, and data owner from ingestion to reporting. The quality rules table defines 8 active validation rules covering range checks, value set validation, referential integrity, and geo validation. The remediation log tracks every open issue with root cause, assigned owner, and resolution status.
7. KPI Snapshots and Reporting Exports
Daily KPI snapshots are generated via stored procedure and cover total transaction volume, success rate, anomaly count, SLA breach count, average processing time, and schema drift events. Nine Snowflake-compatible CSV reports are exported after every pipeline run.
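The shape of a daily snapshot can be illustrated with a small aggregation. In the project this runs as a stored procedure over the fact table; the field names here are assumptions for the sketch, not the actual KPI table schema.

```python
from datetime import date

def kpi_snapshot(txns, snapshot_date):
    """Aggregate a day's transactions into a single KPI row (illustrative fields)."""
    total = len(txns)
    failed = sum(1 for t in txns if t["status"] == "FAILED")
    return {
        "snapshot_date": snapshot_date,
        "total_volume": total,
        "success_rate": round(100 * (total - failed) / total, 2) if total else 0.0,
        "anomaly_count": sum(1 for t in txns if t.get("anomaly")),
    }

txns = [
    {"status": "SUCCESS"},
    {"status": "FAILED", "anomaly": True},
    {"status": "SUCCESS"},
    {"status": "SUCCESS"},
]
snap = kpi_snapshot(txns, date(2024, 1, 1))
```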
Database: PostgreSQL 15 - ACID-compliant transactional storage
Containerization: Docker Compose - reproducible local deployment
ETL Engine: Python + psycopg2 - bulk validated data loading
Data Generation: NumPy + Pandas - realistic synthetic record generation
Monitoring: Python + SQLAlchemy - 7-scenario automated anomaly detection
Reporting: CSV exports - Snowflake, Power BI, and Tableau ready
Governance: PostgreSQL tables + stored procedures - lineage, quality rules, remediation
Visual DB Tool: pgAdmin 4 - browser-based database exploration
10,555,000 records processed with full ACID compliance
100% rollback-safe via stored procedures with nested exception handling
30% reduction in manual audit effort through automated trigger-based detection
~3x faster ETL throughput via PostgreSQL COPY vs row-by-row INSERT
7 scenario types monitored automatically without human intervention
SLA tracking per pipeline run with breach detection and logging
Full data lineage documented from raw source to reporting layer
9 Snowflake-compatible CSV exports generated after every pipeline run
Financial Services - Payment processors like Worldpay and Stripe need exactly this: real-time anomaly detection, fraud flagging, SLA compliance tracking, and regulatory audit trails across billions of daily transactions.
Healthcare - Replace payment records with patient admission records. The same pipeline validates clinical data quality, detects anomalies in lab results, and maintains lineage documentation for HIPAA audit readiness.
Manufacturing - Replace transactions with IoT sensor readings from factory equipment. The same triggers detect anomalous sensor spikes, schema changes from firmware updates, and SLA breaches in real-time monitoring feeds.
Retail - Replace transactions with e-commerce order events. The same governance layer documents data ownership across catalog systems, fulfillment pipelines, and customer analytics feeds.
This project demonstrates end-to-end data engineering, from synthetic generation through governance-ready reporting.
Production SQL architecture: triggers, stored procedures, views, ACID compliance, rollback safety
Automated observability without external monitoring tools, built entirely in SQL and Python
Data governance practices: Lineage documentation, quality rule enforcement, remediation workflows
Snowflake and cloud data warehouse readiness through compatible export formats
Containerized, reproducible deployment with Docker for any environment