2020from concurrent .futures import ThreadPoolExecutor
2121
2222import pyarrow as pa
23- from datafusion import Config , SessionContext , col , lit
23+ from datafusion import SessionContext , col , lit
2424from datafusion import functions as f
2525from datafusion .common import SqlSchema
2626
@@ -34,16 +34,14 @@ def _run_in_threads(fn, count: int = 8) -> None:
3434
3535
3636def test_concurrent_access_to_shared_structures () -> None :
37- """Exercise SqlSchema, Config, and DataFrame concurrently."""
37+ """Exercise SqlSchema and DataFrame concurrently."""
3838
3939 schema = SqlSchema ("concurrency" )
40- config = Config ()
4140 ctx = SessionContext ()
4241
4342 batch = pa .record_batch ([pa .array ([1 , 2 , 3 ], type = pa .int32 ())], names = ["value" ])
4443 df = ctx .create_dataframe ([[batch ]])
4544
46- config_key = "datafusion.execution.batch_size"
4745 expected_rows = batch .num_rows
4846
4947 def worker (index : int ) -> None :
@@ -54,41 +52,12 @@ def worker(index: int) -> None:
5452 assert isinstance (schema .views , list )
5553 assert isinstance (schema .functions , list )
5654
57- config .set (config_key , str (1024 + index ))
58- assert config .get (config_key ) is not None
59- # Access the full config map to stress lock usage.
60- assert config_key in config .get_all ()
61-
6255 batches = df .collect ()
6356 assert sum (batch .num_rows for batch in batches ) == expected_rows
6457
6558 _run_in_threads (worker , count = 12 )
6659
6760
68- def test_config_set_during_get_all () -> None :
69- """Ensure config writes proceed while another thread reads all entries."""
70-
71- config = Config ()
72- key = "datafusion.execution.batch_size"
73-
74- def reader () -> None :
75- for _ in range (200 ):
76- # get_all should not hold the lock while converting to Python objects
77- config .get_all ()
78-
79- def writer () -> None :
80- for index in range (200 ):
81- config .set (key , str (1024 + index ))
82-
83- with ThreadPoolExecutor (max_workers = 2 ) as executor :
84- reader_future = executor .submit (reader )
85- writer_future = executor .submit (writer )
86- reader_future .result (timeout = 10 )
87- writer_future .result (timeout = 10 )
88-
89- assert config .get (key ) is not None
90-
91-
9261def test_case_builder_reuse_from_multiple_threads () -> None :
9362 """Ensure the case builder can be safely reused across threads."""
9463
0 commit comments