Skip to content

Commit b219dcc

Browse files
committed
gh-116738: Make zlib module thread-safe
1 parent 9e3d7cd commit b219dcc

3 files changed

Lines changed: 117 additions & 53 deletions

File tree

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import itertools
2+
import unittest
3+
4+
from test.support import import_helper, threading_helper
5+
from test.support.threading_helper import run_concurrently
6+
7+
zlib = import_helper.import_module("zlib")
8+
9+
from test.test_zlib import HAMLET_SCENE
10+
11+
12+
NTHREADS = 10
13+
14+
15+
@threading_helper.requires_working_threading()
16+
class TestZlib(unittest.TestCase):
17+
def test_compressor(self):
18+
comp = zlib.compressobj()
19+
20+
# First compress() outputs zlib header
21+
header = comp.compress(HAMLET_SCENE)
22+
self.assertGreater(len(header), 0)
23+
24+
def worker():
25+
# it should return empty bytes as it buffers data internally
26+
data = comp.compress(HAMLET_SCENE)
27+
self.assertEqual(data, b"")
28+
29+
run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
30+
full_compressed = header + comp.flush()
31+
decompressed = zlib.decompress(full_compressed)
32+
# The decompressed data should be HAMLET_SCENE repeated NTHREADS times
33+
self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
34+
35+
def test_decompressor_concurrent_attribute_reads(self):
36+
input_data = HAMLET_SCENE * NTHREADS
37+
compressed = zlib.compress(input_data)
38+
39+
decomp = zlib.decompressobj()
40+
decomp_size_per_loop = len(input_data) // 1000
41+
decompressed_parts = []
42+
43+
def decomp_worker():
44+
# Decompress in chunks, which updates eof, unused_data, unconsumed_tail
45+
decompressed_parts.append(
46+
decomp.decompress(compressed, decomp_size_per_loop)
47+
)
48+
while decomp.unconsumed_tail:
49+
decompressed_parts.append(
50+
decomp.decompress(
51+
decomp.unconsumed_tail, decomp_size_per_loop
52+
)
53+
)
54+
55+
def decomp_attr_reader():
56+
# Read attributes concurrently while another thread decompresses
57+
for _ in range(1000):
58+
_ = decomp.unused_data
59+
_ = decomp.unconsumed_tail
60+
_ = decomp.eof
61+
62+
counter = itertools.count()
63+
64+
def worker():
65+
# First thread decompresses, others read attributes
66+
if next(counter) == 0:
67+
decomp_worker()
68+
else:
69+
decomp_attr_reader()
70+
71+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
72+
73+
self.assertTrue(decomp.eof)
74+
self.assertEqual(decomp.unused_data, b"")
75+
decompressed = b"".join(decompressed_parts)
76+
self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
77+
78+
79+
if __name__ == "__main__":
80+
unittest.main()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Make the attributes in :mod:`zlib` thread-safe on the :term:`free threaded
2+
<free threading>` build.

Modules/zlibmodule.c

Lines changed: 35 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#endif
99

1010
#include "Python.h"
11+
#include "pycore_object.h" // _PyObject_XSetRefDelayed
1112

1213
#include "zlib.h"
1314
#include "stdbool.h"
@@ -181,15 +182,6 @@ OutputBuffer_WindowOnError(_BlocksOutputBuffer *buffer, _Uint32Window *window)
181182
}
182183

183184

184-
#define ENTER_ZLIB(obj) do { \
185-
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
186-
Py_BEGIN_ALLOW_THREADS \
187-
PyThread_acquire_lock((obj)->lock, 1); \
188-
Py_END_ALLOW_THREADS \
189-
} } while (0)
190-
#define LEAVE_ZLIB(obj) PyThread_release_lock((obj)->lock);
191-
192-
193185
/* The following parameters are copied from zutil.h, version 0.95 */
194186
#define DEFLATED 8
195187
#if MAX_MEM_LEVEL >= 8
@@ -228,7 +220,7 @@ typedef struct
228220
char eof;
229221
bool is_initialised;
230222
PyObject *zdict;
231-
PyThread_type_lock lock;
223+
PyMutex mutex;
232224
} compobject;
233225

234226
#define _compobject_CAST(op) ((compobject *)op)
@@ -291,12 +283,7 @@ newcompobject(PyTypeObject *type)
291283
Py_DECREF(self);
292284
return NULL;
293285
}
294-
self->lock = PyThread_allocate_lock();
295-
if (self->lock == NULL) {
296-
Py_DECREF(self);
297-
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
298-
return NULL;
299-
}
286+
self->mutex = (PyMutex){0};
300287
return self;
301288
}
302289

