Skip to content

Commit

Permalink
Merge pull request apache#224 from usergrid/fetchsize-fixes
Browse files Browse the repository at this point in the history
Fetch size fixes and optimizations
  • Loading branch information
shawnfeldman committed Jun 25, 2014
2 parents bbae662 + fd29736 commit b31108f
Show file tree
Hide file tree
Showing 41 changed files with 1,655 additions and 313 deletions.
3 changes: 3 additions & 0 deletions stack/config/src/main/resources/usergrid-default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ cassandra.readcl=QUORUM
#Write consistency level for the cassandra cluster
cassandra.writecl=QUORUM

#The maximum number of pending mutations allowed in ram before it is flushed to cassandra
cassandra.mutation.flushsize=2000

#Keyspace to use for locking
#Note that if this is deployed in a production cluster, the RF on the keyspace MUST be updated to use an odd number for it's replication Factor.
#Even numbers for RF can potentially case the locks to fail, via "split brain" when read at QUORUM on lock verification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.usergrid.persistence.cassandra.CounterUtils;
import org.apache.usergrid.persistence.cassandra.CounterUtils.AggregateCounterSelection;
import org.apache.usergrid.persistence.exceptions.TransactionNotFoundException;
import org.apache.usergrid.persistence.hector.CountingMutator;
import org.apache.usergrid.utils.UUIDUtils;

import com.fasterxml.uuid.UUIDComparator;
Expand All @@ -83,7 +84,7 @@

import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;

import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
import static org.apache.usergrid.mq.Queue.QUEUE_CREATED;
import static org.apache.usergrid.mq.Queue.QUEUE_MODIFIED;
Expand Down Expand Up @@ -229,7 +230,8 @@ public Message batchPostToQueue( Mutator<ByteBuffer> batch, String queuePath, Me
@Override
public Message postToQueue( String queuePath, Message message ) {
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

queuePath = normalizeQueuePath( queuePath );

Expand All @@ -248,7 +250,7 @@ public Message postToQueue( String queuePath, Message message ) {
break;
}

batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), be );
for ( QueueInfo q : subscribers.getQueues() ) {
batchPostToQueue( batch, q.getPath(), message, indexUpdate, timestamp );

Expand Down Expand Up @@ -428,7 +430,8 @@ public QueueSet subscribeToQueue( String publisherQueuePath, String subscriberQu
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );

Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

batchSubscribeToQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
timestamp );
Expand Down Expand Up @@ -473,7 +476,8 @@ public QueueSet unsubscribeFromQueue( String publisherQueuePath, String subscrib
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );

Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

batchUnsubscribeFromQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
timestamp );
Expand Down Expand Up @@ -567,7 +571,8 @@ public QueueSet addSubscribersToQueue( String publisherQueuePath, List<String> s
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );

Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

QueueSet queues = new QueueSet();

Expand Down Expand Up @@ -607,7 +612,8 @@ public QueueSet removeSubscribersFromQueue( String publisherQueuePath, List<Stri
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );

Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

QueueSet queues = new QueueSet();

Expand Down Expand Up @@ -647,7 +653,8 @@ public QueueSet subscribeToQueues( String subscriberQueuePath, List<String> publ
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );

Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

QueueSet queues = new QueueSet();

Expand Down Expand Up @@ -687,7 +694,8 @@ public QueueSet unsubscribeFromQueues( String subscriberQueuePath, List<String>
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );

Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

QueueSet queues = new QueueSet();

