Skip to content

Commit

Permalink
Merge pull request fireeye#71 from fireeye/elazar-changes
Browse files Browse the repository at this point in the history
Elazar changes
  • Loading branch information
B0fH authored Mar 18, 2021
2 parents 13061ee + 4214061 commit 4db2f44
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 29 deletions.
18 changes: 10 additions & 8 deletions hxtool_scheduler_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ def set_schedule(self, seconds = 0, minutes = 0, hours = 0, days = 0, weeks = 0)
}

def should_run(self):
return (self.next_run is not None and
self.enabled and
self.state == task_states.TASK_STATE_SCHEDULED and
(self.parent_complete if (self.parent_id and self.wait_for_parent) else True) and
datetime.datetime.utcnow() >= self.next_run)

return (
self.enabled and
self.state is task_states.TASK_STATE_SCHEDULED and
self.next_run is not None and
datetime.datetime.utcnow() >= self.next_run and
(self.parent_complete if self.parent_id is not None and self.wait_for_parent else True)
)

def add_step(self, module, func = "run", args = (), kwargs = {}):
# This is an HXTool task module, we need to init it.
if hasattr(module, 'hxtool_task_module'):
Expand Down Expand Up @@ -201,7 +203,7 @@ def run(self, scheduler):
self.state = task_states.TASK_STATE_COMPLETE

if not self.parent_id:
hxtool_global.hxtool_scheduler.signal_child_tasks(self.task_id, self.state, self.stored_result)
scheduler.signal_child_tasks(self.task_id, self.state, self.stored_result)

self._calculate_next_run()

Expand All @@ -220,7 +222,7 @@ def run(self, scheduler):
if self.state != task_states.TASK_STATE_SCHEDULED and self._stored:
self.unstore()
if self.state != task_states.TASK_STATE_PENDING_DELETION:
hxtool_global.hxtool_scheduler.move_to_history(self.task_id)
scheduler.move_to_history(self.task_id)
else:
self.store()

