diff --git a/pom.xml b/pom.xml index 93ed10a2..4bff037a 100644 --- a/pom.xml +++ b/pom.xml @@ -241,6 +241,12 @@ 2.10.0 + + com.netflix.astyanax + astyanax + 1.56.48 + + diff --git a/src/com/comcast/cmb/common/controller/CMBControllerServlet.java b/src/com/comcast/cmb/common/controller/CMBControllerServlet.java index ba89b9ec..8d52dd38 100644 --- a/src/com/comcast/cmb/common/controller/CMBControllerServlet.java +++ b/src/com/comcast/cmb/common/controller/CMBControllerServlet.java @@ -51,7 +51,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.Enumeration; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; diff --git a/src/com/comcast/cmb/common/persistence/AbstractCassandraPersistence.java b/src/com/comcast/cmb/common/persistence/AbstractCassandraPersistence.java index 79a727de..dc279a26 100644 --- a/src/com/comcast/cmb/common/persistence/AbstractCassandraPersistence.java +++ b/src/com/comcast/cmb/common/persistence/AbstractCassandraPersistence.java @@ -3,23 +3,57 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Random; import com.comcast.cmb.common.util.CMBProperties; -import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate; -import me.prettyprint.cassandra.service.template.SuperCfTemplate; -import me.prettyprint.hector.api.HConsistencyLevel; -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.Serializer; -import me.prettyprint.hector.api.beans.ColumnSlice; -import me.prettyprint.hector.api.beans.HSuperColumn; -import me.prettyprint.hector.api.beans.Row; -import me.prettyprint.hector.api.beans.SuperSlice; -import me.prettyprint.hector.api.exceptions.HectorException; -import me.prettyprint.hector.api.mutation.MutationResult; - public abstract class AbstractCassandraPersistence { + protected static Random rand = new Random(); + + public static enum CMB_SERIALIZER { STRING_SERIALIZER, COMPOSITE_SERIALIZER }; + + public static class CmbColumn { + public CmbColumn(N name, V value) { + this.name = name; + this.value = value; + } + public N name; + public V value; + } + + public static class CmbRow { + public CmbRow(K key, List> row) { + this.key = key; + this.row = row; + } + public K key; + public List> row; + } + + public static class CmbColumnSlice { + public CmbColumnSlice(List> slice) { + this.slice = slice; + } + public List> slice; + } + + public static class CmbSuperColumnSlice { + public CmbSuperColumnSlice(List> slice) { + this.slice = slice; + } + public List> slice; + } + + public static class CmbSuperColumn { + public CmbSuperColumn(SN superName, List> columns) { + this.superName = superName; + this.columns = columns; + } + public SN superName; + public List> columns; + } + public static final String CLUSTER_NAME = CMBProperties.getInstance().getClusterName(); public static final String CLUSTER_URL = CMBProperties.getInstance().getClusterUrl(); @@ -44,606 +78,120 @@ public static long getTimestampFromHash(long t){ t = t >> 21; return t; } + + public java.util.UUID getTimeUUID(long timeMillis) throws InterruptedException { + return new java.util.UUID(newTime(timeMillis, false), com.eaio.uuid.UUIDGen.getClockSeqAndNode()); + } + + public java.util.UUID getUniqueTimeUUID(long millis) { + return new java.util.UUID(com.eaio.uuid.UUIDGen.createTime(millis), com.eaio.uuid.UUIDGen.getClockSeqAndNode()); + } + + public long getTimeLong(long timeMillis) throws InterruptedException { + long newTime = timeMillis * 1000000000 + (System.nanoTime() % 1000000) * 1000 + rand.nextInt(999999); + return newTime; + } public abstract boolean isAlive(); - public abstract Keyspace getKeySpace(HConsistencyLevel consistencyLevel); - - /** - * Update single key value pair in column family. - * - * @param template - * column family - * @param key - * row key - * @param column - * column name - * @param value - * value K - type of row-key N - type of column name V - type of - * column value - * @throws HectorException - */ - public abstract void update(String columnFamily, K key, N column, V value, Serializer keySerializer, Serializer nameSerializer, Serializer valueSerializer, HConsistencyLevel level) - throws HectorException; + public abstract void update(String keyspace, String columnFamily, K key, N column, V value, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, CMB_SERIALIZER valueSerializer) throws Exception; - /** - * Insert single key value pair for a super column. - * - * @param columnFamily - * column family - * @param key - * row key - * @param keySerializer - * The serializer for the key - * @param superName - * super column name - * @param ttl - * The time to live - * @param superNameSerializer - * serializer for the super column name - * @param subColumnNameValues - * name, value pair for the sub columns - * @param columnSerializer - * the serializer for sub column name - * @param valueSerializer - * the serializer for sub column value K - type of row-key SN - - * type of super column name N - type of sub column name V - type - * of column value - * @throws HectorException - */ - public abstract MutationResult insertSuperColumn( - String columnFamily, K key, Serializer keySerializer, - SN superName, Integer ttl, Serializer superNameSerializer, - Map subColumnNameValues, Serializer columnSerializer, - Serializer valueSerializer, HConsistencyLevel level) - throws HectorException; + public abstract void insertSuperColumn(String keyspace, String columnFamily, K key, CMB_SERIALIZER keySerializer, SN superName, Integer ttl, + CMB_SERIALIZER superNameSerializer, Map subColumnNameValues, CMB_SERIALIZER columnSerializer, + CMB_SERIALIZER valueSerializer) throws Exception; - /** - * Insert a map of key value pairs for a super column. - * - * @param columnFamily - * column family - * @param key - * row key - * @param keySerializer - * The serializer for the key - * @param superNameSubColumnsMap - * A map of super column names as the key and a map of sub-column - * name values as the value - * @param ttl - * The time to live - * @param superNameSerializer - * serializer for the super column name - * @param subColumnNameValues - * name, value pair for the sub columns - * @param columnSerializer - * the serializer for sub column name - * @param valueSerializer - * the serializer for sub column value K - type of row-key SN - - * type of super column name N - type of sub column name V - type - * of column value - * @throws HectorException - */ - public abstract MutationResult insertSuperColumns( - String columnFamily, K key, Serializer keySerializer, + public abstract void insertSuperColumns( + String keyspace, String columnFamily, K key, CMB_SERIALIZER keySerializer, Map> superNameSubColumnsMap, int ttl, - Serializer superNameSerializer, Serializer columnSerializer, - Serializer valueSerializer, HConsistencyLevel level) - throws HectorException; - - /** - * Read single value from column family. - * - * @param template - * column family - * @param key - * row key - * @param column - * column name - * @return value of type T N - type of column name V - type of column value - * Note: Assumed the row key is a string - * @throws HectorException - * - */ - public abstract V read(ColumnFamilyTemplate template, - String key, N column, V returnType) throws HectorException; - - /** - * Read next numRows rows with a maximum of 100 columns per row. - * - * @param columnFamily - * column family - * @param lastKey - * last key read before or null - * @param numRows - * maximum number of rows to read - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - */ - public abstract List> readNextNRows( - String columnFamily, K lastKey, int numRows, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level); - - /** - * Read next numRows rows with a maximum of numCols columns per row. - * - * @param columnFamily - * column family - * @param lastKey - * last key read before or null - * @param numRows - * maximum number of rows to read - * @param numCols - * maximum number of columns to read - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - */ - public abstract List> readNextNRows( - String columnFamily, K lastKey, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level); - - /** - * Read next numRows rows with a maximum of numCols columns per row. Ensure - * all returned rows are non-empty rows. - * - * @param columnFamily - * column family - * @param lastKey - * last key read before or null - * @param numRows - * maximum number of rows to read - * @param numCols - * maximum number of columns to read - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - */ - public abstract List> readNextNNonEmptyRows( - String columnFamily, K lastKey, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level); - - /** - * Read next numRows rows obeying single where clause. - * - * @param columnFamily - * column family - * @param lastKey - * last key read before or null - * @param whereColumn - * where clause column - * @param whereValue - * where clause value - * @param numRows - * maximum number of rows - * @param numCols - * maximum number of columns - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - */ - public abstract List> readNextNRows( - String columnFamily, K lastKey, N whereColumn, V whereValue, - int numRows, int numCols, Serializer keySerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level); - - /** - * Read next numRows rows obeying complex where clause. - * - * @param columnFamily - * column family - * @param lastKey - * last key read before or null - * @param columnValues - * hash map with key value pairs for where clause - * @param numRows - * maximum number of rows to read - * @param numCols - * maximum number of columns to read - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - */ - public abstract List> readNextNRows( - String columnFamily, K lastKey, Map columnValues, - int numRows, int numCols, Serializer keySerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level); + CMB_SERIALIZER superNameSerializer, CMB_SERIALIZER columnSerializer, + CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract List> readNextNNonEmptyRows( + String keyspace, String columnFamily, K lastKey, int numRows, int numCols, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer, + CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract List> readNextNRows( + String keyspace, String columnFamily, K lastKey, int numRows, int numCols, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer, + CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract List> readNextNRows( + String keyspace, String columnFamily, K lastKey, N whereColumn, V whereValue, + int numRows, int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract List> readNextNRows( + String keyspace, String columnFamily, K lastKey, Map columnValues, + int numRows, int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract CmbColumnSlice readColumnSlice( + String keyspace, String columnFamily, K key, N firstColumnName, N lastColumnName, + int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract CmbSuperColumnSlice readRowFromSuperColumnFamily( + String keyspace, String columnFamily, K key, SN firstColumnName, SN lastColumnName, + int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract CmbSuperColumn readColumnFromSuperColumnFamily( + String keyspace, String columnFamily, K key, SN columnName, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract List> readMultipleColumnsFromSuperColumnFamily( + String keyspace, String columnFamily, Collection keys, + Collection columnNames, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception; + + public abstract List> readColumnsFromSuperColumnFamily( + String keyspace, String columnFamily, K key, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer, + SN firstCol, SN lastCol, int numCol) throws Exception; + + public abstract void insertRow(K rowKey, + String keyspace, String columnFamily, Map columnValues, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, + CMB_SERIALIZER valueSerializer, Integer ttl) throws Exception; + + public abstract void insertRows( + String keyspace, Map> rowColumnValues, String columnFamily, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, + CMB_SERIALIZER valueSerializer, Integer ttl) throws Exception; + + public abstract void delete(String keyspace, String columnFamily, K key, N column, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnSerializer) throws Exception; + + public abstract void deleteBatch(String keyspace, String columnFamily, + List keyList, List columnList, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnSerializer) throws Exception; - /** - * Read single row by row key. - * - * @param columnFamily - * column family - * @param key - * row key - * @param numCols - * maximum number of columns - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - * @return row Will get row starting from the beginning - */ - public abstract ColumnSlice readColumnSlice( - String columnFamily, K key, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level); - - /** - * Read single row by row key. - * - * @param columnFamily - * column family - * @param key - * row key - * @param firstColumnName - * the beginning of the slice column - * @param lastColumnName - * the end of the slice column - * @param numCols - * maximum number of columns - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - * @return row - */ - public abstract ColumnSlice readColumnSlice( - String columnFamily, K key, N firstColumnName, N lastColumnName, - int numCols, Serializer keySerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level); - - /** - * Read a row slice by row key and super column slice. - * - * @param columnFamily - * column family - * @param key - * row key - * @param firstColumnName - * the beginning of the slice column - * @param lastColumnName - * the end of the slice column - * @param numCols - * maximum number of columns - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - * @return row - */ - public abstract SuperSlice readRowFromSuperColumnFamily( - String columnFamily, K key, SN firstColumnName, SN lastColumnName, - int numCols, Serializer keySerializer, - Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level); - - /** - * Read a single column by row key using SuperColumnQuery. - * - * @param columnFamily - * column family - * @param key - * row key - * @param columnName - * the column key of the super column - * @param keySerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * consistency level - * @return list of rows K - type of row-key N - type of column name V - type - * of column value - * @return row - */ - public abstract HSuperColumn readColumnFromSuperColumnFamily( - String columnFamily, K key, SN columnName, - Serializer keySerializer, Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level); - - public abstract List> readMultipleColumnsFromSuperColumnFamily( - String columnFamily, Collection keys, - Collection columnNames, Serializer keySerializer, - Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level); - - /** - * Read multiple supercolumns from a column family given roe-key, and optional first-col, last-col and numCol - * @param - * @param - * @param - * @param - * @param columnFamily - * @param key - * @param keySerializer - * @param superNameSerializer - * @param columnNameSerializer - * @param valueSerializer - * @param level - * @param firstCol The starting of the range - * @param lastCol The end of the range - * @param numCol the number of columns to read - * @return - */ - public abstract List> readColumnsFromSuperColumnFamily( - String columnFamily, K key, Serializer keySerializer, - Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level, SN firstCol, SN lastCol, int numCol); - - /** - * Insert or update single row. - * - * @param rowKey - * row key - * @param columnFamily - * column family - * @param columnValues - * hash map containing key value pairs for columns to insert or - * update - * @param level - * consistency level - * @return Note: This method assumes a String for key, column-name & value - */ - public abstract MutationResult insertOrUpdateRow(String rowKey, - String columnFamily, Map columnValues, - HConsistencyLevel level); - - /** - * Insert or update single row with ttl. - * - * @param rowKey - * row key - * @param columnFamily - * column family - * @param columnValues - * hash map containing key value pairs for columns to insert or - * update - * @param level - * consistency level - * @param ttl - * time to live - * @return Note: This method assumes a String for key, column-name & value - */ - public abstract MutationResult insertRow(String rowKey, - String columnFamily, Map columnValues, - HConsistencyLevel level, Integer ttl); - - /** - * Insert or update single row with ttl. K is the key type, N is column name - * type and V is the column value type - * - * @param rowKey - * row key - * @param columnFamily - * column family - * @param columnValues - * hash map containing key value pairs for columns to insert or - * update - * @param level - * consistency level - * @param ttl - * time to live - * @return Note: This method assumes a String for key, column-name & value - */ - public abstract MutationResult insertRow(K rowKey, - String columnFamily, Map columnValues, - Serializer keySerializer, Serializer nameSerializer, - Serializer valueSerializer, HConsistencyLevel level, Integer ttl); - - /** - * Insert or update single row with ttl. K is the key type, N is column name - * type and V is the column value type - * - * @param rowColumnValues - * A map containing row keys and a map of column names, column - * values - * @param columnFamily - * column family - * @param level - * consistency level - * @param ttl - * time to live - * @return Note: This method assumes a String for key, column-name & value - */ - public abstract MutationResult insertRows( - Map> rowColumnValues, String columnFamily, - Serializer keySerializer, Serializer nameSerializer, - Serializer valueSerializer, HConsistencyLevel level, Integer ttl); - - /** - * - * @param columnFamily - * @param key - * @param column - * @param keySerializer - * @param columnSerializer - * @param level - * @throws HectorException - */ - public abstract void delete(String columnFamily, K key, N column, - Serializer keySerializer, Serializer columnSerializer, - HConsistencyLevel level) throws HectorException; - - - /** - * Delete single column value or the entire row - * - * @param template - * column family - * @param key - * row key - * @param column - * column name. If column is null, the entire row is deleted - * @throws HectorException - */ - public abstract void deleteBatch(String columnFamily, - List keyList, List columnList, Serializer keySerializer, - HConsistencyLevel level, Serializer columnSerializer) - throws HectorException; - - /** - * Delete single column value or the entire row - * - * @param template - * column family - * @param key - * row key - * @param superColumn - * column name. If column is null, the entire row is deleted - * @throws HectorException - */ public abstract void deleteSuperColumn( - String superColumnFamily, K key, SN superColumn, Serializer keySerializer, Serializer superColumnSerializer, - HConsistencyLevel level) - throws HectorException; - - /** - * Create a unique time based UUID - * - * @param timeMillis - * Time in milli seconds since Jan 1, 1970 - * @throws InterruptedException - */ - public abstract java.util.UUID getTimeUUID(long timeMillis) - throws InterruptedException; - - /** - * Create a unique time based UUID - * - * @param millis - * time - */ - public abstract java.util.UUID getUniqueTimeUUID(long millis); - - /** - * Create a unique time based long value - * - * @param timeMillis - * Time in milli seconds since Jan 1, 1970 - * @throws InterruptedException - */ - public abstract long getTimeLong(long timeMillis) - throws InterruptedException; + String keyspace, String superColumnFamily, K key, SN superColumn, CMB_SERIALIZER keySerializer, CMB_SERIALIZER superColumnSerializer) throws Exception; - /** - * Return the column count. - * - * @param columnFamily - * The name of the column family. - * @param key - * The row key (as a string) - * @param level - * The consistency level of the keyspace. - */ - public abstract int getCount(String columnFamily, K key, - Serializer keySerializer, Serializer columnNameSerializer, - HConsistencyLevel level) throws HectorException; + public abstract int getCount(String keyspace, String columnFamily, K key, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer) throws Exception; - /** - * Increment Cassandra counter by incrementBy - * @param - * @param - * @param columnFamily - * @param rowKey - * @param columnName - * @param incrementBy - * @param keySerializer - * @param columnNameSerializer - * @param level - */ - public abstract void incrementCounter(String columnFamily, K rowKey, - String columnName, int incrementBy, Serializer keySerializer, - Serializer columnNameSerializer, HConsistencyLevel level); + public abstract void incrementCounter(String keyspace, String columnFamily, K rowKey, + String columnName, int incrementBy, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception; - /** - * Decrement Cassandra counter by decrementBy - * - * @param - * @param - * @param columnFamily - * @param rowKey - * @param columnName - * @param decrementBy - * @param keySerializer - * @param columnNameSerializer - * @param level - */ - public abstract void decrementCounter(String columnFamily, K rowKey, - String columnName, int decrementBy, Serializer keySerializer, - Serializer columnNameSerializer, HConsistencyLevel level); + public abstract void decrementCounter(String keyspace, String columnFamily, K rowKey, + String columnName, int decrementBy, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception; - /** - * Decrement Cassandra counter by decrementBy - * - * @param - * @param - * @param columnFamily - * @param rowKey - * @param columnName - * @param decrementBy - * @param keySerializer - * @param columnNameSerializer - * @param level - */ - public abstract void deleteCounter(String columnFamily, K rowKey, - N columnName, Serializer keySerializer, - Serializer columnNameSerializer, HConsistencyLevel level); + public abstract void deleteCounter(String keyspace, String columnFamily, K rowKey, + N columnName, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception; - /** - * Return current value of Cassandra counter - * - * @param - * @param - * @param columnFamily - * @param rowKey - * @param columnName - * @param keySerializer - * @param columnNameSerializer - * @param level - * @return - */ - public abstract long getCounter(String columnFamily, K rowKey, - N columnName, Serializer keySerializer, - Serializer columnNameSerializer, HConsistencyLevel level); + public abstract long getCounter(String keyspace, String columnFamily, K rowKey, + N columnName, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception; } \ No newline at end of file diff --git a/src/com/comcast/cmb/common/persistence/CassandraAstyanaxPersistence.java b/src/com/comcast/cmb/common/persistence/CassandraAstyanaxPersistence.java index de7ebe1e..7f96d81d 100644 --- a/src/com/comcast/cmb/common/persistence/CassandraAstyanaxPersistence.java +++ b/src/com/comcast/cmb/common/persistence/CassandraAstyanaxPersistence.java @@ -1,23 +1,53 @@ package com.comcast.cmb.common.persistence; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; -import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate; -import me.prettyprint.cassandra.service.template.SuperCfTemplate; -import me.prettyprint.hector.api.HConsistencyLevel; -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.Serializer; -import me.prettyprint.hector.api.beans.ColumnSlice; -import me.prettyprint.hector.api.beans.HSuperColumn; -import me.prettyprint.hector.api.beans.Row; -import me.prettyprint.hector.api.beans.SuperSlice; -import me.prettyprint.hector.api.exceptions.HectorException; -import me.prettyprint.hector.api.mutation.MutationResult; +import com.comcast.cmb.common.util.CMBProperties; +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; public class CassandraAstyanaxPersistence extends AbstractCassandraPersistence { + + private static Map keyspaces = new HashMap(); + + private void init() { + + List keyspaceNames = new ArrayList(); + keyspaceNames.add(CMBProperties.getInstance().getCMBKeyspace()); + keyspaceNames.add(CMBProperties.getInstance().getCNSKeyspace()); + keyspaceNames.add(CMBProperties.getInstance().getCQSKeyspace()); + + for (String k : keyspaceNames) { + + AstyanaxContext context = new AstyanaxContext.Builder() + .forCluster(CLUSTER_NAME) + .forKeyspace(CMBProperties.getInstance().getCMBKeyspace()) + .withAstyanaxConfiguration(new AstyanaxConfigurationImpl() + .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE) + ) + .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("CMBAstyananxConnectionPool") + //.setPort(9160) + .setMaxConnsPerHost(1) + .setSeeds(AbstractCassandraPersistence.CLUSTER_URL) + ) + .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) + .buildKeyspace(ThriftFamilyFactory.getInstance()); + + context.start(); + Keyspace keyspace = context.getClient(); + + keyspaces.put(CMBProperties.getInstance().getCMBKeyspace(), keyspace); + } + } @Override public boolean isAlive() { @@ -26,266 +56,209 @@ public boolean isAlive() { } @Override - public Keyspace getKeySpace(HConsistencyLevel consistencyLevel) { - // TODO Auto-generated method stub - return null; - } - - @Override - public void update(String columnFamily, K key, N column, V value, Serializer keySerializer, Serializer nameSerializer, Serializer valueSerializer, HConsistencyLevel level) - throws HectorException { + public void update(String keyspace, String columnFamily, K key, + N column, V value, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER nameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub - + } @Override - public MutationResult insertSuperColumn(String columnFamily, - K key, Serializer keySerializer, SN superName, Integer ttl, - Serializer superNameSerializer, Map subColumnNameValues, - Serializer columnSerializer, Serializer valueSerializer, - HConsistencyLevel level) throws HectorException { + public void insertSuperColumn(String keyspace, + String columnFamily, K key, CMB_SERIALIZER keySerializer, + SN superName, Integer ttl, CMB_SERIALIZER superNameSerializer, + Map subColumnNameValues, CMB_SERIALIZER columnSerializer, + CMB_SERIALIZER valueSerializer) throws Exception { // TODO Auto-generated method stub - return null; + } @Override - public MutationResult insertSuperColumns(String columnFamily, - K key, Serializer keySerializer, + public void insertSuperColumns(String keyspace, + String columnFamily, K key, CMB_SERIALIZER keySerializer, Map> superNameSubColumnsMap, int ttl, - Serializer superNameSerializer, Serializer columnSerializer, - Serializer valueSerializer, HConsistencyLevel level) - throws HectorException { + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub - return null; + } @Override - public V read(ColumnFamilyTemplate template, String key, - N column, V returnType) throws HectorException { + public List> readNextNNonEmptyRows( + String keyspace, String columnFamily, K lastKey, int numRows, + int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public List> readNextNRows(String columnFamily, - K lastKey, int numRows, Serializer keySerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { - // TODO Auto-generated method stub - return null; - } - - @Override - public List> readNextNRows(String columnFamily, - K lastKey, int numRows, int numCols, Serializer keySerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { - // TODO Auto-generated method stub - return null; - } - - @Override - public List> readNextNNonEmptyRows( + public List> readNextNRows(String keyspace, String columnFamily, K lastKey, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level) { - // TODO Auto-generated method stub - return null; - } - - @Override - public List> readNextNRows(String columnFamily, - K lastKey, N whereColumn, V whereValue, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level) { - // TODO Auto-generated method stub - return null; - } - - @Override - public List> readNextNRows(String columnFamily, - K lastKey, Map columnValues, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level) { + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer, + CMB_SERIALIZER valueSerializer) throws Exception { // TODO Auto-generated method stub return null; } @Override - public ColumnSlice readColumnSlice(String columnFamily, - K key, int numCols, Serializer keySerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { + public List> readNextNRows(String keyspace, + String columnFamily, K lastKey, N whereColumn, V whereValue, + int numRows, int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public ColumnSlice readColumnSlice(String columnFamily, - K key, N firstColumnName, N lastColumnName, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, - Serializer valueSerializer, HConsistencyLevel level) { + public List> readNextNRows(String keyspace, + String columnFamily, K lastKey, Map columnValues, + int numRows, int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public SuperSlice readRowFromSuperColumnFamily( - String columnFamily, K key, SN firstColumnName, SN lastColumnName, - int numCols, Serializer keySerializer, - Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { + public CmbColumnSlice readColumnSlice(String keyspace, + String columnFamily, K key, N firstColumnName, N lastColumnName, + int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public HSuperColumn readColumnFromSuperColumnFamily( - String columnFamily, K key, SN columnName, - Serializer keySerializer, Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { + public CmbSuperColumnSlice readRowFromSuperColumnFamily( + String keyspace, String columnFamily, K key, SN firstColumnName, + SN lastColumnName, int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public List> readMultipleColumnsFromSuperColumnFamily( - String columnFamily, Collection keys, - Collection columnNames, Serializer keySerializer, - Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { + public CmbSuperColumn readColumnFromSuperColumnFamily( + String keyspace, String columnFamily, K key, SN columnName, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public List> readColumnsFromSuperColumnFamily( - String columnFamily, K key, Serializer keySerializer, - Serializer superNameSerializer, - Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level, SN firstCol, SN lastCol, int numCol) { + public List> readMultipleColumnsFromSuperColumnFamily( + String keyspace, String columnFamily, Collection keys, + Collection columnNames, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public MutationResult insertOrUpdateRow(String rowKey, String columnFamily, - Map columnValues, HConsistencyLevel level) { + public List> readColumnsFromSuperColumnFamily( + String keyspace, String columnFamily, K key, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, + CMB_SERIALIZER valueSerializer, SN firstCol, SN lastCol, int numCol) + throws Exception { // TODO Auto-generated method stub return null; } @Override - public MutationResult insertRow(String rowKey, String columnFamily, - Map columnValues, HConsistencyLevel level, - Integer ttl) { + public void insertRow(K rowKey, String keyspace, + String columnFamily, Map columnValues, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, + CMB_SERIALIZER valueSerializer, Integer ttl) throws Exception { // TODO Auto-generated method stub - return null; - } - - @Override - public MutationResult insertRow(K rowKey, String columnFamily, - Map columnValues, Serializer keySerializer, - Serializer nameSerializer, Serializer valueSerializer, - HConsistencyLevel level, Integer ttl) { - // TODO Auto-generated method stub - return null; + } @Override - public MutationResult insertRows( + public void insertRows(String keyspace, Map> rowColumnValues, String columnFamily, - Serializer keySerializer, Serializer nameSerializer, - Serializer valueSerializer, HConsistencyLevel level, Integer ttl) { + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, + CMB_SERIALIZER valueSerializer, Integer ttl) throws Exception { // TODO Auto-generated method stub - return null; + } @Override - public void delete(String columnFamily, K key, N column, - Serializer keySerializer, Serializer columnSerializer, - HConsistencyLevel level) throws HectorException { + public void delete(String keyspace, String columnFamily, K key, + N column, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnSerializer) throws Exception { // TODO Auto-generated method stub - + } @Override - public void deleteBatch(String columnFamily, List keyList, - List columnList, Serializer keySerializer, - HConsistencyLevel level, Serializer columnSerializer) - throws HectorException { + public void deleteBatch(String keyspace, String columnFamily, + List keyList, List columnList, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnSerializer) throws Exception { // TODO Auto-generated method stub - + } @Override - public void deleteSuperColumn( - String superColumnFamily, K key, SN superColumn, Serializer keySerializer, Serializer superColumnSerializer, - HConsistencyLevel level) - throws HectorException { + public void deleteSuperColumn(String keyspace, + String superColumnFamily, K key, SN superColumn, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER superColumnSerializer) + throws Exception { // TODO Auto-generated method stub - - } - - @Override - public UUID getTimeUUID(long timeMillis) throws InterruptedException { - // TODO Auto-generated method stub - return null; + } @Override - public UUID getUniqueTimeUUID(long millis) { - // TODO Auto-generated method stub - return null; - } - - @Override - public long getTimeLong(long timeMillis) throws InterruptedException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int getCount(String columnFamily, K key, - Serializer keySerializer, Serializer columnNameSerializer, - HConsistencyLevel level) throws HectorException { + public int getCount(String keyspace, String columnFamily, K key, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer) + throws Exception { // TODO Auto-generated method stub return 0; } @Override - public void incrementCounter(String columnFamily, K rowKey, - String columnName, int incrementBy, Serializer keySerializer, - Serializer columnNameSerializer, HConsistencyLevel level) { + public void incrementCounter(String keyspace, String columnFamily, + K rowKey, String columnName, int incrementBy, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer) + throws Exception { // TODO Auto-generated method stub - + } @Override - public void decrementCounter(String columnFamily, K rowKey, - String columnName, int decrementBy, Serializer keySerializer, - Serializer columnNameSerializer, HConsistencyLevel level) { + public void decrementCounter(String keyspace, String columnFamily, + K rowKey, String columnName, int decrementBy, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer) + throws Exception { // TODO Auto-generated method stub - + } @Override - public void deleteCounter(String columnFamily, K rowKey, - N columnName, Serializer keySerializer, - Serializer columnNameSerializer, HConsistencyLevel level) { + public void deleteCounter(String keyspace, String columnFamily, + K rowKey, N columnName, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception { // TODO Auto-generated method stub - + } @Override - public long getCounter(String columnFamily, K rowKey, N columnName, - Serializer keySerializer, Serializer columnNameSerializer, - HConsistencyLevel level) { + public long getCounter(String keyspace, String columnFamily, + K rowKey, N columnName, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception { // TODO Auto-generated method stub return 0; } - } diff --git a/src/com/comcast/cmb/common/persistence/CassandraHectorPersistence.java b/src/com/comcast/cmb/common/persistence/CassandraHectorPersistence.java index db8a26c3..d71d7918 100644 --- a/src/com/comcast/cmb/common/persistence/CassandraHectorPersistence.java +++ b/src/com/comcast/cmb/common/persistence/CassandraHectorPersistence.java @@ -70,6 +70,10 @@ import org.apache.log4j.Logger; import com.comcast.cmb.common.controller.CMBControllerServlet; +import com.comcast.cmb.common.persistence.AbstractCassandraPersistence.CMB_SERIALIZER; +import com.comcast.cmb.common.persistence.AbstractCassandraPersistence.CmbColumn; +import com.comcast.cmb.common.util.CMBErrorCodes; +import com.comcast.cmb.common.util.CMBException; import com.comcast.cmb.common.util.CMBProperties; import com.comcast.cmb.common.util.ValueAccumulator.AccumulatorName; @@ -78,13 +82,9 @@ * @author aseem, bwolf, vvenkatraman, jorge, baosen, michael */ public class CassandraHectorPersistence extends AbstractCassandraPersistence { - /** - * To support varying level of consistency levels for each query, we must - * hold on to a keyspace instance per consistency-level. - * - * Class is not thread-safe. It should be confined to use within a - * single-thread - */ + + //TODO: surround everything with try-finally + private static final int hectorPoolSize = CMBProperties.getInstance().getHectorPoolSize(); private static final String hectorBalancingPolicy = CMBProperties.getInstance().getHectorBalancingPolicy(); private static final Map credentials = CMBProperties.getInstance().getHectorCredentials(); @@ -92,38 +92,29 @@ public class CassandraHectorPersistence extends AbstractCassandraPersistence { protected String keyspaceName = CMBProperties.getInstance().getCMBKeyspace(); protected Cluster cluster; - protected Map keyspaces; - - protected static Random random = new Random(); + protected Map keyspaces; private static Logger logger = Logger.getLogger(CassandraHectorPersistence.class); - /** - * Returns the same consistency-level as passed in the constructor - */ - class SimpleConsistencyPolicy implements ConsistencyLevelPolicy { + private static class SimpleConsistencyPolicy implements ConsistencyLevelPolicy { + private final HConsistencyLevel level; - + public SimpleConsistencyPolicy(HConsistencyLevel l) { level = l; } - + @Override public HConsistencyLevel get(OperationType arg0, String arg1) { return level; } - + @Override public HConsistencyLevel get(OperationType arg0) { return level; } } - public CassandraHectorPersistence(String keyspaceName) { - this.keyspaceName = keyspaceName; - initPersistence(); - } - /** * Initialize the internal handlers to hector. Should be called only once in the begenning */ @@ -133,7 +124,7 @@ private void initPersistence() { CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator(); - cassandraHostConfigurator.setHosts(AbstractCassandraPersistence.CLUSTER_URL); + cassandraHostConfigurator.setHosts(CLUSTER_URL); cassandraHostConfigurator.setMaxActive(hectorPoolSize); cassandraHostConfigurator.setCassandraThriftSocketTimeout(CMBProperties.getInstance().getCassandraThriftSocketTimeOutMS()); @@ -162,11 +153,18 @@ private void initPersistence() { //cassandraHostConfigurator.setExhaustedPolicy(ExhaustedPolicy.WHEN_EXHAUSTED_GROW); cluster = HFactory.getOrCreateCluster(AbstractCassandraPersistence.CLUSTER_NAME, cassandraHostConfigurator, credentials); - keyspaces = new HashMap(); + keyspaces = new HashMap(); + + List keyspaceNames = new ArrayList(); + keyspaceNames.add(CMBProperties.getInstance().getCMBKeyspace()); + keyspaceNames.add(CMBProperties.getInstance().getCNSKeyspace()); + keyspaceNames.add(CMBProperties.getInstance().getCQSKeyspace()); + + //TODO: for now back to one consistency level for everything - for (HConsistencyLevel level : HConsistencyLevel.values()) { - Keyspace keyspace = HFactory.createKeyspace(keyspaceName, cluster, new SimpleConsistencyPolicy(level)); - keyspaces.put(level, keyspace); + for (String k : keyspaceNames) { + Keyspace keyspace = HFactory.createKeyspace(k, cluster, new SimpleConsistencyPolicy(CMBProperties.getInstance().getWriteConsistencyLevel())); + keyspaces.put(k, keyspace); } long ts2 = System.currentTimeMillis(); @@ -195,48 +193,99 @@ public boolean isAlive() { return alive; } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#getKeySpace(me.prettyprint.hector.api.HConsistencyLevel) - */ - @Override - public Keyspace getKeySpace(HConsistencyLevel consistencyLevel) { - return keyspaces.get(consistencyLevel); + private Keyspace getKeyspace(String keyspace) { + return keyspaces.get(keyspace); + } + + private static Serializer getSerializer(CMB_SERIALIZER s) throws Exception { + if (s == CMB_SERIALIZER.STRING_SERIALIZER) { + return StringSerializer.get(); + } + throw new CMBException(CMBErrorCodes.InternalError, "Unknown serializer " + s); + } + + private List> getRows(List> rows) throws Exception { + + List> l = new ArrayList>(); + + for (Row r : rows) { + List> cmbColumns = new ArrayList>(); + ColumnSlice cs = r.getColumnSlice(); + List> cl = cs.getColumns(); + for (HColumn hc : cl) { + cmbColumns.add(new CmbColumn(hc.getName(), hc.getValue())); + } + l.add(new CmbRow(r.getKey(), cmbColumns)); + } + + return l; + } + + private CmbColumnSlice getSlice(ColumnSlice slice) throws Exception { + List> cl = new ArrayList>(); + for (HColumn s : slice.getColumns()) { + cl.add(new CmbColumn(s.getName(), s.getValue())); + } + return new CmbColumnSlice(cl); + } + + private CmbSuperColumn getSuperColumn(HSuperColumn superColumn) throws Exception { + List> columns = new ArrayList>(); + for (HColumn c : superColumn.getColumns()) { + columns.add(new CmbColumn(c.getName(), c.getValue())); + } + return new CmbSuperColumn(superColumn.getName(), columns); + } + + private List> getSuperColumns(List> superColumns) throws Exception { + List> l = new ArrayList>(); + for (HSuperColumn superColumn : superColumns) { + List> columns = new ArrayList>(); + for (HColumn c : superColumn.getColumns()) { + columns.add(new CmbColumn(c.getName(), c.getValue())); + } + l.add(new CmbSuperColumn(superColumn.getName(), columns)); + } + return l; + } + + private CmbSuperColumnSlice getSuperColumnSlice(SuperSlice superSlice) throws Exception { + List> l = new ArrayList>(); + for (HSuperColumn sc : superSlice.getSuperColumns()) { + l.add(getSuperColumn(sc)); + } + return new CmbSuperColumnSlice(l); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#update(me.prettyprint.cassandra.service.template.ColumnFamilyTemplate, K, N, V, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer) - */ @Override - public void update(String columnFamily, K key, N column, V value, Serializer keySerializer, Serializer nameSerializer, Serializer valueSerializer, HConsistencyLevel level) throws HectorException { + public void update(String keyspace, String columnFamily, K key, N column, V value, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, CMB_SERIALIZER valueSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=update column_family=" + columnFamily + " key=" + key + " column=" + column + " value=" + value); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); - mutator.addInsertion(key, columnFamily, HFactory.createColumn(column, value, nameSerializer, valueSerializer)); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); + mutator.addInsertion(key, columnFamily, HFactory.createColumn(column, value, getSerializer(nameSerializer), getSerializer(valueSerializer))); mutator.execute(); long ts2 = System.currentTimeMillis(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraWrite, 1L); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#insertSuperColumn(java.lang.String, K, me.prettyprint.hector.api.Serializer, SN, java.lang.Integer, me.prettyprint.hector.api.Serializer, java.util.Map, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public MutationResult insertSuperColumn(String columnFamily, K key, Serializer keySerializer, SN superName, Integer ttl, - Serializer superNameSerializer, Map subColumnNameValues, Serializer columnSerializer, Serializer valueSerializer, - HConsistencyLevel level) throws HectorException { + public void insertSuperColumn(String keyspace, String columnFamily, K key, CMB_SERIALIZER keySerializer, SN superName, Integer ttl, + CMB_SERIALIZER superNameSerializer, Map subColumnNameValues, CMB_SERIALIZER columnSerializer, + CMB_SERIALIZER valueSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=insert_super_column key=" + key + " cf=" + columnFamily + " super_name=" + superName + " ttl=" + (ttl == null ? "null" : ttl) + " sub_column_values=" + subColumnNameValues); List> subColumns = new ArrayList>(); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); for (N name : subColumnNameValues.keySet()) { V value = subColumnNameValues.get(name); - HColumn subColumn = HFactory.createColumn(name, value, columnSerializer, valueSerializer); + HColumn subColumn = HFactory.createColumn(name, value, getSerializer(columnSerializer), getSerializer(valueSerializer)); if (ttl != null) { subColumn.setTtl(ttl); @@ -245,33 +294,27 @@ public MutationResult insertSuperColumn(String columnFamily, K key subColumns.add(subColumn); } - HSuperColumn superColumn = HFactory.createSuperColumn(superName, subColumns, Calendar.getInstance().getTimeInMillis(), superNameSerializer, columnSerializer, valueSerializer); - MutationResult result = mutator.insert(key, columnFamily, superColumn); + HSuperColumn superColumn = HFactory.createSuperColumn(superName, subColumns, System.currentTimeMillis(), getSerializer(superNameSerializer), getSerializer(columnSerializer), getSerializer(valueSerializer)); + mutator.insert(key, columnFamily, superColumn); long ts2 = System.currentTimeMillis(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraWrite, 1L); - - return result; } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#insertSuperColumns(java.lang.String, K, me.prettyprint.hector.api.Serializer, java.util.Map, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public MutationResult insertSuperColumns(String columnFamily, - K key, Serializer keySerializer, + public void insertSuperColumns(String keyspace, String columnFamily, K key, CMB_SERIALIZER keySerializer, Map> superNameSubColumnsMap, int ttl, - Serializer superNameSerializer, Serializer columnSerializer, - Serializer valueSerializer, HConsistencyLevel level) - throws HectorException { + CMB_SERIALIZER superNameSerializer, CMB_SERIALIZER columnSerializer, + CMB_SERIALIZER valueSerializer) + throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=insert_super_columns cf=" + columnFamily + " columns=" + superNameSubColumnsMap); List> subColumns = new ArrayList>(); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); for (SN superName : superNameSubColumnsMap.keySet()) { @@ -283,148 +326,28 @@ public MutationResult insertSuperColumns(String columnFamily, for (N name : subColumnsMap.keySet()) { V value = subColumnsMap.get(name); - HColumn subColumn = HFactory.createColumn(name, value, columnSerializer, valueSerializer); + HColumn subColumn = HFactory.createColumn(name, value, getSerializer(columnSerializer), getSerializer(valueSerializer)); subColumn.setTtl(ttl); subColumns.add(subColumn); } - HSuperColumn superColumn = HFactory.createSuperColumn(superName, subColumns, Calendar.getInstance().getTimeInMillis(), superNameSerializer, columnSerializer, valueSerializer); + HSuperColumn superColumn = HFactory.createSuperColumn(superName, subColumns, System.currentTimeMillis(), getSerializer(superNameSerializer), getSerializer(columnSerializer), getSerializer(valueSerializer)); mutator.addInsertion(key, columnFamily, superColumn); superColumn = null; } } - MutationResult result = mutator.execute(); + mutator.execute(); long ts2 = System.currentTimeMillis(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraWrite, superNameSubColumnsMap.size()); - - return result; } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#read(me.prettyprint.cassandra.service.template.ColumnFamilyTemplate, java.lang.String, N, V) - */ - @Override - @SuppressWarnings("unchecked") - public V read(ColumnFamilyTemplate template, String key, N column, V returnType) throws HectorException { - - long ts1 = System.currentTimeMillis(); - - logger.debug("event=read cf=" + template.getColumnFamily() + " key=" + key + " column=" + column); - - try { - - ColumnFamilyResult res = template.queryColumns(key); - - if (returnType instanceof String) { - return (V) res.getString(column); - } else if (returnType instanceof Date) { - return (V) res.getDate(column); - } else if (returnType instanceof Integer) { - return (V) res.getInteger(column); - } else if (returnType instanceof Long) { - return (V) res.getLong(column); - } else if (returnType instanceof UUID) { - return (V) res.getUUID(column); - } else if (returnType instanceof byte[]) { - return (V) res.getByteArray(column); - } else if (returnType instanceof UUID) { - return (V) res.getUUID(column); - } else if (returnType instanceof Composite) { - - HColumn col = res.getColumn(column); - - if (col == null) { - return (V) null; - } - - return (V) Composite.fromByteBuffer(col.getValue()); - - } else if (returnType instanceof DynamicComposite) { - - HColumn col = res.getColumn(column); - - if (col == null) { - return (V) null; - } - - return (V) DynamicComposite.fromByteBuffer(col.getValue()); - - } else { - throw new IllegalArgumentException("Unsupported type of return type: " + returnType); - } - - } finally { - long ts2 = System.currentTimeMillis(); - CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); - } - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readNextNRows(java.lang.String, K, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ - @Override - public List> readNextNRows(String columnFamily, - K lastKey, int numRows, Serializer keySerializer, Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { - return readNextNRows(columnFamily, lastKey, numRows, 100, keySerializer, columnNameSerializer, valueSerializer, level); - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readNextNRows(java.lang.String, K, int, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ - @Override - public List> readNextNRows(String columnFamily, - K lastKey, int numRows, int numCols, Serializer keySerializer, Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { - - long ts1 = System.currentTimeMillis(); - - logger.debug("event=read_nextn_rows cf=" + columnFamily + " last_key=" + lastKey + " num_rows=" + numRows + " num_cols=" + numCols); - - List> rows = new ArrayList>(); - Keyspace keyspace = keyspaces.get(level); - - RangeSlicesQuery rangeSlicesQuery = HFactory.createRangeSlicesQuery(keyspace, keySerializer, columnNameSerializer, valueSerializer) - .setColumnFamily(columnFamily) - .setRange(null, null, false, numCols).setRowCount(numRows) - .setKeys(lastKey, null); - - QueryResult> result = rangeSlicesQuery.execute(); - - OrderedRows orderedRows = result.get(); - Iterator> rowsIterator = orderedRows.iterator(); - - if (lastKey != null && rowsIterator != null && rowsIterator.hasNext()) { - rowsIterator.next(); - } - - while (rowsIterator.hasNext()) { - - Row row = rowsIterator.next(); - - if (row.getColumnSlice().getColumns().isEmpty()) { - continue; - } - - rows.add(row); - } - - long ts2 = System.currentTimeMillis(); - CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); - CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraRead, 1L); - - return rows; - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readNextNNonEmptyRows(java.lang.String, K, int, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public List> readNextNNonEmptyRows(String columnFamily, K lastKey, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, Serializer valueSerializer, HConsistencyLevel level) { + public List> readNextNNonEmptyRows(String keyspace, String columnFamily, K lastKey, int numRows, int numCols, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer, + CMB_SERIALIZER valueSerializer) throws Exception { long ts1 = System.currentTimeMillis(); @@ -435,7 +358,6 @@ public List> readNextNNonEmptyRows(String columnFamily, K int pageSize = 100; List> rows = new ArrayList>(); - Keyspace keyspace = keyspaces.get(level); RangeSlicesQuery rangeSlicesQuery; // page through rows in increments of 100 until the desired number of @@ -443,8 +365,8 @@ public List> readNextNNonEmptyRows(String columnFamily, K while (true) { - rangeSlicesQuery = HFactory.createRangeSlicesQuery(keyspace, keySerializer, - columnNameSerializer, valueSerializer) + rangeSlicesQuery = HFactory.createRangeSlicesQuery(getKeyspace(keyspace), getSerializer(keySerializer), + getSerializer(columnNameSerializer), getSerializer(valueSerializer)) .setColumnFamily(columnFamily) .setRange(null, null, false, numCols).setRowCount(pageSize) .setKeys(lastKey, null); @@ -465,7 +387,7 @@ public List> readNextNNonEmptyRows(String columnFamily, K // return if there are no more rows in cassandra if (!rowsIterator.hasNext()) { - return rows; + return getRows(rows); } while (rowsIterator.hasNext()) { @@ -483,7 +405,7 @@ public List> readNextNNonEmptyRows(String columnFamily, K // return if we have the desired number of rows if (rows.size() >= numRows) { - return rows; + return getRows(rows); } } } @@ -494,34 +416,73 @@ public List> readNextNNonEmptyRows(String columnFamily, K } } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readNextNRows(java.lang.String, K, N, V, int, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public List> readNextNRows(String columnFamily, K lastKey, N whereColumn, V whereValue, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, Serializer valueSerializer, HConsistencyLevel level) { + public List> readNextNRows(String keyspace, String columnFamily, K lastKey, int numRows, int numCols, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer, + CMB_SERIALIZER valueSerializer) throws Exception { + + long ts1 = System.currentTimeMillis(); + + logger.debug("event=read_nextn_rows cf=" + columnFamily + " last_key=" + lastKey + " num_rows=" + numRows + " num_cols=" + numCols); + + List> rows = new ArrayList>(); + + RangeSlicesQuery rangeSlicesQuery = HFactory.createRangeSlicesQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(columnNameSerializer), getSerializer(valueSerializer)) + .setColumnFamily(columnFamily) + .setRange(null, null, false, numCols).setRowCount(numRows) + .setKeys(lastKey, null); + + QueryResult> result = rangeSlicesQuery.execute(); + + OrderedRows orderedRows = result.get(); + Iterator> rowsIterator = orderedRows.iterator(); + + if (lastKey != null && rowsIterator != null && rowsIterator.hasNext()) { + rowsIterator.next(); + } + + while (rowsIterator.hasNext()) { + + Row row = rowsIterator.next(); + + if (row.getColumnSlice().getColumns().isEmpty()) { + continue; + } + + rows.add(row); + } + + long ts2 = System.currentTimeMillis(); + CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); + CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraRead, 1L); + + return getRows(rows); + } + + + @Override + public List> readNextNRows(String keyspace, String columnFamily, K lastKey, N whereColumn, V whereValue, + int numRows, int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception { Map columnValues = new HashMap(); columnValues.put(whereColumn, whereValue); - return readNextNRows(columnFamily, lastKey, columnValues, numRows, numCols, keySerializer, columnNameSerializer, valueSerializer, level); + return readNextNRows(keyspace, columnFamily, lastKey, columnValues, numRows, numCols, keySerializer, columnNameSerializer, valueSerializer); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readNextNRows(java.lang.String, K, java.util.Map, int, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public List> readNextNRows(String columnFamily, K lastKey, Map columnValues, int numRows, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, Serializer valueSerializer, HConsistencyLevel level) { + public List> readNextNRows(String keyspace, String columnFamily, K lastKey, Map columnValues, + int numRows, int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=read_nextn_rows cf=" + columnFamily + " last_key=" + lastKey + " num_rows=" + numRows + " num_cols=" + numCols + " values=" + columnValues); List> rows = new ArrayList>(); - Keyspace keyspace = keyspaces.get(level); - IndexedSlicesQuery indexedSlicesQuery = HFactory.createIndexedSlicesQuery(keyspace, keySerializer, columnNameSerializer, valueSerializer) + IndexedSlicesQuery indexedSlicesQuery = HFactory.createIndexedSlicesQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(columnNameSerializer), getSerializer(valueSerializer)) .setColumnFamily(columnFamily) .setRange(null, null, false, numCols).setRowCount(numRows) .setStartKey(lastKey); @@ -555,33 +516,19 @@ public List> readNextNRows(String columnFamily, K lastKey long ts2 = System.currentTimeMillis(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); - return rows; + return getRows(rows); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readColumnSlice(java.lang.String, K, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ - @Override - public ColumnSlice readColumnSlice(String columnFamily, K key, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { - return readColumnSlice(columnFamily, key, null, null, numCols, keySerializer, columnNameSerializer, valueSerializer, level); - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readColumnSlice(java.lang.String, K, N, N, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public ColumnSlice readColumnSlice(String columnFamily, K key, N firstColumnName, N lastColumnName, int numCols, - Serializer keySerializer, Serializer columnNameSerializer, Serializer valueSerializer, HConsistencyLevel level) { + public CmbColumnSlice readColumnSlice(String keyspace, String columnFamily, K key, N firstColumnName, N lastColumnName, + int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=read_row cf=" + columnFamily + " key=" + key + " first_col=" + firstColumnName + " last_col=" + lastColumnName + " num_cols=" + numCols); - Keyspace keyspace = keyspaces.get(level); - - SliceQuery sliceQuery = HFactory.createSliceQuery(keyspace, keySerializer, columnNameSerializer, valueSerializer) + SliceQuery sliceQuery = HFactory.createSliceQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(columnNameSerializer), getSerializer(valueSerializer)) .setColumnFamily(columnFamily) .setRange(firstColumnName, lastColumnName, false, numCols) .setKey(key); @@ -597,7 +544,7 @@ public ColumnSlice readColumnSlice(String columnFamily, K key, N return null; } - return slice; + return getSlice(slice); /*RangeSlicesQuery rangeSliceQuery = HFactory.createRangeSlicesQuery(keyspace, keySerializer, columnNameSerializer, valueSerializer) .setColumnFamily(columnFamily) @@ -623,13 +570,11 @@ public ColumnSlice readColumnSlice(String columnFamily, K key, N return row.getColumnSlice();*/ } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readRowFromSuperColumnFamily(java.lang.String, K, SN, SN, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public SuperSlice readRowFromSuperColumnFamily(String columnFamily, K key, SN firstColumnName, SN lastColumnName, int numCols, - Serializer keySerializer, Serializer superNameSerializer, Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { + public CmbSuperColumnSlice readRowFromSuperColumnFamily(String keyspace, String columnFamily, K key, SN firstColumnName, SN lastColumnName, + int numCols, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception { long ts1 = System.currentTimeMillis(); @@ -637,9 +582,7 @@ public SuperSlice readRowFromSuperColumnFamily(String co try { - Keyspace keyspace = keyspaces.get(level); - - SuperSliceQuery rangeSlicesQuery = HFactory.createSuperSliceQuery(keyspace, keySerializer, superNameSerializer, columnNameSerializer, valueSerializer) + SuperSliceQuery rangeSlicesQuery = HFactory.createSuperSliceQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(superNameSerializer), getSerializer(columnNameSerializer), getSerializer(valueSerializer)) .setColumnFamily(columnFamily) .setRange(firstColumnName, lastColumnName, false, numCols) .setKey(key); @@ -654,7 +597,7 @@ public SuperSlice readRowFromSuperColumnFamily(String co return null; } - return superSlice; + return getSuperColumnSlice(superSlice); } finally { long ts2 = System.currentTimeMillis(); @@ -662,13 +605,10 @@ public SuperSlice readRowFromSuperColumnFamily(String co } } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readColumnFromSuperColumnFamily(java.lang.String, K, SN, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public HSuperColumn readColumnFromSuperColumnFamily(String columnFamily, K key, SN columnName, - Serializer keySerializer, Serializer superNameSerializer, Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { + public CmbSuperColumn readColumnFromSuperColumnFamily(String keyspace, String columnFamily, K key, SN columnName, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception { long ts1 = System.currentTimeMillis(); @@ -676,9 +616,7 @@ public HSuperColumn readColumnFromSuperColumnFamily(Stri try { - Keyspace keyspace = keyspaces.get(level); - - SuperColumnQuery superColumnQuery = HFactory.createSuperColumnQuery(keyspace, keySerializer, superNameSerializer, columnNameSerializer, valueSerializer) + SuperColumnQuery superColumnQuery = HFactory.createSuperColumnQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(superNameSerializer), getSerializer(columnNameSerializer), getSerializer(valueSerializer)) .setColumnFamily(columnFamily) .setSuperName(columnName) .setKey(key); @@ -693,7 +631,7 @@ public HSuperColumn readColumnFromSuperColumnFamily(Stri return null; } - return superColumn; + return getSuperColumn(superColumn); } finally { long ts2 = System.currentTimeMillis(); @@ -701,13 +639,11 @@ public HSuperColumn readColumnFromSuperColumnFamily(Stri } } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readMultipleColumnsFromSuperColumnFamily(java.lang.String, java.util.Collection, java.util.Collection, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public List> readMultipleColumnsFromSuperColumnFamily(String columnFamily, Collection keys, Collection columnNames, - Serializer keySerializer, Serializer superNameSerializer, Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level) { + public List> readMultipleColumnsFromSuperColumnFamily(String keyspace, String columnFamily, Collection keys, + Collection columnNames, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer) throws Exception { List> list = new ArrayList>(); @@ -717,9 +653,7 @@ public List> readMultipleColumnsFromSuperCo try { - Keyspace keyspace = keyspaces.get(level); - - MultigetSuperSliceQuery query = HFactory.createMultigetSuperSliceQuery(keyspace, keySerializer, superNameSerializer, columnNameSerializer, valueSerializer) + MultigetSuperSliceQuery query = HFactory.createMultigetSuperSliceQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(superNameSerializer), getSerializer(columnNameSerializer), getSerializer(valueSerializer)) .setColumnFamily(columnFamily) .setColumnNames(columnNames) .setKeys(keys); @@ -737,7 +671,7 @@ public List> readMultipleColumnsFromSuperCo CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraRead, 1L); - return list; + return getSuperColumns(list); } finally { long ts2 = System.currentTimeMillis(); @@ -745,13 +679,11 @@ public List> readMultipleColumnsFromSuperCo } } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#readColumnsFromSuperColumnFamily(java.lang.String, K, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel, SN, SN, int) - */ @Override - public List> readColumnsFromSuperColumnFamily(String columnFamily, K key, - Serializer keySerializer, Serializer superNameSerializer, Serializer columnNameSerializer, Serializer valueSerializer, - HConsistencyLevel level, SN firstCol, SN lastCol, int numCol) { + public List> readColumnsFromSuperColumnFamily(String keyspace, String columnFamily, K key, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER superNameSerializer, + CMB_SERIALIZER columnNameSerializer, CMB_SERIALIZER valueSerializer, + SN firstCol, SN lastCol, int numCol) throws Exception { long ts1 = System.currentTimeMillis(); @@ -759,9 +691,7 @@ public List> readColumnsFromSuperColumnFam try { - Keyspace keyspace = keyspaces.get(level); - - SuperSliceQuery superSliceQuery = HFactory.createSuperSliceQuery(keyspace, keySerializer, superNameSerializer, columnNameSerializer, valueSerializer) + SuperSliceQuery superSliceQuery = HFactory.createSuperSliceQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(superNameSerializer), getSerializer(columnNameSerializer), getSerializer(valueSerializer)) .setColumnFamily(columnFamily) .setKey(key) .setRange(firstCol, lastCol, false, numCol); @@ -770,7 +700,7 @@ public List> readColumnsFromSuperColumnFam CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraRead, 1L); - return result.get().getSuperColumns(); + return getSuperColumns(result.get().getSuperColumns()); } finally { long ts2 = System.currentTimeMillis(); @@ -778,39 +708,21 @@ public List> readColumnsFromSuperColumnFam } } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#insertOrUpdateRow(java.lang.String, java.lang.String, java.util.Map, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public MutationResult insertOrUpdateRow(String rowKey, String columnFamily, Map columnValues, HConsistencyLevel level) { - return insertRow(rowKey, columnFamily, columnValues, level, null); - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#insertRow(java.lang.String, java.lang.String, java.util.Map, me.prettyprint.hector.api.HConsistencyLevel, java.lang.Integer) - */ - @Override - public MutationResult insertRow(String rowKey, String columnFamily, Map columnValues, HConsistencyLevel level, Integer ttl) { - return this.insertRow(rowKey, columnFamily, columnValues, StringSerializer.get(), StringSerializer.get(), StringSerializer.get(), level, ttl); - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#insertRow(K, java.lang.String, java.util.Map, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel, java.lang.Integer) - */ - @Override - public MutationResult insertRow(K rowKey, String columnFamily, Map columnValues, - Serializer keySerializer, Serializer nameSerializer, Serializer valueSerializer, - HConsistencyLevel level, Integer ttl) { + public void insertRow(K rowKey, + String keyspace, String columnFamily, Map columnValues, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, + CMB_SERIALIZER valueSerializer, Integer ttl) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=insert_row key=" + rowKey + " cf=" + columnFamily + " ttl=" + (ttl == null ? "null" : ttl)); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); for (N key : columnValues.keySet()) { - HColumn col = HFactory.createColumn(key, columnValues.get(key), nameSerializer, valueSerializer); + HColumn col = HFactory.createColumn(key, columnValues.get(key), getSerializer(nameSerializer), getSerializer(valueSerializer)); if (ttl != null) { col.setTtl(ttl); @@ -825,22 +737,18 @@ public MutationResult insertRow(K rowKey, String columnFamily, Map MutationResult insertRows(Map> rowColumnValues, String columnFamily, - Serializer keySerializer, Serializer nameSerializer, Serializer valueSerializer, HConsistencyLevel level, Integer ttl) { + public void insertRows(String keyspace, Map> rowColumnValues, String columnFamily, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER nameSerializer, + CMB_SERIALIZER valueSerializer, Integer ttl) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=insert_rows row_column_values=" + rowColumnValues + " cf=" + columnFamily + " ttl=" + (ttl == null ? "null" : ttl)); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); for (K rowKey : rowColumnValues.keySet()) { @@ -848,7 +756,7 @@ public MutationResult insertRows(Map> rowColumnValues, St for (N key : columnValues.keySet()) { - HColumn col = HFactory.createColumn(key, columnValues.get(key), nameSerializer, valueSerializer); + HColumn col = HFactory.createColumn(key, columnValues.get(key), getSerializer(nameSerializer), getSerializer(valueSerializer)); if (ttl != null) { col.setTtl(ttl); @@ -864,8 +772,6 @@ public MutationResult insertRows(Map> rowColumnValues, St long ts2 = System.currentTimeMillis(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); - - return result; } /** @@ -890,18 +796,16 @@ public MutationResult insertRows(Map> rowColumnValues, St return res; }*/ - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#delete(me.prettyprint.cassandra.service.template.ColumnFamilyTemplate, K, N) - */ @Override - public void delete(String columnFamily, K key, N column, Serializer keySerializer, Serializer columnSerializer, HConsistencyLevel level) throws HectorException { + public void delete(String keyspace, String columnFamily, K key, N column, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=delete key=" + key + " column=" + column + " cf=" + columnFamily); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); if (column != null) { - mutator.addDeletion(key, columnFamily, column, columnSerializer); + mutator.addDeletion(key, columnFamily, column, getSerializer(columnSerializer)); } else { mutator.addDeletion(key, columnFamily); } @@ -912,25 +816,22 @@ public void delete(String columnFamily, K key, N column, Serializer ke CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#deleteBatch(java.lang.String, java.util.List, java.util.List, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel, me.prettyprint.hector.api.Serializer) - */ @Override - public void deleteBatch(String columnFamily, List keyList, List columnList, - Serializer keySerializer, - HConsistencyLevel level, - Serializer columnSerializer) throws HectorException { + public void deleteBatch(String keyspace, String columnFamily, + List keyList, List columnList, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnSerializer) throws Exception { long ts1 = System.currentTimeMillis(); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); - if(columnList==null ||columnList.isEmpty()){ + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); + + if (columnList==null ||columnList.isEmpty()) { for (int i=0; i< keyList.size();i++) { mutator.addDeletion(keyList.get(i), columnFamily); } - }else{ + } else { for (int i=0; i< keyList.size();i++) { - mutator.addDeletion(keyList.get(i), columnFamily, columnList.get(i), columnSerializer); + mutator.addDeletion(keyList.get(i), columnFamily, columnList.get(i), getSerializer(columnSerializer)); } } @@ -940,19 +841,16 @@ public void deleteBatch(String columnFamily, List keyList, List col long ts2 = System.currentTimeMillis(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#deleteSuperColumn(me.prettyprint.cassandra.service.template.SuperCfTemplate, K, SN) - */ + @Override - public void deleteSuperColumn(String superColumnFamily, K key, SN superColumn, Serializer keySerializer, Serializer superColumnSerializer, - HConsistencyLevel level) throws HectorException { + public void deleteSuperColumn(String keyspace, String superColumnFamily, K key, SN superColumn, CMB_SERIALIZER keySerializer, CMB_SERIALIZER superColumnSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=delete key=" + key + " super_column=" + superColumn + " cf=" + superColumnFamily); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); if (superColumn != null) { - mutator.addSuperDelete(key, superColumnFamily, superColumn, superColumnSerializer); + mutator.addSuperDelete(key, superColumnFamily, superColumn, getSerializer(superColumnSerializer)); } else { mutator.addSuperDelete(key, superColumnFamily, null, null); } @@ -963,43 +861,16 @@ public void deleteSuperColumn(String superColumnFamily, K key, SN sup CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#getTimeUUID(long) - */ - @Override - public java.util.UUID getTimeUUID(long timeMillis) throws InterruptedException { - return new java.util.UUID(newTime(timeMillis, false), com.eaio.uuid.UUIDGen.getClockSeqAndNode()); - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#getUniqueTimeUUID(long) - */ - @Override - public java.util.UUID getUniqueTimeUUID(long millis) { - return new java.util.UUID(com.eaio.uuid.UUIDGen.createTime(millis), com.eaio.uuid.UUIDGen.getClockSeqAndNode()); - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#getTimeLong(long) - */ - @Override - public long getTimeLong(long timeMillis) throws InterruptedException { - long newTime = timeMillis * 1000000000 + (System.nanoTime() % 1000000) * 1000 + random.nextInt(999999); - return newTime; - } - - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#getCount(java.lang.String, K, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public int getCount(String columnFamily, K key, Serializer keySerializer, Serializer columnNameSerializer, HConsistencyLevel level) throws HectorException { + public int getCount(String keyspace, String columnFamily, K key, + CMB_SERIALIZER keySerializer, CMB_SERIALIZER columnNameSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=get_count cf=" + columnFamily + " key=" + key); @SuppressWarnings("unchecked") - MultigetCountQuery query = new MultigetCountQuery(keyspaces.get(level), keySerializer, columnNameSerializer) + MultigetCountQuery query = new MultigetCountQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(columnNameSerializer)) .setColumnFamily(columnFamily).setKeys(key) .setRange(null, null, 2000000000); @@ -1017,35 +888,33 @@ public int getCount(String columnFamily, K key, Serializer keySerializ return count; } - throw new HectorException("Count not found for key " + key); + return -1; } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#incrementCounter(java.lang.String, K, java.lang.String, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public void incrementCounter(String columnFamily, K rowKey, String columnName, int incrementBy, Serializer keySerializer, Serializer columnNameSerializer, HConsistencyLevel level) { + public void incrementCounter(String keyspace, String columnFamily, K rowKey, + String columnName, int incrementBy, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception { logger.debug("event=increment_counter cf=" + columnFamily + " key=" + rowKey + " column=" + columnName + " inc=" + incrementBy); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); mutator.incrementCounter(rowKey, columnFamily, columnName, incrementBy); mutator.execute(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraWrite, 1L); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#decrementCounter(java.lang.String, K, java.lang.String, int, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public void decrementCounter(String columnFamily, K rowKey, String columnName, int decrementBy, Serializer keySerializer, Serializer columnNameSerializer, HConsistencyLevel level) { + public void decrementCounter(String keyspace, String columnFamily, K rowKey, + String columnName, int decrementBy, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=decrement_counter cf=" + columnFamily + " key=" + rowKey + " column=" + columnName + " dec=" + decrementBy); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); mutator.decrementCounter(rowKey, columnFamily, columnName, decrementBy); mutator.execute(); @@ -1054,18 +923,17 @@ public void decrementCounter(String columnFamily, K rowKey, String column CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#deleteCounter(java.lang.String, K, N, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public void deleteCounter(String columnFamily, K rowKey, N columnName, Serializer keySerializer, Serializer columnNameSerializer, HConsistencyLevel level) { + public void deleteCounter(String keyspace, String columnFamily, K rowKey, + N columnName, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=decrement_counter cf=" + columnFamily + " key=" + rowKey + " column=" + columnName); - Mutator mutator = HFactory.createMutator(keyspaces.get(level), keySerializer); - mutator.deleteCounter(rowKey, columnFamily, columnName, columnNameSerializer); + Mutator mutator = HFactory.createMutator(getKeyspace(keyspace), getSerializer(keySerializer)); + mutator.deleteCounter(rowKey, columnFamily, columnName, getSerializer(columnNameSerializer)); mutator.execute(); CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraWrite, 1L); @@ -1073,18 +941,17 @@ public void deleteCounter(String columnFamily, K rowKey, N columnName, Se CMBControllerServlet.valueAccumulator.addToCounter(AccumulatorName.CassandraTime, (ts2 - ts1)); } - /* (non-Javadoc) - * @see com.comcast.cmb.common.persistence.IPersistence#getCounter(java.lang.String, K, N, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.Serializer, me.prettyprint.hector.api.HConsistencyLevel) - */ @Override - public long getCounter(String columnFamily, K rowKey, N columnName, Serializer keySerializer, Serializer columnNameSerializer, HConsistencyLevel level) { + public long getCounter(String keyspace, String columnFamily, K rowKey, + N columnName, CMB_SERIALIZER keySerializer, + CMB_SERIALIZER columnNameSerializer) throws Exception { long ts1 = System.currentTimeMillis(); logger.debug("event=get_counter cf=" + columnFamily + " key=" + rowKey + " column=" + columnName); try { - CounterQuery countQuery = HFactory.createCounterColumnQuery(keyspaces.get(level), keySerializer, columnNameSerializer); + CounterQuery countQuery = HFactory.createCounterColumnQuery(getKeyspace(keyspace), getSerializer(keySerializer), getSerializer(columnNameSerializer)); countQuery.setColumnFamily(columnFamily).setKey(rowKey).setName(columnName); QueryResult> result = countQuery.execute(); diff --git a/src/com/comcast/cmb/common/persistence/CassandraPersistenceFactory.java b/src/com/comcast/cmb/common/persistence/CassandraPersistenceFactory.java index c293548a..75320e0c 100644 --- a/src/com/comcast/cmb/common/persistence/CassandraPersistenceFactory.java +++ b/src/com/comcast/cmb/common/persistence/CassandraPersistenceFactory.java @@ -11,7 +11,7 @@ private void CassandraPersistenceFactory() { public static AbstractCassandraPersistence getInstance(String keyspace) { if (!cassandraPersistenceMap.containsKey(keyspace)) { - cassandraPersistenceMap.put(keyspace, new CassandraHectorPersistence(keyspace)); + cassandraPersistenceMap.put(keyspace, new CassandraHectorPersistence()); } return cassandraPersistenceMap.get(keyspace); }