Skip to content

Commit

Permalink
clean release messages on demand
Browse files Browse the repository at this point in the history
  • Loading branch information
nobodyiam committed Jan 22, 2017
1 parent cac111a commit 35cde2b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,28 +1,50 @@
package com.ctrip.framework.apollo.biz.message;

import com.google.common.collect.Queues;

import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.PostConstruct;

/**
* @author Jason Song([email protected])
*/
@Component
public class DatabaseMessageSender implements MessageSender {
private static final Logger logger = LoggerFactory.getLogger(DatabaseMessageSender.class);
private static final int CLEAN_QUEUE_MAX_SIZE = 100;
private BlockingQueue<Long> toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
private final ExecutorService cleanExecutorService;
private final AtomicBoolean cleanStopped;

@Autowired
private ReleaseMessageRepository releaseMessageRepository;

public DatabaseMessageSender() {
cleanExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("DatabaseMessageSender", true));
cleanStopped = new AtomicBoolean(false);
}

@Override
@Transactional
public void sendMessage(String message, String channel) {
logger.info("Sending message {} to channel {}", message, channel);
if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
Expand All @@ -33,7 +55,8 @@ public void sendMessage(String message, String channel) {
Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
try {
releaseMessageRepository.save(new ReleaseMessage(message));
ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
toClean.offer(newMessage.getId());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
logger.error("Sending message to database failed", ex);
Expand All @@ -42,4 +65,45 @@ public void sendMessage(String message, String channel) {
transaction.complete();
}
}

@PostConstruct
private void initialize() {
cleanExecutorService.submit(() -> {
while (!cleanStopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Long rm = toClean.poll(1, TimeUnit.SECONDS);
if (rm != null) {
cleanMessage(rm);
} else {
TimeUnit.SECONDS.sleep(5);
}
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
});
}

private void cleanMessage(Long id) {
boolean hasMore = true;
//double check in case the release message is rolled back
ReleaseMessage releaseMessage = releaseMessageRepository.findOne(id);
if (releaseMessage == null) {
return;
}
while (hasMore && !Thread.currentThread().isInterrupted()) {
List<ReleaseMessage> messages = releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(
releaseMessage.getMessage(), releaseMessage.getId());

releaseMessageRepository.delete(messages);
hasMore = messages.size() == 100;

messages.forEach(toRemove -> Tracer.logEvent(
String.format("ReleaseMessage.Clean.%s", toRemove.getMessage()), String.valueOf(toRemove.getId())));
}
}

void stopClean() {
cleanStopped.set(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel

ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages);

List<ReleaseMessage> findFirst100ByMessageAndIdLessThanOrderByIdAsc(String message, Long id);

@Query("select message, max(id) as id from ReleaseMessage where message in :messages group by message")
List<Object[]> findLatestReleaseMessagesGroupByMessages(@Param("messages") Collection<String> messages);
}

0 comments on commit 35cde2b

Please sign in to comment.