gh-116738: Make zlib module thread-safe (gh-142432)

Makes the zlib module thread-safe free-threading build. Even though operations
are protected by locks, attributes exposed via PyMemberDef (eof, needs_input,
unused_data, unconsumed_tail) should still be stored atomically within locked
sections, since they can be read without acquiring the lock.
This commit is contained in:
Alper 2025-12-12 10:14:42 -08:00 committed by GitHub
parent 40ac3a9343
commit 1eddef8193
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 161 additions and 54 deletions

View File

@ -0,0 +1,80 @@
import itertools
import unittest
from test.support import import_helper, threading_helper
from test.support.threading_helper import run_concurrently
zlib = import_helper.import_module("zlib")
from test.test_zlib import HAMLET_SCENE
NTHREADS = 10
@threading_helper.requires_working_threading()
class TestZlib(unittest.TestCase):
def test_compressor(self):
comp = zlib.compressobj()
# First compress() outputs zlib header
header = comp.compress(HAMLET_SCENE)
self.assertGreater(len(header), 0)
def worker():
# it should return empty bytes as it buffers data internally
data = comp.compress(HAMLET_SCENE)
self.assertEqual(data, b"")
run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
full_compressed = header + comp.flush()
decompressed = zlib.decompress(full_compressed)
# The decompressed data should be HAMLET_SCENE repeated NTHREADS times
self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
def test_decompressor_concurrent_attribute_reads(self):
input_data = HAMLET_SCENE * NTHREADS
compressed = zlib.compress(input_data)
decomp = zlib.decompressobj()
decomp_size_per_loop = len(input_data) // 1000
decompressed_parts = []
def decomp_worker():
# Decompress in chunks, which updates eof, unused_data, unconsumed_tail
decompressed_parts.append(
decomp.decompress(compressed, decomp_size_per_loop)
)
while decomp.unconsumed_tail:
decompressed_parts.append(
decomp.decompress(
decomp.unconsumed_tail, decomp_size_per_loop
)
)
def decomp_attr_reader():
# Read attributes concurrently while another thread decompresses
for _ in range(1000):
_ = decomp.unused_data
_ = decomp.unconsumed_tail
_ = decomp.eof
counter = itertools.count()
def worker():
# First thread decompresses, others read attributes
if next(counter) == 0:
decomp_worker()
else:
decomp_attr_reader()
run_concurrently(worker_func=worker, nthreads=NTHREADS)
self.assertTrue(decomp.eof)
self.assertEqual(decomp.unused_data, b"")
decompressed = b"".join(decompressed_parts)
self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,2 @@
Make the attributes in :mod:`zlib` thread-safe on the :term:`free threaded
<free threading>` build.

View File

