diff --git a/CHANGELOG.md b/CHANGELOG.md index df87b0b4..29ce30f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## Version 2.0.0-beta ### Feature updates -* `ensure_binary`, `ensure_str`, `ensure_text` and `assert_regex` utility methods have been migrated from `six.py` to `splunklib/__init__.py` +* `ensure_binary`, `ensure_str`, `ensure_text` and `assert_regex` utility methods have been migrated from `six.py` to `splunklib/utils.py` ### Major changes * Removed Code specific to Python2 diff --git a/splunklib/__init__.py b/splunklib/__init__.py index 90d93921..035e0f81 100644 --- a/splunklib/__init__.py +++ b/splunklib/__init__.py @@ -30,49 +30,5 @@ def setup_logging(level, log_format=DEFAULT_LOG_FORMAT, date_format=DEFAULT_DATE datefmt=date_format) -def ensure_binary(s, encoding='utf-8', errors='strict'): - """ - - `str` -> encoded to `bytes` - - `bytes` -> `bytes` - """ - if isinstance(s, str): - return s.encode(encoding, errors) - - if isinstance(s, bytes): - return s - - raise TypeError(f"not expecting type '{type(s)}'") - - -def ensure_str(s, encoding='utf-8', errors='strict'): - """ - - `str` -> `str` - - `bytes` -> decoded to `str` - """ - if isinstance(s, bytes): - return s.decode(encoding, errors) - - if isinstance(s, str): - return s - - raise TypeError(f"not expecting type '{type(s)}'") - - -def ensure_text(s, encoding='utf-8', errors='strict'): - """ - - `str` -> `str` - - `bytes` -> decoded to `str` - """ - if isinstance(s, bytes): - return s.decode(encoding, errors) - if isinstance(s, str): - return s - raise TypeError(f"not expecting type '{type(s)}'") - - -def assertRegex(self, *args, **kwargs): - return getattr(self, "assertRegex")(*args, **kwargs) - - __version_info__ = (2, 0, 0) __version__ = ".".join(map(str, __version_info__)) diff --git a/splunklib/modularinput/event.py b/splunklib/modularinput/event.py index 1ad738b8..2e398cfa 100644 --- a/splunklib/modularinput/event.py +++ b/splunklib/modularinput/event.py @@ -15,7 +15,7 @@ from io import TextIOBase import xml.etree.ElementTree as ET -from splunklib import ensure_text +from splunklib.utils import ensure_text class Event: diff --git a/splunklib/modularinput/event_writer.py b/splunklib/modularinput/event_writer.py index adc9195e..cfff8721 100644 --- a/splunklib/modularinput/event_writer.py +++ b/splunklib/modularinput/event_writer.py @@ -14,7 +14,7 @@ import sys -from splunklib import ensure_str +from splunklib.utils import ensure_str from .event import ET diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index 47504ed9..ab8be07c 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -34,6 +34,8 @@ from urllib.parse import urlsplit from warnings import warn from xml.etree import ElementTree +from splunklib.utils import ensure_str + # Relative imports import splunklib @@ -888,7 +890,7 @@ def _read_chunk(istream): if not header: return None - match = SearchCommand._header.match(splunklib.ensure_str(header)) + match = SearchCommand._header.match(ensure_str(header)) if match is None: raise RuntimeError(f'Failed to parse transport header: {header}') @@ -905,7 +907,7 @@ def _read_chunk(istream): decoder = MetadataDecoder() try: - metadata = decoder.decode(splunklib.ensure_str(metadata)) + metadata = decoder.decode(ensure_str(metadata)) except Exception as error: raise RuntimeError(f'Failed to parse metadata of length {metadata_length}: {error}') @@ -919,7 +921,7 @@ def _read_chunk(istream): except Exception as error: raise RuntimeError(f'Failed to read body of length {body_length}: {error}') - return metadata, splunklib.ensure_str(body,errors="replace") + return metadata, ensure_str(body,errors="replace") _header = re.compile(r'chunked\s+1.0\s*,\s*(\d+)\s*,\s*(\d+)\s*\n') diff --git a/splunklib/utils.py b/splunklib/utils.py new file mode 100644 index 00000000..3bb80ca2 --- /dev/null +++ b/splunklib/utils.py @@ -0,0 +1,60 @@ +# Copyright © 2011-2023 Splunk, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"): you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""The **splunklib.utils** File for utility functions. +""" + + +def ensure_binary(s, encoding='utf-8', errors='strict'): + """ + - `str` -> encoded to `bytes` + - `bytes` -> `bytes` + """ + if isinstance(s, str): + return s.encode(encoding, errors) + + if isinstance(s, bytes): + return s + + raise TypeError(f"not expecting type '{type(s)}'") + + +def ensure_str(s, encoding='utf-8', errors='strict'): + """ + - `str` -> `str` + - `bytes` -> decoded to `str` + """ + if isinstance(s, bytes): + return s.decode(encoding, errors) + + if isinstance(s, str): + return s + + raise TypeError(f"not expecting type '{type(s)}'") + + +def ensure_text(s, encoding='utf-8', errors='strict'): + """ + - `str` -> `str` + - `bytes` -> decoded to `str` + """ + if isinstance(s, bytes): + return s.decode(encoding, errors) + if isinstance(s, str): + return s + raise TypeError(f"not expecting type '{type(s)}'") + + +def assertRegex(self, *args, **kwargs): + return getattr(self, "assertRegex")(*args, **kwargs) diff --git a/tests/searchcommands/test_search_command.py b/tests/searchcommands/test_search_command.py index ace24be1..8ecb3fd7 100755 --- a/tests/searchcommands/test_search_command.py +++ b/tests/searchcommands/test_search_command.py @@ -32,15 +32,16 @@ from splunklib.searchcommands.decorators import ConfigurationSetting, Option from splunklib.searchcommands.search_command import SearchCommand from splunklib.client import Service +from splunklib.utils import ensure_binary from io import StringIO, BytesIO def build_command_input(getinfo_metadata, execute_metadata, execute_body): - input = (f'chunked 1.0,{len(splunklib.ensure_binary(getinfo_metadata))},0\n{getinfo_metadata}' + - f'chunked 1.0,{len(splunklib.ensure_binary(execute_metadata))},{len(splunklib.ensure_binary(execute_body))}\n{execute_metadata}{execute_body}') + input = (f'chunked 1.0,{len(ensure_binary(getinfo_metadata))},0\n{getinfo_metadata}' + + f'chunked 1.0,{len(ensure_binary(execute_metadata))},{len(ensure_binary(execute_body))}\n{execute_metadata}{execute_body}') - ifile = BytesIO(splunklib.ensure_binary(input)) + ifile = BytesIO(ensure_binary(input)) ifile = TextIOWrapper(ifile)