Skip to content

Commit 7369fa9

Browse files
authored
feat(scheduler): add back pressure for table dispatcher (moiot#188)
* feat(scheduler): add back pressure for table dispatcher
1 parent aa2dcbe commit 7369fa9

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,18 @@ func (scheduler *batchScheduler) startTableDispatcher(tableKey string) {
512512
metrics.QueueLength.WithLabelValues(env.PipelineName, "table-latch", key).Set(float64(len(latches)))
513513
metrics.QueueLength.WithLabelValues(env.PipelineName, "table-latch-ack", key).Set(float64(latchQLen))
514514

515-
if len(batch) >= scheduler.cfg.MaxBatchPerWorker || ((queueLen+latchQLen) == 0 && len(batch) > 0) {
516-
flushFunc()
515+
if len(batch) > 0 {
516+
if len(batch) >= scheduler.cfg.MaxBatchPerWorker {
517+
flushFunc()
518+
} else if (queueLen + latchQLen) == 0 {
519+
queueIdx := round % uint(scheduler.cfg.NrWorker)
520+
round++
521+
if len(scheduler.workerQueues[queueIdx]) < scheduler.cfg.QueueSize/2 {
522+
flushFunc()
523+
} else {
524+
// worker queue has many items pending, try to accumulate message in the batch.
525+
}
526+
}
517527
}
518528
}
519529

0 commit comments

Comments
 (0)