Skip to content

Commit

Permalink
Update parameter flow rule to adapt to cluster mode and extract rule …
Browse files Browse the repository at this point in the history
…util class

- Update ParamFlowRule to support cluster mode
- Add `ParamFlowClusterConfig` to provide cluster mode items for the rule
- Update ParamFlowChecker to support cluster flow mode
- Extract ParamFlowRuleUtil class
- Change type of `flowId` from Integer to Long

Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 committed Dec 14, 2018
1 parent b215e87 commit 1043648
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface TokenService {
* @param prioritized whether the request is prioritized
* @return result of the token request
*/
TokenResult requestToken(Integer ruleId, int acquireCount, boolean prioritized);
TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized);

/**
* Request tokens for a specific parameter from remote token server.
Expand All @@ -43,5 +43,5 @@ public interface TokenService {
* @param params parameter list
* @return result of the token request
*/
TokenResult requestParamToken(Integer ruleId, int acquireCount, Collection<Object> params);
TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ClusterFlowConfig {
/**
* Global unique ID.
*/
private Integer flowId;
private Long flowId;

/**
* Threshold type (average by local value or global value).
Expand All @@ -41,15 +41,15 @@ public class ClusterFlowConfig {
*/
private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;

private Integer refFlowId;
private Long refFlowId;
private int refSampleCount = 10;
private double refRatio = 1d;

public Integer getFlowId() {
public Long getFlowId() {
return flowId;
}

public ClusterFlowConfig setFlowId(Integer flowId) {
public ClusterFlowConfig setFlowId(Long flowId) {
this.flowId = flowId;
return this;
}
Expand All @@ -72,11 +72,11 @@ public ClusterFlowConfig setStrategy(int strategy) {
return this;
}

public Integer getRefFlowId() {
public Long getRefFlowId() {
return refFlowId;
}

public ClusterFlowConfig setRefFlowId(Integer refFlowId) {
public ClusterFlowConfig setRefFlowId(Long refFlowId) {
this.refFlowId = refFlowId;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule)
* @param id flow ID to check
* @return true if valid, otherwise false
*/
public static boolean validClusterRuleId(Integer id) {
public static boolean validClusterRuleId(Long id) {
return id != null && id > 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,23 @@
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import com.alibaba.csp.sentinel.cluster.ClusterTokenClient;
import com.alibaba.csp.sentinel.cluster.TokenClientProvider;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;

/**
* Rule checker for parameter flow control.
*
* @author Eric Zhao
* @since 0.2.0
*/
Expand All @@ -40,17 +49,72 @@ static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRu
return true;
}

// Get parameter value. If value is null, then pass.
Object value = args[paramIdx];
if (value == null) {
return true;
}

if (rule.isClusterMode()) {
return passClusterCheck(resourceWrapper, rule, count, value);
}

return passLocalCheck(resourceWrapper, rule, count, value);
}

private static ParameterMetric getHotParameters(ResourceWrapper resourceWrapper) {
// Should not be null.
return ParamFlowSlot.getParamMetric(resourceWrapper);
@SuppressWarnings("unchecked")
private static Collection<Object> toCollection(Object value) {
if (value instanceof Collection) {
return (Collection<Object>)value;
} else if (value.getClass().isArray()) {
List<Object> params = new ArrayList<Object>();
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
params.add(param);
}
return params;
} else {
return Collections.singletonList(value);
}
}

private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
private static boolean passClusterCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
ClusterTokenClient client = TokenClientProvider.getClient();
if (client == null) {
return true;
}
Collection<Object> params = toCollection(value);

TokenResult result = client.requestParamToken(rule.getClusterConfig().getFlowId(), count, params);
switch (result.getStatus()) {
case TokenResultStatus.OK:
return true;
case TokenResultStatus.BLOCKED:
return false;
default:
return fallbackToLocalOrPass(resourceWrapper, rule, count, params);
}
} catch (Throwable ex) {
RecordLog.warn("[ParamFlowChecker] Request cluster token for parameter unexpected failed", ex);
return fallbackToLocalOrPass(resourceWrapper, rule, count, value);
}
}

private static boolean fallbackToLocalOrPass(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
return passLocalCheck(resourceWrapper, rule, count, value);
} else {
// The rule won't be activated, just pass.
return true;
}
}

private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
Expand All @@ -70,7 +134,7 @@ private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlow
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.info("[ParamFlowChecker] Unexpected error", e);
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}

return true;
Expand All @@ -96,5 +160,10 @@ static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRu
return true;
}

