Skip to content

Commit

Permalink
IMPALA-8561: Eliminate mtime=-1 for HDFS scan ranges (part 1)
Browse files Browse the repository at this point in the history
The file handle cache uses the mtime to distinguish
different versions of a file separate. For example,
if a file at mtime=1 is overwritten with a version
at mtime=2, the old file handle from mtime=1 will
not be used for the mtime=2 version.

In some codepaths, for legacy reasons, the mtime
would be unconditionally set to -1, and this
eliminates the ability to distinguish between different
versions of files. There is no need to set the mtime to
-1. It seems to be a legacy bit of cruft.

This removes the mtime=-1 behavior for HDFS scan ranges.
It removes mtime from BufferOpts and plumbs the mtime
through the scan range codepaths separately. Local
non-HDFS files do not use the mtime, so those continue
to use mtime=-1.

Testing:
 - Passed core tests

Backport conflicts:
 - The page skipping is not present, so some overloads of
   HdfsScanNodeBase::AllocateScanRange() don't exist.

Change-Id: I48b7ed60d6ab9104b993237b4fe23de5dc058672
Reviewed-on: http://gerrit.cloudera.org:8080/13522
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-on: https://gerrit.sjc.cloudera.com/c/cdh/impala/+/49654
Tested-by: Jenkins User <[email protected]>
CDH-Build: Jenkins User <[email protected]>
Quasar-L0: quasar-precommit User
  • Loading branch information
joemcdonnell committed Jun 21, 2019
1 parent b7ec1fa commit bbd1839
Show file tree
Hide file tree
Showing 16 changed files with 63 additions and 59 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/base-sequence-scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
// 1 queue for each NIC as well?
ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), header_size, 0, header_metadata, -1, false,
files[i]->is_erasure_coded, BufferOpts::Uncached());
files[i]->is_erasure_coded, files[i]->mtime, BufferOpts::Uncached());
header_ranges.push_back(header_range);
}
// When the header is parsed, we will issue more AddDiskIoRanges in
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/hdfs-orc-scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
ScanRange* range = scanner_->scan_node_->AllocateScanRange(
metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
split_range->disk_id(), expected_local, split_range->is_erasure_coded(),
split_range->mtime(),
BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length));

unique_ptr<BufferDescriptor> io_buffer;
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/hdfs-scan-node-base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
file_desc->splits.push_back(
AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
split.offset, split.partition_id, params.volume_id, expected_local,
file_desc->is_erasure_coded, BufferOpts(try_cache, file_desc->mtime)));
file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(try_cache)));
}

// Update server wide metrics for number of scan ranges and ranges that have
Expand Down Expand Up @@ -581,17 +581,17 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat

ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
bool is_erasure_coded, const BufferOpts& buffer_opts,
bool is_erasure_coded, int64_t mtime, const BufferOpts& buffer_opts,
const ScanRange* original_split) {
ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
new ScanRangeMetadata(partition_id, original_split));
return AllocateScanRange(fs, file, len, offset, metadata, disk_id, expected_local,
is_erasure_coded, buffer_opts);
mtime, is_erasure_coded, buffer_opts);
}

ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
int64_t len, int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
bool is_erasure_coded, const BufferOpts& buffer_opts) {
bool is_erasure_coded, int64_t mtime, const BufferOpts& buffer_opts) {
DCHECK_GE(disk_id, -1);
// Require that the scan range is within [0, file_length). While this cannot be used
// to guarantee safety (file_length metadata may be stale), it avoids different
Expand All @@ -606,7 +606,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,

ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
range->Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded,
buffer_opts, metadata);
mtime, buffer_opts, metadata);
return range;
}

Expand All @@ -615,7 +615,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
bool expected_local, int mtime, bool is_erasure_coded,
const ScanRange* original_split) {
return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local,
is_erasure_coded, BufferOpts(try_cache, mtime), original_split);
is_erasure_coded, mtime, BufferOpts(try_cache), original_split);
}

Status HdfsScanNodeBase::AddDiskIoRanges(const vector<ScanRange*>& ranges,
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/hdfs-scan-node-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,15 @@ class HdfsScanNodeBase : public ScanNode {
/// This is thread safe.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
bool is_erasure_coded,
bool is_erasure_coded, int64_t mtime,
const io::BufferOpts& buffer_opts,
const io::ScanRange* original_split = NULL);

/// Same as above, but it takes a pointer to a ScanRangeMetadata object which contains
/// the partition_id, original_splits, and other information about the scan range.
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
bool is_erasure_coded, const io::BufferOpts& buffer_opts);
bool is_erasure_coded, int64_t mtime, const io::BufferOpts& buffer_opts);

