aboutsummaryrefslogtreecommitdiff
blob: 3ca737cb815bebdba37452df1c084e6070c9e29a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
"""Functionality for reading bash like files

Please note that while this functionality can do variable interpolation,
it strictly treats the source as non-executable code.  It cannot parse
subshells, variable additions, etc.

Its primary usage is for reading things like gentoo make.conf's, or
libtool .la files that are bash compatible, but non-executable.
"""

from shlex import shlex

from .demandload import demand_compile_regexp
from .fileutils import readlines
from .log import logger
from .mappings import ProtectedDict

demand_compile_regexp("line_cont_regexp", r"^(.*[^\\]|)\\$")
demand_compile_regexp("inline_comment_regexp", r"^.*\s#.*$")
demand_compile_regexp("var_find", r"\\?(\${\w+}|\$\w+)")
demand_compile_regexp("backslash_find", r"\\.")
demand_compile_regexp("ansi_escape_re", r"(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]")

__all__ = (
    "iter_read_bash",
    "read_bash",
    "read_dict",
    "read_bash_dict",
    "bash_parser",
    "BashParseError",
)


def iter_read_bash(
    bash_source, allow_inline_comments=True, allow_line_cont=False, enum_line=False
):
    """Iterate over a file honoring bash commenting rules and line continuations.

    Note that it's considered good behaviour to close filehandles, as
    such, either iterate fully through this, or use read_bash instead.
    Once the file object is no longer referenced the handle will be
    closed, but be proactive instead of relying on the garbage
    collector.

    :param bash_source: either a file to read from
        or a string holding the filename to open.
    :param allow_inline_comments: whether or not to prune characters
        after a # that isn't at the start of a line.
    :param allow_line_cont: whether or not to respect line continuations
    :return: yields lines w/ commenting stripped out
    """
    if isinstance(bash_source, str):
        bash_source = readlines(bash_source, True)
    s = ""
    for lineno, line in enumerate(bash_source, 1):
        if allow_line_cont and s:
            s += line
        else:
            s = line.lstrip()

        if s:
            if s[0] != "#":
                if allow_inline_comments:
                    if not allow_line_cont or (
                        allow_line_cont and inline_comment_regexp.match(line)
                    ):
                        s = s.split("#", 1)[0].rstrip()
                if allow_line_cont and line_cont_regexp.match(line):
                    s = s.rstrip("\\\n")
                    continue
                if enum_line:
                    yield lineno, s.rstrip()
                else:
                    yield s.rstrip()
            s = ""
    if s:
        if enum_line:
            yield lineno, s
        else:
            yield s


def read_bash(*args, **kwargs):
    """Read a file honoring bash commenting rules.

    See :py:func:`iter_read_bash` for parameter details.

    Returns a list of lines w/ comments stripped out.
    """
    return list(iter_read_bash(*args, **kwargs))


def read_bash_dict(bash_source, vars_dict=None, sourcing_command=None):
    """Read bash source, yielding a dict of vars.

    :param bash_source: either a file to read from
        or a string holding the filename to open
    :param vars_dict: initial 'env' for the sourcing.
        Is protected from modification.
    :type vars_dict: dict or None
    :param sourcing_command: controls whether a source command exists.
        If one does and is encountered, then this func is called.
    :type sourcing_command: callable
    :raise BashParseError: thrown if invalid syntax is encountered.
    :return: dict representing the resultant env if bash executed the source.
    """

    # quite possibly I'm missing something here, but the original
    # portage_util getconfig/varexpand seemed like it only went
    # halfway. The shlex posix mode *should* cover everything.

    if vars_dict is not None:
        d, protected = ProtectedDict(vars_dict), True
    else:
        d, protected = {}, False

    close = False
    infile = None
    if isinstance(bash_source, str):
        f = open(bash_source, "r")
        close = True
        infile = bash_source
    else:
        f = bash_source
    s = bash_parser(f, sourcing_command=sourcing_command, env=d, infile=infile)

    try:
        tok = ""
        try:
            while tok is not None:
                key = s.get_token()
                if key == "export":
                    # discard 'export' token from "export VAR=VALUE" lines
                    key = s.get_token()
                if key is None:
                    break
                elif key.isspace():
                    # we specifically have to check this, since we're
                    # screwing with the whitespace filters below to
                    # detect empty assigns
                    continue
                eq = s.get_token()
                if eq != "=":
                    raise BashParseError(
                        bash_source, s.lineno, "got token %r, was expecting '='" % eq
                    )
                val = s.get_token()
                if val is None:
                    val = ""
                elif val == "export":
                    val = s.get_token()
                # look ahead to see if we just got an empty assign.
                next_tok = s.get_token()
                if next_tok == "=":
                    # ... we did.
                    # leftmost insertions, thus reversed ordering
                    s.push_token(next_tok)
                    s.push_token(val)
                    val = ""
                else:
                    s.push_token(next_tok)
                d[key] = val
        except ValueError as e:
            raise BashParseError(bash_source, s.lineno, str(e)) from e
    finally:
        if close and f is not None:
            f.close()
    if protected:
        d = d.new
    return d


