-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
Copy pathjiraclient.py
131 lines (103 loc) · 3.91 KB
/
jiraclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""Module to access a JIRA server."""
import logging
from enum import Enum
from typing import Any, Dict, Iterable, Optional, Sequence
from jira import JIRA, Issue
from jira.client import ResultList
from pydantic import BaseSettings
ASSIGNED_TEAMS_FIELD = "customfield_12751"
logger = logging.getLogger(__name__)
class SecurityLevel(Enum):
"""Security level of SERVER tickets."""
MONGO_INTERNAL = "Mongo Internal"
NONE = "None"
class JiraAuth(BaseSettings):
"""Auth information to connect to Jira."""
access_token: Optional[str]
access_token_secret: Optional[str]
consumer_key: Optional[str]
key_cert: Optional[str]
pat: Optional[str]
class Config:
"""Configuration for JiraAuth."""
env_prefix = "JIRA_AUTH_"
def get_token_auth(self) -> Optional[str]:
return self.pat
def get_oauth(self) -> Optional[Dict[str, Any]]:
if self.access_token and self.access_token_secret and self.consumer_key and self.key_cert:
return {
"access_token": self.access_token,
"access_token_secret": self.access_token_secret,
"consumer_key": self.consumer_key,
"key_cert": self.key_cert,
}
return None
class JiraClient:
"""A client for JIRA."""
def __init__(self, server: str, jira_auth: JiraAuth, dry_run: bool = False) -> None:
"""
Initialize the JiraClient with the server URL and user credentials.
:param server: Jira Server to connect to.
:param jira_auth: Auth connection information.
"""
opts = {"server": server, "verify": True}
token_auth = jira_auth.get_token_auth()
if token_auth:
self._jira = JIRA(options=opts, validate=True, token_auth=token_auth)
else:
self._jira = JIRA(options=opts, validate=True, oauth=jira_auth.get_oauth())
self.dry_run = dry_run
def get_ticket_security_level(self, key: str) -> SecurityLevel:
"""
Lookup the security level of the given ticket.
:param key: Key of ticket to query.
:return: Security level of the given ticket.
"""
ticket = self._jira.issue(key)
if hasattr(ticket.fields, "security"):
security_level = ticket.fields.security
return SecurityLevel(security_level.name)
return SecurityLevel.NONE
def get_issues(self, query: str) -> Iterable[Issue]:
start_at = 0
max_results = 50
while True:
results: ResultList[Issue] = self._jira.search_issues(
jql_str=query, startAt=start_at, maxResults=max_results
)
for item in results:
yield item
start_at = results.startAt + results.maxResults
if start_at > results.total:
break
def create_issue(
self,
issue_type: str,
summary: str,
description: str,
assigned_teams: Sequence[str],
jira_project: str,
owner: Optional[str] = None,
priority: str = "3",
components: Optional[Sequence[str]] = None,
labels: Optional[Sequence[str]] = None,
) -> Optional[Issue]:
assigned_teams_mapped = list(map(lambda x: {"value": x}, assigned_teams))
fields = {
"project": {"key": jira_project},
"summary": summary,
"description": description,
"issuetype": {"name": issue_type},
ASSIGNED_TEAMS_FIELD: assigned_teams_mapped,
"priority": {"id": priority},
}
if labels:
fields["labels"] = labels
if owner:
fields["assignee"] = {"name": owner}
if components:
fields["components"] = components
logger.info({"message": "Creating JIRA issue", "fields": fields})
if not self.dry_run:
return self._jira.create_issue(fields=fields)
return None