Skip to content

Commit ce9b6c1

Browse files
authored
CORDA-311-post PR merged fixes (corda#2106)
* SSH server integration
1 parent 502d0df commit ce9b6c1

File tree

13 files changed

+102
-227
lines changed

13 files changed

+102
-227
lines changed

core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt

+12
Original file line numberDiff line numberDiff line change
@@ -348,13 +348,25 @@ abstract class FlowLogic<out T> {
348348
}
349349
}
350350

351+
/**
352+
* Returns a pair of the current progress step index (as integer) in steps tree of current [progressTracker], and an observable
353+
* of its upcoming changes.
354+
*
355+
* @return Returns null if this flow has no progress tracker.
356+
*/
351357
fun trackStepsTreeIndex(): DataFeed<Int, Int>? {
352358
// TODO this is not threadsafe, needs an atomic get-step-and-subscribe
353359
return progressTracker?.let {
354360
DataFeed(it.stepsTreeIndex, it.stepsTreeIndexChanges)
355361
}
356362
}
357363

364+
/**
365+
* Returns a pair of the current steps tree of current [progressTracker] as pairs of zero-based depth and stringified step
366+
* label and observable of upcoming changes to the structure.
367+
*
368+
* @return Returns null if this flow has no progress tracker.
369+
*/
358370
fun trackStepsTree(): DataFeed<List<Pair<Int,String>>, List<Pair<Int,String>>>? {
359371
// TODO this is not threadsafe, needs an atomic get-step-and-subscribe
360372
return progressTracker?.let {

core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt

+17-5
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@ import rx.Observable
88

99
/**
1010
* [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
11-
*
12-
* @property id The started state machine's ID.
13-
* @property returnValue A [CordaFuture] of the flow's return value.
1411
*/
1512
@DoNotImplement
1613
interface FlowHandle<A> : AutoCloseable {
14+
/**
15+
* The started state machine's ID.
16+
*/
1717
val id: StateMachineRunId
18+
19+
/**
20+
* A [CordaFuture] of the flow's return value.
21+
*/
1822
val returnValue: CordaFuture<A>
1923

2024
/**
@@ -25,15 +29,23 @@ interface FlowHandle<A> : AutoCloseable {
2529

2630
/**
2731
* [FlowProgressHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
28-
*
29-
* @property progress The stream of progress tracker events.
3032
*/
3133
interface FlowProgressHandle<A> : FlowHandle<A> {
34+
/**
35+
* The stream of progress tracker events.
36+
*/
3237
val progress: Observable<String>
3338

39+
/**
40+
* [DataFeed] of current step in the steps tree, see [ProgressTracker]
41+
*/
3442
val stepsTreeIndexFeed: DataFeed<Int, Int>?
3543

44+
/**
45+
* [DataFeed] of current steps tree, see [ProgressTracker]
46+
*/
3647
val stepsTreeFeed: DataFeed<List<Pair<Int, String>>, List<Pair<Int, String>>>?
48+
3749
/**
3850
* Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up
3951
* server resources.

core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt

+11
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class ProgressTracker(vararg steps: Step) {
9999
field = value
100100
}
101101

102+
/** The zero-bases index of the current step in a [allStepsLabels] list */
102103
var stepsTreeIndex: Int = -1
103104
private set(value) {
104105
field = value
@@ -226,6 +227,10 @@ class ProgressTracker(vararg steps: Step) {
226227
*/
227228
val allSteps: List<Pair<Int, Step>> get() = _allStepsCache
228229

230+
/**
231+
* A list of all steps label in this ProgressTracker and the children, with the indent level provided starting at zero.
232+
* Note that UNSTARTED is never counted, and DONE is only counted at the calling level.
233+
*/
229234
val allStepsLabels: List<Pair<Int, String>> get() = _allStepsLabels()
230235

231236
private var curChangeSubscription: Subscription? = null
@@ -245,8 +250,14 @@ class ProgressTracker(vararg steps: Step) {
245250
*/
246251
val changes: Observable<Change> get() = _changes
247252

253+
/**
254+
* An observable stream of changes to the [allStepsLabels]
255+
*/
248256
val stepsTreeChanges: Observable<List<Pair<Int,String>>> get() = _stepsTreeChanges
249257

258+
/**
259+
* An observable stream of changes to the [stepsTreeIndex]
260+
*/
250261
val stepsTreeIndexChanges: Observable<Int> get() = _stepsTreeIndexChanges
251262

252263
/** Returns true if the progress tracker has ended, either by reaching the [DONE] step or prematurely with an error */

core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt

+10-10
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class ProgressTrackerTest {
9898

9999
val allSteps = pt.allSteps
100100

101-
//capture notifications
101+
// Capture notifications.
102102
val stepsIndexNotifications = LinkedList<Int>()
103103
pt.stepsTreeIndexChanges.subscribe {
104104
stepsIndexNotifications += it
@@ -113,7 +113,7 @@ class ProgressTrackerTest {
113113
assertEquals(step, allSteps[pt.stepsTreeIndex].second)
114114
}
115115

116-
//travel tree
116+
// Travel tree.
117117
pt.currentStep = SimpleSteps.ONE
118118
assertCurrentStepsTree(0, SimpleSteps.ONE)
119119

@@ -126,7 +126,7 @@ class ProgressTrackerTest {
126126
pt.currentStep = SimpleSteps.THREE
127127
assertCurrentStepsTree(5, SimpleSteps.THREE)
128128

129-
//assert no structure changes and proper steps propagation
129+
// Assert no structure changes and proper steps propagation.
130130
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(0, 1, 3, 5))
131131
assertThat(stepsTreeNotification).isEmpty()
132132
}
@@ -135,13 +135,13 @@ class ProgressTrackerTest {
135135
fun `structure changes are pushed down when progress trackers are added`() {
136136
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
137137

138-
//capture notifications
138+
// Capture notifications.
139139
val stepsIndexNotifications = LinkedList<Int>()
140140
pt.stepsTreeIndexChanges.subscribe {
141141
stepsIndexNotifications += it
142142
}
143143

144-
//put current state as a first change for simplicity when asserting
144+
// Put current state as a first change for simplicity when asserting.
145145
val stepsTreeNotification = mutableListOf(pt.allStepsLabels)
146146
println(pt.allStepsLabels)
147147
pt.stepsTreeChanges.subscribe {
@@ -164,7 +164,7 @@ class ProgressTrackerTest {
164164

165165
assertCurrentStepsTree(9, SimpleSteps.FOUR)
166166

167-
//assert no structure changes and proper steps propagation
167+
// Assert no structure changes and proper steps propagation.
168168
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 6, 9))
169169
assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state
170170
}
@@ -173,13 +173,13 @@ class ProgressTrackerTest {
173173
fun `structure changes are pushed down when progress trackers are removed`() {
174174
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
175175

176-
//capture notifications
176+
// Capture notifications.
177177
val stepsIndexNotifications = LinkedList<Int>()
178178
pt.stepsTreeIndexChanges.subscribe {
179179
stepsIndexNotifications += it
180180
}
181181

182-
//put current state as a first change for simplicity when asserting
182+
// Put current state as a first change for simplicity when asserting.
183183
val stepsTreeNotification = mutableListOf(pt.allStepsLabels)
184184
pt.stepsTreeChanges.subscribe {
185185
stepsTreeNotification += it
@@ -199,9 +199,9 @@ class ProgressTrackerTest {
199199

200200
assertCurrentStepsTree(2, BabySteps.UNOS)
201201

202-
//assert no structure changes and proper steps propagation
202+
// Assert no structure changes and proper steps propagation.
203203
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 4, 2))
204-
assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state
204+
assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state.
205205
}
206206

207207
@Test

docs/source/shell.rst

+4-4
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ Shell can also be accessible via SSH. By default SSH server is *disabled*. To en
3535
3636
Authentication and authorization
3737
--------------------------------
38-
SSH require user to login first - using the same users as RPC system. In fact, shell serves as a proxy to RPC and communicates
39-
with node using RPC calls. This also means that RPC permissions are enforced. No permissions are required to allow the connection
40-
and login in.
38+
SSH requires users to login first - using the same users as RPC system. In fact, the shell serves as a proxy to RPC and communicates
39+
with the node using RPC calls. This also means that RPC permissions are enforced. No permissions are required to allow the connection
40+
and log in.
4141
Watching flows (``flow watch``) requires ``InvokeRpc.stateMachinesFeed`` while starting flows requires
4242
``InvokeRpc.startTrackedFlowDynamic`` and ``InvokeRpc.registeredFlows`` in addition to a permission for a particular flow.
4343

@@ -51,7 +51,7 @@ errors.
5151
Connecting
5252
----------
5353

54-
Linux and MacOS computers usually come with SSH client preinstalled. On Windows it usually require extra download.
54+
Linux and MacOS computers usually come with SSH client preinstalled. On Windows it usually requires extra download.
5555
Usual connection syntax is ``ssh user@host -p 2222`` - where ``user`` is a RPC username, and ``-p`` specifies a port parameters -
5656
it's the same as setup in ``node.conf`` file. ``host`` should point to a node hostname, usually ``localhost`` if connecting and
5757
running node on the same computer. Password will be asked after establishing connection.

node/src/integration-test/kotlin/net/corda/node/SSHServerTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ class SSHServerTest {
139139

140140
val response = String(Streams.readAll(channel.inputStream))
141141

142-
//There are ANSI control characters involved, so we want to avoid direct byte to byte matching
143-
assertThat(response.lines()).filteredOn( { it.contains("") && it.contains("Done")}).hasSize(1)
142+
// There are ANSI control characters involved, so we want to avoid direct byte to byte matching.
143+
assertThat(response.lines()).filteredOn( { it.contains("Done")}).hasSize(1)
144144
}
145145
}
146146

node/src/main/java/net/corda/node/shell/FlowShellCommand.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44

55
import net.corda.core.messaging.CordaRPCOps;
66
import net.corda.node.utilities.ANSIProgressRenderer;
7-
import net.corda.node.utilities.CRaSHNSIProgressRenderer;
7+
import net.corda.node.utilities.CRaSHANSIProgressRenderer;
88
import org.crsh.cli.*;
99
import org.crsh.command.*;
1010
import org.crsh.text.*;
1111
import org.crsh.text.ui.TableElement;
1212

1313
import java.util.*;
1414

15-
import static net.corda.node.services.messaging.RPCServerKt.CURRENT_RPC_CONTEXT;
1615
import static net.corda.node.shell.InteractiveShell.*;
1716

1817
@Man(
@@ -49,7 +48,7 @@ static void startFlow(@Usage("The class name of the flow to run, or an unambiguo
4948
return;
5049
}
5150
String inp = input == null ? "" : String.join(" ", input).trim();
52-
runFlowByNameFragment(name, inp, out, rpcOps, ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out) );
51+
runFlowByNameFragment(name, inp, out, rpcOps, ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHANSIProgressRenderer(out) );
5352
}
5453

5554
@Command

node/src/main/java/net/corda/node/shell/RunShellCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public Object main(
3030
return null;
3131
}
3232

33-
return InteractiveShell.runRPCFromString(command, out, context);
33+
return InteractiveShell.runRPCFromString(command, out, context, ops());
3434
}
3535

3636
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) {

node/src/main/java/net/corda/node/shell/StartShellCommand.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// A simple forwarder to the "flow start" command, for easier typing.
44

55
import net.corda.node.utilities.ANSIProgressRenderer;
6-
import net.corda.node.utilities.CRaSHNSIProgressRenderer;
6+
import net.corda.node.utilities.CRaSHANSIProgressRenderer;
77
import org.crsh.cli.*;
88

99
import java.util.*;
@@ -14,6 +14,6 @@ public class StartShellCommand extends InteractiveShellCommand {
1414
public void main(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
1515
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input) {
1616
ANSIProgressRenderer ansiProgressRenderer = ansiProgressRenderer();
17-
FlowShellCommand.startFlow(name, input, out, ops(), ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out));
17+
FlowShellCommand.startFlow(name, input, out, ops(), ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHANSIProgressRenderer(out));
1818
}
1919
}

node/src/main/kotlin/net/corda/node/shell/CordaAuthenticationPlugin.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class CordaAuthenticationPlugin(val rpcOps:CordaRPCOps, val userService:RPCUserS
2525

2626
if (user != null && user.password == credential) {
2727
val actor = Actor(Actor.Id(username), userService.id, nodeLegalName)
28-
return CordaSSHAuthInfo(true, RPCOpsWithContext(rpcOps, InvocationContext.rpc(actor), RpcPermissions(user.permissions)))
28+
return CordaSSHAuthInfo(true, makeRPCOpsWithContext(rpcOps, InvocationContext.rpc(actor), RpcPermissions(user.permissions)))
2929
}
3030

3131
return AuthInfo.UNSUCCESSFUL;

node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt

+10-9
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ object InteractiveShell {
101101
this.nodeLegalName = configuration.myLegalName
102102
this.database = database
103103
val dir = configuration.baseDirectory
104-
val runSshDeamon = configuration.sshd != null
104+
val runSshDaemon = configuration.sshd != null
105105

106106
val config = Properties()
107-
if (runSshDeamon) {
107+
if (runSshDaemon) {
108108
val sshKeysDir = dir / "sshkey"
109109
sshKeysDir.toFile().mkdirs()
110110

@@ -120,7 +120,7 @@ object InteractiveShell {
120120
ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java)
121121
shell = ShellLifecycle(dir).start(config)
122122

123-
if (runSshDeamon) {
123+
if (runSshDaemon) {
124124
Node.printBasicNodeInfo("SSH server listening on port", configuration.sshd!!.port.toString())
125125
}
126126
}
@@ -182,7 +182,7 @@ object InteractiveShell {
182182
context.refresh()
183183
this.config = config
184184
start(context)
185-
return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, RPCOpsWithContext(rpcOps, net.corda.core.context.InvocationContext.shell(), RpcPermissions.ALL), StdoutANSIProgressRenderer))
185+
return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, makeRPCOpsWithContext(rpcOps, net.corda.core.context.InvocationContext.shell(), RpcPermissions.ALL), StdoutANSIProgressRenderer))
186186
}
187187
}
188188

@@ -236,7 +236,7 @@ object InteractiveShell {
236236
try {
237237
// Show the progress tracker on the console until the flow completes or is interrupted with a
238238
// Ctrl-C keypress.
239-
val stateObservable = runFlowFromString({ clazz,args -> rpcOps.startTrackedFlowDynamic (clazz, *args) }, inputData, clazz)
239+
val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, clazz)
240240

241241
val latch = CountDownLatch(1)
242242
ansiProgressRenderer.render(stateObservable, { latch.countDown() })
@@ -247,7 +247,6 @@ object InteractiveShell {
247247
} catch (e: InterruptedException) {
248248
// TODO: When the flow framework allows us to kill flows mid-flight, do so here.
249249
}
250-
251250
} catch (e: NoApplicableConstructor) {
252251
output.println("No matching constructor found:", Color.red)
253252
e.errors.forEach { output.println("- $it", Color.red) }
@@ -326,7 +325,9 @@ object InteractiveShell {
326325
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
327326
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
328327
val subscriber = FlowWatchPrintingSubscriber(out)
329-
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber)
328+
database.transaction {
329+
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber)
330+
}
330331
var result: Any? = subscriber.future
331332
if (result is Future<*>) {
332333
if (!result.isDone) {
@@ -348,7 +349,7 @@ object InteractiveShell {
348349
}
349350

350351
@JvmStatic
351-
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>): Any? {
352+
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps): Any? {
352353
val parser = StringToMethodCallParser(CordaRPCOps::class.java, context.attributes["mapper"] as ObjectMapper)
353354

354355
val cmd = input.joinToString(" ").trim { it <= ' ' }
@@ -363,7 +364,7 @@ object InteractiveShell {
363364
var result: Any? = null
364365
try {
365366
InputStreamSerializer.invokeContext = context
366-
val call = database.transaction { parser.parse(context.attributes["ops"] as CordaRPCOps, cmd) }
367+
val call = database.transaction { parser.parse(cordaRPCOps, cmd) }
367368
result = call.call()
368369
if (result != null && result !is kotlin.Unit && result !is Void) {
369370
result = printAndFollowRPCResponse(result, out)

0 commit comments

Comments
 (0)