mirror of
https://github.com/python/cpython.git
synced 2026-01-26 12:55:08 +00:00
gh-116738: Make _bz2 module thread-safe (gh-142756)
Make the attributes in _bz2 module thread-safe on the free-threading build. Attributes (eof, needs_input, unused_data) are now stored atomically or accessed via mutex-protected getters.
This commit is contained in:
parent
a882ae198a
commit
1a9cdaf63a
@ -42,11 +42,22 @@ class TestBZ2(unittest.TestCase):
|
||||
data = bz2d.decompress(compressed, chunk_size)
|
||||
self.assertEqual(len(data), chunk_size)
|
||||
output.append(data)
|
||||
# Read attributes concurrently with other threads decompressing
|
||||
self.assertIsInstance(bz2d.eof, bool)
|
||||
self.assertIsInstance(bz2d.needs_input, bool)
|
||||
self.assertIsInstance(bz2d.unused_data, bytes)
|
||||
|
||||
run_concurrently(worker_func=worker, nthreads=NTHREADS)
|
||||
self.assertEqual(len(output), NTHREADS)
|
||||
# Verify the expected chunks (order doesn't matter due to append race)
|
||||
self.assertEqual(set(output), set(chunks))
|
||||
self.assertTrue(bz2d.eof)
|
||||
self.assertFalse(bz2d.needs_input)
|
||||
# Each thread added full compressed data to the buffer, but only 1 copy
|
||||
# is consumed to produce the output. The rest remains as unused_data.
|
||||
self.assertEqual(
|
||||
len(bz2d.unused_data), len(compressed) * (NTHREADS - 1)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@ -0,0 +1,2 @@
|
||||
Make the attributes in :mod:`bz2` thread-safe on the :term:`free threaded
|
||||
<free threading>` build.
|
||||
@ -12,6 +12,7 @@
|
||||
|
||||
// Blocks output buffer wrappers
|
||||
#include "pycore_blocks_output_buffer.h"
|
||||
#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_CHAR_RELAXED
|
||||
|
||||
#if OUTPUT_BUFFER_MAX_BLOCK_SIZE > UINT32_MAX
|
||||
#error "The maximum block size accepted by libbzip2 is UINT32_MAX."
|
||||
@ -437,7 +438,7 @@ decompress_buf(BZ2Decompressor *d, Py_ssize_t max_length)
|
||||
if (catch_bz2_error(bzret))
|
||||
goto error;
|
||||
if (bzret == BZ_STREAM_END) {
|
||||
d->eof = 1;
|
||||
FT_ATOMIC_STORE_CHAR_RELAXED(d->eof, 1);
|
||||
break;
|
||||
} else if (d->bzs_avail_in_real == 0) {
|
||||
break;
|
||||
@ -521,7 +522,7 @@ decompress(BZ2Decompressor *d, char *data, size_t len, Py_ssize_t max_length)
|
||||
}
|
||||
|
||||
if (d->eof) {
|
||||
d->needs_input = 0;
|
||||
FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0);
|
||||
if (d->bzs_avail_in_real > 0) {
|
||||
Py_XSETREF(d->unused_data,
|
||||
PyBytes_FromStringAndSize(bzs->next_in, d->bzs_avail_in_real));
|
||||
@ -531,10 +532,10 @@ decompress(BZ2Decompressor *d, char *data, size_t len, Py_ssize_t max_length)
|
||||
}
|
||||
else if (d->bzs_avail_in_real == 0) {
|
||||
bzs->next_in = NULL;
|
||||
d->needs_input = 1;
|
||||
FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 1);
|
||||
}
|
||||
else {
|
||||
d->needs_input = 0;
|
||||
FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0);
|
||||
|
||||
/* If we did not use the input buffer, we now have
|
||||
to copy the tail from the caller's buffer into the
|
||||
@ -682,11 +683,28 @@ PyDoc_STRVAR(BZ2Decompressor_unused_data__doc__,
|
||||
PyDoc_STRVAR(BZ2Decompressor_needs_input_doc,
|
||||
"True if more input is needed before more decompressed data can be produced.");
|
||||
|
||||
static PyObject *
|
||||
BZ2Decompressor_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
|
||||
{
|
||||
BZ2Decompressor *self = _BZ2Decompressor_CAST(op);
|
||||
PyMutex_Lock(&self->mutex);
|
||||
PyObject *result = Py_XNewRef(self->unused_data);
|
||||
PyMutex_Unlock(&self->mutex);
|
||||
if (result == NULL) {
|
||||
PyErr_SetString(PyExc_AttributeError, "unused_data");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static PyGetSetDef BZ2Decompressor_getset[] = {
|
||||
{"unused_data", BZ2Decompressor_unused_data_get, NULL,
|
||||
BZ2Decompressor_unused_data__doc__},
|
||||
{NULL},
|
||||
};
|
||||
|
||||
static PyMemberDef BZ2Decompressor_members[] = {
|
||||
{"eof", Py_T_BOOL, offsetof(BZ2Decompressor, eof),
|
||||
Py_READONLY, BZ2Decompressor_eof__doc__},
|
||||
{"unused_data", Py_T_OBJECT_EX, offsetof(BZ2Decompressor, unused_data),
|
||||
Py_READONLY, BZ2Decompressor_unused_data__doc__},
|
||||
{"needs_input", Py_T_BOOL, offsetof(BZ2Decompressor, needs_input), Py_READONLY,
|
||||
BZ2Decompressor_needs_input_doc},
|
||||
{NULL}
|
||||
@ -697,6 +715,7 @@ static PyType_Slot bz2_decompressor_type_slots[] = {
|
||||
{Py_tp_methods, BZ2Decompressor_methods},
|
||||
{Py_tp_doc, (char *)_bz2_BZ2Decompressor__doc__},
|
||||
{Py_tp_members, BZ2Decompressor_members},
|
||||
{Py_tp_getset, BZ2Decompressor_getset},
|
||||
{Py_tp_new, _bz2_BZ2Decompressor},
|
||||
{0, 0}
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user