Skip to content

Commit

Permalink
add tarball merger (#853)
Browse files Browse the repository at this point in the history
* add tarball merger

add a new utility function that merges separate tarballs into one tarball; data can be piped in and/or out

* add simple unit test for tarball merger

* merge_tarballs relative path correction for in-mem directory repack

* add "--cov file_utils" to PYTEST_ADDOPTS

* add to docs

* add test for extracted files; bugfix extracted files given abs path

* incorporate changes following code review by @dpark01 and @yesimon

* add additional tests for tarball_merger; prevent dummy '-' file from being created when stdout is used

* cruft removal
  • Loading branch information
tomkinsc authored Jul 23, 2018
1 parent da91e3a commit 54c912b
Show file tree
Hide file tree
Showing 17 changed files with 400 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ tools/conda-tools/

*.snakemake/

.pytest_cache/
.pytest_cache/
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ jobs:
sudo: required
env:
- TRAVIS_JOB=test_py27
- PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util"
- PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util --cov file_utils"
# $BUNDLE_SECRET (for testing GATK)
- secure: KX7DwKRD85S7NgspxevgbulTtV+jHQIiM6NBus2/Ur/P0RMdpt0EQQ2wDq79qGN70bvvkw901N7EjSYd+GWCAM7StXtaxnLRrrZ3XI1gX7KMk8E3QzPf0zualLDs7cuQmL6l6WiElUAEqumLc7WGpLZZLdSPzNqFSg+CBKCmTI8=
before_install: travis/before_install.sh
Expand All @@ -184,7 +184,7 @@ jobs:
sudo: required
env:
- TRAVIS_JOB=test_py36
- PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util"
- PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util --cov file_utils"
# $BUNDLE_SECRET (for testing GATK)
- secure: KX7DwKRD85S7NgspxevgbulTtV+jHQIiM6NBus2/Ur/P0RMdpt0EQQ2wDq79qGN70bvvkw901N7EjSYd+GWCAM7StXtaxnLRrrZ3XI1gX7KMk8E3QzPf0zualLDs7cuQmL6l6WiElUAEqumLc7WGpLZZLdSPzNqFSg+CBKCmTI8=
before_install: travis/before_install.sh
Expand All @@ -207,7 +207,7 @@ jobs:
sudo: required
env:
- TRAVIS_JOB=test_snakemake
- PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util"
- PYTEST_ADDOPTS="-rsxX -n 2 --durations=25 --fixture-durations=10 --junit-xml=pytest.xml --cov-report= --cov broad_utils --cov illumina --cov assembly --cov interhost --cov intrahost --cov metagenomics --cov ncbi --cov read_utils --cov reports --cov taxon_filter --cov tools --cov util --cov file_utils"
# $BUNDLE_SECRET (for testing GATK)
- secure: KX7DwKRD85S7NgspxevgbulTtV+jHQIiM6NBus2/Ur/P0RMdpt0EQQ2wDq79qGN70bvvkw901N7EjSYd+GWCAM7StXtaxnLRrrZ3XI1gX7KMk8E3QzPf0zualLDs7cuQmL6l6WiElUAEqumLc7WGpLZZLdSPzNqFSg+CBKCmTI8=
before_install: travis/before_install.sh
Expand Down
1 change: 1 addition & 0 deletions docs/cmdline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ Command line tools
illumina
broad_utils
ncbi
file_utils


7 changes: 7 additions & 0 deletions docs/file_utils.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
file_utils.py - utilities to perform various file manipulations
===============================================================

.. argparse::
:module: file_utils
:func: full_parser
:prog: file_utils.py
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ Contents
cmdline
pipes-wdl
pipes-snakemake
development
.. development
64 changes: 64 additions & 0 deletions file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python
"""
Utilities for dealing with files.
"""

__author__ = "[email protected]"
__commands__ = []

import argparse
import logging

import util.cmd
import util.file

log = logging.getLogger(__name__)


# ==============================
# *** merge_tarballs ***
# ==============================

def merge_tarballs(out_tarball, in_tarballs, threads=None, extract_to_disk_path=None, pipe_hint_in=None, pipe_hint_out=None):
    '''Combine the contents of several input tarballs into one output tarball.

    Either side may be a pipe ("-"); the pipe_hint_* values name the
    compression format of a piped stream. All of the real work is delegated
    to util.file.repack_tarballs.
    '''
    repack_kwargs = dict(
        threads=threads,
        extract_to_disk_path=extract_to_disk_path,
        pipe_hint_in=pipe_hint_in,
        pipe_hint_out=pipe_hint_out,
    )
    util.file.repack_tarballs(out_tarball, in_tarballs, **repack_kwargs)
    # success exit code for the command-line framework
    return 0
def parser_merge_tarballs(parser=None):
    """Build the argparse parser for the merge_tarballs command.

    Args:
        parser: an existing argparse.ArgumentParser to populate. A fresh one
            is created when omitted; previously the default was constructed
            in the signature, so repeated no-arg calls shared one parser and
            would fail on duplicate argument registration.

    Returns:
        the populated argparse.ArgumentParser.
    """
    if parser is None:
        parser = argparse.ArgumentParser()
    parser.add_argument(
        'out_tarball',
        help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|-);
                compression is inferred by the file extension.
        Note: if "-" is used, output will be written to stdout and
        --pipeOutHint must be provided to indicate compression type
        when compression type is not gzip (gzip is used by default).
        ''')
    parser.add_argument(
        'in_tarballs', nargs='+',
        help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2)')
    )
    parser.add_argument('--extractToDiskPath',
                        dest="extract_to_disk_path",
                        help='If specified, the tar contents will also be extracted to a local directory.')
    parser.add_argument('--pipeInHint',
                        dest="pipe_hint_in",
                        default="gz",
                        help='If specified, the given compression type is used for piped input.')
    parser.add_argument('--pipeOutHint',
                        dest="pipe_hint_out",
                        default="gz",
                        help='If specified, the given compression type is used for piped output.')
    util.cmd.common_args(parser, (('threads', None), ('loglevel', None), ('version', None), ('tmp_dir', None)))
    util.cmd.attach_main(parser, merge_tarballs, split_args=True)
    return parser


__commands__.append(('merge_tarballs', parser_merge_tarballs))


# =======================
def full_parser():
    # Top-level parser covering every registered command in __commands__;
    # referenced by docs/file_utils.rst (sphinx-argparse :func:) and by the
    # __main__ entry point below.
    return util.cmd.make_parser(__commands__, __doc__)


if __name__ == '__main__':
    # Dispatch to the sub-command named on the command line.
    util.cmd.main_argparse(__commands__, __doc__)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added test/input/TestTarballMerger/out.tar.gz
Binary file not shown.
Binary file added test/input/TestTarballMerger/raw-input/file1
Binary file not shown.
Binary file added test/input/TestTarballMerger/raw-input/file2
Binary file not shown.
Binary file added test/input/TestTarballMerger/raw-input/file3
Binary file not shown.
Binary file added test/input/TestTarballMerger/raw-input/file4
Binary file not shown.
200 changes: 200 additions & 0 deletions test/unit/test_file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# Unit tests for file_utils.py

__author__ = "[email protected]"

# built-ins
import os
import sys
import tempfile
import unittest
import argparse
import tarfile
import subprocess

# third-party
import pytest
from mock import patch

# module-specific
import file_utils
import util.file
from test import TestCaseWithTmp, assert_equal_contents


class TestCommandHelp(unittest.TestCase):
    """Smoke-test that every registered command can render its help text."""

    def test_help_parser_for_each_command(self):
        for cmd_name, parser_func in file_utils.__commands__:
            parser = parser_func(argparse.ArgumentParser())
            helpstring = parser.format_help()
            # previously the help text was generated but never checked;
            # a failure to produce any output should fail the test
            self.assertTrue(helpstring, "empty help for command %s" % cmd_name)

class TestTarballMerger(TestCaseWithTmp):
    """Round-trip tests for tarball merging, including piped in/out modes."""

    def setUp(self):
        super(TestTarballMerger, self).setUp()
        self.input_dir = util.file.get_test_input_path(self)
        self.raw_files = ["file{}".format(x) for x in range(1, 5)]
        self.input_tgz_files = [os.path.join(self.input_dir, "compressed-input", x + ".tar.gz")
                                for x in self.raw_files]

    def _extract(self, tarball, dest_dir):
        # unpack a produced tarball so its contents can be inspected
        tb = tarfile.open(tarball)
        tb.extractall(path=dest_dir)

    def _assert_files_match_raw_input(self, out_dir):
        # every expected file under out_dir must be byte-identical to the
        # corresponding pristine raw input file
        for fname in self.raw_files:
            inf = os.path.join(self.input_dir, "raw-input", fname)
            outf = os.path.join(out_dir, fname)
            assert_equal_contents(self, inf, outf)

    def test_simple_merge(self):
        """
        Simple repack test
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        file_utils.merge_tarballs(out_tarball_file, self.input_tgz_files)

        self._extract(out_tarball_file, temp_dir)
        self._assert_files_match_raw_input(temp_dir)

    def test_merge_with_extract(self):
        """
        Test streaming repack with intermediate extraction to disk
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")
        out_extracted_path = os.path.join(temp_dir, "extracted")

        file_utils.merge_tarballs(out_tarball_file,
                                  self.input_tgz_files,
                                  extract_to_disk_path=out_extracted_path)

        self._extract(out_tarball_file, temp_dir)
        # inspect merged
        self._assert_files_match_raw_input(temp_dir)
        # inspect extracted
        self._assert_files_match_raw_input(out_extracted_path)

    def test_merge_with_extract_repack_from_disk(self):
        """
        Test with repack from disk source after extraction
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")
        out_extracted_path = os.path.join(temp_dir, "extracted")

        util.file.repack_tarballs(out_tarball_file,
                                  self.input_tgz_files,
                                  extract_to_disk_path=out_extracted_path,
                                  avoid_disk_roundtrip=False)

        self._extract(out_tarball_file, temp_dir)
        # inspect merged
        self._assert_files_match_raw_input(temp_dir)
        # inspect extracted
        self._assert_files_match_raw_input(out_extracted_path)

    def test_piped_in_merge(self):
        """
        Test with streamed input
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        # list form (not a formatted shell string) so paths containing
        # spaces survive intact
        ps = subprocess.Popen(["cat"] + self.input_tgz_files, stdout=subprocess.PIPE)
        with patch('sys.stdin', ps.stdout):
            file_utils.merge_tarballs(out_tarball_file,
                                      ["-"],
                                      pipe_hint_in="gz")
        ps.wait()

        self._extract(out_tarball_file, temp_dir)
        self._assert_files_match_raw_input(temp_dir)

    @pytest.fixture(autouse=True)
    def capsys(self, capsys):
        # stash pytest's capsys so tests can temporarily disable its
        # capture of sys.stdout while we patch it
        self.capsys = capsys

    def test_piped_out_merge(self):
        """
        Test with streamed output
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        with open(out_tarball_file, "wb", 0) as outf:
            # temporarily disable pytest's capture of sys.stdout
            with self.capsys.disabled():
                with patch('sys.stdout', outf):
                    file_utils.merge_tarballs("-",
                                              self.input_tgz_files,
                                              pipe_hint_out="gz")

        self._extract(out_tarball_file, temp_dir)
        self._assert_files_match_raw_input(temp_dir)

    def test_merge_piped_in_and_out(self):
        """
        Test with streamed input and output
        """
        temp_dir = tempfile.gettempdir()
        out_tarball_file = os.path.join(temp_dir, "out.tar.gz")

        ps = subprocess.Popen(["cat"] + self.input_tgz_files, stdout=subprocess.PIPE)
        with patch('sys.stdin', ps.stdout):
            with open(out_tarball_file, "wb", 0) as outf:
                # temporarily disable pytest's capture of sys.stdout
                with self.capsys.disabled():
                    with patch('sys.stdout', outf):
                        file_utils.merge_tarballs("-",
                                                  ["-"],
                                                  pipe_hint_out="gz",
                                                  pipe_hint_in="gz")
        ps.wait()

        self._extract(out_tarball_file, temp_dir)
        self._assert_files_match_raw_input(temp_dir)

Loading

0 comments on commit 54c912b

Please sign in to comment.