Core, Test: Refactor tests to support ClickHouse Cloud (#349)
* Build, Core: Test with ClickHouse Cloud

* Spark 3.5: Test with ClickHouse Cloud

* Spark 3.4: Test with ClickHouse Cloud

* Spark 3.3: Test with ClickHouse Cloud

* Add withNodeClient username/password/opts & change Table engine from Log() to Memory()

* Add ssl only if isSslEnabled is true

* Extend table name to avoid tests hitting the same table name

* Add connection timeout & set database name for the Client

* Fix spotless

* Disable parallel run of Cloud tests

* Do not expect AnalysisException when running cloud test against Spark 3.3

* Code cleanup

* move import & set parallel = 1 when testing

* Add conditional sleep for older versions

* Add missing import

* Fix Versions

* Moving back to parallel for local tests

---------

Co-authored-by: Cheng Pan <[email protected]>
mzitnik and pan3793 authored Aug 6, 2024
1 parent b1e99cc commit 71b0715
Showing 30 changed files with 406 additions and 80 deletions.
58 changes: 58 additions & 0 deletions .github/workflows/cloud.yml
@@ -0,0 +1,58 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: "ClickHouse Cloud"

on:
  push:
    branches:
      - "branch-*"
      - "main"
  pull_request:
    branches:
      - "branch-*"
      - "main"

jobs:
  run-tests-with-clickhouse-cloud:
    runs-on: ubuntu-22.04
    strategy:
      max-parallel: 1
      fail-fast: false
      matrix:
        spark: [ 3.3, 3.4, 3.5 ]
        scala: [ 2.12, 2.13 ]
    env:
      CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
      CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-java@v4
        with:
          distribution: zulu
          java-version: 8
          cache: gradle
      - run: >-
          ./gradlew clean cloudTest --no-daemon --refresh-dependencies
          -Dspark_binary_version=${{ matrix.spark }}
          -Dscala_binary_version=${{ matrix.scala }}
          -PmavenCentralMirror=https://maven-central.storage-download.googleapis.com/maven2/
      - name: Upload test logs
        if: failure()
        uses: actions/upload-artifact@v3
        with:
          name: log-clickhouse-cloud-spark-${{ matrix.spark }}-scala-${{ matrix.scala }}
          path: |
            **/build/unit-tests.log
            log/**
14 changes: 14 additions & 0 deletions build.gradle
@@ -118,6 +118,7 @@ subprojects {
environment("ROOT_PROJECT_DIR", rootProject.projectDir)
tags {
exclude 'org.scalatest.tags.Slow'
exclude 'org.scalatest.tags.Cloud'
}
}

@@ -130,6 +131,15 @@
}
}

tasks.register('cloudTest', Test) {
jvmArgs += extraJvmArgs
maxParallelForks = 1
environment("ROOT_PROJECT_DIR", rootProject.projectDir)
tags {
include 'org.scalatest.tags.Cloud'
}
}

scoverage {
scoverageVersion = "2.0.11"
reportDir.set(file("${rootProject.buildDir}/reports/scoverage"))
@@ -220,6 +230,10 @@ project(":clickhouse-core-it") {
slowTest {
classpath += files("${project(':clickhouse-core').projectDir}/src/testFixtures/conf")
}

cloudTest {
classpath += files("${project(':clickhouse-core').projectDir}/src/testFixtures/conf")
}
}

boolean isVersionFileExists() {
@@ -14,12 +14,18 @@

package com.clickhouse.spark

import com.clickhouse.spark.base.ClickHouseSingleMixIn
import com.clickhouse.spark.base.{ClickHouseCloudMixIn, ClickHouseProvider, ClickHouseSingleMixIn}
import com.clickhouse.spark.client.NodeClient
import com.clickhouse.spark.hash.{CityHash64, HashUtils, Murmurhash2_32, Murmurhash2_64, Murmurhash3_32, Murmurhash3_64}
import com.clickhouse.spark.hash._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.tags.Cloud

class HashSuite extends ClickHouseSingleMixIn with Logging {
@Cloud
class ClickHouseCloudHashSuite extends HashSuite with ClickHouseCloudMixIn

class ClickHouseSingleHashSuite extends HashSuite with ClickHouseSingleMixIn

abstract class HashSuite extends AnyFunSuite with ClickHouseProvider with Logging {

def testHash(
client: NodeClient,
@@ -14,10 +14,18 @@

package com.clickhouse.spark

import com.clickhouse.spark.base.ClickHouseSingleMixIn
import com.clickhouse.spark.base.{ClickHouseCloudMixIn, ClickHouseProvider, ClickHouseSingleMixIn}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.tags.Cloud

import java.time.{LocalDateTime, ZoneId}

class UtilsClickHouseSuite extends ClickHouseSingleMixIn with Logging {
@Cloud
class ClickHouseCloudUtilsSuite extends UtilsSuite with ClickHouseCloudMixIn

class ClickHouseSingleUtilsSuite extends UtilsSuite with ClickHouseSingleMixIn

abstract class UtilsSuite extends AnyFunSuite with ClickHouseProvider with Logging {

test("parse date with nano seconds") {
withNodeClient() { client =>
@@ -39,6 +39,8 @@ object NodeClient {
}

class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
// TODO: add configurable timeout
private val timeout: Int = 30000

private lazy val userAgent = {
val title = getClass.getPackage.getImplementationTitle
@@ -53,7 +55,6 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
"Spark-ClickHouse-Connector"
}
}

private val node: ClickHouseNode = ClickHouseNode.builder()
.options(nodeSpec.options)
.host(nodeSpec.host)
@@ -158,6 +159,7 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
val req = client.read(node)
.query(sql, queryId).asInstanceOf[ClickHouseRequest[_]]
.format(ClickHouseFormat.valueOf(outputFormat)).asInstanceOf[ClickHouseRequest[_]]
.option(ClickHouseClientOption.CONNECTION_TIMEOUT, timeout).asInstanceOf[ClickHouseRequest[_]]
settings.foreach { case (k, v) => req.set(k, v).asInstanceOf[ClickHouseRequest[_]] }
Try(req.executeAndWait()) match {
case Success(resp) => Right(deserializer(resp.getInputStream))
@@ -193,6 +195,7 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
.query(sql, queryId).asInstanceOf[ClickHouseRequest[_]]
.compressServerResponse(outputCompressionType).asInstanceOf[ClickHouseRequest[_]]
.format(ClickHouseFormat.valueOf(outputFormat)).asInstanceOf[ClickHouseRequest[_]]
.option(ClickHouseClientOption.CONNECTION_TIMEOUT, timeout).asInstanceOf[ClickHouseRequest[_]]
settings.foreach { case (k, v) => req.set(k, v).asInstanceOf[ClickHouseRequest[_]] }
Try(req.executeAndWait()) match {
case Success(resp) => resp
@@ -211,4 +214,6 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
|$sql
|""".stripMargin
)
def ping(timeout: Int = timeout) =
client.ping(node, timeout)
}
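As a rough usage sketch (the host, ports, and object name below are placeholders, not part of this change), the new field lets ping() be called without arguments while reads carry the same connection timeout:

// Sketch only: connection details are assumed, not taken from this commit.
import com.clickhouse.client.ClickHouseProtocol.HTTP
import com.clickhouse.spark.client.NodeClient
import com.clickhouse.spark.spec.NodeSpec

object PingExample extends App {
  val client = NodeClient(NodeSpec("localhost", Some(8123), Some(9000), HTTP))
  try {
    // Uses the new 30-second default; an explicit value such as ping(5000) also works.
    if (client.ping()) println("ClickHouse is reachable")
  } finally client.close()
}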
@@ -0,0 +1,24 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.scalatest.tags;

import java.lang.annotation.*;
import org.scalatest.TagAnnotation;

@TagAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@Inherited
public @interface Cloud {}
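As an illustration (the suite name is hypothetical), putting the annotation on a ScalaTest suite tags every test in it, which is what lets the default test task exclude org.scalatest.tags.Cloud while cloudTest includes it:

import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.tags.Cloud

// Hypothetical suite: the class-level tag applies to all of its tests,
// so it is skipped by `test` and picked up only by `cloudTest`.
@Cloud
class ExampleCloudOnlySuite extends AnyFunSuite {
  test("runs only against ClickHouse Cloud") {
    assert(1 + 1 == 2)
  }
}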
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.clickhouse.spark.base

import com.clickhouse.data.ClickHouseVersion
import com.clickhouse.spark.Utils

trait ClickHouseCloudMixIn extends ClickHouseProvider {

  override def clickhouseHost: String = Utils.load("CLICKHOUSE_CLOUD_HOST")

  override def clickhouseHttpPort: Int = Utils.load("CLICKHOUSE_CLOUD_HTTP_PORT", "8443").toInt

  override def clickhouseTcpPort: Int = Utils.load("CLICKHOUSE_CLOUD_TCP_PORT", "9000").toInt

  override def clickhouseUser: String = Utils.load("CLICKHOUSE_CLOUD_USER", "default")

  override def clickhousePassword: String = Utils.load("CLICKHOUSE_CLOUD_PASSWORD")

  override def clickhouseDatabase: String = "default"

  override def clickhouseVersion: ClickHouseVersion = ClickHouseVersion.of("latest")

  override def isSslEnabled: Boolean = true
  override def isCloud: Boolean = true
}
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.clickhouse.spark.base

import com.clickhouse.client.ClickHouseProtocol
import com.clickhouse.client.ClickHouseProtocol.HTTP
import com.clickhouse.data.ClickHouseVersion
import com.clickhouse.spark.Utils
import com.clickhouse.spark.client.NodeClient
import com.clickhouse.spark.spec.NodeSpec
import scala.collection.JavaConverters._

trait ClickHouseProvider {
  def clickhouseHost: String
  def clickhouseHttpPort: Int
  def clickhouseTcpPort: Int
  def clickhouseUser: String
  def clickhousePassword: String
  def clickhouseDatabase: String
  def clickhouseVersion: ClickHouseVersion
  def isSslEnabled: Boolean
  def isCloud: Boolean = false

  def withNodeClient(protocol: ClickHouseProtocol = HTTP)(block: NodeClient => Unit): Unit =
    Utils.tryWithResource {
      NodeClient(NodeSpec(
        clickhouseHost,
        Some(clickhouseHttpPort),
        Some(clickhouseTcpPort),
        protocol,
        username = clickhouseUser,
        database = clickhouseDatabase,
        password = clickhousePassword,
        options = Map("ssl" -> isSslEnabled.toString).asJava
      ))
    } {
      client => block(client)
    }
}
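A minimal usage sketch (the suite and its assertion are illustrative, not part of this commit): a test mixes in one provider implementation and gets a client configured with the matching user, password, database, and ssl flag, whether it targets a local container or Cloud.

import com.clickhouse.spark.base.ClickHouseSingleMixIn

// Illustrative suite: ClickHouseSingleMixIn (or ClickHouseCloudMixIn) supplies the
// ClickHouseProvider fields, so withNodeClient builds the NodeSpec the same way in both setups.
class ExampleProviderSuite extends ClickHouseSingleMixIn {
  test("server answers a ping") {
    withNodeClient() { client =>
      assert(client.ping())
    }
  }
}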
@@ -15,32 +15,26 @@
package com.clickhouse.spark.base

import com.clickhouse.spark.Utils
import com.clickhouse.client.ClickHouseProtocol
import com.clickhouse.client.ClickHouseProtocol._
import com.clickhouse.data.ClickHouseVersion
import com.clickhouse.spark.client.NodeClient
import com.clickhouse.spark.spec.NodeSpec
import com.dimafeng.testcontainers.{ForAllTestContainer, JdbcDatabaseContainer, SingleContainer}
import org.scalatest.funsuite.AnyFunSuite
import org.testcontainers.containers.ClickHouseContainer
import org.testcontainers.utility.{DockerImageName, MountableFile}

import java.nio.file.{Path, Paths}

import scala.collection.JavaConverters._

trait ClickHouseSingleMixIn extends AnyFunSuite with ForAllTestContainer {
trait ClickHouseSingleMixIn extends AnyFunSuite with ForAllTestContainer with ClickHouseProvider {
// format: off
val CLICKHOUSE_IMAGE: String = Utils.load("CLICKHOUSE_IMAGE", "clickhouse/clickhouse-server:23.8")
val CLICKHOUSE_USER: String = Utils.load("CLICKHOUSE_USER", "default")
val CLICKHOUSE_PASSWORD: String = Utils.load("CLICKHOUSE_PASSWORD", "")
val CLICKHOUSE_DB: String = Utils.load("CLICKHOUSE_DB", "")
private val CLICKHOUSE_IMAGE: String = Utils.load("CLICKHOUSE_IMAGE", "clickhouse/clickhouse-server:23.8")
private val CLICKHOUSE_USER: String = Utils.load("CLICKHOUSE_USER", "default")
private val CLICKHOUSE_PASSWORD: String = Utils.load("CLICKHOUSE_PASSWORD", "")
private val CLICKHOUSE_DB: String = Utils.load("CLICKHOUSE_DB", "")

private val CLICKHOUSE_HTTP_PORT = 8123
private val CLICKHOUSE_TPC_PORT = 9000
// format: on

protected val clickhouseVersion: ClickHouseVersion = ClickHouseVersion.of(CLICKHOUSE_IMAGE.split(":").last)
override val clickhouseVersion: ClickHouseVersion = ClickHouseVersion.of(CLICKHOUSE_IMAGE.split(":").last)

protected val rootProjectDir: Path = {
val thisClassURI = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
@@ -77,16 +71,12 @@ trait ClickHouseSingleMixIn extends AnyFunSuite with ForAllTestContainer {
)
.asInstanceOf[ClickHouseContainer]
}
// format: off
def clickhouseHost: String = container.host
def clickhouseHttpPort: Int = container.mappedPort(CLICKHOUSE_HTTP_PORT)
def clickhouseTcpPort: Int = container.mappedPort(CLICKHOUSE_TPC_PORT)
// format: on

def withNodeClient(protocol: ClickHouseProtocol = HTTP)(block: NodeClient => Unit): Unit =
Utils.tryWithResource {
NodeClient(NodeSpec(clickhouseHost, Some(clickhouseHttpPort), Some(clickhouseTcpPort), protocol))
} {
client => block(client)
}
override def clickhouseHost: String = container.host
override def clickhouseHttpPort: Int = container.mappedPort(CLICKHOUSE_HTTP_PORT)
override def clickhouseTcpPort: Int = container.mappedPort(CLICKHOUSE_TPC_PORT)
override def clickhouseUser: String = CLICKHOUSE_USER
override def clickhousePassword: String = CLICKHOUSE_PASSWORD
override def clickhouseDatabase: String = CLICKHOUSE_DB
override def isSslEnabled: Boolean = false
}
4 changes: 4 additions & 0 deletions spark-3.3/build.gradle
@@ -96,4 +96,8 @@ project(":clickhouse-spark-it-${spark_binary_version}_$scala_binary_version") {
slowTest {
classpath += files("${project(':clickhouse-core').projectDir}/src/testFixtures/conf")
}

cloudTest {
classpath += files("${project(':clickhouse-core').projectDir}/src/testFixtures/conf")
}
}
@@ -14,15 +14,22 @@

package org.apache.spark.sql.clickhouse.single

import com.clickhouse.spark.base.{ClickHouseCloudMixIn, ClickHouseSingleMixIn}
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.USE_NULLABLE_QUERY_SCHEMA
import org.apache.spark.sql.clickhouse.SparkUtils
import org.apache.spark.sql.types.DataTypes.{createArrayType, createMapType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
import org.scalatest.tags.Cloud

import java.math.MathContext

class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
@Cloud
class ClickHouseCloudDataTypeSuite extends ClickHouseDataTypeSuite with ClickHouseCloudMixIn

class ClickHouseSingleDataTypeSuite extends ClickHouseDataTypeSuite with ClickHouseSingleMixIn

abstract class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {

val SPARK_43390_ENABLED: Boolean = sys.env.contains("SPARK_43390_ENABLED") || {
SparkUtils.MAJOR_MINOR_VERSION match {
@@ -177,6 +184,9 @@ class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
: Unit = {
val db = "test_kv_db"
val tbl = "test_kv_tbl"
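// Workaround: pause briefly for servers older than 23.3 and for ClickHouse Cloud before the test table is set up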
if (!clickhouseVersion.isNewerOrEqualTo("23.3") || isCloud) {
Thread.sleep(1000)
}
withKVTable(db, tbl, valueColDef = valueColDef) {
prepare(db, tbl)
val df = spark.sql(s"SELECT key, value FROM $db.$tbl ORDER BY key")