From 545df3535ff47bc0e71c52c8f05caea05855e5c1 Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Mon, 27 Mar 2023 18:50:24 +0100 Subject: [PATCH] ENT-9608: Update following review comments: Now do a binary search for the highest state ref less than one we have seen. Made sync points a single value now. --- .../SelectionUtilities.kt | 5 +- .../memory/services/VaultWatcherService.kt | 99 +++++++++++++------ 2 files changed, 70 insertions(+), 34 deletions(-) diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt index 4cbea217..b074f3a2 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt @@ -15,8 +15,9 @@ import net.corda.core.node.services.vault.builder // TODO clean up the module structure of token-sdk, because these function and types (eg PartyAndAmount) should be separate from workflows // Sorts a query by state ref ascending. internal fun sortByStateRefAscending(): Sort { - val sortAttribute = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF) - return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC))) + val sortAttributeTxnId = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID) + val sortAttributeIndex = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_INDEX) + return Sort(listOf(Sort.SortColumn(sortAttributeTxnId, Sort.Direction.ASC), Sort.SortColumn(sortAttributeIndex, Sort.Direction.ASC))) } // Returns all held token amounts of a specified token with given issuer. diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt index 9845da2c..41d9e536 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt @@ -22,6 +22,7 @@ import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.toHexString import rx.Observable import java.time.Duration import java.util.ArrayDeque @@ -124,7 +125,7 @@ class VaultWatcherService(private val tokenObserver: TokenObserver, UPDATER.submit { try { var shouldLoop = true - var startResultSetIndex = 0 + var startResultSetIndex = 0L while (shouldLoop) { val queryResult = queryVaultForStates(appServiceHub, startResultSetIndex) val newlyLoadedStates = queryResult.first.toSet() @@ -142,11 +143,11 @@ class VaultWatcherService(private val tokenObserver: TokenObserver, } private val pageSpecifications = ArrayDeque() - private val syncPoints = ArrayDeque() + private var syncPoint: SyncPoint? = null val initialSpecification = PageSpecification(DEFAULT_PAGE_NUM, 1999) val primes = intArrayOf(1009, 1013, 1019, 1021, 1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093, 1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259, 1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433, 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493, 1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741, 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831, 1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913, 1931, 1933, 1949, 1951, 1973, 1979, 1987, 1993, 1997, 1999) - private fun queryVaultForStates(appServiceHub: AppServiceHub, startResultSetIndex: Int): Triple>, Int, Boolean> { + private fun queryVaultForStates(appServiceHub: AppServiceHub, startResultSetIndex: Long): Triple>, Long, Boolean> { var newlyLoadedStatesList: List> = emptyList() var specification = getOverlappingPageSpecification(startResultSetIndex) var newStartPos = -1 @@ -159,15 +160,15 @@ class VaultWatcherService(private val tokenObserver: TokenObserver, ).states newStartPos = isListInSync(newlyLoadedStatesList, specification) if (newStartPos == -1) { - specification = getNewPageSpecification() + specification = popPriorPageSpecification() } } val lastResultSetIndex = toResultSetIndex(newlyLoadedStatesList.lastIndex, specification) val shouldLoop = newlyLoadedStatesList.size == specification.pageSize return Triple(newlyLoadedStatesList.subList(newStartPos, newlyLoadedStatesList.size), lastResultSetIndex, shouldLoop) } - private fun getOverlappingPageSpecification(startResultSetIndex: Int): PageSpecification { - val specification = if (startResultSetIndex == 0) { + private fun getOverlappingPageSpecification(startResultSetIndex: Long): PageSpecification { + val specification = if (startResultSetIndex == 0L) { initialSpecification } else { @@ -195,35 +196,35 @@ class VaultWatcherService(private val tokenObserver: TokenObserver, if (states.isEmpty()) { return 0 } - if (syncPoints.isEmpty()) { - syncPoints.addFirst(SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification))) + if (syncPoint == null) { + syncPoint = SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification)) return 0 } - val highResultSetIndex = toResultSetIndex(states.lastIndex, specification) - val lowResultSetIndex = toResultSetIndex(0, specification) - - syncPoints.forEach { - val listIndex = toListEntryIndex(it.lastKnownResultSetIndex, specification) - if (listIndex in lowResultSetIndex..highResultSetIndex && states[listIndex] == it.stateAndRef) { - // No change in the list - syncPoints.addFirst(SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification))) - return listIndex+1 // Return first unread entry index, which is sync pos + 1 - } - if (states.contains(it.stateAndRef)) { - // List has changed but we still see the sync point in current list - syncPoints.addFirst(SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification))) - val pos = states.indexOf(it.stateAndRef) - it.lastKnownResultSetIndex = toResultSetIndex(pos, specification) - return pos + 1 // Return first unread entry index + val localSyncPoint: SyncPoint = syncPoint!! // We're single threaded here + + val listIndex = toIndexWithinPage(localSyncPoint.lastKnownResultSetIndexOfStateAndRef, specification) + if (listIndex in 0..states.lastIndex && states[listIndex] == localSyncPoint.stateAndRef) { + // No change in the list + syncPoint = SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification)) + return listIndex+1 // Return first unread entry index, which is sync pos + 1 + } + val largestSeenStateRefIndex = seachForLargestSeenStateRef(states, localSyncPoint.stateAndRef) + if (largestSeenStateRefIndex != -1) { + syncPoint = SyncPoint(states.last(), specification, toResultSetIndex(states.lastIndex, specification)) + if (largestSeenStateRefIndex == states.lastIndex) { + // We have seen the whole list already, so just return last element which we have already seen, + // to simplify processing. Could happen if large number of tokens gets added while we are reading in. + return largestSeenStateRefIndex } + return largestSeenStateRefIndex + 1 // first unread element } LOG.info("Token loading has become out of sync, will re-sync") return -1 } - private fun getNewPageSpecification(): PageSpecification { + private fun popPriorPageSpecification(): PageSpecification { pageSpecifications.pollLast() return if (pageSpecifications.isEmpty()) { - syncPoints.clear() + syncPoint = null pageSpecifications.add(initialSpecification) initialSpecification } else { @@ -231,17 +232,51 @@ class VaultWatcherService(private val tokenObserver: TokenObserver, } } - private fun toResultSetIndex(listEntryIndex: Int, specification: PageSpecification): Int { - return specification.pageSize * (specification.pageNumber-1) + listEntryIndex + private fun toResultSetIndex(listEntryIndex: Int, specification: PageSpecification): Long { + return specification.pageSize * (specification.pageNumber-1) + listEntryIndex.toLong() } - private fun toListEntryIndex(resultSetIndex: Int, specification: PageSpecification): Int { - return resultSetIndex - ((specification.pageNumber-1) * specification.pageSize) + private fun toIndexWithinPage(resultSetIndex: Long, specification: PageSpecification): Int { + return (resultSetIndex - ((specification.pageNumber-1) * specification.pageSize)).toInt() + } + + fun seachForLargestSeenStateRef(values: List>, element: StateAndRef): Int { + var left = 0 + var right = values.size - 1 + var result = -1 + while (left <= right) { + val midPoint: Int = left + (right - left) / 2 + val value = values[midPoint] + when (compare(value, element)) { + 0 -> return midPoint + -1 -> { + result = midPoint + left = midPoint +1 + } + else -> right = midPoint -1 + } + } + return result + } + private fun compare(first: StateAndRef, second: StateAndRef): Int { + if (first.ref.txhash.bytes.toHexString() < second.ref.txhash.bytes.toHexString()) { + return -1 + } + if (first.ref.txhash.bytes.toHexString() > second.ref.txhash.bytes.toHexString()) { + return 1 + } + if (first.ref.index < second.ref.index) { + return -1 + } + if (first.ref.index > second.ref.index) { + return 1 + } + return 0 } } return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader) } - data class SyncPoint(val stateAndRef: StateAndRef, val specification: PageSpecification, var lastKnownResultSetIndex: Int) + data class SyncPoint(val stateAndRef: StateAndRef, val specification: PageSpecification, var lastKnownResultSetIndexOfStateAndRef: Long) } init { @@ -294,7 +329,7 @@ class VaultWatcherService(private val tokenObserver: TokenObserver, for (stateAndRef in stateAndRefs) { val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER) existingMark?.let { - LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests a result set re-sync occurred") + LOG.debug("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests a result set re-sync occurred") } for (key in __indexed.keys) { val index = processToken(stateAndRef, IndexingType.fromHolder(key))