forked from apple/turicreate
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgeneral_fstream_sink.cpp
117 lines (103 loc) · 3.43 KB
/
general_fstream_sink.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/* Copyright © 2017 Apple Inc. All rights reserved.
*
* Use of this source code is governed by a BSD-3-clause license that can
* be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
*/
#include <boost/algorithm/string.hpp>
#include <fileio/general_fstream_sink.hpp>
#include <fileio/sanitize_url.hpp>
#include <logger/logger.hpp>
namespace turi {
namespace fileio_impl {
// Opens `file` for writing. Gzip compression is switched on when the
// file name ends with ".gz".
general_fstream_sink::general_fstream_sink(std::string file) {
  const bool has_gz_suffix = boost::ends_with(file, ".gz");
  open_file(file, has_gz_suffix);
}
// Opens `file` for writing. The caller decides explicitly whether the
// output is gzip compressed; the file name is not inspected here.
general_fstream_sink::general_fstream_sink(std::string file,
                                           bool gzip_compressed) {
  this->open_file(file, gzip_compressed);
}
void general_fstream_sink::open_file(std::string file, bool gzip_compressed) {
sanitized_filename = sanitize_url(file);
out_file = std::make_shared<union_fstream>(file, std::ios_base::out | std::ios_base::binary);
is_gzip_compressed = gzip_compressed;
if (gzip_compressed) {
compressor = std::make_shared<boost::iostreams::gzip_compressor>();
}
// get the underlying stream inside the union stream
underlying_stream = out_file->get_ostream();
}
// Returns true when there is an underlying stream and its badbit is not
// set. A sink without a stream (e.g. after close()) is reported as not open.
bool general_fstream_sink::is_open() const {
  if (!underlying_stream) {
    return false;
  }
  return !underlying_stream->bad();
}
// Writes `bufsize` bytes starting at `c` to the sink, routing them through
// the gzip compressor when compression is enabled.
//
// Returns the number of bytes accepted. For uncompressed output this is
// `bufsize` on success and 0 if the underlying stream reports failure.
// For compressed output it is whatever the compressor's write() reports.
// Writing to a sink that has no stream (e.g. after close()) returns 0.
std::streamsize general_fstream_sink::write(const char* c,
                                            std::streamsize bufsize) {
  // close() resets both the stream and the compressor. The state queries
  // (good/bad/fail) already treat a missing stream as an error state, so do
  // the same here instead of dereferencing a null pointer.
  if (!underlying_stream || (is_gzip_compressed && !compressor)) {
    return 0;
  }
#ifdef _WIN32
  // windows has interesting issues if bufsize >= 2GB
  // we cut up the buffer and write it in 1GB increments
  const std::streamsize WIN_WRITE_LIMIT = 1LL*1024*1024*1024; // 1GB
  std::streamsize written_so_far = 0;
  std::streamsize remaining_size = bufsize;
  while (remaining_size > 0) {
    const std::streamsize limit = std::min(remaining_size, WIN_WRITE_LIMIT);
    if (is_gzip_compressed) {
      const std::streamsize consumed =
          compressor->write(*underlying_stream, c, limit);
      // Mirror the non-Windows branch, which hands the compressor's count
      // back to the caller: on a short count, stop and report what was
      // actually accepted rather than claiming the whole buffer.
      if (consumed != limit) {
        return written_so_far + consumed;
      }
    } else {
      underlying_stream->write(c, limit);
      if (underlying_stream->fail()) return 0;
    }
    written_so_far += limit;
    remaining_size -= limit;
    c += limit;
  }
  return bufsize;
#else
  if (is_gzip_compressed) {
    return compressor->write(*underlying_stream, c, bufsize);
  } else {
    underlying_stream->write(c, bufsize);
    if (underlying_stream->fail()) return 0;
    else return bufsize;
  }
#endif
}
// Closes the file on destruction, but only when this object holds the last
// reference to it. Exceptions raised while closing are logged and
// swallowed so that the destructor never throws.
general_fstream_sink::~general_fstream_sink() {
  // if I am the only reference to the object, close it.
  // (use_count() == 1 is the replacement for shared_ptr::unique(), which is
  // deprecated in C++17 and removed in C++20.)
  if (out_file && out_file.use_count() == 1) {
    try {
      close();
    } catch (...) {
      logstream(LOG_ERROR) << "Exception occured on closing "
                           << sanitized_filename
                           << ". The file may not be properly written" << std::endl;
    }
  }
}
// Finishes the gzip stream (if compression is active) and releases this
// object's references to the compressor, the underlying stream and the
// file. Calling close() on an already closed sink does nothing.
void general_fstream_sink::close() {
  if (compressor) {
    // The compressor can only be finished if there is a stream to emit
    // into. The other members treat a null underlying_stream as a
    // possible state, so guard the dereference here as well.
    if (underlying_stream) {
      compressor->close(*underlying_stream, std::ios_base::out);
    }
    compressor.reset();
  }
  underlying_stream.reset();
  out_file.reset();
}
// Returns true only when there is an underlying stream and that stream
// reports good(). A sink without a stream is never good.
bool general_fstream_sink::good() const {
  if (!underlying_stream) {
    return false;
  }
  return underlying_stream->good();
}
// Returns true when the underlying stream reports bad(). A sink without a
// stream is considered bad.
bool general_fstream_sink::bad() const {
  return !underlying_stream || underlying_stream->bad();
}
// Returns true when the underlying stream reports fail(). A sink without a
// stream is considered failed.
bool general_fstream_sink::fail() const {
  return !underlying_stream || underlying_stream->fail();
}
// Returns the current put position (tellp) of the underlying stream, or
// (size_t)(-1) when there is no underlying stream.
// NOTE(review): the position is queried on the underlying stream, so with
// gzip enabled this presumably counts compressed bytes - confirm.
size_t general_fstream_sink::get_bytes_written() const {
  if (!underlying_stream) {
    return static_cast<size_t>(-1);
  }
  return static_cast<size_t>(underlying_stream->tellp());
}
} // namespace fileio_impl
} // namespace turi