Skip to content

Commit

Permalink
Merge branch 'redissortedset'
Browse files Browse the repository at this point in the history
  • Loading branch information
toyangxia committed May 1, 2014
2 parents ba9b4f1 + 79dcb1a commit 3f1d087
Show file tree
Hide file tree
Showing 30 changed files with 3,056 additions and 103 deletions.
2 changes: 1 addition & 1 deletion config/cmb.properties
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ cmb.cns.keyspace=CNS

cmb.redis.serverList=localhost:6379

cmb.redis.connectionsMaxActive=100
cmb.redis.connectionsMaxTotal=100
cmb.redis.fillerThreads=16
cmb.redis.revisibleThreads=16
cmb.redis.expireTTLSec=1209600
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.1.0</version>
<version>2.4.2</version>
</dependency>

<dependency>
Expand Down
4 changes: 2 additions & 2 deletions src/com/comcast/cmb/common/controller/HealthCheckShallow.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.comcast.cmb.common.persistence.DurablePersistenceFactory;
import com.comcast.cmb.common.util.CMBProperties;
import com.comcast.cqs.controller.CQSAction;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.persistence.RedisSortedSetPersistence;

/**
* Provide a basic health-check URL for load-balancers to hit to monitor whether service is up and version
Expand Down Expand Up @@ -62,7 +62,7 @@ public boolean doAction(User user, AsyncContext asyncContext) throws Exception {
if (CMBProperties.getInstance().getCQSServiceEnabled()) {
try {

if (RedisCachedCassandraPersistence.isAlive()) {
if (RedisSortedSetPersistence.isAlive()) {
sb.append("\t<Redis>OK</Redis>\n");
} else {
sb.append("\t<Redis>All shards down.</Redis>\n");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2012 Comcast Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.comcast.cmb.common.persistence;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2012 Comcast Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.comcast.cmb.common.persistence;

import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2012 Comcast Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.comcast.cmb.common.persistence;

public class DurablePersistenceFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.comcast.cqs.persistence.CQSQueueCassandraPersistence;
import com.comcast.cqs.persistence.ICQSMessagePersistence;
import com.comcast.cqs.persistence.ICQSQueuePersistence;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.persistence.RedisSortedSetPersistence;


/**
Expand All @@ -41,7 +41,7 @@ public class PersistenceFactory {
public static ICNSTopicPersistence cnsTopicPersistence = new CNSTopicCassandraPersistence();
public static IUserPersistence userPersistence = new UserCassandraPersistence();
public static ICNSAttributesPersistence cnsAttributePersistence = new CNSAttributesCassandraPersistence();
public static ICQSMessagePersistence cqsMessagePersistence = RedisCachedCassandraPersistence.getInstance();
public static ICQSMessagePersistence cqsMessagePersistence = RedisSortedSetPersistence.getInstance();

public static IUserPersistence getUserPersistence() {
return userPersistence;
Expand Down Expand Up @@ -77,7 +77,7 @@ public static synchronized void reset() {
cnsTopicPersistence = new CNSTopicCassandraPersistence();
userPersistence = new UserCassandraPersistence();
cnsAttributePersistence = new CNSAttributesCassandraPersistence();
cqsMessagePersistence = RedisCachedCassandraPersistence.getInstance();
cqsMessagePersistence = RedisSortedSetPersistence.getInstance();
}
}

8 changes: 4 additions & 4 deletions src/com/comcast/cmb/common/util/CMBProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public enum IO_MODE {

private final int rollingWindowTimeSec;

private final int redisConnectionsMaxActive;
private final int redisConnectionsMaxTotal;
private final String redisServerList;
private final int redisFillerThreads;
private final int redisRevisibleThreads;
Expand Down Expand Up @@ -355,7 +355,7 @@ private CMBProperties() {

rollingWindowTimeSec = Integer.parseInt(props.getProperty("cmb.rollingWindowSizeSec", "600"));

redisConnectionsMaxActive = Integer.parseInt(props.getProperty("cmb.redis.connectionsMaxActive", "100"));
redisConnectionsMaxTotal = Integer.parseInt(props.getProperty("cmb.redis.connectionsMaxTotal", "100"));
redisServerList = props.getProperty("cmb.redis.serverList");
redisFillerThreads = Integer.parseInt(props.getProperty("cmb.redis.fillerThreads", "5"));
redisRevisibleThreads = Integer.parseInt(props.getProperty("cmb.redis.revisibleThreads", "3"));
Expand Down Expand Up @@ -745,8 +745,8 @@ public String getRedisServerList() {
return redisServerList;
}

public int getRedisConnectionsMaxActive() {
return redisConnectionsMaxActive;
public int getRedisConnectionsMaxTotal() {
return redisConnectionsMaxTotal;
}

public int getRollingWindowTimeSec() {
Expand Down
15 changes: 15 additions & 0 deletions src/com/comcast/cns/util/CNSWorkerStatWrapper.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2012 Comcast Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.comcast.cns.util;

import java.lang.management.ManagementFactory;
Expand Down
3 changes: 1 addition & 2 deletions src/com/comcast/cqs/api/CQSAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.comcast.cqs.controller.CQSLongPollSender;
import com.comcast.cqs.model.CQSMessage;
import com.comcast.cqs.model.CQSQueue;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.util.CQSConstants;
import com.comcast.cqs.util.CQSErrorCodes;

Expand Down Expand Up @@ -258,7 +257,7 @@ public static CQSQueue createQueue(String userId, String queueName, Integer visi

PersistenceFactory.getQueuePersistence().createQueue(queue);
for (int shard = 0; shard < numberOfShards; shard++) {
RedisCachedCassandraPersistence.getInstance().checkCacheConsistency(queue.getRelativeUrl(), shard, false);
PersistenceFactory.getCQSMessagePersistence().checkCacheConsistency(queue.getRelativeUrl(), shard, false);
}

long ts2 = System.currentTimeMillis();
Expand Down
5 changes: 2 additions & 3 deletions src/com/comcast/cqs/controller/CQSControllerServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import com.comcast.cmb.common.util.ValueAccumulator.AccumulatorName;
import com.comcast.cqs.model.CQSQueue;
import com.comcast.cqs.persistence.ICQSMessagePersistence;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.persistence.RedisSortedSetPersistence;
/**
* The main controller for CQS
* @author baosen, bwolf, vvenkatraman, aseem
Expand Down Expand Up @@ -267,8 +267,7 @@ public void destroy() {

logger.info("event=servlet_destroy");

RedisCachedCassandraPersistence.executor.shutdown();
RedisCachedCassandraPersistence.revisibilityExecutor.shutdown();
RedisSortedSetPersistence.shutdown();
CQSLongPollSender.shutdown();
CQSLongPollReceiver.shutdown();

Expand Down
3 changes: 1 addition & 2 deletions src/com/comcast/cqs/controller/CQSCreateQueueAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.comcast.cmb.common.util.CMBProperties;
import com.comcast.cqs.io.CQSQueuePopulator;
import com.comcast.cqs.model.CQSQueue;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.util.CQSConstants;
import com.comcast.cqs.util.CQSErrorCodes;

Expand Down Expand Up @@ -268,7 +267,7 @@ public boolean doAction(User user, AsyncContext asyncContext) throws Exception {
PersistenceFactory.getQueuePersistence().createQueue(newQueue);

for (int shard=0; shard<numberOfShards; shard++) {
RedisCachedCassandraPersistence.getInstance().checkCacheConsistency(newQueue.getRelativeUrl(), shard, false);
PersistenceFactory.getCQSMessagePersistence().checkCacheConsistency(newQueue.getRelativeUrl(), shard, false);
}

String out = CQSQueuePopulator.getCreateQueueResponse(newQueue);
Expand Down
4 changes: 2 additions & 2 deletions src/com/comcast/cqs/controller/CQSGetAPIStatsAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.comcast.cmb.common.persistence.DurablePersistenceFactory;
import com.comcast.cqs.io.CQSAPIStatsPopulator;
import com.comcast.cqs.model.CQSAPIStats;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.persistence.RedisSortedSetPersistence;
import com.comcast.cqs.util.CQSAPIStatWrapper;
/**
* Subscribe action
Expand Down Expand Up @@ -183,7 +183,7 @@ public boolean doAction(User user, AsyncContext asyncContext) throws Exception {
}

try {
if (!RedisCachedCassandraPersistence.isAlive()) {
if (!RedisSortedSetPersistence.isAlive()) {
stats.addStatus("REDIS UNAVAILABLE");
}
} catch (Exception ex) {
Expand Down
8 changes: 4 additions & 4 deletions src/com/comcast/cqs/controller/CQSLongPollSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import com.comcast.cqs.model.CQSAPIStats;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.util.Util;

import org.apache.log4j.Logger;
Expand All @@ -53,6 +53,7 @@
import com.comcast.cmb.common.persistence.AbstractDurablePersistence.CMB_SERIALIZER;
import com.comcast.cmb.common.persistence.AbstractDurablePersistence.CmbRow;
import com.comcast.cmb.common.persistence.DurablePersistenceFactory;
import com.comcast.cmb.common.persistence.PersistenceFactory;
import com.comcast.cmb.common.util.CMBProperties;

public class CQSLongPollSender {
Expand Down Expand Up @@ -312,9 +313,8 @@ public void run() {
// if messageSendCound is already been received by local or empty queue, finish
try {
if (messageReceiveCount >= messageSendCount
|| RedisCachedCassandraPersistence
.getInstance()
.getRedisQueueMessageCount(
|| PersistenceFactory.getCQSMessagePersistence()
.getQueueMessageCount(
Util.getRelativeQueueUrlForArn(queueArn)) == 0) {
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions src/com/comcast/cqs/controller/CQSManageServiceAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.comcast.cns.io.CNSPopulator;
import com.comcast.cns.util.CNSErrorCodes;
import com.comcast.cqs.io.CQSPopulator;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.persistence.RedisSortedSetPersistence;
import com.comcast.cqs.util.CQSErrorCodes;

/**
Expand Down Expand Up @@ -68,7 +68,7 @@ public boolean doAction(User user, AsyncContext asyncContext) throws Exception {

if (task.equals("ClearCache")) {

RedisCachedCassandraPersistence.flushAll();
RedisSortedSetPersistence.flushAll();
String out = CQSPopulator.getResponseMetadata();
writeResponse(out, response);
return true;
Expand Down
18 changes: 10 additions & 8 deletions src/com/comcast/cqs/controller/CQSMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@

import com.comcast.cmb.common.controller.CMB;
import com.comcast.cmb.common.controller.CMBControllerServlet;
import com.comcast.cmb.common.persistence.PersistenceFactory;
import com.comcast.cmb.common.util.CMBProperties;
import com.comcast.cmb.common.util.PersistenceException;
import com.comcast.cmb.common.util.RollingWindowCapture;
import com.comcast.cmb.common.util.RollingWindowCapture.PayLoad;
import com.comcast.cqs.persistence.RedisCachedCassandraPersistence;
import com.comcast.cqs.persistence.ICQSMessagePersistence;
import com.comcast.cqs.persistence.RedisSortedSetPersistence;

/**
* Implement the monitoring for CQS
Expand Down Expand Up @@ -175,7 +177,7 @@ public void addNumberOfMessagesReceived(String queueUrl, int numMessages) {

@Override
public int getNumberOpenRedisConnections() {
return RedisCachedCassandraPersistence.getInstance().getNumRedisConnections();
return PersistenceFactory.getCQSMessagePersistence().getNumConnections();
}

/**
Expand Down Expand Up @@ -259,7 +261,7 @@ public void registerEmptyResp(String queueUrl, int num) {
@Override
public Long getOldestAvailableMessageTS(String queueUrl) {

RedisCachedCassandraPersistence redisP = RedisCachedCassandraPersistence.getInstance();
RedisSortedSetPersistence redisP = RedisSortedSetPersistence.getInstance();
List<String> ids;
try {

Expand All @@ -269,7 +271,7 @@ public Long getOldestAvailableMessageTS(String queueUrl) {
return null;
}

return RedisCachedCassandraPersistence.getMemQueueMessageCreatedTS(ids.get(0));
return RedisSortedSetPersistence.getMemQueueMessageCreatedTS(ids.get(0));

} catch (PersistenceException ex) {
logger.error("event=failed_to_get_oldest_queue_message_timestamp queue_url=" + queueUrl, ex);
Expand Down Expand Up @@ -368,17 +370,17 @@ public long getNumberOfLongPollReceivesForQueue(String queueArn) {

@Override
public int getNumberOfRedisShards() {
return RedisCachedCassandraPersistence.getNumberOfRedisShards();
return RedisSortedSetPersistence.getNumberOfRedisShards();
}

@Override
public List<Map<String, String>> getRedisShardInfos() {
return RedisCachedCassandraPersistence.getInfo();
return RedisSortedSetPersistence.getInfo();
}

@Override
public void flushRedis() {
RedisCachedCassandraPersistence.flushAll();
RedisSortedSetPersistence.flushAll();
}

@Override
Expand Down Expand Up @@ -445,7 +447,7 @@ public boolean isJettyCNSRequestHandlerPoolLowOnThreads() {
public int getQueueDepth(String queueUrl){
int numberOfMessages = 0;
try {
numberOfMessages = (int) RedisCachedCassandraPersistence.getInstance().getRedisQueueMessageCount(queueUrl);
numberOfMessages = (int) PersistenceFactory.getCQSMessagePersistence().getCacheQueueMessageCount(queueUrl);
} catch (Exception ex) {
logger.error("event=failed_to_get_redis_number_of_messages queue_url=" + queueUrl);
}
Expand Down
Loading

0 comments on commit 3f1d087

Please sign in to comment.