Skip to content

Commit

Permalink
[LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions…
Browse files Browse the repository at this point in the history
…, GetSchemas, GetTables, GetColumns in Livy thrift server

## What changes were proposed in this pull request?
In this patch, we add the implementations of GetSchemas, GetFunctions, GetTables, and GetColumns in Livy Thrift server.

https://issues.apache.org/jira/browse/LIVY-622
https://issues.apache.org/jira/browse/LIVY-623
https://issues.apache.org/jira/browse/LIVY-624
https://issues.apache.org/jira/browse/LIVY-625

## How was this patch tested?
Added new unit tests and an integration test, and ran them together with the existing tests.

Author: yihengwang <[email protected]>

Closes apache#194 from yiheng/fix_575.
  • Loading branch information
yiheng authored and jerryshao committed Aug 16, 2019
1 parent fb03da3 commit cae9d97
Show file tree
Hide file tree
Showing 22 changed files with 1,395 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ class LivyCLIService(server: LivyThriftServer)
sessionHandle: SessionHandle,
catalogName: String,
schemaName: String): OperationHandle = {
// TODO
throw new HiveSQLException("Operation GET_SCHEMAS is not yet supported")
sessionManager.operationManager.getSchemas(
sessionHandle, catalogName, schemaName)
}

@throws[HiveSQLException]
Expand All @@ -226,8 +226,8 @@ class LivyCLIService(server: LivyThriftServer)
schemaName: String,
tableName: String,
tableTypes: util.List[String]): OperationHandle = {
// TODO
throw new HiveSQLException("Operation GET_TABLES is not yet supported")
sessionManager.operationManager.getTables(
sessionHandle, catalogName, schemaName, tableName, tableTypes)
}

@throws[HiveSQLException]
Expand All @@ -243,8 +243,8 @@ class LivyCLIService(server: LivyThriftServer)
schemaName: String,
tableName: String,
columnName: String): OperationHandle = {
// TODO
throw new HiveSQLException("Operation GET_COLUMNS is not yet supported")
sessionManager.operationManager.getColumns(
sessionHandle, catalogName, schemaName, tableName, columnName)
}

@throws[HiveSQLException]
Expand All @@ -253,8 +253,8 @@ class LivyCLIService(server: LivyThriftServer)
catalogName: String,
schemaName: String,
functionName: String): OperationHandle = {
// TODO
throw new HiveSQLException("Operation GET_FUNCTIONS is not yet supported")
sessionManager.operationManager.getFunctions(
sessionHandle, catalogName, schemaName, functionName)
}

@throws[HiveSQLException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,69 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
})
}

@throws[HiveSQLException]
def getTables(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    tableName: String,
    tableTypes: util.List[String]): OperationHandle = {
  // Launch a GET_TABLES catalog operation for the given session. The
  // operation is registered with the manager before its handle is returned,
  // so it can later be fetched, cancelled or closed by handle.
  executeOperation(sessionHandle, {
    val operation = new GetTablesOperation(sessionHandle, catalogName, schemaName,
      tableName, tableTypes, livyThriftSessionManager)
    addOperation(operation, sessionHandle)
    operation
  })
}

@throws[HiveSQLException]
def getFunctions(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    functionName: String): OperationHandle = {
  // Launch a GET_FUNCTIONS catalog operation and register it so its
  // lifecycle can be tracked through the returned OperationHandle.
  executeOperation(sessionHandle, {
    val operation = new GetFunctionsOperation(
      sessionHandle,
      catalogName,
      schemaName,
      functionName,
      livyThriftSessionManager)
    addOperation(operation, sessionHandle)
    operation
  })
}

@throws[HiveSQLException]
def getSchemas(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String): OperationHandle = {
  // Launch a GET_SCHEMAS catalog operation; it must be registered with the
  // manager before the handle is handed back to the caller.
  executeOperation(sessionHandle, {
    val operation =
      new GetSchemasOperation(sessionHandle, catalogName, schemaName, livyThriftSessionManager)
    addOperation(operation, sessionHandle)
    operation
  })
}

