diff --git a/.gitignore b/.gitignore index fe3e9d070..5a3e21a79 100644 --- a/.gitignore +++ b/.gitignore @@ -73,4 +73,4 @@ tools/conda-tools/ *.snakemake/ -.pytest_cache/ \ No newline at end of file +.pytest_cache/ diff --git a/.travis.yml b/.travis.yml index 748ddaf7b..7b6da85e6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -163,7 +163,7 @@ jobs: sudo: required env: - TRAVIS_JOB=test_py27 - - PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util" + - PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util --cov file_utils" # $BUNDLE_SECRET (for testing GATK) - secure: KX7DwKRD85S7NgspxevgbulTtV+jHQIiM6NBus2/Ur/P0RMdpt0EQQ2wDq79qGN70bvvkw901N7EjSYd+GWCAM7StXtaxnLRrrZ3XI1gX7KMk8E3QzPf0zualLDs7cuQmL6l6WiElUAEqumLc7WGpLZZLdSPzNqFSg+CBKCmTI8= before_install: travis/before_install.sh @@ -184,7 +184,7 @@ jobs: sudo: required env: - TRAVIS_JOB=test_py36 - - PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util" + - PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util --cov file_utils" # $BUNDLE_SECRET (for testing GATK) - secure: 
KX7DwKRD85S7NgspxevgbulTtV+jHQIiM6NBus2/Ur/P0RMdpt0EQQ2wDq79qGN70bvvkw901N7EjSYd+GWCAM7StXtaxnLRrrZ3XI1gX7KMk8E3QzPf0zualLDs7cuQmL6l6WiElUAEqumLc7WGpLZZLdSPzNqFSg+CBKCmTI8= before_install: travis/before_install.sh @@ -207,7 +207,7 @@ jobs: sudo: required env: - TRAVIS_JOB=test_snakemake - - PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util" + - PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util --cov file_utils" # $BUNDLE_SECRET (for testing GATK) - secure: KX7DwKRD85S7NgspxevgbulTtV+jHQIiM6NBus2/Ur/P0RMdpt0EQQ2wDq79qGN70bvvkw901N7EjSYd+GWCAM7StXtaxnLRrrZ3XI1gX7KMk8E3QzPf0zualLDs7cuQmL6l6WiElUAEqumLc7WGpLZZLdSPzNqFSg+CBKCmTI8= before_install: travis/before_install.sh diff --git a/docs/cmdline.rst b/docs/cmdline.rst index 978f534b3..c7067533e 100644 --- a/docs/cmdline.rst +++ b/docs/cmdline.rst @@ -13,5 +13,6 @@ Command line tools illumina broad_utils ncbi + file_utils diff --git a/docs/file_utils.rst b/docs/file_utils.rst new file mode 100644 index 000000000..1214e53d9 --- /dev/null +++ b/docs/file_utils.rst @@ -0,0 +1,7 @@ +file_utils.py - utilities to perform various file manipulations +============================================================= + +.. argparse:: + :module: file_utils + :func: full_parser + :prog: file_utils.py diff --git a/docs/index.rst b/docs/index.rst index b878c18ff..25488e039 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,4 +19,4 @@ Contents cmdline pipes-wdl pipes-snakemake - development +.. 
#!/usr/bin/env python
"""
Utilities for dealing with files.
"""

__author__ = "tomkinsc@broadinstitute.org"
__commands__ = []

import argparse
import logging

import util.cmd
import util.file

log = logging.getLogger(__name__)


# ==============================
# ***  merge_tarballs   ***
# ==============================