@ -8,6 +8,7 @@
#endif
#include "Python.h"
#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_CHAR_RELAXED
#include "zlib.h"
#include "stdbool.h"
@ -181,15 +182,6 @@ OutputBuffer_WindowOnError(_BlocksOutputBuffer *buffer, _Uint32Window *window)
}
#define ENTER_ZLIB(obj) do { \
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
Py_BEGIN_ALLOW_THREADS \
PyThread_acquire_lock((obj)->lock, 1); \
Py_END_ALLOW_THREADS \
} } while (0)
#define LEAVE_ZLIB(obj) PyThread_release_lock((obj)->lock);
/* The following parameters are copied from zutil.h, version 0.95 */
#define DEFLATED 8
#if MAX_MEM_LEVEL >= 8
@ -228,7 +220,7 @@ typedef struct
char eof;
bool is_initialised;
PyObject *zdict;
PyThread_type_lock lock;
PyMutex mutex;
} compobject;
#define _compobject_CAST(op) ((compobject *)op)
@ -291,12 +283,7 @@ newcompobject(PyTypeObject *type)
Py_DECREF(self);
return NULL;
}
self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};
return self;
}
@ -720,10 +707,10 @@ compobject_dealloc_impl(PyObject *op, int (*dealloc)(z_streamp))
PyTypeObject *type = Py_TYPE(op);
PyObject_GC_UnTrack(op);
compobject *self = _compobject_CAST(op);
assert(!PyMutex_IsLocked(&self->mutex));
if (self->is_initialised) {
(void)dealloc(&self->zst);
}
PyThread_free_lock(self->lock);
Py_XDECREF(self->unused_data);
Py_XDECREF(self->unconsumed_tail);
Py_XDECREF(self->zdict);
@ -777,7 +764,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
_BlocksOutputBuffer buffer = {.writer = NULL};
zlibstate *state = PyType_GetModuleState(cls);
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
self->zst.next_in = data->buf;
Py_ssize_t ibuflen = data->len;
@ -819,7 +806,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}
@ -909,7 +896,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
max_length = -1;
}
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
self->zst.next_in = data->buf;
ibuflen = data->len;
@ -962,7 +949,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
if (err == Z_STREAM_END) {
/* This is the logical place to call inflateEnd, but the old behaviour
of only calling it on flush() is preserved. */
self->eof = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
} else if (err != Z_OK && err != Z_BUF_ERROR) {
/* We will only get Z_BUF_ERROR if the output buffer was full
but there wasn't more output when we tried again, so it is
@ -981,7 +968,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}
@ -1014,7 +1001,7 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
}
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
self->zst.avail_in = 0;
@ -1070,7 +1057,7 @@ error:
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}
@ -1094,9 +1081,9 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
if (!return_value) return NULL;
/* Copy the zstream state
* We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
* We use mutex to make this thread-safe
*/
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
int err = deflateCopy(&return_value->zst, &self->zst);
switch (err) {
case Z_OK:
@ -1120,11 +1107,11 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
/* Mark it as being initialized */
return_value->is_initialised = 1;
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return (PyObject *)return_value;
error:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
Py_XDECREF(return_value);
return NULL;
}
@ -1178,9 +1165,9 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
if (!return_value) return NULL;
/* Copy the zstream state
* We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
* We use mutex to make this thread-safe
*/
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
int err = inflateCopy(&return_value->zst, &self->zst);
switch (err) {
case Z_OK:
@ -1205,11 +1192,11 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
/* Mark it as being initialized */
return_value->is_initialised = 1;
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return (PyObject *)return_value;
error:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
Py_XDECREF(return_value);
return NULL;
}
@ -1282,10 +1269,10 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
return NULL;
}
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
if (PyObject_GetBuffer(self->unconsumed_tail, &data, PyBUF_SIMPLE) == -1) {
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return NULL;
}
@ -1333,7 +1320,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
/* If at end of stream, clean up any memory allocated by zlib. */
if (err == Z_STREAM_END) {
self->eof = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
self->is_initialised = 0;
err = inflateEnd(&self->zst);
if (err != Z_OK) {
@ -1352,7 +1339,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
return_value = NULL;
success:
PyBuffer_Release(&data);
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}
@ -1361,7 +1348,7 @@ typedef struct {
PyObject_HEAD
z_stream zst;
PyObject *zdict;
PyThread_type_lock lock;
PyMutex mutex;
PyObject *unused_data;
uint8_t *input_buffer;
Py_ssize_t input_buffer_size;
@ -1387,7 +1374,7 @@ ZlibDecompressor_dealloc(PyObject *op)
PyTypeObject *type = Py_TYPE(op);
PyObject_GC_UnTrack(op);
ZlibDecompressor *self = ZlibDecompressor_CAST(op);
PyThread_free_lock(self->lock);
assert(!PyMutex_IsLocked(&self->mutex));
if (self->is_initialised) {
inflateEnd(&self->zst);
}
@ -1545,7 +1532,7 @@ decompress_buf(ZlibDecompressor *self, Py_ssize_t max_length)
} while(err != Z_STREAM_END && self->avail_in_real != 0);
if (err == Z_STREAM_END) {
self->eof = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
self->is_initialised = 0;
/* Unlike the Decompress object we call inflateEnd here as there are no
backwards compatibility issues */
@ -1633,7 +1620,7 @@ decompress(ZlibDecompressor *self, uint8_t *data,
}
if (self->eof) {
self->needs_input = 0;
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
if (self->avail_in_real > 0) {
PyObject *unused_data = PyBytes_FromStringAndSize(
@ -1646,10 +1633,10 @@ decompress(ZlibDecompressor *self, uint8_t *data,
}
else if (self->avail_in_real == 0) {
self->zst.next_in = NULL;
self->needs_input = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 1);
}
else {
self->needs_input = 0;
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
/* If we did not use the input buffer, we now have
to copy the tail from the caller's buffer into the
@ -1718,14 +1705,14 @@ zlib__ZlibDecompressor_decompress_impl(ZlibDecompressor *self,
{
PyObject *result = NULL;
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
if (self->eof) {
PyErr_SetString(PyExc_EOFError, "End of stream already reached");
}
else {
result = decompress(self, data->buf, data->len, max_length);
}
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return result;
}
@ -1767,12 +1754,7 @@ zlib__ZlibDecompressor_impl(PyTypeObject *type, int wbits, PyObject *zdict)
self->zst.next_in = NULL;
self->zst.avail_in = 0;
self->unused_data = Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};
int err = inflateInit2(&(self->zst), wbits);
switch (err) {
case Z_OK:
@ -1827,10 +1809,36 @@ static PyMethodDef ZlibDecompressor_methods[] = {
{NULL}
};
static PyObject *
Decomp_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
{
compobject *self = _compobject_CAST(op);
PyMutex_Lock(&self->mutex);
assert(self->unused_data != NULL);
PyObject *result = Py_NewRef(self->unused_data);
PyMutex_Unlock(&self->mutex);
return result;
}
static PyObject *
Decomp_unconsumed_tail_get(PyObject *op, void *Py_UNUSED(ignored))
{
compobject *self = _compobject_CAST(op);
PyMutex_Lock(&self->mutex);
assert(self->unconsumed_tail != NULL);
PyObject *result = Py_NewRef(self->unconsumed_tail);
PyMutex_Unlock(&self->mutex);
return result;
}
static PyGetSetDef Decomp_getset[] = {
{"unused_data", Decomp_unused_data_get, NULL, NULL},
{"unconsumed_tail", Decomp_unconsumed_tail_get, NULL, NULL},
{NULL},
};
#define COMP_OFF(x) offsetof(compobject, x)
static PyMemberDef Decomp_members[] = {
{"unused_data", _Py_T_OBJECT, COMP_OFF(unused_data), Py_READONLY},
{"unconsumed_tail", _Py_T_OBJECT, COMP_OFF(unconsumed_tail), Py_READONLY},
{"eof", Py_T_BOOL, COMP_OFF(eof), Py_READONLY},
{NULL},
};
@ -1844,11 +1852,26 @@ PyDoc_STRVAR(ZlibDecompressor_unused_data__doc__,
PyDoc_STRVAR(ZlibDecompressor_needs_input_doc,
"True if more input is needed before more decompressed data can be produced.");
static PyObject *
ZlibDecompressor_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
{
ZlibDecompressor *self = ZlibDecompressor_CAST(op);
PyMutex_Lock(&self->mutex);
assert(self->unused_data != NULL);
PyObject *result = Py_NewRef(self->unused_data);
PyMutex_Unlock(&self->mutex);
return result;
}
static PyGetSetDef ZlibDecompressor_getset[] = {
{"unused_data", ZlibDecompressor_unused_data_get, NULL,
ZlibDecompressor_unused_data__doc__},
{NULL},
};
static PyMemberDef ZlibDecompressor_members[] = {
{"eof", Py_T_BOOL, offsetof(ZlibDecompressor, eof),
Py_READONLY, ZlibDecompressor_eof__doc__},
{"unused_data", Py_T_OBJECT_EX, offsetof(ZlibDecompressor, unused_data),
Py_READONLY, ZlibDecompressor_unused_data__doc__},
{"needs_input", Py_T_BOOL, offsetof(ZlibDecompressor, needs_input), Py_READONLY,
ZlibDecompressor_needs_input_doc},
{NULL},
@ -2074,6 +2097,7 @@ static PyType_Slot Decomptype_slots[] = {
{Py_tp_traverse, compobject_traverse},
{Py_tp_methods, Decomp_methods},
{Py_tp_members, Decomp_members},
{Py_tp_getset, Decomp_getset},
{0, 0},
};
@ -2093,6 +2117,7 @@ static PyType_Slot ZlibDecompressor_type_slots[] = {
{Py_tp_dealloc, ZlibDecompressor_dealloc},
{Py_tp_traverse, ZlibDecompressor_traverse},
{Py_tp_members, ZlibDecompressor_members},
{Py_tp_getset, ZlibDecompressor_getset},
{Py_tp_new, zlib__ZlibDecompressor},
{Py_tp_doc, (char *)zlib__ZlibDecompressor__doc__},
{Py_tp_methods, ZlibDecompressor_methods},