Replace concurrent request model with a scheduler and optimizer.
rustyrazorblade committed Oct 24, 2023
1 parent a290b68 commit 48a3f7e
Showing 15 changed files with 458 additions and 143 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -22,3 +22,7 @@ tlp-stress.iws
/*.rpm
buildSrc/.gradle/
buildSrc/build/
.jira-url
*.ipr
*.iws
.sdkmanrc
4 changes: 2 additions & 2 deletions build.gradle
@@ -22,7 +22,7 @@ apply plugin: 'application'
apply plugin: 'com.github.johnrengelman.shadow'

group 'com.thelastpickle'
version '5.0.0'
version '6.0.0'

sourceCompatibility = 11

@@ -67,7 +67,7 @@ dependencies {
implementation group: 'io.dropwizard.metrics', name: 'metrics-core', version: '3.2.2'

// https://mvnrepository.com/artifact/com.google.guava/guava
implementation group: 'com.google.guava', name: 'guava', version: '31.1-jre'
implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'

// https://mvnrepository.com/artifact/com.github.ajalt/mordant
implementation group: 'com.github.ajalt', name: 'mordant', version: '1.1.0'
14 changes: 8 additions & 6 deletions src/main/kotlin/com/thelastpickle/tlpstress/Metrics.kt
@@ -11,7 +11,7 @@ import io.prometheus.client.exporter.HTTPServer

import org.HdrHistogram.SynchronizedHistogram

class Metrics(metricRegistry: MetricRegistry, val reporters: List<ScheduledReporter>, val httpPort : Int) {
class Metrics(val metricRegistry: MetricRegistry, val reporters: List<ScheduledReporter>, val httpPort : Int) {

val server: HTTPServer

@@ -29,14 +29,19 @@ class Metrics(metricRegistry: MetricRegistry, val reporters: List<ScheduledRepor
}
}

fun resetErrors() {
metricRegistry.remove("errors")
errors = metricRegistry.meter("errors")
}

init {
CollectorRegistry.defaultRegistry.register(DropwizardExports(metricRegistry))
server = HTTPServer(httpPort)

}


val errors = metricRegistry.meter("errors")
var errors = metricRegistry.meter("errors")
val mutations = metricRegistry.timer("mutations")
val selects = metricRegistry.timer("selects")
val deletions = metricRegistry.timer("deletions")
@@ -47,7 +52,4 @@ class Metrics(metricRegistry: MetricRegistry, val reporters: List<ScheduledRepor
val mutationHistogram = SynchronizedHistogram(2)
val selectHistogram = SynchronizedHistogram(2)
val deleteHistogram = SynchronizedHistogram(2)



}
}
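
The errors meter becomes a var so that resetErrors() can drop the registered meter and create a fresh one between phases. A minimal, self-contained sketch of what that reset amounts to at the registry level, using only the Dropwizard MetricRegistry API; the driver code here is illustrative, not part of this commit:

import com.codahale.metrics.MetricRegistry

// Illustrative only: what resetErrors() does at the registry level, so that
// error counts recorded during populate do not leak into the stress phase.
fun main() {
    val registry = MetricRegistry()
    var errors = registry.meter("errors")
    errors.mark()                      // an error observed during populate
    registry.remove("errors")          // the same call resetErrors() makes
    errors = registry.meter("errors")  // re-register a fresh meter
    println(errors.count)              // 0 -- the stress phase starts clean
}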
13 changes: 4 additions & 9 deletions src/main/kotlin/com/thelastpickle/tlpstress/OperationCallback.kt
@@ -2,21 +2,16 @@ package com.thelastpickle.tlpstress

import com.datastax.driver.core.ResultSet
import com.google.common.util.concurrent.FutureCallback
import java.util.*
import com.codahale.metrics.Timer
import com.thelastpickle.tlpstress.profiles.IStressRunner
import com.thelastpickle.tlpstress.profiles.Operation
import org.apache.logging.log4j.kotlin.logger
import java.util.concurrent.Semaphore

/**
* Callback after a mutation or select
* This was moved out of the inline ProfileRunner to make populate mode easier
* as well as reduce clutter
*/
class OperationCallback(val context: StressContext,
val semaphore: Semaphore,
val startTime: Timer.Context,
val runner: IStressRunner,
val op: Operation,
val paginate: Boolean = false) : FutureCallback<ResultSet> {
@@ -26,9 +21,7 @@ class OperationCallback(val context: StressContext,
}

override fun onFailure(t: Throwable) {
semaphore.release()
context.metrics.errors.mark()

log.error { t }

}
@@ -42,8 +35,7 @@ class OperationCallback(val context: StressContext,
}
}

semaphore.release()
val time = startTime.stop()
val time = op.startTime.stop()

// we log to the HDR histogram and do the callback for mutations
// might extend this to select, but I can't see a reason for it now
@@ -60,6 +52,9 @@ class OperationCallback(val context: StressContext,
is Operation.SelectStatement -> {
context.metrics.selectHistogram.recordValue(time)
}
is Operation.Stop -> {
throw OperationStopException()
}
}

}
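
The timer context now travels with the operation itself (op.startTime) rather than being passed into the callback, and a new Operation.Stop case signals shutdown by throwing OperationStopException. The Operation definitions are not part of this excerpt; the sketch below only illustrates the assumed shape the callback relies on, and the Sketch-suffixed names are invented:

import com.codahale.metrics.Timer

// Assumed shape, not the commit's actual Operation class: each scheduled
// operation carries the Timer.Context started when it was enqueued, and a
// Stop marker tells the callback to raise OperationStopException.
sealed class OperationSketch {
    class Mutation(val startTime: Timer.Context) : OperationSketch()
    class SelectStatement(val startTime: Timer.Context) : OperationSketch()
    class Deletion(val startTime: Timer.Context) : OperationSketch()
    object Stop : OperationSketch()
}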
5 changes: 5 additions & 0 deletions src/main/kotlin/com/thelastpickle/tlpstress/OperationStopException.kt
@@ -0,0 +1,5 @@
package com.thelastpickle.tlpstress

class OperationStopException : Throwable() {

}
2 changes: 1 addition & 1 deletion src/main/kotlin/com/thelastpickle/tlpstress/PopulateOption.kt
@@ -2,5 +2,5 @@ package com.thelastpickle.tlpstress

sealed class PopulateOption {
class Standard : PopulateOption()
class Custom(val rows: Long) : PopulateOption()
class Custom(val rows: Long, val deletes: Boolean = true) : PopulateOption()
}
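
The new deletes flag lets a profile's custom populate phase opt out of writing tombstones. A small illustration of the two configurations; the row counts are illustrative only:

// Illustrative only: pre-populate one million rows with and without tombstones.
val withoutTombstones = PopulateOption.Custom(rows = 1_000_000, deletes = false)
val withTombstones    = PopulateOption.Custom(rows = 1_000_000)  // deletes defaults to true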
111 changes: 29 additions & 82 deletions src/main/kotlin/com/thelastpickle/tlpstress/ProfileRunner.kt
@@ -95,113 +95,60 @@ class ProfileRunner(val context: StressContext,
* Used both for pre-populating data and for running the actual workload
*/
private fun executeOperations(iterations: Long, duration: Long) {

val desiredEndTime = LocalDateTime.now().plusMinutes(duration)
var operations = 0
// create a semaphore local to the thread to limit the query concurrency
val sem = Semaphore(context.permits)

val runner = profile.getRunner(context)

// we use MAX_VALUE since it's essentially infinite if we give a duration
val totalValues = if (duration > 0) Long.MAX_VALUE else iterations

for (key in partitionKeyGenerator.generateKey(totalValues, context.mainArguments.partitionValues)) {
if (duration > 0 && desiredEndTime.isBefore(LocalDateTime.now())) {
break
}
// get next thing from the profile
// thing could be a statement, or it could be a failure command
// certain profiles will want to deterministically inject failures
// others can be randomly injected by the runner
// I should be able to just tell the runner to inject gossip failures in any test
// without having to write that code in the profile
val nextOp = ThreadLocalRandom.current().nextInt(0, 100)
val op : Operation = getNextOperation(nextOp, runner, key)

// if we're using the rate limiter (unlikely) grab a permit
context.rateLimiter?.run {
acquire(1)
}

sem.acquire()
// if we have a custom generator for the populate phase we'll use that

val startTime = when(op) {
is Operation.Mutation -> context.metrics.mutations.time()
is Operation.SelectStatement -> context.metrics.selects.time()
is Operation.Deletion -> context.metrics.deletions.time()
}
val queue = RequestQueue(partitionKeyGenerator, context, totalValues, duration, runner, readRate, deleteRate)
queue.start()

// pull requests off the queue instead of calling generateKey directly;
// the getNextOperation logic now lives inside the RequestQueue
for (op in queue.getNextOperation()) {
val future = context.session.executeAsync(op.bound)
Futures.addCallback(future, OperationCallback(context, sem, startTime, runner, op, paginate = context.mainArguments.paginate), MoreExecutors.directExecutor())
operations++
Futures.addCallback(future, OperationCallback(context, runner, op, paginate = context.mainArguments.paginate), MoreExecutors.directExecutor())
}

// block until all the queries are finished
sem.acquireUninterruptibly(context.permits)
}

private fun getNextOperation(nextOp: Int, runner: IStressRunner, key: PartitionKey): Operation {
return if (readRate * 100 > nextOp) {
runner.getNextSelect(key)
} else if ((readRate * 100) + (deleteRate * 100) > nextOp) {
runner.getNextDelete(key)
} else {
runner.getNextMutation(key)
}
}

/**
* Prepopulates the database with numRows
* Mutations only, does not count towards the normal metrics
* Records all timers in the populateMutations metrics
* Can (and should) be graphed separately
*/
fun populate(numRows: Long) {
fun populate(numRows: Long, deletes:Boolean = true) {

val runner = profile.getRunner(context)
val sem = Semaphore(context.permits)

fun executePopulate(op: Operation) {
context.rateLimiter?.run {
acquire(1)
}
sem.acquire()

val startTime = context.metrics.populate.time()
val future = context.session.executeAsync(op.bound)
Futures.addCallback(future, OperationCallback(context, sem, startTime, runner, op), MoreExecutors.directExecutor())
}

when(profile.getPopulateOption(context.mainArguments)) {
is PopulateOption.Custom -> {
log.info { "Starting a custom population" }

for (op in runner.customPopulateIter()) {
executePopulate(op)
}
}
is PopulateOption.Standard -> {

log.info("Populating Cassandra with $numRows rows")

// we follow the same access pattern as normal writes when pre-populating
for (key in partitionKeyGenerator.generateKey(numRows, context.mainArguments.partitionValues)) {
// we should be inserting tombstones at the --deletes rate
val nextOp = ThreadLocalRandom.current().nextInt(0, 100)

// populate should populate with tombstones in the case of --deletes being set
val op = if (deleteRate * 100 > nextOp) {
runner.getNextDelete(key)
} else {
runner.getNextMutation(key)
}
executePopulate(op)
}

val populatePartitionKeyGenerator = profile.getPopulatePartitionKeyGenerator().orElse(partitionKeyGenerator)

val queue = RequestQueue(populatePartitionKeyGenerator, context, numRows, 0, runner, 0.0,
if (deletes) deleteRate else 0.0,
populatePhase = true)
queue.start()

try {
for (op in queue.getNextOperation()) {
val future = context.session.executeAsync(op.bound)
Futures.addCallback(
future,
OperationCallback(context, runner, op, false),
MoreExecutors.directExecutor()
)
}
} catch (_: OperationStopException) {
log.info("Received Stop signal")
Thread.sleep(3000)
} catch (e: Exception) {
log.warn("Received unknown exception ${e.message}")
throw e
}
sem.acquireUninterruptibly(context.permits)
}


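The RequestQueue that both loops above consume from is defined in one of the files not shown in this excerpt; the code here only depends on start() and on getNextOperation() behaving as a stream of scheduled operations that ends when a stop marker arrives. A self-contained sketch of that contract, with invented names and no claim to match the actual implementation:

import java.util.concurrent.ArrayBlockingQueue

// Invented sketch of the producer/consumer contract ProfileRunner relies on,
// not the commit's RequestQueue: a producer fills a bounded queue, and
// getNextOperation() yields items until a poison-pill stop value appears.
class RequestQueueSketch<T : Any>(capacity: Int, private val stop: T) {
    private val queue = ArrayBlockingQueue<T>(capacity)

    fun put(item: T) = queue.put(item)   // producer side; blocks when the queue is full
    fun finish() = queue.put(stop)       // producer signals that no more work is coming

    fun getNextOperation(): Sequence<T> = sequence {
        while (true) {
            val next = queue.take()      // consumer side; blocks while the queue is empty
            if (next == stop) break      // stop marker ends the iteration
            yield(next)
        }
    }
}
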
122 changes: 122 additions & 0 deletions src/main/kotlin/com/thelastpickle/tlpstress/RateLimiterOptimizer.kt
@@ -0,0 +1,122 @@
package com.thelastpickle.tlpstress

import com.google.common.util.concurrent.RateLimiter
import org.apache.logging.log4j.kotlin.logger
import java.lang.Math.cbrt
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.math.sqrt

/**
* Adjusts the rate limiter based on observed p99 latency versus the configured read and write latency targets.
*/
class RateLimiterOptimizer(val rateLimiter: RateLimiter,
val metrics: Metrics,
val maxReadLatency: Long?,
val maxWriteLatency: Long?) {
companion object {
val log = logger()
}
val durationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1)

/**
* Updates the rate limiter to use the new value and returns the new rate limit
*/
fun execute(): Double {
// determine the latency number that's closest to its limit
getCurrentAndMaxLatency().map {
val newLimit = getNextValue(rateLimiter.rate, it.first, it.second)
return@map if (newLimit == rateLimiter.rate) {
log.info("Optimizer has nothing to do")
newLimit
} else {
log.info("Updating rate limiter from ${rateLimiter.rate} to ${newLimit}")
rateLimiter.rate = newLimit
rateLimiter.rate
}
}.orElse(
return rateLimiter.rate
)
}

/**
* Added to prevent the rate limiter from acting when queries aren't running, typically during the populate phase
*/
fun getTotalOperations() : Long {
return metrics.mutations.count + metrics.selects.count
}

/**
* Returns a Pair of (current p99 latency, max allowed latency)
*/
fun getCurrentAndMaxLatency() : Optional<Pair<Double, Long>> {
if (maxWriteLatency == null && maxReadLatency == null) {
return Optional.empty()
}
// if we're in the populate phase
if (metrics.mutations.count == 0L &&
metrics.selects.count == 0L &&
metrics.deletions.count == 0L &&
metrics.populate.count > 0L )
if (maxWriteLatency != null) {
log.info("In populate phase, using populate latency")
return Optional.of(Pair(getPopulateLatency(), maxWriteLatency))
} else {
return Optional.empty<Pair<Double, Long>>()
}
if (maxReadLatency == null) {
return Optional.of(Pair(getWriteLatency(), maxWriteLatency!!))
}
if (maxWriteLatency == null) {
return Optional.of(Pair(getReadLatency(), maxReadLatency))
}

val rLatP = getReadLatency() / maxReadLatency
val wLatP = getWriteLatency() / maxWriteLatency

// if either is over, return the one that's the most % over
return if (rLatP > wLatP) {
Optional.of(Pair(getReadLatency(), maxReadLatency))
} else Optional.of(Pair(getWriteLatency(), maxWriteLatency))
}

/**
* Provide the new value for the rate limiter.
* If we're over the target, we reduce traffic by 10%.
* If we're within 90% of our target we leave the rate unchanged.
* If we're under, we increase by up to 5%.
*/
fun getNextValue(currentRateLimiterValue: Double, currentLatency: Double, maxLatency: Long): Double {
// we set our max increase relative to the total latency we can tolerate, at most increasing by 5%
// small latency requirements (< 10ms) should barely adjust the throughput b/c it's so sensitive
var maxIncrease = (1.0 + sqrt(maxLatency.toDouble()) / 100.0).coerceAtMost(1.05)

if (currentLatency > maxLatency) {
log.info("Current Latency ($currentLatency) over Max Latency ($maxLatency) reducing throughput by 10%")
return currentRateLimiterValue * .90
}
else if (currentLatency / maxLatency.toDouble() > .90) {
log.info("Current latency ($currentLatency) within 95% of max ($maxLatency), not adjusting")
return currentRateLimiterValue
}
else {
// increase a reasonable amount
// should provide a very gentle increase when we get close to the right number
val adjustmentFactor = (1 + cbrt(maxLatency.toDouble() - currentLatency) / maxLatency.toDouble()).coerceAtMost(maxIncrease)
val newLimit = currentRateLimiterValue * adjustmentFactor
log.info("Current limiter: $currentRateLimiterValue latency $currentLatency, max: $maxLatency adjustment factor: $adjustmentFactor, new limit: $newLimit")
return newLimit
}
}

fun getReadLatency() =
metrics.selects.snapshot.get99thPercentile() * durationFactor


fun getWriteLatency() =
metrics.mutations.snapshot.get99thPercentile() * durationFactor

fun getPopulateLatency() =
metrics.populate.snapshot.get99thPercentile() * durationFactor

}
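
A worked example of the getNextValue() adjustment, using illustrative numbers only (100 ms latency target, 40 ms observed p99, 10,000 ops/s current limit):

import java.lang.Math.cbrt
import kotlin.math.sqrt

// Illustrative numbers only: reproduces the arithmetic in getNextValue().
fun main() {
    val maxLatency = 100.0      // configured target in ms
    val currentLatency = 40.0   // observed p99 in ms
    val currentRate = 10_000.0  // current rate limiter value in ops/s

    val maxIncrease = (1.0 + sqrt(maxLatency) / 100.0).coerceAtMost(1.05)        // 1.05
    val adjustmentFactor = (1 + cbrt(maxLatency - currentLatency) / maxLatency)
        .coerceAtMost(maxIncrease)                                               // ~1.039
    println(currentRate * adjustmentFactor)  // roughly 10,391 ops/s after one pass
}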
