Skip to content

Commit

Permalink
new threadpool implementation (non-adapting). Fixes irmen#19. Some jy…
Browse files Browse the repository at this point in the history
…thon unittest fixes.
  • Loading branch information
irmen committed Feb 2, 2014
1 parent 810dbb4 commit 612d106
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 359 deletions.
2 changes: 2 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Change Log
**Pyro 4.24**

- daemon no longer sends an exception response when a communication error occurred (such as a timeout). This fixes the MSG_PING/disconnect example on linux
- threadpool is now again a fixed size determined by the new THREADPOOL_SIZE config item (defaults to 16)
- config items removed: THREADPOOL_MINTHREADS, THREADPOOL_MAXTHREADS, THREADPOOL_IDLETIMEOUT


**Pyro 4.23**
Expand Down
4 changes: 1 addition & 3 deletions docs/source/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ SERVERTYPE str thread Select the Pyro server type. thre
SOCK_REUSE bool False Should SO_REUSEADDR be used on sockets that Pyro creates.
PREFER_IP_VERSION int 4 The IP address type that is preferred (4=ipv4, 6=ipv6, 0=let OS decide).
THREADING2 bool False Use the threading2 module if available instead of Python's standard threading module
THREADPOOL_MINTHREADS int 4 For the thread pool server: minimum amount of worker threads to be spawned
THREADPOOL_MAXTHREADS int 50 For the thread pool server: maximum amount of worker threads to be spawned
THREADPOOL_IDLETIMEOUT float 2.0 For the thread pool server: number of seconds to pass for an idle worker thread to be terminated
THREADPOOL_SIZE int 16 For the thread pool server: amount of worker threads to be spawned.
FLAME_ENABLED bool False Should Pyro Flame be enabled on the server
SERIALIZER str serpent The wire protocol serializer to use for clients/proxies (one of: serpent, json, marshal, pickle)
SERIALIZERS_ACCEPTED set json,marshal, The wire protocol serializers accepted in the server/daemon.
Expand Down
8 changes: 2 additions & 6 deletions src/Pyro4/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
# PYRO_LOGFILE (the name of the logfile if you don't like the default)

import os
import sys
import platform


Expand All @@ -18,8 +17,7 @@ class Configuration(object):
"COMPRESSION", "SERVERTYPE", "DOTTEDNAMES", "COMMTIMEOUT",
"POLLTIMEOUT", "THREADING2", "ONEWAY_THREADED",
"DETAILED_TRACEBACK", "SOCK_REUSE", "PREFER_IP_VERSION",
"THREADPOOL_MINTHREADS", "THREADPOOL_MAXTHREADS",
"THREADPOOL_IDLETIMEOUT", "HMAC_KEY", "AUTOPROXY",
"THREADPOOL_SIZE", "HMAC_KEY", "AUTOPROXY",
"BROADCAST_ADDRS", "NATHOST", "NATPORT", "MAX_MESSAGE_SIZE",
"FLAME_ENABLED", "SERIALIZER", "SERIALIZERS_ACCEPTED", "LOGWIRE" )

