Skip to content

Commit

Permalink
CORDA-1393: Make Explorer GUI recover on RPC connection loss. (corda#…
Browse files Browse the repository at this point in the history
…3093)

* CORDA-1393: Install `onError()` handler for folding action
or else `ErrorNotImplementedAction` will be invoked which is never a good thing

* CORDA-1335: Improve exception handling in `cleanUpOnConnectionLoss()`

* CORDA-1335: Try to trick the logic to pretend we are running in HA mode to have a chance of re-connecting.

* CORDA-1416: Make `NodeMonitorModel` code react to proxy changing.

* CORDA-1416: Workaround `CordaRPCOps.equals()` calls when listener dispatching change.

* CORDA-1416: Increase re-try interval to allow enough time for server to come back online.

* CORDA-1355: Properly close RPC connection we are moving away from.

* CORDA-1355: Unsubscribe on Error to prevent propagation of it downstream.

* CORDA-1355: For downstream subscribers ignore errors properly. Thanka to @exFalso for the hint.

This fixes: Transaction Updates do not flow after re-connect

* CORDA-1355: Bugfix eliminate duplicating items on "Transactions" blotter after re-connect.

* CORDA-1355: Bugfix eliminate double counting on dashboards.

* CORDA-1355: Bugfix eliminate same parties in dropdowns.

* CORDA-1355: Stop using `SecureHash.randomSHA256()` for painting widget icon.
Instead use combined SHA hash such that icon represents the whole population of trades.
That way two transactions blotters can be compared by a single glimpse at corresponding icons.

Also minor refactoring.

* CORDA-1416: Make RPC re-connection faster/more robust.

* CORDA-1416: Properly announce thet Proxy may not be available during re-connect and prevent UI crashing.

* CORDA-1416: Disable UI until RPC proxy is available.

* CORDA-1416: Correct typo.

* CORDA-1416: Unit test fix.

* CORDA-1416: GUI cosmetic changes.

* CORDA-1416: Correct spaces.

* CORDA-1416: Remove un-necessary overrides in CordaRPCOpsWrapper.

* CORDA-1416: Switch from using `doOnError` to installing an error handler upon subscription.
  • Loading branch information
vkolomeyko authored May 10, 2018
1 parent 36d1312 commit 15e8705
Show file tree
Hide file tree
Showing 18 changed files with 253 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ class NodeMonitorModelTest {
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()

monitor.register(aliceNodeHandle.rpcAddress, cashUser.username, cashUser.password)
rpc = monitor.proxyObservable.value!!
rpc = monitor.proxyObservable.value!!.cordaRPCOps
notaryParty = defaultNotaryIdentity

val bobNodeHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(cashUser)).getOrThrow()
bobNode = bobNodeHandle.nodeInfo
val monitorBob = NodeMonitorModel()
stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed()
monitorBob.register(bobNodeHandle.rpcAddress, cashUser.username, cashUser.password)
rpcBob = monitorBob.proxyObservable.value!!
rpcBob = monitorBob.proxyObservable.value!!.cordaRPCOps
runTest()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package net.corda.client.jfx.model

import javafx.collections.FXCollections
import javafx.collections.ObservableList
import net.corda.client.jfx.utils.distinctBy
import net.corda.client.jfx.utils.fold
import net.corda.client.jfx.utils.map
import net.corda.core.contracts.ContractState
Expand Down Expand Up @@ -31,7 +32,7 @@ class ContractStateModel {
val cashStates: ObservableList<StateAndRef<Cash.State>> = cashStatesDiff.fold(FXCollections.observableArrayList()) { list: MutableList<StateAndRef<Cash.State>>, statesDiff ->
list.removeIf { it in statesDiff.removed }
list.addAll(statesDiff.added)
}
}.distinctBy { it.ref }

val cash = cashStates.map { it.state.data.amount }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@ import com.github.benmanes.caffeine.cache.Caffeine
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import net.corda.client.jfx.utils.ChosenList
import net.corda.client.jfx.utils.filterNotNull
import net.corda.client.jfx.utils.fold
import net.corda.client.jfx.utils.map
import net.corda.client.jfx.utils.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache.MapChange
import java.security.PublicKey
Expand All @@ -35,13 +31,13 @@ class NetworkIdentityModel {

private val identityCache = Caffeine.newBuilder()
.build<PublicKey, ObservableValue<NodeInfo?>>({ publicKey ->
publicKey?.let { rpcProxy.map { it?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
})
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.notaryIdentities() ?: emptyList()) })
val notaryNodes: ObservableList<NodeInfo> = notaries.map { rpcProxy.value?.nodeInfoFromParty(it) }.filterNotNull()
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries")
val notaryNodes: ObservableList<NodeInfo> = notaries.map { rpcProxy.value?.cordaRPCOps?.nodeInfoFromParty(it) }.filterNotNull()
val parties: ObservableList<NodeInfo> = networkIdentities
.filtered { it.legalIdentities.all { it !in notaries } }
val myIdentity = rpcProxy.map { it?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }
.filtered { it.legalIdentities.all { it !in notaries } }.unique()
val myIdentity = rpcProxy.map { it?.cordaRPCOps?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }

fun partyFromPublicKey(publicKey: PublicKey): ObservableValue<NodeInfo?> = identityCache[publicKey]!!
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package net.corda.client.jfx.model

import com.sun.javafx.application.PlatformImpl
import javafx.application.Platform
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.contracts.ContractState
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.staticField
import net.corda.core.messaging.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.Vault
Expand All @@ -15,9 +19,14 @@ import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) {
companion object {
Expand All @@ -34,6 +43,7 @@ data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val mess
*/
class NodeMonitorModel {

private val retryableStateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
private val stateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
private val vaultUpdatesSubject = PublishSubject.create<Vault.Update<ContractState>>()
private val transactionsSubject = PublishSubject.create<SignedTransaction>()
Expand All @@ -48,27 +58,76 @@ class NodeMonitorModel {
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
val networkMap: Observable<MapChange> = networkMapSubject

val proxyObservable = SimpleObjectProperty<CordaRPCOps?>()
val proxyObservable = SimpleObjectProperty<CordaRPCOpsWrapper?>()
lateinit var notaryIdentities: List<Party>

companion object {
val logger = contextLogger()

private fun runLaterIfInitialized(op: () -> Unit) {

val initialized = PlatformImpl::class.java.staticField<AtomicBoolean>("initialized")

// Only execute using "runLater()" if JavaFX been initialized.
// It may not be initialized in the unit test.
if(initialized.value.get()) {
Platform.runLater(op)
} else {
op()
}
}
}

/**
* This is needed as JavaFX listener framework attempts to call `equals()` before dispatching notification change.
* And calling `CordaRPCOps.equals()` results in (unhandled) remote call.
*/
class CordaRPCOpsWrapper(val cordaRPCOps: CordaRPCOps)

/**
* Register for updates to/from a given vault.
* TODO provide an unsubscribe mechanism
*/
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
val client = CordaRPCClient(
nodeHostAndPort,
object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = 10.seconds
}
)
val connection = client.start(username, password)
val proxy = connection.proxy
notaryIdentities = proxy.notaryIdentities()

val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
// `retryableStateMachineUpdatesSubject` will change it's upstream subscriber in case of RPC connection failure, this `Observable` should
// never produce an error.
// `stateMachineUpdatesSubject` will stay firmly subscribed to `retryableStateMachineUpdatesSubject`
retryableStateMachineUpdatesSubject.subscribe(stateMachineUpdatesSubject)

// Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity.
proxyObservable.addListener { _, _, wrapper ->
if(wrapper != null) {
val proxy = wrapper.cordaRPCOps
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
}.toSet()
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates)
vaultUpdates.startWith(initialVaultUpdate).subscribe({ vaultUpdatesSubject.onNext(it) }, {})

// Transactions
val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed()
newTransactions.startWith(transactions).subscribe({ transactionsSubject.onNext(it) }, {})

// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.startWith(smTxMappings).subscribe({ stateMachineTransactionMappingSubject.onNext(it) }, {})

// Parties on network
val (parties, futurePartyUpdate) = proxy.networkMapFeed()
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe({ networkMapSubject.onNext(it) }, {})
}
}

val stateMachines = performRpcReconnect(nodeHostAndPort, username, password)

// Extract the flow tracking stream
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
// TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed?
val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine ->
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine)
}
Expand All @@ -82,33 +141,74 @@ class NodeMonitorModel {

// We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers.
futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject)
}

private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): List<StateMachineInfo> {

val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
val proxy = connection.proxy

val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()

val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
val subscription: Subscription = stateMachineUpdatesRaw
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
.subscribe({ retryableStateMachineUpdatesSubject.onNext(it) }, {
// Terminate subscription such that nothing gets past this point to downstream Observables.
retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
// Flag to everyone that proxy is no longer available.
runLaterIfInitialized { proxyObservable.set(null) }
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
// client and a new connection, so no going back to this one. Also the server might be down, so we are
// force closing the connection to avoid propagation of notification to the server side.
connection.forceClose()
// Perform re-connect.
performRpcReconnect(nodeHostAndPort, username, password)
})

retryableStateMachineUpdatesSubscription.set(subscription)
runLaterIfInitialized { proxyObservable.set(CordaRPCOpsWrapper(proxy)) }
notaryIdentities = proxy.notaryIdentities()

// Now the state machines
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject)

// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
}.toSet()
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates)
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject)

// Transactions
val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed()
newTransactions.startWith(transactions).subscribe(transactionsSubject)

// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject)

// Parties on network
val (parties, futurePartyUpdate) = proxy.networkMapFeed()
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject)

proxyObservable.set(proxy)
return stateMachineInfos
}
}

private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {

val retryInterval = 5.seconds

do {
val connection = try {
logger.info("Connecting to: $nodeHostAndPort")
val client = CordaRPCClient(
nodeHostAndPort,
object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = retryInterval
}
)
val _connection = client.start(username, password)
// Check connection is truly operational before returning it.
val nodeInfo = _connection.proxy.nodeInfo()
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
_connection
} catch(secEx: ActiveMQSecurityException) {
// Happens when incorrect credentials provided - no point to retry connecting.
throw secEx
}
catch(th: Throwable) {
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
logger.info("Exception upon establishing connection: " + th.message)
null
}

if(connection != null) {
logger.info("Connection successfully established with: $nodeHostAndPort")
return connection
}
// Could not connect this time round - pause before giving another try.
Thread.sleep(retryInterval.toMillis())
} while (connection == null)

throw IllegalArgumentException("Never reaches here")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ data class PartiallyResolvedTransaction(
*/
class TransactionDataModel {
private val transactions by observable(NodeMonitorModel::transactions)
private val collectedTransactions = transactions.recordInSequence()
private val collectedTransactions = transactions.recordInSequence().distinctBy { it.id }
private val vaultUpdates by observable(NodeMonitorModel::vaultUpdates)
private val stateMap = vaultUpdates.fold(FXCollections.observableHashMap<StateRef, StateAndRef<ContractState>>()) { map, update ->
val states = update.consumed + update.produced
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import javafx.collections.ObservableListBase
* The above will create a list that chooses and delegates to the appropriate filtered list based on the type of filter.
*/
class ChosenList<E>(
private val chosenListObservable: ObservableValue<out ObservableList<out E>>
private val chosenListObservable: ObservableValue<out ObservableList<out E>>,
private val logicalName: String? = null
) : ObservableListBase<E>() {

private var currentList = chosenListObservable.value
Expand Down Expand Up @@ -58,4 +59,7 @@ class ChosenList<E>(
endChange()
}

}
override fun toString(): String {
return "ChosenList: $logicalName"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import javafx.collections.ObservableMap
import org.slf4j.LoggerFactory
import rx.Observable
import java.util.concurrent.TimeUnit

/**
* Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList]
*/

private val logger = LoggerFactory.getLogger("ObservableFold")

private fun onError(th: Throwable) {
logger.debug("OnError when folding", th)
}

/**
* [foldToObservableValue] takes an [rx.Observable] stream and creates an [ObservableValue] out of it.
* @param initial The initial value of the returned observable.
Expand All @@ -23,11 +30,11 @@ import java.util.concurrent.TimeUnit
*/
fun <A, B> Observable<A>.foldToObservableValue(initial: B, folderFun: (A, B) -> B): ObservableValue<B> {
val result = SimpleObjectProperty<B>(initial)
subscribe {
subscribe ({
Platform.runLater {
result.set(folderFun(it, result.get()))
}
}
}, ::onError)
return result
}

Expand All @@ -42,7 +49,7 @@ fun <T, R> Observable<T>.fold(accumulator: R, folderFun: (R, T) -> Unit): R {
* This capture is fine, as [Platform.runLater] runs closures in order.
* The buffer is to avoid flooding FX thread with runnable.
*/
buffer(1, TimeUnit.SECONDS).subscribe {
buffer(1, TimeUnit.SECONDS).subscribe({
if (it.isNotEmpty()) {
Platform.runLater {
it.fold(accumulator) { list, item ->
Expand All @@ -51,7 +58,7 @@ fun <T, R> Observable<T>.fold(accumulator: R, folderFun: (R, T) -> Unit): R {
}
}
}
}
}, ::onError)
return accumulator
}

Expand Down
Loading

0 comments on commit 15e8705

Please sign in to comment.