Skip to content

Commit

Permalink
Fix interoperability with Hive client version >= V10 (cloudera#396) (c…
Browse files Browse the repository at this point in the history
…loudera#397)

Impyla uses Hive client version V6 and doesn't work correctly
against Hive client protocol version >= V10.

The issue happens when async compilation is enabled:
1. Impyla waits for the query to get into a fetchable state.
2. Impyla checks TOperationHandle.hasResultSet before trying to
fetch the results.

The problem is that TOperationHandle.hasResultSet is not set
correctly any longer. The flag was moved to
TGetOperationStatusResp in later Hive client versions.

This fix makes sure that TGetOperationStatusResp.hasResultSet is
used instead of TOperationHandle.hasResultSet if present.

Testing:
- Manually tested against a Hive client version V10.

Co-authored-by: Attila Jeges <[email protected]>
  • Loading branch information
attilajeges and Attila Jeges authored May 1, 2020
1 parent ed0f8bc commit e14138d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
18 changes: 17 additions & 1 deletion impala/_thrift_gen/TCLIService/ttypes.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions impala/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ def _wait_to_finish(self):
while True:
req = TGetOperationStatusReq(operationHandle=self._last_operation.handle)
resp = self._last_operation._rpc('GetOperationStatus', req)
self._last_operation.update_has_result_set(resp)
operation_state = TOperationState._VALUES_TO_NAMES[resp.operationState]

log.debug('_wait_to_finish: waited %s seconds so far',
Expand Down Expand Up @@ -1179,21 +1180,34 @@ def __init__(self, session, handle, retries=3):
self.session = session
self.handle = handle
self._schema = None
self._state_has_result_set = None
ThriftRPC.__init__(self, self.session.client, retries=retries)

@property
def has_result_set(self):
return self.handle.hasResultSet
# When HIVE_CLI_SERVICE_PROTOCOL_V10 or later API is used and async compilation is
# enabled, self.handle.hasResultSet is not set any longer.
# In this case self._state_has_result_set should be used instead.
if self._state_has_result_set is not None:
return self._state_has_result_set
else:
return self.handle.hasResultSet

def update_has_result_set(self, state):
self._state_has_result_set = state.hasResultSet

def get_status(self):
# pylint: disable=protected-access
req = TGetOperationStatusReq(operationHandle=self.handle)
resp = self._rpc('GetOperationStatus', req)
self.update_has_result_set(resp)
return TOperationState._VALUES_TO_NAMES[resp.operationState]

def get_state(self):
req = TGetOperationStatusReq(operationHandle=self.handle)
return self._rpc('GetOperationStatus', req)
resp = self._rpc('GetOperationStatus', req)
self.update_has_result_set(resp)
return resp

def get_log(self, max_rows=1024, orientation=TFetchOrientation.FETCH_NEXT):
try:
Expand Down Expand Up @@ -1239,7 +1253,7 @@ def fetch(self, schema=None, max_rows=1024,
orientation=TFetchOrientation.FETCH_NEXT,
convert_types=True):
if not self.has_result_set:
log.debug('fetch_results: operation_handle.hasResultSet=False')
log.debug('fetch_results: has_result_set=False')
return None

# the schema is necessary to pull the proper values (i.e., coalesce)
Expand Down Expand Up @@ -1267,8 +1281,8 @@ def is_columnar(self):
return _is_columnar_protocol(protocol)

def get_result_schema(self):
if not self.handle.hasResultSet:
log.debug('get_result_schema: handle.hasResultSet=False')
if not self.has_result_set:
log.debug('get_result_schema: has_result_set=False')
return None

req = TGetResultSetMetadataReq(operationHandle=self.handle)
Expand Down
3 changes: 3 additions & 0 deletions impala/thrift/TCLIService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,9 @@ struct TGetOperationStatusResp {

// Error message
5: optional string errorMessage

// If the operation has the result
9: optional bool hasResultSet
}


Expand Down

0 comments on commit e14138d

Please sign in to comment.