Expand Down Expand Up @@ -47,9 +45,7 @@ def reset(self, useenvironment=True):
self.THREADING2 = False # use threading2 if available?
self.ONEWAY_THREADED = True # oneway calls run in their own thread
self.DETAILED_TRACEBACK = False
self.THREADPOOL_MINTHREADS = 4
self.THREADPOOL_MAXTHREADS = 50
self.THREADPOOL_IDLETIMEOUT = 2.0
self.THREADPOOL_SIZE = 16
self.HMAC_KEY = None # must be bytes type
self.AUTOPROXY = True
self.MAX_MESSAGE_SIZE = 0 # 0 = unlimited
Expand Down
4 changes: 0 additions & 4 deletions src/Pyro4/naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,6 @@ def locateNS(host=None, port=None):
if err not in Pyro4.socketutil.ERRNO_EADDRINUSE: # and jython likes to throw thses...
raise
data, _=sock.recvfrom(100)
try:
sock.shutdown(socket.SHUT_RDWR)
except (OSError, socket.error):
pass
sock.close()
if sys.version_info>=(3, 0):
data=data.decode("iso-8859-1")
Expand Down
99 changes: 99 additions & 0 deletions src/Pyro4/socketserver/threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
Thread pooled job queue with a fixed number of worker threads.
Pyro - Python Remote Objects. Copyright by Irmen de Jong ([email protected]).
"""

from __future__ import with_statement
import logging
import Pyro4.threadutil
import Pyro4.util
try:
import queue
except ImportError:
import Queue as queue


__all__ = ["PoolError", "Pool"]

log = logging.getLogger("Pyro4.threadpool")


class PoolError(Exception):
pass


class Worker(Pyro4.threadutil.Thread):
"""
Worker thread that picks jobs from the job queue and executes them.
If it encounters the sentinel None, it will stop running.
"""
def __init__(self, jobs):
super(Worker, self).__init__()
self.daemon = True
self.jobs = jobs
self.name = "Pyro-Worker-%d " % id(self)

def run(self):
for job in self.jobs:
if job is None:
break
try:
job()
except Exception:
tb = "".join(Pyro4.util.getPyroTraceback())
log.error("unhandled exception from job in worker thread %s: %s", self.name, tb)
# we continue running, just pick another job from the queue


class Pool(object):
"""
A job queue that is serviced by a pool of worker threads.
The size of the pool is configurable but stays fixed.
"""
def __init__(self):
self.pool = []
self.jobs = queue.Queue()
self.closed = False
for _ in range(Pyro4.config.THREADPOOL_SIZE):
worker = Worker(self.jobs_generator())
self.pool.append(worker)
worker.start()
log.debug("worker pool of size %d created", self.num_workers())

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
"""Close down the thread pool, signaling to all remaining worker threads to shut down."""
for _ in range(self.num_workers()):
self.jobs.put(None) # None as a job means: terminate the worker
log.debug("closing down, %d halt-jobs issued", self.num_workers())
self.closed = True
self.pool = []

def __repr__(self):
return "<%s.%s at 0x%x, %d workers, %d jobs>" % \
(self.__class__.__module__, self.__class__.__name__, id(self), self.num_workers(), self.num_jobs())

def num_jobs(self):
return self.jobs.qsize()

def num_workers(self):
return len(self.pool)

def process(self, job):
"""
Add the job to the general job queue. Job is any callable object.
"""
if self.closed:
raise PoolError("job queue is closed")
self.jobs.put(job)

def jobs_generator(self):
"""generator that yields jobs from the queue"""
while not self.closed:
yield self.jobs.get() # this is a thread-safe operation (on queue) so we don't need our own locking
28 changes: 11 additions & 17 deletions src/Pyro4/socketserver/threadpoolserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from __future__ import with_statement
import socket, logging, sys, os
import struct
import Pyro4.util
from Pyro4 import socketutil, errors
import Pyro4.tpjobqueue
from .threadpool import Pool

log=logging.getLogger("Pyro4.socketserver.threadpool")
log=logging.getLogger("Pyro4.threadpoolserver")


class ClientConnectionJob(object):
Expand All @@ -26,7 +27,6 @@ def __init__(self, clientSocket, clientAddr, daemon):
self.daemon = daemon

def __call__(self):
log.debug("job call() %s", self.caddr) # XXX remove this after issue #19 is fixed
if self.handleConnection():
try:
while True:
Expand All @@ -39,7 +39,7 @@ def __call__(self):
except errors.SecurityError:
log.debug("security error on client %s", self.caddr)
break
# other errors simply crash the client worker thread (and close its connection)
# other errors simply crash this loop and abort the job (and close the client connection)
finally:
self.csock.close()

Expand Down Expand Up @@ -93,18 +93,17 @@ def init(self, daemon, host, port, unixsocket=None):
self.locationStr="[%s]:%d" % (host, port)
else:
self.locationStr="%s:%d" % (host, port)
self.jobqueue = Pyro4.tpjobqueue.ThreadPooledJobQueue()
log.info("%d workers started", self.jobqueue.workercount)
self.pool = Pool()

def __del__(self):
if self.sock is not None:
self.sock.close()
if self.jobqueue is not None:
self.jobqueue.close()
if self.pool is not None:
self.pool.close()

def __repr__(self):
return "<%s on %s, %d workers, %d jobs>" % (self.__class__.__name__, self.locationStr,
self.jobqueue.workercount, self.jobqueue.jobcount)
self.pool.num_workers(), self.pool.num_jobs())

def loop(self, loopCondition=lambda: True):
log.debug("threadpool server requestloop")
Expand Down Expand Up @@ -137,11 +136,11 @@ def events(self, eventsockets):
log.debug("connected %s", caddr)
if Pyro4.config.COMMTIMEOUT:
csock.settimeout(Pyro4.config.COMMTIMEOUT)
self.jobqueue.process(ClientConnectionJob(csock, caddr, self.daemon))
self.pool.process(ClientConnectionJob(csock, caddr, self.daemon))
except socket.timeout:
pass # just continue the loop on a timeout on accept

def close(self, joinWorkers=True):
def close(self):
log.debug("closing threadpool server")
if self.sock:
sockname=None
Expand All @@ -158,12 +157,7 @@ def close(self, joinWorkers=True):
except Exception:
pass
self.sock=None
self.jobqueue.close()
for worker in self.jobqueue.busy.copy():
if worker.job is not None:
worker.job.interrupt() # try to break all busy workers
if joinWorkers:
self.jobqueue.drain()
self.pool.close()

@property
def sockets(self):
Expand Down
Loading

0 comments on commit 612d106

Please sign in to comment.