/// Old API for compatibility with text scanners (e.g. LZO text scanner).
io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/hdfs-scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -813,13 +813,15 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
files[i]->filename.c_str(), footer_size, footer_start,
split_metadata->partition_id, footer_split->disk_id(),
footer_split->expected_local(), files[i]->is_erasure_coded,
BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
files[i]->mtime, BufferOpts(footer_split->try_cache()),
split);
} else {
// If we did not find the last split, we know it is going to be a remote read.
footer_range =
scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
footer_size, footer_start, split_metadata->partition_id, -1, false,
files[i]->is_erasure_coded, BufferOpts::Uncached(), split);
files[i]->is_erasure_coded, files[i]->mtime, BufferOpts::Uncached(),
split);
}

footer_ranges.push_back(footer_range);
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/hdfs-text-scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), files[i]->file_length, 0,
metadata->partition_id, split->disk_id(), split->expected_local(),
files[i]->is_erasure_coded,
BufferOpts(split->try_cache(), files[i]->mtime));
files[i]->is_erasure_coded, files[i]->mtime,
BufferOpts(split->try_cache()));
compressed_text_scan_ranges.push_back(file_range);
scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/parquet/hdfs-parquet-scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ Status HdfsParquetScanner::ProcessFooter() {
ScanRange* metadata_range = scan_node_->AllocateScanRange(
metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
metadata_range_->disk_id(), metadata_range_->expected_local(),
metadata_range_->is_erasure_coded(),
metadata_range_->is_erasure_coded(), metadata_range_->mtime(),
BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));

