Skip to content

Commit

Permalink
Refactored the gateway infrastructure to use the sensei plugin framework
Browse files Browse the repository at this point in the history
  • Loading branch information
vzhabiuk committed Dec 13, 2011
1 parent 5abece4 commit 879891e
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 156 deletions.
5 changes: 2 additions & 3 deletions example/cars/conf/sensei.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ sensei.index.directory = /tmp/sensei-example-cars

# gateway parameters

sensei.gateway.type=file
sensei.gateway.file.path = example/cars/data/cars.json

sensei.gateway.class=com.sensei.indexing.api.gateway.file.LinedFileDataProviderBuilder
sensei.gateway.path = example/cars/data/cars.json
# index manager parameters

sensei.index.manager.default.maxpartition.id = 1
Expand Down
Binary file not shown.
8 changes: 4 additions & 4 deletions example/tweets/conf/sensei.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ sensei.index.freshness = 1000
# gateway parameters

#sensei.gateway.filter
sensei.gateway.type=custom
sensei.gateway.custom.class = com.senseidb.example.tweets.gateway.TwitterSampleGateway
sensei.gateway.custom.username =
sensei.gateway.custom.password =
sensei.gateway.class = com.senseidb.example.tweets.gateway.TwitterSampleGateway
sensei.gateway.username = vzhabiuk
sensei.gateway.password = <Zhaba1>

# index manager parameters

Expand All @@ -35,3 +34,4 @@ sensei.index.manager.default.maxpartition.id = 1
sensei.broker.port = 8080



Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.Comparator;

import org.apache.commons.configuration.Configuration;
import org.json.JSONObject;

import proj.zoie.impl.indexing.StreamDataProvider;
Expand All @@ -12,14 +11,12 @@

