aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/test.yml5
-rw-r--r--src/snakeoil/compression/_bzip2.py15
-rw-r--r--src/snakeoil/compression/_util.py4
-rw-r--r--src/snakeoil/test/__init__.py15
-rw-r--r--tests/compression/__init__.py0
-rw-r--r--tests/compression/test_bzip2.py141
6 files changed, 168 insertions, 12 deletions
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index b512934d..bdff1444 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -40,6 +40,11 @@ jobs:
python -m pip install --upgrade pip
pip install .[test]
+ - name: Install non-python deps
+ if: ${{ matrix.os == 'ubuntu-latest' }}
+ run: |
+ sudo apt install lbzip2
+
- name: Test with pytest
env:
PY_COLORS: 1 # forcibly enable pytest colors
diff --git a/src/snakeoil/compression/_bzip2.py b/src/snakeoil/compression/_bzip2.py
index 05db221c..b10e3ec3 100644
--- a/src/snakeoil/compression/_bzip2.py
+++ b/src/snakeoil/compression/_bzip2.py
@@ -24,9 +24,7 @@ bz2_path = process.find_binary("bzip2")
try:
- from bz2 import BZ2File
- from bz2 import compress as _compress_data
- from bz2 import decompress as _decompress_data
+ from bz2 import BZ2File, compress as _compress_data, decompress as _decompress_data
native = True
except ImportError:
@@ -40,16 +38,15 @@ except ImportError:
_compress_handle = partial(_util.compress_handle, bz2_path)
_decompress_handle = partial(_util.decompress_handle, bz2_path)
-lbzip2_path = None
-parallelizable = False
-lbzip2_compress_args = lbzip2_decompress_args = ()
try:
lbzip2_path = process.find_binary("lbzip2")
- lbzip2_compress_args = ('-n%i' % multiprocessing.cpu_count(),)
+ lbzip2_compress_args = (f'-n{multiprocessing.cpu_count()}', )
lbzip2_decompress_args = lbzip2_compress_args
parallelizable = True
except process.CommandNotFound:
- pass
+ lbzip2_path = None
+ parallelizable = False
+ lbzip2_compress_args = lbzip2_decompress_args = ()
def compress_data(data, level=9, parallelize=False):
@@ -76,6 +73,6 @@ def decompress_handle(handle, parallelize=False):
if parallelize and parallelizable:
return _util.decompress_handle(lbzip2_path, handle,
extra_args=lbzip2_decompress_args)
- elif (native and isinstance(handle, str)):
+ elif native and isinstance(handle, str):
return BZ2File(handle, mode='r')
return _decompress_handle(handle)
diff --git a/src/snakeoil/compression/_util.py b/src/snakeoil/compression/_util.py
index 70da8563..6a3c7258 100644
--- a/src/snakeoil/compression/_util.py
+++ b/src/snakeoil/compression/_util.py
@@ -12,9 +12,9 @@ def _drive_process(args, mode, data):
try:
stdout, stderr = p.communicate(data)
if p.returncode != 0:
+ args = ' '.join(args)
raise ValueError(
- "%s returned %i exitcode from '%s', stderr=%r" %
- (mode, p.returncode, ' '.join(args), stderr))
+ f"{mode} returned {p.returncode} exitcode from '{args}', stderr={stderr.decode()}")
return stdout
finally:
if p is not None and p.returncode is None:
diff --git a/src/snakeoil/test/__init__.py b/src/snakeoil/test/__init__.py
index 72ea2b4e..bb9381f7 100644
--- a/src/snakeoil/test/__init__.py
+++ b/src/snakeoil/test/__init__.py
@@ -5,6 +5,7 @@ import random
import string
import subprocess
import sys
+from unittest.mock import patch
# not relative imports so protect_process() works properly
from snakeoil import klass
@@ -67,8 +68,20 @@ def protect_process(functor, name=None):
if wipe:
os.environ.pop(_PROTECT_ENV_VAR, None)
- for x in "__doc__ __name__".split():
+ for x in ("__doc__", "__name__"):
if hasattr(functor, x):
setattr(_inner_run, x, getattr(functor, x))
method_name = getattr(functor, '__name__', None)
return _inner_run
+
+
+def hide_imports(*import_names: str):
+ """Hide imports from the specified modules."""
+ orig_import = __import__
+
+ def mock_import(name, *args, **kwargs):
+ if name in import_names:
+ raise ImportError()
+ return orig_import(name, *args, **kwargs)
+
+ return patch('builtins.__import__', side_effect=mock_import)
diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/compression/__init__.py
diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py
new file mode 100644
index 00000000..f8f1d1fb
--- /dev/null
+++ b/tests/compression/test_bzip2.py
@@ -0,0 +1,141 @@
+import importlib
+from bz2 import decompress
+from pathlib import Path
+from unittest import mock
+
+import pytest
+from snakeoil.compression import _bzip2
+from snakeoil.process import CommandNotFound, find_binary
+from snakeoil.test import hide_imports
+
+
+def hide_binary(*binaries: str):
+ def mock_find_binary(name):
+ if name in binaries:
+ raise CommandNotFound(name)
+ return find_binary(name)
+
+ return mock.patch('snakeoil.process.find_binary', side_effect=mock_find_binary)
+
+
+def test_no_native():
+ with hide_imports('bz2'):
+ importlib.reload(_bzip2)
+ assert not _bzip2.native
+
+
+def test_missing_bzip2_binary():
+ with hide_binary('bzip2'):
+ with pytest.raises(CommandNotFound, match='bzip2'):
+ importlib.reload(_bzip2)
+
+
+def test_missing_lbzip2_binary():
+ with hide_binary('lbzip2'):
+ importlib.reload(_bzip2)
+ assert not _bzip2.parallelizable
+
+
+decompressed_test_data = b'Some text here\n'
+compressed_test_data = (
+ b'BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02'
+ b'B\x94@ \x00"\r\x03\xd4\x0c \t!\x1b\xb7\x80u/\x17rE8P\x90\x1bM\x00\x02'
+)
+
+
+class Base:
+
+ @pytest.mark.parametrize('parallelize', (True, False))
+ @pytest.mark.parametrize('level', (1, 9))
+ def test_compress_data(self, level, parallelize):
+ compressed = _bzip2.compress_data(decompressed_test_data, level=level, parallelize=parallelize)
+ assert compressed
+ assert decompress(compressed) == decompressed_test_data
+
+ @pytest.mark.parametrize('parallelize', (True, False))
+ def test_decompress_data(self, parallelize):
+ assert decompressed_test_data == _bzip2.decompress_data(compressed_test_data, parallelize=parallelize)
+
+ @pytest.mark.parametrize('parallelize', (True, False))
+ @pytest.mark.parametrize('level', (1, 9))
+ def test_compress_handle(self, tmp_path, level, parallelize):
+ path = tmp_path / 'test.bz2'
+
+ stream = _bzip2.compress_handle(str(path), level=level, parallelize=parallelize)
+ stream.write(decompressed_test_data)
+ stream.close()
+ assert decompress(path.read_bytes()) == decompressed_test_data
+
+ with path.open("wb") as file:
+ stream = _bzip2.compress_handle(file, level=level, parallelize=parallelize)
+ stream.write(decompressed_test_data)
+ stream.close()
+ assert decompress(path.read_bytes()) == decompressed_test_data
+
+ with path.open("wb") as file:
+ stream = _bzip2.compress_handle(file.fileno(), level=level, parallelize=parallelize)
+ stream.write(decompressed_test_data)
+ stream.close()
+ assert decompress(path.read_bytes()) == decompressed_test_data
+
+ with pytest.raises(TypeError):
+ _bzip2.compress_handle(b'', level=level, parallelize=parallelize)
+
+ @pytest.mark.parametrize('parallelize', (True, False))
+ def test_decompress_handle(self, tmp_path, parallelize):
+ path: Path = tmp_path / 'test.bz2'
+ path.write_bytes(compressed_test_data)
+
+ stream = _bzip2.decompress_handle(str(path), parallelize=parallelize)
+ assert stream.read() == decompressed_test_data
+ stream.close()
+
+ with path.open("rb") as file:
+ stream = _bzip2.decompress_handle(file, parallelize=parallelize)
+ assert stream.read() == decompressed_test_data
+ stream.close()
+
+ with path.open("rb") as file:
+ stream = _bzip2.decompress_handle(file.fileno(), parallelize=parallelize)
+ assert stream.read() == decompressed_test_data
+ stream.close()
+
+ with pytest.raises(TypeError):
+ _bzip2.decompress_handle(b'', parallelize=parallelize)
+
+
+class TestStdlib(Base):
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _setup(self):
+ try:
+ find_binary('bzip2')
+ except CommandNotFound:
+ pytest.skip('bzip2 binary not found')
+ with hide_binary('lbzip2'):
+ importlib.reload(_bzip2)
+ yield
+
+
+class TestBzip2(Base):
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _setup(self):
+ with hide_binary('lbzip2'):
+ importlib.reload(_bzip2)
+ yield
+
+
+class TestLbzip2(Base):
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _setup(self):
+ try:
+ find_binary('lbzip2')
+ except CommandNotFound:
+ pytest.skip('lbzip2 binary not found')
+ importlib.reload(_bzip2)
+
+ def test_bad_level(self):
+ with pytest.raises(ValueError, match='unknown option "-0"'):
+ _bzip2.compress_data(decompressed_test_data, level=90, parallelize=True)