Data Engineering Fundamentals

Data engineering is the practice of designing, building, and maintaining systems that collect, store, transform, and serve data. While database design (Chapter 5) covers individual database systems, data engineering focuses on data pipelines, warehousing, and the movement of data between systems at scale.

ETL vs. ELT

The two fundamental approaches to data processing differ in when transformation happens:

ETL (Extract, Transform, Load):
  Source → Extract → Transform (in pipeline) → Load → Data Warehouse
  Traditional approach: transform before storing.

ELT (Extract, Load, Transform):
  Source → Extract → Load → Data Warehouse → Transform (in warehouse)
  Modern approach: load raw data, transform using warehouse compute.

Aspect | ETL | ELT
Transform location | External pipeline (Spark, Airflow, custom code) | Inside the data warehouse (dbt, SQL)
Data in warehouse | Pre-processed, clean, structured | Raw (landing zone) + transformed (marts)
Flexibility | Changes require pipeline code updates and redeployment | Transform logic changes are just SQL (dbt models)
Speed to insight | Slower (must design transforms upfront) | Faster (raw data available immediately for ad-hoc analysis)
Compute cost | In pipeline infrastructure | In warehouse (pay for query processing)
Data lineage | Harder to track (transformations in code) | Easier (dbt provides lineage graphs, SQL is declarative)
Modern preference | Still used for complex transformations, streaming | Preferred for modern cloud warehouses

Why ELT won: Cloud data warehouses (Snowflake, BigQuery) provide virtually unlimited, elastic compute. It's cheaper and more flexible to store raw data and transform it on-demand than to pre-process everything. ELT also makes it easy to re-transform data when business logic changes—just re-run the SQL, no pipeline changes needed.

When ETL is still the right choice: When transformations require complex logic (ML feature engineering, image processing), when data must be cleaned before it enters the warehouse for compliance reasons, or when working with streaming data that needs real-time transformation.
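
The ETL flow can be sketched in a few lines of Python (a toy illustration: sqlite3 stands in for the warehouse, and the order records are made up). The point is that in ETL, cleanup happens in pipeline code before the load:

```python
import sqlite3

# Hypothetical rows as an extract step might return them
raw_orders = [
    {"id": 1, "amount": "19.99", "status": "completed"},
    {"id": 2, "amount": "5.00", "status": None},  # malformed: missing status
    {"id": 3, "amount": "42.50", "status": "completed"},
]

def transform(rows):
    # The "T" before the "L": cast types, drop malformed rows in pipeline code
    return [(r["id"], float(r["amount"])) for r in rows if r["status"] == "completed"]

def load(rows, conn):
    conn.execute("CREATE TABLE IF NOT EXISTS orders (id INTEGER, amount REAL)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
load(transform(raw_orders), conn)
print(conn.execute("SELECT COUNT(*), SUM(amount) FROM orders").fetchone())
```

Under ELT, `load` would run first with the raw rows, and the cleanup would be a SQL view or dbt model inside the warehouse instead of the `transform` function.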

Data Storage Architecture

┌─────────────────────── Modern Data Architecture ───────────────────────┐
│                                                                        │
│  ┌─── Data Sources ──────┐                                            │
│  │  Databases (OLTP)      │                                            │
│  │  APIs (REST, GraphQL)  │──── Ingestion ────┐                       │
│  │  Event Streams (Kafka) │    (Fivetran,     │                       │
│  │  Files (CSV, JSON)     │     Airbyte,      │                       │
│  │  SaaS Apps (Salesforce)│     Debezium)     │                       │
│  └────────────────────────┘                   │                       │
│                                                ▼                       │
│                                   ┌─── Data Lake / Lakehouse ──┐      │
│                                   │  Raw data (any format)      │      │
│                                   │  Cheap storage (S3, GCS)    │      │
│                                   │  Schema-on-read             │      │
│                                   └──────────┬──────────────────┘      │
│                                              │                         │
│                              Transformation (dbt, Spark)               │
│                                              │                         │
│                                              ▼                         │
│                                   ┌─── Data Warehouse ─────────┐      │
│                                   │  Structured, optimized      │      │
│                                   │  for analytical queries     │      │
│                                   │  (Snowflake, BigQuery,      │      │
│                                   │   Redshift, Databricks)     │      │
│                                   └──────────┬──────────────────┘      │
│                                              │                         │
│                                              ▼                         │
│                                   ┌─── Consumption Layer ──────┐      │
│                                   │  BI Tools (Tableau, Looker) │      │
│                                   │  ML Pipelines               │      │
│                                   │  Reverse ETL to SaaS        │      │
│                                   │  Operational Analytics       │      │
│                                   └─────────────────────────────┘      │
└────────────────────────────────────────────────────────────────────────┘

Data Warehouse

A centralized repository optimized for analytical queries (OLAP). Uses columnar storage, compression, and massive parallel processing (MPP). Unlike OLTP databases (optimized for many small transactions), data warehouses are optimized for fewer, complex analytical queries over large datasets.

Warehouse | Provider | Key Features
Snowflake | Independent | Separation of storage and compute (scale independently), near-zero maintenance, cross-cloud, time travel (query historical data), data sharing
BigQuery | Google | Serverless (no infrastructure to manage), pay-per-query, petabyte-scale, built-in ML (BigQuery ML), streaming inserts
Redshift | AWS | Columnar, PostgreSQL-based, tight AWS integration, Redshift Spectrum (query S3 directly), materialized views
Databricks | Independent | Lakehouse architecture (unifies lake + warehouse), Spark-based, Unity Catalog for governance, Delta Lake format

Why columnar storage matters:

Row-oriented storage (OLTP — PostgreSQL, MySQL):
  Row 1: [id=1, name="Alice", email="alice@co.com", age=30, city="NYC"]
  Row 2: [id=2, name="Bob",   email="bob@co.com",   age=25, city="SF"]
  → Fast for: SELECT * FROM users WHERE id = 1 (reads one row)
  → Slow for: SELECT AVG(age) FROM users (must read all columns to get age)

Columnar storage (OLAP — Snowflake, BigQuery, Redshift):
  Column "id":    [1, 2, 3, 4, 5, ...]
  Column "name":  ["Alice", "Bob", "Carol", ...]
  Column "age":   [30, 25, 35, ...]
  → Fast for: SELECT AVG(age) FROM users (reads only the age column)
  → Excellent compression (similar values stored together — [30, 25, 35] compresses well)
  → Slow for: SELECT * FROM users WHERE id = 1 (must read from every column)
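
Both effects are easy to demonstrate with a toy illustration in Python (the dataset is contrived and JSON plus zlib stand in for a real storage engine): aggregating one column touches only that column's values, and a column of similar values compresses far better than interleaved rows.

```python
import json
import zlib

# Toy dataset: 1,000 user rows
rows = [{"id": i, "name": f"user{i}", "age": 20 + i % 10, "city": "NYC"} for i in range(1000)]

# Row layout: whole records stored together; columnar: each field contiguous
row_bytes = json.dumps(rows).encode()
age_column = [r["age"] for r in rows]
col_bytes = json.dumps(age_column).encode()

# AVG(age) over the columnar layout reads only the age column
avg_age = sum(age_column) / len(age_column)

# Similar values stored together compress much better than interleaved rows
print(len(zlib.compress(row_bytes)) / len(row_bytes))  # row layout: modest ratio
print(len(zlib.compress(col_bytes)) / len(col_bytes))  # age column: far smaller ratio
```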

Data Lake

A storage repository that holds raw data in its native format (structured, semi-structured, unstructured). Built on cheap object storage (S3, GCS, Azure Blob).

Data Lake vs. Data Warehouse:

Aspect | Data Lake | Data Warehouse
Schema | Schema-on-read (flexible, no enforcement) | Schema-on-write (enforced before loading)
Data types | Any (JSON, Parquet, images, logs, video) | Structured (tables with defined columns)
Cost | Low (object storage: ~$0.023/GB/month on S3) | Higher (compute-optimized storage + processing)
Query speed | Variable (depends on format and engine) | Optimized (columnar, indexed, cached)
Users | Data engineers, data scientists | Business analysts, BI users
Risk | Can become a "data swamp" without governance | Governed by schema enforcement

Data Lakehouse

A hybrid combining the flexibility of data lakes with the performance and ACID guarantees of data warehouses. Technologies like Delta Lake, Apache Iceberg, and Apache Hudi add table formats on top of data lakes, providing:

  • ACID transactions: Atomic, consistent reads and writes on object storage
  • Schema enforcement and evolution: Add, rename, drop columns without rewriting data
  • Time travel: Query data as it existed at a previous point in time
  • Upserts and deletes: Modify existing records (impossible on raw object storage)
  • Partition evolution: Change partitioning without rewriting data (Iceberg)

-- Delta Lake example (Spark SQL)
-- Create a table backed by S3
CREATE TABLE events
USING DELTA
LOCATION 's3://my-lake/events/'
AS SELECT * FROM raw_events;

-- Time travel: query data as of yesterday
SELECT * FROM events TIMESTAMP AS OF '2025-01-14';

-- Upsert (merge): update existing rows, insert new ones
MERGE INTO events target
USING new_events source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Data File Formats

Choosing the right file format dramatically affects query performance and storage cost:

Format | Type | Compression | Schema | Query Performance | Use Case
CSV | Row-based, text | Poor | No | Slow (no predicate pushdown) | Data exchange, small datasets
JSON | Row-based, text | Poor | Self-describing | Slow | APIs, logs, small datasets
Parquet | Columnar, binary | Excellent (snappy, gzip, zstd) | Embedded | Fast (predicate pushdown, column pruning) | Default for analytics
ORC | Columnar, binary | Excellent | Embedded | Fast | Hive ecosystem
Avro | Row-based, binary | Good | Embedded (schema evolution) | Medium | Kafka, data pipelines, streaming

Why Parquet is the standard: Parquet combines columnar storage (read only needed columns), embedded statistics (skip entire row groups based on min/max values), and excellent compression (similar values in a column compress well). A 10GB CSV file might be 1GB as Parquet, and queries that read 3 of 50 columns process roughly 94% less data.

# Working with Parquet in Python
import pandas as pd

# Write Parquet (with compression)
df.to_parquet('data.parquet', compression='snappy', index=False)

# Read Parquet (only specific columns — skips rest on disk)
df = pd.read_parquet('data.parquet', columns=['user_id', 'event_type', 'timestamp'])

# Read Parquet with predicate pushdown (filter happens at storage level)
import pyarrow.parquet as pq
table = pq.read_table('data.parquet',
    columns=['user_id', 'amount'],
    filters=[('amount', '>', 100), ('country', '=', 'US')]
)

Data Modeling for Analytics

Dimensional Modeling

Dimensional modeling (by Ralph Kimball) organizes data into fact tables (measurements/events) and dimension tables (descriptive context). This is the standard approach for data warehouses and BI.

Star Schema:

                 ┌──────────────┐
                 │ dim_product  │
                 │──────────────│
                 │ product_id   │
                 │ name         │
                 │ category     │
                 │ brand        │
                 └──────┬───────┘
                        │
┌──────────────┐  ┌─────┴──────────┐  ┌──────────────┐
│ dim_customer │──│  fact_orders   │──│ dim_date     │
│──────────────│  │────────────────│  │──────────────│
│ customer_id  │  │ order_id       │  │ date_key     │
│ name         │  │ customer_id FK │  │ date         │
│ email        │  │ product_id  FK │  │ day_of_week  │
│ segment      │  │ date_key    FK │  │ month        │
│ country      │  │ quantity       │  │ quarter      │
└──────────────┘  │ amount         │  │ year         │
                  │ discount       │  │ is_weekend   │
                  └────────────────┘  └──────────────┘

Star schema (fact table surrounded by dimension tables): Simple, fast queries, widely used.

Snowflake schema (dimensions normalized into sub-dimensions): Reduces data duplication but requires more JOINs.

Fact table types:

  • Transaction facts: One row per event (order placed, payment processed)
  • Periodic snapshot facts: One row per entity per time period (daily account balance)
  • Accumulating snapshot facts: One row per process, updated as stages complete (order lifecycle: placed → shipped → delivered)
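
A star-schema query aggregates the fact table and slices by dimension attributes. A minimal sketch using sqlite3 with made-up tables (real warehouses run the same SQL shape over billions of rows):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_product (product_id INTEGER PRIMARY KEY, category TEXT);
    CREATE TABLE fact_orders (order_id INTEGER, product_id INTEGER, amount REAL);
    INSERT INTO dim_product VALUES (1, 'books'), (2, 'electronics');
    INSERT INTO fact_orders VALUES (10, 1, 12.0), (11, 1, 8.0), (12, 2, 300.0);
""")

# Typical star-schema query: aggregate facts, group by a dimension attribute
result = conn.execute("""
    SELECT p.category, COUNT(*) AS orders, SUM(f.amount) AS revenue
    FROM fact_orders f
    JOIN dim_product p ON f.product_id = p.product_id
    GROUP BY p.category
    ORDER BY p.category
""").fetchall()
print(result)  # [('books', 2, 20.0), ('electronics', 1, 300.0)]
```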

Slowly Changing Dimensions (SCD)

How to handle dimensions that change over time (e.g., a customer moves from NYC to SF):

Type | Description | Implementation
SCD Type 1 | Overwrite old value | Simply UPDATE the row. History is lost.
SCD Type 2 | Add new row with effective dates | Add valid_from, valid_to, is_current columns. Preserves full history.
SCD Type 3 | Add column for previous value | Add previous_city column. Only tracks one change.

-- SCD Type 2 implementation
-- Customer dimension with history tracking:

-- When Alice moves from NYC to SF:
-- 1. Close the current record
UPDATE dim_customer
SET valid_to = '2025-01-15', is_current = FALSE
WHERE customer_id = 123 AND is_current = TRUE;

-- 2. Insert new record
INSERT INTO dim_customer (customer_id, name, city, valid_from, valid_to, is_current)
VALUES (123, 'Alice', 'SF', '2025-01-15', '9999-12-31', TRUE);

-- Query: What was Alice's city when she placed order #456?
SELECT c.city
FROM fact_orders o
JOIN dim_customer c ON o.customer_id = c.customer_id
    AND o.order_date BETWEEN c.valid_from AND c.valid_to
WHERE o.order_id = 456;

Data Ingestion

Change Data Capture (CDC)

CDC captures changes (INSERT, UPDATE, DELETE) from source databases and streams them to downstream systems in real-time or near-real-time. This is far more efficient than full-table dumps.

CDC Method | Description | Latency | Impact on Source
Log-based | Read the database's write-ahead log (WAL/binlog) | Seconds | Minimal (reads log, no query load)
Query-based | Periodically query for changes (WHERE updated_at > last_run) | Minutes | Medium (adds query load, misses deletes)
Trigger-based | Database triggers write changes to a changelog table | Real-time | High (triggers add overhead to every write)

Debezium is the leading open-source CDC tool. It reads database transaction logs and streams changes to Kafka:

// Debezium CDC event (Kafka message for an UPDATE)
{
  "before": {
    "id": 123,
    "name": "Alice",
    "email": "alice@old.com"
  },
  "after": {
    "id": 123,
    "name": "Alice",
    "email": "alice@new.com"
  },
  "source": {
    "db": "users_db",
    "table": "users",
    "ts_ms": 1705000000000
  },
  "op": "u"  // c=create, u=update, d=delete, r=read (snapshot)
}
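
A downstream consumer can replay such events to maintain a replica. A minimal sketch, assuming events shaped like the example above (the apply_cdc function and in-memory dict store are illustrative, not part of the Debezium API):

```python
# Apply Debezium-style change events to a local key-value replica
def apply_cdc(store: dict, event: dict) -> None:
    op = event["op"]
    if op in ("c", "r", "u"):             # create / snapshot read / update
        row = event["after"]              # the new state of the row
        store[row["id"]] = row
    elif op == "d":                       # delete: only "before" is present
        store.pop(event["before"]["id"], None)

replica = {}
apply_cdc(replica, {"op": "c", "after": {"id": 123, "email": "alice@old.com"}})
apply_cdc(replica, {"op": "u", "before": {"id": 123, "email": "alice@old.com"},
                    "after": {"id": 123, "email": "alice@new.com"}})
print(replica[123]["email"])  # alice@new.com
```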

Ingestion Tools

Tool | Type | Description
Fivetran | Managed ELT | Automated connectors for 300+ sources (SaaS, databases). Zero-maintenance.
Airbyte | Open-source ELT | Self-hosted or cloud. Growing connector library.
Debezium | CDC | Log-based CDC for PostgreSQL, MySQL, MongoDB → Kafka
Apache Kafka | Streaming platform | Distributed event streaming for real-time ingestion
AWS DMS | Managed CDC | Database Migration Service — CDC from/to databases and warehouses
Stitch | Managed ETL | Simple, managed data pipelines (a Talend product)

Batch Processing vs. Stream Processing

Aspect | Batch Processing | Stream Processing
Data handling | Process data in bounded chunks (hourly, daily) | Process data continuously as it arrives
Latency | Minutes to hours | Milliseconds to seconds
Data model | Bounded datasets (files, tables) | Unbounded streams (events, messages)
Complexity | Simpler (clear start/end, retry entire batch on failure) | More complex (ordering, late arrivals, exactly-once semantics)
State management | Stateless (each batch is independent) or checkpoint between batches | Stateful (maintain running aggregations, windows)
Cost | Often cheaper (scheduled resources, process during off-peak) | More expensive (always-on infrastructure)
Use cases | Daily reports, ML training, historical analysis, data warehouse loading | Real-time dashboards, fraud detection, alerting, IoT
Tools | Apache Spark (batch), dbt, Airflow | Kafka Streams, Apache Flink, Spark Structured Streaming

Apache Spark

Spark is the most widely used engine for large-scale batch (and streaming) data processing. It distributes computation across a cluster and processes data in memory.

# PySpark example: analyze user behavior
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("UserAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read from data lake (Parquet on S3)
events = spark.read.parquet("s3://data-lake/events/")
users = spark.read.parquet("s3://data-lake/users/")

# Transformation pipeline
daily_active_users = (
    events
    .filter(F.col("event_date") >= "2025-01-01")
    .filter(F.col("event_type") == "page_view")
    .groupBy(F.col("event_date"), F.col("user_id"))
    .agg(F.count("*").alias("page_views"))
    .join(users, "user_id")
    .groupBy("event_date", "country")
    .agg(
        F.countDistinct("user_id").alias("dau"),
        F.sum("page_views").alias("total_views"),
        F.avg("page_views").alias("avg_views_per_user"),
    )
    .orderBy("event_date", "country")
)

# Write results to warehouse
daily_active_users.write \
    .mode("overwrite") \
    .partitionBy("event_date") \
    .parquet("s3://data-lake/analytics/dau/")

Apache Kafka

Kafka is a distributed event streaming platform used as the central nervous system for real-time data pipelines:

Producer → Kafka Cluster → Consumer

Kafka Cluster:
┌─── Topic: "user-events" (partitioned) ─────────────────┐
│                                                         │
│  Partition 0: [e1] [e4] [e7] [e10] ...                 │
│  Partition 1: [e2] [e5] [e8] [e11] ...                 │
│  Partition 2: [e3] [e6] [e9] [e12] ...                 │
│                                                         │
│  Each partition is an ordered, immutable log.            │
│  Messages are retained for a configurable period         │
│  (e.g., 7 days) regardless of whether they've been read. │
└─────────────────────────────────────────────────────────┘

Key Kafka concepts:

Concept | Description
Topic | Named stream of records (like a table). Messages are published to topics.
Partition | A topic is split into partitions for parallelism. Each partition is an ordered log.
Offset | Position of a message within a partition. Consumers track their offset.
Consumer Group | Multiple consumers sharing a topic. Each partition is read by exactly one consumer in the group.
Replication | Each partition is replicated across brokers (typically 3x) for fault tolerance.
Retention | Messages are retained by time (7 days default) or size, not by consumption. Multiple consumers can read the same data.

# Kafka producer (Python with confluent-kafka)
from confluent_kafka import Producer
import json

producer = Producer({'bootstrap.servers': 'kafka:9092'})

def send_event(topic, event):
    producer.produce(
        topic=topic,
        key=event['user_id'].encode(),  # Key determines partition
        value=json.dumps(event).encode(),
    )
    producer.flush()

send_event('user-events', {
    'user_id': 'u123',
    'event_type': 'purchase',
    'amount': 99.99,
    'timestamp': '2025-01-15T10:30:00Z'
})
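
Because the key determines the partition, all events for a given user land on the same partition and stay ordered. A sketch of the mapping (Kafka's default partitioner actually uses murmur2; MD5 here is a stand-in to show the stable key-to-partition property):

```python
import hashlib

NUM_PARTITIONS = 3

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Any stable hash gives the guarantee that matters:
    # equal keys always map to the same partition.
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Same user always maps to the same partition, so their events stay ordered
assert partition_for(b"u123") == partition_for(b"u123")
assignments = {k: partition_for(k) for k in [b"u1", b"u2", b"u3", b"u4"]}
print(assignments)
```

Note this is also why changing the partition count of a live topic reshuffles keys: the modulo changes, so the same key can map to a different partition.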

Apache Flink

Flink is among the most capable stream processing engines, supporting event-time processing, exactly-once semantics, stateful computation, and complex event processing:

# PyFlink example: real-time fraud detection
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Define source (Kafka)
t_env.execute_sql("""
    CREATE TABLE transactions (
        user_id STRING,
        amount DECIMAL(10, 2),
        merchant STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'transactions',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Detect suspicious patterns: > 3 transactions in 1 minute
t_env.execute_sql("""
    SELECT user_id, COUNT(*) as tx_count, SUM(amount) as total
    FROM transactions
    GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
    HAVING COUNT(*) > 3
""")

Windowing in Stream Processing

When processing unbounded streams, windows group events into bounded chunks for aggregation:

Window Type | Description | Use Case
Tumbling | Fixed-size, non-overlapping (every 5 minutes) | Hourly report, 5-minute metrics
Sliding | Fixed-size, overlapping (5-min window, slides every 1 min) | Moving average, rate calculation
Session | Dynamic size based on activity gap (close after 30 min of inactivity) | User session analytics
Global | Single window for all time | Running totals, cardinality estimation

Tumbling Window (5 min):
  |──Window 1──|──Window 2──|──Window 3──|
  00:00        00:05        00:10        00:15

Sliding Window (5 min, slide 1 min):
  |──Window 1 (00:00-00:05)──|
     |──Window 2 (00:01-00:06)──|
        |──Window 3 (00:02-00:07)──|

Session Window (30 min gap):
  |events|..gap..|events  events|...30 min gap...|events|
  |─Session 1────────────────────|               |─Session 2─|
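
The two most common window types can be sketched in plain Python (the timestamps, 5-minute window, and 30-minute session gap are illustrative):

```python
from collections import defaultdict

events = [  # (epoch_seconds, user_id)
    (0, "a"), (90, "a"), (299, "b"),    # first 5-minute window
    (300, "a"), (450, "b"),             # second 5-minute window
]

# Tumbling 5-minute windows: each event belongs to exactly one bucket
WINDOW = 300
counts = defaultdict(int)
for ts, user in events:
    counts[ts // WINDOW * WINDOW] += 1   # key = window start time
print(dict(counts))  # {0: 3, 300: 2}

# Session windows: close the session after a 30-minute gap
def sessions(timestamps, gap=1800):
    out, current = [], [timestamps[0]]
    for ts in timestamps[1:]:
        if ts - current[-1] > gap:       # gap exceeded: close current session
            out.append(current)
            current = []
        current.append(ts)
    out.append(current)
    return out

print(sessions([0, 600, 1200, 9000]))  # [[0, 600, 1200], [9000]]
```

A real engine (Flink, Spark Structured Streaming) adds what this sketch omits: watermarks for late events, per-key state, and fault-tolerant checkpoints.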

Workflow Orchestration

Orchestration tools manage complex data pipelines—scheduling tasks, handling dependencies, retries, and monitoring.

Tool | Language | Key Features
Apache Airflow | Python | Industry standard, DAG-based, extensive operator ecosystem, 2000+ integrations
Dagster | Python | Software-defined assets, built-in data quality, observability, modern UI
Prefect | Python | Pythonic, hybrid execution model, dynamic workflows, easier than Airflow
dbt | SQL/Jinja | SQL-based transforms, testing, documentation, lineage; the standard for ELT transforms
Temporal | Multi-language | Workflow-as-code, durable execution, fault-tolerant, not just for data

Apache Airflow

Airflow defines workflows as Directed Acyclic Graphs (DAGs) — each node is a task, and edges define dependencies.

# Apache Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'email_on_failure': True,
    'email': ['data-oncall@company.com'],
}

with DAG(
    dag_id='daily_user_metrics',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['metrics', 'users'],
    doc_md="""
    ## Daily User Metrics Pipeline
    Extracts user events, computes daily metrics, and loads into warehouse.
    **Owner**: data-team | **SLA**: Complete by 6:00 UTC
    """,
) as dag:

    extract = PythonOperator(
        task_id='extract_user_events',
        python_callable=extract_user_events,
        execution_timeout=timedelta(minutes=30),
    )

    transform = PythonOperator(
        task_id='compute_metrics',
        python_callable=compute_daily_metrics,
    )

    load = PostgresOperator(
        task_id='load_to_warehouse',
        postgres_conn_id='warehouse',
        sql='sql/load_user_metrics.sql',
    )

    validate = PythonOperator(
        task_id='validate_metrics',
        python_callable=validate_row_counts,  # Check that output is reasonable
    )

    extract >> transform >> load >> validate  # Define task dependencies

dbt (Data Build Tool)

dbt is the standard for SQL-based transformations in the ELT paradigm. It lets you write transformations as SQL SELECT statements, and dbt handles materialization, testing, documentation, and lineage.

-- models/staging/stg_orders.sql
-- Staging model: clean and standardize raw data

WITH source AS (
    SELECT * FROM {{ source('ecommerce', 'raw_orders') }}
),

cleaned AS (
    SELECT
        id AS order_id,
        user_id,
        CAST(amount AS DECIMAL(10, 2)) AS amount,
        status,
        CAST(created_at AS TIMESTAMP) AS ordered_at
    FROM source
    WHERE status IS NOT NULL  -- Remove malformed records
)

SELECT * FROM cleaned

-- models/marts/fct_daily_revenue.sql
-- Business-level metric: daily revenue by product category

{{ config(
    materialized='incremental',
    unique_key=['date_day', 'category'],
    partition_by={'field': 'date_day', 'data_type': 'date'}
) }}

SELECT
    DATE_TRUNC('day', o.ordered_at) AS date_day,
    p.category,
    COUNT(DISTINCT o.order_id) AS order_count,
    COUNT(DISTINCT o.user_id) AS unique_customers,
    SUM(o.amount) AS revenue,
    AVG(o.amount) AS avg_order_value
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('stg_products') }} p ON o.product_id = p.product_id
WHERE o.status = 'completed'

{% if is_incremental() %}
    -- Only process new data on incremental runs; >= re-reads the latest day
    -- so late-arriving rows are picked up (the unique_key deduplicates)
    AND o.ordered_at >= (SELECT MAX(date_day) FROM {{ this }})
{% endif %}

GROUP BY 1, 2

# models/marts/fct_daily_revenue.yml — dbt tests and documentation
version: 2

models:
  - name: fct_daily_revenue
    description: "Daily revenue metrics by product category"
    columns:
      - name: date_day
        description: "The date of the revenue"
        tests:
          - not_null
      - name: category
        description: "Product category"
        tests:
          - not_null
          - accepted_values:
              values: ['electronics', 'clothing', 'books', 'home']
      - name: revenue
        description: "Total revenue in USD"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: order_count
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "> 0"

dbt commands:

dbt run                    # Run all models
dbt run --select marts.*   # Run only mart models
dbt test                   # Run all tests
dbt docs generate && dbt docs serve  # Generate and view documentation
dbt source freshness       # Check if source data is up to date

Data Partitioning and Optimization

Partitioning Strategies

Partitioning divides large tables into smaller physical chunks, enabling the query engine to skip irrelevant partitions entirely (partition pruning):

Strategy | Description | Use Case
Time-based | Partition by date/month/year | Event data, logs, time-series (most common)
Hash-based | Partition by hash of a column (distributes evenly) | Uniform distribution across partitions
List-based | Partition by specific values (country, category) | Regional data, categorical data
Range-based | Partition by value ranges | Numeric IDs, price ranges

-- BigQuery: partitioned and clustered table
CREATE TABLE analytics.events
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_type
AS SELECT * FROM raw.events;

-- Query that benefits from partitioning (only reads January data):
SELECT user_id, COUNT(*) as event_count
FROM analytics.events
WHERE event_timestamp BETWEEN '2025-01-01' AND '2025-01-31'
  AND event_type = 'purchase'
GROUP BY user_id;
-- Without partitioning: scans entire table
-- With partitioning: scans only January partition
-- With clustering: further reduces data scanned within the partition
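
Partition pruning itself is simple: with data laid out by date, the reader skips files outside the query's range. A toy sketch (the Hive-style event_date= paths are hypothetical):

```python
# Data laid out one directory per day, as many lake engines do
files = [
    "events/event_date=2024-12-31/part-0.parquet",
    "events/event_date=2025-01-10/part-0.parquet",
    "events/event_date=2025-01-20/part-0.parquet",
    "events/event_date=2025-02-01/part-0.parquet",
]

def prune(paths, start, end):
    # Keep only partitions inside the query's date range; the rest are
    # never opened, which is what makes partitioned queries cheap
    kept = []
    for path in paths:
        date = path.split("event_date=")[1].split("/")[0]
        if start <= date <= end:          # ISO dates compare correctly as strings
            kept.append(path)
    return kept

print(prune(files, "2025-01-01", "2025-01-31"))  # only the two January files
```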

Data Compaction

Over time, small files accumulate in data lakes (from streaming inserts, frequent updates). Small files degrade query performance because each file requires a metadata lookup and I/O operation.

Solution: Periodic compaction (merge small files into larger ones):

-- Delta Lake compaction
OPTIMIZE delta.`s3://data-lake/events/`
WHERE event_date >= '2025-01-01';

-- Z-ordering (co-locate related data for faster queries)
OPTIMIZE delta.`s3://data-lake/events/`
ZORDER BY (user_id, event_type);
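
The mechanics can be sketched with ordinary files (a toy stand-in: real table formats rewrite Parquet files and commit the swap transactionally so readers never see a half-compacted state):

```python
import os
import tempfile

# Toy compaction: merge many small line-delimited files into one larger file
def compact(directory: str, out_name: str = "compacted.jsonl") -> str:
    small = sorted(f for f in os.listdir(directory) if f != out_name)
    out_path = os.path.join(directory, out_name)
    with open(out_path, "w") as out:
        for name in small:
            path = os.path.join(directory, name)
            with open(path) as f:
                out.write(f.read())
            os.remove(path)               # a real format would do this atomically
    return out_path

d = tempfile.mkdtemp()
for i in range(100):                      # 100 tiny files, one record each
    with open(os.path.join(d, f"part-{i:03d}.jsonl"), "w") as f:
        f.write('{"event": %d}\n' % i)

path = compact(d)
print(len(os.listdir(d)), sum(1 for _ in open(path)))  # 1 file, 100 records
```

One metadata lookup and one sequential read now replace a hundred, which is the whole benefit.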

Data Quality

Ensuring data is accurate, complete, consistent, and timely is critical—bad data leads to bad decisions.

Dimension | Description | Check Example
Completeness | No missing required values | NOT NULL checks, row count thresholds
Accuracy | Values are correct | Range checks (age 0-150), referential integrity
Consistency | Same data across systems | Cross-source record count comparisons
Timeliness | Data is up-to-date | Freshness checks (last updated < 1 hour ago)
Uniqueness | No unintended duplicates | Primary key uniqueness tests, deduplication
Validity | Data conforms to expected format/range | Email format, date format, enum values
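
Several of these dimensions reduce to simple assertions over rows. A hand-rolled sketch (the check_quality function, field names, and thresholds are illustrative, not a real tool):

```python
# Minimal quality checks covering uniqueness, accuracy, validity, completeness
def check_quality(rows):
    failures = []
    ids = [r["order_id"] for r in rows]
    if any(i is None for i in ids):
        failures.append("completeness: order_id has nulls")
    if len(ids) != len(set(ids)):
        failures.append("uniqueness: duplicate order_id")
    if any(not (0 <= r["amount"] <= 100_000) for r in rows):
        failures.append("accuracy: amount out of range")
    if any(r["status"] not in {"pending", "completed", "cancelled"} for r in rows):
        failures.append("validity: unexpected status value")
    return failures

rows = [
    {"order_id": 1, "amount": 50.0, "status": "completed"},
    {"order_id": 1, "amount": -5.0, "status": "shipped"},  # dup id, bad amount/status
]
print(check_quality(rows))
```

Dedicated tools add what this lacks: declarative configuration, scheduling, history, and alerting.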

Data Quality Tools

Tool | Approach | Integration
dbt tests | SQL assertions in YAML | Built into dbt workflow
Great Expectations | Python library for data validation | Standalone, integrates with Airflow/Spark
Soda | SQL-based checks with YAML config | Standalone, integrates with orchestrators
Monte Carlo | ML-based anomaly detection | Managed service, observability for data
Elementary | dbt-native data observability | Extends dbt with anomaly detection

# Great Expectations example
import great_expectations as gx

context = gx.get_context()

# Define expectations for a dataset
validator = context.sources.pandas_default.read_csv("orders.csv")

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)

# Run validation
results = validator.validate()
if not results.success:
    raise ValueError(f"Data quality check failed: {results}")

Data Governance and Cataloging

Data Governance

Data governance is the framework for managing data availability, usability, integrity, and security:

Practice | Description | Tools
Data catalog | Searchable inventory of all data assets with metadata | DataHub, Amundsen, Atlan, Alation
Data lineage | Track where data comes from and how it's transformed | dbt lineage, OpenLineage, Marquez
Access control | Who can read/write which datasets | Unity Catalog, Lake Formation, IAM policies
PII management | Identify and protect personally identifiable information | Column-level tagging, masking, tokenization
Data retention | Policies for how long data is kept | Lifecycle policies, automated deletion
Schema registry | Enforce and evolve data schemas | Confluent Schema Registry, AWS Glue Schema Registry

Schema Evolution

As data sources change, schemas must evolve without breaking downstream consumers:

Change Type | Risk Level | Handling
Add column | Low (backward compatible) | Default to NULL, update downstream models
Remove column | High (breaks consumers reading that column) | Deprecate first, then remove after migration
Rename column | High (breaks consumers) | Add new column, populate, deprecate old
Change type | Medium-High | Add new column with new type, migrate consumers

Avro schema evolution (used with Kafka): Producers can add fields with defaults, and consumers with the old schema will simply ignore the new fields. The Schema Registry enforces compatibility rules (backward, forward, full) to prevent breaking changes.
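
The backward-compatibility idea can be sketched without Avro itself (the decode function and schemas here are illustrative: defaults fill fields missing from old records, and fields unknown to the reader are ignored):

```python
# Schemas as field -> default; None marks a required field with no default
OLD_SCHEMA = {"id": None, "name": None}
NEW_SCHEMA = {"id": None, "name": None, "tier": "free"}   # added field w/ default

def decode(record: dict, schema: dict) -> dict:
    out = {}
    for field, default in schema.items():
        if field in record:
            out[field] = record[field]
        elif default is not None:
            out[field] = default          # field missing in old data: use default
        else:
            raise ValueError(f"missing required field {field}")
    return out                            # fields unknown to the schema are dropped

old_record = {"id": 1, "name": "Alice"}
new_record = {"id": 2, "name": "Bob", "tier": "pro", "extra": 1}

print(decode(old_record, NEW_SCHEMA))  # {'id': 1, 'name': 'Alice', 'tier': 'free'}
print(decode(new_record, OLD_SCHEMA))  # {'id': 2, 'name': 'Bob'}
```

Removing a field without a default, by contrast, would make old records undecodable, which is exactly the kind of change a schema registry's compatibility check rejects.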

Real-World Data Pipeline Architecture

Example: E-commerce analytics pipeline

Data Sources:
  PostgreSQL (orders, users, products) ──── CDC (Debezium) ──┐
  Stripe API (payments)               ──── Fivetran ─────────┤
  Segment (website events)            ──── Webhook ──────────┤
  Salesforce (CRM)                    ──── Fivetran ─────────┤
                                                              │
                                                              ▼
  Kafka (real-time events) ──────────────────────────→ Data Lake (S3)
       │                                                    │
       │                                                    │ Airflow orchestrates daily
       ▼                                                    ▼
  Flink (real-time fraud        dbt transforms raw → staging → marts
   detection, live dashboards)         │
                                       ▼
                              Data Warehouse (Snowflake)
                                       │
                    ┌──────────────────┼──────────────────┐
                    ▼                  ▼                   ▼
              Looker (BI)     ML Training        Reverse ETL
              Dashboards      (feature store)    (sync to Salesforce,
                                                  marketing tools)

Key architectural decisions:

  • Kafka as the central event bus: decouples producers from consumers, enables real-time AND batch processing from the same data
  • S3 data lake as the source of truth: cheap, durable, format-agnostic
  • dbt for transformations: SQL-based, tested, documented, version-controlled
  • Snowflake for serving: fast analytical queries, per-query pricing, zero maintenance
  • Reverse ETL: push aggregated data back to operational tools (sync customer scores to Salesforce, send cohort data to marketing platforms)