Expand Down
10 changes: 8 additions & 2 deletions hxtool_task_modules/bulk_acquisition_task_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,14 @@ def run(self, script = None, hostset_id = None, comment = None, skip_base64 = Fa
if download and bulk_download_eid:
hxtool_global.hxtool_db.bulkDownloadUpdate(bulk_download_eid, bulk_acquisition_id = response_data['data']['_id'], hosts = {})
result['bulk_download_eid'] = bulk_download_eid
else:
self.logger.error("Bulk acquisition submission failed. Response code: {}, response data: {}".format(response_code, response_data))
elif not ret:
if self.can_retry(response_data):
self.logger.warning("Bulk acquisition submission failed, will defer and retry up to {} times. Response code: {}, response data: {}".format(task_module.MAX_RETRY, response_code, response_data))
self.retry_count +=1
self.parent_task.defer()
ret = True
else:
self.logger.error("Bulk acquisition submission failed and the retry count has been exceeded. Response code: {}, response data: {}".format(response_code, response_data))
else:
self.logger.warn("No task API session for profile: {}".format(self.parent_task.profile_id))
else:
Expand Down
27 changes: 18 additions & 9 deletions hxtool_task_modules/bulk_download_monitor_task_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,22 @@ def run(self, bulk_download_eid = None, task_profile = None):
hx_api_object = self.get_task_api_object()
if hx_api_object:
(ret, response_code, response_data) = hx_api_object.restGetBulkDetails(bulk_download_job['bulk_acquisition_id'])
if ret:
if response_data['data']['state'] != 'RUNNING':
self.logger.warning("The bulk acquisition job {} is not in a running state. Controller state: {}".format(bulk_download_job['bulk_acquisition_id'], response_data['data']['state']))
hxtool_global.hxtool_db.bulkDownloadUpdate(bulk_download_eid, stopped=True)
self.parent_task.stop()
return(ret, result)
else:
self.logger.error("Failed to get bulk acquisition job status for ID {}".format(bulk_download_job['bulk_acquisition_id']))
if ret and response_data['data']['state'] != 'RUNNING':
self.logger.warning("The bulk acquisition job {} is not in a running state. Controller state: {}".format(bulk_download_job['bulk_acquisition_id'], response_data['data']['state']))
hxtool_global.hxtool_db.bulkDownloadUpdate(bulk_download_eid, stopped=True)
self.parent_task.stop()
return(ret, result)
elif not ret:
if self.can_retry(response_data):
self.logger.warning("Failed to get bulk acquisition job status for ID {}, will defer and retry up to {} times.".format(bulk_download_job['bulk_acquisition_id'], task_module.MAX_RETRY))
self.retry_count +=1
self.parent_task.defer()
ret = True
else:
self.logger.error("Failed to get bulk acquisition job status for ID {}, and the retry count has been exceeded.".format(bulk_download_job['bulk_acquisition_id']))
self.parent_task.stop()

return(ret, result)

(ret, response_code, response_data) = hx_api_object.restListBulkHosts(bulk_download_job['bulk_acquisition_id'], filter_term = {'state' : 'COMPLETE'})
if ret:
Expand Down Expand Up @@ -162,8 +168,11 @@ def run(self, bulk_download_eid = None, task_profile = None):

self.parent_task.defer()
ret = True
# TODO: Add retry - though if status is successful then hosts should be successful too.
else:
self.logger.error("No task API session for profile: {}".format(self.parent_task.profile_id))
self.logger.error("Failed to get bulk acquisition job host status for ID {}.".format(bulk_download_job['bulk_acquisition_id']))
else:
self.logger.error("No task API session for profile: {}".format(self.parent_task.profile_id))
else:
self.logger.warning("Bulk download database entry {} is marked as stopped.".format(bulk_download_eid))
self.parent_task.stop()
Expand Down
18 changes: 14 additions & 4 deletions hxtool_task_modules/bulk_download_task_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def run(self, bulk_download_eid = None, agent_id = None, host_name = None):
hx_api_object = self.get_task_api_object()
if hx_api_object:
(ret, response_code, response_data) = hx_api_object.restGetBulkHost(bulk_download_job['bulk_acquisition_id'], agent_id)
if ret and isinstance(response_data, dict) and (response_data['data']['state'] == "COMPLETE" and response_data['data']['result']):
if ret and 'data' in response_data and (response_data['data']['state'] == "COMPLETE" and response_data['data']['result']):
self.logger.debug("Processing bulk download for host: {0}".format(host_name))
download_directory = make_download_directory(hx_api_object.hx_host, bulk_download_job['bulk_acquisition_id'])
full_path = os.path.join(download_directory, get_download_filename(host_name, agent_id))
Expand All @@ -85,15 +85,25 @@ def run(self, bulk_download_eid = None, agent_id = None, host_name = None):
result['bulk_download_path'] = full_path
result['agent_id'] = agent_id
result['host_name'] = host_name
elif (ret and isinstance(response_data, dict) and (response_data['data']['state'] in {'FAILED', 'CANCELLED', 'ABORTED'} or
(response_code == 404 and response_data['details'][0]['code'] == 1005))) or not ret:
self.logger.debug("Error! Controller returned code: {}, data: {}".format(response_code, response_data))
else:
self.logger.error("Failed to download bulk acquisition package for {}. Response code: {}, response data: {}".format(agent_id, response_code, response_data))
elif 'data' in response_data and (response_code == 404 and response_data['details'][0]['code'] == 1005) or (response_data['data']['state'] in {'FAILED', 'CANCELLED', 'ABORTED'}):
self.logger.error("The bulk acquisition job {} for {} has failed, been canceled, aborted or cannot be found. Response code: {}, response data: {}".format(bulk_download_job['bulk_acquisition_id'], agent_id, response_code, response_data))
self.parent_task.stop()
hxtool_global.hxtool_db.bulkDownloadDeleteHost(bulk_download_eid, agent_id)
ret = False
elif ret:
self.logger.debug("Deferring bulk download task for: {}".format(host_name))
self.parent_task.defer()
elif not ret:
if self.can_retry(response_data):
self.logger.warning("Failed to check bulk acquisition job status for {}, will defer and retry up to {} times. Response code: {}, response data: {}".format(agent_id, task_module.MAX_RETRY, response_code, response_data))
self.retry_count +=1
self.parent_task.defer()
ret = True
else:
self.logger.error("Failed to check bulk acquisition job status for {} and the retry count has been exceeded. Response code: {}, response data: {}".format(agent_id, response_code, response_data))

else:
self.logger.warn("No task API session for profile: {}".format(self.parent_task.profile_id))
else:
Expand Down
5 changes: 2 additions & 3 deletions hxtool_task_modules/task_api_session_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

# This is a system task module that performs the API logins needed by the task scheduler.

import hxtool_global
from .task_module import *
from hx_lib import *

Expand Down Expand Up @@ -43,13 +42,13 @@ def output_args():

def run(self, profile_id = None, username = None, password = None):
ret = False
hx_api_object = self.parent_task.scheduler.task_hx_api_sessions.get(self.parent_task.profile_id, None)
hx_api_object = self.parent_task.scheduler.task_hx_api_sessions.get(profile_id, None)
if hx_api_object is not None:
(ret, response_code, response_data) = hx_api_object.restLogin(username, password, auto_renew_token = True)
if ret:
self.logger.info("Successfully initialized task API session for host {} ({})".format(hx_api_object.hx_host, profile_id))
else:
self.logger.warn("Failed to initialize task API session for host {} ({})".format(hx_api_object.hx_host, profile_id))
self.logger.warn("Failed to initialize task API session for host {} ({}). Response code: {}, response data: {}".format(hx_api_object.hx_host, profile_id, response_code, response_data))
del self.parent_task.scheduler.task_hx_api_sessions[profile_id]
password = None
hx_api_object = None
Expand Down
13 changes: 10 additions & 3 deletions hxtool_task_modules/task_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
from hxtool_util import *

class task_module(object):
def __init__(self, parent_task):
self.parent_task = parent_task
MAX_RETRY = 10

# TODO: parent_task should probably be renamed to just task, as modules are associated with tasks
# and this confuses the parent/child task relationship.
def __init__(self, task):
self.parent_task = task
self.logger = hxtool_logging.getLogger(__name__)
self.enabled = True
self.retry_count = 0

def get_task_api_object(self):
s = self.parent_task.scheduler.task_hx_api_sessions.get(self.parent_task.profile_id, None)
Expand All @@ -20,7 +25,9 @@ def get_task_api_object(self):
self.logger.error("There is no valid background task API session for profile {}".format(self.parent_task.profile_id))
return None


def can_retry(self, err):
return('connection' in str(type(err)).lower() and self.retry_count < task_module.MAX_RETRY)

def yield_audit_results(self, bulk_download_path, batch_mode, host_name, agent_id, bulk_acquisition_id = None):
hx_host = None
api_object = self.get_task_api_object()
Expand Down

0 comments on commit 4db2f44

Please sign in to comment.