Skip to content

Commit

Permalink
Implementing the fetchSinkInfo in ConfigBasedIOResolver
Browse files Browse the repository at this point in the history
1. I think we missed implementing the fetchSinkInfo method in the ConfigBasedResolver when the API was introduced which is breaking the samza sql console tool. This fixes it.
2. latest release of mac removed realpath so the command line tools are broken. Removed the usage of realpath to fix these tools.

Thanks to nickpan47 for identifying these problems.

Author: Srinivasulu Punuru <[email protected]>

Reviewers: Yi Pan <[email protected]>

Closes apache#528 from srinipunuru/release-fix.1

(cherry picked from commit 171793b)
Signed-off-by: xiliu <[email protected]>
srinipunuru authored and xinyuiscool committed May 18, 2018
1 parent 69e63d8 commit 9bc03f7
Showing 4 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@

package org.apache.samza.sql.impl;

import org.apache.commons.lang.NotImplementedException;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.TableDescriptor;
@@ -62,7 +61,11 @@ public ConfigBasedIOResolver(Config config) {

@Override
public SqlIOConfig fetchSourceInfo(String source) {
String[] sourceComponents = source.split("\\.");
return fetchSystemInfo(source);
}

private SqlIOConfig fetchSystemInfo(String name) {
String[] sourceComponents = name.split("\\.");
boolean isTable = isTable(sourceComponents);

// This source resolver expects sources of format {systemName}.{streamName}[.$table]
@@ -86,7 +89,7 @@ public SqlIOConfig fetchSourceInfo(String source) {
}

if (invalidQuery) {
String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source,
String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", name,
SAMZA_SQL_QUERY_TABLE_KEYWORD);
LOG.error(msg);
throw new SamzaException(msg);
@@ -97,7 +100,7 @@ public SqlIOConfig fetchSourceInfo(String source) {

TableDescriptor tableDescriptor = null;
if (isTable) {
tableDescriptor = new RocksDbTableDescriptor("InputTable-" + source)
tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
.withSerde(KVSerde.of(
new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
@@ -108,7 +111,7 @@ public SqlIOConfig fetchSourceInfo(String source) {

@Override
public SqlIOConfig fetchSinkInfo(String sink) {
throw new NotImplementedException("No sink support in ConfigBasedIOResolver.");
return fetchSystemInfo(sink);
}

private boolean isTable(String[] sourceComponents) {
2 changes: 1 addition & 1 deletion samza-tools/scripts/eh-consumer.sh
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ if [ `uname` == 'Linux' ];
then
base_dir=$(readlink -f $(dirname $0))
else
base_dir=$(realpath $(dirname $0))
base_dir=$(dirname $0)
fi

if [ "x$LOG4J_OPTS" = "x" ]; then
2 changes: 1 addition & 1 deletion samza-tools/scripts/generate-kafka-events.sh
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ if [ `uname` == 'Linux' ];
then
base_dir=$(readlink -f $(dirname $0))
else
base_dir=$(realpath $(dirname $0))
base_dir=$(dirname $0)
fi

if [ "x$LOG4J_OPTS" = "x" ]; then
2 changes: 1 addition & 1 deletion samza-tools/scripts/samza-sql-console.sh
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ if [ `uname` == 'Linux' ];
then
base_dir=$(readlink -f $(dirname $0))
else
base_dir=$(realpath $(dirname $0))
base_dir=$(dirname $0)
fi

if [ "x$LOG4J_OPTS" = "x" ]; then

0 comments on commit 9bc03f7

Please sign in to comment.