unique_ptr<BufferDescriptor> io_buffer;
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/parquet/parquet-column-readers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1018,8 +1018,8 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
&& col_end <= split_range->offset() + split_range->len();
scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
filename(), col_len, col_start, partition_id, split_range->disk_id(),
col_range_local, split_range->is_erasure_coded(),
BufferOpts(split_range->try_cache(), file_desc.mtime));
col_range_local, split_range->is_erasure_coded(), file_desc.mtime,
BufferOpts(split_range->try_cache()));
ClearDictionaryDecoder();
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/scanner-context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
ScanRange* range = parent_->scan_node_->AllocateScanRange(
scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
scan_range_->disk_id(), false, scan_range_->is_erasure_coded(),
BufferOpts::Uncached());
scan_range_->mtime(), BufferOpts::Uncached());
bool needs_buffers;
RETURN_IF_ERROR(
parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/io/data-cache-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ TEST_F(DataCacheTest, TestBasics) {
ASSERT_FALSE(cache.Store(FNAME, MTIME, 0, test_buffer(), cache_size * 2));

// Test with uncacheable 'mtime' to make sure the entry is not stored.
ASSERT_FALSE(cache.Store(FNAME, BufferOpts::NEVER_CACHE, 0, test_buffer(),
ASSERT_FALSE(cache.Store(FNAME, ScanRange::INVALID_MTIME, 0, test_buffer(),
TEMP_BUFFER_SIZE));
ASSERT_EQ(0, cache.Lookup(FNAME, BufferOpts::NEVER_CACHE, 0, TEMP_BUFFER_SIZE, buffer));
ASSERT_EQ(0, cache.Lookup(FNAME, ScanRange::INVALID_MTIME, 0, TEMP_BUFFER_SIZE,
buffer));

// Test with bad 'mtime' to make sure the entry is not stored.
ASSERT_FALSE(cache.Store(FNAME, -1000, 0, test_buffer(), TEMP_BUFFER_SIZE));
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/io/disk-io-mgr-stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ void DiskIoMgrStress::NewClient(int i) {

ScanRange* range = client.obj_pool.Add(new ScanRange);
range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
0, false, false, BufferOpts::Uncached());
0, false, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
client.scan_ranges.push_back(range);
assigned_len += range_len;
}
Expand Down
17 changes: 9 additions & 8 deletions be/src/runtime/io/disk-io-mgr-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class DiskIoMgrTest : public testing::Test {
if (status.ok()) {
ScanRange* scan_range = pool_.Add(new ScanRange());
scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
(*written_range)->offset(), 0, false, false, BufferOpts::Uncached());
(*written_range)->offset(), 0, false, false, ScanRange::INVALID_MTIME,
BufferOpts::Uncached());
ValidateSyncRead(io_mgr, reader, client, scan_range,
reinterpret_cast<const char*>(data), sizeof(int32_t));
}
Expand Down Expand Up @@ -241,8 +242,8 @@ class DiskIoMgrTest : public testing::Test {
int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false,
std::vector<ScanRange::SubRange> sub_ranges = {}) {
ScanRange* range = pool->Add(new ScanRange);
range->Reset(nullptr, file_path, len, offset, disk_id, true, false,
BufferOpts(is_cached, mtime), move(sub_ranges), meta_data);
range->Reset(nullptr, file_path, len, offset, disk_id, true, false, mtime,
BufferOpts(is_cached), move(sub_ranges), meta_data);
EXPECT_EQ(mtime, range->mtime());
return range;
}
Expand Down Expand Up @@ -1445,7 +1446,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
vector<uint8_t> client_buffer(buffer_len);
int scan_len = min(len, buffer_len);
ScanRange* range = pool_.Add(new ScanRange);
range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, false,
range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, false, ScanRange::INVALID_MTIME,
BufferOpts::ReadInto(client_buffer.data(), buffer_len));
bool needs_buffers;
ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
Expand Down Expand Up @@ -1491,9 +1492,9 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
int result_len = strlen(expected_result);
vector<uint8_t> client_buffer(result_len);
ScanRange* range = pool_.Add(new ScanRange);
range->Reset(nullptr, tmp_file, data_len, 0, 0, true, false,
BufferOpts::ReadInto(fake_cache, stat_val.st_mtime, client_buffer.data(),
result_len), move(sub_ranges));
range->Reset(nullptr, tmp_file, data_len, 0, 0, true, false, stat_val.st_mtime,
BufferOpts::ReadInto(fake_cache, client_buffer.data(), result_len),
move(sub_ranges));
if (fake_cache) {
SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
}
Expand Down Expand Up @@ -1541,7 +1542,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
ScanRange* range = pool_.Add(new ScanRange);
range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, false,
range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, false, ScanRange::INVALID_MTIME,
BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
bool needs_buffers;
ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/io/handle-cache.inline.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Status FileHandleCache::Init() {
Status FileHandleCache::GetFileHandle(
const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
CachedHdfsFileHandle** handle_out, bool* cache_hit) {
DCHECK_GT(mtime, 0);
// Hash the key and get appropriate partition
int index = HashUtil::Hash(fname->data(), fname->size(), 0) % cache_partitions_.size();
FileHandleCachePartition& p = cache_partitions_[index];
Expand Down
46 changes: 21 additions & 25 deletions be/src/runtime/io/request-ranges.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,58 +167,46 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
struct BufferOpts {
public:
// Set options for a read into an IoMgr-allocated or HDFS-cached buffer. Caching is
// enabled if 'try_cache' is true, the file is in the HDFS cache and 'mtime' matches
// the modified time of the cached file in the HDFS cache.
BufferOpts(bool try_cache, int64_t mtime)
// enabled if 'try_cache' is true and the file is in the HDFS cache.
BufferOpts(bool try_cache)
: try_cache_(try_cache),
mtime_(mtime),
client_buffer_(nullptr),
client_buffer_len_(-1) {}

/// Set options for an uncached read into an IoMgr-allocated buffer.
static BufferOpts Uncached() {
return BufferOpts(false, NEVER_CACHE, nullptr, -1);
return BufferOpts(false, nullptr, -1);
}

/// Set options to read the entire scan range into 'client_buffer'. The length of the
/// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching is not
/// enabled in this case.
static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len) {
return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
return BufferOpts(false, client_buffer, client_buffer_len);
}

/// Use only when you don't want to to read the entire scan range, but only sub-ranges
/// in it. In this case you can copy the relevant parts from the HDFS cache into the
/// client buffer. The length of the buffer, 'client_buffer_len' must fit the
/// concatenation of all the sub-ranges.
static BufferOpts ReadInto(bool try_cache, int64_t mtime, uint8_t* client_buffer,
static BufferOpts ReadInto(bool try_cache, uint8_t* client_buffer,
int64_t client_buffer_len) {
return BufferOpts(try_cache, mtime, client_buffer, client_buffer_len);
return BufferOpts(try_cache, client_buffer, client_buffer_len);
}

private:
friend class ScanRange;
friend class HdfsFileReader;
FRIEND_TEST(DataCacheTest, TestBasics);

BufferOpts(
bool try_cache, int64_t mtime, uint8_t* client_buffer, int64_t client_buffer_len)
BufferOpts(bool try_cache, uint8_t* client_buffer, int64_t client_buffer_len)
: try_cache_(try_cache),
mtime_(mtime),
client_buffer_(client_buffer),
client_buffer_len_(client_buffer_len) {}

/// If 'mtime_' is set to NEVER_CACHE, the file handle will never be cached, because
/// the modification time won't match.
const static int64_t NEVER_CACHE = -1;

/// If true, read from HDFS cache if possible.
const bool try_cache_;

/// Last modified time of the file associated with the scan range. If set to
/// NEVER_CACHE, caching is disabled.
const int64_t mtime_;

/// A destination buffer provided by the client, nullptr and -1 if no buffer.
uint8_t* const client_buffer_;
const int64_t client_buffer_len_;
Expand All @@ -244,22 +232,26 @@ class ScanRange : public RequestRange {
/// local filesystem). The scan range must be non-empty and fall within the file bounds
/// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk
/// queue to add the range to. If 'expected_local' is true, a warning is generated if
/// the read did not come from a local disk. 'buffer_opts' specifies buffer management
/// options - see the DiskIoMgr class comment and the BufferOpts comments for details.
/// the read did not come from a local disk. 'mtime' is the last modification time for
/// 'file'; the mtime must change when the file changes. 'is_erasure_coded' is whether
/// 'file' is stored using HDFS erasure coding.
/// 'buffer_opts' specifies buffer management options - see the DiskIoMgr class comment
/// and the BufferOpts comments for details.
/// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
///
/// TODO: IMPALA-4249: clarify if a ScanRange can be reused after Reset(). Currently
/// it is not generally safe to do so, but some unit tests reuse ranges after
/// successfully reading to eos.
void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
bool expected_local, bool is_erasure_coded, const BufferOpts& buffer_opts,
void* meta_data = nullptr);
bool expected_local, bool is_erasure_coded, int64_t mtime,
const BufferOpts& buffer_opts, void* meta_data = nullptr);

/// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
/// in advance, as this method will do the merge.
void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
bool expected_local, bool is_erasure_coded, const BufferOpts& buffer_opts,
std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr);
bool expected_local, bool is_erasure_coded, int64_t mtime,
const BufferOpts& buffer_opts, std::vector<SubRange>&& sub_ranges,
void* meta_data = nullptr);

void* meta_data() const { return meta_data_; }
bool try_cache() const { return try_cache_; }
Expand Down Expand Up @@ -292,6 +284,10 @@ class ScanRange : public RequestRange {
/// return a descriptive string for debug.
std::string DebugString() const;

/// Non-HDFS files (e.g. local files) do not use mtime, so they should use this known
/// bogus mtime.
const static int64_t INVALID_MTIME = -1;

int64_t mtime() const { return mtime_; }

bool HasSubRanges() const { return !sub_ranges_.empty(); }
Expand Down
12 changes: 7 additions & 5 deletions be/src/runtime/io/scan-range.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,14 @@ ScanRange::~ScanRange() {
}

void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
int disk_id, bool expected_local, bool is_erasure_coded,
int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
const BufferOpts& buffer_opts, void* meta_data) {
Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, buffer_opts,
{}, meta_data);
Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
buffer_opts, {}, meta_data);
}

void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
int disk_id, bool expected_local, bool is_erasure_coded,
int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges, void* meta_data) {
DCHECK(ready_buffers_.empty());
DCHECK(!read_in_flight_);
Expand All @@ -474,7 +474,9 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
offset_ = offset;
disk_id_ = disk_id;
try_cache_ = buffer_opts.try_cache_;
mtime_ = buffer_opts.mtime_;
// HDFS ranges must have an mtime > 0. Local ranges do not use mtime.
if (fs_) DCHECK_GT(mtime, 0);
mtime_ = mtime;
meta_data_ = meta_data;
if (buffer_opts.client_buffer_ != nullptr) {
external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/tmp-file-mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
handle->read_range_ = scan_range_pool_.Add(new ScanRange);
handle->read_range_->Reset(nullptr, handle->write_range_->file(),
handle->write_range_->len(), handle->write_range_->offset(),
handle->write_range_->disk_id(), false, false,
handle->write_range_->disk_id(), false, false, ScanRange::INVALID_MTIME,
BufferOpts::ReadInto(buffer.data(), buffer.len()));
read_counter_->Add(1);
bytes_read_counter_->Add(buffer.len());
Expand Down

0 comments on commit bbd1839

Please sign in to comment.