Skip to content

Commit

Permalink
redefined join cancellation semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
yelouafi committed Apr 18, 2016
1 parent e5c39a1 commit 909895a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 32 deletions.
38 changes: 28 additions & 10 deletions src/internal/proc.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,30 @@ export default function proc(
of this generator
**/
function cancel({type, origin}) {
if(iterator._isRunning) {
if(iterator._isRunning && !iterator._isCancelled) {
iterator._isCancelled = true
const ex = new SagaCancellationException(type, name, origin)
// 1. cancel the main task if it's still running
if(iterator._isMainRunning) {
next.cancel(ex)
next(ex)
}
// 2. cancel all attached forks
taskQueue.cancelAll(ex)

// 3. cancel all joiners
task.joiners.slice().forEach(j => j.task.cancel(ex))
task.joiners = null
}
}
/**
attaches cancellation logic to this task's continuation
this will permit cancellation to propagate down the call chain
We do not attach cancel to the .done promise. Because in join effects the
sense of cancellation is inversed: cancellation of this task should cause
the cancellation of all joiners
**/
cont && (cont.cancel = cancel)
task.done[CANCEL] = cancel


// tracks the running status
Expand Down Expand Up @@ -177,16 +190,19 @@ export default function proc(

function end(error, result) {
iterator._isRunning = false
stdChannel.close()
if(!error) {
iterator._result = result
deferredEnd.resolve(result)
task.cont && !iterator._isCancelled && task.cont(null, result)
} else {
iterator._error = error
deferredEnd.reject(error)
task.cont && task.cont(error)
}
stdChannel.close()
if(!iterator._isCancelled) {
task.cont && task.cont(error, result)
task.joiners.forEach(j => j.cb(null, result))
task.joiners = null
}
}

function runEffect(effect, parentEffectId, label = '', cb) {
Expand Down Expand Up @@ -234,7 +250,7 @@ export default function proc(
/**
triggers/propagates the cancellation error
**/
cb(cancelError)
//cb(cancelError)
monitor( monitorActions.effectRejected(effectId, cancelError) )
}

Expand Down Expand Up @@ -405,17 +421,19 @@ export default function proc(
// Fork effects are non cancellables
}

function runJoinEffect(task, cb) {
resolvePromise(task.done, cb)
function runJoinEffect(t, cb) {
const joiner = {task, cb}
cb.cancel = () => remove(t.joiners, joiner)
t.joiners.push(joiner)
}

function runCancelEffect(task, cb) {
if(task.isRunning()) {
task.cancel(
new SagaCancellationException(MANUAL_CANCEL, name, name)
)
task.cont && task.cont()
}
task.cont && task.cont()
cb()
// cancel effects are non cancellables
}
Expand Down Expand Up @@ -541,7 +559,7 @@ export default function proc(
name,
done,
cont,

joiners: [],
cancel: error => {
if(!(error instanceof SagaCancellationException)) {
error = new SagaCancellationException(MANUAL_CANCEL, name, error)
Expand Down
115 changes: 93 additions & 22 deletions test/proc/cancellation.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import { deferred, arrayOfDeffered } from '../../src/utils'

const DELAY = 50

//const cancelPromise = p => p[CANCEL](new SagaCancellationException(MANUAL_CANCEL, 'test'))
const cancelTask = task => task.cancel(new SagaCancellationException(MANUAL_CANCEL, 'test'))

test('processor effect cancellation handling: call effect', assert => {
test('proc cancellation: call effect', assert => {
assert.plan(1)

let actual = []
Expand All @@ -24,7 +23,6 @@ test('processor effect cancellation handling: call effect', assert => {

Promise.resolve(1)
.then(() => startDef.resolve('start'))
//.then(delay(0))
.then(() => cancelDef.resolve('cancel'))
.then(() => subroutineDef.resolve('subroutine'))

Expand Down Expand Up @@ -65,7 +63,7 @@ test('processor effect cancellation handling: call effect', assert => {
}, DELAY)
})

test('processor effect cancellation handling: forked children', assert => {
test('proc cancellation: forked children', assert => {
assert.plan(1)

const actual = []
Expand Down Expand Up @@ -157,8 +155,7 @@ test('processor effect cancellation handling: forked children', assert => {
}, DELAY)
})


test('processor effect cancellation handling: take effect', assert => {
test('proc cancellation: take effect', assert => {
assert.plan(1)

let actual = []
Expand Down Expand Up @@ -200,7 +197,75 @@ test('processor effect cancellation handling: take effect', assert => {
}, DELAY)
})

test('processor effect cancellation handling: join effect', assert => {
test('proc cancellation: join effect (joining from a different task)', assert => {
assert.plan(1)

let actual = []
let cancelDef = deferred()
let subroutineDef = deferred()

Promise.resolve(1)
.then(() => cancelDef.resolve('cancel'))
.then(() => subroutineDef.resolve('subroutine'))

function* main() {
actual.push('start')
let task = yield io.fork(subroutine)
yield io.fork(joiner1, task)
yield io.fork(joiner2, task)

actual.push(yield cancelDef.promise)
yield io.cancel(task)
}

function* subroutine() {
actual.push('subroutine start')
try {
actual.push(yield subroutineDef.promise)
} catch (e) {
if (e instanceof SagaCancellationException)
actual.push(yield 'subroutine cancelled')
}
}

function* joiner1(task) {
actual.push('joiner1 start')
try {
actual.push(yield io.join(task))
} catch (e) {
if (e instanceof SagaCancellationException)
actual.push(yield 'joiner1 cancelled')
}
}

function* joiner2(task) {
actual.push('joiner2 start')
try {
actual.push(yield io.join(task))
} catch (e) {
if (e instanceof SagaCancellationException)
actual.push(yield 'joiner2 cancelled')
}
}

const task = proc(main())
task.done.catch(err => assert.fail(err))

/**
Breaking change in 10.0:
**/
const expected = ['start', 'subroutine start', 'joiner1 start', 'joiner2 start',
'cancel', 'subroutine cancelled', 'joiner1 cancelled', 'joiner2 cancelled']

setTimeout(() => {
assert.deepEqual(actual, expected,
"cancelled task must cancel foreing joiners"
)
assert.end()
}, DELAY)
})

test('proc cancellation: join effect (join from the same task\'s parent)', assert => {
assert.plan(1)

let actual = []
Expand All @@ -210,7 +275,6 @@ test('processor effect cancellation handling: join effect', assert => {

Promise.resolve(1)
.then(() => startDef.resolve('start'))
//.then(delay(0))
.then(() => cancelDef.resolve('cancel'))
.then(() => subroutineDef.resolve('subroutine'))

Expand Down Expand Up @@ -242,18 +306,28 @@ test('processor effect cancellation handling: join effect', assert => {
})
task.done.catch(err => assert.fail(err))

const expected = ['start', 'subroutine start',
'cancel', 'subroutine cancelled', 'cancelled']
/**
Breaking change in 10.0: Since now attached forks are cancelled when their parent is cancelled
cancellation of main will trigger in order: 1. cancel parent (main) 2. then cancel children (subroutine)
Join cancellation has the following semantics: cancellation of a task triggers cancellation of all its
joiners (similar to promise1.then(promise2): promise2 depends on promise1, if promise1 os cancelled,
then so promise2 must be cancelled).
In the present test, main is joining on of its proper children, so this would cause an endless loop, but
since cancellation is noop on an already terminated task the deadlock wont happen
**/
const expected = ['start', 'subroutine start', 'cancel', 'cancelled', 'subroutine cancelled']

setTimeout(() => {
assert.deepEqual(actual, expected,
"cancelled join effect must cancel joined subroutine"
"cancelled routine must cancel proper joiners"
)
assert.end()
}, DELAY)
})

test('processor effect cancellation handling: parallel effect', assert => {
test('proc cancellation: parallel effect', assert => {
assert.plan(1)

let actual = []
Expand All @@ -263,7 +337,6 @@ test('processor effect cancellation handling: parallel effect', assert => {

Promise.resolve(1)
.then(() => startDef.resolve('start'))
//.then(delay(0))
.then(() => subroutineDefs[0].resolve('subroutine 1'))
.then(() => cancelDef.resolve('cancel'))
.then(() => subroutineDefs[1].resolve('subroutine 2'))
Expand Down Expand Up @@ -322,7 +395,7 @@ test('processor effect cancellation handling: parallel effect', assert => {
}, DELAY)
})

test('processor effect cancellation handling: race effect', assert => {
test('proc cancellation: race effect', assert => {
assert.plan(1)

let actual = []
Expand All @@ -332,7 +405,6 @@ test('processor effect cancellation handling: race effect', assert => {

Promise.resolve(1)
.then(() => startDef.resolve('start'))
//.then(delay(0))
.then(() => cancelDef.resolve('cancel'))
.then(() => subroutineDefs[0].resolve('subroutine 1'))
.then(() => subroutineDefs[1].resolve('subroutine 2'))
Expand Down Expand Up @@ -390,7 +462,7 @@ test('processor effect cancellation handling: race effect', assert => {
}, DELAY)
})

test('processor automatic parallel effect cancellation handling', assert => {
test('proc cancellation: automatic parallel effect cancellation', assert => {
assert.plan(1);

let actual = []
Expand All @@ -401,7 +473,6 @@ test('processor automatic parallel effect cancellation handling', assert => {
.then(() => subtask1Defs[0].resolve('subtask_1'))
.then(() => subtask2Defs[0].resolve('subtask_2'))
.then(() => subtask1Defs[1].reject('subtask_1 rejection'))
//.then(delay(0))
.then(() => subtask2Defs[1].resolve('subtask_2_2'))

function* subtask1() {
Expand Down Expand Up @@ -444,7 +515,7 @@ test('processor automatic parallel effect cancellation handling', assert => {

})

test('processor automatic race competitor cancellation handling', assert => {
test('proc cancellation: automatic race competitor cancellation', assert => {
assert.plan(1);

let actual = []
Expand Down Expand Up @@ -514,7 +585,7 @@ test('processor automatic race competitor cancellation handling', assert => {
}, 0)
})

test('processor manual task cancellation handling', assert => {
test('proc cancellation: manual task cancellation', assert => {
assert.plan(1);

let actual = [];
Expand Down Expand Up @@ -561,7 +632,7 @@ test('processor manual task cancellation handling', assert => {

});

test('processor nested task cancellation handling', assert => {
test('proc cancellation: nested task cancellation', assert => {
assert.plan(1)

let actual = []
Expand Down Expand Up @@ -636,7 +707,7 @@ test('processor nested task cancellation handling', assert => {
}, DELAY)
})

test('processor nested forked task cancellation handling', assert => {
test('proc cancellation: nested forked task cancellation', assert => {
assert.plan(1)

let actual = []
Expand Down Expand Up @@ -692,7 +763,7 @@ test('processor nested forked task cancellation handling', assert => {
setTimeout(() => {

assert.deepEqual(actual, expected,
'processor must cancel forked task but and its forked nested subtask'
'processor must cancel forked task and its forked nested subtask'
)

}, DELAY)
Expand Down

0 comments on commit 909895a

Please sign in to comment.