Skip to content

Commit

Permalink
🍻 write codes
Browse files Browse the repository at this point in the history
  • Loading branch information
sanshengshui committed Aug 10, 2019
1 parent e3f1eae commit 5b3becf
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.sanshengshui.http.quota;

/**
* @author james mu
* @date 2019/8/9 下午4:59
*/
public final class Clock {
private static long time = 0L;

private Clock() {

}

public static long millis() {
return time == 0 ? System.currentTimeMillis() : time;
}

public static void setMills(long mills) {
time = mills;
}

public static void shift(long delta) {
time += delta;
}

public static void reset() {
time = 0;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.sanshengshui.http.quota.inmemory;

import com.sanshengshui.http.quota.Clock;

import java.util.concurrent.atomic.LongAdder;

/**
* @author james mu
* @date 19-8-9 下午16:50
*/
public class IntervalCount {

private final LongAdder addr = new LongAdder();
private final long intervalDurationMs;
private volatile long startTime;
private volatile long lastTickTime;

public IntervalCount(long intervalDurationMs) {
this.intervalDurationMs = intervalDurationMs;
startTime = Clock.millis();
}

public long resetIfExpiredAndTick(){
if (isExpired()){
reset();
}
tick();
return addr.sum();
}

public long silenceDuration() {
return Clock.millis() - lastTickTime;
}

public long getCount() {
return addr.sum();
}

private void tick() {
addr.add(1);
lastTickTime = Clock.millis();
}

private void reset() {
addr.reset();
lastTickTime = Clock.millis();
}

private boolean isExpired() {
return (Clock.millis() - startTime) > intervalDurationMs;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.sanshengshui.http.quota.inmemory;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* @author james mu
* @date 2019/8/10 下午5:47
*/
@Slf4j
public abstract class IntervalRegistryCleaner {

private final KeyBasedIntervalRegistry intervalRegistry;
private final long cleanPeriodMs;
private ScheduledExecutorService executor;

public IntervalRegistryCleaner(KeyBasedIntervalRegistry intervalRegistry, long cleanPeriodMs) {
this.intervalRegistry = intervalRegistry;
this.cleanPeriodMs = cleanPeriodMs;
}

public void schedule() {
if (executor != null){
throw new IllegalStateException("Registry Cleaner already scheduled");
}
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(this::clean, cleanPeriodMs, cleanPeriodMs, TimeUnit.MILLISECONDS);
}

public void stop() {
if (executor != null) {
executor.shutdown();
}
}

public void clean() {
try {
intervalRegistry.clean();
} catch (RuntimeException ex) {
log.error("Could not clear Interval Registry", ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.sanshengshui.http.quota.inmemory;

import com.google.common.collect.MinMaxPriorityQueue;
import lombok.extern.slf4j.Slf4j;

import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* @author james mu
* @date 2019/8/10 下午5:55
*/
@Slf4j
public abstract class IntervalRegistryLogger {

private final int topSize;
private final KeyBasedIntervalRegistry intervalRegistry;
private final long logIntervalMin;
private ScheduledExecutorService executor;

public IntervalRegistryLogger(int topSize, long logIntervalMin, KeyBasedIntervalRegistry intervalRegistry) {
this.topSize = topSize;
this.logIntervalMin = logIntervalMin;
this.intervalRegistry = intervalRegistry;
}

public void schedule() {
if (executor != null) {
throw new IllegalStateException("Registry Cleaner already scheduled");
}
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(this::logStatistic, logIntervalMin, logIntervalMin, TimeUnit.MINUTES);
}

public void stop() {
if (executor != null) {
executor.shutdown();
}
}

public void logStatistic() {
Map<String, Long> registryContent = intervalRegistry.getContent();
int uniqHosts = registryContent.size();
long requestsCount = registryContent.values().stream().mapToLong(i -> i).sum();
Map<String, Long> top = getTopElements(registryContent);
log(top, uniqHosts, requestsCount);

}

protected Map<String, Long> getTopElements(Map<String, Long> countMap) {
MinMaxPriorityQueue<Map.Entry<String, Long>> topQueue = MinMaxPriorityQueue
.orderedBy(Comparator.comparing((Function<Map.Entry<String, Long>, Long>) Map.Entry::getValue).reversed())
.maximumSize(topSize)
.create(countMap.entrySet());

return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

protected abstract void log(Map<String, Long> top, int uniqHosts, long requestsCount);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.sanshengshui.http.quota.inmemory;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* @author james mu
* @date 2019/8/10 下午4:50
*/
@Slf4j
public class KeyBasedIntervalRegistry {

private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>();
private final long intervalDurationMs;
private final long ttlMs;
private final Set<String> whiteList;
private final Set<String> blackList;

public KeyBasedIntervalRegistry(long intervalDurationMs, long ttlMs, String whiteList, String blackList, String name) {
this.intervalDurationMs = intervalDurationMs;
this.ttlMs = ttlMs;
this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ','));
this.blackList = Sets.newHashSet(StringUtils.split(blackList, ','));

}

private void validate(String name) {
if (ttlMs < intervalDurationMs) {
log.warn("TTL for {} IntervalRegistry [{}] smaller than interval duration [{}]", name, ttlMs, intervalDurationMs);
}
log.info("Start {} KeyBasedIntervalRegistry with whitelist {}", name, whiteList);
log.info("Start {} KeyBasedIntervalRegistry with blacklist {}", name, blackList);
}

public long tick(String clientHostId) {
IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
long currentCount = intervalCount.resetIfExpiredAndTick();
if (whiteList.contains(clientHostId)) {
return 0;
} else if (blackList.contains(clientHostId)) {
return Long.MAX_VALUE;
}
return currentCount;
}

public void clean() {
hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs);
}

public Map<String, Long> getContent() {
return hostCounts.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry:: getKey,
interval -> interval.getValue().getCount()
)
);
}
}

0 comments on commit 5b3becf

Please sign in to comment.