{% macro databricks__snapshot_merge_sql(target, source, insert_cols) -%}
    {#-
      Render the MERGE statement that applies a snapshot staging relation to the
      snapshot table.

      Args:
        target: relation merged into (aliased DBT_INTERNAL_DEST). When
          target.is_iceberg is truthy the source is referenced by identifier only.
        source: staging relation (aliased DBT_INTERNAL_SOURCE). The WHEN clauses
          filter on its dbt_change_type column.
        insert_cols: column names used for both the INSERT column list and VALUES.

      Rows are paired on the configured dbt_scd_id column:
        * matched, destination row still current (dbt_valid_to is null or equals
          the dbt_valid_to_current config), change type 'update' or 'delete':
          dbt_valid_to is copied from the staged row;
        * not matched, change type 'insert': the staged row is inserted.
    -#}
    {%- set insert_cols_csv = insert_cols | join(', ') -%}
    {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
    {%- set valid_to_current = config.get("dbt_valid_to_current") -%}
    {%- set dest_valid_to = "DBT_INTERNAL_DEST." ~ adapter.quote(columns.dbt_valid_to) %}

    merge into {{ target }} as DBT_INTERNAL_DEST
    {%- if target.is_iceberg %}
      {# create view only supports a name (no catalog, or schema) #}
      using {{ source.identifier }} as DBT_INTERNAL_SOURCE
    {%- else %}
      using {{ source }} as DBT_INTERNAL_SOURCE
    {%- endif %}
    on DBT_INTERNAL_SOURCE.{{ adapter.quote(columns.dbt_scd_id) }} = DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_scd_id) }}
    when matched
    {%- if valid_to_current %}
      and ( {{ dest_valid_to }} = {{ valid_to_current }} or
            {{ dest_valid_to }} is null )
    {%- else %}
      and {{ dest_valid_to }} is null
    {%- endif %}
      and DBT_INTERNAL_SOURCE.{{ adapter.quote('dbt_change_type') }} in ('update', 'delete')
      then update
      set {{ adapter.quote(columns.dbt_valid_to) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(columns.dbt_valid_to) }}

    when not matched
      and DBT_INTERNAL_SOURCE.{{ adapter.quote('dbt_change_type') }} = 'insert'
      then insert ({{ insert_cols_csv }})
      values ({{ insert_cols_csv }})
    {#-
      Deliberately no `when not matched by source` branch. That predicate holds
      for every current destination row that has no staged row with the same
      dbt_scd_id, which includes rows that simply did not change, so it would
      close all of them. Hard-deleted rows are staged with change type 'delete'
      and are closed by the `when matched` branch above.
    #}
    ;
{% endmacro %}
2842
2943
{% macro databricks__create_columns(relation, columns) %}
    {#-
      Add the given columns to `relation` with a single ALTER TABLE statement.

      Args:
        relation: table to alter.
        columns: column objects; each contributes its quoted `name` followed by
          its `data_type`.

      Nothing is executed when `columns` is empty.
    -#}
    {%- if columns | length > 0 -%}
        {#- Build each "<quoted name> <type>" definition first, then join them. -#}
        {%- set column_defs = [] -%}
        {%- for new_column in columns -%}
            {%- do column_defs.append(adapter.quote(new_column.name) ~ ' ' ~ new_column.data_type) -%}
        {%- endfor -%}
        {% call statement() %}
      alter table {{ relation }} add columns (
          {{ column_defs | join(', ') }}
      );
        {% endcall %}
    {%- endif %}
{% endmacro %}
55+
56+
{% macro databricks__build_snapshot_table(strategy, sql) %}
    {#-
      Render the SELECT that wraps `sql` and appends the snapshot bookkeeping
      columns.

      Args:
        strategy: supplies the `scd_id` and `updated_at` expressions and the
          `hard_deletes` mode.
        sql: the query being snapshotted; embedded as a subquery aliased `sbq`.

      `strategy.updated_at` populates both dbt_updated_at and dbt_valid_from.
      get_dbt_valid_to_current presumably emits the dbt_valid_to expression
      (defined outside this file section - confirm). A literal `false`
      dbt_is_deleted column is added only in 'new_record' mode.
    -#}
    {%- set snapshot_cols = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
    {%- set adds_deleted_flag = (strategy.hard_deletes == 'new_record') %}

    select
        *
        , {{ strategy.scd_id }} as {{ snapshot_cols.dbt_scd_id }}
        , {{ strategy.updated_at }} as {{ snapshot_cols.dbt_updated_at }}
        , {{ strategy.updated_at }} as {{ snapshot_cols.dbt_valid_from }}
        , {{ get_dbt_valid_to_current(strategy, snapshot_cols) }}
        {% if adds_deleted_flag %}
        , false as {{ snapshot_cols.dbt_is_deleted }}
        {% endif %}
    from (
        {{ sql }}
    ) sbq
{% endmacro %}
73+
{% macro databricks__snapshot_staging_table(strategy, source_sql, target_relation) -%}
    {#-
      Render the query that produces the snapshot staging rows: one row per
      change, tagged with a dbt_change_type of 'insert', 'update' or 'delete'.

      Args:
        strategy: supplies unique_key, updated_at, scd_id, row_changed and the
          hard_deletes mode.
        source_sql: the query being snapshotted.
        target_relation: the existing snapshot table.

      Result set (combined positionally with UNION ALL, so column order in each
      CTE matters):
        * insertions        - always
        * updates           - always
        * deletes           - hard_deletes is 'invalidate' or 'new_record'
        * deletion_records  - hard_deletes is 'new_record' only
    -#}
    {%- set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() -%}
    {%- set valid_to_current = config.get('dbt_valid_to_current') -%}
    {%- set new_record_mode = (strategy.hard_deletes == 'new_record') -%}
    {%- set tracks_deletes = (new_record_mode or strategy.hard_deletes == 'invalidate') -%}

    {#- A source row needs a new version when it changed or, in new_record mode,
        when the current snapshot version is flagged as deleted. -#}
    {%- set row_needs_new_version -%}
        {{ strategy.row_changed }}{% if new_record_mode %} or snapshotted_data.{{ columns.dbt_is_deleted }} = true{% endif %}
    {%- endset -%}

    {%- if new_record_mode -%}
        {%- set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) -%}

        {#- True when the current snapshot version is itself a deletion record. -#}
        {%- set latest_version_is_deletion -%}
            snapshotted_data.{{ columns.dbt_is_deleted }} = true
            and
            {% if valid_to_current -%}
            snapshotted_data.{{ columns.dbt_valid_to }} = {{ valid_to_current }}
            {%- else -%}
            snapshotted_data.{{ columns.dbt_valid_to }} is null
            {%- endif %}
        {%- endset -%}

        {#- Databricks-specific: each row returned for the target relation is
            indexed as a tuple whose first element is the column name. Rows whose
            first element is empty or starts with '#' are metadata, not columns. -#}
        {%- set snapshotted_cols = [] -%}
        {%- for described_row in get_columns_in_relation(target_relation) -%}
            {%- set described_name = described_row[0] -%}
            {%- if described_name and not described_name.startswith('#') -%}
                {%- do snapshotted_cols.append(described_name) -%}
            {%- endif -%}
        {%- endfor -%}
        {%- set source_sql_cols = get_column_schema_from_query(source_sql) -%}
    {%- endif -%}

    with snapshot_query as (

        {{ source_sql }}

    ),

    snapshotted_data as (

        select
            *,
            {{ unique_key_fields(strategy.unique_key) }}
        from {{ target_relation }}
        where
        {% if valid_to_current %}
            {% set valid_to_col = columns.dbt_valid_to | trim %}
            {% set current_marker = valid_to_current | trim %}
            {# The exact equals semantics between NULL values depends on the current behavior flag set. Also, update records if the source field is null #}
            ( {{ equals(valid_to_col, current_marker) }} or {{ valid_to_col }} is null )
        {% else %}
            {{ columns.dbt_valid_to }} is null
        {% endif %}

    ),

    insertions_source_data as (

        select
            *,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ get_dbt_valid_to_current(strategy, columns) }},
            {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
        from snapshot_query

    ),

    updates_source_data as (

        select
            *,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
        from snapshot_query

    ),

    {% if tracks_deletes %}
    deletes_source_data as (

        select
            *,
            {{ unique_key_fields(strategy.unique_key) }}
        from snapshot_query

    ),
    {% endif %}

    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*
            {% if new_record_mode %}
            , false as {{ columns.dbt_is_deleted }}
            {% endif %}
        from insertions_source_data as source_data
        left outer join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
            or (
                {{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }}
                and (
                    {{ row_needs_new_version }}
                )
            )

    ),

    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.{{ columns.dbt_scd_id }}
            {% if new_record_mode %}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
            {% endif %}
        from updates_source_data as source_data
        join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where (
            {{ row_needs_new_version }}
        )

    )

    {% if tracks_deletes %}
    ,
    deletes as (

        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
            snapshotted_data.{{ columns.dbt_scd_id }}
            {% if new_record_mode %}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
            {% endif %}
        from snapshotted_data
        left join deletes_source_data as source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
        {% if new_record_mode %}
            and not (
                -- avoid updating the record's valid_to if the latest entry is marked as deleted
                {{ latest_version_is_deletion }}
            )
        {% endif %}

    )
    {% endif %}

    {% if new_record_mode %}
    ,
    deletion_records as (

        select
            'insert' as dbt_change_type,
            {#
                If a column has been added to the source it won't yet exist in the
                snapshotted table so we insert a null value as a placeholder for the column.
            #}
            {% for col in source_sql_cols %}
            {{ ('snapshotted_data.' ~ adapter.quote(col.column)) if col.name in snapshotted_cols else ('NULL as ' ~ adapter.quote(col.column)) }},
            {% endfor %}
            {% if strategy.unique_key | is_list %}
                {% for key in strategy.unique_key %}
            snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
                {% endfor %}
            {% else %}
            snapshotted_data.dbt_unique_key as dbt_unique_key,
            {% endif %}
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
            {{ new_scd_id }} as {{ columns.dbt_scd_id }},
            true as {{ columns.dbt_is_deleted }}
        from snapshotted_data
        left join deletes_source_data as source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
            and not (
                -- avoid inserting a new record if the latest one is marked as deleted
                {{ latest_version_is_deletion }}
            )

    )
    {% endif %}

    select * from insertions
    union all
    select * from updates
    {% if tracks_deletes %}
    union all
    select * from deletes
    {% endif %}
    {% if new_record_mode %}
    union all
    select * from deletion_records
    {% endif %}

{%- endmacro %}
0 commit comments