forked from zqcde/impala-lzo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hdfs-lzo-text-scanner.h
213 lines (174 loc) · 7.34 KB
/
hdfs-lzo-text-scanner.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#ifndef IMPALA_LZO_TEXT_SCANNER_H
#define IMPALA_LZO_TEXT_SCANNER_H
#include "lzo-header.h"
#include <boost/thread/locks.hpp>
#include "common/version.h"
#include "exec/hdfs-text-scanner.h"
#include "runtime/string-buffer.h"
// This provides support for reading files compressed with lzop.
// The file consists of a header and compressed blocks preceded
// by their compressed and uncompressed block sizes.
//
// The following is a pseudo-BNF grammar for LZOFile. Comments are prefixed
// with dashes:
//
// lzofile ::=
// <file-header>
// <compressed-block>+
//
// compressed-block ::=
// <uncompressed-size>
// <compressed-size>
// <uncompressed-checksums>
// <compressed-checksums>
// <compressed-data>
//
// file-header ::= -- most of this information is not used.
// <magic>
// <version>
// <lib-version>
// [<version-needed>] -- present for all modern files.
// <method>
// <flags>
// <mode>
// <mtime>
// <file-name>
// <header-checksum>
// <extra-field> -- presence indicated in flags, not currently used.
//
// <compressed-checksums> ::=
// [adler-checksum | crc-checksum]
//
// <uncompressed-checksums> ::=
// [adler-checksum | crc-checksum]
//
// <file-name> ::=
// <length> -- one byte
// <name>
//
namespace impala {
// Forward declarations. ScannerContext is only used through a pointer in this header
// (see HdfsLzoTextScanner::Open()), and HdfsLzoTextScanner has to be named before the
// extern "C" factory function that returns a pointer to it.
class ScannerContext;
class HdfsLzoTextScanner;
// HdfsScanner implementation that reads LZOP formatted text files.
// The format of the data, after decompression, is the same as HdfsText files.
// Records can span compressed blocks.
//
// An optional, but highly recommended, index file may exist in the same directory.
// This file is generated by running: com.hadoop.compression.lzo.DistributedLzoIndexer.
// The file contains the offsets to the start of each compressed block.
// This is used to find the beginning of a split and to skip over a bad block and
// find the next block.
// If there is no index file then the file is non-splittable. A single scan range
// will be issued for the whole file and no error recovery is done.
// Reports the Impala build version this library was compiled against, so that the
// expected version can be verified when the library is loaded via dlopen.
// Declared extern "C" and simply forwards to GetDaemonBuildVersion(), which resides
// in common/version.h.
// NOTE(review): this is a non-inline definition in a header; including this header
// from more than one translation unit of the same binary would define the symbol
// twice -- confirm it is only included once.
extern "C" const char* GetImpalaBuildVersion() {
  const char* build_version = GetDaemonBuildVersion();
  return build_version;
}
// The two functions below are wrappers for calling methods of HdfsLzoTextScanner
// when the library is loaded via dlopen.
//
// GetLzoTextScanner() is a wrapper for the HdfsLzoTextScanner creator. The caller
// receives the new scanner through the returned pointer and is expected to call
// delete on it.
// scan_node -- scan node that is creating this scanner.
// state -- runtime state for this scanner.
extern "C" HdfsLzoTextScanner* GetLzoTextScanner(
HdfsScanNodeBase* scan_node, RuntimeState* state);
// C-linkage wrapper for issuing the initial scan ranges of lzo-text files, for use
// when the library is loaded via dlopen.
// NOTE(review): presumably forwards to the static method
// HdfsLzoTextScanner::LzoIssueInitialRangesImpl(), which has the same parameters; the
// class declares no method named IssueInitialRanges -- confirm in the .cc file.
// scan_node -- scan node for this scan
// files -- files that are to be scanned.
extern "C" Status LzoIssueInitialRangesImpl(
HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
// Text scanner for LZOP-compressed files. Derives from HdfsTextScanner and adds
// parsing of the LZOP file header and optional index file, plus reading and
// decompressing of the compressed blocks into 'block_buffer_'.
class HdfsLzoTextScanner : public HdfsTextScanner {
public:
// Creates a scanner on behalf of 'scan_node' using the runtime state 'state'.
HdfsLzoTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsLzoTextScanner();
// Determines whether this scanner is processing an initial scan range for which it
// should only parse the file header and index file (if any). For non-initial scan
// ranges, stream_ is positioned to the first byte that contains data.
// Sets 'only_parsing_header_' and 'header_'. Sets 'eos_' to true if this scan range
// contains no tuples for which this scanner is responsible.
virtual Status Open(ScannerContext* context);
// If 'only_parsing_header_' is true, processes the header and index file, issues new
// scan ranges for the data and sets 'eos_' to true. Registers the header as scan range
// metadata in the parent scan node.
// Otherwise, calls the parent's GetNextInternal().
virtual Status GetNextInternal(RowBatch* row_batch);
// Attaches 'block_buffer_pool_' to 'row_batch'. If 'row_batch' is nullptr,
// then 'block_buffer_pool_' is freed instead. Calls the parent's Close().
virtual void Close(RowBatch* row_batch);
// Issue the initial scan ranges for all lzo-text files. This reads the
// file headers; the rest of the file data is issued afterwards.
// NOTE(review): the original comment said the remaining ranges are issued from
// ProcessScanRange(), which this class does not declare; IssueFileRanges() looks
// like the current equivalent -- confirm in the .cc file.
// scan_node -- scan node for this scan.
// files -- files that are to be scanned.
static Status LzoIssueInitialRangesImpl(
HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
private:
// Kind of checksum that accompanies block data (see LzoFileHeader and Checksum()).
enum LzoChecksum {
CHECK_NONE,
CHECK_CRC32,
CHECK_ADLER
};
// Block size in bytes used by LZOP (256 KiB). The compressed blocks will be no
// bigger than this.
const static int MAX_BLOCK_COMPRESSED_SIZE = (256 * 1024);
// This is the fixed size of the header. It can have up to 255 bytes of
// file name in it as well.
const static int MIN_HEADER_SIZE = 32;
// An overestimate of how big the header could be. There is a path name
// and an option section.
const static int HEADER_SIZE = 300;
// Header information, shared by all scanners on this file.
struct LzoFileHeader {
// Checksum kinds recorded for this file.
// NOTE(review): presumably 'input' refers to the compressed data and 'output' to
// the decompressed data -- confirm against ReadHeader().
LzoChecksum input_checksum_type_;
LzoChecksum output_checksum_type_;
// Size of the file header (presumably in bytes -- confirm).
uint32_t header_size_;
// Offsets to compressed blocks.
std::vector<int64_t> offsets;
};
// Pointer to shared header information.
LzoFileHeader* header_;
// Fills the byte buffer by reading and decompressing blocks.
virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0);
// Read header data and validate header.
Status ReadHeader();
// Read the index file and set up the header.offsets.
Status ReadIndexFile();
// Checksum data.
// type -- checksum algorithm to apply.
// source -- NOTE(review): presumably a label identifying what is being checked
// (e.g. for error messages) -- confirm in the .cc file.
// expected_checksum -- value the computed checksum is expected to match.
// buffer, length -- the data to checksum and its size.
Status Checksum(LzoChecksum type,
const std::string& source, int expected_checksum, uint8_t* buffer, int length);
// Adjust the context_ to the first block at or after the current context offset.
// *found returns if a starting block was found.
Status FindFirstBlock(bool* found);
// Issue the full file ranges after reading the headers.
Status IssueFileRanges(const char* filename);
// Read a data block.
// sets: byte_buffer_ptr_, byte_buffer_read_size_ and eos_read_.
// Data will be in a mempool allocated buffer or in the disk I/O context memory
// if the data was not compressed.
// Attaches decompression buffers from previous calls that might still be referenced
// by returned batches to 'pool'.
Status ReadAndDecompressData(MemPool* pool);
// Read compressed data and recover from errors.
// Attaches decompression buffers from previous calls that might still be referenced
// by returned batches to 'pool'.
Status ReadData(MemPool* pool);
// Callback for stream_ to determine how much to read past the scan range.
// 'file_offset' is ignored: the answer is always MAX_BLOCK_COMPRESSED_SIZE.
static int MaxBlockCompressedSize(int64_t file_offset) {
return MAX_BLOCK_COMPRESSED_SIZE;
}
// Pool for allocating the block_buffer_.
boost::scoped_ptr<MemPool> block_buffer_pool_;
// Buffer to hold decompressed data.
uint8_t* block_buffer_;
// Allocated length of the block_buffer_
int32_t block_buffer_len_;
// Next byte to be returned from the buffer holding decompressed data blocks.
uint8_t* block_buffer_ptr_;
// Bytes remaining in the block_buffer.
int bytes_remaining_;
// True if the end of scan has been read.
bool eos_read_;
// This is set when the scanner object is constructed. Currently always true.
// HDFS checksums the blocks from the disk to the client, so this is redundant.
bool disable_checksum_;
};
}
#endif