def merge_tarballs(out_tarball, in_tarballs, threads=None, extract_to_disk_path=None,
                   pipe_hint_in=None, pipe_hint_out=None):
    ''' Merges separate tarballs into one tarball;
        data can be piped in and/or out.

        This is a thin wrapper that forwards every argument to
        util.file.repack_tarballs().

        Args:
            out_tarball: path of the merged tarball to write, or "-" for stdout.
            in_tarballs: list of input tarball paths; an entry may be "-" for stdin.
            threads: thread count forwarded to util.file.repack_tarballs().
            extract_to_disk_path: if given, forwarded so the tar contents are
                also extracted to this directory.
            pipe_hint_in: compression hint forwarded for piped input.
            pipe_hint_out: compression hint forwarded for piped output.

        Returns:
            0, which is used as the exit status when invoked from the command line.
    '''
    util.file.repack_tarballs(
        out_tarball,
        in_tarballs,
        threads=threads,
        extract_to_disk_path=extract_to_disk_path,
        pipe_hint_in=pipe_hint_in,
        pipe_hint_out=pipe_hint_out
    )
    return 0


def parser_merge_tarballs(parser=None):
    ''' Attach the merge_tarballs arguments and entry point to a parser.

        Args:
            parser: the argparse parser (or sub-parser) to populate. When
                omitted, a fresh ArgumentParser is created for this call.

        Returns:
            The populated parser.
    '''
    # A parser built in the signature would be created once at import time and
    # shared by every default call; the second such call would then fail with
    # "conflicting option string" errors. Build it per call instead.
    if parser is None:
        parser = argparse.ArgumentParser()

    parser.add_argument(
        'out_tarball',
        help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|-);
                compression is inferred by the file extension.
                Note: if "-" is used, output will be written to stdout and
                --pipeOutHint must be provided to indicate compression type
                when compression type is not gzip (gzip is used by default).
                ''')
    parser.add_argument(
        'in_tarballs', nargs='+',
        help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2)')
    )
    parser.add_argument('--extractToDiskPath',
                        dest="extract_to_disk_path",
                        help='If specified, the tar contents will also be extracted to a local directory.')
    parser.add_argument('--pipeInHint',
                        dest="pipe_hint_in",
                        default="gz",
                        help='If specified, the compression type used is used for piped input.')
    parser.add_argument('--pipeOutHint',
                        dest="pipe_hint_out",
                        default="gz",
                        help='If specified, the compression type used is used for piped output.')
    util.cmd.common_args(parser, (('threads', None), ('loglevel', None), ('version', None), ('tmp_dir', None)))
    # split_args=True: the "dest" names above must match merge_tarballs' parameter names
    util.cmd.attach_main(parser, merge_tarballs, split_args=True)
    return parser
__commands__.append(('merge_tarballs', parser_merge_tarballs))


# =======================
def full_parser():
    ''' Build the parser covering every registered command (referenced by docs/file_utils.rst). '''
    return util.cmd.make_parser(__commands__, __doc__)


if __name__ == '__main__':
    util.cmd.main_argparse(__commands__, __doc__)
# Unit tests for file_utils.py

__author__ = "tomkinsc@broadinstitute.org"

# built-ins
import os
import sys
import tempfile
import unittest
import argparse
import tarfile
import subprocess

# third-party
import pytest
from mock import patch

# module-specific
import file_utils
import util.file
from test import TestCaseWithTmp, assert_equal_contents


class TestCommandHelp(unittest.TestCase):

    def test_help_parser_for_each_command(self):
        """
        Building the parser and formatting its help must not raise for any command
        """
        for cmd_name, parser_func in file_utils.__commands__:
            parser = parser_func(argparse.ArgumentParser())
            parser.format_help()


class TestTarballMerger(TestCaseWithTmp):
    def setUp(self):
        super(TestTarballMerger, self).setUp()
        self.input_dir = util.file.get_test_input_path(self)
        self.raw_files = ["file{}".format(x) for x in range(1, 5)]
        self.input_tgz_files = [
            os.path.join(self.input_dir, "compressed-input", x + ".tar.gz") for x in self.raw_files
        ]

    @pytest.fixture(autouse=True)
    def capsys(self, capsys):
        # expose pytest's capsys fixture to unittest-style test methods
        self.capsys = capsys

    # ---- helpers ----

    def _extract_tarball(self, tarball_path, dest_dir):
        """
        Extract tarball_path into dest_dir, closing the archive handle afterwards
        """
        with tarfile.open(tarball_path) as tb:
            tb.extractall(path=dest_dir)

    def _assert_raw_files_in(self, containing_dir):
        """
        Assert each expected raw input file exists, unchanged, inside containing_dir
        """
        for raw_file in self.raw_files:
            inf = os.path.join(self.input_dir, "raw-input", raw_file)
            outf = os.path.join(containing_dir, raw_file)

            assert_equal_contents(self, inf, outf)

    def _cat_inputs(self):
        """
        Start a `cat` of all input tarballs; the caller reads from .stdout.
        The command is passed as an argument list so that paths containing
        whitespace are not split apart.
        """
        return subprocess.Popen(["cat"] + self.input_tgz_files, stdout=subprocess.PIPE)

    # ---- tests ----

    def test_simple_merge(self):
        """
        Simple repack test
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        file_utils.merge_tarballs(out_tarball_file, self.input_tgz_files)

        self._extract_tarball(out_tarball_file, temp_dir)
        self._assert_raw_files_in(temp_dir)

    def test_merge_with_extract(self):
        """
        Test streaming repack with intermediate extraction to disk
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")
        out_extracted_path = os.path.join(temp_dir, "extracted")

        file_utils.merge_tarballs(out_tarball_file,
                                  self.input_tgz_files,
                                  extract_to_disk_path=out_extracted_path)

        self._extract_tarball(out_tarball_file, temp_dir)

        # inspect merged
        self._assert_raw_files_in(temp_dir)
        # inspect extracted
        self._assert_raw_files_in(out_extracted_path)

    def test_merge_with_extract_repack_from_disk(self):
        """
        Test with repack from disk source after extraction
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")
        out_extracted_path = os.path.join(temp_dir, "extracted")

        util.file.repack_tarballs(out_tarball_file,
                                  self.input_tgz_files,
                                  extract_to_disk_path=out_extracted_path,
                                  avoid_disk_roundtrip=False)

        self._extract_tarball(out_tarball_file, temp_dir)

        # inspect merged
        self._assert_raw_files_in(temp_dir)
        # inspect extracted
        self._assert_raw_files_in(out_extracted_path)

    def test_piped_in_merge(self):
        """
        Test with streamed input
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        ps = self._cat_inputs()
        try:
            with patch('sys.stdin', ps.stdout):
                file_utils.merge_tarballs(out_tarball_file,
                                          ["-"],
                                          pipe_hint_in="gz")
            ps.wait()
        finally:
            ps.stdout.close()

        self._extract_tarball(out_tarball_file, temp_dir)
        self._assert_raw_files_in(temp_dir)

    def test_piped_out_merge(self):
        """
        Test with streamed output
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        with open(out_tarball_file, "wb", 0) as outf:
            # temporarily disable pytest's capture of sys.stdout
            with self.capsys.disabled():
                with patch('sys.stdout', outf):
                    file_utils.merge_tarballs("-",
                                              self.input_tgz_files,
                                              pipe_hint_out="gz")

        self._extract_tarball(out_tarball_file, temp_dir)
        self._assert_raw_files_in(temp_dir)

    def test_merge_piped_in_and_out(self):
        """
        Test with streamed input and output
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        ps = self._cat_inputs()
        try:
            with patch('sys.stdin', ps.stdout):
                with open(out_tarball_file, "wb", 0) as outf:
                    # temporarily disable pytest's capture of sys.stdout
                    with self.capsys.disabled():
                        with patch('sys.stdout', outf):
                            file_utils.merge_tarballs("-",
                                                      ["-"],
                                                      pipe_hint_out="gz",
                                                      pipe_hint_in="gz")
            ps.wait()
        finally:
            ps.stdout.close()

        self._extract_tarball(out_tarball_file, temp_dir)
        self._assert_raw_files_in(temp_dir)
def repack_tarballs(out_compressed_tarball,
                    input_compressed_tarballs,
                    extract_to_disk_path=None,
                    extract_numeric_owner=False,
                    avoid_disk_roundtrip=True,
                    ignore_zeros=True,
                    pipe_hint_in=None,
                    pipe_hint_out=None,
                    threads=None):
    ''' Stream the members of one or more (compressed) tarballs into a single
        output tarball, optionally also extracting the members to disk.

        Compression and decompression are delegated to external programs
        (pigz, lbzip2, lz4, or cat for plain tar) chosen from the file
        extension, or from the pipe hints when stdin/stdout is used.

        Args:
            out_compressed_tarball: output path, or "-" to write to sys.stdout.
            input_compressed_tarballs: iterable of input paths; "-" reads sys.stdin.
            extract_to_disk_path: if set, members are also written beneath this
                directory (created if missing).
            extract_numeric_owner: passed to the ownership restoration of files
                mirrored to disk (only meaningful where tarfile supports it).
            avoid_disk_roundtrip: when extracting, copy regular files to disk
                while they stream into the output tarball rather than
                extracting them first and re-reading them from disk.
            ignore_zeros: passed to tarfile.open() for each input; True lets
                concatenated archives be read as one stream.
            pipe_hint_in: compression type (e.g. "gz") assumed for stdin input.
            pipe_hint_out: compression type (e.g. "gz") used for stdout output.
            threads: thread count for the (de)compressor, after being passed
                through util.misc.sanitize_thread_count().

        Raises:
            IOError: unknown compression type, or stdin/stdout used without
                the corresponding pipe hint.
            subprocess.CalledProcessError: a (de)compressor exited non-zero.
    '''
    threads = util.misc.sanitize_thread_count(threads)

    def choose_compressor(filepath, threads=threads):
        ''' Return {"compress_cmd": [...], "decompress_cmd": [...]} for the
            compression type implied by filepath (an extension or a bare hint).
        '''
        filepath = filepath.lower()
        if re.search(r'(\.?tgz|\.?gz)$', filepath):
            compressor = ['pigz'] + (['-p', str(threads)] if threads else [])
            return {"decompress_cmd": compressor + ["-dc"], "compress_cmd": compressor + ["-c"]}
        elif re.search(r'\.?bz2$', filepath):
            compressor = ['lbzip2'] + (['-n', str(threads)] if threads else [])
            return {"decompress_cmd": compressor + ["-dc"], "compress_cmd": compressor + ["-c"]}
        elif re.search(r'\.?lz4$', filepath):
            compressor = ['lz4']
            return {"decompress_cmd": compressor + ["-dc"], "compress_cmd": compressor + ["-c"]}
        elif re.search(r'\.?tar$', filepath):
            # uncompressed tar: pass the bytes through unchanged
            return {"decompress_cmd": ['cat'], "compress_cmd": ['cat']}
        raise IOError("An input file of unknown type was provided: %s" % filepath)

    class FileDiverter(object):
        """
            Reads bytes from a TarInfo file stream, writes them to a disk file,
            and returns the buffered bytes as they are read.
        """
        def __init__(self, tar, fileinfo, fileobj, written_mirror_file, extract_numeric_owner=False):
            self.tar = tar
            self.fileinfo = fileinfo
            self.fileobj = fileobj
            self.extract_numeric_owner = extract_numeric_owner
            self.written_mirror_file = open(written_mirror_file, "wb")
            self.closed = False

        def read(self, size):
            if size is None:
                raise ValueError("FileDiverter.read() requires an explicit size")
            buf = self.fileobj.read(size)
            self.written_mirror_file.write(buf)
            return buf

        def close(self):
            ''' Flush the mirrored file and restore its ownership, mode and mtime. '''
            if self.closed:
                return
            self.closed = True
            self.written_mirror_file.close()
            mirror_path = self.written_mirror_file.name
            try:
                try:
                    self.tar.chown(self.fileinfo, mirror_path, self.extract_numeric_owner)
                except TypeError:
                    # older tarfile implementations take no numeric_owner argument
                    self.tar.chown(self.fileinfo, mirror_path)
                if not self.fileinfo.issym():
                    self.tar.chmod(self.fileinfo, mirror_path)
                    self.tar.utime(self.fileinfo, mirror_path)
            except tarfile.ExtractError:
                # restoring metadata is best-effort, as it is for
                # TarFile.extract() at its default errorlevel
                pass

    if extract_to_disk_path and not os.path.isdir(extract_to_disk_path):
        mkdir_p(extract_to_disk_path)

    if out_compressed_tarball == "-":
        if not pipe_hint_out:
            raise IOError("cannot autodetect compression for stdout unless pipeOutHint provided")
        compress_cmd = choose_compressor(pipe_hint_out)["compress_cmd"]
        outfile = None
        compressed_sink = sys.stdout
    else:
        compress_cmd = choose_compressor(out_compressed_tarball)["compress_cmd"]
        # binary mode: the compressor writes raw bytes to this handle
        outfile = open(out_compressed_tarball, "wb")
        compressed_sink = outfile

    try:
        out_compress_ps = subprocess.Popen(compress_cmd, stdout=compressed_sink, stdin=subprocess.PIPE)
        tar_out = tarfile.open(fileobj=out_compress_ps.stdin, mode="w|")

        for in_compressed_tarball in input_compressed_tarballs:
            if in_compressed_tarball != "-":
                decompress_cmd = choose_compressor(in_compressed_tarball)["decompress_cmd"] + [in_compressed_tarball]
                decompress_stdin = None
            else:
                if not pipe_hint_in:
                    raise IOError("cannot autodetect compression for stdin unless pipeInHint provided")
                decompress_cmd = choose_compressor(pipe_hint_in)["decompress_cmd"] + [in_compressed_tarball]
                decompress_stdin = sys.stdin

            decompress_ps = subprocess.Popen(decompress_cmd, stdout=subprocess.PIPE, stdin=decompress_stdin)
            tar_in = tarfile.open(fileobj=decompress_ps.stdout, mode="r|", ignore_zeros=ignore_zeros)

            fileinfo = tar_in.next()
            while fileinfo is not None:
                if extract_to_disk_path:
                    # NOTE(review): member names are joined onto the extraction
                    # directory without checking for ".." or absolute paths;
                    # confirm inputs are trusted or add a containment check.
                    target_path = os.path.normpath(os.path.join(extract_to_disk_path, fileinfo.name).rstrip("/"))
                    containing_path = os.path.dirname(target_path)
                    mkdir_p(containing_path)

                    if avoid_disk_roundtrip and fileinfo.isreg():
                        # avoid disk round trip for regular files (don't re-read from disk to write to new tar)
                        diverter = FileDiverter(tar_in,
                                                fileinfo,
                                                tar_in.extractfile(fileinfo),
                                                target_path,
                                                extract_numeric_owner=extract_numeric_owner)
                        try:
                            tar_out.addfile(fileinfo, fileobj=diverter)
                        finally:
                            # close explicitly rather than relying on garbage
                            # collection to flush the mirrored file
                            diverter.close()
                    else:
                        # write to disk, add to new tarball from disk
                        tar_in.extract(fileinfo, path=extract_to_disk_path)
                        with pushd_popd(extract_to_disk_path):
                            # every member is visited individually, so do not
                            # recurse into directories (avoids duplicate entries)
                            tar_out.add(fileinfo.name, recursive=False)
                else:
                    # if we're not extracting to disk, stream directly between tarballs
                    tar_out.addfile(fileinfo, fileobj=tar_in.extractfile(fileinfo))

                fileinfo = tar_in.next()

            tar_in.close()
            # drain anything the tar reader left unread so the decompressor
            # cannot block on a full pipe while we wait for it
            while decompress_ps.stdout.read(65536):
                pass
            decompress_ps.stdout.close()
            decompress_ps.wait()
            if decompress_ps.returncode != 0:
                raise subprocess.CalledProcessError(decompress_ps.returncode, decompress_cmd)

        tar_out.close()
        out_compress_ps.stdin.close()
        out_compress_ps.wait()
        if out_compress_ps.returncode != 0:
            raise subprocess.CalledProcessError(out_compress_ps.returncode, compress_cmd)
    finally:
        if outfile is not None:
            outfile.close()