Skip to content

Commit

Permalink
wait for output thread to finish reading (oracle#2126)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Kotal authored May 29, 2018
1 parent 6b4b1ac commit 64006f5
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions tools/sync/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,27 @@ class OutputThread(threading.Thread):
stdout/stderr buffers fill up.
"""

def __init__(self):
def __init__(self, event, logger):
super(OutputThread, self).__init__()
self.read_fd, self.write_fd = os.pipe()
self.pipe_fobj = os.fdopen(self.read_fd)
self.out = []
self.event = event
self.logger = logger
self.start()

def run(self):
"""
It might happen that after the process is gone, the thread
still has data to read from the pipe. Should probably introduce
a boolean and set it to True under the 'if not line' block
below and make the caller wait for it to become True.
still has data to read from the pipe. Hence, event is used
to synchronize with the caller.
"""
while True:
line = self.pipe_fobj.readline()
if not line:
self.logger.debug("end of output")
self.pipe_fobj.close()
self.event.set()
return

self.out.append(line)
Expand All @@ -142,6 +145,7 @@ def fileno(self):
return self.write_fd

def close(self):
self.logger.debug("closed")
os.close(self.write_fd)

orig_work_dir = None
Expand All @@ -163,7 +167,8 @@ def close(self):
return

timeout_thread = None
output_thread = OutputThread()
event = threading.Event()
output_thread = OutputThread(event, self.logger)
try:
start_time = time.time()
try:
Expand All @@ -183,18 +188,20 @@ def close(self):
self.pid = p.pid

if self.timeout:
condition = threading.Condition()
time_condition = threading.Condition()
self.logger.debug("Setting timeout to {}".format(self.timeout))
timeout_thread = TimeoutThread(self.logger, self.timeout,
condition, p)
time_condition, p)

self.logger.debug("Waiting for process with PID {}".format(p.pid))
p.wait()
self.logger.debug("done waiting")

if self.timeout:
e = timeout_thread.get_exception()
if e:
raise e

except KeyboardInterrupt as e:
self.logger.info("Got KeyboardException while processing ",
exc_info=True)
Expand All @@ -211,9 +218,16 @@ def close(self):
self.logger.debug("{} -> {}".format(self.cmd, self.getretcode()))
finally:
if self.timeout != 0 and timeout_thread:
with condition:
condition.notifyAll()
with time_condition:
time_condition.notifyAll()

# The subprocess module does not close the write pipe descriptor
# it fetched via OutputThread's fileno() so in order to gracefully
# exit the read loop we have to close it here ourselves.
output_thread.close()
self.logger.debug("Waiting on output thread to finish reading")
event.wait()

self.out = output_thread.getoutput()
elapsed_time = time.time() - start_time
self.logger.debug("Command {} took {} seconds".
Expand Down

0 comments on commit 64006f5

Please sign in to comment.