Skip to content

Commit

Permalink
Merge pull request byzer-org#886 from allwefantasy/ISSUE-885
Browse files Browse the repository at this point in the history
Save Redis/Hive/Carbondata fail
  • Loading branch information
allwefantasy authored Jan 10, 2019
2 parents b0aa8dc + 07a8b70 commit 051e735
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 21 deletions.
26 changes: 6 additions & 20 deletions streamingpro-mlsql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>tech.mlsql</groupId>
<artifactId>streamingpro-redis</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
Expand Down Expand Up @@ -671,25 +677,5 @@
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
<configuration>
<argLine>-XX:MaxPermSize=128m</argLine>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class BatchSaveAdaptor(val scriptSQLExecListener: ScriptSQLExecListener,
mode, Option(oldDF)))
}.getOrElse {

if (final_path.contains("\\.")) {
if (final_path.contains(".")) {
val Array(_dbname, _dbtable) = final_path.split("\\.", 2)
ConnectMeta.presentThenCall(DBMappingKey(format, _dbname), options => {
final_path = _dbtable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package streaming.test.datasource

import org.apache.spark.streaming.BasicSparkOperation
import org.scalatest.BeforeAndAfterAll
import streaming.core.strategy.platform.SparkRuntime
import streaming.core.{BasicMLSQLConfig, SpecFunctions}
import streaming.dsl.ScriptSQLExec
import streaming.log.Logging

/**
* 2019-01-10 WilliamZhu([email protected])
*/
class RedisSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig with BeforeAndAfterAll with Logging {
"load solr" should "work fine" in {

withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { runtime: SparkRuntime =>

implicit val spark = runtime.sparkSession

var sq = createSSEL
ScriptSQLExec.parse(
s"""
|
|connect redis
|where host=""
|and port="6379"
|and dbNum="0"
|as redis_test;
|
|select "a" as id, "b" as ck
|as tmp_test1;
|
|save overwrite tmp_test1
|as redis.`redis_test.tmp_test1_key`
|options insertType="listInsertAsString";
""".stripMargin, sq)

val (a, b) = server.exec("redis", "redis-cli get tmp_test1_key")
assume((a + b) == "a")
}
}

val server = new streaming.test.servers.RedisServer("5.0.3")

override protected def beforeAll(): Unit = {
server.startServer
}

override protected def afterAll(): Unit = {
server.stopServer
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streaming.test.servers

/**
* 2018-12-21 WilliamZhu([email protected])
*/
class RedisServer(version: String) extends WowBaseTestServer {

override def composeYaml: String =
s"""
|version: '2'
|
|services:
| redis:
| image: 'redis:${version}'
| ports:
| - "6379:6379"
""".stripMargin

override def waitToServiceReady: Boolean = {
// wait mongo to ready, runs on host server
val shellCommand = s"redis-cli -r 3 ping"
readyCheck("redis", shellCommand, true)
}
}

0 comments on commit 051e735

Please sign in to comment.