@@ -720,10 +707,10 @@ compobject_dealloc_impl(PyObject *op, int (*dealloc)(z_streamp))
720707
PyTypeObject *type = Py_TYPE(op);
721708
PyObject_GC_UnTrack(op);
722709
compobject *self = _compobject_CAST(op);
710+
assert(!PyMutex_IsLocked(&self->mutex));
723711
if (self->is_initialised) {
724712
(void)dealloc(&self->zst);
725713
}
726-
PyThread_free_lock(self->lock);
727714
Py_XDECREF(self->unused_data);
728715
Py_XDECREF(self->unconsumed_tail);
729716
Py_XDECREF(self->zdict);
@@ -777,7 +764,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
777764
_BlocksOutputBuffer buffer = {.writer = NULL};
778765
zlibstate *state = PyType_GetModuleState(cls);
779766

780-
ENTER_ZLIB(self);
767+
PyMutex_Lock(&self->mutex);
781768

782769
self->zst.next_in = data->buf;
783770
Py_ssize_t ibuflen = data->len;
@@ -819,7 +806,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
819806
OutputBuffer_OnError(&buffer);
820807
return_value = NULL;
821808
success:
822-
LEAVE_ZLIB(self);
809+
PyMutex_Unlock(&self->mutex);
823810
return return_value;
824811
}
825812

@@ -850,7 +837,7 @@ save_unconsumed_input(compobject *self, Py_buffer *data, int err)
850837
if (new_unused_data == NULL) {
851838
return -1;
852839
}
853-
Py_SETREF(self->unused_data, new_unused_data);
840+
_PyObject_XSetRefDelayed(&self->unused_data, new_unused_data);
854841
self->zst.avail_in = 0;
855842
}
856843
}
@@ -864,7 +851,7 @@ save_unconsumed_input(compobject *self, Py_buffer *data, int err)
864851
(char *)self->zst.next_in, left_size);
865852
if (new_data == NULL)
866853
return -1;
867-
Py_SETREF(self->unconsumed_tail, new_data);
854+
_PyObject_XSetRefDelayed(&self->unconsumed_tail, new_data);
868855
}
869856

870857
return 0;
@@ -909,7 +896,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
909896
max_length = -1;
910897
}
911898

912-
ENTER_ZLIB(self);
899+
PyMutex_Lock(&self->mutex);
913900

914901
self->zst.next_in = data->buf;
915902
ibuflen = data->len;
@@ -962,7 +949,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
962949
if (err == Z_STREAM_END) {
963950
/* This is the logical place to call inflateEnd, but the old behaviour
964951
of only calling it on flush() is preserved. */
965-
self->eof = 1;
952+
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
966953
} else if (err != Z_OK && err != Z_BUF_ERROR) {
967954
/* We will only get Z_BUF_ERROR if the output buffer was full
968955
but there wasn't more output when we tried again, so it is
@@ -981,7 +968,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
981968
OutputBuffer_OnError(&buffer);
982969
return_value = NULL;
983970
success:
984-
LEAVE_ZLIB(self);
971+
PyMutex_Unlock(&self->mutex);
985972
return return_value;
986973
}
987974

@@ -1014,7 +1001,7 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
10141001
return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
10151002
}
10161003

1017-
ENTER_ZLIB(self);
1004+
PyMutex_Lock(&self->mutex);
10181005

10191006
self->zst.avail_in = 0;
10201007

@@ -1070,7 +1057,7 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
10701057
OutputBuffer_OnError(&buffer);
10711058
return_value = NULL;
10721059
success:
1073-
LEAVE_ZLIB(self);
1060+
PyMutex_Unlock(&self->mutex);
10741061
return return_value;
10751062
}
10761063

@@ -1094,9 +1081,9 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
10941081
if (!return_value) return NULL;
10951082

10961083
/* Copy the zstream state
1097-
* We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
1084+
* We use mutex to make this thread-safe
10981085
*/
1099-
ENTER_ZLIB(self);
1086+
PyMutex_Lock(&self->mutex);
11001087
int err = deflateCopy(&return_value->zst, &self->zst);
11011088
switch (err) {
11021089
case Z_OK:
@@ -1120,11 +1107,11 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
11201107
/* Mark it as being initialized */
11211108
return_value->is_initialised = 1;
11221109

1123-
LEAVE_ZLIB(self);
1110+
PyMutex_Unlock(&self->mutex);
11241111
return (PyObject *)return_value;
11251112

11261113
error:
1127-
LEAVE_ZLIB(self);
1114+
PyMutex_Unlock(&self->mutex);
11281115
Py_XDECREF(return_value);
11291116
return NULL;
11301117
}
@@ -1178,9 +1165,9 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
11781165
if (!return_value) return NULL;
11791166

11801167
/* Copy the zstream state
1181-
* We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
1168+
* We use mutex to make this thread-safe
11821169
*/
1183-
ENTER_ZLIB(self);
1170+
PyMutex_Lock(&self->mutex);
11841171
int err = inflateCopy(&return_value->zst, &self->zst);
11851172
switch (err) {
11861173
case Z_OK:
@@ -1205,11 +1192,11 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
12051192
/* Mark it as being initialized */
12061193
return_value->is_initialised = 1;
12071194

1208-
LEAVE_ZLIB(self);
1195+
PyMutex_Unlock(&self->mutex);
12091196
return (PyObject *)return_value;
12101197

12111198
error:
1212-
LEAVE_ZLIB(self);
1199+
PyMutex_Unlock(&self->mutex);
12131200
Py_XDECREF(return_value);
12141201
return NULL;
12151202
}
@@ -1282,10 +1269,10 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
12821269
return NULL;
12831270
}
12841271

