Skip to content

Commit

Permalink
Some clean up of behave code (corda#3907)
Browse files Browse the repository at this point in the history
In particular, fixing the recursive call of the "use" method
  • Loading branch information
shamsasari authored and pokeybot committed Sep 6, 2018
1 parent 8ad540d commit c6400cf
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import net.corda.behave.await
import net.corda.behave.file.currentDirectory
import net.corda.behave.process.output.OutputListener
import net.corda.behave.waitFor
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import rx.Observable
Expand All @@ -20,8 +20,10 @@ open class Command(
private val directory: Path = currentDirectory,
private val timeout: Duration = 2.minutes
): Closeable {

protected val log = loggerFor<Command>()
companion object {
private val WAIT_BEFORE_KILL: Duration = 5.seconds
private val log = contextLogger()
}

private val terminationLatch = CountDownLatch(1)

Expand All @@ -36,21 +38,16 @@ open class Command(
var exitCode = -1
private set

val output: Observable<String> = Observable.create<String>({ emitter ->
val output: Observable<String> = Observable.create<String> { emitter ->
outputListener = object : OutputListener {
override fun onNewLine(line: String) {
emitter.onNext(line)
}

override fun onEndOfStream() {
emitter.onCompleted()
}
override fun onNewLine(line: String) = emitter.onNext(line)
override fun onEndOfStream() = emitter.onCompleted()
}
}).share()
}.share()

private val thread = Thread(Runnable {
try {
log.info("Command: $command")
log.info("Executing command: $command from directory: $directory")
val processBuilder = ProcessBuilder(command)
.directory(directory.toFile())
.redirectErrorStream(true)
Expand Down Expand Up @@ -132,21 +129,22 @@ open class Command(
}

override fun close() {
if (process?.isAlive == true) {
kill()
}
waitFor()
}

fun run() = use { _ -> }

fun use(action: (Command) -> Unit): Int {
fun run(action: (Command) -> Unit = { }): Int {
use {
start()
action(this)
}
return exitCode
}

fun use(subscriber: Subscriber<String>, action: (Command, Observable<String>) -> Unit = { _, _ -> }): Int {
use {
fun run(subscriber: Subscriber<String>, action: (Command, Observable<String>) -> Unit = { _, _ -> }): Int {
run {
output.subscribe(subscriber)
start()
action(this, output)
Expand All @@ -155,11 +153,4 @@ open class Command(
}

override fun toString() = "Command(${command.joinToString(" ")})"

companion object {

private val WAIT_BEFORE_KILL: Duration = 5.seconds

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ abstract class ContainerService(
log.info("Container $id info: $info")

client.startContainer(id)

true
} catch (e: Exception) {
id = null
Expand All @@ -79,9 +78,7 @@ abstract class ContainerService(
override fun checkPrerequisites() {
if (!client.listImages().any { true == it.repoTags()?.contains(imageReference) }) {
log.info("Pulling image $imageReference ...")
client.pull(imageReference, { _ ->
run { }
})
client.pull(imageReference) { }
log.info("Image $imageReference downloaded")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class CommandTests {
@Test
fun `output stream for command can be observed`() {
val subscriber = TestSubscriber<String>()
val exitCode = Command(listOf("ls", "/")).use(subscriber) { _, _ ->
val exitCode = Command(listOf("ls", "/")).run(subscriber) { _, _ ->
subscriber.awaitTerminalEvent()
subscriber.assertCompleted()
subscriber.assertNoErrors()
Expand Down

0 comments on commit c6400cf

Please sign in to comment.