@throws[HiveSQLException]
def getColumns(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    tableName: String,
    columnName: String): OperationHandle = {
  // Launch a GET_COLUMNS catalog operation scoped by the supplied
  // catalog/schema/table/column search patterns.
  executeOperation(sessionHandle, {
    val operation = new GetColumnsOperation(
      sessionHandle,
      catalogName,
      schemaName,
      tableName,
      columnName,
      livyThriftSessionManager)
    addOperation(operation, sessionHandle)
    operation
  })
}


/**
* Cancel the running operation unless it is already in a terminal state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
val resp = new TGetSchemasResp
try {
val opHandle = cliService.getSchemas(
new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName)
val opHandle = cliService.getSchemas(createSessionHandle(req.getSessionHandle),
req.getCatalogName, req.getSchemaName)
resp.setOperationHandle(opHandle.toTOperationHandle)
resp.setStatus(ThriftCLIService.OK_STATUS)
} catch {
Expand All @@ -444,7 +444,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
val resp = new TGetTablesResp
try {
val opHandle = cliService.getTables(
new SessionHandle(req.getSessionHandle),
createSessionHandle(req.getSessionHandle),
req.getCatalogName,
req.getSchemaName,
req.getTableName,
Expand Down Expand Up @@ -479,7 +479,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
val resp = new TGetColumnsResp
try {
val opHandle = cliService.getColumns(
new SessionHandle(req.getSessionHandle),
createSessionHandle(req.getSessionHandle),
req.getCatalogName,
req.getSchemaName,
req.getTableName,
Expand All @@ -499,7 +499,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
val resp = new TGetFunctionsResp
try {
val opHandle = cliService.getFunctions(
new SessionHandle(req.getSessionHandle),
createSessionHandle(req.getSessionHandle),
req.getCatalogName,
req.getSchemaName,
req.getFunctionName)
Expand Down Expand Up @@ -728,6 +728,13 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
s"Failed to validate proxy privilege of $realUser for $proxyUser", "08S01", e)
}
}

private def createSessionHandle(tHandle: TSessionHandle): SessionHandle = {
  // Rebuild the handle carrying the protocol version that was negotiated
  // when the session was opened, so responses are encoded with the Thrift
  // version the client actually speaks.
  val sessionInfo = cliService.getSessionManager.getSessionInfo(new SessionHandle(tHandle))
  new SessionHandle(tHandle, sessionInfo.protocolVersion)
}
}

object ThriftCLIService {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.thriftserver.operation

import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}

import org.apache.livy.Logging
import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
import org.apache.livy.thriftserver.LivyThriftSessionManager
import org.apache.livy.thriftserver.session.{GetColumnsJob, GetFunctionsJob}

/**
 * Thrift operation backing JDBC/ODBC `getColumns` metadata requests.
 *
 * The catalog lookup itself runs inside the remote Spark session: a
 * [[GetColumnsJob]] is submitted through the RSC client, and its output is
 * later retrieved via the shared [[SparkCatalogOperation]] machinery.
 *
 * @param sessionHandle  handle of the Thrift session issuing the request
 * @param catalogName    catalog search pattern (not forwarded to the job)
 * @param schemaName     schema search pattern (SQL '%'/'_' wildcards)
 * @param tableName      table search pattern (SQL '%'/'_' wildcards)
 * @param columnName     column search pattern; null matches every column
 * @param sessionManager manager used to resolve the session's remote client
 */
