From 8f9008af12fe9d332655f03d58024df4e2275622 Mon Sep 17 00:00:00 2001 From: Rone Charles Date: Wed, 7 Aug 2024 12:11:08 -0400 Subject: [PATCH] Performance and quality fo life improvements to k2 script --- scripts/k2 | 513 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 341 insertions(+), 172 deletions(-) diff --git a/scripts/k2 b/scripts/k2 index 3434130..c90b72b 100755 --- a/scripts/k2 +++ b/scripts/k2 @@ -2,19 +2,22 @@ import argparse import bz2 +import concurrent.futures import ftplib import glob import gzip import hashlib +import http.client import inspect +import io import itertools import logging import lzma import math -import multiprocessing import os import re import shutil +import signal import subprocess import sys import tarfile @@ -22,7 +25,6 @@ import tempfile import threading import urllib import urllib.error -from urllib.error import HTTPError, ContentTooShortError import urllib.parse import urllib.request @@ -132,7 +134,7 @@ class FTP: f.flush() self.close() sys.exit(1) - except Exception: + except ftplib.all_errors: f.flush() self.reconnect() self.cwd(remote_dir) @@ -376,7 +378,7 @@ def hash_file(filename, buf_size=8192): break md5.update(data) digest = md5.hexdigest() - LOG.info("MD5 sum is {}\n".format(digest)) + LOG.info("MD5 sum of {} is {}\n".format(filename, digest)) return digest @@ -395,7 +397,7 @@ def hash_file(filename, buf_size=8192): # column. # def scan_fasta_file(in_file, out_file, lenient=False, seq_name_to_url_table=None): - LOG.info("Generating prelim_map.txt...") + LOG.info("Generating prelim_map.txt.\n") iterator = in_file iterator_is_dict = False if type(seq_name_to_url_table) is dict: @@ -420,7 +422,7 @@ def scan_fasta_file(in_file, out_file, lenient=False, seq_name_to_url_table=None out_file.write("TAXID\t{:s}\t{:s}\t{:s}\t{:s}\n".format(seqid, taxid, comment, remote_filepath)) else: out_file.write("ACCNUM\t{:s}\t{:s}\t{:s}\t{:s}\n".format(seqid, taxid, comment, remote_filepath)) - LOG.info("done.\n") + LOG.info("Finished generating prelim_map.txt.\n") # This function is part of the Kraken 2 taxonomic sequence @@ -464,16 +466,17 @@ def lookup_accession_numbers( break if target_lists: LOG.warning( - "{}/{} accession numbers remain unmapped, see unmapped_accessions.txt in DB directory\n".format( + "{}/{} accession numbers remain unmapped, see unmapped_accessions.txt in {} directory\n".format( len(target_lists), - initial_target_count), + initial_target_count, + os.path.abspath(os.curdir)) ) with open("unmapped_accessions.txt", "w") as f: for k in target_lists: f.write(k + "\n") -def spawn_masking_subprocess(output_file, protein=False): +def spawn_masking_subprocess(output_file, protein=False, threads=1): masking_binary = "segmasker" if protein else "k2mask" if "MASKER" in os.environ: masking_binary = os.environ["MASKER"] @@ -482,9 +485,7 @@ def spawn_masking_subprocess(output_file, protein=False): argv = masking_binary + " -outfmt fasta | sed -e '/^>/!s/[a-z]/x/g'" if masking_binary.find("k2mask") >= 0: # k2mask can run multithreaded - argv = masking_binary + " -outfmt fasta -threads {} -r x".format( - multiprocessing.cpu_count() // 2 - ) + argv = masking_binary + " -outfmt fasta -threads {} -r x".format(threads) p = subprocess.Popen( argv, shell=True, stdin=subprocess.PIPE, stdout=output_file @@ -500,9 +501,9 @@ def spawn_masking_subprocess(output_file, protein=False): # Mask low complexity sequences in the database -def mask_files(input_filenames, output_filename, protein=False): +def mask_files(input_filenames, 
output_filename, protein=False, threads=1): with open(output_filename, "wb") as fout: - masker = spawn_masking_subprocess(fout, protein) + masker = spawn_masking_subprocess(fout, protein, threads=threads) number_of_files = len(input_filenames) for i, input_filename in enumerate(input_filenames): library_name = os.path.basename(os.getcwd()) @@ -555,7 +556,7 @@ def add_to_library(args): shutil.copyfile(filename, destination) if not args.no_masking: mask_files( - [destination], destination + ".masked", protein=args.protein + [destination], destination + ".masked", protein=args.protein, threads=args.threads ) shutil.move(destination + ".masked", destination) with open("added.md5", "a") as out_file: @@ -579,7 +580,7 @@ def make_manifest_from_assembly_summary( continue remote_path = ftp_path + "/" + os.path.basename(ftp_path) + suffix url_components = urllib.parse.urlsplit(remote_path) - local_path = url_components.path.replace("/genomes/", "") + local_path = url_components.path.replace("/", "", 1) manifest_to_taxid[local_path] = taxid with open("manifest.txt", "w") as f: for k in manifest_to_taxid: @@ -587,44 +588,119 @@ def make_manifest_from_assembly_summary( return manifest_to_taxid -def assign_taxid_to_sequences(manifest_to_taxid, is_protein): +# def assign_taxid_to_sequences(manifest_to_taxid, is_protein): +# LOG.info("Assigning taxonomic IDs to sequences\n") +# out_filename = "library.faa" if is_protein else "library.fna" +# sequence_name_to_remote_filepath = {} +# with open(out_filename, "w") as f: +# projects_added = 0 +# total_projects = len(manifest_to_taxid) +# sequences_added = 0 +# ch_added = 0 +# ch = "aa" if is_protein else "bp" +# max_out_chars = 0 + +# out_line = progress_line( +# projects_added, total_projects, sequences_added, ch_added, ch +# ) +# LOG.debug("\r{:s}\r{:s}".format(" ", out_line)) +# for filepath in sorted(manifest_to_taxid): +# remote_filepath = urllib.parse.urljoin("/genomes/", filepath) +# taxid = manifest_to_taxid[filepath] +# with gzip.open(filepath, "rt") as in_file: +# while True: +# line = in_file.readline() +# if line == "": +# break +# if line.startswith(">"): +# line = line.replace( +# ">", ">kraken:taxid|" + taxid + "|", 1 +# ) +# sequence_name_to_remote_filepath[line] = remote_filepath +# sequences_added += 1 +# else: +# ch_added += len(line) - 1 +# f.write(line) +# projects_added += 1 +# max_out_chars = max(len(out_line), max_out_chars) +# space_line = " " * max_out_chars +# out_line = progress_line( +# projects_added, total_projects, sequences_added, ch_added, ch +# ) +# LOG.debug("\r{:s}\r{:s}".format(space_line, out_line)) +# LOG.info("\nFinished assigning taxonomic IDs to sequences\n") + +# return sequence_name_to_remote_filepath + +def assign_taxids(filepath, manifest_to_taxid): + sequences_added = 0 + ch_added = 0 + #nonlocal sequence_name_to_remote_filepath + # remote_filepath = urllib.parse.urljoin("/genomes/", filepath) + taxid = manifest_to_taxid[filepath] + out_filepath = os.path.splitext(filepath)[0] + with open(out_filepath, "w") as out_file: + with gzip.open(filepath, "rt") as in_file: + while True: + line = in_file.readline() + if line == "": + break + if line.startswith(">"): + line = line.replace( + ">", ">kraken:taxid|" + taxid + "|", 1 + ) + # sequence_name_to_remote_filepath[line] = remote_filepath + sequences_added += 1 + else: + ch_added += len(line) - 1 + out_file.write(line) + return (sequences_added, ch_added) + +def assign_taxid_to_sequences(manifest_to_taxid, is_protein, threads=1): LOG.info("Assigning taxonomic IDs to 
sequences\n") - out_filename = "library.faa" if is_protein else "library.fna" + library_filename = "library.faa" if is_protein else "library.fna" sequence_name_to_remote_filepath = {} - with open(out_filename, "w") as f: - projects_added = 0 - total_projects = len(manifest_to_taxid) - sequences_added = 0 - ch_added = 0 - ch = "aa" if is_protein else "bp" - max_out_chars = 0 - for filepath in sorted(manifest_to_taxid): - remote_filepath = urllib.parse.urljoin("/genomes/", filepath) - taxid = manifest_to_taxid[filepath] - with gzip.open(filepath, "rt") as in_file: - while True: - line = in_file.readline() - if line == "": - break - if line.startswith(">"): - line = line.replace( - ">", ">kraken:taxid|" + taxid + "|", 1 - ) - sequence_name_to_remote_filepath[line] = remote_filepath - sequences_added += 1 - else: - ch_added += len(line) - 1 - f.write(line) + projects_added = 0 + total_projects = len(manifest_to_taxid) + sequences_added = 0 + ch_added = 0 + ch = "aa" if is_protein else "bp" + + out_line = progress_line( + projects_added, total_projects, sequences_added, ch_added, ch + ) + + LOG.debug("\r{:s}\r{:s}".format(" ", out_line)) + filepaths = sorted(manifest_to_taxid) + with concurrent.futures.ProcessPoolExecutor(max_workers=threads) as pool: + futures = [] + max_out_line_len = 0 + for filepath in filepaths: + futures.append(pool.submit(assign_taxids, filepath, manifest_to_taxid)) + for future in concurrent.futures.as_completed(futures): + result = future.result() + sequences_added += result[0] + ch_added += result[1] projects_added += 1 out_line = progress_line( projects_added, total_projects, sequences_added, ch_added, ch ) - max_out_chars = max(len(out_line), max_out_chars) - space_line = " " * max_out_chars - LOG.debug("\r{:s}\r{:s}".format(space_line, out_line)) - LOG.info("\nFinished assigning taxonomic IDs to sequences\n") + max_out_line_len = max(len(out_line), max_out_line_len) + spaces = " " * max_out_line_len + LOG.debug("\r{:s}\r{:s}".format(spaces, out_line)) + + # assign_taxids(filepaths[0]) + LOG.info("\nFinished assigning taxonomic IDs to sequences\n") + + LOG.info("Generating {:s}\n".format( library_filename)) + with open(library_filename, "w") as out_file: + for filepath in [os.path.splitext(filepath)[0] for filepath in filepaths]: + with open(filepath, "r") as in_file: + shutil.copyfileobj(in_file, out_file) - return sequence_name_to_remote_filepath + LOG.info("Finished generating {:s}\n".format(library_filename)) + + return sequence_name_to_remote_filepath def progress_line(projects, total_projects, seqs, chars, ch): @@ -636,9 +712,9 @@ def progress_line(projects, total_projects, seqs, chars, ch): line += " project(s), {:d} sequence(s), ".format(seqs) prefix = None for p in ["k", "M", "G", "T", "P", "E"]: - if chars >= 1000: + if chars >= 1024: prefix = p - chars /= 1000 + chars /= 1024 else: break if prefix: @@ -649,6 +725,8 @@ def progress_line(projects, total_projects, seqs, chars, ch): def decompress_files(compressed_filenames, out_filename=None, buf_size=8192): + if isinstance(compressed_filenames, str): + compressed_filenames = [compressed_filenames] if out_filename: if os.path.exists(out_filename + ".tmp"): os.remove(out_filename + ".tmp") @@ -666,7 +744,6 @@ def decompress_files(compressed_filenames, out_filename=None, buf_size=8192): with open(out_filename + ".tmp", "wb") as out: decompress_file(gz, out, buf_size) os.rename(out_filename + ".tmp", out_filename) - remove_files(*compressed_filenames) def decompress_file(in_file, out_file, buf_size=8129): @@ 
-674,11 +751,6 @@ def decompress_file(in_file, out_file, buf_size=8129): "Decompressing {:s}\n".format(os.path.join(os.getcwd(), in_file.name)) ) shutil.copyfileobj(in_file, out_file, buf_size) - # while True: - # data = in_file.read(buf_size) - # out_file.write(data) - # if data == b"": - # break LOG.info( "Finished decompressing {:s}\n".format( os.path.join(os.getcwd(), in_file.name) @@ -686,14 +758,14 @@ def decompress_file(in_file, out_file, buf_size=8129): ) -def download_log(filename): +def download_log(filename, total_size=None): pb = None current_size = 0 - def inner(block_number, read_size, total_size): - nonlocal pb, current_size + def inner(block_number, read_size, size): + nonlocal pb, current_size, total_size if not pb: - pb = ProgressBar(total_size) + pb = ProgressBar(total_size or size) current_size += read_size pb.progress(current_size) LOG.debug( @@ -703,7 +775,7 @@ def download_log(filename): return inner -def http_download_file(url, local_name=None): +def http_download_file(url, local_name=None, call_back=None): if not local_name: local_name = urllib.parse.urlparse(url).path.split("/")[-1] else: @@ -720,18 +792,72 @@ def http_download_file(url, local_name=None): LOG.info("Beginning download of {:s}\n".format(url)) urllib.request.urlretrieve( - url, local_name, reporthook=download_log(local_name) + url, local_name, reporthook=(call_back or download_log(local_name)) ) clear_console_line() LOG.info("Saved {:s} to {:s}\n".format(local_name, os.getcwd())) +def http_download_file2(server, urls, save_to=None, md5sums=None): + files_downloaded = 0 + conn = http.client.HTTPSConnection(server) + for url in urls: + url = url.strip() + filename = os.path.basename(url) + local_name = os.path.abspath(url) + if save_to: + local_name = os.path.join(save_to, filename) + local_directory = os.path.dirname(local_name) + os.makedirs(os.path.dirname(local_name), exist_ok=True) + + if os.path.exists(local_name): + md5 = [] + if md5sums is None: + remote_dirname = os.path.dirname(url) + conn.request("GET", "/" + remote_dirname + "/md5checksums.txt") + response = conn.getresponse() + + if response.status == 200: + md5 = response.readlines() + else: + response.read() + conn.request("GET", "/" + url + ".md5") + response = conn.getresponse() + if response.status == 200: + md5 = response.readlines() + response.read() + else: + md5 = md5sums + + if len(md5) > 0: + found = False + for entry in md5: + (md5sum, remote_filename) = entry.split() + if remote_filename.decode().endswith(filename) and md5sum.decode() == hash_file(local_name): + found = True + break + if found: + LOG.info("Already downloaded {:s}\n".format(urllib.parse.urljoin(server, url))) + continue + LOG.info("Beginning download of {:s}\n".format(server + "/" + url)) + with open(local_name, "wb") as out_file: + conn.request("GET", "/" + url) + response = conn.getresponse() + if response.status == 200: + shutil.copyfileobj(response, out_file) + else: + LOG.warning("Unable to download file {}\n".format(url)) + LOG.info("Saved {:s} to {:s}\n".format(filename, local_directory)) + files_downloaded += 1 -def make_manifest_filter(file, regex): + conn.close() + + return files_downloaded + +def make_file_filter(file_handle, regex): def inner(listing): - nonlocal file, regex path = listing.split()[-1] if path.endswith(regex): - file.write(path + "\n") + file_handle.write(path + "\n") return inner @@ -744,98 +870,125 @@ def move(src, dst): shutil.move(src, dst) -def get_manifest(server, remote_path, regex): - with open("manifest.txt", "w") as f: - 
ftp = ftplib.FTP(server) - ftp.login() - ftp.cwd(remote_path) - ftp.retrlines("LIST", callback=make_manifest_filter(f, regex)) - ftp.close() +def get_manifest_and_md5sums(server, remote_directory, regex): + ftp = ftplib.FTP(server) + ftp.login() + + sstream = io.StringIO() + ftp.cwd(remote_directory) + ftp.retrlines("LIST", callback=make_file_filter(sstream, regex)) + with open("manifest.txt", "w") as out: + for line in sstream.getvalue().split(): + out.write(urllib.parse.urljoin(remote_directory, line) + '\n') + + sstream.truncate(0) + sstream.seek(0) + ftp.cwd("/refseq/release/release-catalog") + ftp.retrlines("LIST", callback=make_file_filter(sstream, "installed")) + install_file = sstream.getvalue().strip() + + bstream = io.BytesIO() + + ftp.retrbinary("RETR " + install_file, callback=bstream.write) + ftp.close() + md5sums = [line for line in bstream.getvalue().split(b'\n') if line.find(b"plasmid") != -1] + + return md5sums + +# def check_if_files_exists(server, remote_dir, filepaths): +# total_size = 0 +# filepaths_found = [] + +# conn = http.client.HTTPSConnection(server) +# for filepath in filepaths: +# filepath = filepath.strip() +# url = urllib.parse.urljoin(remote_dir, filepath) +# # if not ftp.exists(url): +# conn.request('HEAD', '/' + url) +# resp = conn.getresponse() +# resp.close() +# if resp.status != 200: +# if filepath_to_taxid_table: +# del filepath_to_taxid_table[filepaths[i]] +# LOG.warning( +# "{:s} does not exist on server, skipping\n".format( +# remote_dir + filepath +# ) +# ) +# continue + +# filepaths_found.append(filepath) +# total_size += int(resp.getheader("Content-Length")) + +# conn.close() +# return (filepaths_found, total_size) + +# Rewrite this so we don't check if files exist on server it takes too m def download_files_from_manifest( - server, - remote_dir, - manifest_filename="manifest.txt", - filepath_to_taxid_table=None, + server, + threads=1, + manifest_filename="manifest.txt", + filepath_to_taxid_table=None, + md5sums=None ): - with open(manifest_filename, "r") as f: - filepaths = [] - ftp = FTP(server) - spinner = ["|", "/", "—", "\\"] - i = 0 - for filepath in f: - LOG.debug( - "Checking if manifest files exist on server {:s}\r".format( - spinner[i % 4] - ) - ) - i += 1 - filepath = filepath.strip() - if not ftp.exists(urllib.parse.urljoin(remote_dir, filepath)): - if filepath_to_taxid_table: - del filepath_to_taxid_table[filepath] - LOG.warning( - "{:s} does not exist on server, skipping\n".format( - remote_dir + filepath - ) - ) - continue - filepaths.append(filepath) - downloaded = 0 - for filepath in filepaths: - url = urllib.parse.urlunsplit(("https", server, remote_dir + filepath, "", "")) - try: - http_download_file(url, filepath) - except [ContentTooShortError, HTTPError]: - break - downloaded += 1 - filepaths = filepaths[downloaded:] - if filepaths: - LOG.warning( - "There was an issue downloading files over HTTP, falling back to FTP for remaining {} file(s).\n".format(len(filepaths)) - ) - ftp.reconnect() - ftp.download(remote_dir, filepaths) - ftp.close() + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as pool: + with open(manifest_filename, "r") as f: + filepaths = f.readlines() + partitions = [] + futures = [] + step = len(filepaths) // threads + if step == 0: + step = 1 + for i in range(0, len(filepaths), step): + partitions.append(filepaths[i:i + step]) + # http_download_file2(server, filepaths, None, None) + for partition in partitions: + futures.append(pool.submit(http_download_file2, server, partition, None, 
md5sums)) + concurrent.futures.wait(futures) def download_taxonomy(args): taxonomy_path = os.path.join(args.db, "taxonomy") os.makedirs(taxonomy_path, exist_ok=True) os.chdir(taxonomy_path) - ftp = FTP(NCBI_SERVER) - if not args.skip_maps: - if not args.protein: - for subsection in ["gb", "wgs"]: - LOG.info( - "Downloading nucleotide {:s} accession to taxon map\n".format( - subsection - ) - ) - filename = "nucl_" + subsection + ".accession2taxid.gz" - ftp.download("/pub/taxonomy/accession2taxid/", filename) - else: - LOG.info("Downloading protein accession to taxon map\n") - ftp.download( - "/pub/taxonomy/accession2taxid", "prot.accession2taxid.gz" - ) - LOG.info("Downloaded accession to taxon map(s)") - LOG.info("and taxonomy tree data\n") - ftp.download("/pub/taxonomy", "taxdump.tar.gz") - ftp.close() - LOG.info("Decompressing taxonomy data\n") - decompress_files(glob.glob("*accession2taxid.gz")) + futures = [] + + with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool: + maps = set() + if not args.skip_maps: + if not args.protein: + for subsection in ["gb", "wgs"]: + filename = "pub/taxonomy/accession2taxid/" + filename += "nucl_" + subsection + ".accession2taxid.gz" + maps.add(os.path.basename(filename)) + futures.append(pool.submit(http_download_file2, NCBI_SERVER, [filename], save_to=os.path.abspath(os.curdir))) + else: + filename = "/pub/taxonomy/accession2taxid/" + filename += "prot.accession2taxid.gz" + futures.append(pool.submit(http_download_file2, NCBI_SERVER, [filename], save_to=os.path.abspath(os.curdir))) + maps.add(os.path.basename(filename)) + + for future in concurrent.futures.as_completed(futures): + download_count = future.result() + maps_downloaded = set(glob.glob("*.accession2taxid.gz")) + map_to_decompress = maps_downloaded.intersection(maps) + for gz_filename in map_to_decompress: + if not os.path.exists(gz_filename[:-3]) or download_count != 0: + futures.append(pool.submit(decompress_files, gz_filename)) + maps.remove(gz_filename) + + LOG.info("Downloading taxonomy tree data\n") + filename = "pub/taxonomy/taxdump.tar.gz" + http_download_file2(NCBI_SERVER, [filename], save_to=os.path.abspath(os.curdir)) LOG.info("Untarring taxonomy tree data\n") with tarfile.open("taxdump.tar.gz", "r:gz") as tar: tar.extractall() - remove_files(*glob.glob("*.gz")) - def download_genomic_library(args): library_filename = "library.faa" if args.protein else "library.fna" library_pathname = os.path.join(args.db, "library") LOG.info("Adding {:s} to {:s}\n".format(args.library, args.db)) - files_to_remove = None if args.library in [ "archaea", "bacteria", @@ -856,10 +1009,8 @@ def download_genomic_library(args): if args.library == "human": remote_dir_name = "vertebrate_mammalian/Homo_sapiens" try: - url = "ftp://{:s}/genomes/refseq/{:s}/assembly_summary.txt".format( - NCBI_SERVER, remote_dir_name - ) - http_download_file(url) + url = "genomes/refseq/{:s}/assembly_summary.txt".format(remote_dir_name) + http_download_file2(NCBI_SERVER, [url], save_to=os.path.abspath(os.curdir)) except urllib.error.URLError: LOG.error( "Error downloading assembly summary file for {:s}, exiting\n".format( @@ -880,43 +1031,42 @@ def download_genomic_library(args): ) download_files_from_manifest( NCBI_SERVER, - "genomes/", - filepath_to_taxid_table=filepath_to_taxid_table, + args.threads, + filepath_to_taxid_table=filepath_to_taxid_table ) seq_name_to_url_table = \ - assign_taxid_to_sequences(filepath_to_taxid_table, args.protein) + assign_taxid_to_sequences(filepath_to_taxid_table, args.protein, 
threads=args.threads) with open(library_filename, "r") as in_file: with open("prelim_map.txt", "w") as out_file: out_file.write("# prelim_map for " + args.library + "\n") scan_fasta_file(in_file, out_file, seq_name_to_url_table=seq_name_to_url_table) - files_to_remove = ["all", "manifest.txt"] elif args.library == "plasmid": library_pathname = os.path.join(library_pathname, args.library) os.makedirs(library_pathname, exist_ok=True) os.chdir(library_pathname) pat = ".faa.gz" if args.protein else ".fna.gz" - get_manifest(NCBI_SERVER, "genomes/refseq/plasmid/", pat) + md5 = get_manifest_and_md5sums(NCBI_SERVER, "genomes/refseq/plasmid/", pat) download_files_from_manifest( NCBI_SERVER, - "genomes/refseq/plasmid/" + args.threads, + md5sums=md5 ) seq_name_to_url_table = {} + LOG.info("Generating {}\n".format(library_filename)) with open(library_filename, "w") as library: with open("manifest.txt", "r") as manifest: for filename in manifest: filename = filename.strip() - remote_filepath = "/genomes/refseq/plasmid" + "/" + filename with gzip.open(filename, "rt") as f: for line in f: if line.startswith(">"): - seq_name_to_url_table[line.strip()] = remote_filepath + seq_name_to_url_table[line.strip()] = filename library.write(line) + LOG.info("Finished generating {}\n".format(library_filename)) with open(library_filename, "r") as in_file: with open("prelim_map.txt", "w") as out_file: out_file.write("# prelim_map for " + args.library + "\n") scan_fasta_file(in_file, out_file, seq_name_to_url_table=seq_name_to_url_table) - files_to_remove = glob.glob("plasmid.*") - files_to_remove.append("manifest.txt") elif args.library in ["nr", "nt"]: protein_lib = args.library == "nr" if protein_lib and not args.protein: @@ -929,8 +1079,8 @@ def download_genomic_library(args): library_pathname = os.path.join(library_pathname, args.library) os.makedirs(library_pathname, exist_ok=True) os.chdir(library_pathname) - ftp = FTP(NCBI_SERVER) - ftp.download("blast/db/FASTA/", args.library + ".gz") + url = "blast/db/FASTA/" + args.library + ".gz" + http_download_file2(NCBI_SERVER, [url], save_to=os.path.abspath(os.curdir)) with gzip.open(args.library + ".gz", mode="rt") as in_file: with open(library_filename, "w") as out_file: shutil.copyfileobj(in_file, out_file) @@ -938,21 +1088,19 @@ def download_genomic_library(args): with open("prelim_map.txt", "w") as out_file: out_file.write("# prelim_map for " + args.library + "\n") scan_fasta_file(in_file, out_file, seq_name_to_url_table="blast/db/FASTA/nt.gz", lenient=True) - files_to_remove = glob.glob("*.gz") elif args.library in ["UniVec", "UniVec_Core"]: if args.protein: LOG.error( - "{:s} is for nucleotide databases only\n".format(args.library) + "{:s} is available for nucleotide databases only\n".format(args.library) ) sys.exit(1) library_pathname = os.path.join(library_pathname, args.library) os.makedirs(library_pathname, exist_ok=True) os.chdir(library_pathname) - ftp = FTP(NCBI_SERVER) - ftp.download("pub/UniVec", args.library) + http_download_file2(NCBI_SERVER, ["pub/UniVec/" + args.library], save_to=os.path.abspath(os.curdir)) special_taxid = 28384 LOG.info( - "Adding taxonomy ID of {:d} to all sequences\n".format( + "Assigning taxonomy ID of {:d} to all sequences\n".format( special_taxid ) ) @@ -970,16 +1118,12 @@ def download_genomic_library(args): with open("prelim_map.txt", "w") as out_file: out_file.write("# prelim_map for " + args.library + "\n") scan_fasta_file(in_file, out_file, seq_name_to_url_table="pub/UniVec/" + args.library) - files_to_remove = 
[args.library] if not args.no_masking: mask_files( - [library_filename], library_filename + ".masked", args.protein + [library_filename], library_filename + ".masked", args.protein, threads=args.threads ) shutil.move(library_filename + ".masked", library_filename) LOG.info("Added {:s} to {:s}\n".format(args.library, args.db)) - if files_to_remove: - clean_up(files_to_remove) - def get_abs_path(filename): return os.path.abspath(filename) @@ -1749,17 +1893,22 @@ def format_bytes(size): def clean_up(filenames): - LOG.info("Removing extra files\n") - disk_usage_before = shutil.disk_usage(os.getcwd()).free + LOG.info("Removing the following files: {}\n".format(filenames)) + # walk the directory tree to get the size of the individual files + # sum them up to get the usage stat + disk_usage_before = shutil.disk_usage(os.getcwd()).used remove_files(*filenames) - disk_usage_after = shutil.disk_usage(os.getcwd()).free - freed = disk_usage_after - disk_usage_before + disk_usage_after = shutil.disk_usage(os.getcwd()).used + freed = disk_usage_before - disk_usage_after LOG.info("Cleaned up {:s} of space\n".format(format_bytes(freed))) def clean_db(args): os.chdir(args.db) - clean_up(["data", "library", "taxonomy", "seqid2taxid.map", "prelim_map.txt"]) + if args.pattern: + clean_up(glob.glob(args.pattern, recursive=False)) + else: + clean_up(["data", "library", "taxonomy", "seqid2taxid.map", "prelim_map.txt"]) def make_build_parser(subparsers): @@ -1963,6 +2112,13 @@ def make_download_library_parser(subparsers): default=None, help="Specify a log filename (default stderr)", ) + parser.add_argument( + "--threads", + type=int, + metavar="THREADS", + default=1, + help="The number of threads/processes k2 uses when downloading and processing library files." + ) def make_add_to_library_parser(subparsers): @@ -2180,7 +2336,7 @@ def make_inspect_parser(subparsers): def make_clean_parser(subparsers): parser = subparsers.add_parser( - "clean", help="Remove unneeded files from database" + "clean", help="Removes all intermediary files from database" ) parser.add_argument( "--db", @@ -2196,6 +2352,13 @@ def make_clean_parser(subparsers): default=None, help="Specify a log filename (default: stderr)", ) + parser.add_argument( + "--pattern", + type=str, + metavar="SHELL_REGEX", + default=None, + help="Files that match this regular expression will be deleted." + ) class HelpAction(argparse._HelpAction): @@ -2322,7 +2485,9 @@ def k2_main(): elif args.special == "silva": build_16S_silva(args) else: - build_16S_rdp(args) + LOG.error("RDP database no longer supported.\n") + sys.exit(1) + # build_16S_rdp(args) else: if args.no_masking: LOG.warning( @@ -2332,5 +2497,9 @@ def k2_main(): build_kraken2_db(args) +def sigint(signum, frame): + sys.exit(1) + if __name__ == "__main__": + signal.signal(signal.SIGINT, sigint) k2_main()
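
Notes (not part of the patch): the new download path fans a manifest out over a
ThreadPoolExecutor, splitting the file list into roughly `threads` equal slices and
giving each worker its own HTTPS connection. Below is a minimal, self-contained
sketch of that partition-and-submit pattern, with a stand-in worker `fetch_all()`
in place of `http_download_file2()`; the server name and manifest paths are only
illustrative.

    import concurrent.futures

    def fetch_all(server, paths):
        # Stand-in for http_download_file2(): a real worker would keep one
        # persistent HTTPS connection to `server`, skip files already on disk
        # whose md5sums match, and stream each remaining path to disk.
        # Here we just report how many paths this worker was given.
        return len(paths)

    def download_partitioned(server, filepaths, threads=4):
        # Split the manifest into roughly `threads` equal slices, one per worker,
        # mirroring the slicing done in download_files_from_manifest().
        step = max(1, len(filepaths) // threads)
        partitions = [filepaths[i:i + step] for i in range(0, len(filepaths), step)]
        with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as pool:
            futures = [pool.submit(fetch_all, server, part) for part in partitions]
            return sum(f.result() for f in concurrent.futures.as_completed(futures))

    if __name__ == "__main__":
        manifest = ["genomes/refseq/plasmid/plasmid.{}.1.genomic.fna.gz".format(i)
                    for i in range(1, 11)]
        print(download_partitioned("ftp.ncbi.nlm.nih.gov", manifest, threads=3))

On the CLI side, the new options surface as --threads on the library download
subcommand and --pattern on clean, so (assuming the download subcommand is named
after make_download_library_parser) typical invocations would look like
`k2 download-library --library bacteria --db $DB --threads 8` and
`k2 clean --db $DB --pattern '*.gz'`.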