Skip to content

Commit

Permalink
ENT-9608: Update following review comments:
Browse files Browse the repository at this point in the history
  Now do a binary search for the highest state ref less than one we have seen.
  Made sync points a single value now.
  • Loading branch information
adelel1 committed Mar 27, 2023
1 parent 3a12582 commit 545df35
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import net.corda.core.node.services.vault.builder
// TODO clean up the module structure of token-sdk, because these function and types (eg PartyAndAmount) should be separate from workflows
// Sorts a query by state ref ascending.
internal fun sortByStateRefAscending(): Sort {
val sortAttribute = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF)
return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC)))
val sortAttributeTxnId = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID)
val sortAttributeIndex = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_INDEX)
return Sort(listOf(Sort.SortColumn(sortAttributeTxnId, Sort.Direction.ASC), Sort.SortColumn(sortAttributeIndex, Sort.Direction.ASC)))
}

// Returns all held token amounts of a specified token with given issuer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHexString
import rx.Observable
import java.time.Duration
import java.util.ArrayDeque
Expand Down Expand Up @@ -124,7 +125,7 @@ class VaultWatcherService(private val tokenObserver: TokenObserver,
UPDATER.submit {
try {
var shouldLoop = true
var startResultSetIndex = 0
var startResultSetIndex = 0L
while (shouldLoop) {
val queryResult = queryVaultForStates(appServiceHub, startResultSetIndex)
val newlyLoadedStates = queryResult.first.toSet()
Expand All @@ -142,11 +143,11 @@ class VaultWatcherService(private val tokenObserver: TokenObserver,
}

private val pageSpecifications = ArrayDeque<PageSpecification>()
private val syncPoints = ArrayDeque<SyncPoint>()
private var syncPoint: SyncPoint? = null
val initialSpecification = PageSpecification(DEFAULT_PAGE_NUM, 1999)
val primes = intArrayOf(1009, 1013, 1019, 1021, 1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093, 1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259, 1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433, 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493, 1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741, 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831, 1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913, 1931, 1933, 1949, 1951, 1973, 1979, 1987, 1993, 1997, 1999)

private fun queryVaultForStates(appServiceHub: AppServiceHub, startResultSetIndex: Int): Triple<List<StateAndRef<FungibleToken>>, Int, Boolean> {
private fun queryVaultForStates(appServiceHub: AppServiceHub, startResultSetIndex: Long): Triple<List<StateAndRef<FungibleToken>>, Long, Boolean> {
var newlyLoadedStatesList: List<StateAndRef<FungibleToken>> = emptyList()
var specification = getOverlappingPageSpecification(startResultSetIndex)
var newStartPos = -1
Expand All @@ -159,15 +160,15 @@ class VaultWatcherService(private val tokenObserver: TokenObserver,
).states
newStartPos = isListInSync(newlyLoadedStatesList, specification)
if (newStartPos == -1) {
specification = getNewPageSpecification()
specification = popPriorPageSpecification()
}
}
val lastResultSetIndex = toResultSetIndex(newlyLoadedStatesList.lastIndex, specification)
val shouldLoop = newlyLoadedStatesList.size == specification.pageSize
return Triple(newlyLoadedStatesList.subList(newStartPos, newlyLoadedStatesList.size), lastResultSetIndex, shouldLoop)
}
private fun getOverlappingPageSpecification(startResultSetIndex: Int): PageSpecification {
val specification = if (startResultSetIndex == 0) {
private fun getOverlappingPageSpecification(startResultSetIndex: Long): PageSpecification {
val specification = if (startResultSetIndex == 0L) {
initialSpecification
}
else {
Expand Down Expand Up @@ -195,53 +196,87 @@ class VaultWatcherService(private val tokenObserver: TokenObserver,
if (states.isEmpty()) {
return 0
}
if (syncPoints.isEmpty()) {
syncPoints.addFirst(SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification)))
if (syncPoint == null) {
syncPoint = SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification))
return 0
}
val highResultSetIndex = toResultSetIndex(states.lastIndex, specification)
val lowResultSetIndex = toResultSetIndex(0, specification)

syncPoints.forEach {
val listIndex = toListEntryIndex(it.lastKnownResultSetIndex, specification)
if (listIndex in lowResultSetIndex..highResultSetIndex && states[listIndex] == it.stateAndRef) {
// No change in the list
syncPoints.addFirst(SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification)))
return listIndex+1 // Return first unread entry index, which is sync pos + 1
}
if (states.contains(it.stateAndRef)) {
// List has changed but we still see the sync point in current list
syncPoints.addFirst(SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification)))
val pos = states.indexOf(it.stateAndRef)
it.lastKnownResultSetIndex = toResultSetIndex(pos, specification)
return pos + 1 // Return first unread entry index
val localSyncPoint: SyncPoint = syncPoint!! // We're single threaded here

val listIndex = toIndexWithinPage(localSyncPoint.lastKnownResultSetIndexOfStateAndRef, specification)
if (listIndex in 0..states.lastIndex && states[listIndex] == localSyncPoint.stateAndRef) {
// No change in the list
syncPoint = SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification))
return listIndex+1 // Return first unread entry index, which is sync pos + 1
}
val largestSeenStateRefIndex = seachForLargestSeenStateRef(states, localSyncPoint.stateAndRef)
if (largestSeenStateRefIndex != -1) {
syncPoint = SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification))
if (largestSeenStateRefIndex == states.lastIndex) {
// We have seen the whole list already, so just return last element which we have already seen,
// to simplify processing. Could happen if large number of tokens gets added while we are reading in.
return largestSeenStateRefIndex
}
return largestSeenStateRefIndex + 1 // first unread element
}
LOG.info("Token loading has become out of sync, will re-sync")
return -1
}
private fun getNewPageSpecification(): PageSpecification {
private fun popPriorPageSpecification(): PageSpecification {
pageSpecifications.pollLast()
return if (pageSpecifications.isEmpty()) {
syncPoints.clear()
syncPoint = null
pageSpecifications.add(initialSpecification)
initialSpecification
} else {
pageSpecifications.last
}
}

private fun toResultSetIndex(listEntryIndex: Int, specification: PageSpecification): Int {
return specification.pageSize * (specification.pageNumber-1) + listEntryIndex
private fun toResultSetIndex(listEntryIndex: Int, specification: PageSpecification): Long {
return specification.pageSize * (specification.pageNumber-1) + listEntryIndex.toLong()
}

private fun toListEntryIndex(resultSetIndex: Int, specification: PageSpecification): Int {
return resultSetIndex - ((specification.pageNumber-1) * specification.pageSize)
private fun toIndexWithinPage(resultSetIndex: Long, specification: PageSpecification): Int {
return (resultSetIndex - ((specification.pageNumber-1) * specification.pageSize)).toInt()
}

fun seachForLargestSeenStateRef(values: List<StateAndRef<FungibleToken>>, element: StateAndRef<FungibleToken>): Int {
var left = 0
var right = values.size - 1
var result = -1
while (left <= right) {
val midPoint: Int = left + (right - left) / 2
val value = values[midPoint]
when (compare(value, element)) {
0 -> return midPoint
-1 -> {
result = midPoint
left = midPoint +1
}
else -> right = midPoint -1
}
}
return result
}
private fun compare(first: StateAndRef<FungibleToken>, second: StateAndRef<FungibleToken>): Int {
if (first.ref.txhash.bytes.toHexString() < second.ref.txhash.bytes.toHexString()) {
return -1
}
if (first.ref.txhash.bytes.toHexString() > second.ref.txhash.bytes.toHexString()) {
return 1
}
if (first.ref.index < second.ref.index) {
return -1
}
if (first.ref.index > second.ref.index) {
return 1
}
return 0
}
}
return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader)
}
data class SyncPoint(val stateAndRef: StateAndRef<FungibleToken>, val specification: PageSpecification, var lastKnownResultSetIndex: Int)
data class SyncPoint(val stateAndRef: StateAndRef<FungibleToken>, val specification: PageSpecification, var lastKnownResultSetIndexOfStateAndRef: Long)
}

init {
Expand Down Expand Up @@ -294,7 +329,7 @@ class VaultWatcherService(private val tokenObserver: TokenObserver,
for (stateAndRef in stateAndRefs) {
val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER)
existingMark?.let {
LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests a result set re-sync occurred")
LOG.debug("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests a result set re-sync occurred")
}
for (key in __indexed.keys) {
val index = processToken(stateAndRef, IndexingType.fromHolder(key))
Expand Down

0 comments on commit 545df35

Please sign in to comment.