IMPALA-6568 add missing Query Compilation section to profiles.
The profile command is used to display low-level information about the
most recent query. When a client makes a request for the Profile, it
sends a GetRuntimeProfile request for the last queryId to the server.
The queryId is used to find the ClientRequestState, an object which tracks
information about the execution, including Profile data which is stored in
several RuntimeProfile objects. The reply to the GetRuntimeProfile message
contains the Profile, pretty-printed as a string.
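
For readers unfamiliar with that flow, the standalone Java sketch below
illustrates the lookup described above. It is a hypothetical illustration
only: the real GetRuntimeProfile handling lives in the Impala server, outside
this change, and the names here (ProfileLookupSketch, ClientRequestStateStub,
register, getRuntimeProfile) are invented for the example.

    // Hypothetical illustration of the GetRuntimeProfile lookup described above.
    // The real handling lives in the Impala server; all names here are invented.
    import java.util.HashMap;
    import java.util.Map;

    public class ProfileLookupSketch {
      // Stand-in for the per-query state object that tracks execution
      // information, including the profile data.
      static class ClientRequestStateStub {
        private final String profileText;
        ClientRequestStateStub(String profileText) { this.profileText = profileText; }
        String getProfileString() { return profileText; }
      }

      private final Map<String, ClientRequestStateStub> queries = new HashMap<>();

      void register(String queryId, ClientRequestStateStub state) {
        queries.put(queryId, state);
      }

      // Handles a GetRuntimeProfile-style request: look up the state for the
      // given queryId and return its profile, pretty-printed as a string.
      String getRuntimeProfile(String queryId) {
        ClientRequestStateStub state = queries.get(queryId);
        if (state == null) {
          throw new IllegalArgumentException("Unknown query id: " + queryId);
        }
        return state.getProfileString();
      }

      public static void main(String[] args) {
        ProfileLookupSketch service = new ProfileLookupSketch();
        service.register("query-1",
            new ClientRequestStateStub("Query Timeline:\n  Planning finished: 12ms"));
        System.out.println(service.getRuntimeProfile("query-1"));
      }
    }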

When a query is sent to the Front End to be compiled, the TExecRequest
that is returned from createExecRequest() in the JVM contains a Timeline,
which is a named sequence of events with timing information. This Timeline
is added to the Summary Profile in the ClientRequestState.
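
As an aside, the sketch below shows in standalone Java what such a timeline
amounts to: a named sequence of events, each stamped with the time elapsed
since the sequence started. It is a simplified stand-in for Impala's
EventSequence class (which the diff below uses via markEvent() and
toThrift()), not the real implementation.

    // Simplified stand-in for Impala's EventSequence: a named sequence of
    // events, each recorded with the time elapsed since the sequence started.
    import java.util.ArrayList;
    import java.util.List;

    public class TimelineSketch {
      private final String name;
      private final long startNanos = System.nanoTime();
      private final List<String> events = new ArrayList<>();

      public TimelineSketch(String name) { this.name = name; }

      // Records a named event together with the elapsed time so far.
      public void markEvent(String label) {
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        events.add(label + ": " + elapsedMillis + "ms");
      }

      // Pretty-prints the timeline roughly the way a timeline section
      // appears in a query profile.
      @Override
      public String toString() {
        StringBuilder sb = new StringBuilder(name + ":\n");
        for (String event : events) sb.append("  - ").append(event).append('\n');
        return sb.toString();
      }

      public static void main(String[] args) {
        TimelineSketch timeline = new TimelineSketch("Query Compilation");
        timeline.markEvent("Analysis finished");
        timeline.markEvent("Planning finished");
        System.out.print(timeline);
      }
    }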

In the following cases the Front End was not setting the Timeline in the TExecRequest:
 - All DDL operations
 - Load data statements
 - Set statements
 - Explain statements
As a result, the profile for these statements did not contain the "Query Compilation" timeline.

I refactored the main createExecRequest() method to:
- try to make the flow clearer,
- allow the timeline to be set in the TExecRequest in only one place,
- set "Planning finished" in all timelines (see the sketch below).
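
The sketch below condenses the resulting control flow using simplified,
hypothetical types (ExecRequest instead of TExecRequest, a StringBuilder
instead of EventSequence); it is not the real Impala code, but it shows the
pattern: the public entry point owns the timeline, the worker method may
return early for statements like SET or EXPLAIN, and "Planning finished" plus
the timeline are attached in exactly one place.

    // Hypothetical, simplified sketch of the refactored flow. The worker method
    // can return early (as the DDL/LOAD/SET/EXPLAIN paths do in the real code),
    // but the timeline is still completed and attached in one place by the caller.
    public class ExecRequestFlowSketch {
      // Stand-in for TExecRequest.
      static class ExecRequest {
        String timeline;
      }

      public ExecRequest createExecRequest(String stmt) {
        StringBuilder timeline = new StringBuilder("Query Compilation:\n");
        ExecRequest result = getExecRequest(stmt, timeline);
        timeline.append("  Planning finished\n"); // now recorded for every statement type
        result.timeline = timeline.toString();    // timeline attached on every path
        return result;
      }

      private ExecRequest getExecRequest(String stmt, StringBuilder timeline) {
        timeline.append("  Analysis finished\n");
        ExecRequest result = new ExecRequest();
        if (stmt.startsWith("explain") || stmt.startsWith("set")) {
          // Early return: before the refactor, paths like this skipped the timeline.
          return result;
        }
        timeline.append("  Plan created\n"); // illustrative planning event
        return result;
      }

      public static void main(String[] args) {
        ExecRequestFlowSketch sketch = new ExecRequestFlowSketch();
        System.out.println(sketch.createExecRequest("explain select 1").timeline);
      }
    }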

TESTING:

Add a new test to test_observability.py which checks that the "Query
Compilation" timeline and the "Planning finished" event appear in the
profile for various queries designed to exercise the new code paths in
createExecRequest().

Change-Id: I869eaeb4be4291b6b938f91847f624c76ec90ea5
Reviewed-on: http://gerrit.cloudera.org:8080/11387
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
bartash authored and cloudera-hudson committed Sep 13, 2018
1 parent 7df7ba8 commit f104701
Showing 2 changed files with 144 additions and 59 deletions.
167 changes: 108 additions & 59 deletions fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -87,7 +87,6 @@
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.ImpaladCatalog;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
@@ -1032,6 +1031,14 @@ public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainS
// Timeline of important events in the planning process, used for debugging
// and profiling.
EventSequence timeline = new EventSequence("Query Compilation");
TExecRequest result = getTExecRequest(queryCtx, timeline, explainString);
timeline.markEvent("Planning finished");
result.setTimeline(timeline.toThrift());
return result;
}

private TExecRequest getTExecRequest(TQueryCtx queryCtx, EventSequence timeline,
StringBuilder explainString) throws ImpalaException {
LOG.info("Analyzing query: " + queryCtx.client_request.stmt);

// Parse stmt and collect/load metadata to populate a stmt-local table cache
@@ -1048,11 +1055,7 @@ public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainS
timeline.markEvent("Analysis finished");
Preconditions.checkNotNull(analysisResult.getStmt());

TExecRequest result = new TExecRequest();
result.setQuery_options(queryCtx.client_request.getQuery_options());
result.setAccess_events(analysisResult.getAccessEvents());
result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
result.setUser_has_profile_access(analysisResult.userHasProfileAccess());
TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);

TQueryOptions queryOptions = queryCtx.client_request.query_options;
if (analysisResult.isCatalogOp()) {
@@ -1062,21 +1065,15 @@ public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainS
if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
result.catalog_op_request.setLineage_graph(thriftLineageGraph);
}
// Set MT_DOP=4 for COMPUTE STATS on Parquet/ORC tables, unless the user has already
// provided another value for MT_DOP.
if (!queryOptions.isSetMt_dop() &&
analysisResult.isComputeStatsStmt() &&
analysisResult.getComputeStatsStmt().isColumnar()) {
queryOptions.setMt_dop(4);
}
// If unset, set MT_DOP to 0 to simplify the rest of the code.
if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
setMtDopForCatalogOp(analysisResult, queryOptions);
// All DDL operations except for CTAS are done with analysis at this point.
if (!analysisResult.isCreateTableAsSelectStmt()) return result;
if (!analysisResult.isCreateTableAsSelectStmt()) {
return result;
}
} else if (analysisResult.isLoadDataStmt()) {
result.stmt_type = TStmtType.LOAD;
result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
new TColumn("summary", Type.STRING.toThrift()))));
result.setResult_set_metadata(new TResultSetMetadata(
Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
return result;
} else if (analysisResult.isSetStmt()) {
@@ -1098,16 +1095,8 @@ public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainS
if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);

// create TQueryExecRequest
Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
|| analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
|| analysisResult.isDeleteStmt());

Planner planner = new Planner(analysisResult, queryCtx, timeline);
TQueryExecRequest queryExecRequest = createExecRequest(planner, explainString);
queryCtx.setDesc_tbl(
planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
queryExecRequest.setQuery_ctx(queryCtx);
queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
TQueryExecRequest queryExecRequest =
getPlannedExecRequest(queryCtx, analysisResult, timeline, explainString);

TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
@@ -1127,54 +1116,114 @@ public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainS

result.setQuery_exec_request(queryExecRequest);
if (analysisResult.isQueryStmt()) {
// fill in the metadata
LOG.trace("create result set metadata");
result.stmt_type = TStmtType.QUERY;
result.query_exec_request.stmt_type = result.stmt_type;
TResultSetMetadata metadata = new TResultSetMetadata();
QueryStmt queryStmt = analysisResult.getQueryStmt();
int colCnt = queryStmt.getColLabels().size();
for (int i = 0; i < colCnt; ++i) {
TColumn colDesc = new TColumn();
colDesc.columnName = queryStmt.getColLabels().get(i);
colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
metadata.addToColumns(colDesc);
}
result.setResult_set_metadata(metadata);
// fill in the metadata
result.setResult_set_metadata(createQueryResultSetMetadata(analysisResult));
} else if (analysisResult.isInsertStmt() ||
analysisResult.isCreateTableAsSelectStmt()) {
// For CTAS the overall TExecRequest statement type is DDL, but the
// query_exec_request should be DML
result.stmt_type =
analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
result.query_exec_request.stmt_type = TStmtType.DML;

// create finalization params of insert stmt
InsertStmt insertStmt = analysisResult.getInsertStmt();
if (insertStmt.getTargetTable() instanceof FeFsTable) {
TFinalizeParams finalizeParams = new TFinalizeParams();
finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID);
String db = insertStmt.getTargetTableName().getDb();
finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
FeFsTable hdfsTable = (FeFsTable) insertStmt.getTargetTable();
finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
finalizeParams.setStaging_dir(
hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
queryExecRequest.setFinalize_params(finalizeParams);
}
addFinalizationParamsForInsert(
queryCtx, queryExecRequest, analysisResult.getInsertStmt());
} else {
Preconditions.checkState(analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt());
Preconditions.checkState(
analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt());
result.stmt_type = TStmtType.DML;
result.query_exec_request.stmt_type = TStmtType.DML;
}

timeline.markEvent("Planning finished");
result.setTimeline(timeline.toThrift());
return result;
}

/**
* Set MT_DOP based on the analysis result
*/
private static void setMtDopForCatalogOp(
AnalysisResult analysisResult, TQueryOptions queryOptions) {
// Set MT_DOP=4 for COMPUTE STATS on Parquet/ORC tables, unless the user has already
// provided another value for MT_DOP.
if (!queryOptions.isSetMt_dop() && analysisResult.isComputeStatsStmt()
&& analysisResult.getComputeStatsStmt().isColumnar()) {
queryOptions.setMt_dop(4);
}
// If unset, set MT_DOP to 0 to simplify the rest of the code.
if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
}

/**
* Create the TExecRequest and initialize it
*/
private static TExecRequest createBaseExecRequest(
TQueryCtx queryCtx, AnalysisResult analysisResult) {
TExecRequest result = new TExecRequest();
result.setQuery_options(queryCtx.client_request.getQuery_options());
result.setAccess_events(analysisResult.getAccessEvents());
result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
result.setUser_has_profile_access(analysisResult.userHasProfileAccess());
return result;
}

/**
* Add the finalize params for an insert statement to the queryExecRequest
*/
private static void addFinalizationParamsForInsert(
TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, InsertStmt insertStmt) {
if (insertStmt.getTargetTable() instanceof FeFsTable) {
TFinalizeParams finalizeParams = new TFinalizeParams();
finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID);
String db = insertStmt.getTargetTableName().getDb();
finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
FeFsTable hdfsTable = (FeFsTable) insertStmt.getTargetTable();
finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
finalizeParams.setStaging_dir(
hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
queryExecRequest.setFinalize_params(finalizeParams);
}
}

/**
* Add the metadata for the result set
*/
private static TResultSetMetadata createQueryResultSetMetadata(
AnalysisResult analysisResult) {
LOG.trace("create result set metadata");
TResultSetMetadata metadata = new TResultSetMetadata();
QueryStmt queryStmt = analysisResult.getQueryStmt();
int colCnt = queryStmt.getColLabels().size();
for (int i = 0; i < colCnt; ++i) {
TColumn colDesc = new TColumn();
colDesc.columnName = queryStmt.getColLabels().get(i);
colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
metadata.addToColumns(colDesc);
}
return metadata;
}

/**
* Get the TQueryExecRequest and use it to populate the query context
*/
private TQueryExecRequest getPlannedExecRequest(TQueryCtx queryCtx,
AnalysisResult analysisResult, EventSequence timeline, StringBuilder explainString)
throws ImpalaException {
Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
|| analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
|| analysisResult.isDeleteStmt());
Planner planner = new Planner(analysisResult, queryCtx, timeline);
TQueryExecRequest queryExecRequest = createExecRequest(planner, explainString);
queryCtx.setDesc_tbl(
planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
queryExecRequest.setQuery_ctx(queryCtx);
queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
return queryExecRequest;
}

/**
* The MAX_MEM_ESTIMATE_FOR_ADMISSION query option can override the planner memory
* estimate if set. Sets queryOptions.per_host_mem_estimate if the override is
@@ -1200,7 +1249,7 @@ private void createExplainRequest(String explainString, TExecRequest result) {
result.setResult_set_metadata(metadata);

// create the explain result set - split the explain string into one line per row
String[] explainStringArray = explainString.toString().split("\n");
String[] explainStringArray = explainString.split("\n");
TExplainResult explainResult = new TExplainResult();
explainResult.results = Lists.newArrayList();
for (int i = 0; i < explainStringArray.length; ++i) {
36 changes: 36 additions & 0 deletions tests/query_test/test_observability.py
@@ -218,6 +218,42 @@ def __verify_profile_event_sequence(self, event_regexes, runtime_profile):
assert event_regex_index == len(event_regexes), \
"Didn't find all events in profile: \n" + runtime_profile

def test_query_profile_contains_all_events(self, unique_database):
"""Test that the expected events show up in a query profile for various queries"""
# make a data file to load data from
path = "tmp/{0}/data_file".format(unique_database)
self.hdfs_client.delete_file_dir(path)
self.hdfs_client.create_file(path, "1")
use_query = "use {0}".format(unique_database)
self.execute_query(use_query)
# all the events we will see for every query
event_regexes = [
r'Query Compilation:',
r'Query Timeline:',
r'Planning finished'
]
# queries that explore different code paths in Frontend compilation
queries = [
'create table if not exists impala_6568 (i int)',
'select * from impala_6568',
'explain select * from impala_6568',
'describe impala_6568',
'alter table impala_6568 set tblproperties(\'numRows\'=\'10\')',
"load data inpath '/{0}' into table impala_6568".format(path)
]
# run each query...
for query in queries:
runtime_profile = self.execute_query(query).runtime_profile
# and check that all the expected events appear in the resulting profile
self.__verify_profile_contains_every_event(event_regexes, runtime_profile, query)

def __verify_profile_contains_every_event(self, event_regexes, runtime_profile, query):
"""Test that all the expected events show up in a given query profile."""
for regex in event_regexes:
assert any(re.search(regex, line) for line in runtime_profile.splitlines()), \
"Didn't find event '" + regex + "' for query '" + query + \
"' in profile: \n" + runtime_profile

class TestThriftProfile(ImpalaTestSuite):
@classmethod
def get_workload(self):