Expand Down Expand Up @@ -721,7 +729,8 @@ public QueueSet unsubscribeFromQueues( String subscriberQueuePath, List<String>
@Override
public void incrementAggregateQueueCounters( String queuePath, String category, String counterName, long value ) {
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
counterUtils.batchIncrementAggregateCounters( m, applicationId, null, null, getQueueId( queuePath ), category,
counterName, value, timestamp );
batchExecute( m, CassandraService.RETRY_COUNT );
Expand Down Expand Up @@ -864,7 +873,8 @@ public Results getAggregateQueueCounters( String queuePath, CounterQuery query )
@Override
public void incrementQueueCounters( String queuePath, Map<String, Long> counts ) {
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
counterUtils.batchIncrementQueueCounters( m, getQueueId( queuePath ), counts, timestamp, applicationId );
batchExecute( m, CassandraService.RETRY_COUNT );
}
Expand All @@ -873,7 +883,8 @@ public void incrementQueueCounters( String queuePath, Map<String, Long> counts )
@Override
public void incrementQueueCounter( String queuePath, String name, long value ) {
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
counterUtils.batchIncrementQueueCounter( m, getQueueId( queuePath ), name, value, timestamp, applicationId );
batchExecute( m, CassandraService.RETRY_COUNT );
}
Expand Down Expand Up @@ -943,7 +954,8 @@ public Queue updateQueue( String queuePath, Queue queue ) {
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );

Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );

addQueueToMutator( batch, queue, timestamp );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.usergrid.mq.QueueResults;
import org.apache.usergrid.mq.cassandra.io.NoTransactionSearch.SearchParam;
import org.apache.usergrid.persistence.exceptions.QueueException;
import org.apache.usergrid.persistence.hector.CountingMutator;
import org.apache.usergrid.utils.UUIDUtils;

import me.prettyprint.hector.api.Keyspace;
Expand All @@ -46,7 +47,7 @@

import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;

import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
import static org.apache.usergrid.mq.Queue.QUEUE_NEWEST;
import static org.apache.usergrid.mq.Queue.QUEUE_OLDEST;
Expand Down Expand Up @@ -300,7 +301,7 @@ protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastRetur
// conditions with clock drift.
long colTimestamp = UUIDUtils.getTimestampInMicros( lastReturnedId );

Mutator<UUID> mutator = createMutator( ko, ue );
Mutator<UUID> mutator = CountingMutator.createFlushingMutator( ko, ue );

if ( logger.isDebugEnabled() )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.persistence.exceptions.QueueException;
import org.apache.usergrid.persistence.exceptions.TransactionNotFoundException;
import org.apache.usergrid.persistence.hector.CountingMutator;
import org.apache.usergrid.utils.UUIDUtils;

import me.prettyprint.hector.api.Keyspace;
Expand All @@ -43,7 +44,7 @@
import me.prettyprint.hector.api.query.SliceQuery;

import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;

import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueClientTransactionKey;
Expand Down Expand Up @@ -128,7 +129,8 @@ public UUID renewTransaction( String queuePath, UUID transactionId, QueueQuery q

logger.debug( "Writing new timeout at '{}' for message '{}'", expirationId, messageId );

Mutator<ByteBuffer> mutator = createMutator( ko, be );

Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );

mutator.addInsertion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(),
createColumn( expirationId, messageId, cass.createTimestamp(), ue, ue ) );
Expand Down Expand Up @@ -162,7 +164,7 @@ public void deleteTransaction( String queuePath, UUID transactionId, QueueQuery
private void deleteTransaction( UUID queueId, UUID consumerId, UUID transactionId )
{

Mutator<ByteBuffer> mutator = createMutator( ko, be );
Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );
ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );

mutator.addDeletion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), transactionId, ue,
Expand Down Expand Up @@ -374,7 +376,7 @@ protected void deleteTransactionPointers( List<TransactionPointer> pointers, int
return;
}

Mutator<ByteBuffer> mutator = createMutator( ko, be );
Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );
ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );

for ( int i = 0; i < maxIndex && i < pointers.size(); i++ )
Expand Down Expand Up @@ -406,7 +408,7 @@ protected void deleteTransactionPointers( List<TransactionPointer> pointers, int
protected void writeTransactions( List<Message> messages, final long futureTimeout, UUID queueId, UUID consumerId )
{

Mutator<ByteBuffer> mutator = createMutator( ko, be );
Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );

ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,8 @@ public static Mutator<ByteBuffer> buildSetIdListMutator( Mutator<ByteBuffer> bat


public static MutationResult batchExecute( Mutator<?> m, int retries ) {
for ( int i = 0; i < retries; i++ ) {
try {
return m.execute();
}
catch ( Exception e ) {
logger.error( "Unable to execute mutation, retrying...", e );
}
}
return m.execute();

}


Expand Down
Loading

0 comments on commit b31108f

Please sign in to comment.