def read_dict(
    bash_source,
    splitter="=",
    source_isiter=False,
    allow_inline_comments=True,
    strip=False,
    filename=None,
    ignore_errors=False,
):
    """Read key value pairs from a file, ignoring bash-style comments.

    :param splitter: the string to split on.  Can be None to
        default to str.split's default
    :param bash_source: either a file to read from,
        or a string holding the filename to open.
    :param allow_inline_comments: whether or not to prune characters
        after a # that isn't at the start of a line.
    :param ignore_errors: parse errors are logged instead of raised
    :raise: :py:class:`BashParseError` if there are parse errors found.
    """
    d = {}
    if not source_isiter:
        filename = bash_source
        i = iter_read_bash(bash_source, allow_inline_comments=allow_inline_comments)
    else:
        if filename is None:
            # XXX what to do?
            filename = "<unknown>"
        i = bash_source
    line_count = 0
    try:
        for k in i:
            line_count += 1
            try:
                k, v = k.split(splitter, 1)
            except ValueError as e:
                if filename == "<unknown>":
                    filename = getattr(bash_source, "name", bash_source)
                if ignore_errors:
                    logger.error(
                        "bash parse error in %r, line %s", filename, line_count
                    )
                    continue
                else:
                    raise BashParseError(filename, line_count) from e
            if strip:
                k, v = k.strip(), v.strip()
            if len(v) > 2 and v[0] == v[-1] and v[0] in ("'", '"'):
                v = v[1:-1]
            d[k] = v
    finally:
        del i
    return d


def _nuke_backslash(s):
    s = s.group()
    if s == "\\\n":
        return "\n"
    try:
        return chr(ord(s))
    except TypeError:
        return s[1]


class bash_parser(shlex):
    """Fixed up shlex version for bash parsing.

    Corrects corner cases in quote expansion and adds variable interpolation.
    While it's a fair bit slower than stdlib shlex, it parses a more complete
    subset of bash syntax than stdlib shlex.
    """

    def __init__(self, source, sourcing_command=None, env=None, infile=None):
        """
        :param source: file handle to read from
        :param sourcing_command: token to treat as an include command
        :type sourcing_command: either None, or a string; if None, no includes
            are allowed in this parsing
        :param env: initial environment to use for variable interpolation
        :type env: must be a mapping; if None, an empty dict is used
        """
        self.__dict__["state"] = " "
        super().__init__(source, posix=True, infile=infile)
        self.wordchars += "@${}/.-+/:~^*"
        self.wordchars = frozenset(self.wordchars)
        if sourcing_command is not None:
            self.source = sourcing_command
        if env is None:
            env = {}
        self.env = env
        self.__pos = 0

    def __setattr__(self, attr, val):
        if attr == "state":
            if (self.state, val) in (('"', "a"), ("a", '"'), ("a", " "), ("'", "a")):
                strl = len(self.token)
                if self.__pos != strl:
                    self.changed_state.append((self.state, self.token[self.__pos :]))
                self.__pos = strl
        self.__dict__[attr] = val

    def sourcehook(self, newfile):
        try:
            return super().sourcehook(newfile)
        except IOError as e:
            raise BashParseError(newfile, 0, str(e)) from e

    def read_token(self):
        self.changed_state = []
        self.__pos = 0
        token = super().read_token()
        if token is None:
            return token
        if self.state is None:
            # eof reached.
            self.changed_state.append((self.state, token[self.__pos :]))
        else:
            self.changed_state.append((self.state, self.token[self.__pos :]))
        tok = ""
        for s, t in self.changed_state:
            if s in ('"', "a"):
                tok += self.var_expand(t).replace("\\\n", "")
            else:
                tok += t
        return tok

    def var_expand(self, val):
        prev, pos = 0, 0
        l = []
        while match := var_find.search(val, pos):
            pos = match.start()
            if val[pos] == "\\":
                # it's escaped. either it's \\$ or \\${ , either way,
                # skipping two ahead handles it.
                pos += 2
            else:
                var = val[match.start() : match.end()].strip("${}")
                if prev != pos:
                    l.append(val[prev:pos])
                if var in self.env:
                    if not isinstance(self.env[var], str):
                        raise ValueError(
                            "env key %r must be a string, not %s: %r"
                            % (var, type(self.env[var]), self.env[var])
                        )
                    l.append(self.env[var])
                else:
                    l.append("")
                prev = pos = match.end()

        # do \\ cleansing, collapsing val down also.
        val = backslash_find.sub(_nuke_backslash, "".join(l) + val[prev:])
        return val


class BashParseError(Exception):
    """Exception thrown when a handle being parsed isn't valid bash."""

    def __init__(self, filename, line, errmsg=None):
        if errmsg is not None:
            super().__init__(
                "error parsing '%s' on or before line %i: err %s"
                % (filename, line, errmsg)
            )
        else:
            super().__init__(
                "error parsing '%s' on or before line %i" % (filename, line)
            )
        self.file, self.line, self.errmsg = filename, line, errmsg