1285-
ENTER_ZLIB(self);
1272+
PyMutex_Lock(&self->mutex);
12861273

12871274
if (PyObject_GetBuffer(self->unconsumed_tail, &data, PyBUF_SIMPLE) == -1) {
1288-
LEAVE_ZLIB(self);
1275+
PyMutex_Unlock(&self->mutex);
12891276
return NULL;
12901277
}
12911278

@@ -1333,7 +1320,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
13331320

13341321
/* If at end of stream, clean up any memory allocated by zlib. */
13351322
if (err == Z_STREAM_END) {
1336-
self->eof = 1;
1323+
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
13371324
self->is_initialised = 0;
13381325
err = inflateEnd(&self->zst);
13391326
if (err != Z_OK) {
@@ -1352,7 +1339,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
13521339
return_value = NULL;
13531340
success:
13541341
PyBuffer_Release(&data);
1355-
LEAVE_ZLIB(self);
1342+
PyMutex_Unlock(&self->mutex);
13561343
return return_value;
13571344
}
13581345

@@ -1361,7 +1348,7 @@ typedef struct {
13611348
PyObject_HEAD
13621349
z_stream zst;
13631350
PyObject *zdict;
1364-
PyThread_type_lock lock;
1351+
PyMutex mutex;
13651352
PyObject *unused_data;
13661353
uint8_t *input_buffer;
13671354
Py_ssize_t input_buffer_size;
@@ -1387,7 +1374,7 @@ ZlibDecompressor_dealloc(PyObject *op)
13871374
PyTypeObject *type = Py_TYPE(op);
13881375
PyObject_GC_UnTrack(op);
13891376
ZlibDecompressor *self = ZlibDecompressor_CAST(op);
1390-
PyThread_free_lock(self->lock);
1377+
assert(!PyMutex_IsLocked(&self->mutex));
13911378
if (self->is_initialised) {
13921379
inflateEnd(&self->zst);
13931380
}
@@ -1545,7 +1532,7 @@ decompress_buf(ZlibDecompressor *self, Py_ssize_t max_length)
15451532
} while(err != Z_STREAM_END && self->avail_in_real != 0);
15461533

15471534
if (err == Z_STREAM_END) {
1548-
self->eof = 1;
1535+
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
15491536
self->is_initialised = 0;
15501537
/* Unlike the Decompress object we call inflateEnd here as there are no
15511538
backwards compatibility issues */
@@ -1633,23 +1620,23 @@ decompress(ZlibDecompressor *self, uint8_t *data,
16331620
}
16341621

16351622
if (self->eof) {
1636-
self->needs_input = 0;
1623+
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
16371624

16381625
if (self->avail_in_real > 0) {
16391626
PyObject *unused_data = PyBytes_FromStringAndSize(
16401627
(char *)self->zst.next_in, self->avail_in_real);
16411628
if (unused_data == NULL) {
16421629
goto error;
16431630
}
1644-
Py_XSETREF(self->unused_data, unused_data);
1631+
_PyObject_XSetRefDelayed(&self->unused_data, unused_data);
16451632
}
16461633
}
16471634
else if (self->avail_in_real == 0) {
16481635
self->zst.next_in = NULL;
1649-
self->needs_input = 1;
1636+
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 1);
16501637
}
16511638
else {
1652-
self->needs_input = 0;
1639+
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
16531640

16541641
/* If we did not use the input buffer, we now have
16551642
to copy the tail from the caller's buffer into the
@@ -1718,14 +1705,14 @@ zlib__ZlibDecompressor_decompress_impl(ZlibDecompressor *self,
17181705
{
17191706
PyObject *result = NULL;
17201707

1721-
ENTER_ZLIB(self);
1708+
PyMutex_Lock(&self->mutex);
17221709
if (self->eof) {
17231710
PyErr_SetString(PyExc_EOFError, "End of stream already reached");
17241711
}
17251712
else {
17261713
result = decompress(self, data->buf, data->len, max_length);
17271714
}
1728-
LEAVE_ZLIB(self);
1715+
PyMutex_Unlock(&self->mutex);
17291716
return result;
17301717
}
17311718

@@ -1767,12 +1754,7 @@ zlib__ZlibDecompressor_impl(PyTypeObject *type, int wbits, PyObject *zdict)
17671754
self->zst.next_in = NULL;
17681755
self->zst.avail_in = 0;
17691756
self->unused_data = Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
1770-
self->lock = PyThread_allocate_lock();
1771-
if (self->lock == NULL) {
1772-
Py_DECREF(self);
1773-
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
1774-
return NULL;
1775-
}
1757+
self->mutex = (PyMutex){0};
17761758
int err = inflateInit2(&(self->zst), wbits);
17771759
switch (err) {
17781760
case Z_OK:

0 commit comments

Comments
 (0)