Skip to content

Commit

Permalink
adding context parameter for pool with a warning for not being suppor…
Browse files Browse the repository at this point in the history
  • Loading branch information
AmeerHajAli authored and edoakes committed Jan 17, 2020
1 parent a3a2684 commit 9f9c3f5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
3 changes: 3 additions & 0 deletions doc/source/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ instructions to run on a multi-node Ray cluster instead.
The full ``multiprocessing.Pool`` API is currently supported. Please see the
`multiprocessing documentation`_ for details.

.. warning::
The ``context`` argument in the ``Pool`` constructor is ignored when using Ray.

.. _`multiprocessing documentation`: https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

Run on a Cluster
Expand Down
13 changes: 11 additions & 2 deletions python/ray/experimental/multiprocessing/pool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from multiprocessing import TimeoutError
import os
import time
Expand All @@ -9,6 +10,8 @@

import ray

logger = logging.getLogger(__name__)

RAY_ADDRESS_ENV = "RAY_ADDRESS"


Expand Down Expand Up @@ -319,13 +322,19 @@ def __init__(self,
initializer=None,
initargs=None,
maxtasksperchild=None,
context=None,
ray_address=None):
self._closed = False
self._initializer = initializer
self._initargs = initargs
self._maxtasksperchild = maxtasksperchild or -1
self._actor_deletion_ids = []

if context:
logger.warning("The 'context' argument is not supported using "
"ray. Please refer to the documentation for how "
"to control ray initialization.")

processes = self._init_ray(processes, ray_address)
self._start_actor_pool(processes)

Expand All @@ -339,12 +348,12 @@ def _init_ray(self, processes=None, ray_address=None):

# Cluster mode.
if ray_address is not None:
print("Connecting to ray cluster at address='{}'".format(
logger.info("Connecting to ray cluster at address='{}'".format(
ray_address))
ray.init(address=ray_address)
# Local mode.
else:
print("Starting local ray cluster")
logger.info("Starting local ray cluster")
ray.init(num_cpus=processes)

ray_cpus = int(ray.state.cluster_resources()["CPU"])
Expand Down

0 comments on commit 9f9c3f5

Please sign in to comment.