Adding pk and unique index test for batch write (pingcap#1049)
zhexuany authored Aug 21, 2019
1 parent bb7c646 commit 7e3b92d
Showing 12 changed files with 180 additions and 42 deletions.
@@ -11,7 +11,7 @@ import scala.util.Random
trait BaseEnumerateDataTypesTestSpec
extends MultiColumnDataTypeTestSpec
with BaseTestGenerationSpec {
- def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[Index]
+ def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[List[Index]]

def genLen(dataType: ReflectedDataType): String = {
val baseType = getBaseType(dataType)
@@ -40,22 +40,24 @@ trait BaseEnumerateDataTypesTestSpec
indices.zipWithIndex.map { index =>
schemaGenerator(
database,
+ // table name
tablePrefix + index._2,
r,
dataTypesWithDescription,
- List(index._1)
+ // constraint
+ index._1
)
}
}

private def toString(dataTypes: Seq[String]): String = dataTypes.hashCode().toString

- override val rowCount = 10
+ override val rowCount = 50

- override def getTableName(dataTypes: String*): String = s"test_${toString(dataTypes)}"
+ // we are not using the function below; we probably need to decouple the logic.
+ override def getTableName(dataTypes: String*): String = ???

- override def getTableNameWithDesc(desc: String, dataTypes: String*): String =
-   s"test_${desc}_${toString(dataTypes)}"
+ override def getTableNameWithDesc(desc: String, dataTypes: String*): String = ???

- override def getIndexName(dataTypes: String*): String = s"idx_${toString(dataTypes)}"
+ override def getIndexName(dataTypes: String*): String = ???
}
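
For orientation, a minimal sketch (not part of the commit) of what the widened genIndex contract means. The constraint types below are the ones imported elsewhere in this diff; the sample values are hypothetical:

import org.apache.spark.sql.test.generator.{DefaultColumn, Index, Key, PrimaryKey}

// Hypothetical constraints over columns 1 and 2.
val pk: Index = PrimaryKey(DefaultColumn(1) :: Nil)
val uniq: Index = Key(DefaultColumn(2) :: Nil)

// Before: List[Index], i.e. one constraint per generated table.
val before: List[Index] = List(pk, uniq)

// After: List[List[Index]], i.e. one inner list per generated table
// holding that table's complete constraint set, so a single table can
// now carry a primary key and a unique index together.
val after: List[List[Index]] = List(
  List(pk),       // table 0: primary key only
  List(pk, uniq)  // table 1: primary key plus a unique index
)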
@@ -0,0 +1,82 @@
package org.apache.spark.sql.insertion

import com.pingcap.tikv.meta.TiColumnInfo
import com.pingcap.tispark.datasource.BaseDataSourceTest
import com.pingcap.tispark.utils.TiUtil
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.generator.DataType.ReflectedDataType
import org.apache.spark.sql.test.generator.Schema
import org.apache.spark.sql.test.generator.TestDataGenerator._

class BatchWritePKAndUniqueIndexSuite
extends BaseDataSourceTest(
"batch_write_insertion_pk_and_one_unique_index",
"batch_write_test_index"
)
with EnumerateUniqueIndexDataTypeTestAction {
// TODO: support binary insertion.
override val dataTypes: List[ReflectedDataType] = integers ::: decimals ::: doubles ::: charCharset
override val unsignedDataTypes: List[ReflectedDataType] = integers ::: decimals ::: doubles
override val database = "batch_write_test_pk_and_index"
override val testDesc = "Test for pk and unique index type in batch-write insertion"

override def beforeAll(): Unit = {
super.beforeAll()
tidbStmt.execute(s"drop database if exists $database")
tidbStmt.execute(s"create database $database")
}

private def tiRowToSparkRow(row: TiRow, tiColsInfos: java.util.List[TiColumnInfo]) = {
val sparkRow = new Array[Any](row.fieldCount())
for (i <- 0 until row.fieldCount()) {
val colTp = tiColsInfos.get(i).getType
val colVal = row.get(i, colTp)
sparkRow(i) = colVal
}
Row.fromSeq(sparkRow)
}

private def dropAndCreateTbl(schema: Schema): Unit = {
// drop table if exists
dropTable(schema.tableName)

// create table in tidb first
jdbcUpdate(schema.toString)
}

private def insertAndSelect(schema: Schema): Unit = {
val tblName = schema.tableName

val tiTblInfo = getTableInfo(database, tblName)
val tiColInfos = tiTblInfo.getColumns
// gen data
val rows =
generateRandomRows(schema, rowCount, r).map(row => tiRowToSparkRow(row, tiColInfos))
// insert data to tikv
tidbWriteWithTable(rows, TiUtil.getSchemaFromTable(tiTblInfo), tblName)
// select data from tikv and compare with tidb
compareTiDBSelectWithJDBCWithTable_V2(tblName = tblName, "col_bigint")
}

test("test pk and unique indices cases") {
val schemas = genSchema(dataTypes, table)

schemas.foreach { schema =>
dropAndCreateTbl(schema)
}

schemas.foreach { schema =>
insertAndSelect(schema)
}
}

// this is only to mute the warning
override def test(): Unit = {}

override def afterAll(): Unit =
try {
dropTable()
} finally {
super.afterAll()
}
}
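
The write and comparison steps above go through test-framework helpers. As a hedged sketch only, assuming tidbWriteWithTable wraps TiSpark's documented batch-write datasource (option names follow the TiSpark docs; the address and credentials are placeholders), the write half amounts to roughly:

// Hedged sketch: build a DataFrame from the generated rows and append
// it through the TiSpark batch-write datasource. Connection values
// are placeholders, not the suite's real configuration.
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(rows),
  TiUtil.getSchemaFromTable(tiTblInfo)
)
df.write
  .format("tidb")
  .option("tidb.addr", "127.0.0.1")
  .option("tidb.port", "4000")
  .option("tidb.user", "root")
  .option("tidb.password", "")
  .option("database", database)
  .option("table", tblName)
  .mode("append")
  .save()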
@@ -14,9 +14,8 @@ class BatchWritePkSuite
// TODO: support binary insertion.
override val dataTypes: List[ReflectedDataType] = integers ::: decimals ::: doubles ::: charCharset
override val unsignedDataTypes: List[ReflectedDataType] = integers ::: decimals ::: doubles
- override val dataTypeTestDir = "batch-write-test-index"
- override val database = "batch_write_test_index"
- override val testDesc = "Test for single PK column and multiple unique index type"
+ override val database = "batch_write_test_pk"
+ override val testDesc = "Test for single PK column in batch-write insertion"

override def beforeAll(): Unit = {
super.beforeAll()
@@ -56,7 +55,7 @@ class BatchWritePkSuite
compareTiDBSelectWithJDBCWithTable_V2(tblName = tblName, "col_bigint")
}

test("test unique indices cases") {
test("test pk cases") {
val schemas = genSchema(dataTypes, table)

schemas.foreach { schema =>
@@ -68,7 +67,7 @@ }
}
}

- // this is only for
+ // this is only to mute the warning
override def test(): Unit = {}

override def afterAll(): Unit =
@@ -14,9 +14,8 @@ class BatchWriteUniqueIndexSuite
// TODO: support binary insertion.
override val dataTypes: List[ReflectedDataType] = integers ::: decimals ::: doubles ::: charCharset
override val unsignedDataTypes: List[ReflectedDataType] = integers ::: decimals ::: doubles
- override val dataTypeTestDir = "batch-write-test-index"
override val database = "batch_write_test_index"
- override val testDesc = "Test for single PK column and multiple unique index type"
+ override val testDesc = "Test for single and multiple unique index type in batch-write insertion"

override def beforeAll(): Unit = {
super.beforeAll()
@@ -68,7 +67,7 @@ }
}
}

- // this is only for
+ // this is only to mute the warning
override def test(): Unit = {}

override def afterAll(): Unit =
@@ -0,0 +1,68 @@
package org.apache.spark.sql.insertion

import org.apache.commons.math3.util.Combinations
import org.apache.spark.sql.test.generator.DataType.ReflectedDataType
import org.apache.spark.sql.test.generator.{DefaultColumn, Index, IndexColumn, Key, PrefixColumn, PrimaryKey}
import org.apache.spark.sql.test.generator.TestDataGenerator.isStringType

import scala.util.Random

trait EnumeratePKAndUniqueIndexDataTypeTestAction extends BaseEnumerateDataTypesTestSpec {
private def genPk(dataTypes: List[ReflectedDataType], r: Random): List[Index] = {
val size = dataTypes.length
val keyList = scala.collection.mutable.ListBuffer.empty[PrimaryKey]
for (i <- 0 until size) {
// we add an extra one to the column id since 1 is reserved for the primary key
val pkCol = if (isStringType(dataTypes(i))) {
PrefixColumn(i + 1, r.nextInt(4) + 2) :: Nil
} else {
DefaultColumn(i + 1) :: Nil
}
keyList += PrimaryKey(pkCol)
}
keyList.toList
}

private def genUniqueIndex(dataTypes: List[ReflectedDataType], r: Random): List[Index] = {
val size = dataTypes.length
// the first step is to generate all possible keys
val keyList = scala.collection.mutable.ListBuffer.empty[Key]
for (i <- 1 until 3) {
val combination = new Combinations(size, i)
// enumerate every i-element combination of the column indices
val iterator = combination.iterator()
while (iterator.hasNext) {
val intArray = iterator.next()
val indexColumnList = scala.collection.mutable.ListBuffer.empty[IndexColumn]
// an index may span multiple columns
for (j <- 0 until intArray.length) {
// we add an extra one to the column id since 1 is reserved for the primary key
if (isStringType(dataTypes(intArray(j)))) {
indexColumnList += PrefixColumn(intArray(j) + 1, r.nextInt(4) + 2)
} else {
indexColumnList += DefaultColumn(intArray(j) + 1)
}
}

keyList += Key(indexColumnList.toList)
}
}

keyList.toList
}

override def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[List[Index]] = {
val pkIdxList = genPk(dataTypes, r)
val uniqueIdxList = genUniqueIndex(dataTypes, r)
val constraints = scala.collection.mutable.ListBuffer.empty[List[Index]]
for (i <- pkIdxList.indices) {
val tmpIdxList = scala.collection.mutable.ListBuffer.empty[Index]
for (j <- uniqueIdxList.indices) {
tmpIdxList += pkIdxList(i)
tmpIdxList += uniqueIdxList(j)
}
constraints += tmpIdxList.toList
}
constraints.toList
}
}
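
Note that the inner loop above appends the same PrimaryKey once per unique index, so each generated table's constraint list repeats its PK. If the intent is one generated table per (primary key, unique index) pair, a hypothetical drop-in body for genIndex could use a for-comprehension instead:

// Hypothetical alternative (not the commit's code): pair every PK with
// every unique index, one pair per generated table, PK listed once.
override def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[List[Index]] =
  for {
    pk <- genPk(dataTypes, r)
    uniq <- genUniqueIndex(dataTypes, r)
  } yield List(pk, uniq)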
@@ -7,17 +7,17 @@ import org.apache.spark.sql.test.generator.{DefaultColumn, Index, PrefixColumn,
import scala.util.Random

trait EnumeratePKDataTypeTestAction extends BaseEnumerateDataTypesTestSpec {
- override def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[Index] = {
+ override def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[List[Index]] = {
val size = dataTypes.length
- val keyList = scala.collection.mutable.ListBuffer.empty[PrimaryKey]
+ val keyList = scala.collection.mutable.ListBuffer.empty[List[PrimaryKey]]
for (i <- 0 until size) {
// we add an extra one to the column id since 1 is reserved for the primary key
val pkCol = if (isStringType(dataTypes(i))) {
PrefixColumn(i + 1, r.nextInt(4) + 2) :: Nil
} else {
DefaultColumn(i + 1) :: Nil
}
- keyList += PrimaryKey(pkCol)
+ keyList += PrimaryKey(pkCol) :: Nil
}
keyList.toList
}
@@ -10,10 +10,10 @@ import org.apache.spark.sql.types.MultiColumnDataTypeTestSpec
import scala.util.Random

trait EnumerateUniqueIndexDataTypeTestAction extends BaseEnumerateDataTypesTestSpec {
- override def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[Index] = {
+ override def genIndex(dataTypes: List[ReflectedDataType], r: Random): List[List[Index]] = {
val size = dataTypes.length
// the first step is to generate all possible keys
- val keyList = scala.collection.mutable.ListBuffer.empty[Key]
+ val keyList = scala.collection.mutable.ListBuffer.empty[List[Key]]
for (i <- 1 until 3) {
val combination = new Combinations(size, i)
// enumerate every i-element combination of the column indices
@@ -31,7 +31,7 @@ trait EnumerateUniqueIndexDataTypeTestS
}
}

- keyList += Key(indexColumnList.toList)
+ keyList += Key(indexColumnList.toList) :: Nil
}
}

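
Both enumeration traits lean on commons-math3's Combinations, an Iterable over int arrays in which Combinations(n, k) yields every k-element subset of {0, ..., n - 1}; the loops above use k = 1 and k = 2. A tiny standalone illustration:

import org.apache.commons.math3.util.Combinations

// Prints the six 2-element subsets of {0, 1, 2, 3}, one per line,
// e.g. {0,1}, {0,2}, ..., {2,3}.
val it = new Combinations(4, 2).iterator()
while (it.hasNext) {
  println(it.next().mkString("{", ",", "}"))
}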
@@ -0,0 +1,5 @@
package org.apache.spark.sql.types

trait DataTypeTestDir {
val dataTypeTestDir: String
}
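
This new trait factors the dataTypeTestDir declaration out of MultiColumnDataTypeTestSpec (see the removal further down), so only the suites that actually read generated data from a directory have to supply one. A minimal, hypothetical mix-in:

import org.apache.spark.sql.types.DataTypeTestDir

// Hypothetical suite base: the mixing class, not the shared spec,
// now decides which generated-data directory is read.
trait MyBatchWriteSuiteBase extends DataTypeTestDir {
  override val dataTypeTestDir: String = "batch-write-test-index"
}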
@@ -24,7 +24,8 @@ import org.apache.spark.sql.test.generator.TestDataGenerator.{getDecimal, getLen

trait GenerateMultiColumnDataTypeTestAction
extends MultiColumnDataTypeTestSpec
- with BaseTestGenerationSpec {
+ with BaseTestGenerationSpec
+ with DataTypeTestDir {

override val rowCount = 50

@@ -1,20 +1,3 @@
- /*
-  *
-  * Copyright 2019 PingCAP, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *      http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  *
-  */
-
package org.apache.spark.sql.types

import org.apache.spark.sql.TiSparkTestSpec
@@ -23,7 +6,6 @@ import org.apache.spark.sql.test.generator.DataType.ReflectedDataType
trait MultiColumnDataTypeTestSpec extends TiSparkTestSpec {
val dataTypes: List[ReflectedDataType]
val unsignedDataTypes: List[ReflectedDataType]
- val dataTypeTestDir: String

val extraDesc = "unsigned"
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.types

import org.apache.spark.sql.test.generator.DataType.ReflectedDataType

- trait RunMultiColumnDataTypeTestAction extends MultiColumnDataTypeTestSpec {
+ trait RunMultiColumnDataTypeTestAction extends MultiColumnDataTypeTestSpec with DataTypeTestDir {

def startTest(dataTypes: List[ReflectedDataType]): Unit = ???

@@ -19,7 +19,7 @@ package org.apache.spark.sql.types.pk

import org.apache.spark.sql.test.generator.DataType.{getTypeName, BIGINT, INT, ReflectedDataType}
import org.apache.spark.sql.test.generator.TestDataGenerator._
- import org.apache.spark.sql.types.{MultiColumnDataTypeTest, RunMultiColumnDataTypeTestAction}
+ import org.apache.spark.sql.types.{DataTypeTestDir, MultiColumnDataTypeTest, RunMultiColumnDataTypeTestAction}

trait MultiColumnPKDataTypeSuites
extends MultiColumnDataTypeTest