Commit

Updated incremental loader to use the nanny. Implemented a memory check limit for workers.

darcs-hash:20090113121902-f1522-4c6931cd840061655cf5918fab52fc1b9f012972.gz
scudette committed Jan 13, 2009
1 parent 3ac19f9 commit c754b36
Showing 2 changed files with 71 additions and 35 deletions.
22 changes: 17 additions & 5 deletions src/pyflag/Farm.py
@@ -214,6 +214,19 @@ def terminate_children():
## Stop our logging thread
pyflaglog.kill_logging_thread()

config.add_option("MAXIMUM_WORKER_MEMORY", default=0, type='int',
help='Maximum amount of memory (Mb) the worker is allowed to consume (0=unlimited,default)')

def check_mem(cb, *args, **kwargs):
""" Checks for our current memory usage - if it exceeds the limit
we exit and let the nanny restart us.
"""
if config.MAXIMUM_WORKER_MEMORY > 0:
mem = open("/proc/%s/statm" % os.getpid()).read().split()
if int(mem[1])*4096/1024/1024 > config.MAXIMUM_WORKER_MEMORY:
pyflaglog.log(pyflaglog.WARNING, "Process resident memory exceeds threshold. Exiting")
cb(*args, **kwargs)
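
The new check reads /proc/<pid>/statm, whose second field is the process's resident set size in pages, and converts it to megabytes assuming 4 KB pages. A minimal standalone sketch of the same measurement, assuming Linux's statm layout and using resource.getpagesize() in place of the hardcoded 4096 (resident_mb is our name, not the project's):

import os, resource

def resident_mb(pid=None):
    ## statm fields, all in pages: size resident shared text lib data dt
    fields = open("/proc/%s/statm" % (pid or os.getpid())).read().split()
    return int(fields[1]) * resource.getpagesize() / 1024 / 1024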

class Task:
""" All distributed tasks need to extend this subclass """

@@ -271,13 +284,11 @@ def nanny(cb, *args, **kwargs):

## This should never return but just in case we wait a bit
## so as not to spin if the child fails to start
time.sleep(60)
time.sleep(10)


def worker_run():
""" The main loop of the worker """
atexit.register(child_exist)

## It is an error to fork with db connections
## established... they can not be shared:
if DB.db_connections > 0:
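
For context, the nanny referenced in the commit message is PyFlag's supervisor: the parent forks the worker and restarts it whenever it exits, which is what makes exit-on-bloat a safe recovery strategy (the wait above was shortened from 60 to 10 seconds so a failed start retries sooner). Only fragments of nanny() appear in this hunk, so the following is an illustrative sketch of the pattern, not the project's exact code:

import os, time

def nanny(cb, *args, **kwargs):
    ## Fork a child to run cb; whenever it exits (clean exit, crash, or
    ## memory-check suicide) fork a replacement. The pause keeps the
    ## parent from spinning if the child keeps dying at startup.
    while 1:
        pid = os.fork()
        if pid == 0:
            cb(*args, **kwargs)
            os._exit(0)
        os.waitpid(pid, 0)
        time.sleep(10)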
@@ -306,6 +317,9 @@ def worker_run():
except: pass

while 1:
## Check for memory usage
check_mem(sys.exit,0)

## Check for new tasks:
if not jobs:
try:
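
Because the check sits at the top of the job loop, a worker only ever exits between tasks, never in the middle of one. A toy loop with the same shape (fetch_job and run_job are hypothetical stand-ins):

import sys
from pyflag.Farm import check_mem

def worker_loop(fetch_job, run_job):
    ## check_mem(sys.exit, 0) calls sys.exit(0) once resident memory
    ## exceeds MAXIMUM_WORKER_MEMORY; the nanny then forks a fresh worker.
    while 1:
        check_mem(sys.exit, 0)
        job = fetch_job()
        if job is None:
            break
        run_job(job)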
@@ -413,10 +427,8 @@ def start_workers():
def handler(sig, frame):
pyflaglog.kill_logging_thread()
if sig==signal.SIGABRT:
print "This is %s handling signal %s" % (os.getpid(), sig)
for child in children:
try:
print "Killing %s from %s using %s" % (pid, os.getpid(), sig)
os.kill(pid, sig)
except: pass

84 changes: 54 additions & 30 deletions utilities/incremental_load.py
@@ -43,6 +43,10 @@
NOTE: This loader does not start any workers, if you want to scan the
data as well you will need to start seperate workers.
if output_file is '-' we skip writing altogether. This is useful if
you never want to examine packets and only want to see reassembled
streams.
""", version = "Version: %%prog PyFlag %s" % config.VERSION)

config.add_option("case", default=None,
@@ -92,30 +96,33 @@

## Check if the file is already there:
filename = config.UPLOADDIR + '/' + output_file
try:
os.stat(filename)
## Yep its there:
output_fd = open(filename, 'a')
output_fd.seek(0,os.SEEK_END)
offset = output_fd.tell()

## There can be only one:
if output_file != '-':
try:
fcntl.flock(output_fd,fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError,e:
print "Highlander Error: %s" % e
sys.exit(1)

except OSError:
output_fd = open(filename, 'w')
os.stat(filename)
## Yep its there:
output_fd = open(filename, 'a')
output_fd.seek(0,os.SEEK_END)
offset = output_fd.tell()

## There can be only one:
try:
fcntl.flock(output_fd,fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError,e:
print "Highlander Error: %s" % e
sys.exit(1)

## This is a hardcoded header for the output file:
header = '\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00'
offset = len(header)
except OSError:
output_fd = open(filename, 'w')

## Write the file header on
output_fd.write(header)
output_fd.flush()
## This is a hardcoded header for the output file:
header = '\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00'
offset = len(header)

## Write the file header on
output_fd.write(header)
output_fd.flush()
else:
output_fd = None
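
The hardcoded 24-byte string is a standard pcap global header: the 0xa1b2c3d4 magic written little-endian, format version 2.4, zero timezone offset and timestamp accuracy, a 0xffff snaplen, and link type 1 (Ethernet). Building the same bytes with struct makes each field visible; the assert shows it matches the literal above:

import struct

## Fields: magic, version major/minor, thiszone, sigfigs, snaplen, linktype
header = struct.pack("<IHHiIII", 0xa1b2c3d4, 2, 4, 0, 0, 0xffff, 1)
assert header == b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00' \
                 b'\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00'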

## Make a new IO source for the output:
try:
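
The Highlander check above is the classic single-instance guard: a non-blocking exclusive flock on the output file fails immediately with EWOULDBLOCK if another loader already holds it, instead of queueing behind it. The same guard in isolation (Python 2 syntax, matching the codebase):

import fcntl, sys

def ensure_single_instance(fd):
    ## LOCK_NB makes flock fail fast rather than block; the kernel drops
    ## the lock automatically when the holding process exits.
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError, e:
        print "Highlander Error: %s" % e
        sys.exit(1)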
@@ -178,12 +185,15 @@ def load_file(filename):

processor.process(packet)

## Write the packet on the output file:
packet_data = packet.serialise("little")
offset += len(packet_data)
output_fd.write(packet_data)
if output_fd:
## Write the packet on the output file:
packet_data = packet.serialise("little")
offset += len(packet_data)
output_fd.write(packet_data)

output_fd.flush()
if output_fd:
output_fd.flush()

pcap_dbh.delete("connection_details",
where = "inode_id is null")
pcap_dbh.mass_insert_commit()
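
packet.serialise("little") is PyFlag's own serializer, but for the output to remain a valid pcap file each appended record must carry the standard per-packet header, and its length is what the offset bookkeeping above tracks. As an assumption about that on-disk layout, a record would look like:

import struct

def pcap_record(payload, sec, usec):
    ## Per-record pcap header, little-endian to match the global header:
    ## ts_sec, ts_usec, incl_len (bytes captured), orig_len (on the wire).
    return struct.pack("<IIII", sec, usec, len(payload), len(payload)) + payload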
Expand All @@ -206,7 +216,7 @@ def load_file(filename):
## Start up some workers if needed:
Farm.start_workers()

while 1:
def run():
t = os.stat(directory).st_mtime
if t>=last_mtime:
last_mtime = t
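
run() keeps the loader's existing change detection: stat the spool directory and rescan only when its mtime has advanced, which is far cheaper than listing it on every pass. An illustrative standalone version of that polling idea (all names here are ours):

import os, time

def poll_directory(directory, handle_file, pause=10):
    ## Rescan only when the directory mtime moves forward; remember which
    ## files were already loaded so each one is handled exactly once.
    last_mtime = 0
    seen = set()
    while 1:
        t = os.stat(directory).st_mtime
        if t >= last_mtime:
            last_mtime = t
            for name in sorted(os.listdir(directory)):
                if name not in seen:
                    seen.add(name)
                    handle_file(os.path.join(directory, name))
        time.sleep(pause)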
@@ -242,7 +252,7 @@ def load_file(filename):
continue
else:
break

sys.exit(0)

## We need to flush the decoder:
@@ -251,5 +261,19 @@
processor.flush()
last_time = time.time()

print "%s: Sleeping for %s seconds" % (time.ctime(), config.sleep)
time.sleep(config.sleep)
def finish():
print "Loader Process size increased above threashold. Exiting and restarting."
processor.flush()
sys.exit(0)

def main():
while 1:
run()
if config.MAXIMUM_WORKER_MEMORY > 0:
Farm.check_mem(finish)

print "%s: Sleeping for %s seconds" % (time.ctime(), config.sleep)
time.sleep(config.sleep)

## Run the main loader under the nanny
Farm.nanny(main)
