Try it in Google Colab

Touch the Original and There’s No Going Back

In Part 1, we established the Bronze layer principle: data from source systems is stored without any transformation. No type casting, no column renaming.

The principle is simple, but sticking to it in practice is hard. The temptation arises: “The date column is a string – can’t I just cast it to DATE on the way in?” No. If you change types in Bronze, you lose the ability to restore the original. When the source system sends an invalid date like "2026-02-30", casting to DATE either throws an error or silently converts it to NULL. You lose track of what the original value was.

Bronze is insurance. Even if your Silver transformation logic has a bug, or the source system suddenly changes its schema, you can start over from Bronze. Give up this insurance, and every time something goes wrong, you’ll need to pull data from the source system again. There’s no guarantee the source system owner will be cooperative.

Full Load and Incremental Load

There are two main ways to load data into Bronze.

Full Load fetches the entire source table and overwrites it every time. Simple. Since what’s in the source is exactly what’s in Bronze, there’s no data consistency headache. The downside is that costs grow as data grows. If the orders table has 100 million rows but only 10,000 new orders come in daily, you’re re-fetching the same 99.99 million rows that haven’t changed.

Incremental Load fetches only the data that changed since the last load. Efficient – you only bring 10,000 rows. But complex. You need to decide how to determine “since the last load” and how to detect deleted records.

Which to use depends on the table’s characteristics.

| Aspect | Full Load | Incremental Load |
| --- | --- | --- |
| Implementation difficulty | Low | High |
| Network / cost | Proportional to data size | Proportional to changes |
| Delete detection | Automatic (full overwrite) | Requires separate handling |
| Best for | Code tables, small masters | High-volume transactions |

In practice, you use both. Small tables like code tables or product masters get Full Load for simplicity. High-volume tables like orders, logs, and events get Incremental Load to bring only the changes.

How to Define the Increment

The most critical decision in Incremental Load is the criterion for “what has changed.” Three approaches are commonly used.

Timestamp-based. If the source table has a column like updated_at, this is the simplest approach. You fetch only rows newer than the last load timestamp. One condition: the source system must honestly update the modification timestamp. Surprisingly, many systems UPDATE data without touching updated_at.

Auto-incrementing key-based. If there’s a monotonically increasing PK like order_id, you fetch rows after the last loaded ID. This catches INSERTs but misses UPDATEs. Best suited for log-style tables where the ID never changes once issued.

CDC (Change Data Capture). Read the source database’s change log directly. Tools like Debezium capture MySQL or PostgreSQL WAL (Write-Ahead Log) and catch all INSERTs, UPDATEs, and DELETEs. Most accurate, but requires separate infrastructure.

Timestamp-based:        WHERE updated_at > 'last_load_timestamp'
Auto-incrementing key:  WHERE order_id > last_loaded_id
CDC:                    Capture database change logs

Comparing Both Approaches in DuckDB

We continue from the environment set up in Part 1.

import duckdb

conn = duckdb.connect('warehouse.duckdb')

Full Load Simulation

Full Load is straightforward. Drop the existing data and reload everything.

# Simulate a source data change scenario
# In practice, you'd SELECT * from the source system

conn.execute("""
-- Full Load: replace entirely
CREATE OR REPLACE TABLE bronze.orders AS
SELECT *
FROM read_csv_auto(
  'https://raw.githubusercontent.com/dbt-labs/jaffle_shop/main/seeds/raw_orders.csv'
);
""")

print("Full Load complete:",
      conn.execute("SELECT count(*) FROM bronze.orders").fetchone()[0], "rows")

CREATE OR REPLACE TABLE is the key. The table is recreated every time. Previous data is gone, and the current state of the source is loaded as-is.

Incremental Load Simulation

Incremental Load requires one extra step. You need to remember where you left off.

# Watermark table: records the last load point
conn.execute("""
CREATE TABLE IF NOT EXISTS bronze.watermarks (
    table_name VARCHAR PRIMARY KEY,
    last_loaded_id INTEGER,
    last_loaded_at TIMESTAMP DEFAULT current_timestamp
);
""")

# Check current watermark
watermark = conn.execute("""
SELECT COALESCE(last_loaded_id, 0)
FROM bronze.watermarks
WHERE table_name = 'orders'
""").fetchone()

last_id = watermark[0] if watermark else 0
print(f"Last loaded ID: {last_id}")
# Incremental load: fetch only rows with id greater than last_id
# (on the very first run last_id is 0, so every row in the file qualifies)
conn.execute(f"""
INSERT INTO bronze.orders
SELECT *
FROM read_csv_auto(
  'https://raw.githubusercontent.com/dbt-labs/jaffle_shop/main/seeds/raw_orders.csv'
)
WHERE id > {last_id};
""")

