Skip to content

Commit

Permalink
Merge branch 'master' into astyanax
Browse files Browse the repository at this point in the history
  • Loading branch information
boriwo committed Apr 2, 2014
2 parents c221e72 + c7220d5 commit e18dd31
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 28 deletions.
26 changes: 26 additions & 0 deletions RELEASE.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,29 @@
CMB VERSION 2.2.39
--------------------------------------------------------------------

New features:

- AWS signature V4 support
- list number of subscriptions in web ui
- various web ui usability improvements (short cut links, alternating color in tables etc.)
- web ui for testing receive and delete message
- support publish to Redis channels
- support for aws sdk 1.6.12
- logging of thread pool sizes as well as async and io wait time
- logging of external http headers injected by nginx
- performance improvements for publish() (less logging, in-line api calls)
- ability to turn cns workers on/off dynamically (via api or web ui)
- reduced Redis key size to improve Redis memory consumption and performance

Bug fixes:

- various fixes for large number of subscribers (1 mio and more)
- cqs fix to support boto aws lib
- cns fix for multi DC support (workers to always look for messages in queues locally)
- minor fix for aws compliant policies
- bug fix for occasional NPE on startup (from initializing stats logging)
- fix for message size logging

CMB VERSION 2.2.37
--------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion packaging/linux/centos/cmb.spec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Summary: Cloud Message Bus - a clone of SQS/SNS
Name: cmb
Version: 2.2.38
Version: 2.2.39
BuildArch: noarch
Release: 1
Group: Applications
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<!-- Use this pom.xml file to create a single jar build for all cmb components using the embedded Jetty option (recommended) -->

<properties>
<buildnum>2.2.38</buildnum>
<buildnum>2.2.39</buildnum>
<projectname>cmb</projectname>
</properties>

Expand Down
31 changes: 16 additions & 15 deletions src/com/comcast/cmb/common/controller/CMBControllerServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ abstract public class CMBControllerServlet extends HttpServlet {
*/
public final static ValueAccumulator valueAccumulator = new ValueAccumulator();

public final static String VERSION = "2.2.38";
public final static String VERSION = "2.2.39";

public final static int HARD_TIMEOUT_SEC = CMBProperties.getInstance().getCMBRequestTimeoutSec();

Expand Down Expand Up @@ -385,11 +385,11 @@ private void logStats(String action, long responseTimeMS, long redisTimeMS, long
}
}

