Building robust data pipelines with Automated DDL changes handling using Snowflake and dbt

In most scenarios, the data engineering team creates processes that import data in batches from transactional database sources into a central analytical database. We also usually keep the history of such tables by implementing SCD-2 to keep track of the changes happening for each record.

A frequent cause of failures in these data pipelines and ELT jobs is a schema change in the transactional source database. It is quite common to see errors like:

Number of columns in source (=25) and destination (=26) table differs

In this post, we show how we, the BI data engineering team at New Work SE, implement data pipelines that handle such changes automatically.

Ingesting data into Snowflake

Every table is exported from the transactional source database into an AWS S3 bucket using a Spark job. In our case, we chose to store the tables as Parquet files in the S3 bucket.

Before ingesting the files into Snowflake using a COPY INTO statement, we use Snowflake's INFER_SCHEMA function to recreate the table in our staging database in Snowflake:

CREATE OR REPLACE TABLE DATABASE.SCHEMA.TABLE
  USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION => '@aws_stage/path/to/todays/snapshot/of/table' -- path to the AWS S3 bucket stage
          , FILE_FORMAT => 'PARQUET_FILE_FORMAT' -- file format used
          , IGNORE_CASE => TRUE
        )
      )
  );

Because the staging table is recreated from the schema of the current files on every run, the COPY INTO statement no longer fails when columns have been added to or removed from the table in the transactional database.
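The load step itself is not shown in this post. As a minimal sketch, assuming the named file format is a plain Parquet format and that columns are matched by name rather than by position, it could look like this (the CREATE FILE FORMAT statement and the MATCH_BY_COLUMN_NAME setting are assumptions, not taken from our actual job):

```sql
-- ASSUMPTION: PARQUET_FILE_FORMAT is a named file format without extra options.
CREATE FILE FORMAT IF NOT EXISTS PARQUET_FILE_FORMAT
  TYPE = PARQUET;

-- Load today's snapshot into the table that was just recreated from the
-- inferred schema. Parquet columns are mapped to table columns by name;
-- CASE_INSENSITIVE pairs with IGNORE_CASE => TRUE in INFER_SCHEMA.
COPY INTO DATABASE.SCHEMA.TABLE
  FROM @aws_stage/path/to/todays/snapshot/of/table
  FILE_FORMAT = (FORMAT_NAME = 'PARQUET_FILE_FORMAT')
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
```

Matching by name keeps the load independent of the column order in the Parquet files.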

Table historization using SCD-2 in dbt & Snowflake

dbt offers SCD-2 historization out of the box through dbt snapshots. I'm not going into the details of how to implement snapshots in your dbt project or which configurations are available, as this is already very well described in the official documentation. Instead, we focus here on how to make snapshots handle DDL changes in the source tables automatically. We will be using the following "ORDER_STATUS" table as an example:

The goal is to end up with the following DAG in dbt:

We use the following snapshot SQL file in dbt:

{# dbt snapshot that uses the generate_raw_snapshot macro instead of a select statement #}

{%- snapshot archive_exasol_analytics__order_status_snapshot -%}
{# List of the columns that make up the key. In our case, it is only the ID column. #}
{%- set key_cols = ['id'] -%}
{#
    unique_key: dbt_unique_sk is generated in generate_raw_snapshot by hashing all columns in key_cols.
    check_cols: dbt_hashdiff is generated in generate_raw_snapshot by hashing all columns that are not in key_cols.
#}
{{-
    config(
        unique_key='dbt_unique_sk',
        target_schema='snapshots',
        strategy='check',
        check_cols=['dbt_hashdiff'],
    )
-}}
{# Call the custom macro #}
{{- generate_raw_snapshot(key_cols) -}}
{%- endsnapshot -%}
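The generate_raw_snapshot macro itself is not listed in this post. The following is only a hypothetical sketch of how such a macro could be written, not our actual implementation. It assumes a dbt source named 'staging' with a table 'order_status' (both placeholder names), column names that do not need quoting, and at least one non-key column. The column list is read from the database at run time, so the select statement follows whatever columns the staging table currently has:

```sql
{# Hypothetical sketch of generate_raw_snapshot; names below are assumptions. #}
{% macro generate_raw_snapshot(key_cols) %}
    {# ASSUMPTION: 'staging' / 'order_status' are placeholder source names. #}
    {% set src = source('staging', 'order_status') %}
    {% set key_names = key_cols | map('lower') | list %}

    {# Collect every column that is not part of the key; these feed the hashdiff. #}
    {% set payload_cols = [] %}
    {% for col in adapter.get_columns_in_relation(src) %}
        {% if (col.name | lower) not in key_names %}
            {% do payload_cols.append(col.name) %}
        {% endif %}
    {% endfor %}

    select
        t.*,
        {# Hash of the key columns #}
        md5(concat_ws('||'
            {%- for c in key_cols %}, coalesce(cast(t.{{ c }} as varchar), ''){% endfor %}
        )) as dbt_unique_sk,
        {# Hash of all remaining columns; changes whenever a value or the column list changes #}
        md5(concat_ws('||'
            {%- for c in payload_cols %}, coalesce(cast(t.{{ c }} as varchar), ''){% endfor %}
        )) as dbt_hashdiff
    from {{ src }} as t
{% endmacro %}
```

Because the snapshot only checks dbt_hashdiff, its configuration never has to list the source columns explicitly, which is what keeps it stable when columns appear or disappear.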

When we execute the snapshot in dbt, we get the following output:

We can see that dbt generated the "DBT_UNIQUE_SK" and "DBT_HASHDIFF" columns through the macro, and that "DBT_VALID_FROM" and "DBT_VALID_TO" are set correctly for the initial run.

Next, we add a new column named "COMMENTS" to the source table:

Adding new columns to the source table

We now re-execute the dbt snapshot and get the following output in the snapshot table:

We can see that dbt automatically created the new "COMMENTS" column in the snapshot table. Since the "DBT_HASHDIFF" values changed with the introduction of the new column, dbt also created new versions of the rows we already had.

Removing columns from the source table

To test the effect of removing a column from the source table, we remove the "COMMENTS" column again and execute the dbt snapshot for the third time. Now we get the following results in the snapshot table:

We see that the currently valid rows (those where DBT_VALID_TO is null) all have null values in the COMMENTS column, since it no longer exists in our source table. For the same reason, the DBT_HASHDIFF values changed again, which resulted in a new version of each row we had.
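To inspect the versions yourself, queries along the following lines could be used. This is a sketch that assumes the snapshot table lives in the snapshots schema of your default database under the snapshot name used above:

```sql
-- Full history per key: one row per version, oldest first.
select id, comments, dbt_hashdiff, dbt_valid_from, dbt_valid_to
from snapshots.archive_exasol_analytics__order_status_snapshot
order by id, dbt_valid_from;

-- Only the currently valid version of each row.
select *
from snapshots.archive_exasol_analytics__order_status_snapshot
where dbt_valid_to is null;
```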

Conclusion

We have seen how Snowflake's INFER_SCHEMA function and the customizability of dbt snapshots can be combined to create data pipelines that handle some DDL changes, such as adding columns to or removing columns from source tables.
