Skip to content

Commit

Permalink
PHOENIX-2359 Configuration for PQS to use Protobuf serialization inst…
Browse files Browse the repository at this point in the history
…ead of JSON (elserj)
  • Loading branch information
Mujtaba committed Nov 17, 2015
1 parent 6a37307 commit 5a18fa1
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 48 deletions.
30 changes: 28 additions & 2 deletions bin/sqlline-thin.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def kill_child():

url = "localhost:8765"
sqlfile = ""
serialization_key = 'phoenix.queryserver.serialization'

def usage_and_exit():
sys.exit("usage: sqlline-thin.py [host[:port]] [sql_file]")
Expand All @@ -53,6 +54,29 @@ def cleanup_url(url):
url = url + ":8765"
return url

def get_serialization():
default_serialization='PROTOBUF'
env=os.environ.copy()
hbase_cmd = phoenix_utils.which('hbase')
if hbase_cmd is None:
print 'Failed to find hbase executable on PATH, defaulting serialization to %s.' % default_serialization
return default_serialization

env['HBASE_CONF_DIR'] = phoenix_utils.hbase_conf_dir
proc = subprocess.Popen([hbase_cmd, 'org.apache.hadoop.hbase.util.HBaseConfTool', serialization_key],
env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
print 'Failed to extract serialization from hbase-site.xml, defaulting to %s.' % default_serialization
return default_serialization
# Don't expect this to happen, but give a default value just in case
if stdout is None:
return default_serialization

stdout = stdout.strip()
if stdout == 'null':
return default_serialization
return stdout

if len(sys.argv) == 1:
pass
Expand Down Expand Up @@ -81,6 +105,8 @@ def cleanup_url(url):
# HBase/Phoenix client side property override
hbase_config_path = os.getenv('HBASE_CONF_DIR', phoenix_utils.current_dir)

serialization = get_serialization()

java_home = os.getenv('JAVA_HOME')

# load hbase-env.??? to extract JAVA_HOME, HBASE_PID_DIR, HBASE_LOG_DIR
Expand Down Expand Up @@ -112,10 +138,10 @@ def cleanup_url(url):
java = 'java'

java_cmd = java + ' -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_thin_client_jar + \
os.pathsep + phoenix_utils.hadoop_conf + os.pathsep + phoenix_utils.hadoop_classpath + '" -Dlog4j.configuration=file:' + \
os.pathsep + phoenix_utils.hadoop_conf + os.pathsep + phoenix_utils.hadoop_classpath + '" -Dlog4j.configuration=file:' + \
os.path.join(phoenix_utils.current_dir, "log4j.properties") + \
" sqlline.SqlLine -d org.apache.phoenix.queryserver.client.Driver " + \
" -u jdbc:phoenix:thin:url=" + url + \
" -u jdbc:phoenix:thin:url='" + url + ";serialization=" + serialization + "'" + \
" -n none -p none --color=" + colorSetting + " --fastConnect=false --verbose=true " + \
" --isolation=TRANSACTION_READ_COMMITTED " + sqlfile

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ public interface QueryServices extends SQLCloseable {
public static final String RETURN_SEQUENCE_VALUES_ATTRIB = "phoenix.sequence.returnValues";
public static final String EXTRA_JDBC_ARGUMENTS_ATTRIB = "phoenix.jdbc.extra.arguments";

// queryserver configuration keys
public static final String QUERY_SERVER_SERIALIZATION_ATTRIB = "phoenix.queryserver.serialization";
public static final String QUERY_SERVER_META_FACTORY_ATTRIB = "phoenix.queryserver.metafactory.class";
public static final String QUERY_SERVER_HTTP_PORT_ATTRIB = "phoenix.queryserver.http.port";
public static final String QUERY_SERVER_ENV_LOGGING_ATTRIB = "phoenix.queryserver.envvars.logging.disabled";
public static final String QUERY_SERVER_ENV_LOGGING_SKIPWORDS_ATTRIB = "phoenix.queryserver.envvars.logging.skipwords";
public static final String QUERY_SERVER_KEYTAB_FILENAME_ATTRIB = "phoenix.queryserver.keytab.file";
public static final String QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB = "phoenix.queryserver.kerberos.principal";
public static final String QUERY_SERVER_DNS_NAMESERVER_ATTRIB = "phoenix.queryserver.dns.nameserver";
public static final String QUERY_SERVER_DNS_INTERFACE_ATTRIB = "phoenix.queryserver.dns.interface";
public static final String QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB = "hbase.security.authentication";

/**
* Get executor service used for parallel scans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB;
import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;

import java.util.HashSet;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -204,6 +206,20 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false;
public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = "";

// QueryServer defaults -- ensure ThinClientUtil is also updated since phoenix-server-client
// doesn't depend on phoenix-core.
public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF";
public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765;
@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
{
add("secret");
add("passwd");
add("password");
add("credential");
}
};

private final Configuration config;

private QueryServicesOptions(Configuration config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
* Utilities for thin clients.
*/
public final class ThinClientUtil {
// The default serialization is also defined in QueryServicesOptions. phoenix-server-client
// currently doesn't depend on phoenix-core so we have to deal with the duplication.
private static final String DEFAULT_SERIALIZATION = "PROTOBUF";

private ThinClientUtil() {}

Expand All @@ -29,7 +32,11 @@ public static String getConnectionUrl(String hostname, int port) {
}

public static String getConnectionUrl(String protocol, String hostname, int port) {
String urlFmt = Driver.CONNECT_STRING_PREFIX + "url=%s://%s:%s";
return String.format(urlFmt, protocol, hostname, port);
return getConnectionUrl(protocol, hostname, port, DEFAULT_SERIALIZATION);
}

public static String getConnectionUrl(String protocol, String hostname, int port, String serialization) {
String urlFmt = Driver.CONNECT_STRING_PREFIX + "url=%s://%s:%s;serialization=%s";
return String.format(urlFmt, protocol, hostname, port, serialization);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.queryserver.client.ThinClientUtil;
import org.apache.phoenix.queryserver.server.Main;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -57,7 +57,7 @@ public class QueryServerBasicsIT extends BaseHBaseManagedTimeIT {
@BeforeClass
public static void beforeClass() throws Exception {
CONF = getTestClusterConfig();
CONF.setInt(Main.QUERY_SERVER_HTTP_PORT_KEY, 0);
CONF.setInt(QueryServices.QUERY_SERVER_HTTP_PORT_ATTRIB, 0);
String url = getUrl();
AVATICA_SERVER = new QueryServerThread(new String[] { url }, CONF,
QueryServerBasicsIT.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.remote.Driver;
import org.apache.calcite.avatica.remote.LocalService;
import org.apache.calcite.avatica.remote.Service;
import org.apache.calcite.avatica.server.AvaticaHandler;
import org.apache.calcite.avatica.server.HandlerFactory;
import org.apache.calcite.avatica.server.HttpServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -33,6 +34,9 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.eclipse.jetty.server.Handler;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
Expand All @@ -48,36 +52,8 @@
*/
public final class Main extends Configured implements Tool, Runnable {

public static final String QUERY_SERVER_META_FACTORY_KEY =
"phoenix.queryserver.metafactory.class";

public static final String QUERY_SERVER_HTTP_PORT_KEY =
"phoenix.queryserver.http.port";
public static final int DEFAULT_HTTP_PORT = 8765;

public static final String QUERY_SERVER_ENV_LOGGING_KEY =
"phoenix.queryserver.envvars.logging.disabled";
public static final String QUERY_SERVER_ENV_LOGGING_SKIPWORDS_KEY =
"phoenix.queryserver.envvars.logging.skipwords";

public static final String KEYTAB_FILENAME_KEY = "phoenix.queryserver.keytab.file";
public static final String KERBEROS_PRINCIPAL_KEY = "phoenix.queryserver.kerberos.principal";
public static final String DNS_NAMESERVER_KEY = "phoenix.queryserver.dns.nameserver";
public static final String DNS_INTERFACE_KEY = "phoenix.queryserver.dns.interface";
public static final String HBASE_SECURITY_CONF_KEY = "hbase.security.authentication";

protected static final Log LOG = LogFactory.getLog(Main.class);

@SuppressWarnings("serial")
private static final Set<String> DEFAULT_SKIP_WORDS = new HashSet<String>() {
{
add("secret");
add("passwd");
add("password");
add("credential");
}
};

private final String[] argv;
private final CountDownLatch runningLatch = new CountDownLatch(1);
private HttpServer server = null;
Expand Down Expand Up @@ -107,10 +83,10 @@ public static void logJVMInfo() {
*/
public static void logProcessInfo(Configuration conf) {
// log environment variables unless asked not to
if (conf == null || !conf.getBoolean(QUERY_SERVER_ENV_LOGGING_KEY, false)) {
Set<String> skipWords = new HashSet<String>(DEFAULT_SKIP_WORDS);
if (conf == null || !conf.getBoolean(QueryServices.QUERY_SERVER_ENV_LOGGING_ATTRIB, false)) {
Set<String> skipWords = new HashSet<String>(QueryServicesOptions.DEFAULT_QUERY_SERVER_SKIP_WORDS);
if (conf != null) {
String[] confSkipWords = conf.getStrings(QUERY_SERVER_ENV_LOGGING_SKIPWORDS_KEY);
String[] confSkipWords = conf.getStrings(QueryServices.QUERY_SERVER_ENV_LOGGING_SKIPWORDS_ATTRIB);
if (confSkipWords != null) {
skipWords.addAll(Arrays.asList(confSkipWords));
}
Expand Down Expand Up @@ -183,26 +159,29 @@ public int run(String[] args) throws Exception {
logProcessInfo(getConf());
try {
// handle secure cluster credentials
if ("kerberos".equalsIgnoreCase(getConf().get(HBASE_SECURITY_CONF_KEY))) {
if ("kerberos".equalsIgnoreCase(getConf().get(QueryServices.QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB))) {
String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
getConf().get(DNS_INTERFACE_KEY, "default"),
getConf().get(DNS_NAMESERVER_KEY, "default")));
getConf().get(QueryServices.QUERY_SERVER_DNS_INTERFACE_ATTRIB, "default"),
getConf().get(QueryServices.QUERY_SERVER_DNS_NAMESERVER_ATTRIB, "default")));
if (LOG.isDebugEnabled()) {
LOG.debug("Login to " + hostname + " using " + getConf().get(KEYTAB_FILENAME_KEY)
+ " and principal " + getConf().get(KERBEROS_PRINCIPAL_KEY) + ".");
LOG.debug("Login to " + hostname + " using " + getConf().get(QueryServices.QUERY_SERVER_KEYTAB_FILENAME_ATTRIB)
+ " and principal " + getConf().get(QueryServices.QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB) + ".");
}
SecurityUtil.login(getConf(), KEYTAB_FILENAME_KEY, KERBEROS_PRINCIPAL_KEY, hostname);
SecurityUtil.login(getConf(), QueryServices.QUERY_SERVER_KEYTAB_FILENAME_ATTRIB,
QueryServices.QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB, hostname);
LOG.info("Login successful.");
}
Class<? extends PhoenixMetaFactory> factoryClass = getConf().getClass(
QUERY_SERVER_META_FACTORY_KEY, PhoenixMetaFactoryImpl.class, PhoenixMetaFactory.class);
int port = getConf().getInt(QUERY_SERVER_HTTP_PORT_KEY, DEFAULT_HTTP_PORT);
QueryServices.QUERY_SERVER_META_FACTORY_ATTRIB, PhoenixMetaFactoryImpl.class, PhoenixMetaFactory.class);
int port = getConf().getInt(QueryServices.QUERY_SERVER_HTTP_PORT_ATTRIB,
QueryServicesOptions.DEFAULT_QUERY_SERVER_HTTP_PORT);
LOG.debug("Listening on port " + port);
PhoenixMetaFactory factory =
factoryClass.getDeclaredConstructor(Configuration.class).newInstance(getConf());
Meta meta = factory.create(Arrays.asList(args));
final HandlerFactory handlerFactory = new HandlerFactory();
Service service = new LocalService(meta);
server = new HttpServer(port, new AvaticaHandler(service));
server = new HttpServer(port, getHandler(getConf(), service, handlerFactory));
server.start();
runningLatch.countDown();
server.join();
Expand All @@ -214,6 +193,34 @@ public int run(String[] args) throws Exception {
}
}

/**
* Instantiates the Handler for use by the Avatica (Jetty) server.
*
* @param conf The configuration
* @param service The Avatica Service implementation
* @param handlerFactory Factory used for creating a Handler
* @return The Handler to use based on the configuration.
*/
Handler getHandler(Configuration conf, Service service, HandlerFactory handlerFactory) {
String serializationName = conf.get(QueryServices.QUERY_SERVER_SERIALIZATION_ATTRIB,
QueryServicesOptions.DEFAULT_QUERY_SERVER_SERIALIZATION);

Driver.Serialization serialization;
// Otherwise, use what was provided in the configuration
try {
serialization = Driver.Serialization.valueOf(serializationName);
} catch (Exception e) {
LOG.error("Unknown message serialization type for " + serializationName);
throw e;
}

Handler handler = handlerFactory.getHandler(service, serialization);

LOG.info("Instantiated " + handler.getClass() + " for QueryServer");

return handler;
}

@Override public void run() {
try {
retCode = run(argv);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
<collections.version>3.2.1</collections.version>
<jodatime.version>2.7</jodatime.version>
<joni.version>2.1.2</joni.version>
<calcite.version>1.3.0-incubating</calcite.version>
<calcite.version>1.5.0</calcite.version>
<jettyVersion>8.1.7.v20120910</jettyVersion>

<!-- Test Dependencies -->
Expand Down

0 comments on commit 5a18fa1

Please sign in to comment.