Skip to content

Commit

Permalink
Can stop unpack tasks with TaskControl.
Browse files Browse the repository at this point in the history
  • Loading branch information
jpakkane committed Nov 5, 2016
1 parent 189a204 commit 4c5d631
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 17 deletions.
51 changes: 38 additions & 13 deletions src/decompress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
#include"utils.h"
#include"fileutils.h"
#include"file.h"
#include"taskcontrol.h"

#include<portable_endian.h>
#include"portable_endian.h"
#include<zlib.h>

#ifdef _WIN32
Expand All @@ -54,17 +55,20 @@

namespace {

uint32_t inflate_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile);
uint32_t lzma_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile);
uint32_t unstore_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile);
uint32_t inflate_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile, TaskControl &tc);
uint32_t lzma_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile, TaskControl &tc);
uint32_t unstore_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile, TaskControl &tc);

/* Decompress from file source to file dest until stream ends or EOF.
inf() returns Z_OK on success, Z_MEM_ERROR if memory could not be
allocated for processing, Z_DATA_ERROR if the deflate data is
invalid or incomplete, Z_VERSION_ERROR if the version of zlib.h and
the version of the library linked do not match, or Z_ERRNO if there
is an error reading or writing the files. */
uint32_t inflate_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile) {
uint32_t inflate_to_file(const unsigned char *data_start,
uint64_t data_size,
FILE *ofile,
TaskControl &tc) {
uint32_t crcvalue = crc32(0, Z_NULL, 0);
int ret;
unsigned have;
Expand Down Expand Up @@ -93,6 +97,7 @@ uint32_t inflate_to_file(const unsigned char *data_start, uint64_t data_size, FI

/* run inflate() on input until output buffer not full */
do {
tc.check_for_stopping();
strm.avail_out = CHUNK;
strm.next_out = out.get();
ret = inflate(&strm, Z_NO_FLUSH);
Expand Down Expand Up @@ -125,7 +130,10 @@ uint32_t lzma_to_file(const unsigned char *data_start, uint64_t data_size, FILE
}

#else
uint32_t lzma_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile) {
uint32_t lzma_to_file(const unsigned char *data_start,
uint64_t data_size,
FILE *ofile,
TaskControl &tc) {
uint32_t crcvalue = crc32(0, Z_NULL, 0);
std::unique_ptr<unsigned char[]> out(new unsigned char [CHUNK]);
lzma_stream strm = LZMA_STREAM_INIT;
Expand Down Expand Up @@ -154,6 +162,7 @@ uint32_t lzma_to_file(const unsigned char *data_start, uint64_t data_size, FILE
strm.next_in = current;
/* decompress until data ends */
do {
tc.check_for_stopping();
if (strm.total_in == data_size - offset)
break;

Expand All @@ -175,7 +184,11 @@ uint32_t lzma_to_file(const unsigned char *data_start, uint64_t data_size, FILE
}
#endif

uint32_t unstore_to_file(const unsigned char *data_start, uint64_t data_size, FILE *ofile) {
uint32_t unstore_to_file(const unsigned char *data_start,
uint64_t data_size,
FILE *ofile,
TaskControl &tc) {
tc.check_for_stopping();
auto bytes_written = fwrite(data_start, 1, data_size, ofile);
if(bytes_written != data_size) {
throw_system("Could not write file fully:");
Expand All @@ -192,7 +205,12 @@ void create_symlink(const unsigned char *data_start, uint64_t data_size, const s
#endif
}

void create_file(const localheader &lh, const centralheader &ch, const unsigned char *data_start, uint64_t data_size, const std::string &outname) {
void create_file(const localheader &lh,
const centralheader &ch,
const unsigned char *data_start,
uint64_t data_size,
const std::string &outname,
TaskControl &tc) {
decltype(unstore_to_file) *f;
if(ch.compression_method == ZIP_NO_COMPRESSION) {
f = unstore_to_file;
Expand All @@ -211,7 +229,7 @@ void create_file(const localheader &lh, const centralheader &ch, const unsigned
File ofile(extraction_name.c_str(), "w+b");
uint32_t crc32;
try {
crc32 = (*f)(data_start, data_size, ofile.get());
crc32 = (*f)(data_start, data_size, ofile.get(), tc);
} catch(...) {
unlink(extraction_name.c_str());
throw;
Expand Down Expand Up @@ -272,12 +290,17 @@ filetype detect_filetype(const localheader &lh, const centralheader &ch) {
return FILE_ENTRY;
}

void do_unpack(const localheader &lh, const centralheader &ch, const unsigned char *data_start, uint64_t data_size, const std::string &outname) {
void do_unpack(const localheader &lh,
const centralheader &ch,
const unsigned char *data_start,
uint64_t data_size,
const std::string &outname,
TaskControl &tc) {
switch(detect_filetype(lh, ch)) {
case DIRECTORY_ENTRY : mkdirp(outname); break;
case SYMLINK_ENTRY : create_symlink(data_start, data_size, outname); break;
case CHARDEV_ENTRY : create_device(lh, outname); break;
case FILE_ENTRY : create_file(lh, ch, data_start, data_size, outname); break;
case FILE_ENTRY : create_file(lh, ch, data_start, data_size, outname, tc); break;
default : throw std::runtime_error("Unknown file type.");
}
}
Expand Down Expand Up @@ -305,7 +328,9 @@ void set_unix_permissions(const localheader &lh, const centralheader &ch, const

UnpackResult unpack_entry(const std::string &prefix, const localheader &lh,
const centralheader &ch,
const unsigned char *data_start, uint64_t data_size) {
const unsigned char *data_start,
uint64_t data_size,
TaskControl &tc) {
try {
std::string ofname;
if(prefix.empty()) {
Expand All @@ -317,7 +342,7 @@ UnpackResult unpack_entry(const std::string &prefix, const localheader &lh,
ofname = prefix + lh.fname;
}
}
do_unpack(lh, ch, data_start, data_size, ofname);
do_unpack(lh, ch, data_start, data_size, ofname, tc);
if(ch.version_made_by>>8 == MADE_BY_UNIX) {
set_unix_permissions(lh, ch, ofname);
}
Expand Down
6 changes: 5 additions & 1 deletion src/decompress.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include"zipdefs.h"
#include<string>

class TaskControl;

struct UnpackResult {
bool success;
std::string msg;
Expand All @@ -28,4 +30,6 @@ struct UnpackResult {
UnpackResult unpack_entry(const std::string &prefix,
const localheader &lh,
const centralheader &ch,
const unsigned char *data_start, uint64_t data_size);
const unsigned char *data_start,
uint64_t data_size,
TaskControl &tc);
12 changes: 12 additions & 0 deletions src/taskcontrol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,15 @@ void TaskControl::add_failure(const std::string &msg) {
results.push_back(msg);
num_failures++;
}

void TaskControl::stop() {
std::lock_guard<std::mutex> l(m);
should_stop = true;
}

void TaskControl::check_for_stopping() {
std::lock_guard<std::mutex> l(m);
if(should_stop) {
throw std::runtime_error("Stopping task evaluation.");
}
}
4 changes: 4 additions & 0 deletions src/taskcontrol.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ class TaskControl final {
size_t finished() const;
std::string entry(size_t i) const;

void stop();
void check_for_stopping(); // throws a stopping exception

private:
mutable std::mutex m;
TaskState cur_state;
std::vector<std::string> results;
int num_success;
int num_failures;
int total_tasks;
bool should_stop = false;
};
7 changes: 4 additions & 3 deletions src/zipfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,10 @@ void ZipFile::run(const std::string &prefix, int num_threads) const {
wait_for_slot(futures, num_threads, tc);
auto unstoretask = [this, file_start, i, &prefix](){
return unpack_entry(prefix, entries[i],
centrals[i],
file_start + data_offsets[i],
entries[i].compressed_size);
centrals[i],
file_start + data_offsets[i],
entries[i].compressed_size,
tc);
};
futures.emplace_back(std::async(std::launch::async, unstoretask));
}
Expand Down

0 comments on commit 4c5d631

Please sign in to comment.