feat: run BQ queries as single statement (apache#11904)
* feat: run BQ queries as single statement

* Update deps

* Fix lint

* Update superset/sql_lab.py

Co-authored-by: Ville Brofeldt <[email protected]>
betodealmeida and villebro authored Dec 3, 2020
1 parent 04f993e commit 54bf707
Showing 4 changed files with 22 additions and 7 deletions.
6 changes: 5 additions & 1 deletion setup.py
@@ -106,7 +106,11 @@ def get_git_sha():
     ],
     extras_require={
         "athena": ["pyathena>=1.10.8,<1.11"],
-        "bigquery": ["pandas_gbq>=0.10.0", "pybigquery>=0.4.10"],
+        "bigquery": [
+            "pandas_gbq>=0.10.0",
+            "pybigquery>=0.4.10",
+            "google-cloud-bigquery>=2.4.0",
+        ],
         "clickhouse": ["clickhouse-sqlalchemy>= 0.1.4, <0.2"],
         "cockroachdb": ["cockroachdb>=0.3.5, <0.4"],
         "cors": ["flask-cors>=2.0.0"],
5 changes: 3 additions & 2 deletions superset/db_engine_specs/base.py
@@ -156,6 +156,7 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
     arraysize = 0
     max_column_name_length = 0
     try_remove_schema_from_table_name = True  # pylint: disable=invalid-name
+    run_multiple_statements_as_one = False
 
     # default matching patterns for identifying column types
     db_column_types: Dict[utils.DbColumnType, Tuple[Pattern[Any], ...]] = {
@@ -454,7 +455,7 @@ def set_or_update_query_limit(cls, sql: str, limit: int) -> str:
 
     @staticmethod
     def csv_to_df(**kwargs: Any) -> pd.DataFrame:
-        """ Read csv into Pandas DataFrame
+        """Read csv into Pandas DataFrame
         :param kwargs: params to be passed to DataFrame.read_csv
         :return: Pandas DataFrame containing data from csv
         """
@@ -466,7 +467,7 @@ def csv_to_df(**kwargs: Any) -> pd.DataFrame:
 
     @classmethod
     def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
-        """ Upload data from a Pandas DataFrame to a database. For
+        """Upload data from a Pandas DataFrame to a database. For
         regular engines this calls the DataFrame.to_sql() method. Can be
         overridden for engines that don't work well with to_sql(), e.g.
         BigQuery.
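The new run_multiple_statements_as_one class attribute defaults to False, so existing engine specs keep the current behavior of splitting a script into individual statements. An engine spec opts in by overriding the attribute. A minimal sketch of such an override, assuming a hypothetical third-party engine spec (the class name and engine identifiers are illustrative, not part of this change):

    from superset.db_engine_specs.base import BaseEngineSpec

    class MyScriptingEngineSpec(BaseEngineSpec):
        """Hypothetical spec for a database that, like BigQuery, loses session
        state when statements are executed one at a time on the same cursor."""

        engine = "mydb"
        engine_name = "My Scripting Database"

        # Ask SQL Lab to send the full script in a single call instead of
        # splitting it into individual statements.
        run_multiple_statements_as_one = True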
4 changes: 4 additions & 0 deletions superset/db_engine_specs/bigquery.py
@@ -39,6 +39,10 @@ class BigQueryEngineSpec(BaseEngineSpec):
     engine_name = "Google BigQuery"
     max_column_name_length = 128
 
+    # BigQuery doesn't maintain context when running multiple statements in the
+    # same cursor, so we need to run all statements at once
+    run_multiple_statements_as_one = True
+
     """
     https://www.python.org/dev/peps/pep-0249/#arraysize
     raw_connections bypass the pybigquery query execution context and deal with
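The comment in BigQueryEngineSpec captures the motivation for the whole change: when SQL Lab splits a script and runs each statement through a separate cursor call, state created by earlier statements (scripting variables, temporary tables) is not visible to later ones. A hedged illustration of the kind of script that previously broke when split, with made-up project, dataset, and column names:

    # One kind of multi-statement BigQuery script this change addresses. Run as
    # a single call, the temporary table created by the first statement is
    # visible to the second; run as two separate statements, it is not.
    # All identifiers below are illustrative.
    multi_statement_sql = """
    CREATE TEMP TABLE recent_orders AS
    SELECT order_id, amount
    FROM `my_project.sales.orders`
    WHERE order_date >= '2020-11-01';

    SELECT SUM(amount) AS total
    FROM recent_orders;
    """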
14 changes: 10 additions & 4 deletions superset/sql_lab.py
@@ -296,7 +296,7 @@ def _serialize_and_expand_data(
     return (data, selected_columns, all_columns, expanded_columns)
 
 
-def execute_sql_statements(  # pylint: disable=too-many-arguments, too-many-locals, too-many-statements
+def execute_sql_statements(  # pylint: disable=too-many-arguments, too-many-locals, too-many-statements, too-many-branches
     query_id: int,
     rendered_query: str,
     return_results: bool,
@@ -322,9 +322,15 @@ def execute_sql_statements(  # pylint: disable=too-many-arguments, too-many-locals, too-many-statements, too-many-branches
         raise SqlLabException("Results backend isn't configured.")
 
     # Breaking down into multiple statements
-    parsed_query = ParsedQuery(rendered_query)
-    statements = parsed_query.get_statements()
-    logger.info("Query %s: Executing %i statement(s)", str(query_id), len(statements))
+    if not db_engine_spec.run_multiple_statements_as_one:
+        parsed_query = ParsedQuery(rendered_query)
+        statements = parsed_query.get_statements()
+        logger.info(
+            "Query %s: Executing %i statement(s)", str(query_id), len(statements)
+        )
+    else:
+        statements = [rendered_query]
+        logger.info("Query %s: Executing query as a single statement", str(query_id))
 
     logger.info("Query %s: Set query to 'running'", str(query_id))
     query.status = QueryStatus.RUNNING
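In execute_sql_statements the default path is unchanged: the rendered query is split with ParsedQuery and each statement is executed on its own. Engines that set run_multiple_statements_as_one skip the split, so the driver receives the whole script in one call. A minimal sketch of the splitting step that gets bypassed, assuming superset.sql_parse.ParsedQuery (the query text is illustrative):

    from superset.sql_parse import ParsedQuery

    sql = "SELECT 1; SELECT 2;"

    # Default path: the script is split into individual statements, roughly
    # ["SELECT 1", "SELECT 2"], and each one is executed separately.
    statements = ParsedQuery(sql).get_statements()

    # With run_multiple_statements_as_one = True the split is skipped and the
    # whole script is passed through as a single statement.
    statements = [sql]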
