-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
128 lines (110 loc) · 3.75 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import re
import flask
import pymysql
import config
from level import levels, groups
app = flask.Flask(__name__)
def get_schema(schema_file):
schema = ''
# TODO: don't get path traversalled
with open(f"../db/{schema_file}") as f:
schema = f.read().split('-- BEGIN PUBLIC')[1].split('-- END PUBLIC')[0].strip()
return schema
def make_template(template):
return re.sub(
r"\{(q\d*)\}",
lambda m: f'<input type="text" name="{m.group(1)}">',
template
)
def make_show(show_dict):
return ''.join([
f'<input type="text" name="{name}", placeholder="{placeholder}">'
for name, placeholder in show_dict.items()
])
def query(sql, db=None, user=None):
conn = pymysql.connect(
host=config.db_host,
user=user,
password=config.get_password(user),
db=db
)
with conn.cursor() as cursor:
try:
cursor.execute(sql)
rv = (cursor.fetchall(), True)
except Exception as e:
rv = ([['Error', str(e)]], False)
conn.close()
return rv
def get_groups():
hgroups = []
for i, group in enumerate(groups):
hgroups.append({
'name': group['name'],
'difficulty': group['difficulty'],
'flavor': group['flavor'],
'id': i,
'levels': []
})
for i, level in enumerate(levels):
hgroups[level['group']]['levels'].append({
'id': i,
'name': level['name']
})
return hgroups
def match_result(match, result):
if len(match) != len(result):
return False
for i in range(len(match)):
if len(match[i]) != len(result[i]):
return False
for j in range(len(result[i])):
if result[i][j] != match[i][j]:
return False
return True
@app.route('/api/solve', methods=["POST"])
def x():
challenge_id = flask.request.args.get('level', '')
if challenge_id == '':
return '', 400
challenge = levels[int(challenge_id)]
sql = challenge['spec']['template'].format(**flask.request.form.to_dict())
group = groups[challenge['group']]
db_spec = group['db']
result, success = query(sql, db=db_spec['database'], user=db_spec['user'])
match, _ = query(challenge['spec']['match'], db=db_spec['database'], user=db_spec['user'])
if not success:
if challenge['spec']['error-control'] == 'full':
result += [['Query', sql]]
elif challenge['spec']['error-control'] == 'error':
result = result
elif challenge['spec']['error-control'] == 'blind':
result = []
return flask.jsonify({
'data': result,
'message': challenge['spec']['flag'] if match_result(match, result) else False
})
@app.route('/')
def index():
challenge_id = flask.request.args.get('level', '')
if challenge_id == '':
return flask.render_template('index.html', groups=get_groups())
challenge = levels[int(challenge_id)]
group = groups[challenge['group']]
if challenge['type'] == 'sql':
return flask.render_template('sql.html',
schema=get_schema(group['db']['schema']),
title=group['name'],
question=challenge['spec']['question'],
template=make_template(challenge['spec']['template'])
)
elif challenge['type'].startswith('sqli'):
return flask.render_template('sqli.html',
schema=get_schema(group['db']['schema']),
title=group['name'],
question=challenge['spec']['question'],
template=make_show(challenge['spec']['show'])
)
return flask.render_template('404.html')
if __name__ == '__main__':
app.run(port=5001, host='0.0.0.0', debug=True)