Skip to content

Commit

Permalink
Refactor the context and entry to support asynchronous invocation chain
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 committed Sep 12, 2018
1 parent ed30f85 commit d798794
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;

/**
* The entry for asynchronous resources.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class AsyncEntry extends CtEntry {

private Context asyncContext;

AsyncEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper, chain, context);
}

/**
* Remove current entry from local context, but does not exit.
*/
void cleanCurrentEntryInLocal() {
Context originalContext = context;
if (originalContext != null) {
Entry curEntry = originalContext.getCurEntry();
if (curEntry == this) {
Entry parent = this.parent;
originalContext.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
} else {
throw new IllegalStateException("Bad async context state");
}
}
}

public Context getAsyncContext() {
return asyncContext;
}

/**
* The async context should not be initialized until the node for current resource has been set to current entry.
*/
void initAsyncContext() {
if (asyncContext == null) {
this.asyncContext = Context.newAsyncContext(context.getEntranceNode(), context.getName())
.setOrigin(context.getOrigin())
.setCurEntry(this);
} else {
throw new IllegalStateException("Duplicate initialize of async context");
}
}

@Override
protected void clearEntryContext() {
super.clearEntryContext();
this.asyncContext = null;
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(asyncContext, count, args);

return parent;
}
}
103 changes: 103 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;

/**
* Linked entry within current context.
*
* @author jialiang.linjl
* @author Eric Zhao
*/
class CtEntry extends Entry {

protected Entry parent = null;
protected Entry child = null;

protected ProcessorSlot<Object> chain;
protected Context context;

CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
this.chain = chain;
this.context = context;

setUpEntryFor(context);
}

private void setUpEntryFor(Context context) {
this.parent = context.getCurEntry();
if (parent != null) {
((CtEntry)parent).child = this;
}
context.setCurEntry(this);
}

@Override
public void exit(int count, Object... args) throws ErrorEntryFreeException {
trueExit(count, args);
}

protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
if (context.getCurEntry() != this) {
// Clean previous call stack.
CtEntry e = (CtEntry)context.getCurEntry();
while (e != null) {
e.exit(count, args);
e = (CtEntry)e.parent;
}
throw new ErrorEntryFreeException("The order of entry free is can't be paired with the order of entry");
} else {
if (chain != null) {
chain.exit(context, resourceWrapper, count, args);
}
// Restore the call stack.
context.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
if (parent == null) {
// Auto-created entry indicates immediate exit.
ContextUtil.exit();
}
// Clean the reference of context in current entry to avoid duplicate exit.
clearEntryContext();
}
}
}

protected void clearEntryContext() {
this.context = null;
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(context, count, args);

return parent;
}

@Override
public Node getLastNode() {
return parent == null ? null : parent.getCurNode();
}
}
109 changes: 46 additions & 63 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.context.NullContext;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.MethodResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
Expand Down Expand Up @@ -53,6 +52,46 @@ public class CtSph implements Sph {

private static final Object LOCK = new Object();

private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// Init the entry only. No rule checking will occur.
return new AsyncEntry(resourceWrapper, null, context);
}
if (context == null) {
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}

// Global switch is turned off, so no rule checking will be done.
if (!Constants.ON) {
return new AsyncEntry(resourceWrapper, null, context);
}

ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

// Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done.
if (chain == null) {
return new AsyncEntry(resourceWrapper, null, context);
}

AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, args);
// Initiate the async context.
asyncEntry.initAsyncContext();
} catch (BlockException e1) {
asyncEntry.exit(count, args);
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
} finally {
// The asynchronous call may take time in background, and current context should not be hanged on it.
// So we need to remove current async entry from current context.
asyncEntry.cleanCurrentEntryInLocal();
}
return asyncEntry;
}

/**
* Do all {@link Rule}s checking about the resource.
*
Expand Down Expand Up @@ -145,68 +184,6 @@ private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper)
return chain;
}

private static class CtEntry extends Entry {

protected Entry parent = null;
protected Entry child = null;
private ProcessorSlot<Object> chain;
private Context context;

CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
this.chain = chain;
this.context = context;
parent = context.getCurEntry();
if (parent != null) {
((CtEntry)parent).child = this;
}
context.setCurEntry(this);
}

@Override
public void exit(int count, Object... args) throws ErrorEntryFreeException {
trueExit(count, args);
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
if (context.getCurEntry() != this) {
// Clean previous call stack.
CtEntry e = (CtEntry)context.getCurEntry();
while (e != null) {
e.exit(count, args);
e = (CtEntry)e.parent;
}
throw new ErrorEntryFreeException(
"The order of entry free is can't be paired with the order of entry");
} else {
if (chain != null) {
chain.exit(context, resourceWrapper, count, args);
}
// Modify the call stack.
context.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
if (parent == null) {
// Auto-created entry indicates immediate exit.
ContextUtil.exit();
}
// Clean the reference of context in current entry to avoid duplicate exit.
context = null;
}
}
return parent;

}

@Override
public Node getLastNode() {
return parent == null ? null : parent.getCurNode();
}
}

/**
* This class is used for skip context name checking.
*/
Expand Down Expand Up @@ -275,4 +252,10 @@ public Entry entry(String name, EntryType type, int count, Object... args) throw
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}

@Override
public AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return asyncEntryInternal(resource, count, args);
}
}
19 changes: 16 additions & 3 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* @author qinan.qn
* @author jialiang.linjl
* @author leyou
* @author Eric Zhao
*/
public interface Sph {

Expand Down Expand Up @@ -135,11 +136,23 @@ public interface Sph {
* @param type the resource is an inbound or an outbound method. This is used
* to mark whether it can be blocked when the system is unstable
* @param count the count that the resource requires
* @param args the parameters of the method. It can also be counted by setting
* hot parameter rule
* @return entry get.
* @param args the parameters of the method. It can also be counted by setting hot parameter rule
* @return entry get
* @throws BlockException if the block criteria is met
*/
Entry entry(String name, EntryType type, int count, Object... args) throws BlockException;

/**
* Create a protected asynchronous resource.
*
* @param name the unique name for the protected resource
* @param type the resource is an inbound or an outbound method. This is used
* to mark whether it can be blocked when the system is unstable
* @param count the count that the resource requires
* @param args the parameters of the method. It can also be counted by setting hot parameter rule
* @return created asynchronous entry
* @throws BlockException if the block criteria is met
* @since 0.2.0
*/
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;
}
Loading

0 comments on commit d798794

Please sign in to comment.