diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index cf7f16aa8..bfdf64187 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -347,7 +347,9 @@ def parallel_bulk( class BlockingPool(ThreadPool): def _setup_queues(self): super(BlockingPool, self)._setup_queues() - self._inqueue = Queue(queue_size) + # The queue must be at least the size of the number of threads to + # prevent hanging when inserting sentinel values during teardown. + self._inqueue = Queue(max(queue_size, thread_count)) self._quick_put = self._inqueue.put pool = BlockingPool(thread_count)