class GetColumnsOperation(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    tableName: String,
    columnName: String,
    sessionManager: LivyThriftSessionManager)
  extends SparkCatalogOperation(
    sessionHandle, OperationType.GET_COLUMNS, sessionManager) with Logging {

  @throws(classOf[HiveSQLException])
  override protected def runInternal(): Unit = {
    setState(OperationState.RUNNING)
    try {
      // Submit the job and block until it completes. The search patterns are
      // converted by the base-class helpers before being shipped over; a null
      // columnName is forwarded as null and matches all columns.
      rscClient.submit(new GetColumnsJob(
        convertSchemaPattern(schemaName),
        convertIdentifierPattern(tableName, datanucleusFormat = true),
        Option(columnName).map { convertIdentifierPattern(_, datanucleusFormat = false) }.orNull,
        sessionId,
        jobId
      )).get()

      setState(OperationState.FINISHED)
    } catch {
      case e: Throwable =>
        // Catching Throwable (not just NonFatal) keeps the operation state
        // consistent on any failure; mirrors the sibling catalog operations.
        // Fixed log-message grammar ("meet" -> "met").
        error("Remote job met an exception: ", e)
        setState(OperationState.ERROR)
        throw new HiveSQLException(e)
    }
  }

  @throws(classOf[HiveSQLException])
  override def getResultSetSchema: Schema = {
    // The schema may only be served once the remote job has finished.
    assertState(Seq(OperationState.FINISHED))
    GetColumnsOperation.SCHEMA
  }
}

/**
 * Companion holding the fixed result-set schema served by GET_COLUMNS.
 * The column names, order and descriptions mirror
 * `java.sql.DatabaseMetaData#getColumns`. Fixed two typos in the
 * SCOPE_TABLE description ("that this the scope", "attribure") that were
 * inherited from the upstream javadoc wording.
 */
object GetColumnsOperation {
  val SCHEMA = Schema(
    Field("TABLE_CAT", BasicDataType("string"), "Catalog name. NULL if not applicable."),
    Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
    Field("TABLE_NAME", BasicDataType("string"), "Table name."),
    Field("COLUMN_NAME", BasicDataType("string"), "Column name"),
    Field("DATA_TYPE", BasicDataType("integer"), "SQL type from java.sql.Types"),
    Field("TYPE_NAME", BasicDataType("string"),
      "Data source dependent type name, for a UDT the type name is fully qualified"),
    Field("COLUMN_SIZE", BasicDataType("integer"), "Column size. For char or date types this is " +
      "the maximum number of characters, for numeric or decimal types this is precision."),
    Field("BUFFER_LENGTH", BasicDataType("byte"), "Unused"),
    Field("DECIMAL_DIGITS", BasicDataType("integer"), "The number of fractional digits"),
    Field("NUM_PREC_RADIX", BasicDataType("integer"), "Radix (typically either 10 or 2)"),
    Field("NULLABLE", BasicDataType("integer"), "Is NULL allowed"),
    Field("REMARKS", BasicDataType("string"), "Comment describing column (may be null)"),
    Field("COLUMN_DEF", BasicDataType("string"), "Default value (may be null)"),
    Field("SQL_DATA_TYPE", BasicDataType("integer"), "Unused"),
    Field("SQL_DATETIME_SUB", BasicDataType("integer"), "Unused"),
    Field("CHAR_OCTET_LENGTH", BasicDataType("integer"), "For char types the maximum number of " +
      "bytes in the column"),
    Field("ORDINAL_POSITION", BasicDataType("integer"), "Index of column in table (starting at 1)"),
    Field("IS_NULLABLE", BasicDataType("string"), "\"NO\" means column definitely does not " +
      "allow NULL values; \"YES\" means the column might allow NULL values. An empty string " +
      "means nobody knows."),
    Field("SCOPE_CATALOG", BasicDataType("string"), "Catalog of table that is the scope of a " +
      "reference attribute (null if DATA_TYPE isn't REF)"),
    Field("SCOPE_SCHEMA", BasicDataType("string"), "Schema of table that is the scope of a " +
      "reference attribute (null if the DATA_TYPE isn't REF)"),
    Field("SCOPE_TABLE", BasicDataType("string"), "Table name that is the scope of a " +
      "reference attribute (null if the DATA_TYPE isn't REF)"),
    Field("SOURCE_DATA_TYPE", BasicDataType("short"), "Source type of a distinct type or " +
      "user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't " +
      "DISTINCT or user-generated REF)"),
    Field("IS_AUTO_INCREMENT", BasicDataType("string"), "Indicates whether this column is " +
      "auto incremented.")
  )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.thriftserver.operation

import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}

