From 3626382f9e718b446cdb68341df0dc46036f1068 Mon Sep 17 00:00:00 2001 From: yihengwang Date: Sun, 25 Aug 2019 09:47:49 +0200 Subject: [PATCH] [LIVY-574][TESTS][THRIFT] Add tests for metadata operations ## What changes were proposed in this pull request? Add unit test for existing meta operation: GetCatalogsOperation/GetTableTypesOperation/GetTypeInfoOperation. We also fix issues we met. ## How was this patch tested? Add new unit tests and existing test. We also use SquirrelSQL test relate operations. ![image](https://user-images.githubusercontent.com/1297418/62930007-26a93400-bdee-11e9-9364-259308724db6.png) ![image](https://user-images.githubusercontent.com/1297418/62930073-42143f00-bdee-11e9-9409-62d07ad0eabd.png) Author: yihengwang Closes #197 from yiheng/fix_574. --- .../thriftserver/cli/ThriftCLIService.scala | 6 +- .../operation/GetCatalogsOperation.scala | 4 +- .../operation/GetTypeInfoOperation.scala | 11 +- .../operation/MetadataOperation.scala | 2 +- .../thriftserver/serde/ThriftResultSet.scala | 8 ++ .../thriftserver/ThriftServerSuites.scala | 102 +++++++++++++++++- .../thriftserver/session/ColumnBuffer.java | 96 ++++++++++++++--- .../session/ColumnBufferTest.java | 96 +++++++++++++++++ 8 files changed, 300 insertions(+), 25 deletions(-) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala index da108ab4f..9cced79d7 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala @@ -397,7 +397,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: override def GetTypeInfo(req: TGetTypeInfoReq): TGetTypeInfoResp = { val resp = new TGetTypeInfoResp try { - val operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle)) + val operationHandle = cliService.getTypeInfo(createSessionHandle(req.getSessionHandle)) resp.setOperationHandle(operationHandle.toTOperationHandle) resp.setStatus(ThriftCLIService.OK_STATUS) } catch { @@ -412,7 +412,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: override def GetCatalogs(req: TGetCatalogsReq): TGetCatalogsResp = { val resp = new TGetCatalogsResp try { - val opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle)) + val opHandle = cliService.getCatalogs(createSessionHandle(req.getSessionHandle)) resp.setOperationHandle(opHandle.toTOperationHandle) resp.setStatus(ThriftCLIService.OK_STATUS) } catch { @@ -463,7 +463,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: override def GetTableTypes(req: TGetTableTypesReq): TGetTableTypesResp = { val resp = new TGetTableTypesResp try { - val opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle)) + val opHandle = cliService.getTableTypes(createSessionHandle(req.getSessionHandle)) resp.setOperationHandle(opHandle.toTOperationHandle) resp.setStatus(ThriftCLIService.OK_STATUS) } catch { diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetCatalogsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetCatalogsOperation.scala index 57687b0df..781c02266 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetCatalogsOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetCatalogsOperation.scala @@ -33,11 +33,11 @@ class GetCatalogsOperation(sessionHandle: SessionHandle) @throws(classOf[HiveSQLException]) override def runInternal(): Unit = { setState(OperationState.RUNNING) - info("Fetching table type metadata") + info("Fetching catalogs metadata") try { // catalogs are actually not supported in spark, so this is a no-op setState(OperationState.FINISHED) - info("Fetching table type metadata has been successfully finished") + info("Fetching catalogs has been successfully finished") } catch { case e: Throwable => setState(OperationState.ERROR) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala index a587445d0..b91a3356b 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala @@ -44,21 +44,22 @@ class GetTypeInfoOperation(sessionHandle: SessionHandle) val data = Array[Any]( t.name, t.sqlType, - t.precision, + t.precision.orNull, null, // LITERAL_PREFIX null, // LITERAL_SUFFIX null, // CREATE_PARAMS - DatabaseMetaData.typeNullable, // All types are nullable + DatabaseMetaData.typeNullable.toShort, // All types are nullable t.caseSensitive, + t.searchable, t.unsignedAttribute, false, // FIXED_PREC_SCALE false, // AUTO_INCREMENT null, // LOCAL_TYPE_NAME - 0, // MINIMUM_SCALE - 0, // MAXIMUM_SCALE + 0.toShort, // MINIMUM_SCALE + 0.toShort, // MAXIMUM_SCALE null, // SQL_DATA_TYPE null, // SQL_DATETIME_SUB - t.numPrecRadix) + t.numPrecRadix.orNull) rowSet.addRow(data) } setState(OperationState.FINISHED) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala index 4689f26da..48c17cc07 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala @@ -50,6 +50,6 @@ abstract class MetadataOperation(sessionHandle: SessionHandle, opType: Operation if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setRowOffset(0) } - rowSet + rowSet.extractSubset(maxRows.toInt) } } diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala index 2a58c4a88..529352e67 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala @@ -32,6 +32,7 @@ abstract class ThriftResultSet { def addRow(row: Array[Any]): Unit def setRowOffset(rowOffset: Long): Unit def numRows: Int + def extractSubset(maxRows: Int): ThriftResultSet } object ThriftResultSet { @@ -124,4 +125,11 @@ class ColumnOrientedResultSet( override def setRowOffset(rowOffset: Long): Unit = this.rowOffset = rowOffset override def numRows: Int = columns.headOption.map(_.size).getOrElse(0) + + override def extractSubset(maxRows: Int): ThriftResultSet = { + val nRows = Math.min(numRows, maxRows) + val result = new ColumnOrientedResultSet(columns.map(_.extractSubset(nRows))) + rowOffset += nRows + result + } } diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala index e91c0d95c..438d86c7f 100644 --- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala +++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala @@ -17,7 +17,7 @@ package org.apache.livy.thriftserver -import java.sql.{Connection, Date, SQLException, Statement} +import java.sql.{Connection, Date, SQLException, Statement, Types} import org.apache.hive.jdbc.HiveStatement @@ -190,6 +190,70 @@ trait CommonThriftTests { "Livy session has not yet started. Please wait for it to be ready...") assert(!logIterator.hasNext) } + + def getCatalogTest(connection: Connection): Unit = { + val metadata = connection.getMetaData + val catalogResultSet = metadata.getCatalogs() + // Spark doesn't support getCatalog. In current implementation, it's a no-op and does not return + // any data + assert(!catalogResultSet.next()) + } + + def getTableTypeTest(connection: Connection): Unit = { + val metadata = connection.getMetaData + val tableTypesResultSet = metadata.getTableTypes() + tableTypesResultSet.next() + assert(tableTypesResultSet.getString(1) == "TABLE") + tableTypesResultSet.next() + assert(tableTypesResultSet.getString(1) == "VIEW") + assert(!tableTypesResultSet.next()) + } + + def getTypeInfoTest(connection: Connection): Unit = { + val metadata = connection.getMetaData + val typeInfoResultSet = metadata.getTypeInfo() + val expectResults = Array( + ("void", Types.NULL, 0, 1, false, 0, true, false, false, null, 0, 0, 0, 0, 0), + ("boolean", Types.BOOLEAN, 0, 1, false, 2, true, false, false, null, 0, 0, 0, 0, 0), + ("byte", Types.TINYINT, 3, 1, false, 2, false, false, false, null, 0, 0, 0, 0, 10), + ("short", Types.SMALLINT, 5, 1, false, 2, false, false, false, null, 0, 0, 0, 0, 10), + ("integer", Types.INTEGER, 10, 1, false, 2, false, false, false, null, 0, 0, 0, 0, 10), + ("long", Types.BIGINT, 19, 1, false, 2, false, false, false, null, 0, 0, 0, 0, 10), + ("float", Types.FLOAT, 7, 1, false, 2, false, false, false, null, 0, 0, 0, 0, 10), + ("double", Types.DOUBLE, 15, 1, false, 2, false, false, false, null, 0, 0, 0, 0, 10), + ("date", Types.DATE, 0, 1, false, 2, true, false, false, null, 0, 0, 0, 0, 0), + ("timestamp", Types.TIMESTAMP, 0, 1, false, 2, true, false, false, null, 0, 0, 0, 0, 0), + ("string", Types.VARCHAR, 0, 1, true, 3, true, false, false, null, 0, 0, 0, 0, 0), + ("binary", Types.BINARY, 0, 1, false, 2, true, false, false, null, 0, 0, 0, 0, 0), + ("decimal", Types.DECIMAL, 38, 1, false, 2, false, false, false, null, 0, 0, 0, 0, 10), + ("array", Types.ARRAY, 0, 1, false, 2, true, false, false, null, 0, 0, 0, 0, 0), + ("map", Types.OTHER, 0, 1, false, 0, true, false, false, null, 0, 0, 0, 0, 0), + ("struct", Types.STRUCT, 0, 1, false, 2, true, false, false, null, 0, 0, 0, 0, 0), + ("udt", Types.OTHER, 0, 1, false, 0, true, false, false, null, 0, 0, 0, 0, 0) + ) + for (expect <- expectResults) { + typeInfoResultSet.next() + assert(typeInfoResultSet.getString(1) == expect._1) + assert(typeInfoResultSet.getInt(2) == expect._2) + assert(typeInfoResultSet.getInt(3) == expect._3) + assert(typeInfoResultSet.getString(4) == null) + assert(typeInfoResultSet.getString(5) == null) + assert(typeInfoResultSet.getString(6) == null) + assert(typeInfoResultSet.getShort(7) == expect._4) + assert(typeInfoResultSet.getBoolean(8) == expect._5) + assert(typeInfoResultSet.getShort(9) == expect._6) + assert(typeInfoResultSet.getBoolean(10) == expect._7) + assert(typeInfoResultSet.getBoolean(11) == expect._8) + assert(typeInfoResultSet.getBoolean(12) == expect._9) + assert(typeInfoResultSet.getString(13) == expect._10) + assert(typeInfoResultSet.getShort(14) == expect._11) + assert(typeInfoResultSet.getShort(15) == expect._12) + assert(typeInfoResultSet.getInt(16) == expect._13) + assert(typeInfoResultSet.getInt(17) == expect._14) + assert(typeInfoResultSet.getInt(18) == expect._15) + } + assert(!typeInfoResultSet.next()) + } } class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests { @@ -314,6 +378,24 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTest operationLogRetrievalTest(statement) } } + + test("fetch catalog test") { + withJdbcConnection { c => + getCatalogTest(c) + } + } + + test("get table types test") { + withJdbcConnection { c => + getTableTypeTest(c) + } + } + + test("get types info test") { + withJdbcConnection { c => + getTypeInfoTest(c) + } + } } class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests { @@ -356,4 +438,22 @@ class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests operationLogRetrievalTest(statement) } } + + test("fetch catalog test") { + withJdbcConnection { c => + getCatalogTest(c) + } + } + + test("get table types test") { + withJdbcConnection { c => + getTableTypeTest(c) + } + } + + test("get types info test") { + withJdbcConnection { c => + getTypeInfoTest(c) + } + } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java index d4ec7470c..f58d53617 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java @@ -90,6 +90,42 @@ public ColumnBuffer(DataType type) { } } + private ColumnBuffer(DataType type, byte[] nulls, Object values, int currentSize) { + this.type = type; + this.nulls = nulls; + this.currentSize = currentSize; + + switch (type) { + case BOOLEAN: + bools = (boolean[]) values; + break; + case BYTE: + bytes = (byte[]) values; + break; + case SHORT: + shorts = (short[]) values; + break; + case INTEGER: + ints = (int[]) values; + break; + case LONG: + longs = (long[]) values; + break; + case FLOAT: + floats = (float[]) values; + break; + case DOUBLE: + doubles = (double[]) values; + break; + case BINARY: + buffers = (byte[][]) values; + break; + case STRING: + strings = (String[]) values; + break; + } + } + public DataType getType() { return type; } @@ -204,39 +240,73 @@ public BitSet getNulls() { return nulls != null ? BitSet.valueOf(nulls) : new BitSet(); } - public ColumnBuffer extractSubset(int start, int end) { - ColumnBuffer subset = new ColumnBuffer(type); - subset.currentSize = end - start; - subset.ensureCapacity(); + /** + * Extract subset data to a new ColumnBuffer. It will remove the extracted data from current + * ColumnBuffer. + * + * @param end index of the end row, exclusive + */ + public ColumnBuffer extractSubset(int end) { + if (end > this.currentSize) { + end = this.currentSize; + } + if (end < 0) { + end = 0; + } + + byte[] subNulls = getNulls().get(0, end).toByteArray(); + int split = 0; + ColumnBuffer subset = null; switch (type) { case BOOLEAN: - System.arraycopy(bools, start, subset.bools, 0, end - start); + split = Math.min(bools.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(bools, 0, split), end); + bools = Arrays.copyOfRange(bools, split, bools.length); break; case BYTE: - System.arraycopy(bytes, start, subset.bytes, 0, end - start); + split = Math.min(bytes.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(bytes, 0, split), end); + bytes = Arrays.copyOfRange(bytes, split, bytes.length); break; case SHORT: - System.arraycopy(shorts, start, subset.shorts, 0, end - start); + split = Math.min(shorts.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shorts, 0, split), end); + shorts = Arrays.copyOfRange(shorts, split, shorts.length); break; case INTEGER: - System.arraycopy(ints, start, subset.ints, 0, end - start); + split = Math.min(ints.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(ints, 0, split), end); + ints = Arrays.copyOfRange(ints, split, ints.length); break; case LONG: - System.arraycopy(longs, start, subset.longs, 0, end - start); + split = Math.min(longs.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longs, 0, split), end); + longs = Arrays.copyOfRange(longs, split, longs.length); break; case FLOAT: - System.arraycopy(floats, start, subset.floats, 0, end - start); + split = Math.min(floats.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(floats, 0, split), end); + floats = Arrays.copyOfRange(floats, split, floats.length); break; case DOUBLE: - System.arraycopy(doubles, start, subset.doubles, 0, end - start); + split = Math.min(doubles.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubles, 0, split), end); + doubles = Arrays.copyOfRange(doubles, split, doubles.length); break; case BINARY: - System.arraycopy(buffers, start, subset.buffers, 0, end - start); + split = Math.min(buffers.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(buffers, 0, split), end); + buffers = Arrays.copyOfRange(buffers, split, buffers.length); break; case STRING: - System.arraycopy(strings, start, subset.strings, 0, end - start); + split = Math.min(strings.length, end); + subset = new ColumnBuffer(type, subNulls, Arrays.copyOfRange(strings, 0, split), end); + strings = Arrays.copyOfRange(strings, split, strings.length); break; } + nulls = getNulls().get(end, currentSize).toByteArray(); + currentSize = currentSize - end; + return subset; } diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java index 3a36ba1c4..518146a6c 100644 --- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java +++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java @@ -229,6 +229,102 @@ public void testBinaryColumn() { } } + @Test + public void testExtractSubset() { + testExtractSubsetWithType(DataType.BOOLEAN, new Object[]{true, false, true, false, null}); + testExtractSubsetWithType(DataType.BYTE, + new Object[]{(byte)0, (byte)1, (byte)2, (byte)3, null}); + testExtractSubsetWithType(DataType.SHORT, + new Object[]{(short)0, (short)1, (short)2, (short)3, null}); + testExtractSubsetWithType(DataType.INTEGER, new Object[]{1, 2, 3, 4, null}); + testExtractSubsetWithType(DataType.LONG, new Object[]{1L, 2L, 3L, 4L, null}); + testExtractSubsetWithType(DataType.FLOAT, new Object[]{1.0f, 2.0f, 3.0f, 4.0f, null}); + testExtractSubsetWithType(DataType.DOUBLE, new Object[]{1.0, 2.0, 3.0, 4.0, null}); + testExtractSubsetWithType(DataType.BINARY, new Object[]{ + new byte[]{0, 1}, + new byte[]{0}, + new byte[]{}, + new byte[]{0, 1, 2}, + null}); + testExtractSubsetWithType(DataType.STRING, new Object[]{"a", "b", "c", "d", null}); + + // When null bits is less than currentSize + ColumnBuffer buffer = new ColumnBuffer(DataType.STRING); + buffer.add(null); + buffer.add("a"); + buffer.add("b"); + ColumnBuffer subset = buffer.extractSubset(2); + assertEquals(2, subset.size()); + assertNull(subset.get(0)); + assertEquals("a", subset.get(1)); + assertTrue(subset.getNulls().get(0)); + + // When value array length is less than currentSize + buffer = new ColumnBuffer(DataType.STRING); + for (int i = 0; i < 110; i++) { + buffer.add(null); + } + subset = buffer.extractSubset(110); + BitSet nullBits = subset.getNulls(); + assertEquals(110, subset.size()); + for (int i = 0; i < 110; i++) { + assertNull(subset.get(i)); + assertTrue(nullBits.get(i)); + } + assertEquals(0, buffer.size()); + } + + private void testExtractSubsetWithType( + DataType type, + Object[] initValues) { + ColumnBuffer buffer = new ColumnBuffer(type); + + // Check the passed in test data + // The number of initial value for test should be 5 + assertEquals(5, initValues.length); + // The last one should be null + assertNull(initValues[4]); + + for (int i = 0; i < initValues.length; i++) { + buffer.add(initValues[i]); + // Binary column buffer will wrap byte[] in a ByteBuffer + // We update initValues for the following comparision for Binary ColumnBuffer + if (type == DataType.BINARY && initValues[i] != null) { + initValues[i] = ByteBuffer.wrap((byte[]) initValues[i]); + } + } + + ColumnBuffer buffer1 = buffer.extractSubset(100); + assertEquals(5, buffer1.size()); + // null bit should be set + assertTrue(buffer1.getNulls().get(4)); + assertEquals(initValues[0], buffer1.get(0)); + assertEquals(initValues[1], buffer1.get(1)); + assertEquals(initValues[2], buffer1.get(2)); + assertEquals(initValues[3], buffer1.get(3)); + assertEquals(initValues[4], buffer1.get(4)); + assertEquals(0, buffer.size()); + + // extract negative index + ColumnBuffer buffer2 = buffer1.extractSubset(-1); + assertEquals(0, buffer2.size()); + assertEquals(5, buffer1.size()); + + // Extract single element + ColumnBuffer buffer3 = buffer1.extractSubset(1); + assertEquals(1, buffer3.size()); + assertFalse(buffer3.getNulls().get(0)); + assertEquals(initValues[0], buffer3.get(0)); + assertEquals(4, buffer1.size()); + + // null bits should not be set + ColumnBuffer buffer4 = buffer1.extractSubset(2); + assertEquals(0, buffer4.getNulls().size()); + assertEquals(initValues[1], buffer4.get(0)); + assertEquals(initValues[2], buffer4.get(1)); + assertEquals(2, buffer1.size()); + } + public static class TestBean { private int id; private boolean bool;