Skip to content

Commit

Permalink
Pass backfill_id in EmbeddedBackfila (#297)
Browse files Browse the repository at this point in the history
More realistic
  • Loading branch information
shellderp authored Mar 15, 2023
1 parent d4da5c5 commit cb86ff2
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ interface BackfillRun<B : Backfill> {
val parameters: Map<String, ByteString>
val rangeStart: String?
val rangeEnd: String?
val backfillId: String

var batchSize: Long
var scanSize: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ internal class EmbeddedBackfila @Inject internal constructor(
parameters = createRequest.parameter_map.toMutableMap(),
rangeStart = createRequest.pkey_range_start?.utf8(),
rangeEnd = createRequest.pkey_range_end?.utf8(),
backfillId = backfillId.toString(),
)

run.execute()
Expand Down Expand Up @@ -131,6 +132,7 @@ internal class EmbeddedBackfila @Inject internal constructor(
parameters = parameters.toMutableMap(),
rangeStart = rangeStart,
rangeEnd = rangeEnd,
backfillId = backfillId,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal class EmbeddedBackfillRun<B : Backfill>(
override val parameters: MutableMap<String, ByteString>,
override val rangeStart: String?,
override val rangeEnd: String?,
override val backfillId: String,

override var batchSize: Long = 100L,
override var scanSize: Long = 10_000L,
Expand Down Expand Up @@ -75,6 +76,7 @@ internal class EmbeddedBackfillRun<B : Backfill>(
val response =
operator.getNextBatchRange(
GetNextBatchRangeRequest.Builder()
.backfill_id(backfillId)
.partition_name(cursor.partitionName)
.backfill_range(cursor.keyRange)
.previous_end_key(cursor.previousEndKey)
Expand Down Expand Up @@ -113,6 +115,7 @@ internal class EmbeddedBackfillRun<B : Backfill>(
val response =
operator.getNextBatchRange(
GetNextBatchRangeRequest.Builder()
.backfill_id(backfillId)
.partition_name(cursor.partitionName)
.backfill_range(cursor.keyRange)
.previous_end_key(cursor.previousEndKey)
Expand Down Expand Up @@ -164,6 +167,7 @@ internal class EmbeddedBackfillRun<B : Backfill>(
val remainingBatch = batch.copy(batchRange = remainingRange)
response = operator.runBatch(
RunBatchRequest.Builder()
.backfill_id(backfillId)
.partition_name(remainingBatch.partitionName)
.batch_range(remainingBatch.batchRange)
.parameters(parameters)
Expand Down

0 comments on commit cb86ff2

Please sign in to comment.