Skip to content

Commit

Permalink
rework cursor store sync w.r.t to index order. resolve issues with sk…
Browse files Browse the repository at this point in the history
  • Loading branch information
gtully committed Aug 29, 2014
1 parent b2afb8c commit 54e2e3b
Show file tree
Hide file tree
Showing 35 changed files with 1,152 additions and 350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -47,7 +45,6 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import javax.transaction.xa.XAException;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
Expand Down Expand Up @@ -75,20 +72,19 @@
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
Expand All @@ -101,7 +97,7 @@
* The Queue is a List of MessageEntry objects that are dispatched to matching
* subscriptions.
*/
public class Queue extends BaseDestination implements Task, UsageListener {
public class Queue extends BaseDestination implements Task, UsageListener, IndexListener {
protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
protected final TaskRunnerFactory taskFactory;
protected TaskRunner taskRunner;
Expand Down Expand Up @@ -241,6 +237,9 @@ public Queue(BrokerService brokerService, final ActiveMQDestination destination,
super(brokerService, store, destination, parentStats);
this.taskFactory = taskFactory;
this.dispatchSelector = new QueueDispatchSelector(destination);
if (store != null) {
store.registerIndexListener(this);
}
}

@Override
Expand Down Expand Up @@ -746,158 +745,81 @@ private void registerCallbackForNotFullNotification() {
}
}

final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();

// roll up all message sends
class SendSync extends Synchronization {

class MessageContext {
public Message message;
public ConnectionContext context;

public MessageContext(ConnectionContext context, Message message) {
this.context = context;
this.message = message;
}
}

final Transaction transaction;
List<MessageContext> additions = new ArrayList<MessageContext>();

public SendSync(Transaction transaction) {
this.transaction = transaction;
}

public void add(ConnectionContext context, Message message) {
additions.add(new MessageContext(context, message));
}
private final LinkedList<MessageContext> indexOrderedCursorUpdates = new LinkedList<>();

@Override
public void beforeCommit() throws Exception {
synchronized (orderIndexUpdates) {
orderIndexUpdates.addLast(transaction);
}
@Override
public void onAdd(MessageContext messageContext) {
synchronized (indexOrderedCursorUpdates) {
indexOrderedCursorUpdates.addLast(messageContext);
}
}

@Override
public void afterCommit() throws Exception {
ArrayList<SendSync> syncs = new ArrayList<SendSync>(200);
sendLock.lockInterruptibly();
try {
synchronized (orderIndexUpdates) {
Transaction next = orderIndexUpdates.peek();
while( next!=null && next.isCommitted() ) {
syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst()));
next = orderIndexUpdates.peek();
private void doPendingCursorAdditions() throws Exception {
LinkedList<MessageContext> orderedUpdates = new LinkedList<>();
sendLock.lockInterruptibly();
try {
synchronized (indexOrderedCursorUpdates) {
MessageContext candidate = indexOrderedCursorUpdates.peek();
while (candidate != null && candidate.message.getMessageId().getFutureOrSequenceLong() != null) {
candidate = indexOrderedCursorUpdates.removeFirst();
// check for duplicate adds suppressed by the store
if (candidate.message.getMessageId().getFutureOrSequenceLong() instanceof Long && ((Long)candidate.message.getMessageId().getFutureOrSequenceLong()).compareTo(-1l) == 0) {
LOG.warn("{} messageStore indicated duplicate add attempt for {}, suppressing duplicate dispatch", this, candidate.message.getMessageId());
} else {
orderedUpdates.add(candidate);
}
candidate = indexOrderedCursorUpdates.peek();
}
for (SendSync sync : syncs) {
sync.processSend();
}
} finally {
sendLock.unlock();
}
for (SendSync sync : syncs) {
sync.processSent();
}
}

// called with sendLock
private void processSend() throws Exception {

for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) {
MessageContext messageContext = iterator.next();
// It could take while before we receive the commit
// op, by that time the message could have expired..
if (broker.isExpired(messageContext.message)) {
broker.messageExpired(messageContext.context, messageContext.message, null);
destinationStatistics.getExpired().increment();
iterator.remove();
continue;
for (MessageContext messageContext : orderedUpdates) {
if (!cursorAdd(messageContext.message)) {
// cursor suppressed a duplicate
messageContext.duplicate = true;
}
sendMessage(messageContext.message);
messageContext.message.decrementReferenceCount();
}
} finally {
sendLock.unlock();
}

private void processSent() throws Exception {
for (MessageContext messageContext : additions) {
for (MessageContext messageContext : orderedUpdates) {
if (!messageContext.duplicate) {
messageSent(messageContext.context, messageContext.message);
}
}

@Override
public void afterRollback() throws Exception {
try {
for (MessageContext messageContext : additions) {
messageContext.message.decrementReferenceCount();
}
} finally {
sendSyncs.remove(transaction);
if (messageContext.onCompletion != null) {
messageContext.onCompletion.run();
}
}
orderedUpdates.clear();
}

class OrderedNonTransactionWorkTx extends Transaction {

@Override
public void commit(boolean onePhase) throws XAException, IOException {
}

@Override
public void rollback() throws XAException, IOException {
}

@Override
public int prepare() throws XAException, IOException {
return 0;
}
final class CursorAddSync extends Synchronization {

@Override
public TransactionId getTransactionId() {
return null;
}
private final MessageContext messageContext;

@Override
public Logger getLog() {
return null;
CursorAddSync(MessageContext messageContext) {
this.messageContext = messageContext;
}

@Override
public boolean isCommitted() {
return true;
}

@Override
public void addSynchronization(Synchronization s) {
try {
s.beforeCommit();
} catch (Exception e) {
LOG.error("Failed to add not transactional message to orderedWork", e);
public void afterCommit() throws Exception {
if (store != null && messageContext.message.isPersistent()) {
doPendingCursorAdditions();
} else {
cursorAdd(messageContext.message);
messageSent(messageContext.context, messageContext.message);
}
messageContext.message.decrementReferenceCount();
}
}

// called while holding the sendLock
private void registerSendSync(Message message, ConnectionContext context) {
final Transaction transaction =
message.isInTransaction() ? context.getTransaction()
: new OrderedNonTransactionWorkTx();
Queue.SendSync currentSync = sendSyncs.get(transaction);
if (currentSync == null) {
currentSync = new Queue.SendSync(transaction);
transaction.addSynchronization(currentSync);
sendSyncs.put(transaction, currentSync);
@Override
public void afterRollback() throws Exception {
messageContext.message.decrementReferenceCount();
}
currentSync.add(context, message);
}

void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
ListenableFuture<Object> result = null;
boolean needsOrderingWithTransactions = context.isInTransaction();

producerExchange.incrementSend();
checkUsage(context, producerExchange, message);
Expand All @@ -922,26 +844,11 @@ void doMessageSend(final ProducerBrokerExchange producerExchange, final Message
throw e;
}
}
// did a transaction commit beat us to the index?
synchronized (orderIndexUpdates) {
needsOrderingWithTransactions |= !orderIndexUpdates.isEmpty();
}
if (needsOrderingWithTransactions ) {
// If this is a transacted message.. increase the usage now so that
// a big TX does not blow up
// our memory. This increment is decremented once the tx finishes..
message.incrementReferenceCount();

registerSendSync(message, context);
} else {
// Add to the pending list, this takes care of incrementing the
// usage manager.
sendMessage(message);
}
orderedCursorAdd(message, context);
} finally {
sendLock.unlock();
}
if (!needsOrderingWithTransactions) {
if (store == null || (!context.isInTransaction() && !message.isPersistent())) {
messageSent(context, message);
}
if (result != null && message.isResponseRequired() && !result.isCancelled()) {
Expand All @@ -954,6 +861,17 @@ void doMessageSend(final ProducerBrokerExchange producerExchange, final Message
}
}

private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null)));
} else if (store != null && message.isPersistent()) {
doPendingCursorAdditions();
} else {
// no ordering issue with non persistent messages
cursorAdd(message);
}
}

private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
if (message.isPersistent()) {
if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
Expand Down Expand Up @@ -1860,10 +1778,10 @@ public void messageExpired(ConnectionContext context, Subscription subs, Message
}
}

final void sendMessage(final Message msg) throws Exception {
final boolean cursorAdd(final Message msg) throws Exception {
messagesLock.writeLock().lock();
try {
messages.addMessageLast(msg);
return messages.addMessageLast(msg);
} finally {
messagesLock.writeLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
// message may be inflight to other subscriptions so do not modify
message = message.copy();
message.getMessageId().setFutureOrSequenceLong(null);
stampAsExpired(message);
message.setExpiration(0);
if (!message.isPersistent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ public boolean isRecoveryRequired() {
public void addMessageFirst(MessageReference node) throws Exception {
}

public void addMessageLast(MessageReference node) throws Exception {
public boolean addMessageLast(MessageReference node) throws Exception {
return true;
}

public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
addMessageLast(node);
return true;
return addMessageLast(node);
}

public void addRecoveredMessage(MessageReference node) throws Exception {
Expand Down
Loading

0 comments on commit 54e2e3b

Please sign in to comment.