forked from apache/superset
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcelery_tests.py
302 lines (260 loc) · 11.5 KB
/
celery_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""Unit tests for Caravel Celery worker"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import imp
import json
import os
import subprocess
import time
import unittest
import pandas as pd
import caravel
from caravel import app, appbuilder, db, models, sql_lab, utils, dataframe
from .base_tests import CaravelTestCase
# Short alias for the query state enum used throughout the assertions below.
QueryStatus = models.QueryStatus

# Root directory of the Caravel checkout, as configured on the app under test.
BASE_DIR = app.config.get('BASE_DIR')

# ``bin/caravel`` is a script without a ``.py`` extension, so it is loaded
# explicitly as a module named ``cli``; the tests call its commands (e.g.
# ``load_examples``) directly.
_caravel_script = BASE_DIR + '/bin/caravel'
cli = imp.load_source('cli', _caravel_script)
class CeleryConfig(object):
    """Celery settings for the worker used by the tests in this module.

    The broker and the result backend both point at SQLite database files
    whose paths are read from the app config.
    """
    # NOTE(review): ``CONCURRENCY`` does not look like a standard Celery
    # setting name (Celery 3.x uses ``CELERYD_CONCURRENCY``); confirm whether
    # anything actually reads it.
    CONCURRENCY = 1
    # Modules the worker imports so that their tasks get registered.
    CELERY_IMPORTS = ('caravel.sql_lab',)
    CELERY_ANNOTATIONS = {'sql_lab.add': {'rate_limit': '10/s'}}
    BROKER_URL = (
        'sqla+sqlite:///' + app.config.get('SQL_CELERY_DB_FILE_PATH'))
    CELERY_RESULT_BACKEND = (
        'db+sqlite:///' +
        app.config.get('SQL_CELERY_RESULTS_DB_FILE_PATH'))


# Register the settings above on the Flask app config under CELERY_CONFIG.
app.config.update(CELERY_CONFIG=CeleryConfig)
class UtilityFunctionTests(CaravelTestCase):
    """Tests for helper functions in ``caravel.sql_lab`` that need no worker."""

    # TODO(bkyryliuk): support more cases in CTA function.
    def test_create_table_as(self):
        # ``create_table_as`` prepends ``CREATE TABLE <name> AS`` to the
        # SELECT (and a ``DROP TABLE IF EXISTS`` when ``override=True``),
        # leaving the SELECT text itself unchanged.
        outer_space = "SELECT * FROM outer_space;"
        expectations = [
            # (input SELECT, extra keyword arguments, expected statement)
            (outer_space, {},
             "CREATE TABLE tmp AS \nSELECT * FROM outer_space;"),
            (outer_space, {'override': True},
             "DROP TABLE IF EXISTS tmp;\n"
             "CREATE TABLE tmp AS \nSELECT * FROM outer_space;"),
            ("SELECT * FROM outer_space", {},
             "CREATE TABLE tmp AS \nSELECT * FROM outer_space"),
            ("SELECT * FROM planets WHERE\n"
             "Luke_Father = 'Darth Vader';", {},
             "CREATE TABLE tmp AS \nSELECT * FROM planets WHERE\n"
             "Luke_Father = 'Darth Vader';"),
        ]
        for source_sql, options, wanted in expectations:
            produced = sql_lab.create_table_as(source_sql, "tmp", **options)
            self.assertEqual(wanted, produced)
