Skip to content

Commit

Permalink
Durable extenstion support part I
Browse files Browse the repository at this point in the history
  • Loading branch information
jruizgit committed Jul 22, 2019
1 parent 901749e commit adba7cb
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 10 deletions.
8 changes: 8 additions & 0 deletions libjs/durableEngine.js
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ exports = module.exports = durableEngine = function () {
r.assertTimers(handle);
};

that.setStoreMessageCallback = function (func) {
r.setStoreMessageCallback(handle, func);
};

var flushActions = function (state, resultContainer, stateOffset, complete) {
while (resultContainer['message']) {
var actionName = null;
Expand Down Expand Up @@ -902,6 +906,10 @@ exports = module.exports = durableEngine = function () {
that.getRuleset(rulesetName).renewActionLease(sid);
};

that.setStoreMessageCallback = function (rulesetName, func) {
that.getRuleset(rulesetName).setStoreMessageCallback(func);
};

var dispatchRules = function (index) {
if (!rulesList.length) {
setTimeout(dispatchRules, 500, index);
Expand Down
35 changes: 30 additions & 5 deletions src/rules/events.c
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,7 @@ static unsigned int handleBetaFrame(ruleset *tree,

static unsigned int handleDeleteMessage(ruleset *tree,
stateNode *state,
char *mid,
unsigned int messageOffset) {
unsigned int result;
unsigned int count = 0;
Expand Down Expand Up @@ -1139,7 +1140,10 @@ static unsigned int handleDeleteMessage(ruleset *tree,
}
}

result = deleteMessage(state, messageOffset);
result = deleteMessage(tree,
state,
mid,
messageOffset);
if (result == ERR_NODE_DELETED) {
return RULES_OK;
} else if (result != RULES_OK) {
Expand Down Expand Up @@ -1702,7 +1706,7 @@ static unsigned int handleMessageCore(ruleset *tree,

sid[sidProperty->valueLength] = '\0';

#ifdef _WIN32
#ifdef _WIN32
char *mid = (char *)_alloca(sizeof(char)*(midProperty->valueLength + 1));
#else
char mid[midProperty->valueLength + 1];
Expand All @@ -1725,11 +1729,13 @@ static unsigned int handleMessageCore(ruleset *tree,
if (sidState->factOffset != UNDEFINED_HASH_OFFSET) {
CHECK_RESULT(handleDeleteMessage(tree,
sidState,
mid,
sidState->factOffset));

}

CHECK_RESULT(storeMessage(sidState,
CHECK_RESULT(storeMessage(tree,
sidState,
mid,
jo,
MESSAGE_TYPE_FACT,
Expand All @@ -1752,10 +1758,12 @@ static unsigned int handleMessageCore(ruleset *tree,
if (*messageOffset != UNDEFINED_HASH_OFFSET) {
CHECK_RESULT(handleDeleteMessage(tree,
sidState,
mid,
*messageOffset));
}
} else {
CHECK_RESULT(storeMessage(sidState,
CHECK_RESULT(storeMessage(tree,
sidState,
mid,
jo,
(actionType == ACTION_ASSERT_FACT ? MESSAGE_TYPE_FACT : MESSAGE_TYPE_EVENT),
Expand Down Expand Up @@ -2054,8 +2062,25 @@ static unsigned int deleteCurrentAction(ruleset *tree,


if (currentMessageNode->messageType == MESSAGE_TYPE_EVENT) {
jsonObject *jo = &currentMessageNode->jo;
jsonProperty *midProperty = &jo->properties[jo->idIndex];

#ifdef _WIN32
char *mid = (char *)_alloca(sizeof(char)*(midProperty->valueLength + 1));
#else
char mid[midProperty->valueLength + 1];
#endif
if (midProperty->valueOffset) {
strncpy(mid, jo->content + midProperty->valueOffset, midProperty->valueLength);
} else {
strncpy(mid, jo->idBuffer, midProperty->valueLength);
}

mid[midProperty->valueLength] = '\0';

CHECK_RESULT(handleDeleteMessage(tree,
state,
state,
mid,
currentMessageFrame->messageNodeOffset));
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/rules/rete.c
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,9 @@ unsigned int createRuleset(unsigned int *handle, char *name, char *rules) {
tree->actionCount = 0;
tree->bindingsList = NULL;
tree->currentStateIndex = 0;
tree->storeMessageCallback = NULL;
tree->storeMessageCallbackContext = NULL;
tree->deleteMessageCallback = NULL;
memset(tree->stateIndex, 0, MAX_STATE_INDEX_LENGTH * sizeof(unsigned int) * 2);
memset(tree->reverseStateIndex, 0, MAX_STATE_INDEX_LENGTH * sizeof(unsigned int));
initStatePool(tree);
Expand Down Expand Up @@ -1894,4 +1897,21 @@ unsigned int deleteRuleset(unsigned int handle) {
return RULES_OK;
}

unsigned int setStoreMessageCallback(unsigned int handle,
void *context,
unsigned int (*callback)(void *, char *, char *, char *)) {
ruleset *tree;
RESOLVE_HANDLE(handle, &tree);

tree->storeMessageCallbackContext = context;
tree->storeMessageCallback = callback;
return RULES_OK;
}

unsigned int setDeleteMessageCallback(unsigned int handle, unsigned int (*callback)(char *, char *)) {
ruleset *tree;
RESOLVE_HANDLE(handle, &tree);

tree->deleteMessageCallback = callback;
return RULES_OK;
}
4 changes: 4 additions & 0 deletions src/rules/rete.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ typedef struct ruleset {
unsigned int stateIndex[MAX_STATE_INDEX_LENGTH];
unsigned int reverseStateIndex[MAX_STATE_INDEX_LENGTH];
unsigned int currentStateIndex;

unsigned int (*storeMessageCallback)(void*, char *, char *, char *);
void *storeMessageCallbackContext;
unsigned int (*deleteMessageCallback)(char *, char *);
} ruleset;

#ifdef _PRINT
Expand Down
6 changes: 6 additions & 0 deletions src/rules/rules.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ unsigned int createRuleset(unsigned int *handle,
char *name,
char *rules);

unsigned int setStoreMessageCallback(unsigned int handle,
void *context,
unsigned int (*callback)(void *, char *, char *, char *));

unsigned int setDeleteMessageCallback(unsigned int handle, unsigned int (*callback)(char *, char *));

unsigned int deleteRuleset(unsigned int handle);

unsigned int createClient(unsigned int *handle,
Expand Down
25 changes: 22 additions & 3 deletions src/rules/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -670,10 +670,17 @@ unsigned int createRightFrame(stateNode *state,
return RULES_OK;
}

unsigned int deleteMessage(stateNode *state,
unsigned int deleteMessage(void *tree,
stateNode *state,
char *mid,
unsigned int messageNodeOffset) {

messageNode *node = MESSAGE_NODE(state, messageNodeOffset);
ruleset *rulesetTree = (ruleset*)tree;
if (rulesetTree->deleteMessageCallback) {
return rulesetTree->deleteMessageCallback(state->sid, mid);
}

if (node->jo.content) {
free(node->jo.content);
free(node->locationPool.content);
Expand All @@ -686,6 +693,7 @@ unsigned int deleteMessage(stateNode *state,
MAX_MESSAGE_INDEX_LENGTH,
state->messagePool,
messageNodeOffset);

return RULES_OK;
}

Expand Down Expand Up @@ -724,7 +732,8 @@ static unsigned int copyMessage(jsonObject *targetMessage,
return RULES_OK;
}

unsigned int storeMessage(stateNode *state,
unsigned int storeMessage(void *tree,
stateNode *state,
char *mid,
jsonObject *message,
unsigned char messageType,
Expand Down Expand Up @@ -762,7 +771,17 @@ unsigned int storeMessage(stateNode *state,
memset(node->locationIndex, 0, MAX_LOCATION_INDEX_LENGTH * sizeof(unsigned int) * 2);

node->messageType = messageType;
return copyMessage(&node->jo, message);
CHECK_RESULT(copyMessage(&node->jo, message));

ruleset *rulesetTree = (ruleset*)tree;
if (rulesetTree->storeMessageCallback) {
return rulesetTree->storeMessageCallback(rulesetTree->storeMessageCallbackContext,
state->sid,
mid,
message->content);
}

return RULES_OK;
}

unsigned int ensureStateNode(void *tree,
Expand Down
7 changes: 5 additions & 2 deletions src/rules/state.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,17 @@ unsigned int createActionFrame(stateNode *state,
leftFrameNode **newNode,
frameLocation *newLocation);

unsigned int deleteMessage(stateNode *state,
unsigned int deleteMessage(void *tree,
stateNode *state,
char *mid,
unsigned int messageNodeOffset);

unsigned int getMessage(stateNode *state,
char *mid,
unsigned int *valueOffset);

unsigned int storeMessage(stateNode *state,
unsigned int storeMessage(void *tree,
stateNode *state,
char *mid,
jsonObject *message,
unsigned char messageType,
Expand Down
52 changes: 52 additions & 0 deletions src/rulesjs/rules.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ class ObjectProxy {
};


class CallbackProxy {
public:

explicit CallbackProxy(Handle<Function> value) {
Isolate* isolate = Isolate::GetCurrent();
func.Reset(isolate, value);

}

Persistent<Function> func;
};

void jsCreateRuleset(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate;
isolate = args.GetIsolate();
Expand Down Expand Up @@ -515,6 +527,43 @@ void jsRenewActionLease(const FunctionCallbackInfo<Value>& args) {
}
}

static unsigned int storeMessageCallback(void *context, char *sid, char *mid, char *content) {
CallbackProxy *p = (CallbackProxy *)context;
Isolate* isolate = Isolate::GetCurrent();

Local<Value> args[3];
args[0] = String::NewFromUtf8(isolate, sid);
args[1] = String::NewFromUtf8(isolate, mid);
args[2] = String::NewFromUtf8(isolate, content);

Local<Function> storeMessageFunc = Local<Function>::New(isolate, p->func);
Local<Value> result = storeMessageFunc->Call(isolate->GetCurrentContext()->Global(), 3, args);
return TO_NUMBER(isolate, result);
}

void jsSetStoreMessageCallback(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate;
isolate = args.GetIsolate();
if (args.Length() < 2) {
isolate->ThrowException(Exception::TypeError(String::NewFromUtf8(isolate, "Wrong number of arguments")));
} else if (!args[0]->IsNumber() || !args[1]->IsFunction()) {
isolate->ThrowException(Exception::TypeError(String::NewFromUtf8(isolate, "Wrong argument type")));
} else {
CallbackProxy *p = new CallbackProxy(Handle<Function>::Cast(args[1]));
unsigned int result = setStoreMessageCallback(TO_NUMBER(isolate, args[0]),
p,
&storeMessageCallback);
if (result != RULES_OK) {
char *message = NULL;
if (asprintf(&message, "Could not set storage message callback, error code: %d", result) == -1) {
isolate->ThrowException(Exception::Error(String::NewFromUtf8(isolate, "Out of memory")));
} else {
isolate->ThrowException(Exception::Error(String::NewFromUtf8(isolate, message)));
}
}
}
}

void jsCreateProxy(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate;
isolate = args.GetIsolate();
Expand Down Expand Up @@ -631,6 +680,9 @@ void init(Handle<Object> exports) {

exports->Set(String::NewFromUtf8(isolate, "createProxy", String::kInternalizedString),
FunctionTemplate::New(isolate, jsCreateProxy)->GetFunction());

exports->Set(String::NewFromUtf8(isolate, "setStoreMessageCallback", String::kInternalizedString),
FunctionTemplate::New(isolate, jsSetStoreMessageCallback)->GetFunction());
}

NODE_MODULE(rulesjs, init)

0 comments on commit adba7cb

Please sign in to comment.