private String getLogLine(AsyncContext asyncContext, CQSHttpServletRequest request, User user, long responseTimeMS, String success) {
private String getLogLine(AsyncContext asyncContext, CQSHttpServletRequest request, User user, long responseTimeMS, String status) {

StringBuffer logLine = new StringBuffer("");

logLine.append("event=request status="+success+" client=").append(request.getRemoteAddr());
logLine.append("event=request status="+status+" client=").append(request.getRemoteAddr());

logLine.append(((this instanceof CQSControllerServlet) ? (" queue_url=" + request.getRequestURL()) : ""));

Expand All @@ -403,17 +403,18 @@ private String getLogLine(AsyncContext asyncContext, CQSHttpServletRequest reque
logLine.append((user != null ? "user=" + user.getUserName() : ""));

if (request.getAttribute("lp") == null) {

logLine.append(" resp_ms=").append(responseTimeMS);
logLine.append(" cass_ms=" + valueAccumulator.getCounter(AccumulatorName.CassandraTime));
logLine.append(" cass_num_rd=" + valueAccumulator.getCounter(AccumulatorName.CassandraRead));
logLine.append(" cass_num_wr=" + valueAccumulator.getCounter(AccumulatorName.CassandraWrite));
logLine.append(((this instanceof CNSControllerServlet) ? (" cnscqs_ms=" + CMBControllerServlet.valueAccumulator.getCounter(AccumulatorName.CNSCQSTime)) : ""));
logLine.append(((this instanceof CQSControllerServlet) ? (" redis_ms=" + valueAccumulator.getCounter(AccumulatorName.RedisTime)) : ""));
logLine.append(" io_ms=" + valueAccumulator.getCounter(AccumulatorName.IOTime));
logLine.append(" asyncq_ms=" + valueAccumulator.getCounter(AccumulatorName.AsyncQueueTime));
logLine.append(" auth_ms=" + valueAccumulator.getCounter(AccumulatorName.CMBControllerPreHandleAction));

//if status is timeout for normal action, it does not have below info
if((status != null) && (!status.equals("timeout"))){
logLine.append(" resp_ms=").append(responseTimeMS);
logLine.append(" cass_ms=" + valueAccumulator.getCounter(AccumulatorName.CassandraTime));
logLine.append(" cass_num_rd=" + valueAccumulator.getCounter(AccumulatorName.CassandraRead));
logLine.append(" cass_num_wr=" + valueAccumulator.getCounter(AccumulatorName.CassandraWrite));
logLine.append(((this instanceof CNSControllerServlet) ? (" cnscqs_ms=" + CMBControllerServlet.valueAccumulator.getCounter(AccumulatorName.CNSCQSTime)) : ""));
logLine.append(((this instanceof CQSControllerServlet) ? (" redis_ms=" + valueAccumulator.getCounter(AccumulatorName.RedisTime)) : ""));
logLine.append(" io_ms=" + valueAccumulator.getCounter(AccumulatorName.IOTime));
logLine.append(" asyncq_ms=" + valueAccumulator.getCounter(AccumulatorName.AsyncQueueTime));
logLine.append(" auth_ms=" + valueAccumulator.getCounter(AccumulatorName.CMBControllerPreHandleAction));
}
} else if (request.getAttribute("lp").equals("yy")) { // long poll receive with messages

logLine.append(" resp_ms=").append(responseTimeMS);
Expand Down Expand Up @@ -766,7 +767,7 @@ public void onTimeout(AsyncEvent asyncEvent) throws IOException {
}

} else {
logger.error("event=on_timeout");
logger.error("event=on_timeout "+getLogLine(asyncContext, request, authModule.getUserByRequest(request), 0, "timeout"));
}
asyncContext.complete();
}
Expand Down
54 changes: 44 additions & 10 deletions src/com/comcast/cqs/controller/CQSLongPollSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import com.comcast.cqs.model.CQSAPIStats;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.util.Util;

import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
Expand Down Expand Up @@ -265,20 +267,23 @@ public LongPollSenderThread() {
}

public void run() {

String queueArn = null;
String queueMessageNumberString = null;
int messageSendCount = 1;
int separatorIndex = -1;

while (true) {

String queueArn = null;


// blocking wait for next pending notification

try {
queueArn = pendingNotifications.take();
queueMessageNumberString = pendingNotifications.take();
} catch (InterruptedException ex) {
logger.warn("event=taking_pending_notifcation_from_queue_failed");
}

if (queueArn == null) {
if (queueMessageNumberString == null) {

try {
Thread.sleep(1000);
Expand All @@ -287,18 +292,37 @@ public void run() {
}

continue;
} else {
//queueArn example: cmb:cqs:ccp:390328612038:test, this means a send with 1 message
if (Util.isValidQueueArn(queueMessageNumberString)){
queueArn = queueMessageNumberString;
messageSendCount = 1;
} else { //send with multiple message
separatorIndex = queueMessageNumberString.lastIndexOf(":");
queueArn = queueMessageNumberString.substring(0, separatorIndex);
messageSendCount = Integer.parseInt(queueMessageNumberString.substring(separatorIndex+1));
}
}

// don't go through tcp stack for loopback

int messageCount = CQSLongPollReceiver.processNotification(queueArn, "localhost");
logger.debug("event=longpoll_notification_sent endpoint=localhost queue_arn=" + queueArn + " num_msg_found=" + messageCount);
int messageReceiveCount = CQSLongPollReceiver.processNotification(queueArn, "localhost");
logger.debug("event=longpoll_notification_sent endpoint=localhost queue_arn=" + queueArn + " num_msg_found=" + messageReceiveCount);

if (messageCount > 0) {
continue;
// if messageSendCound is already been received by local or empty queue, finish
try {
if (messageReceiveCount >= messageSendCount
|| RedisCachedCassandraPersistence
.getInstance()
.getRedisQueueMessageCount(
Util.getRelativeQueueUrlForArn(queueArn)) == 0) {
continue;
}
} catch (Exception ex) {
logger.error("event=error_check_queue_depth", ex);
}

// if no messages found locally, send notification on all other established channels to remote cqs api servers
// send notification on all other established channels to remote cqs api servers

for (String endpoint : activeCQSApiServers.keySet()) {

Expand Down Expand Up @@ -350,4 +374,14 @@ public static void send(String queueArn) {

pendingNotifications.add(queueArn);
}

public static void send(String queueArn, int messageNum) {

if (!initialized) {
return;
}
//for batch sending, add message number in the pendingNotification for optimization.
//as AWS, queue name only allow alphanumeric characters plus hyphens (-) and underscores (_).
pendingNotifications.add(queueArn+":"+messageNum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public boolean doAction(User user, AsyncContext asyncContext) throws Exception {
Map<String, String> result = PersistenceFactory.getCQSMessagePersistence().sendMessageBatch(queue, shard, msgList);

try {
CQSLongPollSender.send(queue.getArn());
CQSLongPollSender.send(queue.getArn(), msgList.size());
} catch (Exception ex) {
logger.warn("event=failed_to_send_longpoll_notification", ex);
}
Expand Down

0 comments on commit e18dd31

Please sign in to comment.