class CeleryTestCase(CaravelTestCase):
    """End-to-end SQL Lab tests, including queries executed by a Celery
    worker that ``setUpClass`` launches as a background subprocess."""

    def __init__(self, *args, **kwargs):
        super(CeleryTestCase, self).__init__(*args, **kwargs)
        self.client = app.test_client()

    def _first_query(self, **filters):
        """Return the first ``models.Query`` matching *filters*, or None.

        *filters* are passed straight to ``filter_by``. ``db.session`` is
        closed before the result is returned.
        """
        session = db.session
        query = session.query(models.Query).filter_by(**filters).first()
        session.close()
        return query

    def get_query_by_name(self, sql):
        """Return the first stored query whose SQL text equals *sql*."""
        return self._first_query(sql=sql)

    def get_query_by_id(self, id):
        """Return the stored query whose ``id`` equals *id*, or None."""
        # ``id`` shadows the builtin but is kept for keyword callers.
        return self._first_query(id=id)

    @classmethod
    def setUpClass(cls):
        """Prepare a clean environment shared by every test in the class.

        Deletes the SQLite files behind the Celery broker and result backend,
        initialises Caravel, starts ``bin/caravel worker`` in the background,
        makes sure an ``admin`` user exists and loads the example data.
        """
        for path_key in ('SQL_CELERY_DB_FILE_PATH',
                         'SQL_CELERY_RESULTS_DB_FILE_PATH'):
            try:
                os.remove(app.config.get(path_key))
            except OSError as e:
                # e.g. the file does not exist yet: nothing to reset.
                app.logger.warning(str(e))

        utils.init(caravel)

        worker_command = BASE_DIR + '/bin/caravel worker'
        # Discard the worker's stdout. A subprocess.PIPE that nobody reads
        # eventually fills up and blocks the worker on its next write.
        cls._worker_stdout = open(os.devnull, 'w')
        subprocess.Popen(
            worker_command, shell=True, stdout=cls._worker_stdout)

        admin = appbuilder.sm.find_user('admin')
        if not admin:
            appbuilder.sm.add_user(
                'admin', 'admin', ' user', '[email protected]',
                appbuilder.sm.find_role('Admin'),
                password='general')
        cli.load_examples(load_test_data=True)

    @classmethod
    def tearDownClass(cls):
        """Stop the Celery worker started in ``setUpClass``.

        NOTE(review): this SIGKILLs every process whose ``ps`` line matches
        ``celeryd`` or ``caravel worker``, including unrelated ones running
        on the same machine.
        """
        subprocess.call(
            "ps auxww | grep 'celeryd' | awk '{print $2}' | xargs kill -9",
            shell=True
        )
        subprocess.call(
            "ps auxww | grep 'caravel worker' | awk '{print $2}' | "
            "xargs kill -9",
            shell=True
        )
        worker_stdout = getattr(cls, '_worker_stdout', None)
        if worker_stdout is not None:
            worker_stdout.close()

    def run_sql(self, dbid, sql, client_id, cta='false', tmp_table='tmp',
                async_='false', **kwargs):
        """POST *sql* to ``/caravel/sql_json/`` as the admin user.

        Returns the decoded JSON response. ``async`` is a reserved word from
        Python 3.7 on, so the flag is taken as ``async_`` (same position as
        before). A legacy ``async`` keyword, passed as ``**{'async': ...}``,
        is still honoured.

        Raises:
            TypeError: on any other unexpected keyword argument.
        """
        if 'async' in kwargs:
            async_ = kwargs.pop('async')
        if kwargs:
            raise TypeError(
                'run_sql() got unexpected keyword arguments: %s'
                % ', '.join(sorted(kwargs)))
        self.login()
        try:
            resp = self.client.post(
                '/caravel/sql_json/',
                data={
                    'database_id': dbid,
                    'sql': sql,
                    'async': async_,
                    'select_as_cta': cta,
                    'tmp_table_name': tmp_table,
                    'client_id': client_id,
                },
            )
        finally:
            # Always log out, even if the request itself blows up.
            self.logout()
        return json.loads(resp.data.decode('utf-8'))

    def test_add_limit_to_the_query(self):
        db_to_query = db.session.query(models.Database).filter_by(
            id=1).first()
        # Different DB engines have their own spacing while compiling
        # the queries, that's why ' '.join(query.split()) is used.
        # In addition some of the engines do not include OFFSET 0.
        select_query = "SELECT * FROM outer_space;"
        updated_select_query = db_to_query.wrap_sql_limit(select_query, 100)
        self.assertIn(
            "SELECT * FROM (SELECT * FROM outer_space;) AS inner_qry "
            "LIMIT 100",
            ' '.join(updated_select_query.split()))

        select_query_no_semicolon = "SELECT * FROM outer_space"
        updated_select_query_no_semicolon = db_to_query.wrap_sql_limit(
            select_query_no_semicolon, 100)
        self.assertIn(
            "SELECT * FROM (SELECT * FROM outer_space) AS inner_qry "
            "LIMIT 100",
            ' '.join(updated_select_query_no_semicolon.split()))

        multi_line_query = (
            "SELECT * FROM planets WHERE\n Luke_Father = 'Darth Vader';"
        )
        updated_multi_line_query = db_to_query.wrap_sql_limit(
            multi_line_query, 100)
        self.assertIn(
            "SELECT * FROM (SELECT * FROM planets WHERE "
            "Luke_Father = 'Darth Vader';) AS inner_qry LIMIT 100",
            ' '.join(updated_multi_line_query.split()))

    def test_run_sync_query(self):
        main_db = db.session.query(models.Database).filter_by(
            database_name="main").first()
        eng = main_db.get_sqla_engine()

        # Case 1.
        # Table doesn't exist.
        sql_dont_exist = 'SELECT name FROM table_dont_exist'
        result1 = self.run_sql(1, sql_dont_exist, "1", cta='true')
        self.assertIn('error', result1)

        # Case 2.
        # Table and DB exists, CTA call to the backend.
        sql_where = "SELECT name FROM ab_permission WHERE name='can_sql'"
        result2 = self.run_sql(
            1, sql_where, "2", tmp_table='tmp_table_2', cta='true')
        self.assertEqual(QueryStatus.SUCCESS, result2['query']['state'])
        self.assertEqual([], result2['data'])
        self.assertEqual([], result2['columns'])
        query2 = self.get_query_by_id(result2['query']['serverId'])
        # Check the data in the tmp table.
        df2 = pd.read_sql_query(sql=query2.select_sql, con=eng)
        data2 = df2.to_dict(orient='records')
        self.assertEqual([{'name': 'can_sql'}], data2)

        # Case 3.
        # Table and DB exists, CTA call to the backend, no data.
        sql_empty_result = 'SELECT * FROM ab_user WHERE id=666'
        result3 = self.run_sql(
            1, sql_empty_result, "3", tmp_table='tmp_table_3', cta='true')
        self.assertEqual(QueryStatus.SUCCESS, result3['query']['state'])
        self.assertEqual([], result3['data'])
        self.assertEqual([], result3['columns'])
        query3 = self.get_query_by_id(result3['query']['serverId'])
        self.assertEqual(QueryStatus.SUCCESS, query3.status)

    def test_run_async_query(self):
        main_db = db.session.query(models.Database).filter_by(
            database_name="main").first()
        eng = main_db.get_sqla_engine()

        # Case 1.
        # Table and DB exists, async CTA call to the backend.
        sql_where = "SELECT name FROM ab_role WHERE name='Admin'"
        result1 = self.run_sql(
            1, sql_where, "4", async_='true', tmp_table='tmp_async_1',
            cta='true')
        self.assertIn(
            result1['query']['state'],
            (QueryStatus.PENDING, QueryStatus.RUNNING, QueryStatus.SUCCESS))

        # Give the Celery worker time to execute the query.
        time.sleep(1)

        query1 = self.get_query_by_id(result1['query']['serverId'])
        # Check the status first, so a failed query is reported as such
        # rather than as an error from reading the tmp table.
        self.assertEqual(QueryStatus.SUCCESS, query1.status)
        df1 = pd.read_sql_query(query1.select_sql, con=eng)
        self.assertEqual([{'name': 'Admin'}], df1.to_dict(orient='records'))
        self.assertIn("SELECT * \nFROM tmp_async_1", query1.select_sql)
        self.assertIn("LIMIT 666", query1.select_sql)
        self.assertEqual(
            "CREATE TABLE tmp_async_1 AS \nSELECT name FROM ab_role "
            "WHERE name='Admin'", query1.executed_sql)
        self.assertEqual(sql_where, query1.sql)
        # NOTE(review): the row count is skipped on SQLite, presumably
        # because it is not reported there for CTA queries; confirm.
        if eng.name != 'sqlite':
            self.assertEqual(1, query1.rows)
        self.assertEqual(666, query1.limit)
        self.assertEqual(False, query1.limit_used)
        self.assertEqual(True, query1.select_as_cta)
        self.assertEqual(True, query1.select_as_cta_used)

    def test_get_columns_dict(self):
        main_db = db.session.query(models.Database).filter_by(
            database_name='main').first()
        df = main_db.get_df("SELECT * FROM multiformat_time_series", None)
        cdf = dataframe.CaravelDataFrame(df)
        # The expected columns are the same for every backend except the
        # inferred type of ``ds2``.
        if main_db.sqlalchemy_uri.startswith('sqlite'):
            ds2_type = 'datetime_string'
        else:
            ds2_type = 'datetime64[ns]'
        expected = [
            {'is_date': True, 'type': 'datetime_string', 'name': 'ds',
             'is_dim': False},
            {'is_date': True, 'type': ds2_type, 'name': 'ds2',
             'is_dim': False},
            {'agg': 'sum', 'is_date': False, 'type': 'int64',
             'name': 'epoch_ms', 'is_dim': False},
            {'agg': 'sum', 'is_date': False, 'type': 'int64',
             'name': 'epoch_s', 'is_dim': False},
            {'is_date': True, 'type': 'datetime_string', 'name': 'string0',
             'is_dim': False},
            {'is_date': False, 'type': 'object',
             'name': 'string1', 'is_dim': True},
            {'is_date': True, 'type': 'datetime_string', 'name': 'string2',
             'is_dim': False},
            {'is_date': False, 'type': 'object',
             'name': 'string3', 'is_dim': True},
        ]
        self.assertEqual(expected, cdf.columns_dict)
# Allow running this module directly (``python celery_tests.py``): hand
# control to unittest's command-line runner.
if __name__ == '__main__':
    unittest.main()