@@ -29,7 +29,23 @@ namespace turi {
29
29
using fileio::file_status;
30
30
31
31
/* *
32
- * Code from
32
+ * Escape BOMs
33
+ */
34
+ void skip_BOM (general_ifstream& fin) {
35
+ char fChar , sChar , tChar;
36
+ fChar = fin.get ();
37
+ sChar = fin.get ();
38
+ tChar = fin.get ();
39
+ bool isBOM = ((fChar == (char )0xEF ) && (sChar == (char )0xBB ) && (tChar == (char )0xBF ));
40
+ if (!isBOM) {
41
+ fin.putback (tChar);
42
+ fin.putback (sChar );
43
+ fin.putback (fChar );
44
+ }
45
+ }
46
+
47
+ /* *
48
+ * Code from
33
49
* http://stackoverflow.com/questions/6089231/getting-std-ifstream-to-handle-lf-cr-and-crlf
34
50
*
35
51
* A getline implementation which supports '\n', '\r' and '\r\n'
@@ -70,13 +86,13 @@ std::istream& eol_safe_getline(std::istream& is, std::string& t) {
70
86
/* *
71
87
* Reads until the eol string is encountered
72
88
*/
73
- std::istream& custom_eol_getline (std::istream& is,
74
- std::string& t,
89
+ std::istream& custom_eol_getline (std::istream& is,
90
+ std::string& t,
75
91
const std::string& eol) {
76
92
t.clear ();
77
93
if (eol.empty ()) {
78
94
// read the entire stream
79
- t = std::string (std::istreambuf_iterator<char >(is),
95
+ t = std::string (std::istreambuf_iterator<char >(is),
80
96
std::istreambuf_iterator<char >());
81
97
return is;
82
98
} else {
@@ -108,12 +124,12 @@ std::istream& custom_eol_getline(std::istream& is,
108
124
}
109
125
110
126
/* *
111
- * if eol == "\n", this will get read a line until the next
127
+ * if eol == "\n", this will get read a line until the next
112
128
* "\n", "\r" or "\r\n" sequence.
113
129
* Otherwise, it will read until the eol string appears
114
130
*/
115
- std::istream& eol_getline (std::istream& is,
116
- std::string& t,
131
+ std::istream& eol_getline (std::istream& is,
132
+ std::string& t,
117
133
const std::string& eol) {
118
134
if (eol == " \n " ) {
119
135
return eol_safe_getline (is, t);
@@ -138,11 +154,11 @@ class parallel_csv_parser {
138
154
* the type of each column. Generally this should the same
139
155
* number of columns as in the CSV. If only a subset if
140
156
* (say 3 out of 4) columns are to be stored, this should
141
- * contain the types of the output columns, and
157
+ * contain the types of the output columns, and
142
158
* column_output_order is used to map the CSV columns to
143
- * the output columns.
159
+ * the output columns.
144
160
* \param tokenizer The tokenizer rules to use
145
- * \param continue_on_failure Whether to keep going even when an error is
161
+ * \param continue_on_failure Whether to keep going even when an error is
146
162
* encountered.
147
163
* \param store_errors Whether to store bad lines in a separate SArray
148
164
* \param row_limit Maximum number of rows to read
@@ -151,30 +167,30 @@ class parallel_csv_parser {
151
167
* if output_order[i] == -1, the column is ignored.
152
168
* If output_order is empty (default), this is equivalent
153
169
* to the having output_order[i] == i.
154
- * \param num_threads Amount of parallelism to use.
170
+ * \param num_threads Amount of parallelism to use.
155
171
*/
156
- parallel_csv_parser (std::vector<flex_type_enum> column_types,
172
+ parallel_csv_parser (std::vector<flex_type_enum> column_types,
157
173
csv_line_tokenizer tokenizer,
158
174
bool continue_on_failure,
159
175
bool store_errors,
160
176
size_t row_limit,
161
177
std::vector<size_t > column_output_order = std::vector<size_t >(),
162
178
size_t num_threads = thread_pool::get_instance().size()):
163
- nthreads (std::max<size_t >(num_threads, 2 ) - 1),
164
- parsed_buffer(nthreads), parsed_buffer_last_elem(nthreads),
165
- writing_buffer(nthreads), writing_buffer_last_elem(nthreads),
179
+ nthreads (std::max<size_t >(num_threads, 2 ) - 1),
180
+ parsed_buffer(nthreads), parsed_buffer_last_elem(nthreads),
181
+ writing_buffer(nthreads), writing_buffer_last_elem(nthreads),
166
182
error_buffer(nthreads), writing_error_buffer(nthreads),
167
- thread_local_tokenizer(nthreads, tokenizer),
168
- read_group(thread_pool::get_instance()),
183
+ thread_local_tokenizer(nthreads, tokenizer),
184
+ read_group(thread_pool::get_instance()),
169
185
write_group(thread_pool::get_instance()), column_types(column_types),
170
186
column_output_order(column_output_order),
171
- row_limit(row_limit),
172
- continue_on_failure(continue_on_failure),
187
+ row_limit(row_limit),
188
+ continue_on_failure(continue_on_failure),
173
189
store_errors(store_errors),
174
190
line_terminator(tokenizer.line_terminator),
175
191
is_regular_line_terminator(line_terminator == " \n " ) {
176
192
};
177
-
193
+
178
194
/* *
179
195
* Sets the total size of all inputs. Required if multiple output segments
180
196
* are desired. Otherwise all outputs will go to segment 0.
@@ -185,16 +201,16 @@ class parallel_csv_parser {
185
201
/* *
186
202
* Parses an input file into an output frame
187
203
*/
188
- void parse (general_ifstream& fin,
189
- sframe& output_frame,
204
+ void parse (general_ifstream& fin,
205
+ sframe& output_frame,
190
206
sarray<flexible_type>& errors) {
191
207
size_t num_output_segments = output_frame.num_segments ();
192
208
size_t current_input_file_size = fin.file_size ();
193
209
try {
194
210
timer ti;
195
211
bool fill_buffer_is_good = true ;
196
- while (fin.good () && fill_buffer_is_good &&
197
- (row_limit == 0 || lines_read.value < row_limit)) {
212
+ while (fin.good () && fill_buffer_is_good &&
213
+ (row_limit == 0 || lines_read.value < row_limit)) {
198
214
fill_buffer_is_good = fill_buffer (fin);
199
215
if (buffer.size () == 0 ) break ;
200
216
@@ -234,10 +250,10 @@ class parallel_csv_parser {
234
250
if (total_input_file_sizes > 0 ) {
235
251
// compute the current output segment
236
252
// It really is simply.
237
- // current_output_segment =
238
- // (fin.get_bytes_read() + cumulative_file_read_sizes)
253
+ // current_output_segment =
254
+ // (fin.get_bytes_read() + cumulative_file_read_sizes)
239
255
// * num_output_segments / total_input_file_sizes;
240
- // But a lot of sanity checking is required because
256
+ // But a lot of sanity checking is required because
241
257
// - fin.get_bytes_read() may fail.
242
258
// - files on disk may change after I last computed the file sizes, so
243
259
// there is no guarantee that cumulatively, they will all add up.
@@ -252,7 +268,7 @@ class parallel_csv_parser {
252
268
read_pos += cumulative_file_read_sizes;
253
269
}
254
270
next_output_segment = read_pos * num_output_segments / total_input_file_sizes;
255
- // sanity boundary check
271
+ // sanity boundary check
256
272
if (next_output_segment >= num_output_segments) next_output_segment = num_output_segments - 1 ;
257
273
// we never go back
258
274
current_output_segment = std::max (current_output_segment, next_output_segment);
@@ -275,9 +291,9 @@ class parallel_csv_parser {
275
291
276
292
cumulative_file_read_sizes += current_input_file_size;
277
293
} catch (...) {
278
- try { read_group.join (); } catch (...) { }
279
- try { write_group.join (); } catch (...) { }
280
- // even on a failure, we still increment the cumulative read count
294
+ try { read_group.join (); } catch (...) { }
295
+ try { write_group.join (); } catch (...) { }
296
+ // even on a failure, we still increment the cumulative read count
281
297
cumulative_file_read_sizes += current_input_file_size;
282
298
throw ;
283
299
}
@@ -292,7 +308,7 @@ class parallel_csv_parser {
292
308
293
309
/* *
294
310
* Returns the number of CSV lines read
295
- */
311
+ */
296
312
size_t num_lines_read () const {
297
313
return lines_read.value ;
298
314
}
@@ -304,7 +320,7 @@ class parallel_csv_parser {
304
320
if (column_output_order.empty ()) return column_types.size ();
305
321
else return column_output_order.size ();
306
322
}
307
-
323
+
308
324
/* *
309
325
* Returns the number of output columns in the CSV file
310
326
*/
@@ -372,7 +388,7 @@ class parallel_csv_parser {
372
388
373
389
inline bool is_end_line_str (char * c, char * cend) const {
374
390
if (is_regular_line_terminator) return (*c) == ' \n ' || (*c) == ' \r ' ;
375
- else if (line_terminator.empty () == false &&
391
+ else if (line_terminator.empty () == false &&
376
392
cend - c >= (int )(line_terminator.length ())) {
377
393
for (char nl : line_terminator) {
378
394
if (nl != (*c)) return false ;
@@ -400,7 +416,7 @@ class parallel_csv_parser {
400
416
newline_was_matched = true ;
401
417
return c + 1 ;
402
418
} else if ((*c) == ' \r ' ) {
403
- // its a \r. It could be just a \r, or a \r\n.
419
+ // its a \r. It could be just a \r, or a \r\n.
404
420
// check for \r\n
405
421
if (c + 1 < cend && (*(c+1 )) == ' \n ' ) {
406
422
// its a \r\n, advance past and return
@@ -429,7 +445,7 @@ class parallel_csv_parser {
429
445
}
430
446
++c;
431
447
}
432
- }
448
+ }
433
449
newline_was_matched = false ;
434
450
return cend;
435
451
}
@@ -448,13 +464,13 @@ class parallel_csv_parser {
448
464
local_tokens[i].reset (column_types[i]);
449
465
}
450
466
}
451
- const std::vector<size_t >* ptr_to_output_order =
467
+ const std::vector<size_t >* ptr_to_output_order =
452
468
column_output_order.empty () ? nullptr : &column_output_order;
453
469
454
- size_t num_tokens_parsed =
470
+ size_t num_tokens_parsed =
455
471
thread_local_tokenizer[threadid].
456
472
tokenize_line (pstart, pnext - pstart,
457
- local_tokens,
473
+ local_tokens,
458
474
true /* permit undefined*/ ,
459
475
ptr_to_output_order);
460
476
@@ -473,17 +489,17 @@ class parallel_csv_parser {
473
489
if (num_failures.value < 10 ) {
474
490
std::string badline = std::string (pstart, pnext - pstart);
475
491
if (badline.length () > 256 ) badline=badline.substr (0 , 256 ) + " ..." ;
476
- logprogress_stream << std::string (" Unable to parse line \" " ) +
492
+ logprogress_stream << std::string (" Unable to parse line \" " ) +
477
493
badline + " \" " << std::endl;
478
494
}
479
495
++num_failures;
480
496
} else {
481
- log_and_throw (std::string (" Unable to parse line \" " ) +
497
+ log_and_throw (std::string (" Unable to parse line \" " ) +
482
498
std::string (pstart, pnext - pstart) + " \"\n " +
483
499
" Set error_bad_lines=False to skip bad lines" );
484
500
}
485
- }
486
- }
501
+ }
502
+ }
487
503
}
488
504
/* *
489
505
* Performs the parse on a section of the buffer (threadid in nthreads)
@@ -500,7 +516,7 @@ class parallel_csv_parser {
500
516
if (threadid == nthreads - 1 ) pend = bufend;
501
517
502
518
// ok, this is important. Pay attention.
503
- // We are sweeping from
519
+ // We are sweeping from
504
520
// - the first line which begins AFTER pstart, but before pend
505
521
// - And we are finishing on the last line which ends AFTER pend.
506
522
// (if we are the last thread, something special happens and
@@ -515,10 +531,10 @@ class parallel_csv_parser {
515
531
// hello, world abcd
516
532
// 1, 2 abcd
517
533
// 3, 4 abcd
518
- //
534
+ //
519
535
// Then whichever range includes a "d" handles the line after that.
520
536
//
521
- // This is a little subtle when the line_terminator may be multiple
537
+ // This is a little subtle when the line_terminator may be multiple
522
538
// characters.
523
539
//
524
540
@@ -531,34 +547,34 @@ class parallel_csv_parser {
531
547
bool start_position_found = (threadid == 0 );
532
548
if (threadid > 0 ) {
533
549
// find the first line beginning after pstart but before pend
534
-
535
- // if we have a multicharacter line terminator, we have to be a bit
536
- // intelligent. to match the "last character" of the terminator,
537
- // we need to shift the newline search backwards by
550
+
551
+ // if we have a multicharacter line terminator, we have to be a bit
552
+ // intelligent. to match the "last character" of the terminator,
553
+ // we need to shift the newline search backwards by
538
554
// line_terminator.length() - 1 characters
539
- if (!is_regular_line_terminator &&
555
+ if (!is_regular_line_terminator &&
540
556
line_terminator.length () > 1 &&
541
557
// make sure there is enough room to shift backwards
542
- pstart - bufstart >= int (line_terminator.length () - 1 )) {
558
+ pstart - bufstart >= int (line_terminator.length () - 1 )) {
543
559
pstart -= line_terminator.length () - 1 ;
544
560
}
545
561
bool newline_was_matched;
546
562
pstart = advance_past_newline (pstart, pend, newline_was_matched);
547
563
if (newline_was_matched) {
548
564
start_position_found = true ;
549
565
}
550
- }
566
+ }
551
567
if (start_position_found) {
552
568
/* *************************************************************************/
553
569
/* */
554
570
/* Find the End Position */
555
571
/* */
556
572
/* *************************************************************************/
557
573
// find the end position
558
- if (!is_regular_line_terminator &&
574
+ if (!is_regular_line_terminator &&
559
575
line_terminator.length () > 1 &&
560
576
// make sure there is enough room to shift backwards
561
- pend - bufstart >= int (line_terminator.length () - 0 )) {
577
+ pend - bufstart >= int (line_terminator.length () - 0 )) {
562
578
pend -= line_terminator.length () - 1 ;
563
579
}
564
580
bool newline_was_matched_unused;
@@ -572,7 +588,7 @@ class parallel_csv_parser {
572
588
char * pnext = pstart;
573
589
574
590
// the rule that every line must end with a terminator is wrong when
575
- // the line terminator is empty. some special handling is needed for this
591
+ // the line terminator is empty. some special handling is needed for this
576
592
// case.
577
593
if (line_terminator.empty ()) {
578
594
parse_line (pstart, pend, threadid);
@@ -596,17 +612,17 @@ class parallel_csv_parser {
596
612
}
597
613
598
614
/* *
599
- * Adds a line terminator to the buffer if it does not already
600
- * end with a line terminator. Used by the buffer reading routines on
615
+ * Adds a line terminator to the buffer if it does not already
616
+ * end with a line terminator. Used by the buffer reading routines on
601
617
* EOF so that the parser is always guaranteed that every line
602
- * ends with a line terminator, even the last line.
618
+ * ends with a line terminator, even the last line.
603
619
*/
604
620
void add_line_terminator_to_buffer () {
605
- if (is_regular_line_terminator &&
606
- buffer[buffer.length () - 1 ] != ' \n ' &&
621
+ if (is_regular_line_terminator &&
622
+ buffer[buffer.length () - 1 ] != ' \n ' &&
607
623
buffer[buffer.length () - 1 ] != ' \r ' ) {
608
624
buffer.push_back (' \n ' );
609
- } else if (!is_regular_line_terminator &&
625
+ } else if (!is_regular_line_terminator &&
610
626
buffer.length () >= line_terminator.length () &&
611
627
buffer.substr (buffer.length () - line_terminator.length ()) != line_terminator) {
612
628
buffer += line_terminator;
@@ -649,7 +665,7 @@ class parallel_csv_parser {
649
665
// parse buffer in parallel
650
666
mutex last_parsed_token_lock;
651
667
char * last_parsed_token = &(buffer[0 ]);
652
-
668
+
653
669
for (size_t threadid = 0 ; threadid < nthreads; ++threadid) {
654
670
read_group.launch (
655
671
[=,&last_parsed_token_lock,&last_parsed_token](void ) {
@@ -668,13 +684,13 @@ class parallel_csv_parser {
668
684
}
669
685
670
686
/* *
671
- * Spins up a background thread to write parse results from parallel_parse
672
- * to the output frame. First the parsed_buffer is swapped into the
687
+ * Spins up a background thread to write parse results from parallel_parse
688
+ * to the output frame. First the parsed_buffer is swapped into the
673
689
* writing_buffer, thus permitting the parsed_buffer to be used again in
674
690
* a different thread.
675
691
*/
676
- void start_background_write (sframe& output_frame,
677
- sarray<flexible_type>& errors_array,
692
+ void start_background_write (sframe& output_frame,
693
+ sarray<flexible_type>& errors_array,
678
694
size_t output_segment) {
679
695
// switch the parse buffer with the write buffer
680
696
writing_buffer.swap (parsed_buffer);
@@ -688,17 +704,17 @@ class parallel_csv_parser {
688
704
write_group.launch ([&, output_segment] {
689
705
auto iter = output_frame.get_output_iterator (output_segment);
690
706
for (size_t i = 0 ; i < writing_buffer.size (); ++i) {
691
- std::copy (writing_buffer[i].begin (),
707
+ std::copy (writing_buffer[i].begin (),
692
708
writing_buffer[i].begin () + writing_buffer_last_elem[i], iter);
693
709
lines_read.inc (writing_buffer_last_elem[i]);
694
710
}
695
- if (store_errors) {
711
+ if (store_errors) {
696
712
auto errors_iter = errors_array.get_output_iterator (0 );
697
713
for (auto & chunk_errors : writing_error_buffer) {
698
714
std::copy (chunk_errors.begin (), chunk_errors.end (), errors_iter);
699
715
chunk_errors.clear ();
700
716
}
701
- }
717
+ }
702
718
background_thread_running = false ;
703
719
});
704
720
}
@@ -718,12 +734,12 @@ class parallel_csv_parser {
718
734
*
719
735
* e.g.
720
736
* {"A", "A", "A.1"} --> {"A", "A.2", "A.1"}
721
- *
737
+ *
722
738
* \param column_names The set of column names to be renamed. The vector
723
739
* will be modified in place.
724
740
*/
725
741
void make_unique_column_names (std::vector<std::string>& column_names) {
726
- // this is the set of column names to the left of the column we
742
+ // this is the set of column names to the left of the column we
727
743
// are current inspected. i.e. these column names are already validated to
728
744
// be correct.
729
745
log_func_entry ();
@@ -738,7 +754,7 @@ void make_unique_column_names(std::vector<std::string>& column_names) {
738
754
// already exists.
739
755
std::set<std::string> all_column_names (column_names.begin (),
740
756
column_names.end ());
741
- // start incrementing at A.1, A.2, etc.
757
+ // start incrementing at A.1, A.2, etc.
742
758
size_t number = 1 ;
743
759
std::string new_column_name;
744
760
while (1 ) {
@@ -783,6 +799,7 @@ void read_csv_header(csv_info& info,
783
799
if (!probe_fin.good ()) {
784
800
log_and_throw (" Fail reading " + sanitize_url (path));
785
801
}
802
+ skip_BOM (probe_fin);
786
803
787
804
// skip skip_rows lines
788
805
std::string skip_string;
@@ -794,8 +811,8 @@ void read_csv_header(csv_info& info,
794
811
while (first_line_tokens.size () == 0 && probe_fin.good ()) {
795
812
eol_getline (probe_fin, first_line, tokenizer.line_terminator );
796
813
boost::algorithm::trim (first_line);
797
- tokenizer.tokenize_line (&(first_line[0 ]),
798
- first_line.length (),
814
+ tokenizer.tokenize_line (&(first_line[0 ]),
815
+ first_line.length (),
799
816
first_line_tokens);
800
817
}
801
818
@@ -829,7 +846,7 @@ void read_csv_header(csv_info& info,
829
846
/* - column_types.size() == column_names.size() == ncols */
830
847
/* */
831
848
/* *************************************************************************/
832
- void get_column_types (csv_info& info,
849
+ void get_column_types (csv_info& info,
833
850
std::map<std::string, flex_type_enum> column_type_hints) {
834
851
info.column_types .resize (info.ncols , flex_type_enum::STRING);
835
852
@@ -838,7 +855,7 @@ void get_column_types(csv_info& info,
838
855
} else if (column_type_hints.count (" __X0__" )) {
839
856
if (column_type_hints.size () != info.column_types .size ()) {
840
857
std::stringstream warning_msg;
841
- warning_msg << " column_type_hints has different size from actual number of columns: "
858
+ warning_msg << " column_type_hints has different size from actual number of columns: "
842
859
<< " column_type_hints.size()=" << column_type_hints.size ()
843
860
<< " ;number of columns=" << info.ncols
844
861
<< std::endl;
@@ -874,18 +891,19 @@ void get_column_types(csv_info& info,
874
891
875
892
} // anonymous namespace
876
893
894
+
877
895
/* *
878
896
* Parsed a CSV file to an SFrame.
879
897
*
880
898
* \param path The file to open as a csv
881
- * \param tokenizer The tokenizer configuration to use. This should be
899
+ * \param tokenizer The tokenizer configuration to use. This should be
882
900
* filled with all the tokenization rules (like what
883
- * separator character to use, what quoting character to use,
901
+ * separator character to use, what quoting character to use,
884
902
* etc.)
885
903
* \param writer The sframe writer to use.
886
- * \param frame_sidx_file Where to write the frame to
904
+ * \param frame_sidx_file Where to write the frame to
887
905
* \param parallel_csv_parser A parallel_csv_parser
888
- * \param errors A reference to a map in which to store an sarray of bad lines
906
+ * \param errors A reference to a map in which to store an sarray of bad lines
889
907
* for each input file.
890
908
*/
891
909
void parse_csv_to_sframe (
@@ -907,14 +925,15 @@ void parse_csv_to_sframe(
907
925
{
908
926
general_ifstream fin (path);
909
927
if (!fin.good ()) log_and_throw (" Cannot open " + sanitize_url (path));
928
+ skip_BOM (fin);
910
929
911
930
// skip skip_rows lines
912
931
std::string skip_string;
913
932
for (size_t i = 0 ;i < skip_rows; ++i) {
914
933
eol_getline (fin, skip_string, tokenizer.line_terminator );
915
934
}
916
935
917
- // if use_header, we keep throwing away empty or comment lines until we
936
+ // if use_header, we keep throwing away empty or comment lines until we
918
937
// get one good line
919
938
if (use_header) {
920
939
std::vector<std::string> first_line_tokens;
@@ -924,16 +943,16 @@ void parse_csv_to_sframe(
924
943
eol_getline (fin, line, tokenizer.line_terminator );
925
944
tokenizer.tokenize_line (&(line[0 ]), line.length (), first_line_tokens);
926
945
}
927
- // if we are going to store errors, we don't do early skippng on
946
+ // if we are going to store errors, we don't do early skippng on
928
947
// mismatched files
929
- if (!store_errors &&
948
+ if (!store_errors &&
930
949
first_line_tokens.size () != parser.num_input_columns ()) {
931
950
logprogress_stream << " Unexpected number of columns found in " << path
932
951
<< " . Skipping this file." << std::endl;
933
952
return ;
934
953
}
935
954
}
936
-
955
+
937
956
// store errors for this particular file in an sarray
938
957
auto file_errors = std::make_shared<sarray<flexible_type>>();
939
958
if (store_errors) {
@@ -950,8 +969,8 @@ void parse_csv_to_sframe(
950
969
}
951
970
952
971
if (continue_on_failure && parser.num_lines_failed () > 0 ) {
953
- logprogress_stream << parser.num_lines_failed ()
954
- << " lines failed to parse correctly"
972
+ logprogress_stream << parser.num_lines_failed ()
973
+ << " lines failed to parse correctly"
955
974
<< std::endl;
956
975
}
957
976
@@ -980,14 +999,14 @@ std::map<std::string, std::shared_ptr<sarray<flexible_type>>> parse_csvs_to_sfra
980
999
auto output_columns = options.output_columns ;
981
1000
auto row_limit = options.row_limit ;
982
1001
auto skip_rows = options.skip_rows ;
983
-
1002
+
984
1003
if (store_errors) continue_on_failure = true ;
985
- // otherwise, check that url is valid directory, and get its listing if no
1004
+ // otherwise, check that url is valid directory, and get its listing if no
986
1005
// pattern present
987
1006
std::vector<std::string> files;
988
1007
bool found_zero_byte_files = false ;
989
1008
std::vector<std::pair<std::string, file_status>> file_and_status = fileio::get_glob_files (url);
990
-
1009
+
991
1010
for (auto p : file_and_status) {
992
1011
if (p.second == file_status::REGULAR_FILE) {
993
1012
// throw away empty files
@@ -1007,7 +1026,7 @@ std::map<std::string, std::shared_ptr<sarray<flexible_type>>> parse_csvs_to_sfra
1007
1026
<< std::endl;
1008
1027
}
1009
1028
1010
- logstream (LOG_INFO) << " Adding CSV file "
1029
+ logstream (LOG_INFO) << " Adding CSV file "
1011
1030
<< sanitize_url (p.first )
1012
1031
<< " to list of files to parse"
1013
1032
<< std::endl;
@@ -1016,7 +1035,7 @@ std::map<std::string, std::shared_ptr<sarray<flexible_type>>> parse_csvs_to_sfra
1016
1035
}
1017
1036
1018
1037
file_and_status.clear (); // don't need these anymore
1019
-
1038
+
1020
1039
// ensure that we actually found some valid files
1021
1040
if (files.empty ()) {
1022
1041
if (found_zero_byte_files) {
@@ -1056,11 +1075,11 @@ std::map<std::string, std::shared_ptr<sarray<flexible_type>>> parse_csvs_to_sfra
1056
1075
for (size_t i = 0 ;i < output_columns.size (); ++i) {
1057
1076
const auto & outcol = output_columns[i];
1058
1077
auto iter = std::find (info.column_names .begin (),
1059
- info.column_names .end (),
1078
+ info.column_names .end (),
1060
1079
outcol);
1061
1080
// Cannot find this column in the talble?
1062
1081
// is output_columns a positional type? i.e. "X" something
1063
- if (iter == info.column_names .end () &&
1082
+ if (iter == info.column_names .end () &&
1064
1083
outcol.length () > 1 && outcol[i] == ' X' ) {
1065
1084
size_t colnumber = stoull (outcol.substr (1 ));
1066
1085
// column number is 1 based
@@ -1097,8 +1116,8 @@ std::map<std::string, std::shared_ptr<sarray<flexible_type>>> parse_csvs_to_sfra
1097
1116
if (!frame.is_opened_for_write ()) {
1098
1117
// open as many segments as there are temp directories.
1099
1118
// But at least one segment
1100
- frame.open_for_write (info.column_names , info.column_types ,
1101
- frame_sidx_file,
1119
+ frame.open_for_write (info.column_names , info.column_types ,
1120
+ frame_sidx_file,
1102
1121
std::max<size_t >(1 , num_temp_directories ()));
1103
1122
}
1104
1123
@@ -1109,19 +1128,19 @@ std::map<std::string, std::shared_ptr<sarray<flexible_type>>> parse_csvs_to_sfra
1109
1128
parser.start_timer ();
1110
1129
1111
1130
for (auto file : files) {
1112
- // check that we've read < row_limit
1113
- if (parser.num_lines_read () < row_limit || row_limit == 0 ) {
1114
- parse_csv_to_sframe (file, tokenizer, options, frame,
1131
+ // check that we've read < row_limit
1132
+ if (parser.num_lines_read () < row_limit || row_limit == 0 ) {
1133
+ parse_csv_to_sframe (file, tokenizer, options, frame,
1115
1134
frame_sidx_file, parser, errors);
1116
1135
} else break ;
1117
1136
}
1118
-
1137
+
1119
1138
logprogress_stream << " Parsing completed. Parsed " << parser.num_lines_read ()
1120
1139
<< " lines in " << parser.get_time_elapsed () << " secs." << std::endl;
1121
1140
1122
-
1141
+
1123
1142
if (frame.is_opened_for_write ()) frame.close ();
1124
-
1143
+
1125
1144
return errors;
1126
1145
}
1127
1146
0 commit comments