
Commit 06a56ee

feat: add Docker-based Spark integration test setup (#946)
* feat: add Docker-based Spark integration test setup
  - Add docker-compose-spark.yml with Spark Thrift Server + Hive Metastore
  - Add Docker config files (Dockerfile, entrypoint, hive-site.xml, spark-defaults.conf)
  - Add Spark profile to profiles.yml.j2 (thrift method on port 10000)
  - Add 'Start Spark' step in test-warehouse.yml CI workflow
  - Add Spark to Docker targets matrix in test-all-warehouses.yml
  - Fix python-dev -> python3-dev for Ubuntu 22.04+ compatibility

* fix: handle Spark database=Undefined in test source configs and address review feedback
  - Fix database=Undefined error for the Spark target by returning None from get_database_and_schema_properties() and omitting the database field from YAML source configs when it is None
  - Upgrade the Hive Metastore from postgres:9-alpine to postgres:15-alpine
  - Use HTTPS for the Spark download in the Dockerfile
  - Add a bounded timeout to the entrypoint.sh wait loop (180s default)

* fix: add Spark-specific macros for database/schema resolution and string escaping
  - Add spark__get_package_database_and_schema to handle Spark's lack of a catalog/database, ensuring is_elementary_enabled() returns True
  - Add spark__escape_special_chars to replace newlines with spaces instead of \n, which Spark's SQL parser treats as literal line breaks in INSERT VALUES statements, causing row corruption

* fix: add Delta Lake support, Spark dispatches for schema/artifact operations, and escape fixes
  - Add Delta Lake JARs to the Spark Docker image for v2 table support (DELETE/MERGE)
  - Configure file_format=delta for Spark targets in dbt_project.yml and profiles
  - Add spark__get_default_incremental_strategy returning 'merge' for Delta tables
  - Fix spark__get_delete_and_insert_queries to always use MERGE (Delta doesn't support DELETE with subqueries)
  - Add Spark dispatches for test helpers: edr_create_schema, edr_drop_schema, edr_schema_exists, edr_list_schemas
  - Add a spark__get_anomaly_config dispatch to handle Spark's database==schema requirement
  - Fix is_elementary_enabled to check the schema as a fallback for adapters without a database concept
  - Fix spark__escape_special_chars to use C-style backslash escaping (Spark doesn't support SQL-standard '')
  - Revert spark__get_package_database_and_schema (the default dispatch returns [None, schema] correctly)

* fix: address CodeRabbit review - escape backticks, fix MERGE aliases, add docs, slim Dockerfile

* feat: unskip exposure_schema_validity tests for Spark, tune Spark config for speed

* perf: add SparkDirectSeeder (bypass dbt seed) and tune Spark config for -n8 parallelism
  - Add SparkDirectSeeder, which executes CREATE TABLE + INSERT VALUES directly via the dbt adapter, bypassing the ~4s dbt subprocess overhead per seed
  - Add execute_sql() and a schema_name property to AdapterQueryRunner
  - DbtProject auto-selects SparkDirectSeeder when the target is 'spark'
  - Tune spark-defaults.conf: executor.cores=4, default.parallelism=4, thriftServer.async=true for better concurrent session handling
  - Restore -n8 parallelism for Spark in CI (was -n4)

* perf: revert Spark parallelism to -n4 (keep direct seeder optimization)
  The -n8 experiment may be causing resource contention on the 2-vCPU CI runner. Reverting Spark to -n4 while keeping the SparkDirectSeeder and the Spark config tuning (executor.cores=4, async=true). The direct seeder alone should provide a meaningful speedup (~3.6x faster per seed).

* fix: add type inference to SparkDirectSeeder (BIGINT, DOUBLE, BOOLEAN, STRING)

* refactor: extract a BaseDirectSeeder base class for the Spark and ClickHouse seeders
  - Shared logic: type inference, CSV writing for ref(), batched inserts, cleanup
  - SparkDirectSeeder and ClickHouseDirectSeeder are thin subclasses
  - SparkDirectSeeder now writes a CSV for ref() resolution (was missing)
  - _create_seeder() handles both spark and clickhouse targets

* fix: revert Spark config tuning (perf regression), add Docker healthcheck, fix MERGE comment
  - Revert executor.cores back to 1 and default.parallelism back to 2, and remove thriftServer.async; these were introduced in 6a8ec2a and correlated with a performance regression (tests taking longer than the original ~36 min)
  - Add a Docker healthcheck to the spark-thrift container (nc -z localhost 10000)
  - Use the docker inspect healthcheck in CI instead of raw nc port polling
  - Add an explicit container_name to spark-thrift for reliable docker inspect
  - Fix the MERGE comment in delete_and_insert.sql to accurately describe why we use MERGE unconditionally on Spark

* fix: restore the non-Delta fallback in spark__get_delete_and_insert_queries
  Restore the elif branch for non-Delta Spark tables (used by dbt-databricks). The three branches are now:
  1. relation.metadata and is_delta → MERGE (dbt-databricks, Delta)
  2. not relation.metadata → MERGE (dbt-spark thrift, assumes Delta via config)
  3. else → DELETE with subquery (dbt-databricks, non-Delta)

* refactor: unify the MERGE branches in spark__get_delete_and_insert_queries
  Combine the Delta and no-metadata MERGE conditions into a single branch: (relation.metadata and relation.is_delta) or not relation.metadata

* perf: revert SparkDirectSeeder to DbtDataSeeder to isolate the performance regression

* perf: fix SparkDirectSeeder to use PyHive directly, avoiding global dbt state corruption
  Root cause: AdapterQueryRunner._create_adapter() calls set_from_args() and reset_adapters(), which corrupt global dbt state (GLOBAL_FLAGS, adapter registry). Since the tests use the in-process APIDbtRunner (dbtRunner().invoke()), this corrupted state causes subsequent dbt test calls to run with the wrong flags, leading to 3-10x regressions on multi-call tests (e.g. volume_anomaly).
  Fix: SparkDirectSeeder now uses PyHive/Thrift directly instead of going through AdapterQueryRunner, completely avoiding the global state issue. Connection details are read from profiles.yml.

* revert: remove SparkDirectSeeder, use DbtDataSeeder for Spark
  The SparkDirectSeeder (both the AdapterQueryRunner and PyHive variants) caused a ~60% regression on Spark CI (58 min vs the 36 min baseline). The regression was concentrated in the volume_anomaly tests (3-10x slower), which call dbt_project.test() multiple times per pytest function. Two approaches were tested:
  1. SparkDirectSeeder via AdapterQueryRunner: 57:15 (hypothesis: global dbt state corruption via set_from_args/reset_adapters)
  2. SparkDirectSeeder via PyHive directly: 58:03 (bypasses dbt entirely)
  Both showed the same regression, disproving the global-state-corruption hypothesis. The root cause of the interaction between direct SQL seeding and subsequent dbt test calls remains undetermined. Reverting Spark to DbtDataSeeder restores the 36:47 baseline. ClickHouseDirectSeeder (via BaseDirectSeeder) is kept, as it works correctly.

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Itamar Hartstein <haritamar@gmail.com>
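As a side note on the escaping fixes described in the message above, here is a minimal Python sketch of the behavior attributed to spark__escape_special_chars, assuming C-style backslash escaping plus newline-to-space replacement; the function name, signature, and exact replacement set are illustrative, not the macro's actual code:

    def spark_escape_special_chars(value: str) -> str:
        # Hypothetical re-implementation of the behavior the commit message
        # describes. Spark's SQL parser does not honor SQL-standard ''
        # quote doubling, so quotes get C-style backslash escapes; literal
        # newlines corrupt rows in INSERT ... VALUES, so they become spaces.
        value = value.replace("\\", "\\\\")  # escape backslashes first
        value = value.replace("'", "\\'")    # C-style single-quote escaping
        value = value.replace("\n", " ")     # newlines -> spaces, not \n
        return value

    print(spark_escape_special_chars("line1\nit's fine"))  # line1 it\'s fine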
1 parent 8731abd commit 06a56ee

23 files changed

Lines changed: 407 additions & 107 deletions

.github/workflows/test-all-warehouses.yml

Lines changed: 3 additions & 1 deletion
@@ -48,7 +48,7 @@ jobs:
       dbt-version:
         ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) ||
         fromJSON('["latest_official", "latest_pre"]') }}
-      warehouse-type: [postgres, clickhouse, trino, dremio, duckdb]
+      warehouse-type: [postgres, clickhouse, trino, dremio, spark, duckdb]
       exclude:
         # latest_pre is only tested on postgres
         - dbt-version: latest_pre
@@ -57,6 +57,8 @@
           warehouse-type: trino
         - dbt-version: latest_pre
           warehouse-type: dremio
+        - dbt-version: latest_pre
+          warehouse-type: spark
         - dbt-version: latest_pre
           warehouse-type: duckdb
     uses: ./.github/workflows/test-warehouse.yml

.github/workflows/test-warehouse.yml

Lines changed: 12 additions & 2 deletions
@@ -100,6 +100,16 @@ jobs:
           timeout 180 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} dremio 2>/dev/null)" = "healthy" ]; do sleep 5; done'
           echo "Dremio is healthy."
 
+      - name: Start Spark
+        if: inputs.warehouse-type == 'spark'
+        working-directory: ${{ env.TESTS_DIR }}
+        run: |
+          docker compose -f docker-compose-spark.yml build
+          docker compose -f docker-compose-spark.yml up -d
+          echo "Waiting for Spark Thrift Server to become healthy..."
+          timeout 180 bash -c 'until [ "$(docker inspect -f {{.State.Health.Status}} spark-thrift 2>/dev/null)" = "healthy" ]; do sleep 5; done'
+          echo "Spark Thrift Server is healthy."
+
       - name: Setup Python
         uses: actions/setup-python@v6
         with:
@@ -108,7 +118,7 @@
 
       - name: Install Spark requirements
         if: inputs.warehouse-type == 'spark'
-        run: sudo apt-get install python-dev libsasl2-dev gcc
+        run: sudo apt-get update && sudo apt-get install -y python3-dev libsasl2-dev gcc
 
       - name: Install compatible databricks connector (not limited in older dbt-databricks versions)
         if: startsWith(inputs.warehouse-type, 'databricks') && inputs.dbt-version < '1.7.0'
@@ -168,7 +178,7 @@
 
       - name: Test
         working-directory: "${{ env.TESTS_DIR }}/tests"
-        run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}.html --self-contained-html --clear-on-end ${{ (inputs.dbt-version == 'fusion' && '--runner-method fusion') || '' }}
+        run: py.test -n${{ (inputs.warehouse-type == 'spark' && '4') || '8' }} -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}.html --self-contained-html --clear-on-end ${{ (inputs.dbt-version == 'fusion' && '--runner-method fusion') || '' }}
 
       - name: Upload test results
         if: always()
integration_tests/dbt_project/dbt_project.yml

Lines changed: 2 additions & 0 deletions
@@ -23,7 +23,9 @@ models:
   elementary_tests:
     tmp:
       +materialized: table
+      +file_format: "{{ 'delta' if target.type == 'spark' else none }}"
 
   elementary:
     +schema: elementary
     +enabled: "{{ var('elementary_enabled', True) }}"
+    +file_format: "{{ 'delta' if target.type == 'spark' else none }}"

integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql

Lines changed: 5 additions & 0 deletions
@@ -73,3 +73,8 @@
   {% do run_query("CREATE DATABASE IF NOT EXISTS `" ~ schema_name ~ "`") %}
   {% do adapter.commit() %}
 {% endmacro %}
+
+{% macro spark__edr_create_schema(database, schema_name) %}
+  {% set safe_schema = schema_name | replace("`", "``") %}
+  {% do run_query("CREATE DATABASE IF NOT EXISTS `" ~ safe_schema ~ "`") %}
+{% endmacro %}
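The replace("`", "``") above is Spark's identifier-escaping rule: inside a backtick-quoted identifier, a literal backtick is written as two backticks, so an odd or hostile schema name cannot break out of the quoting. A tiny Python illustration (quote_spark_identifier is a hypothetical helper, not from the repo):

    def quote_spark_identifier(name: str) -> str:
        # Double embedded backticks, then wrap the whole name in backticks,
        # mirroring the escaping in spark__edr_create_schema above.
        return "`" + name.replace("`", "``") + "`"

    assert quote_spark_identifier("my`schema") == "`my``schema`"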

integration_tests/dbt_project/macros/clear_env.sql

Lines changed: 5 additions & 0 deletions
@@ -19,6 +19,11 @@
   {% do adapter.commit() %}
 {% endmacro %}
 
+{% macro spark__edr_drop_schema(database_name, schema_name) %}
+  {% set safe_schema = schema_name | replace("`", "``") %}
+  {% do run_query("DROP DATABASE IF EXISTS `" ~ safe_schema ~ "` CASCADE") %}
+{% endmacro %}
+
 {% macro duckdb__edr_drop_schema(database_name, schema_name) %}
   {% do run_query("DROP SCHEMA IF EXISTS " ~ schema_name ~ " CASCADE") %}
   {% do adapter.commit() %}

integration_tests/dbt_project/macros/get_anomaly_config.sql

Lines changed: 24 additions & 1 deletion
@@ -25,6 +25,29 @@
   {% do return(elementary.get_anomalies_test_configuration(api.Relation.create("db", "schema", "mock_model"), **config)[0]) %}
 {% endmacro %}
 
+{% macro spark__get_anomaly_config(model_config, config) %}
+  {% set mock_model = {
+    "alias": "mock_model",
+    "config": {
+      "elementary": model_config
+    }
+  } %}
+  {# trick elementary into thinking this is the running model #}
+  {% do context.update({
+    "model": {
+      "depends_on": {
+        "nodes": ["id"]
+      }
+    },
+    "graph": {
+      "nodes": {
+        "id": mock_model
+      }
+    }
+  }) %}
+  {% do return(elementary.get_anomalies_test_configuration(api.Relation.create("schema", "schema", "mock_model"), **config)[0]) %}
+{% endmacro %}
+
 {% macro clickhouse__get_anomaly_config(model_config, config) %}
   {% set mock_model = {
     "alias": "mock_model",
@@ -46,4 +69,4 @@
     }
   }) %}
   {% do return(elementary.get_anomalies_test_configuration(api.Relation.create("schema", "schema", "mock_model"), **config)[0]) %}
-{% endmacro %}
+{% endmacro %}
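The substantive difference from the default dispatch is the mock relation: Spark has no separate catalog, so the relation is built with database == schema (api.Relation.create("schema", "schema", ...)). The commit message describes the same rule applied to test source configs via get_database_and_schema_properties(); a hypothetical Python sketch of that logic, with the signature and call site assumed from the message:

    def get_database_and_schema_properties(target_type, database, schema):
        # Per the commit message: Spark has no usable database/catalog,
        # so return None and let callers omit the field entirely.
        if target_type == "spark":
            return None, schema
        return database, schema

    # Build a YAML-bound source config, omitting `database:` when None.
    props = {"schema": "test_schema"}
    database, schema = get_database_and_schema_properties("spark", "db", "test_schema")
    if database is not None:
        props["database"] = database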

integration_tests/dbt_project/macros/schema_utils/list_schemas.sql

Lines changed: 9 additions & 0 deletions
@@ -38,3 +38,12 @@
   {% endfor %}
   {% do return(schemas) %}
 {% endmacro %}
+
+{% macro spark__edr_list_schemas(database) %}
+  {% set results = run_query('SHOW DATABASES') %}
+  {% set schemas = [] %}
+  {% for row in results %}
+    {% do schemas.append(row[0]) %}
+  {% endfor %}
+  {% do return(schemas) %}
+{% endmacro %}

integration_tests/dbt_project/macros/schema_utils/schema_exists.sql

Lines changed: 6 additions & 0 deletions
@@ -29,3 +29,9 @@
   {% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ safe_schema ~ "' LIMIT 1") %}
   {% do return(result | length > 0) %}
 {% endmacro %}
+
+{% macro spark__edr_schema_exists(database, schema_name) %}
+  {% set safe_schema = schema_name | replace("'", "''") %}
+  {% set result = run_query("SHOW DATABASES LIKE '" ~ safe_schema ~ "'") %}
+  {% do return(result | length > 0) %}
+{% endmacro %}
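spark__edr_schema_exists relies on SHOW DATABASES LIKE returning one row per matching database. A rough Python equivalent over a PyHive cursor (a sketch that mirrors the macro's quote escaping; spark_schema_exists is an illustrative name):

    def spark_schema_exists(cursor, schema_name: str) -> bool:
        # Escape single quotes, then count the matches returned by
        # SHOW DATABASES LIKE, mirroring the macro above.
        safe = schema_name.replace("'", "''")
        cursor.execute(f"SHOW DATABASES LIKE '{safe}'")
        return len(cursor.fetchall()) > 0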
docker-compose-spark.yml

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+version: "3.8"
+
+services:
+  spark-thrift:
+    container_name: spark-thrift
+    build:
+      context: ./docker/spark
+      dockerfile: Dockerfile
+    ports:
+      - "10000:10000"
+      - "4040:4040"
+    depends_on:
+      - spark-hive-metastore
+    command: >
+      --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
+      --name Thrift JDBC/ODBC Server
+    healthcheck:
+      test: ["CMD-SHELL", "nc -z localhost 10000"]
+      interval: 10s
+      timeout: 5s
+      retries: 20
+      start_period: 30s
+    volumes:
+      - spark-warehouse:/spark-warehouse/
+      - ./docker/spark/hive-site.xml:/usr/spark/conf/hive-site.xml
+      - ./docker/spark/spark-defaults.conf:/usr/spark/conf/spark-defaults.conf
+    environment:
+      - WAIT_FOR=spark-hive-metastore:5432
+
+  spark-hive-metastore:
+    image: postgres:15-alpine
+    volumes:
+      - hive-metastore:/var/lib/postgresql/data
+    environment:
+      - POSTGRES_USER=dbt
+      - POSTGRES_PASSWORD=dbt
+      - POSTGRES_DB=metastore
+
+volumes:
+  spark-warehouse:
+  hive-metastore:
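The healthcheck here is what the CI "Start Spark" step polls via docker inspect before running tests. For local debugging, a rough Python equivalent of that wait loop (the container name and 180s timeout mirror the compose file and the CI step):

    import subprocess
    import time

    def wait_for_healthy(container: str, timeout_s: int = 180) -> None:
        # Poll Docker's healthcheck status (driven by the compose file's
        # `nc -z localhost 10000` probe) until it reports "healthy".
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            result = subprocess.run(
                ["docker", "inspect", "-f", "{{.State.Health.Status}}", container],
                capture_output=True,
                text=True,
            )
            if result.stdout.strip() == "healthy":
                return
            time.sleep(5)
        raise TimeoutError(f"{container} not healthy after {timeout_s}s")

    wait_for_healthy("spark-thrift")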
docker/spark/Dockerfile

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+ARG OPENJDK_VERSION=8
+FROM eclipse-temurin:${OPENJDK_VERSION}-jre
+
+ARG SPARK_VERSION=3.3.2
+ARG HADOOP_VERSION=3
+ARG DELTA_VERSION=2.2.0
+
+ENV SPARK_HOME /usr/spark
+ENV PATH="/usr/spark/bin:/usr/spark/sbin:${PATH}"
+
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends wget netcat-openbsd procps libpostgresql-jdbc-java && \
+    wget -q "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
+    tar xzf "spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
+    rm "spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
+    mv "spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" /usr/spark && \
+    ln -s /usr/share/java/postgresql-jdbc4.jar /usr/spark/jars/postgresql-jdbc4.jar && \
+    wget -q "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/${DELTA_VERSION}/delta-core_2.12-${DELTA_VERSION}.jar" \
+      -P /usr/spark/jars/ && \
+    wget -q "https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_VERSION}/delta-storage-${DELTA_VERSION}.jar" \
+      -P /usr/spark/jars/ && \
+    apt-get remove -y wget && \
+    apt-get autoremove -y && \
+    apt-get clean
+
+COPY entrypoint.sh /scripts/
+RUN chmod +x /scripts/entrypoint.sh
+
+ENTRYPOINT ["/scripts/entrypoint.sh"]
+CMD ["--help"]
