Fixed a bug in job queue processing which seems to have been introduced when priorities were implemented for jobs.

Ignore-this: 8214705f7e3bd478779ba5939fc9906

darcs-hash:20090523144352-20ca2-b68f9a977c83f1e667938613cc16f89f54cdb19f.gz
scudette committed May 23, 2009
1 parent 63d7141 commit f28259d
Showing 4 changed files with 57 additions and 27 deletions.
17 changes: 14 additions & 3 deletions src/plugins/Core.py
@@ -268,10 +268,10 @@ def schedule(self):
present.
"""
dbh=DB.DBO()
dbh.execute("lock table jobs write")
dbh.execute("lock table high_priority_jobs write")
try:
dbh.execute("delete from jobs where command='Periodic' and state='pending'")
dbh.insert("jobs", _fast=True,
dbh.execute("delete from high_priority_jobs where command='Periodic' and state='pending'")
dbh.insert("high_priority_jobs", _fast=True,
command="Periodic", priority=20,
_when_valid="from_unixtime(%r)" % (int(time.time()) + config.PERIOD),
state = 'pending', cookie=0)
@@ -475,9 +475,20 @@ def startup(self):
except: continue

## Check the schema:
dbh.check_index("jobs", "state")
DB.check_column_in_table(None, 'jobs', 'priority', 'int default 10')
DB.check_column_in_table(None, 'jobs', 'pid', 'int default 0')
DB.check_column_in_table(None, 'jobs', 'when_valid',
'TIMESTAMP ON UPDATE CURRENT_TIMESTAMP NOT NULL')

## Check for the high_priority_jobs table (it's basically
## another jobs table for high priority jobs - so workers
## first check this table before the main jobs table).
try:
dbh.execute("select * from high_priority_jobs limit 1")
except:
dbh.execute("create table if not exists high_priority_jobs like jobs")

## Schedule the first periodic task:
task = Periodic()
task.schedule()
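The comment in the second hunk explains the design: high_priority_jobs is just a clone of the jobs table that workers consult before the main queue. Below is a minimal sketch of that startup-time check, with sqlite3 standing in for PyFlag's MySQL-backed DB.DBO handle and a guessed column set (id, command, state, priority, pid, when_valid, cookie); the real code simply issues MySQL's "create table if not exists high_priority_jobs like jobs".

import sqlite3

JOBS_SCHEMA = ("(id integer primary key, command text, state text default 'pending', "
               "priority integer default 10, pid integer default 0, "
               "when_valid timestamp, cookie integer default 0)")

def ensure_high_priority_table(con):
    ## Probe the table the same way startup() does; create it only on failure.
    try:
        con.execute("select * from high_priority_jobs limit 1")
    except sqlite3.OperationalError:
        ## sqlite has no "create table ... like", so the jobs schema is repeated here.
        con.execute("create table if not exists high_priority_jobs " + JOBS_SCHEMA)

con = sqlite3.connect(":memory:")
con.execute("create table jobs " + JOBS_SCHEMA)
ensure_high_priority_table(con)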
61 changes: 40 additions & 21 deletions src/pyflag/Farm.py
@@ -329,6 +329,8 @@ def worker_run(keepalive=None):
## These are all the methods we support
jobs = []

my_pid = os.getpid()

## This is the last broadcast message we handled. We will
## only handle broadcasts newer than this.
broadcast_id = 0
@@ -366,31 +368,48 @@
try:
dbh = DB.DBO()
try:
dbh.execute("lock tables jobs write")
## Higher priority jobs take precedence over lower
## priority jobs. We only want jobs which are valid
## now (jobs can be set in the future). We assume
## that we actually can process all jobs (all workers
## must have all the same plugins).
dbh.execute("select * from jobs where when_valid <= now() "
" and ((state='pending') or (state='broadcast' and "
" id>%r)) order by id asc, priority desc limit %s",
(broadcast_id, config.JOB_QUEUE))
jobs = [ row for row in dbh ]

if not jobs:
continue

## See if there are any high priority jobs to do:
dbh.execute("lock tables high_priority_jobs write")
dbh.execute("select * from high_priority_jobs where "
"when_valid <= now() and state='pending' limit %s", config.JOB_QUEUE)
jobs = [ row for row in dbh ]

## Ensure the jobs are marked as processing so other jobs don't touch them:
for row in jobs:
if row['state'] == 'pending':
dbh.execute("update jobs set state='processing' where id=%r", row['id'])
elif row['state'] == 'broadcast':
broadcast_id = row['id']
if jobs:
for row in jobs:
if row['state'] == 'pending':
dbh.execute("update high_priority_jobs set state='processing', pid=%r where id=%r",
my_pid,
row['id'])
else:
dbh.execute("unlock tables")
dbh.execute("lock tables jobs write")
## Higher priority jobs take precedence over lower
## priority jobs. We only want jobs which are valid
## now (jobs can be set in the future). We assume
## that we actually can process all jobs (all workers
## must have all the same plugins).
dbh.execute("select * from jobs where ((state='pending') "
"or (state='broadcast' and id>%r)) limit %s",
(broadcast_id, config.JOB_QUEUE))
jobs = [ row for row in dbh ]

if not jobs:
continue

## Ensure the jobs are marked as processing so other jobs don't touch them:
for row in jobs:
if row['state'] == 'pending':
dbh.execute("update jobs set state='processing', pid=%r where id=%r",
my_pid,
row['id'])
elif row['state'] == 'broadcast':
broadcast_id = row['id']
finally:
if dbh:
dbh.execute("unlock tables")
except:
except Exception,e:
print e
continue

## Now do the jobs
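Read together, the worker_run() changes make each worker drain pending rows from high_priority_jobs before it falls back to the ordinary jobs table, and stamp every claimed row with its pid. (The query removed above ordered by id before priority, so job priorities likely never took effect; giving high-priority work its own table sidesteps that ordering problem.) A rough sketch of the new polling order, again using sqlite3 in place of DB.DBO and leaving out the MySQL table locks, broadcast handling and when_valid filtering that the real loop performs:

import os
import sqlite3

JOB_QUEUE = 10   ## batch size, standing in for config.JOB_QUEUE

def claim_jobs(con):
    ## Return (table, rows) for the next batch of work, preferring high priority jobs.
    my_pid = os.getpid()
    for table in ("high_priority_jobs", "jobs"):
        rows = con.execute("select id, command from %s where state='pending' limit ?" % table,
                           (JOB_QUEUE,)).fetchall()
        if not rows:
            continue
        ## Mark the batch as processing so other workers leave it alone.
        for job_id, command in rows:
            con.execute("update %s set state='processing', pid=? where id=?" % table,
                        (my_pid, job_id))
        return table, rows
    return None, []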
2 changes: 1 addition & 1 deletion src/pyflag/Scanner.py
@@ -649,7 +649,7 @@ def get_factories(case,scanners):
try:
cls=Registry.SCANNERS.dispatch(scanner)
except:
pyflaglog.log(pyflaglog.WARNING, "Unable to find scanner for %s", scanner)
#pyflaglog.log(pyflaglog.WARNING, "Unable to find scanner for %s", scanner)
continue

#Instantiate it:
4 changes: 2 additions & 2 deletions tests/pyflash
@@ -7,8 +7,8 @@ INSTALL_DIR=/var/tmp/build/pyflag/
######### END CONFIGURATION ####################

export PYFLAG_PLUGINS=$SRC_DIR/src/plugins
export PYTHONPATH=$SRC_DIR/src/pyflag:$SRC_DIR/src/:/usr/loca/lib/python2.5/site-packages/:$INSTALL_DIR/lib/python2.5/site-packages/pyflag
export PYTHONPATH=$SRC_DIR/src/pyflag:$SRC_DIR/src/:/usr/loca/lib/python2.5/site-packages/:$INSTALL_DIR/lib/python2.5/site-packages/pyflag:/usr/loca/lib/python2.6/site-packages/:$INSTALL_DIR/lib/python2.6/site-packages/pyflag

echo $PYTHONPATH

exec /usr/bin/python2.5 $SRC_DIR/src/pyflag/pyflagsh.py --plugins=$SRC_DIR/src/plugins $@
exec /usr/bin/python $SRC_DIR/src/pyflag/pyflagsh.py --plugins=$SRC_DIR/src/plugins $@