import org.apache.livy.Logging
import org.apache.livy.thriftserver.session.GetFunctionsJob
import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
import org.apache.livy.thriftserver.LivyThriftSessionManager

/**
 * Thrift operation backing JDBC/ODBC `getFunctions` metadata requests.
 *
 * The lookup runs inside the remote Spark session: a [[GetFunctionsJob]] is
 * submitted through the RSC client and its output is later retrieved via the
 * shared [[SparkCatalogOperation]] machinery.
 *
 * @param sessionHandle  handle of the Thrift session issuing the request
 * @param catalogName    catalog search pattern (not forwarded to the job)
 * @param schemaName     schema search pattern (SQL '%'/'_' wildcards)
 * @param functionName   function search pattern; null matches all functions
 * @param sessionManager manager used to resolve the session's remote client
 */
class GetFunctionsOperation(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    functionName: String,
    sessionManager: LivyThriftSessionManager)
  extends SparkCatalogOperation(
    sessionHandle, OperationType.GET_FUNCTIONS, sessionManager) with Logging {

  @throws(classOf[HiveSQLException])
  override protected def runInternal(): Unit = {
    setState(OperationState.RUNNING)
    try {
      // Submit the job and block until it completes.
      rscClient.submit(new GetFunctionsJob(
        convertSchemaPattern(schemaName),
        convertFunctionName(functionName),
        sessionId,
        jobId
      )).get()

      setState(OperationState.FINISHED)
    } catch {
      case e: Throwable =>
        // Catching Throwable keeps the operation state consistent on any
        // failure; mirrors the sibling catalog operations.
        // Fixed log-message grammar ("meet" -> "met").
        error("Remote job met an exception: ", e)
        setState(OperationState.ERROR)
        throw new HiveSQLException(e)
    }
  }

  @throws(classOf[HiveSQLException])
  override def getResultSetSchema: Schema = {
    // The schema may only be served once the remote job has finished.
    assertState(Seq(OperationState.FINISHED))
    GetFunctionsOperation.SCHEMA
  }

  /**
   * Converts a SQL function-name pattern into a lower-cased regular
   * expression: '%' becomes ".*", '_' becomes ".", a backslash escapes the
   * following character (which is emitted verbatim, without lower-casing),
   * and every other character is lower-cased. A null pattern matches all
   * functions (".*").
   */
  private def convertFunctionName(name: String): String = {
    if (name == null) {
      ".*"
    } else {
      var escape = false
      name.flatMap {
        case c if escape =>
          // Previous character was an escaping backslash: emit this one as-is.
          if (c != '\\') escape = false
          c.toString
        case '\\' =>
          escape = true
          ""
        case '%' => ".*"
        case '_' => "."
        case c => Character.toLowerCase(c).toString
      }
    }
  }
}

/**
 * Companion holding the fixed result-set schema served by GET_FUNCTIONS.
 * The column names and order mirror `java.sql.DatabaseMetaData#getFunctions`.
 */
object GetFunctionsOperation {
  val SCHEMA = Schema(
    Field("FUNCTION_CAT", BasicDataType("string"), "Function catalog (may be null)"),
    Field("FUNCTION_SCHEM", BasicDataType("string"), "Function schema (may be null)"),
    Field("FUNCTION_NAME", BasicDataType("string"),
      "Function name. This is the name used to invoke the function"),
    Field("REMARKS", BasicDataType("string"), "Explanatory comment on the function"),
    Field("FUNCTION_TYPE", BasicDataType("integer"),
      "Kind of function."),
    Field("SPECIFIC_NAME", BasicDataType("string"),
      "The name which uniquely identifies this function within its schema")
  )
}
Loading

0 comments on commit cae9d97

Please sign in to comment.