# Update watermark
conn.execute("""
INSERT OR REPLACE INTO bronze.watermarks (table_name, last_loaded_id, last_loaded_at)
SELECT 'orders', MAX(id), current_timestamp
FROM bronze.orders;
""")

print("Incremental Load complete")

The watermarks table is the heart of incremental loading. It records how far you’ve loaded, and the next run fetches only what comes after. This pattern is called the High Watermark pattern.

Adding Metadata Columns

If you only store the raw data in Bronze, you’ll eventually face questions you can’t answer. “When was this data loaded?” “Which source did it come from?”

Keep the original columns intact and add metadata columns alongside them.

conn.execute("""
CREATE OR REPLACE TABLE bronze.orders_with_meta AS
SELECT
    *,
    current_timestamp AS _loaded_at,
    'jaffle_shop' AS _source_system,
    'full' AS _load_type
FROM read_csv_auto(
  'https://raw.githubusercontent.com/dbt-labs/jaffle_shop/main/seeds/raw_orders.csv'
);
""")

conn.execute("SELECT * FROM bronze.orders_with_meta LIMIT 3").fetchdf()

_loaded_at, _source_system, _load_type. The underscore prefix distinguishes them from original columns. The source might have its own loaded_at column, after all.

With these metadata columns, when something goes wrong in Silver transformation, you can narrow down the scope: “Data loaded up to this point was fine; it’s the data after that which is problematic.” The deduplication pattern using _loaded_at in Part 3 also starts here.

Loading Patterns Summary

Here’s a summary of Bronze loading patterns.

| Pattern | Target | Implementation |
| --- | --- | --- |
| Full Load (overwrite) | Code tables, small masters | CREATE OR REPLACE TABLE |
| Full Load (snapshot) | When daily state history is needed | Use load date as partition key |
| Incremental (timestamp) | Tables with updated_at | WHERE updated_at > watermark |
| Incremental (auto-increment key) | Logs, events, orders | WHERE id > watermark |
| CDC | When delete detection is needed | Debezium + Kafka |

There’s one more Full Load variant: the snapshot approach. Instead of overwriting, you store each day’s full state separately, partitioned by load date. Useful when you want to compare yesterday’s product master state with today’s. It consumes more storage, but as we discussed in Part 1, storage costs in cloud environments are practically negligible.

Practical Reference: Bronze Loading with Airflow

When you implement Bronze loading as an Airflow DAG, you can separate tasks per table based on Full Load vs. Incremental Load.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import duckdb

def load_full(table_name, source_url, source_system='jaffle_shop', **context):
    """Full Load: complete replacement"""
    conn = duckdb.connect('warehouse.duckdb')
    conn.execute(f"""
        CREATE OR REPLACE TABLE bronze.{table_name} AS
        SELECT *, current_timestamp AS _loaded_at,
               '{source_system}' AS _source_system, 'full' AS _load_type
        FROM read_csv_auto('{source_url}')
    """)
    conn.close()

def load_incremental(table_name, source_url, key_column, **context):
    """Incremental Load: only after watermark"""
    conn = duckdb.connect('warehouse.duckdb')
    wm = conn.execute(f"""
        SELECT COALESCE(last_loaded_id, 0)
        FROM bronze.watermarks WHERE table_name = '{table_name}'
    """).fetchone()
    last_id = wm[0] if wm else 0

    conn.execute(f"""
        INSERT INTO bronze.{table_name}
        SELECT *, current_timestamp AS _loaded_at
        FROM read_csv_auto('{source_url}')
        WHERE {key_column} > {last_id}
    """)

    # Advance the watermark so the next run starts after this batch
    conn.execute(f"""
        INSERT OR REPLACE INTO bronze.watermarks (table_name, last_loaded_id, last_loaded_at)
        SELECT '{table_name}', MAX({key_column}), current_timestamp
        FROM bronze.{table_name}
    """)
    conn.close()

with DAG(
    dag_id='bronze_ingestion',
    schedule='0 5 * * *',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # Small master → Full Load
    load_customers = PythonOperator(
        task_id='load_customers_full',
        python_callable=load_full,
        op_kwargs={'table_name': 'customers', 'source_url': '...'},
    )

    # High-volume transactions → Incremental Load
    load_orders = PythonOperator(
        task_id='load_orders_incremental',
        python_callable=load_incremental,
        op_kwargs={
            'table_name': 'orders',
            'source_url': '...',
            'key_column': 'id',
        },
    )

    # Parallel execution — no dependencies between tables
    [load_customers, load_orders]

Small masters use load_full, high-volume transactions use load_incremental. Functions are split by table characteristics. Since there are no dependencies between tables, Airflow runs them in parallel. One caveat: a single DuckDB file accepts only one writer process at a time, so truly parallel writes need separate files or a warehouse that supports concurrent writers.

The next post covers the Silver layer. The process of cleansing and standardizing the raw data we’ve stacked in Bronze. This is where dbt starts to shine.