public class TwitterSampleGateway extends SenseiGateway<JSONObject> {

public TwitterSampleGateway(Configuration conf) {
super(conf);
}


/**
 * Creates the data provider for this gateway: a new TwitterSampleStreamer built with
 * SenseiGateway.DEFAULT_VERSION_COMPARATOR.
 *
 * NOTE(review): two return statements appear below, one passing {@code _conf} and one
 * passing {@code config}. This looks like a removed line and its replacement shown
 * together; confirm which one is current before relying on this text.
 *
 * @param dataFilter not referenced in the visible body
 * @param oldSinceKey not referenced in the visible body
 * @return a newly constructed TwitterSampleStreamer
 * @throws Exception whatever the TwitterSampleStreamer constructor throws
 */
@Override
public StreamDataProvider<JSONObject> buildDataProvider(
DataSourceFilter<JSONObject> dataFilter, String oldSinceKey) throws Exception {
return new TwitterSampleStreamer(_conf, SenseiGateway.DEFAULT_VERSION_COMPARATOR);
return new TwitterSampleStreamer(config, SenseiGateway.DEFAULT_VERSION_COMPARATOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import proj.zoie.api.DataConsumer.DataEvent;
import proj.zoie.impl.indexing.StreamDataProvider;

public class TwitterSampleStreamer extends StreamDataProvider<JSONObject> {

private static Logger logger = Logger.getLogger(TwitterSampleStreamer.class);
// following hashtag extraction logic taken from twitter-text-java: https://github.com/twitter/twitter-text-java
private static String LATIN_ACCENTS_CHARS = "\\u00c0-\\u00d6\\u00d8-\\u00f6\\u00f8-\\u00ff\\u015f";

private static final String HASHTAG_ALPHA_CHARS = "a-z" + LATIN_ACCENTS_CHARS +
"\\u0400-\\u04ff\\u0500-\\u0527" + // Cyrillic
"\\u2de0–\\u2dff\\ua640–\\ua69f" + // Cyrillic Extended A/B
Expand All @@ -41,7 +41,7 @@ public class TwitterSampleStreamer extends StreamDataProvider<JSONObject> {

public static final Pattern AUTO_LINK_HASHTAGS = Pattern.compile("(^|[^&/" + HASHTAG_ALPHA_NUMERIC_CHARS + "])(#|\uFF03)(" + HASHTAG_ALPHA_NUMERIC + "*" + HASHTAG_ALPHA + HASHTAG_ALPHA_NUMERIC + "*)", Pattern.CASE_INSENSITIVE);
public static final Pattern HASHTAG_MATCH_END = Pattern.compile("^(?:[##]|://)");

private static List<String> extractHashtags(String text) {
List<String> extracted = new ArrayList<String>();
Matcher matcher = AUTO_LINK_HASHTAGS.matcher(text);
Expand All @@ -52,29 +52,29 @@ private static List<String> extractHashtags(String text) {
}
}
return extracted;

}

private final BufferedReader _tweetReader;
/**
 * Opens the Twitter sample stream and prepares a line reader over it.
 *
 * Reads the "username" and "password" settings, sends them as an HTTP Basic
 * Authorization header to https://stream.twitter.com/1/statuses/sample.json, and wraps
 * the response stream in a UTF-8 BufferedReader stored in {@code _tweetReader}.
 *
 * NOTE(review): two signatures and two pairs of credential lookups appear below (one
 * taking a Configuration, one taking a Map). This looks like removed and added lines
 * shown together; confirm which variant is current.
 *
 * @param versionComparator forwarded unchanged to the StreamDataProvider constructor
 * @throws Exception declared by the signature; presumably covers URL/connection and
 *         stream-opening failures (TODO confirm)
 */
public TwitterSampleStreamer(Configuration conf,Comparator<String> versionComparator) throws Exception {

public TwitterSampleStreamer(Map<String, String> config,Comparator<String> versionComparator) throws Exception {
super(versionComparator);
String username = conf.getString("username");
String password = conf.getString("password");
String username = config.get("username");
String password = config.get("password");

URL url = new URL("https://stream.twitter.com/1/statuses/sample.json");
URLConnection uc = url.openConnection();

// HTTP Basic credentials are the text "username:password", Base64-encoded.
String userPassword = username+":"+password;

// NOTE(review): getBytes() without an argument uses the platform default charset, and
// sun.misc.BASE64Encoder is a JDK-internal class rather than a supported public API.
String encoding = new sun.misc.BASE64Encoder().encode (userPassword.getBytes());
uc.setRequestProperty ("Authorization", "Basic " + encoding);


// The response body is decoded as UTF-8; the reader is kept in a field for later use.
InputStream in = uc.getInputStream();
_tweetReader =new BufferedReader(new InputStreamReader(in,"UTF-8"));
}

@Override
public DataEvent<JSONObject> next() {
DataEvent<JSONObject> tweetEvent = null;
Expand All @@ -86,17 +86,17 @@ public DataEvent<JSONObject> next() {
if (id!=null){
JSONObject tweetJSON = new JSONObject();
tweetJSON.put("id", Long.parseLong(id));

String textString = jsonObj.optString("text","");

long time = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy").parse(jsonObj.getString("created_at")).getTime();
tweetJSON.put("time", time);
JSONObject user = jsonObj.optJSONObject("user");
String screenName = user.optString("screen_name","");
tweetJSON.put("tweeter", screenName);

List<String> hashtags = extractHashtags(textString);

StringBuilder contentBuilder = new StringBuilder();
contentBuilder.append(textString).append("\n");
contentBuilder.append(screenName).append("\n");
Expand All @@ -115,9 +115,9 @@ public DataEvent<JSONObject> next() {
}
tweetJSON.put("hashtags", buf.toString());
}

tweetJSON.put("contents", contentBuilder.toString());

tweetJSON.put("tweet", jsonObj);
tweetEvent = new DataEvent<JSONObject>(tweetJSON,String.valueOf(time));
if (logger.isDebugEnabled()){
Expand All @@ -134,12 +134,12 @@ public DataEvent<JSONObject> next() {

/**
 * No-op: the body is empty, so the supplied version is ignored.
 *
 * @param version requested starting version; unused by this implementation
 */
@Override
public void setStartingOffset(String version) {

}

/**
 * No-op: the body is empty, so calling this changes nothing in this object.
 */
@Override
public void reset() {

}

}
33 changes: 5 additions & 28 deletions sensei-core/src/main/java/com/sensei/conf/SenseiServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -66,7 +65,6 @@
import com.sensei.indexing.api.DefaultStreamingIndexingManager;
import com.sensei.indexing.api.SenseiIndexPruner;
import com.sensei.indexing.api.gateway.SenseiGateway;
import com.sensei.indexing.api.gateway.SenseiGatewayRegistry;
import com.sensei.plugin.SenseiPluginRegistry;
import com.sensei.search.client.servlet.DefaultSenseiJSONServlet;
import com.sensei.search.client.servlet.SenseiConfigServletContextListener;
Expand Down Expand Up @@ -107,7 +105,7 @@ public class SenseiServerBuilder implements SenseiConfParams{
private final SenseiSchema _senseiSchema;
private final Server _jettyServer;
private final SenseiGateway _gateway;

static final String SENSEI_CONTEXT_PATH = "sensei";

private final static Map<String,TimeUnit> TIMEUNIT_MAP = new HashMap<String,TimeUnit>();
Expand Down Expand Up @@ -137,7 +135,7 @@ public ClusterClient buildClusterClient()
private static NetworkServer buildNetworkServer(Configuration conf,ClusterClient clusterClient){
NetworkServerConfig networkConfig = new NetworkServerConfig();
networkConfig.setClusterClient(clusterClient);

networkConfig.setRequestThreadCorePoolSize(conf.getInt(SERVER_REQ_THREAD_POOL_SIZE, 20));
networkConfig.setRequestThreadMaxPoolSize(conf.getInt(SERVER_REQ_THREAD_POOL_MAXSIZE,70));
networkConfig.setRequestThreadKeepAliveTimeSecs(conf.getInt(SERVER_REQ_THREAD_POOL_KEEPALIVE,300));
Expand All @@ -155,7 +153,7 @@ private static NetworkServer buildNetworkServer(Configuration conf,ClusterClient

public Server buildHttpRestServer() throws Exception{
int port = _senseiConf.getInt(SERVER_BROKER_PORT);

String webappPath = _senseiConf.getString(SERVER_BROKER_WEBAPP_PATH,"sensei-core/src/main/webapp");


Expand Down Expand Up @@ -305,30 +303,9 @@ public static int[] buildPartitions(String[] partitionArray) throws Configuratio
}

/**
 * Produces the SenseiGateway instance for this server.
 *
 * NOTE(review): the lines below appear to be removed and added diff lines shown
 * together (note the two declarations of {@code gateway}). Two lookup strategies are
 * visible; confirm which one is current:
 * - a reflection-based lookup that reads "type" from the SENSEI_GATEWAY subset, resolves
 *   a class (Class.forName for type "custom", otherwise SenseiGatewayRegistry) and
 *   invokes its Configuration constructor;
 * - a plugin-registry lookup via getBeanByFullPrefix(SENSEI_GATEWAY, SenseiGateway.class).
 *
 * @param conf server configuration; read only by the reflection-based lines
 * @return the resolved gateway
 * @throws ConfigurationException when the reflection-based lines find no class for the
 *         type, or when any exception is caught and wrapped with its message and cause
 */
private SenseiGateway constructGateway(Configuration conf) throws ConfigurationException{
Configuration subConf = conf.subset(SENSEI_GATEWAY);
Class gatewayClass = null;
try{
String type = subConf.getString("type");

Configuration myConf = subConf.subset(type);
// "custom" gateways name their implementation class explicitly; other types go through the registry.
if ("custom".equals(type)){
String clz = myConf.getString("class");
gatewayClass = Class.forName(clz);
}
else{
gatewayClass = SenseiGatewayRegistry.getGatewayClass(type);
}
if (gatewayClass==null){
throw new ConfigurationException("unsupported provider type: "+type);
}

Constructor constructor = gatewayClass.getConstructor(Configuration.class);
SenseiGateway gateway = (SenseiGateway)(constructor.newInstance(myConf));
SenseiGateway gateway = pluginRegistry.getBeanByFullPrefix(SENSEI_GATEWAY, SenseiGateway.class);
return gateway;
}
catch(Exception e){
// Wrap any failure while keeping the original exception as the cause.
throw new ConfigurationException(e.getMessage(),e);
}

}

public SenseiCore buildCore() throws ConfigurationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,21 @@

import java.util.Comparator;

import org.apache.commons.configuration.Configuration;
import org.json.JSONObject;

import proj.zoie.impl.indexing.StreamDataProvider;
import proj.zoie.impl.indexing.ZoieConfig;

import com.sensei.conf.SenseiSchema;
import com.sensei.indexing.api.DataSourceFilter;
import com.sensei.plugin.AbstractSenseiPlugin;
import com.sensei.plugin.SenseiPluginRegistry;

public abstract class SenseiGateway<V>{
protected Configuration _conf;

protected SenseiPluginRegistry pluginRegistry;
public abstract class SenseiGateway<V> extends AbstractSenseiPlugin {

public static Comparator<String> DEFAULT_VERSION_COMPARATOR = ZoieConfig.DEFAULT_VERSION_COMPARATOR;


public SenseiGateway(Configuration conf){
_conf = conf;
pluginRegistry = SenseiPluginRegistry.get(conf);
}

final public DataSourceFilter<V> getDataSourceFilter(SenseiSchema senseiSchema, SenseiPluginRegistry pluginRegistry){
DataSourceFilter<V> dataSourceFilter = pluginRegistry.getBeanByFullPrefix("sensei.gateway.filter", DataSourceFilter.class);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.File;
import java.util.Comparator;

import org.apache.commons.configuration.Configuration;
import org.json.JSONObject;

import proj.zoie.impl.indexing.StreamDataProvider;
Expand All @@ -15,18 +14,14 @@

public class LinedFileDataProviderBuilder extends SenseiGateway<String>{

public static final String name = "file";
private Comparator<String> _versionComparator;
public LinedFileDataProviderBuilder(Configuration conf){
super(conf);
_versionComparator = ZoieConfig.DEFAULT_VERSION_COMPARATOR;
}
private Comparator<String> _versionComparator = ZoieConfig.DEFAULT_VERSION_COMPARATOR;


@Override
public StreamDataProvider<JSONObject> buildDataProvider(DataSourceFilter<String> dataFilter,
String oldSinceKey) throws Exception{

String path = _conf.getString("path");
String path = config.get("path");
long offset = oldSinceKey == null ? 0L : Long.parseLong(oldSinceKey);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import javax.naming.ConfigurationException;

import org.apache.commons.configuration.Configuration;
import org.json.JSONObject;

import proj.zoie.api.DataConsumer.DataEvent;
Expand All @@ -24,25 +23,24 @@

public class JdbcDataProviderBuilder extends SenseiGateway<ResultSet>{

public static final String name = "jdbc";
private SenseiPluginRegistry pluginRegistry;
private Comparator<String> _versionComparator;

public JdbcDataProviderBuilder(Configuration conf) throws Exception{
super(conf);
pluginRegistry = SenseiPluginRegistry.get(conf);
_versionComparator = pluginRegistry.getBeanByName("versionComparator", Comparator.class);
/**
 * Looks up the bean named "versionComparator" in the plugin registry and stores it in
 * {@code _versionComparator}.
 *
 * NOTE(review): presumably called by the plugin framework once the registry is
 * available; confirm against AbstractSenseiPlugin.
 */
@Override
public void start() {
_versionComparator = pluginRegistry.getBeanByName("versionComparator", Comparator.class);
}


@Override
public StreamDataProvider<JSONObject> buildDataProvider(final DataSourceFilter<ResultSet> dataFilter,
String oldSinceKey) throws Exception{

final String url = _conf.getString("url");
final String username = _conf.getString("username",null);
final String password = _conf.getString("password",null);
final String driver = _conf.getString("driver");
final String adaptor = _conf.getString("adaptor");
final String url = config.get("url");
final String username = config.get("username");
final String password = config.get("password");
final String driver = config.get("driver");
final String adaptor = config.get("adaptor");

final SenseiJDBCAdaptor senseiAdaptor = pluginRegistry.getBeanByFullPrefix("jdbc.adaptor", SenseiJDBCAdaptor.class);
if (senseiAdaptor==null){
Expand Down
Loading

0 comments on commit 879891e

Please sign in to comment.