diff --git a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py index 9999e9d766b7..0bea07693050 100644 --- a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py +++ b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py @@ -469,10 +469,17 @@ async def _probe_queue_lens( self.queue_len_response_deadline_s, self.max_queue_len_response_deadline_s, ) - queue_len_response_deadline_s = min( - self.queue_len_response_deadline_s * (2**backoff_index), - max_queue_len_response_deadline_s, - ) + + try: + queue_len_response_deadline_s = min( + self.queue_len_response_deadline_s * (2**backoff_index), + max_queue_len_response_deadline_s, + ) + except OverflowError: + # self.queue_len_response_deadline_s * (2**backoff_index) + # can overflow if backoff_index gets sufficiently large (e.g. + # 1024 when queue_len_response_deadline_s is 0.1). + queue_len_response_deadline_s = max_queue_len_response_deadline_s get_queue_len_tasks = [] for r in replicas: @@ -649,6 +656,7 @@ async def fulfill_pending_requests(self): """ try: while len(self._scheduling_tasks) <= self.target_num_scheduling_tasks: + start_time = time.time() backoff_index = 0 request_metadata = self._get_next_pending_request_metadata_to_schedule() async for candidates in self.choose_two_replicas_with_backoff( @@ -662,6 +670,23 @@ async def fulfill_pending_requests(self): break backoff_index += 1 + if backoff_index >= 50 and backoff_index % 50 == 0: + scheduling_time_elapsed = time.time() - start_time + warning_log = ( + "Failed to schedule request after " + f"{backoff_index} attempts over " + f"{scheduling_time_elapsed:.2f}s. Retrying." + ) + if request_metadata is not None: + warning_log += ( + f" Request ID: {request_metadata.request_id}." + ) + if request_metadata.multiplexed_model_id: + warning_log += ( + " Multiplexed model ID: " + f"{request_metadata.multiplexed_model_id}." 
+ ) + logger.warning(warning_log) except Exception: logger.exception("Unexpected error in fulfill_pending_requests.") diff --git a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py index b4521e0d47ec..2603dcad6b35 100644 --- a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py +++ b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py @@ -1598,5 +1598,34 @@ async def test_queue_len_cache_entries_added_correctly(pow_2_scheduler): TIMER.advance(staleness_timeout_s + 1) +@pytest.mark.asyncio +@pytest.mark.parametrize( + "pow_2_scheduler", + [ + {"prefer_local_node": True, "prefer_local_az": True}, + ], + indirect=True, +) +@pytest.mark.parametrize("backoff_index", [0, 10, 2048]) +async def test_backoff_index_handling(pow_2_scheduler, backoff_index: int): + """Ensure that different ranges of backoff_index are valid. + + In the past, high backoff_index values (1024 and above) have caused + OverflowErrors. See https://github.com/ray-project/ray/issues/43964. + """ + s = pow_2_scheduler + + r1 = FakeReplicaWrapper("r1") + r1.set_queue_len_response(0) + + r2 = FakeReplicaWrapper("r2") + r2.set_queue_len_response(0) + + s.update_replicas([r1, r2]) + + r = await s.select_from_candidate_replicas([r1, r2], backoff_index) + assert r in [r1, r2] + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__]))