diff options
-rw-r--r-- | .github/workflows/test.yml | 5 | ||||
-rw-r--r-- | src/snakeoil/compression/_bzip2.py | 15 | ||||
-rw-r--r-- | src/snakeoil/compression/_util.py | 4 | ||||
-rw-r--r-- | src/snakeoil/test/__init__.py | 15 | ||||
-rw-r--r-- | tests/compression/__init__.py | 0 | ||||
-rw-r--r-- | tests/compression/test_bzip2.py | 141 |
6 files changed, 168 insertions, 12 deletions
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b512934d..bdff1444 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -40,6 +40,11 @@ jobs: python -m pip install --upgrade pip pip install .[test] + - name: Install non-python deps + if: ${{ matrix.os == 'ubuntu-latest' }} + run: | + sudo apt install lbzip2 + - name: Test with pytest env: PY_COLORS: 1 # forcibly enable pytest colors diff --git a/src/snakeoil/compression/_bzip2.py b/src/snakeoil/compression/_bzip2.py index 05db221c..b10e3ec3 100644 --- a/src/snakeoil/compression/_bzip2.py +++ b/src/snakeoil/compression/_bzip2.py @@ -24,9 +24,7 @@ bz2_path = process.find_binary("bzip2") try: - from bz2 import BZ2File - from bz2 import compress as _compress_data - from bz2 import decompress as _decompress_data + from bz2 import BZ2File, compress as _compress_data, decompress as _decompress_data native = True except ImportError: @@ -40,16 +38,15 @@ except ImportError: _compress_handle = partial(_util.compress_handle, bz2_path) _decompress_handle = partial(_util.decompress_handle, bz2_path) -lbzip2_path = None -parallelizable = False -lbzip2_compress_args = lbzip2_decompress_args = () try: lbzip2_path = process.find_binary("lbzip2") - lbzip2_compress_args = ('-n%i' % multiprocessing.cpu_count(),) + lbzip2_compress_args = (f'-n{multiprocessing.cpu_count()}', ) lbzip2_decompress_args = lbzip2_compress_args parallelizable = True except process.CommandNotFound: - pass + lbzip2_path = None + parallelizable = False + lbzip2_compress_args = lbzip2_decompress_args = () def compress_data(data, level=9, parallelize=False): @@ -76,6 +73,6 @@ def decompress_handle(handle, parallelize=False): if parallelize and parallelizable: return _util.decompress_handle(lbzip2_path, handle, extra_args=lbzip2_decompress_args) - elif (native and isinstance(handle, str)): + elif native and isinstance(handle, str): return BZ2File(handle, mode='r') return _decompress_handle(handle) diff --git a/src/snakeoil/compression/_util.py b/src/snakeoil/compression/_util.py index 70da8563..6a3c7258 100644 --- a/src/snakeoil/compression/_util.py +++ b/src/snakeoil/compression/_util.py @@ -12,9 +12,9 @@ def _drive_process(args, mode, data): try: stdout, stderr = p.communicate(data) if p.returncode != 0: + args = ' '.join(args) raise ValueError( - "%s returned %i exitcode from '%s', stderr=%r" % - (mode, p.returncode, ' '.join(args), stderr)) + f"{mode} returned {p.returncode} exitcode from '{args}', stderr={stderr.decode()}") return stdout finally: if p is not None and p.returncode is None: diff --git a/src/snakeoil/test/__init__.py b/src/snakeoil/test/__init__.py index 72ea2b4e..bb9381f7 100644 --- a/src/snakeoil/test/__init__.py +++ b/src/snakeoil/test/__init__.py @@ -5,6 +5,7 @@ import random import string import subprocess import sys +from unittest.mock import patch # not relative imports so protect_process() works properly from snakeoil import klass @@ -67,8 +68,20 @@ def protect_process(functor, name=None): if wipe: os.environ.pop(_PROTECT_ENV_VAR, None) - for x in "__doc__ __name__".split(): + for x in ("__doc__", "__name__"): if hasattr(functor, x): setattr(_inner_run, x, getattr(functor, x)) method_name = getattr(functor, '__name__', None) return _inner_run + + +def hide_imports(*import_names: str): + """Hide imports from the specified modules.""" + orig_import = __import__ + + def mock_import(name, *args, **kwargs): + if name in import_names: + raise ImportError() + return orig_import(name, *args, **kwargs) + + return patch('builtins.__import__', side_effect=mock_import) diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/compression/__init__.py diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py new file mode 100644 index 00000000..f8f1d1fb --- /dev/null +++ b/tests/compression/test_bzip2.py @@ -0,0 +1,141 @@ +import importlib +from bz2 import decompress +from pathlib import Path +from unittest import mock + +import pytest +from snakeoil.compression import _bzip2 +from snakeoil.process import CommandNotFound, find_binary +from snakeoil.test import hide_imports + + +def hide_binary(*binaries: str): + def mock_find_binary(name): + if name in binaries: + raise CommandNotFound(name) + return find_binary(name) + + return mock.patch('snakeoil.process.find_binary', side_effect=mock_find_binary) + + +def test_no_native(): + with hide_imports('bz2'): + importlib.reload(_bzip2) + assert not _bzip2.native + + +def test_missing_bzip2_binary(): + with hide_binary('bzip2'): + with pytest.raises(CommandNotFound, match='bzip2'): + importlib.reload(_bzip2) + + +def test_missing_lbzip2_binary(): + with hide_binary('lbzip2'): + importlib.reload(_bzip2) + assert not _bzip2.parallelizable + + +decompressed_test_data = b'Some text here\n' +compressed_test_data = ( + b'BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02' + b'B\x94@ \x00"\r\x03\xd4\x0c \t!\x1b\xb7\x80u/\x17rE8P\x90\x1bM\x00\x02' +) + + +class Base: + + @pytest.mark.parametrize('parallelize', (True, False)) + @pytest.mark.parametrize('level', (1, 9)) + def test_compress_data(self, level, parallelize): + compressed = _bzip2.compress_data(decompressed_test_data, level=level, parallelize=parallelize) + assert compressed + assert decompress(compressed) == decompressed_test_data + + @pytest.mark.parametrize('parallelize', (True, False)) + def test_decompress_data(self, parallelize): + assert decompressed_test_data == _bzip2.decompress_data(compressed_test_data, parallelize=parallelize) + + @pytest.mark.parametrize('parallelize', (True, False)) + @pytest.mark.parametrize('level', (1, 9)) + def test_compress_handle(self, tmp_path, level, parallelize): + path = tmp_path / 'test.bz2' + + stream = _bzip2.compress_handle(str(path), level=level, parallelize=parallelize) + stream.write(decompressed_test_data) + stream.close() + assert decompress(path.read_bytes()) == decompressed_test_data + + with path.open("wb") as file: + stream = _bzip2.compress_handle(file, level=level, parallelize=parallelize) + stream.write(decompressed_test_data) + stream.close() + assert decompress(path.read_bytes()) == decompressed_test_data + + with path.open("wb") as file: + stream = _bzip2.compress_handle(file.fileno(), level=level, parallelize=parallelize) + stream.write(decompressed_test_data) + stream.close() + assert decompress(path.read_bytes()) == decompressed_test_data + + with pytest.raises(TypeError): + _bzip2.compress_handle(b'', level=level, parallelize=parallelize) + + @pytest.mark.parametrize('parallelize', (True, False)) + def test_decompress_handle(self, tmp_path, parallelize): + path: Path = tmp_path / 'test.bz2' + path.write_bytes(compressed_test_data) + + stream = _bzip2.decompress_handle(str(path), parallelize=parallelize) + assert stream.read() == decompressed_test_data + stream.close() + + with path.open("rb") as file: + stream = _bzip2.decompress_handle(file, parallelize=parallelize) + assert stream.read() == decompressed_test_data + stream.close() + + with path.open("rb") as file: + stream = _bzip2.decompress_handle(file.fileno(), parallelize=parallelize) + assert stream.read() == decompressed_test_data + stream.close() + + with pytest.raises(TypeError): + _bzip2.decompress_handle(b'', parallelize=parallelize) + + +class TestStdlib(Base): + + @pytest.fixture(autouse=True, scope='class') + def _setup(self): + try: + find_binary('bzip2') + except CommandNotFound: + pytest.skip('bzip2 binary not found') + with hide_binary('lbzip2'): + importlib.reload(_bzip2) + yield + + +class TestBzip2(Base): + + @pytest.fixture(autouse=True, scope='class') + def _setup(self): + with hide_binary('lbzip2'): + importlib.reload(_bzip2) + yield + + +class TestLbzip2(Base): + + @pytest.fixture(autouse=True, scope='class') + def _setup(self): + try: + find_binary('lbzip2') + except CommandNotFound: + pytest.skip('lbzip2 binary not found') + importlib.reload(_bzip2) + + def test_bad_level(self): + with pytest.raises(ValueError, match='unknown option "-0"'): + _bzip2.compress_data(decompressed_test_data, level=90, parallelize=True) |