Data Engineering Fundamentals¶
Data engineering is the practice of designing, building, and maintaining systems that collect, store, transform, and serve data. While database design (Chapter 5) covers individual database systems, data engineering focuses on data pipelines, warehousing, and the movement of data between systems at scale.
ETL vs. ELT¶
The two fundamental approaches to data processing differ in when transformation happens:
ETL (Extract, Transform, Load):
Source → Extract → Transform (in pipeline) → Load → Data Warehouse
Traditional approach: transform before storing.
ELT (Extract, Load, Transform):
Source → Extract → Load → Data Warehouse → Transform (in warehouse)
Modern approach: load raw data, transform using warehouse compute.
| Aspect | ETL | ELT |
|---|---|---|
| Transform location | External pipeline (Spark, Airflow, custom code) | Inside the data warehouse (dbt, SQL) |
| Data in warehouse | Pre-processed, clean, structured | Raw (landing zone) + transformed (marts) |
| Flexibility | Changes require pipeline code updates and redeployment | Transform logic changes are just SQL (dbt models) |
| Speed to insight | Slower (must design transforms upfront) | Faster (raw data available immediately for ad-hoc analysis) |
| Compute cost | Compute cost in pipeline infrastructure | Compute cost in warehouse (pay for query processing) |
| Data lineage | Harder to track (transformations in code) | Easier (dbt provides lineage graphs, SQL is declarative) |
| Modern preference | Still used for complex transformations, streaming | Preferred for modern cloud warehouses |
Why ELT won: Cloud data warehouses (Snowflake, BigQuery) provide virtually unlimited, elastic compute. It's cheaper and more flexible to store raw data and transform it on-demand than to pre-process everything. ELT also makes it easy to re-transform data when business logic changes—just re-run the SQL, no pipeline changes needed.
When ETL is still the right choice: When transformations require complex logic (ML feature engineering, image processing), when data must be cleaned before it enters the warehouse for compliance reasons, or when working with streaming data that needs real-time transformation.
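The difference between the two approaches can be sketched in a few lines of plain Python (a toy illustration, not a real pipeline — the row data and `transform` logic are invented for the example):

```python
# Toy sketch: the same cleanup done ETL-style (transform before load)
# vs ELT-style (load raw, transform inside the "warehouse").
raw_rows = [
    {"id": 1, "email": " ALICE@CO.COM "},
    {"id": 2, "email": "bob@co.com"},
]

def transform(row):
    # Normalize the email field
    return {**row, "email": row["email"].strip().lower()}

# ETL: transform in the pipeline; the warehouse only ever sees clean data
etl_warehouse = [transform(r) for r in raw_rows]

# ELT: load raw first; the transformation is a query over the landing
# zone, so changing the logic later just means re-running it on raw data
elt_landing = list(raw_rows)                      # raw, untouched
elt_marts = [transform(r) for r in elt_landing]   # the "SQL model" step

assert etl_warehouse == elt_marts  # same result, different ownership of raw data
```

The key practical difference: in the ELT version, `elt_landing` still holds the untouched raw rows, so a change in business logic never requires re-extracting from the source.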
Data Storage Architecture¶
┌─────────────────────── Modern Data Architecture ───────────────────────┐
│ │
│ ┌─── Data Sources ──────┐ │
│ │ Databases (OLTP) │ │
│ │ APIs (REST, GraphQL) │──── Ingestion ────┐ │
│ │ Event Streams (Kafka) │ (Fivetran, │ │
│ │ Files (CSV, JSON) │ Airbyte, │ │
│ │ SaaS Apps (Salesforce)│ Debezium) │ │
│ └────────────────────────┘ │ │
│ ▼ │
│ ┌─── Data Lake / Lakehouse ──┐ │
│ │ Raw data (any format) │ │
│ │ Cheap storage (S3, GCS) │ │
│ │ Schema-on-read │ │
│ └──────────┬──────────────────┘ │
│ │ │
│ Transformation (dbt, Spark) │
│ │ │
│ ▼ │
│ ┌─── Data Warehouse ─────────┐ │
│ │ Structured, optimized │ │
│ │ for analytical queries │ │
│ │ (Snowflake, BigQuery, │ │
│ │ Redshift, Databricks) │ │
│ └──────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─── Consumption Layer ──────┐ │
│ │ BI Tools (Tableau, Looker) │ │
│ │ ML Pipelines │ │
│ │ Reverse ETL to SaaS │ │
│ │ Operational Analytics │ │
│ └─────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────────┘
Data Warehouse¶
A centralized repository optimized for analytical queries (OLAP). Uses columnar storage, compression, and massively parallel processing (MPP). Unlike OLTP databases (optimized for many small transactions), data warehouses are optimized for fewer, complex analytical queries over large datasets.
| Warehouse | Provider | Key Features |
|---|---|---|
| Snowflake | Independent | Separation of storage and compute (scale independently), near-zero maintenance, cross-cloud, time travel (query historical data), data sharing |
| BigQuery | Google Cloud | Serverless (no infrastructure to manage), pay-per-query, petabyte-scale, built-in ML (BigQuery ML), streaming inserts |
| Redshift | AWS | Columnar PostgreSQL-based, tight AWS integration, Redshift Spectrum (query S3 directly), materialized views |
| Databricks | Independent | Lakehouse architecture (unify lake + warehouse), Spark-based, Unity Catalog for governance, Delta Lake format |
Why columnar storage matters:
Row-oriented storage (OLTP — PostgreSQL, MySQL):
Row 1: [id=1, name="Alice", email="alice@co.com", age=30, city="NYC"]
Row 2: [id=2, name="Bob", email="bob@co.com", age=25, city="SF"]
→ Fast for: SELECT * FROM users WHERE id = 1 (reads one row)
→ Slow for: SELECT AVG(age) FROM users (must read all columns to get age)
Columnar storage (OLAP — Snowflake, BigQuery, Redshift):
Column "id": [1, 2, 3, 4, 5, ...]
Column "name": ["Alice", "Bob", "Carol", ...]
Column "age": [30, 25, 35, ...]
→ Fast for: SELECT AVG(age) FROM users (reads only the age column)
→ Excellent compression (similar values stored together — [30, 25, 35] compresses well)
→ Slow for: SELECT * FROM users WHERE id = 1 (must read from every column)
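The two layouts can be modeled directly in Python (a minimal sketch with made-up data — a real engine adds compression, vectorization, and disk I/O on top of exactly this structure):

```python
# Row store: a list of records. Column store: a dict of parallel arrays.
rows = [
    {"id": 1, "name": "Alice", "age": 30, "city": "NYC"},
    {"id": 2, "name": "Bob", "age": 25, "city": "SF"},
    {"id": 3, "name": "Carol", "age": 35, "city": "NYC"},
]
columns = {
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Carol"],
    "age": [30, 25, 35],
    "city": ["NYC", "SF", "NYC"],
}

# SELECT AVG(age): the row store touches every field of every record...
avg_row = sum(r["age"] for r in rows) / len(rows)
# ...while the column store reads exactly one contiguous array
avg_col = sum(columns["age"]) / len(columns["age"])
assert avg_row == avg_col == 30.0

# SELECT * WHERE id = 1: one lookup in the row store, but the column
# store must stitch the record back together from every column
record = {col: values[0] for col, values in columns.items()}
assert record["name"] == "Alice"
```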
Data Lake¶
A storage repository that holds raw data in its native format (structured, semi-structured, unstructured). Built on cheap object storage (S3, GCS, Azure Blob).
Data Lake vs. Data Warehouse:
| Aspect | Data Lake | Data Warehouse |
|---|---|---|
| Schema | Schema-on-read (flexible, no enforcement) | Schema-on-write (enforced before loading) |
| Data types | Any (JSON, Parquet, images, logs, video) | Structured (tables with defined columns) |
| Cost | Low (object storage: ~$0.023/GB/month on S3) | Higher (compute-optimized storage + processing) |
| Query speed | Variable (depends on format and engine) | Optimized (columnar, indexed, cached) |
| Users | Data engineers, data scientists | Business analysts, BI users |
| Risk | Can become a "data swamp" without governance | Governed by schema enforcement |
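The schema row in the table above is the crux, and it is easy to demonstrate. A minimal sketch (the schema and rows are invented for illustration): schema-on-write rejects bad rows at load time, while schema-on-read stores anything and defers interpretation to query time.

```python
import json

# Hypothetical expected schema for illustration
SCHEMA = {"user_id": int, "amount": float}

def conforms(row):
    return set(row) == set(SCHEMA) and all(
        isinstance(row[k], t) for k, t in SCHEMA.items()
    )

good = {"user_id": 1, "amount": 9.99}
bad = {"user_id": "oops"}  # wrong type, missing column

# Schema-on-write (warehouse): the bad row never enters storage
warehouse = [r for r in [good, bad] if conforms(r)]
assert len(warehouse) == 1

# Schema-on-read (lake): both rows are stored as raw blobs; the schema
# is applied only when someone reads the data, and failures surface then
lake = [json.dumps(r) for r in [good, bad]]
readable = [r for r in map(json.loads, lake) if conforms(r)]
assert len(lake) == 2 and len(readable) == 1
```

This is also why an ungoverned lake drifts toward a "data swamp": nothing stops unreadable rows from accumulating.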
Data Lakehouse¶
A hybrid combining the flexibility of data lakes with the performance and ACID guarantees of data warehouses. Technologies like Delta Lake, Apache Iceberg, and Apache Hudi add table formats on top of data lakes, providing:
- ACID transactions: Atomic, consistent reads and writes on object storage
- Schema enforcement and evolution: Add, rename, drop columns without rewriting data
- Time travel: Query data as it existed at a previous point in time
- Upserts and deletes: Modify existing records (impossible on raw object storage)
- Partition evolution: Change partitioning without rewriting data (Iceberg)
-- Delta Lake example (Spark SQL)
-- Create a table backed by S3
CREATE TABLE events
USING DELTA
LOCATION 's3://my-lake/events/'
AS SELECT * FROM raw_events;
-- Time travel: query data as of yesterday
SELECT * FROM events TIMESTAMP AS OF '2025-01-14';
-- Upsert (merge): update existing rows, insert new ones
MERGE INTO events target
USING new_events source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Data File Formats¶
Choosing the right file format dramatically affects query performance and storage cost:
| Format | Type | Compression | Schema | Query Performance | Use Case |
|---|---|---|---|---|---|
| CSV | Row-based, text | Poor | No | Slow (no predicate pushdown) | Data exchange, small datasets |
| JSON | Row-based, text | Poor | Self-describing | Slow | APIs, logs, small datasets |
| Parquet | Columnar, binary | Excellent (snappy, gzip, zstd) | Embedded | Fast (predicate pushdown, column pruning) | Default for analytics |
| ORC | Columnar, binary | Excellent | Embedded | Fast | Hive ecosystem |
| Avro | Row-based, binary | Good | Embedded (schema evolution) | Medium | Kafka, data pipelines, streaming |
Why Parquet is the standard: Parquet combines columnar storage (read only needed columns), embedded statistics (skip entire row groups based on min/max values), and excellent compression (similar values in a column compress well). A 10GB CSV file might be 1GB as Parquet, and queries that read 3 of 50 columns process roughly 94% less data.
# Working with Parquet in Python
import pandas as pd
# Write Parquet (with compression)
df.to_parquet('data.parquet', compression='snappy', index=False)
# Read Parquet (only specific columns — skips rest on disk)
df = pd.read_parquet('data.parquet', columns=['user_id', 'event_type', 'timestamp'])
# Read Parquet with predicate pushdown (filter happens at storage level)
import pyarrow.parquet as pq
table = pq.read_table('data.parquet',
columns=['user_id', 'amount'],
filters=[('amount', '>', 100), ('country', '=', 'US')]
)
Data Modeling for Analytics¶
Dimensional Modeling¶
Dimensional modeling (by Ralph Kimball) organizes data into fact tables (measurements/events) and dimension tables (descriptive context). This is the standard approach for data warehouses and BI.
Star Schema:
┌──────────────┐
│ dim_product │
│──────────────│
│ product_id │
│ name │
│ category │
│ brand │
└──────┬───────┘
│
┌──────────────┐ ┌─────┴──────────┐ ┌──────────────┐
│ dim_customer │──│ fact_orders │──│ dim_date │
│──────────────│ │────────────────│ │──────────────│
│ customer_id │ │ order_id │ │ date_key │
│ name │ │ customer_id FK │ │ date │
│ email │ │ product_id FK │ │ day_of_week │
│ segment │ │ date_key FK │ │ month │
│ country │ │ quantity │ │ quarter │
└──────────────┘ │ amount │ │ year │
│ discount │ │ is_weekend │
└────────────────┘ └──────────────┘
Star schema (fact table surrounded by dimension tables): Simple, fast queries, widely used.
Snowflake schema (dimensions normalized into sub-dimensions): Reduces data duplication but requires more JOINs.
Fact table types:
- Transaction facts: One row per event (order placed, payment processed)
- Periodic snapshot facts: One row per entity per time period (daily account balance)
- Accumulating snapshot facts: One row per process, updated as stages complete (order lifecycle: placed → shipped → delivered)
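A typical query against the star schema above joins the fact table to its dimensions and aggregates the measures. This minimal sketch uses an in-memory SQLite database as a stand-in for a warehouse (table and column names follow the diagram; the sample rows are invented):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE dim_product (product_id INTEGER PRIMARY KEY, name TEXT, category TEXT);
    CREATE TABLE dim_customer (customer_id INTEGER PRIMARY KEY, name TEXT, country TEXT);
    CREATE TABLE fact_orders (
        order_id INTEGER PRIMARY KEY,
        customer_id INTEGER REFERENCES dim_customer,
        product_id INTEGER REFERENCES dim_product,
        quantity INTEGER, amount REAL
    );
    INSERT INTO dim_product VALUES (1, 'Laptop', 'electronics'), (2, 'Novel', 'books');
    INSERT INTO dim_customer VALUES (10, 'Alice', 'US'), (11, 'Bob', 'DE');
    INSERT INTO fact_orders VALUES (100, 10, 1, 1, 1200.0), (101, 11, 2, 3, 45.0),
                                   (102, 10, 2, 1, 15.0);
""")

# Typical BI query: measures come from the fact table,
# grouping labels come from a dimension
revenue = db.execute("""
    SELECT p.category, SUM(o.amount) AS revenue
    FROM fact_orders o
    JOIN dim_product p ON o.product_id = p.product_id
    GROUP BY p.category
    ORDER BY revenue DESC
""").fetchall()
print(revenue)  # [('electronics', 1200.0), ('books', 60.0)]
```

Note the shape of the query: one join per dimension needed, never dimension-to-dimension — that is what keeps star-schema queries simple and fast.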
Slowly Changing Dimensions (SCD)¶
How to handle dimensions that change over time (e.g., a customer moves from NYC to SF):
| Type | Description | Implementation |
|---|---|---|
| SCD Type 1 | Overwrite old value | Simply UPDATE the row. History is lost. |
| SCD Type 2 | Add new row with effective dates | Add valid_from, valid_to, is_current columns. Preserves full history. |
| SCD Type 3 | Add column for previous value | Add previous_city column. Only tracks one change. |
-- SCD Type 2 implementation
-- Customer dimension with history tracking:
-- When Alice moves from NYC to SF:
-- 1. Close the current record
UPDATE dim_customer
SET valid_to = '2025-01-15', is_current = FALSE
WHERE customer_id = 123 AND is_current = TRUE;
-- 2. Insert new record
INSERT INTO dim_customer (customer_id, name, city, valid_from, valid_to, is_current)
VALUES (123, 'Alice', 'SF', '2025-01-15', '9999-12-31', TRUE);
-- Query: What was Alice's city when she placed order #456?
SELECT c.city
FROM fact_orders o
JOIN dim_customer c ON o.customer_id = c.customer_id
AND o.order_date BETWEEN c.valid_from AND c.valid_to
WHERE o.order_id = 456;
Data Ingestion¶
Change Data Capture (CDC)¶
CDC captures changes (INSERT, UPDATE, DELETE) from source databases and streams them to downstream systems in real time or near real time. This is far more efficient than full-table dumps.
| CDC Method | Description | Latency | Impact on Source |
|---|---|---|---|
| Log-based | Read the database's write-ahead log (WAL/binlog) | Seconds | Minimal (reads log, no query load) |
| Query-based | Periodically query for changes (WHERE updated_at > last_run) | Minutes | Medium (adds query load, misses deletes) |
| Trigger-based | Database triggers write changes to a changelog table | Real-time | High (triggers add overhead to every write) |
Debezium is the leading open-source CDC tool. It reads database transaction logs and streams changes to Kafka:
// Debezium CDC event (Kafka message for an UPDATE)
{
  "before": {
    "id": 123,
    "name": "Alice",
    "email": "alice@old.com"
  },
  "after": {
    "id": 123,
    "name": "Alice",
    "email": "alice@new.com"
  },
  "source": {
    "db": "users_db",
    "table": "users",
    "ts_ms": 1705000000000
  },
  "op": "u"  // c=create, u=update, d=delete, r=read (snapshot)
}
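A downstream consumer applies these events in order to keep a replica in sync. A minimal sketch in plain Python (the event dicts mirror the Debezium shape above, simplified to the fields needed; a real consumer would read them from Kafka and write to a table):

```python
replica = {}  # primary key -> current row

def apply_change(event):
    """Apply one Debezium-style change event to the replica."""
    op = event["op"]
    if op in ("c", "r", "u"):          # create / snapshot read / update
        row = event["after"]
        replica[row["id"]] = row
    elif op == "d":                    # delete: only "before" is populated
        replica.pop(event["before"]["id"], None)

events = [
    {"op": "c", "after": {"id": 123, "email": "alice@old.com"}},
    {"op": "u", "before": {"id": 123, "email": "alice@old.com"},
                "after": {"id": 123, "email": "alice@new.com"}},
    {"op": "d", "before": {"id": 123, "email": "alice@new.com"}},
]
for e in events:
    apply_change(e)

assert replica == {}  # create, update, and delete net out to an empty replica
```

This replay property is why log-based CDC composes so well with Kafka: the changelog is itself an ordered, replayable stream.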
Ingestion Tools¶
| Tool | Type | Description |
|---|---|---|
| Fivetran | Managed ELT | Automated connectors for 300+ sources (SaaS, databases). Zero-maintenance. |
| Airbyte | Open-source ELT | Self-hosted or cloud. Growing connector library. |
| Debezium | CDC | Log-based CDC for PostgreSQL, MySQL, MongoDB → Kafka |
| Apache Kafka | Streaming platform | Distributed event streaming for real-time ingestion |
| AWS DMS | Managed CDC | Database Migration Service — CDC from/to databases and warehouses |
| Stitch | Managed ETL | Simple managed data pipelines (part of Talend) |
Batch Processing vs. Stream Processing¶
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Data handling | Process data in bounded chunks (hourly, daily) | Process data continuously as it arrives |
| Latency | Minutes to hours | Milliseconds to seconds |
| Data model | Bounded datasets (files, tables) | Unbounded streams (events, messages) |
| Complexity | Simpler (clear start/end, retry entire batch on failure) | More complex (ordering, late arrivals, exactly-once semantics) |
| State management | Stateless (each batch is independent) or checkpoint between batches | Stateful (maintain running aggregations, windows) |
| Cost | Often cheaper (scheduled resources, process during off-peak) | More expensive (always-on infrastructure) |
| Use cases | Daily reports, ML training, historical analysis, data warehouse loading | Real-time dashboards, fraud detection, alerting, IoT |
| Tools | Apache Spark (batch), dbt, Airflow | Kafka Streams, Apache Flink, Spark Structured Streaming |
Apache Spark¶
Spark is the most widely used engine for large-scale batch (and streaming) data processing. It distributes computation across a cluster and processes data in memory.
# PySpark example: analyze user behavior
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("UserAnalytics") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# Read from data lake (Parquet on S3)
events = spark.read.parquet("s3://data-lake/events/")
users = spark.read.parquet("s3://data-lake/users/")
# Transformation pipeline
daily_active_users = (
events
.filter(F.col("event_date") >= "2025-01-01")
.filter(F.col("event_type") == "page_view")
.groupBy(F.col("event_date"), F.col("user_id"))
.agg(F.count("*").alias("page_views"))
.join(users, "user_id")
.groupBy("event_date", "country")
.agg(
F.countDistinct("user_id").alias("dau"),
F.sum("page_views").alias("total_views"),
F.avg("page_views").alias("avg_views_per_user"),
)
.orderBy("event_date", "country")
)
# Write results to warehouse
daily_active_users.write \
.mode("overwrite") \
.partitionBy("event_date") \
.parquet("s3://data-lake/analytics/dau/")
Apache Kafka¶
Kafka is a distributed event streaming platform used as the central nervous system for real-time data pipelines:
Producer → Kafka Cluster → Consumer
Kafka Cluster:
┌─── Topic: "user-events" (partitioned) ─────────────────┐
│ │
│ Partition 0: [e1] [e4] [e7] [e10] ... │
│ Partition 1: [e2] [e5] [e8] [e11] ... │
│ Partition 2: [e3] [e6] [e9] [e12] ... │
│ │
│ Each partition is an ordered, immutable log. │
│ Messages are retained for a configurable period │
│ (e.g., 7 days) regardless of whether they've been read. │
└─────────────────────────────────────────────────────────┘
Key Kafka concepts:
| Concept | Description |
|---|---|
| Topic | Named stream of records (like a table). Messages are published to topics. |
| Partition | A topic is split into partitions for parallelism. Each partition is an ordered log. |
| Offset | Position of a message within a partition. Consumers track their offset. |
| Consumer Group | Multiple consumers sharing a topic. Each partition is read by exactly one consumer in the group. |
| Replication | Each partition is replicated across brokers (typically 3x) for fault tolerance. |
| Retention | Messages are retained by time (7 days default) or size, not by consumption. Multiple consumers can read the same data. |
# Kafka producer (Python with confluent-kafka)
from confluent_kafka import Producer
import json
producer = Producer({'bootstrap.servers': 'kafka:9092'})
def send_event(topic, event):
    producer.produce(
        topic=topic,
        key=event['user_id'].encode(),  # Key determines partition
        value=json.dumps(event).encode(),
    )
    producer.flush()
send_event('user-events', {
'user_id': 'u123',
'event_type': 'purchase',
'amount': 99.99,
'timestamp': '2025-01-15T10:30:00Z'
})
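The `key` argument matters: messages with the same key always hash to the same partition, which is what gives Kafka per-key ordering. A minimal sketch of the idea (this uses CRC32 for simplicity; Kafka's default partitioner actually uses murmur2, and the partition count here is invented):

```python
import zlib

NUM_PARTITIONS = 3  # hypothetical topic configuration

def partition_for(key: str) -> int:
    # Deterministic hash of the key, modulo the partition count
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

# Every event for user u123 lands in the same partition, so a consumer
# sees that user's events in the order they were produced
assert len({partition_for("u123") for _ in range(5)}) == 1

# Many distinct keys spread load across partitions
partitions = {partition_for(f"u{i}") for i in range(100)}
assert len(partitions) > 1
```

The flip side is also worth knowing: ordering is guaranteed only within a partition, never across the whole topic.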
Stream Processing with Apache Flink¶
Flink is one of the most capable stream processing engines, supporting event-time processing, exactly-once semantics, and complex event processing:
# PyFlink example: real-time fraud detection
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Define source (Kafka)
t_env.execute_sql("""
CREATE TABLE transactions (
user_id STRING,
amount DECIMAL(10, 2),
merchant STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
""")
# Detect suspicious patterns: > 3 transactions in 1 minute
t_env.execute_sql("""
SELECT user_id, COUNT(*) as tx_count, SUM(amount) as total
FROM transactions
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
HAVING COUNT(*) > 3
""")
Windowing in Stream Processing¶
When processing unbounded streams, windows group events into bounded chunks for aggregation:
| Window Type | Description | Use Case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping (every 5 minutes) | Hourly report, 5-minute metrics |
| Sliding | Fixed-size, overlapping (5-min window, slides every 1 min) | Moving average, rate calculation |
| Session | Dynamic size based on activity gap (close after 30 min of inactivity) | User session analytics |
| Global | Single window for all time | Running totals, cardinality estimation |
Tumbling Window (5 min):
|──Window 1──|──Window 2──|──Window 3──|
00:00 00:05 00:05 00:10 00:10 00:15
Sliding Window (5 min, slide 1 min):
|──Window 1──────|
|──Window 2──────|
|──Window 3──────|
00:00 00:01 00:02 00:05 00:06 00:07
Session Window (30 min gap):
|events|..gap..|events events|...30 min gap...|events|
|─Session 1────────────────────| |─Session 2─|
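The window assignments sketched above can be expressed as small pure-Python functions (timestamps are seconds; the window sizes match the diagrams; this is an illustration of the semantics, not how Flink implements them):

```python
def tumbling(ts, size=300):
    """The single fixed 5-minute window an event falls into."""
    start = ts - ts % size
    return [(start, start + size)]

def sliding(ts, size=300, slide=60):
    """All overlapping 5-minute windows (sliding every minute) covering the event."""
    first = (ts - size) // slide * slide + slide
    return [(s, s + size)
            for s in range(max(first, 0), ts + 1, slide)
            if s <= ts < s + size]

def sessions(timestamps, gap=1800):
    """Split sorted event times into sessions separated by > gap of inactivity."""
    out, current = [], [timestamps[0]]
    for ts in timestamps[1:]:
        if ts - current[-1] > gap:
            out.append((current[0], current[-1]))
            current = []
        current.append(ts)
    out.append((current[0], current[-1]))
    return out

assert tumbling(310) == [(300, 600)]           # 00:05:10 falls in window 2
assert len(sliding(310)) == 5                  # covered by 5 overlapping windows
assert sessions([0, 100, 5000]) == [(0, 100), (5000, 5000)]
```

Note that an event belongs to exactly one tumbling window but to `size / slide` sliding windows, which is why sliding-window aggregations cost proportionally more state.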
Workflow Orchestration¶
Orchestration tools manage complex data pipelines—scheduling tasks, handling dependencies, retries, and monitoring.
| Tool | Language | Key Features |
|---|---|---|
| Apache Airflow | Python | Industry standard, DAG-based, extensive operator and provider ecosystem |
| Dagster | Python | Software-defined assets, built-in data quality, observability, modern UI |
| Prefect | Python | Pythonic, hybrid execution model, dynamic workflows, easier than Airflow |
| dbt | SQL/Jinja | SQL-based transforms, testing, documentation, lineage, the standard for ELT transforms |
| Temporal | Multi-language | Workflow-as-code, durable execution, fault-tolerant, not just for data |
Apache Airflow¶
Airflow defines workflows as Directed Acyclic Graphs (DAGs) — each node is a task, and edges define dependencies.
# Apache Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'email_on_failure': True,
    'email': ['data-oncall@company.com'],
}
with DAG(
    dag_id='daily_user_metrics',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['metrics', 'users'],
    doc_md="""
    ## Daily User Metrics Pipeline
    Extracts user events, computes daily metrics, and loads into warehouse.
    **Owner**: data-team | **SLA**: Complete by 6:00 UTC
    """,
) as dag:
    extract = PythonOperator(
        task_id='extract_user_events',
        python_callable=extract_user_events,  # defined elsewhere in the DAG file
        execution_timeout=timedelta(minutes=30),
    )
    transform = PythonOperator(
        task_id='compute_metrics',
        python_callable=compute_daily_metrics,
    )
    load = PostgresOperator(
        task_id='load_to_warehouse',
        postgres_conn_id='warehouse',
        sql='sql/load_user_metrics.sql',
    )
    validate = PythonOperator(
        task_id='validate_metrics',
        python_callable=validate_row_counts,  # Check that output is reasonable
    )
    extract >> transform >> load >> validate  # Define task dependencies
dbt (Data Build Tool)¶
dbt is the standard for SQL-based transformations in the ELT paradigm. It lets you write transformations as SQL SELECT statements, and dbt handles materialization, testing, documentation, and lineage.
-- models/staging/stg_orders.sql
-- Staging model: clean and standardize raw data
WITH source AS (
SELECT * FROM {{ source('ecommerce', 'raw_orders') }}
),
cleaned AS (
SELECT
id AS order_id,
user_id,
CAST(amount AS DECIMAL(10, 2)) AS amount,
status,
CAST(created_at AS TIMESTAMP) AS ordered_at
FROM source
WHERE status IS NOT NULL -- Remove malformed records
)
SELECT * FROM cleaned
-- models/marts/fct_daily_revenue.sql
-- Business-level metric: daily revenue by product category
{{ config(
    materialized='incremental',
    unique_key=['date_day', 'category'],
    partition_by={'field': 'date_day', 'data_type': 'date'}
) }}
SELECT
DATE_TRUNC('day', o.ordered_at) AS date_day,
p.category,
COUNT(DISTINCT o.order_id) AS order_count,
COUNT(DISTINCT o.user_id) AS unique_customers,
SUM(o.amount) AS revenue,
AVG(o.amount) AS avg_order_value
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('stg_products') }} p ON o.product_id = p.product_id
WHERE o.status = 'completed'
{% if is_incremental() %}
-- Only process new data on incremental runs
AND o.ordered_at > (SELECT MAX(date_day) FROM {{ this }})
{% endif %}
GROUP BY 1, 2
# models/marts/fct_daily_revenue.yml — dbt tests and documentation
version: 2
models:
  - name: fct_daily_revenue
    description: "Daily revenue metrics by product category"
    columns:
      - name: date_day
        description: "The date of the revenue"
        tests:
          - not_null
      - name: category
        description: "Product category"
        tests:
          - not_null
          - accepted_values:
              values: ['electronics', 'clothing', 'books', 'home']
      - name: revenue
        description: "Total revenue in USD"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: order_count
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "> 0"
dbt commands:
dbt run # Run all models
dbt run --select marts.* # Run only mart models
dbt test # Run all tests
dbt docs generate && dbt docs serve # Generate and view documentation
dbt source freshness # Check if source data is up to date
Data Partitioning and Optimization¶
Partitioning Strategies¶
Partitioning divides large tables into smaller physical chunks, enabling the query engine to skip irrelevant partitions entirely (partition pruning):
| Strategy | Description | Use Case |
|---|---|---|
| Time-based | Partition by date/month/year | Event data, logs, time-series (most common) |
| Hash-based | Partition by hash of a column (distributes evenly) | Uniform distribution across partitions |
| List-based | Partition by specific values (country, category) | Regional data, categorical data |
| Range-based | Partition by value ranges | Numeric IDs, price ranges |
-- BigQuery: partitioned and clustered table
CREATE TABLE analytics.events
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_type
AS SELECT * FROM raw.events;
-- Query that benefits from partitioning (only reads January data):
SELECT user_id, COUNT(*) as event_count
FROM analytics.events
WHERE event_timestamp BETWEEN '2025-01-01' AND '2025-01-31'
AND event_type = 'purchase'
GROUP BY user_id;
-- Without partitioning: scans entire table
-- With partitioning: scans only January partition
-- With clustering: further reduces data scanned within the partition
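The mechanism behind partition pruning is simple enough to model directly: because data is physically grouped by the partition key, a filter on that key lets the engine skip whole chunks without reading them. A minimal sketch with invented event rows, partitioning by month:

```python
from collections import defaultdict

partitions = defaultdict(list)  # partition key (month) -> rows
events = [
    {"date": "2025-01-05", "user_id": "u1", "event_type": "purchase"},
    {"date": "2025-01-20", "user_id": "u2", "event_type": "page_view"},
    {"date": "2025-02-03", "user_id": "u1", "event_type": "purchase"},
]
for e in events:
    partitions[e["date"][:7]].append(e)  # "2025-01", "2025-02", ...

# Query over January: only the 2025-01 partition is ever scanned;
# the February rows are skipped without being read at all
scanned = partitions["2025-01"]
result = [e for e in scanned if e["event_type"] == "purchase"]
assert len(scanned) == 2 and len(result) == 1
```

Clustering within a partition works the same way at a finer grain: rows are sorted by the cluster keys, so range metadata lets the engine skip blocks inside the partition too.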
Data Compaction¶
Over time, small files accumulate in data lakes (from streaming inserts, frequent updates). Small files degrade query performance because each file requires a metadata lookup and I/O operation.
Solution: Periodic compaction (merge small files into larger ones):
-- Delta Lake compaction
OPTIMIZE delta.`s3://data-lake/events/`
WHERE event_date >= '2025-01-01';
-- Z-ordering (co-locate related data for faster queries)
OPTIMIZE delta.`s3://data-lake/events/`
ZORDER BY (user_id, event_type);
Data Quality¶
Ensuring data is accurate, complete, consistent, and timely is critical—bad data leads to bad decisions.
| Dimension | Description | Check Example |
|---|---|---|
| Completeness | No missing required values | NOT NULL checks, row count thresholds |
| Accuracy | Values are correct | Range checks (age 0-150), referential integrity |
| Consistency | Same data across systems | Cross-source record count comparisons |
| Timeliness | Data is up-to-date | Freshness checks (last updated < 1 hour ago) |
| Uniqueness | No unintended duplicates | Primary key uniqueness tests, deduplication |
| Validity | Data conforms to expected format/range | Email format, date format, enum values |
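Most of the dimensions above reduce to assertions over a batch of rows. A minimal sketch in plain Python (the `orders` data and thresholds are invented; consistency is omitted because it requires comparing against a second system):

```python
import re
from datetime import datetime, timedelta, timezone

orders = [
    {"order_id": 1, "email": "a@co.com", "amount": 42.0,
     "updated_at": datetime.now(timezone.utc)},
    {"order_id": 2, "email": "b@co.com", "amount": 10.0,
     "updated_at": datetime.now(timezone.utc)},
]

ids = [o["order_id"] for o in orders]
assert all(o["order_id"] is not None for o in orders)         # completeness
assert all(0 <= o["amount"] < 100_000 for o in orders)        # accuracy (range)
assert len(ids) == len(set(ids))                              # uniqueness
assert all(re.match(r"[^@]+@[^@]+\.[^@]+$", o["email"])       # validity (format)
           for o in orders)
assert all(datetime.now(timezone.utc) - o["updated_at"]       # timeliness
           < timedelta(hours=1) for o in orders)
```

The tools in the next table generate exactly this kind of check from declarative configuration, plus reporting and alerting around it.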
Data Quality Tools¶
| Tool | Approach | Integration |
|---|---|---|
| dbt tests | SQL assertions in YAML | Built into dbt workflow |
| Great Expectations | Python library for data validation | Standalone, integrates with Airflow/Spark |
| Soda | SQL-based checks with YAML config | Standalone, integrates with orchestrators |
| Monte Carlo | ML-based anomaly detection | Managed service, observability for data |
| Elementary | dbt-native data observability | Extends dbt with anomaly detection |
# Great Expectations example
import great_expectations as gx
context = gx.get_context()
# Define expectations for a dataset
validator = context.sources.pandas_default.read_csv("orders.csv")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)
# Run validation
results = validator.validate()
if not results.success:
    raise ValueError(f"Data quality check failed: {results}")
Data Governance and Cataloging¶
Data Governance¶
Data governance is the framework for managing data availability, usability, integrity, and security:
| Practice | Description | Tools |
|---|---|---|
| Data catalog | Searchable inventory of all data assets with metadata | DataHub, Amundsen, Atlan, Alation |
| Data lineage | Track where data comes from and how it's transformed | dbt lineage, OpenLineage, Marquez |
| Access control | Who can read/write which datasets | Unity Catalog, Lake Formation, IAM policies |
| PII management | Identify and protect personally identifiable information | Column-level tagging, masking, tokenization |
| Data retention | Policies for how long data is kept | Lifecycle policies, automated deletion |
| Schema registry | Enforce and evolve data schemas | Confluent Schema Registry, AWS Glue Schema Registry |
Schema Evolution¶
As data sources change, schemas must evolve without breaking downstream consumers:
| Change Type | Risk Level | Handling |
|---|---|---|
| Add column | Low (backward compatible) | Default to NULL, update downstream models |
| Remove column | High (breaks consumers reading that column) | Deprecate first, then remove after migration |
| Rename column | High (breaks consumers) | Add new column, populate, deprecate old |
| Change type | Medium-High | Add new column with new type, migrate consumers |
Avro schema evolution (used with Kafka): Producers can add fields with defaults, and consumers with the old schema will simply ignore the new fields. The Schema Registry enforces compatibility rules (backward, forward, full) to prevent breaking changes.
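The compatibility rules can be illustrated without Avro itself. A minimal pure-Python sketch (the field sets and default value are invented; real Avro encodes schemas binary-compatibly and the registry enforces these rules automatically):

```python
# Old schema: two fields. New schema adds "email" WITH a default —
# that default is what makes the change backward compatible.
OLD_FIELDS = {"id": None, "name": None}
NEW_FIELDS = {"id": None, "name": None, "email": "unknown@example.com"}

def decode(record, fields):
    # Keep known fields, fill missing ones from defaults,
    # and silently drop fields the reader's schema doesn't know about
    return {k: record.get(k, default) for k, default in fields.items()}

new_record = {"id": 1, "name": "Alice", "email": "alice@co.com"}
old_record = {"id": 2, "name": "Bob"}

# Old reader, new data: the unknown "email" field is ignored
assert decode(new_record, OLD_FIELDS) == {"id": 1, "name": "Alice"}

# New reader, old data: the missing field is filled from the default
assert decode(old_record, NEW_FIELDS)["email"] == "unknown@example.com"
```

Adding a field without a default would break the second case, which is exactly the kind of change a registry in backward-compatibility mode rejects.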
Real-World Data Pipeline Architecture¶
Example: E-commerce analytics pipeline
Data Sources:
PostgreSQL (orders, users, products) ──── CDC (Debezium) ──┐
Stripe API (payments) ──── Fivetran ─────────┤
Segment (website events) ──── Webhook ──────────┤
Salesforce (CRM) ──── Fivetran ─────────┤
│
▼
Kafka (real-time events) ──────────────────────────→ Data Lake (S3)
│ │
│ │ Airflow orchestrates daily
▼ ▼
Flink (real-time fraud dbt transforms raw → staging → marts
detection, live dashboards) │
▼
Data Warehouse (Snowflake)
│
┌──────────────────┼──────────────────┐
▼ ▼ ▼
Looker (BI) ML Training Reverse ETL
Dashboards (feature store) (sync to Salesforce,
marketing tools)
Key architectural decisions:
- Kafka as the central event bus: decouples producers from consumers, enables real-time AND batch processing from the same data
- S3 data lake as the source of truth: cheap, durable, format-agnostic
- dbt for transformations: SQL-based, tested, documented, version-controlled
- Snowflake for serving: fast analytical queries, per-query pricing, zero maintenance
- Reverse ETL: Push aggregated data back to operational tools (sync customer scores to Salesforce, send cohort data to marketing platforms)