Skip to content

Commit

Permalink
Move from threadpool to multiprocessing.pool (errbotio#968)
Browse files Browse the repository at this point in the history
Threadpool is obsolete and recommends to switch to a modern alternative.
  • Loading branch information
murinicanor authored and gbin committed Apr 8, 2017
1 parent 409e704 commit 30aa0b4
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 14 deletions.
4 changes: 2 additions & 2 deletions errbot/backends/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sys
import pprint
from functools import lru_cache
from threadpool import WorkRequest
from multiprocessing.pool import ThreadPool

from markdown import Markdown
from markdown.extensions.extra import ExtraExtension
Expand Down Expand Up @@ -679,7 +679,7 @@ def send_stream_request(self, identifier, fsource, name='file', size=None, strea
stream = Stream(identifier, fsource, name, size, stream_type)
log.debug("Requesting upload of {0} to {1} (size hint: {2}, stream type: {3})".format(name,
identifier.channelname, size, stream_type))
self.thread_pool.putRequest(WorkRequest(self._slack_upload, args=(stream,)))
self.thread_pool.apply_async(self._slack_upload, (stream,))
return stream

def send_card(self, card: Card):
Expand Down
4 changes: 2 additions & 2 deletions errbot/backends/telegram_messenger.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import sys
from threadpool import WorkRequest
from multiprocessing.pool import ThreadPool

from errbot.backends.base import RoomError, Identifier, Person, RoomOccupant, Stream, ONLINE, Room
from errbot.core import ErrBot
Expand Down Expand Up @@ -454,7 +454,7 @@ def _is_valid_url(url):
stream = Stream(identifier, content, name, size, stream_type)
log.debug("Requesting upload of {0} to {1} (size hint: {2}, stream type: {3})".format(name,
identifier, size, stream_type))
self.thread_pool.putRequest(WorkRequest(self._telegram_upload_stream, args=(stream,)))
self.thread_pool.apply_async(self._telegram_upload_stream, (stream,))

return stream

Expand Down
14 changes: 7 additions & 7 deletions errbot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from threading import RLock

import collections
from threadpool import ThreadPool, WorkRequest
from multiprocessing.pool import ThreadPool

from errbot import CommandError
from errbot.flow import FlowExecutor, FlowRoot
Expand Down Expand Up @@ -369,7 +369,9 @@ def _process_command(self, mess, cmd, args, match):
if f._err_command_admin_only and self.bot_config.BOT_ASYNC:
# If it is an admin command, wait until the queue is completely depleted so
# we don't have strange concurrency issues on load/unload/updates etc...
self.thread_pool.wait()
self.thread_pool.close()
self.thread_pool.join()
self.thread_pool = ThreadPool(self.bot_config.BOT_ASYNC_POOLSIZE)

if f._err_command_historize:
user_cmd_history.append((cmd, args)) # add it to the history only if it is authorized to be so
Expand All @@ -392,17 +394,15 @@ def _process_command(self, mess, cmd, args, match):
return

if self.bot_config.BOT_ASYNC:
wr = WorkRequest(
result = self.thread_pool.apply_async(
self._execute_and_send,
[],
{'cmd': cmd, 'args': args, 'match': match, 'mess': mess,
'template_name': f._err_command_template}
{'cmd': cmd, 'args': args, 'match': match, 'mess': mess, 'template_name': f._err_command_template}
)
self.thread_pool.putRequest(wr)
if f._err_command_admin_only:
# Again, if it is an admin command, wait until the queue is completely
# depleted so we don't have strange concurrency issues.
self.thread_pool.wait()
result.wait()
else:
self._execute_and_send(cmd=cmd, args=args, match=match, mess=mess,
template_name=f._err_command_template)
Expand Down
4 changes: 2 additions & 2 deletions errbot/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from threading import RLock
from typing import Mapping, List, Tuple, Union, Callable, Any

from threadpool import ThreadPool, WorkRequest
from multiprocessing.pool import ThreadPool
from yapsy.IPlugin import IPlugin

from errbot import Message
Expand Down Expand Up @@ -363,7 +363,7 @@ def _enqueue_flow(self, flow):
with self._lock:
if flow not in self.in_flight:
self.in_flight.append(flow)
self._pool.putRequest(WorkRequest(self.execute, args=(flow, )))
self._pool.apply_async(self.execute, (flow, ))

def execute(self, flow: Flow):
"""
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
deps = ['webtest',
'setuptools',
'bottle',
'threadpool',
'rocket-errbot',
'requests',
'jinja2',
Expand Down

0 comments on commit 30aa0b4

Please sign in to comment.