1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
import os
from pathlib import Path
import pytest
from pkgcore.fs import fs, livefs
from pkgcore.fs.contents import contentsSet
class TestFsObjs:
def check_attrs(self, obj, path, offset=None):
if offset is None:
st = path.lstat()
else:
st = (offset / path).lstat()
if offset is not None:
assert offset.is_absolute(), f"path must be absolute, got {path!r}"
assert (obj.mode & 0o7777) == (st.st_mode & 0o7777)
assert obj.uid == st.st_uid
assert obj.gid == st.st_gid
if fs.isreg(obj):
if offset is None:
assert obj.data.path == str(path)
else:
assert obj.data.path == str(offset / path)
def test_data_source(self):
o = livefs.gen_obj("/tmp/etc/passwd", real_location="/etc/passwd")
assert o.location, "/tmp/etc/passwd"
assert o.data.path, "/etc/passwd"
with open("/etc/passwd", "rb") as f:
with o.data.bytes_fileobj() as fileobj:
assert fileobj.read() == f.read()
def test_gen_obj_reg(self, tmp_path):
(path := tmp_path / "reg_obj").touch()
o = livefs.gen_obj(str(path))
assert fs.isreg(o)
self.check_attrs(o, path)
o2 = livefs.gen_obj(str(path), inode=None)
self.check_attrs(o, path)
assert o.inode != o2.inode
def test_gen_obj_dir(self, tmp_path):
o = livefs.gen_obj(str(tmp_path))
assert fs.isdir(o)
self.check_attrs(o, tmp_path)
def test_gen_obj_sym(self, tmp_path):
(src := tmp_path / "s").touch()
(link := tmp_path / "t").symlink_to(src)
obj = livefs.gen_obj(str(link))
assert isinstance(obj, fs.fsSymlink)
self.check_attrs(obj, link)
assert os.readlink(link) == obj.target
def test_gen_obj_fifo(self, tmp_path):
os.mkfifo(path := tmp_path / "fifo")
o = livefs.gen_obj(str(path))
self.check_attrs(o, path)
def test_iterscan(self, tmp_path):
(path := tmp_path / "iscan").mkdir()
files = [path / x for x in ("tmp", "blah", "dar")]
for x in files:
x.touch()
dirs = [path / x for x in ("a", "b", "c")]
for x in dirs:
x.mkdir()
dirs.append(path)
for obj in livefs.iter_scan(str(path)):
assert isinstance(obj, fs.fsBase)
if fs.isreg(obj):
assert Path(obj.location) in files
elif fs.isdir(obj):
assert Path(obj.location) in dirs
else:
pytest.fail(f"unknown object popped up in testing dir, {obj!r}")
self.check_attrs(obj, Path(obj.location))
# do offset verification now.
offset = path
for obj in livefs.iter_scan(str(path), offset=str(offset)):
self.check_attrs(obj, Path(obj.location).relative_to("/"), offset=offset)
seen = []
for obj in livefs.iter_scan(str(files[0])):
self.check_attrs(obj, Path(obj.location))
seen.append(obj.location)
assert [str(files[0])] == sorted(seen)
def test_sorted_scan(self, tmp_path):
for x in ("tmp", "blah", "dar"):
(tmp_path / x).touch()
for x in ("a", "b", "c"):
(tmp_path / x).mkdir()
# regular directory scanning
sorted_files = livefs.sorted_scan(str(tmp_path))
assert sorted_files == [str(tmp_path / x) for x in ("blah", "dar", "tmp")]
# nonexistent paths
nonexistent_path = str(tmp_path / "foobar")
assert livefs.sorted_scan(nonexistent_path) == []
assert livefs.sorted_scan(nonexistent_path, nonexistent=True) == [
nonexistent_path
]
def test_sorted_scan_hidden(self, tmp_path):
for x in (".tmp", "blah"):
(tmp_path / x).touch()
sorted_files = livefs.sorted_scan(str(tmp_path))
assert [str(tmp_path / x) for x in (".tmp", "blah")] == sorted_files
sorted_files = livefs.sorted_scan(str(tmp_path), hidden=False)
assert [str(tmp_path / x) for x in ("blah",)] == sorted_files
def test_sorted_scan_backup(self, tmp_path):
for x in ("blah", "blah~"):
(tmp_path / x).touch()
sorted_files = livefs.sorted_scan(str(tmp_path))
assert [str(tmp_path / x) for x in ("blah", "blah~")] == sorted_files
sorted_files = livefs.sorted_scan(str(tmp_path), backup=False)
assert [str(tmp_path / x) for x in ("blah",)] == sorted_files
def test_relative_sym(self, tmp_path):
(path := tmp_path / "relative-symlink-test").symlink_to("../sym1/blah")
o = livefs.gen_obj(str(path))
assert o.target == "../sym1/blah"
def test_intersect(self, tmp_path):
(tmp_path / "reg").touch()
cset = contentsSet([fs.fsFile("reg", strict=False)])
cset = cset.insert_offset(str(tmp_path))
assert contentsSet(livefs.intersect(cset)) == cset
cset = contentsSet(
[
fs.fsFile("reg/foon", strict=False),
fs.fsFile("reg/dar", strict=False),
fs.fsDir("reg/dir", strict=False),
]
).insert_offset(str(tmp_path))
assert not list(livefs.intersect(cset))
cset = contentsSet([fs.fsDir("reg", strict=False)])
assert not list(livefs.intersect(cset))
|