Skip to content

Commit

Permalink
fstrings with flynt (pycontribs#1044)
Browse files Browse the repository at this point in the history
Co-authored-by: Neefs <[email protected]>
  • Loading branch information
studioj and Neefs authored May 17, 2021
1 parent 203a3ca commit d1f244c
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 116 deletions.
102 changes: 47 additions & 55 deletions jira/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,7 @@ def __call__(self, req):
# Per Atlassian docs, use %20 for whitespace when generating qsh for URL
# https://developer.atlassian.com/cloud/jira/platform/understanding-jwt/#qsh
query = "&".join(sorted(parse_result.query.split("&"))).replace("+", "%20")
qsh = "%(method)s&%(path)s&%(query)s" % {
"method": req.method.upper(),
"path": path,
"query": query,
}
qsh = f"{req.method.upper()}&{path}&{query}"

return hashlib.sha256(qsh.encode("utf-8")).hexdigest()

Expand Down Expand Up @@ -477,7 +473,7 @@ def __init__(
auth_method = (
oauth or basic_auth or jwt or kerberos or auth or "anonymous"
)
raise JIRAError("Can not log in with %s" % str(auth_method))
raise JIRAError(f"Can not log in with {str(auth_method)}")

self.deploymentType = None
if get_server_info:
Expand Down Expand Up @@ -558,7 +554,7 @@ def _check_for_html_error(self, content: str):
# embedding the error in a huge webpage.
if "<!-- SecurityTokenMissing -->" in content:
self.log.warning("Got SecurityTokenMissing")
raise JIRAError("SecurityTokenMissing: %s" % content)
raise JIRAError(f"SecurityTokenMissing: {content}")
return False
return True

Expand Down Expand Up @@ -893,7 +889,7 @@ def file_stream() -> MultipartEncoder:

js: Union[Dict[str, Any], List[Dict[str, Any]]] = json_loads(r)
if not js or not isinstance(js, Iterable):
raise JIRAError("Unable to parse JSON: %s" % js)
raise JIRAError(f"Unable to parse JSON: {js}")
jira_attachment = Attachment(
self._options, self._session, js[0] if isinstance(js, List) else js
)
Expand Down Expand Up @@ -1131,7 +1127,7 @@ def update_filter(
data["jql"] = jql or filter.jql
data["favourite"] = favourite or filter.favourite

url = self._get_url("filter/%s" % filter_id)
url = self._get_url(f"filter/{filter_id}")
r = self._session.put(
url, headers={"content-type": "application/json"}, data=json.dumps(data)
)
Expand Down Expand Up @@ -1206,7 +1202,7 @@ def group_members(self, group: str) -> OrderedDict:
while end_index < size - 1:
params = {
"groupname": group,
"expand": "users[%s:%s]" % (end_index + 1, end_index + 50),
"expand": f"users[{end_index + 1}:{end_index + 50}]",
}
r2 = self._get_json("group", params=params)
for user in r2["users"]["items"]:
Expand Down Expand Up @@ -1302,7 +1298,7 @@ def create_issue(
self,
fields: Optional[Dict[str, Any]] = None,
prefetch: bool = True,
**fieldargs
**fieldargs,
) -> Issue:
"""Create a new issue and return an issue Resource for it.
Expand Down Expand Up @@ -1892,7 +1888,7 @@ def transition_issue(
fields: Optional[Dict[str, Any]] = None,
comment: Optional[str] = None,
worklog: Optional[str] = None,
**fieldargs
**fieldargs,
):
"""Perform a transition on an issue.
Expand All @@ -1916,7 +1912,7 @@ def transition_issue(
# cannot cast to int, so try to find transitionId by name
transitionId = self.find_transitionid_by_name(issue, transition)
if transitionId is None:
raise JIRAError("Invalid transition name. %s" % transition)
raise JIRAError(f"Invalid transition name. {transition}")

data: Dict[str, Any] = {"transition": {"id": transitionId}}
if comment:
Expand All @@ -1936,7 +1932,7 @@ def transition_issue(
try:
r_json = json_loads(r)
except ValueError as e:
self.log.error("%s\n%s" % (e, r.text))
self.log.error(f"{e}\n{r.text}")
raise e
return r_json

Expand Down Expand Up @@ -2105,7 +2101,7 @@ def add_worklog(
data["updateAuthor"] = data["author"]
# report bug to Atlassian: author and updateAuthor parameters are
# ignored.
url = self._get_url("issue/{0}/worklog".format(issue))
url = self._get_url(f"issue/{issue}/worklog")
r = self._session.post(url, params=params, data=json.dumps(data))

return Worklog(self._options, self._session, json_loads(r))
Expand Down Expand Up @@ -2245,9 +2241,9 @@ def issue_type_by_name(self, name: str) -> IssueType:
if len(matching_issue_types) == 1:
return matching_issue_types[0]
elif len(matching_issue_types) == 0:
raise KeyError("Issue type '%s' is unknown." % name)
raise KeyError(f"Issue type '{name}' is unknown.")
else:
raise KeyError("Issue type '%s' appears more than once." % name)
raise KeyError(f"Issue type '{name}' appears more than once.")

def request_types(self, service_desk: ServiceDesk) -> List[RequestType]:
"""Returns request types supported by a service desk instance.
Expand All @@ -2262,7 +2258,7 @@ def request_types(self, service_desk: ServiceDesk) -> List[RequestType]:
service_desk = service_desk.id
url = (
self.server_url
+ "/rest/servicedeskapi/servicedesk/%s/requesttype" % service_desk
+ f"/rest/servicedeskapi/servicedesk/{service_desk}/requesttype"
)
headers = {"X-ExperimentalApi": "opt-in"}
r_json = json_loads(self._session.get(url, headers=headers))
Expand All @@ -2277,7 +2273,7 @@ def request_type_by_name(self, service_desk: ServiceDesk, name: str):
try:
request_type = [rt for rt in request_types if rt.name == name][0]
except IndexError:
raise KeyError("Request type '%s' is unknown." % name)
raise KeyError(f"Request type '{name}' is unknown.")
return request_type

# User permissions
Expand Down Expand Up @@ -2572,7 +2568,7 @@ def project_role(self, project: str, id: str) -> Role:
id (str): ID of the role to get
"""
if isinstance(id, Number):
id = "%s" % id
id = f"{id}"
return self._find_for_resource(Role, (project, id))

# Resolutions
Expand Down Expand Up @@ -3314,7 +3310,7 @@ def _get_json(
try:
r_json = json_loads(r)
except ValueError as e:
self.log.error("%s\n%s" % (e, r.text if r else r))
self.log.error(f"{e}\n{r.text if r else r}")
raise e
return r_json

Expand Down Expand Up @@ -3401,7 +3397,7 @@ def rename_user(self, old_user: str, new_user: str):
params = {"username": old_user}

# raw displayName
self.log.debug("renaming %s" % self.user(old_user).emailAddress)
self.log.debug(f"renaming {self.user(old_user).emailAddress}")

r = self._session.put(url, params=params, data=json.dumps(payload))
raise_on_error(r)
Expand All @@ -3421,7 +3417,7 @@ def delete_user(self, username: str) -> bool:
"""

url = self._get_latest_url("user/?username=%s" % username)
url = self._get_latest_url(f"user/?username={username}")

r = self._session.delete(url)
if 200 <= r.status_code <= 299:
Expand Down Expand Up @@ -3449,9 +3445,10 @@ def deactivate_user(self, username: str) -> Union[str, int]:
user.raw["session"]["name"],
user.raw["session"]["value"],
)
url = self._options[
"server"
] + "/admin/rest/um/1/user/deactivate?username=%s" % (username)
url = (
self._options["server"]
+ f"/admin/rest/um/1/user/deactivate?username={username}"
)
# We can't use our existing session here - this endpoint is fragile and objects to extra headers
try:
r = requests.post(
Expand All @@ -3467,13 +3464,12 @@ def deactivate_user(self, username: str) -> Union[str, int]:
return True
else:
self.log.warning(
"Got response from deactivating %s: %s"
% (username, r.status_code)
f"Got response from deactivating {username}: {r.status_code}"
)
return r.status_code
except Exception as e:
self.log.error("Error Deactivating %s: %s" % (username, e))
raise JIRAError("Error Deactivating %s: %s" % (username, e))
self.log.error(f"Error Deactivating {username}: {e}")
raise JIRAError(f"Error Deactivating {username}: {e}")
else:
url = self.server_url + "/secure/admin/user/EditUser.jspa"
self._options["headers"][
Expand All @@ -3496,13 +3492,12 @@ def deactivate_user(self, username: str) -> Union[str, int]:
return True
else:
self.log.warning(
"Got response from deactivating %s: %s"
% (username, r.status_code)
f"Got response from deactivating {username}: {r.status_code}"
)
return r.status_code
except Exception as e:
self.log.error("Error Deactivating %s: %s" % (username, e))
raise JIRAError("Error Deactivating %s: %s" % (username, e))
self.log.error(f"Error Deactivating {username}: {e}")
raise JIRAError(f"Error Deactivating {username}: {e}")

def reindex(self, force: bool = False, background: bool = True) -> bool:
"""Start jira re-indexing. Returns True if reindexing is in progress or not needed, or False.
Expand Down Expand Up @@ -3567,7 +3562,7 @@ def backup(self, filename: str = "backup.zip", attachments: bool = False):
if r.status_code == 200:
return True
else:
self.log.warning("Got %s response from calling backup." % r.status_code)
self.log.warning(f"Got {r.status_code} response from calling backup.")
return r.status_code
except Exception as e:
self.log.error("I see %s", e)
Expand Down Expand Up @@ -3625,7 +3620,7 @@ def backup_download(self, filename: str = None):
local_file = filename or remote_file
url = self.server_url + "/webdav/backupmanager/" + remote_file
try:
self.log.debug("Writing file to %s" % local_file)
self.log.debug(f"Writing file to {local_file}")
with open(local_file, "wb") as file:
try:
resp = self._session.get(
Expand All @@ -3634,12 +3629,12 @@ def backup_download(self, filename: str = None):
except Exception:
raise JIRAError()
if not resp.ok:
self.log.error("Something went wrong with download: %s" % resp.text)
self.log.error(f"Something went wrong with download: {resp.text}")
raise JIRAError(resp.text)
for block in resp.iter_content(1024):
file.write(block)
except JIRAError as je:
self.log.error("Unable to access remote backup file: %s" % je)
self.log.error(f"Unable to access remote backup file: {je}")
except IOError as ioe:
self.log.error(ioe)
return None
Expand Down Expand Up @@ -3678,7 +3673,7 @@ def delete_project(self, pid: Union[str, Project]) -> Optional[bool]:
if isinstance(pid, Project) and hasattr(pid, "id"):
pid = str(pid.id)

url = self._get_url("project/%s" % pid)
url = self._get_url(f"project/{pid}")
r = self._session.delete(url)
if r.status_code == 403:
raise JIRAError("Not enough permissions to delete project")
Expand Down Expand Up @@ -3757,7 +3752,7 @@ def projectcategories(self):
@lru_cache(maxsize=None)
def avatars(self, entity="project"):

url = self._get_url("avatar/%s/system" % entity)
url = self._get_url(f"avatar/{entity}/system")

r = self._session.get(url)
data: Dict[str, Any] = json_loads(r)
Expand Down Expand Up @@ -3802,7 +3797,7 @@ def workflows(self):

def delete_screen(self, id: str):

url = self._get_url("screens/%s" % id)
url = self._get_url(f"screens/{id}")

r = self._session.delete(url)
data = json_loads(r)
Expand All @@ -3812,7 +3807,7 @@ def delete_screen(self, id: str):

def delete_permissionscheme(self, id: str):

url = self._get_url("permissionscheme/%s" % id)
url = self._get_url(f"permissionscheme/{id}")

r = self._session.delete(url)
data = json_loads(r)
Expand Down Expand Up @@ -4103,7 +4098,7 @@ def role(self) -> List[Dict[str, Any]]:
def get_igrid(self, issueid: str, customfield: str, schemeid: str):
url = self.server_url + "/rest/idalko-igrid/1.0/datagrid/data"
if str(customfield).isdigit():
customfield = "customfield_%s" % customfield
customfield = f"customfield_{customfield}"
params = {
"_issueId": issueid,
"_fieldId": customfield,
Expand Down Expand Up @@ -4246,7 +4241,7 @@ def sprints(
return self._fetch_pages(
Sprint,
"values",
"board/%s/sprint" % board_id,
f"board/{board_id}/sprint",
startAt,
maxResults,
params,
Expand Down Expand Up @@ -4280,25 +4275,23 @@ def update_sprint(self, id, name=None, startDate=None, endDate=None, state=None)
)
payload["state"] = state

url = self._get_url("sprint/%s" % id, base=self.AGILE_BASE_URL)
url = self._get_url(f"sprint/{id}", base=self.AGILE_BASE_URL)
r = self._session.put(url, data=json.dumps(payload))

return json_loads(r)

def incompletedIssuesEstimateSum(self, board_id: str, sprint_id: str):
"""Return the total incompleted points this sprint."""
data: Dict[str, Any] = self._get_json(
"rapid/charts/sprintreport?rapidViewId=%s&sprintId=%s"
% (board_id, sprint_id),
f"rapid/charts/sprintreport?rapidViewId={board_id}&sprintId={sprint_id}",
base=self.AGILE_BASE_URL,
)
return data["contents"]["incompletedIssuesEstimateSum"]["value"]

def removed_issues(self, board_id: str, sprint_id: str):
"""Return the completed issues for the sprint."""
r_json: Dict[str, Any] = self._get_json(
"rapid/charts/sprintreport?rapidViewId=%s&sprintId=%s"
% (board_id, sprint_id),
f"rapid/charts/sprintreport?rapidViewId={board_id}&sprintId={sprint_id}",
base=self.AGILE_BASE_URL,
)
issues = [
Expand All @@ -4311,8 +4304,7 @@ def removed_issues(self, board_id: str, sprint_id: str):
def removedIssuesEstimateSum(self, board_id: str, sprint_id: str):
"""Return the total incompleted points this sprint."""
data: Dict[str, Any] = self._get_json(
"rapid/charts/sprintreport?rapidViewId=%s&sprintId=%s"
% (board_id, sprint_id),
f"rapid/charts/sprintreport?rapidViewId={board_id}&sprintId={sprint_id}",
base=self.AGILE_BASE_URL,
)
return data["contents"]["puntedIssuesEstimateSum"]["value"]
Expand Down Expand Up @@ -4428,7 +4420,7 @@ def create_sprint(
self._options["agile_rest_path"]
== GreenHopperResource.GREENHOPPER_REST_PATH
):
url = self._get_url("sprint/%s" % board_id, base=self.AGILE_BASE_URL)
url = self._get_url(f"sprint/{board_id}", base=self.AGILE_BASE_URL)
r = self._session.post(url)
raw_issue_json = json_loads(r)
""" now r contains something like:
Expand All @@ -4444,7 +4436,7 @@ def create_sprint(
}"""

url = self._get_url(
"sprint/%s" % raw_issue_json["id"], base=self.AGILE_BASE_URL
f"sprint/{raw_issue_json['id']}", base=self.AGILE_BASE_URL
)
r = self._session.put(url, data=json.dumps(payload))
raw_issue_json = json_loads(r)
Expand Down Expand Up @@ -4477,7 +4469,7 @@ def add_issues_to_sprint(self, sprint_id: int, issue_keys: List[str]) -> Respons
Response
"""
if self._options["agile_rest_path"] == GreenHopperResource.AGILE_BASE_REST_PATH:
url = self._get_url("sprint/%s/issue" % sprint_id, base=self.AGILE_BASE_URL)
url = self._get_url(f"sprint/{sprint_id}/issue", base=self.AGILE_BASE_URL)
payload = {"issues": issue_keys}
try:
return self._session.post(url, data=json.dumps(payload))
Expand Down Expand Up @@ -4535,7 +4527,7 @@ def add_issues_to_epic(
data: Dict[str, Any] = {}
data["issueKeys"] = issue_keys
data["ignoreEpics"] = ignore_epics
url = self._get_url("epics/%s/add" % epic_id, base=self.AGILE_BASE_URL)
url = self._get_url(f"epics/{epic_id}/add", base=self.AGILE_BASE_URL)
return self._session.put(url, data=json.dumps(data))

# TODO(ssbarnea): Both GreenHopper and new Jira Agile API support moving more than one issue.
Expand Down
2 changes: 1 addition & 1 deletion jira/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def findfile(path):

config_file = findfile("config.ini")
if config_file:
logging.debug("Found %s config file" % config_file)
logging.debug(f"Found {config_file} config file")

if not profile:
if config_file:
Expand Down
Loading

0 comments on commit d1f244c

Please sign in to comment.