Skip to content

Commit

Permalink
optimizing: For a big set, replace sequential iteration addition with…
Browse files Browse the repository at this point in the history
… a parallel stream when calculating results. (apache#3540)

refactor: Renaming variables lockXXX to xxxLock may be better.
  • Loading branch information
XiaoyiPeng authored Dec 2, 2021
1 parent 16cabf0 commit 5d5d5f5
Showing 1 changed file with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ public class StoreStatsService extends ServiceThread {
private volatile long putMessageEntireTimeMax = 0;
private volatile long getMessageEntireTimeMax = 0;
// for putMessageEntireTimeMax
private ReentrantLock lockPut = new ReentrantLock();
private ReentrantLock putLock = new ReentrantLock();
// for getMessageEntireTimeMax
private ReentrantLock lockGet = new ReentrantLock();
private ReentrantLock getLock = new ReentrantLock();

private volatile long dispatchMaxBuffer = 0;

private ReentrantLock lockSampling = new ReentrantLock();
private ReentrantLock samplingLock = new ReentrantLock();
private long lastPrintTimestamp = System.currentTimeMillis();

public StoreStatsService() {
Expand Down Expand Up @@ -138,10 +138,10 @@ else if (value < 10000) {
}

if (value > this.putMessageEntireTimeMax) {
this.lockPut.lock();
this.putLock.lock();
this.putMessageEntireTimeMax =
value > this.putMessageEntireTimeMax ? value : this.putMessageEntireTimeMax;
this.lockPut.unlock();
this.putLock.unlock();
}
}

Expand All @@ -151,10 +151,10 @@ public long getGetMessageEntireTimeMax() {

public void setGetMessageEntireTimeMax(long value) {
if (value > this.getMessageEntireTimeMax) {
this.lockGet.lock();
this.getLock.lock();
this.getMessageEntireTimeMax =
value > this.getMessageEntireTimeMax ? value : this.getMessageEntireTimeMax;
this.lockGet.unlock();
this.getLock.unlock();
}
}

Expand Down Expand Up @@ -193,11 +193,11 @@ public String toString() {
}

public long getPutMessageTimesTotal() {
long rs = 0;
for (LongAdder data : putMessageTopicTimesTotal.values()) {
rs += data.longValue();
}
return rs;
Map<String, LongAdder> map = putMessageTopicTimesTotal;
return map.values()
.parallelStream()
.mapToLong(LongAdder::longValue)
.sum();
}

private String getFormatRuntime() {
Expand All @@ -217,11 +217,11 @@ private String getFormatRuntime() {
}

public long getPutMessageSizeTotal() {
long rs = 0;
for (LongAdder data : putMessageTopicSizeTotal.values()) {
rs += data.longValue();
}
return rs;
Map<String, LongAdder> map = putMessageTopicSizeTotal;
return map.values()
.parallelStream()
.mapToLong(LongAdder::longValue)
.sum();
}

private String getPutMessageDistributeTimeStringInfo(Long total) {
Expand Down Expand Up @@ -315,7 +315,7 @@ private String putMessageDistributeTimeToString() {

private String getPutTps(int time) {
String result = "";
this.lockSampling.lock();
this.samplingLock.lock();
try {
CallSnapshot last = this.putTimesList.getLast();

Expand All @@ -325,14 +325,14 @@ private String getPutTps(int time) {
}

} finally {
this.lockSampling.unlock();
this.samplingLock.unlock();
}
return result;
}

private String getGetFoundTps(int time) {
String result = "";
this.lockSampling.lock();
this.samplingLock.lock();
try {
CallSnapshot last = this.getTimesFoundList.getLast();

Expand All @@ -342,15 +342,15 @@ private String getGetFoundTps(int time) {
result += CallSnapshot.getTPS(lastBefore, last);
}
} finally {
this.lockSampling.unlock();
this.samplingLock.unlock();
}

return result;
}

private String getGetMissTps(int time) {
String result = "";
this.lockSampling.lock();
this.samplingLock.lock();
try {
CallSnapshot last = this.getTimesMissList.getLast();

Expand All @@ -361,14 +361,14 @@ private String getGetMissTps(int time) {
}

} finally {
this.lockSampling.unlock();
this.samplingLock.unlock();
}

return result;
}

private String getGetTotalTps(int time) {
this.lockSampling.lock();
this.samplingLock.lock();
double found = 0;
double miss = 0;
try {
Expand All @@ -392,15 +392,15 @@ private String getGetTotalTps(int time) {
}

} finally {
this.lockSampling.unlock();
this.samplingLock.unlock();
}

return Double.toString(found + miss);
}

private String getGetTransferedTps(int time) {
String result = "";
this.lockSampling.lock();
this.samplingLock.lock();
try {
CallSnapshot last = this.transferedMsgCountList.getLast();

Expand All @@ -411,7 +411,7 @@ private String getGetTransferedTps(int time) {
}

} finally {
this.lockSampling.unlock();
this.samplingLock.unlock();
}

return result;
Expand Down Expand Up @@ -469,7 +469,7 @@ public String getServiceName() {
}

private void sampling() {
this.lockSampling.lock();
this.samplingLock.lock();
try {
this.putTimesList.add(new CallSnapshot(System.currentTimeMillis(), getPutMessageTimesTotal()));
if (this.putTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
Expand All @@ -495,7 +495,7 @@ private void sampling() {
}

} finally {
this.lockSampling.unlock();
this.samplingLock.unlock();
}
}

Expand Down

0 comments on commit 5d5d5f5

Please sign in to comment.