forked from yuriverweij/webcollector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
publish.py
111 lines (91 loc) · 4.04 KB
/
publish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import requests
import urllib3
from requests.auth import HTTPBasicAuth
# Disable proxy settings
session = requests.Session()
session.trust_env = False
# Disable SSL warnings
urllib3.disable_warnings()
class Publisher:
def __init__(self, username, password, space, host):
self.username = username
self.password = password
self.space = space
self.confluence_host = host
def get_content(self, url):
if not self.username or not self.password:
raise SyntaxError("User or Password not set")
response = session.get(url, auth=HTTPBasicAuth(self.username, self.password), verify=False,
headers={'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Expires': 'Thu, 01 Jan 1970 00:00:00 GMT'})
return response
def post_content(self, url, data):
if not self.username or not self.password:
raise SyntaxError("User or Password not set")
response = session.post(url, json=data, auth=HTTPBasicAuth(self.username, self.password), verify=False,
headers={'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Expires': 'Thu, 01 Jan 1970 00:00:00 GMT'})
return response
def put_content(self, url, data):
if not self.username or not self.password:
raise SyntaxError("User or Password not set")
response = session.put(url, json=data, auth=HTTPBasicAuth(self.username, self.password), verify=False,
headers={'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Expires': 'Thu, 01 Jan 1970 00:00:00 GMT'})
if response.status_code != 200:
raise ConnectionError(response.text)
return response
def publish(self, parent_page, title, content):
parent_title = parent_page.replace(' ', '%20')
url = "{confluence_host}/confluence/rest/api/content?spaceKey={space}&title={title}".format(
confluence_host=self.confluence_host, space=self.space, content=content)
response = self.get_content(url)
result = response.json()['results'][0]
parent_id = result['id']
url_children = "{confluence_host}/confluence/rest/api/{id}/child/page".format(
confluence_host=self.confluence_host, id=parent_id)
response = self.get_content(url_children)
results = response.json()['results']
found = None
for result in results:
if title in result['title']:
found = result['id']
break
if found:
print("Already exists: " + found)
get_page_url = "{}/confluence/rest/api/content/{}".format(self.confluence_host, found)
update_page_url = get_page_url + "?expand=body.storage"
r = self.get_content(get_page_url)
version = r.json()['version']['number']
version += 1
page = {
"type": "page",
"title": title,
"version": {"number": version},
"body": {
"storage": {
"value": content,
"representation": "storage"
}
}
}
r = self.put_content(update_page_url, page)
else:
new_page_url = "{}/confluence/rest/api/content".format(self.confluence_host)
page = {
"type": "page",
"title": title,
"ancestors": [{"id": parent_id}],
"space": {"key": self.space},
"body": {
"storage": {
"value": content,
"representation": "storage"
}
}
}
r = self.post_content(new_page_url, page)
print(r.status_code)