private static ParameterMetric getHotParameters(ResourceWrapper resourceWrapper) {
// Should not be null.
return ParamFlowSlot.getParamMetric(resourceWrapper);
}

private ParamFlowChecker() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;

/**
* Parameter flow rule config in cluster mode.
*
* @author Eric Zhao
* @since 1.4.0
*/
public class ParamFlowClusterConfig {

/**
* Global unique ID.
*/
private Long flowId;

/**
* Threshold type (average by local value or global value).
*/
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
private boolean fallbackToLocalWhenFail = false;

public Long getFlowId() {
return flowId;
}

public ParamFlowClusterConfig setFlowId(Long flowId) {
this.flowId = flowId;
return this;
}

public int getThresholdType() {
return thresholdType;
}

public ParamFlowClusterConfig setThresholdType(int thresholdType) {
this.thresholdType = thresholdType;
return this;
}

public boolean isFallbackToLocalWhenFail() {
return fallbackToLocalWhenFail;
}

public ParamFlowClusterConfig setFallbackToLocalWhenFail(boolean fallbackToLocalWhenFail) {
this.fallbackToLocalWhenFail = fallbackToLocalWhenFail;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }

ParamFlowClusterConfig that = (ParamFlowClusterConfig)o;

if (thresholdType != that.thresholdType) { return false; }
if (fallbackToLocalWhenFail != that.fallbackToLocalWhenFail) { return false; }
return flowId != null ? flowId.equals(that.flowId) : that.flowId == null;
}

@Override
public int hashCode() {
int result = flowId != null ? flowId.hashCode() : 0;
result = 31 * result + thresholdType;
result = 31 * result + (fallbackToLocalWhenFail ? 1 : 0);
return result;
}

@Override
public String toString() {
return "ParamFlowClusterConfig{" +
"flowId=" + flowId +
", thresholdType=" + thresholdType +
", fallbackToLocalWhenFail=" + fallbackToLocalWhenFail +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public ParamFlowRule(String resourceName) {
*/
private Map<Object, Integer> hotItems = new HashMap<Object, Integer>();

private boolean clusterMode = false;
private ParamFlowClusterConfig clusterConfig;

public int getGrade() {
return grade;
}
Expand Down Expand Up @@ -110,6 +113,25 @@ ParamFlowRule setParsedHotItems(Map<Object, Integer> hotItems) {
return this;
}

public boolean isClusterMode() {
return clusterMode;
}

public ParamFlowRule setClusterMode(boolean clusterMode) {
this.clusterMode = clusterMode;
return this;
}

public ParamFlowClusterConfig getClusterConfig() {
return clusterConfig;
}

public ParamFlowRule setClusterConfig(
ParamFlowClusterConfig clusterConfig) {
this.clusterConfig = clusterConfig;
return this;
}

@Override
@Deprecated
public boolean passCheck(Context context, DefaultNode node, int count, Object... args) {
Expand All @@ -126,8 +148,11 @@ public boolean equals(Object o) {

if (grade != rule.grade) { return false; }
if (Double.compare(rule.count, count) != 0) { return false; }
if (clusterMode != rule.clusterMode) { return false; }
if (paramIdx != null ? !paramIdx.equals(rule.paramIdx) : rule.paramIdx != null) { return false; }
return paramFlowItemList != null ? paramFlowItemList.equals(rule.paramFlowItemList) : rule.paramFlowItemList == null;
if (paramFlowItemList != null ? !paramFlowItemList.equals(rule.paramFlowItemList)
: rule.paramFlowItemList != null) { return false; }
return clusterConfig != null ? clusterConfig.equals(rule.clusterConfig) : rule.clusterConfig == null;
}

@Override
Expand All @@ -139,18 +164,20 @@ public int hashCode() {
temp = Double.doubleToLongBits(count);
result = 31 * result + (int)(temp ^ (temp >>> 32));
result = 31 * result + (paramFlowItemList != null ? paramFlowItemList.hashCode() : 0);
result = 31 * result + (clusterMode ? 1 : 0);
result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ParamFlowRule{" +
"resource=" + getResource() +
", limitApp=" + getLimitApp() +
", grade=" + grade +
"grade=" + grade +
", paramIdx=" + paramIdx +
", count=" + count +
", paramFlowItemList=" + paramFlowItemList +
", clusterMode=" + clusterMode +
", clusterConfig=" + clusterConfig +
'}';
}
}
Loading

0 comments on commit 1043648

Please sign in to comment.