Skip to content

Commit

Permalink
Removal of transaction contract state as BLOB in VaultStates table. (c…
Browse files Browse the repository at this point in the history
…orda#2034)

* Removal of transaction contract state as BLOB in VaultStates table.
Transaction contract state now resolved using StateLoader (from DBTransactionStorage).

Fixed broken JUnits.

* Changes to address review comments by RP

Address logic error.

* Fixed failing JUnit (CashExitFlowTests.exit zero cash).

* Fix VaultQueryTests to respect transaction visibility boundaries.

* Adopt consistent use of "session" using DatabaseTransactionManager.

* Removed redundant transaction demarcation boundaries in Vault Query tests.
  • Loading branch information
josecoll authored Nov 15, 2017
1 parent b423fea commit 5bdbd24
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 413 deletions.
10 changes: 10 additions & 0 deletions core/src/main/kotlin/net/corda/core/node/ServiceHub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ interface StateLoader {
*/
@Throws(TransactionResolutionException::class)
fun loadState(stateRef: StateRef): TransactionState<*>

/**
* Given a [Set] of [StateRef]'s loads the referenced transaction and looks up the specified output [ContractState].
*
* @throws TransactionResolutionException if [stateRef] points to a non-existent transaction.
*/
// TODO: future implementation to use a Vault state ref -> contract state BLOB table and perform single query bulk load
// as the existing transaction store will become encrypted at some point
@Throws(TransactionResolutionException::class)
fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>>
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,19 @@ abstract class AbstractCashSelection {
stateAndRefs.clear()

var totalPennies = 0L
val stateRefs = mutableSetOf<StateRef>()
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val stateRef = StateRef(txHash, index)
val state = rs.getBlob(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
val pennies = rs.getLong(4)
totalPennies = rs.getLong(5)
val rowLockId = rs.getString(6)
stateAndRefs.add(StateAndRef(state, stateRef))
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
val pennies = rs.getLong(3)
totalPennies = rs.getLong(4)
val rowLockId = rs.getString(5)
stateRefs.add(StateRef(txHash, index))
log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" }
}
if (stateRefs.isNotEmpty())
// TODO: future implementation to retrieve contract states from a Vault BLOB store
stateAndRefs.addAll(services.loadStates(stateRefs) as Collection<StateAndRef<Cash.State>>)

if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
// we should have a minimum number of states to satisfy our selection `amount` criteria
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CashSelectionH2Impl : AbstractCashSelection() {
connection.createStatement().execute("CALL SET(@t, 0);")

val selectJoin = """
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
SELECT vs.transaction_id, vs.output_index, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
FROM vault_states AS vs, contract_cash_states AS ccs
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
AND vs.state_status = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
// 3) Currently (version 9.6), FOR UPDATE cannot be specified with window functions
override fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>) : ResultSet {
val selectJoin = """SELECT nested.transaction_id, nested.output_index, nested.contract_state, nested.pennies,
val selectJoin = """SELECT nested.transaction_id, nested.output_index, nested.pennies,
nested.total+nested.pennies as total_pennies, nested.lock_id
FROM
(SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies,
(SELECT vs.transaction_id, vs.output_index, ccs.pennies,
coalesce((SUM(ccs.pennies) OVER (PARTITION BY 1 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)), 0)
AS total, vs.lock_id
FROM vault_states AS vs, contract_cash_states AS ccs
Expand Down
10 changes: 7 additions & 3 deletions node/src/main/kotlin/net/corda/node/internal/StartedNode.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package net.corda.node.internal

import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionState
import net.corda.core.contracts.*
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
Expand Down Expand Up @@ -37,4 +35,10 @@ class StateLoaderImpl(private val validatedTransactions: TransactionStorage) : S
val stx = validatedTransactions.getTransaction(stateRef.txhash) ?: throw TransactionResolutionException(stateRef.txhash)
return stx.resolveBaseTransaction(this).outputs[stateRef.index]
}

@Throws(TransactionResolutionException::class)
// TODO: future implementation to retrieve contract states from a Vault BLOB store
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
return (stateRefs.map { StateAndRef(loadState(it), it) }).toSet()
}
}
184 changes: 77 additions & 107 deletions node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,10 @@ import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.node.StateLoader
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultQueryException
import net.corda.core.node.services.*
import net.corda.core.node.services.vault.*
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SerializationDefaults.STORAGE_CONTEXT
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.NotaryChangeWireTransaction
import net.corda.core.transactions.WireTransaction
Expand Down Expand Up @@ -89,7 +83,6 @@ class NodeVaultService(
val state = VaultSchemaV1.VaultStates(
notary = stateAndRef.value.state.notary,
contractStateClassName = stateAndRef.value.state.data.javaClass.name,
contractState = stateAndRef.value.state.serialize(context = STORAGE_CONTEXT).bytes,
stateStatus = Vault.StateStatus.UNCONSUMED,
recordedTime = clock.instant())
state.stateRef = PersistentStateRef(stateAndRef.key)
Expand Down Expand Up @@ -169,7 +162,7 @@ class NodeVaultService(
return Vault.NoUpdate
}

return Vault.Update(consumedStates, ourNewStates.toHashSet())
return Vault.Update(consumedStates.toSet(), ourNewStates.toSet())
}

val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn) }
Expand Down Expand Up @@ -207,28 +200,10 @@ class NodeVaultService(
processAndNotify(netDelta)
}

// TODO: replace this method in favour of a VaultQuery query
private fun loadStates(refs: Collection<StateRef>): HashSet<StateAndRef<ContractState>> {
val states = HashSet<StateAndRef<ContractState>>()
if (refs.isNotEmpty()) {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
val statusPredicate = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.UNCONSUMED)
val persistentStateRefs = refs.map(::PersistentStateRef)
val compositeKey = vaultStates.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)
val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs))
criteriaQuery.where(statusPredicate, stateRefsPredicate)
val results = session.createQuery(criteriaQuery).resultList
results.asSequence().forEach {
val txHash = SecureHash.parse(it.stateRef?.txId!!)
val index = it.stateRef?.index!!
val state = it.contractState.deserialize<TransactionState<ContractState>>(context = STORAGE_CONTEXT)
states.add(StateAndRef(state, StateRef(txHash, index)))
}
}
return states
private fun loadStates(refs: Collection<StateRef>): Collection<StateAndRef<ContractState>> {
return if (refs.isNotEmpty())
queryBy<ContractState>(QueryCriteria.VaultQueryCriteria(stateRefs = refs.toList())).states
else emptySet()
}

private fun processAndNotify(update: Vault.Update<ContractState>) {
Expand Down Expand Up @@ -428,70 +403,70 @@ class NodeVaultService(

val session = getSession()

session.use {
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)

// TODO: revisit (use single instance of parser for all queries)
val criteriaParser = HibernateQueryCriteriaParser(contractStateType, contractStateTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates)
// TODO: revisit (use single instance of parser for all queries)
val criteriaParser = HibernateQueryCriteriaParser(contractStateType, contractStateTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates)

try {
// parse criteria and build where predicates
criteriaParser.parse(criteria, sorting)
try {
// parse criteria and build where predicates
criteriaParser.parse(criteria, sorting)

// prepare query for execution
val query = session.createQuery(criteriaQuery)
// prepare query for execution
val query = session.createQuery(criteriaQuery)

// pagination checks
if (!paging.isDefault) {
// pagination
if (paging.pageNumber < DEFAULT_PAGE_NUM) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]")
if (paging.pageSize < 1) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [must be a value between 1 and $MAX_PAGE_SIZE]")
}
// pagination checks
if (!paging.isDefault) {
// pagination
if (paging.pageNumber < DEFAULT_PAGE_NUM) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]")
if (paging.pageSize < 1) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [must be a value between 1 and $MAX_PAGE_SIZE]")
}

query.firstResult = (paging.pageNumber - 1) * paging.pageSize
query.maxResults = paging.pageSize + 1 // detection too many results

query.firstResult = (paging.pageNumber - 1) * paging.pageSize
query.maxResults = paging.pageSize + 1 // detection too many results

// execution
val results = query.resultList

// final pagination check (fail-fast on too many results when no pagination specified)
if (paging.isDefault && results.size > DEFAULT_PAGE_SIZE)
throw VaultQueryException("Please specify a `PageSpecification` as there are more results [${results.size}] than the default page size [$DEFAULT_PAGE_SIZE]")

val statesAndRefs: MutableList<StateAndRef<T>> = mutableListOf()
val statesMeta: MutableList<Vault.StateMetadata> = mutableListOf()
val otherResults: MutableList<Any> = mutableListOf()

results.asSequence()
.forEachIndexed { index, result ->
if (result[0] is VaultSchemaV1.VaultStates) {
if (!paging.isDefault && index == paging.pageSize) // skip last result if paged
return@forEachIndexed
val vaultState = result[0] as VaultSchemaV1.VaultStates
val stateRef = StateRef(SecureHash.parse(vaultState.stateRef!!.txId!!), vaultState.stateRef!!.index!!)
val state = vaultState.contractState.deserialize<TransactionState<T>>(context = STORAGE_CONTEXT)
statesMeta.add(Vault.StateMetadata(stateRef,
vaultState.contractStateClassName,
vaultState.recordedTime,
vaultState.consumedTime,
vaultState.stateStatus,
vaultState.notary,
vaultState.lockId,
vaultState.lockUpdateTime))
statesAndRefs.add(StateAndRef(state, stateRef))
} else {
// TODO: improve typing of returned other results
log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" }
otherResults.addAll(result.toArray().asList())
}
// execution
val results = query.resultList

// final pagination check (fail-fast on too many results when no pagination specified)
if (paging.isDefault && results.size > DEFAULT_PAGE_SIZE)
throw VaultQueryException("Please specify a `PageSpecification` as there are more results [${results.size}] than the default page size [$DEFAULT_PAGE_SIZE]")

val statesAndRefs: MutableList<StateAndRef<T>> = mutableListOf()
val statesMeta: MutableList<Vault.StateMetadata> = mutableListOf()
val otherResults: MutableList<Any> = mutableListOf()
val stateRefs = mutableSetOf<StateRef>()

results.asSequence()
.forEachIndexed { index, result ->
if (result[0] is VaultSchemaV1.VaultStates) {
if (!paging.isDefault && index == paging.pageSize) // skip last result if paged
return@forEachIndexed
val vaultState = result[0] as VaultSchemaV1.VaultStates
val stateRef = StateRef(SecureHash.parse(vaultState.stateRef!!.txId!!), vaultState.stateRef!!.index!!)
stateRefs.add(stateRef)
statesMeta.add(Vault.StateMetadata(stateRef,
vaultState.contractStateClassName,
vaultState.recordedTime,
vaultState.consumedTime,
vaultState.stateStatus,
vaultState.notary,
vaultState.lockId,
vaultState.lockUpdateTime))
} else {
// TODO: improve typing of returned other results
log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" }
otherResults.addAll(result.toArray().asList())
}
}
if (stateRefs.isNotEmpty())
statesAndRefs.addAll(stateLoader.loadStates(stateRefs) as Collection<StateAndRef<T>>)

return Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
} catch (e: java.lang.Exception) {
log.error(e.message)
throw e.cause ?: e
}
return Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
} catch (e: java.lang.Exception) {
log.error(e.message)
throw e.cause ?: e
}
}

Expand All @@ -504,11 +479,7 @@ class NodeVaultService(
}
}

private fun getSession(): Session {
return sessionFactory.withOptions().
connection(DatabaseTransactionManager.current().connection).
openSession()
}
private fun getSession() = DatabaseTransactionManager.currentOrNew().session

/**
* Derive list from existing vault states and then incrementally update using vault observables
Expand All @@ -518,22 +489,21 @@ class NodeVaultService(
val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java)
criteria.select(vaultStates.get("contractStateClassName")).distinct(true)
val session = getSession()
session.use {
val query = session.createQuery(criteria)
val results = query.resultList
val distinctTypes = results.map { it }

val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableSet<String>>()
distinctTypes.forEach { type ->
val concreteType: Class<ContractState> = uncheckedCast(Class.forName(type))
val contractInterfaces = deriveContractInterfaces(concreteType)
contractInterfaces.map {
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name, { mutableSetOf() })
contractInterface.add(concreteType.name)
}

val query = session.createQuery(criteria)
val results = query.resultList
val distinctTypes = results.map { it }

val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableSet<String>>()
distinctTypes.forEach { type ->
val concreteType: Class<ContractState> = uncheckedCast(Class.forName(type))
val contractInterfaces = deriveContractInterfaces(concreteType)
contractInterfaces.map {
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name, { mutableSetOf() })
contractInterface.add(concreteType.name)
}
return contractInterfaceToConcreteTypes
}
return contractInterfaceToConcreteTypes
}

private fun <T : ContractState> deriveContractInterfaces(clazz: Class<T>): Set<Class<T>> {
Expand Down
Loading

0 comments on commit 5bdbd24

Please sign in to comment.