# when iterating through rows that are views into `Column2`s,
# it's efficient to keep track of which column chunk we're iterating
# through, hence ThreadedIterationState
mutable struct ThreadedIterationState
row::Int64
array_index::Int64
array_i::Int64
array_len::Int64
array_lens::Vector{Int64}
end
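# For illustration (assuming 1-based chunk-local indexing): with chunk lengths
# `array_lens = [100, 100, 50]`, global row 200 corresponds to `array_index = 2`,
# `array_i = 100`, `array_len = 100`; advancing to row 201 moves to `array_index = 3`,
# `array_i = 1`, `array_len = 50`.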
# normal single-threaded Column with a single `tape` that holds
# our results from parsing
struct Column{T, P} <: AbstractVector{T}
tape::Vector{UInt64}
len::Int
e::UInt8
catg::Bool
refs::Union{Vector{String}, Nothing}
buf::Vector{UInt8}
sentinel::UInt64
end
# Column2 holds one `Column` per parsing thread
# each `Column`'s tape holds the results for one chunk of the file
struct Column2{T, P} <: AbstractVector{T}
columns::Vector{Column{T, P}}
len::Int
end
_eltype(::Type{T}) where {T} = T
_eltype(::Type{PooledString}) = String
_eltype(::Type{Union{PooledString, Missing}}) = Union{String, Missing}
Base.size(c::Union{Column, Column2}) = (c.len,)
Base.IndexStyle(::Type{<:Column}) = Base.IndexLinear()
Base.IndexStyle(::Type{<:Column2}) = Base.IndexLinear()
# getindex definitions in tables.jl
# a Row "view" type for iterating `CSV.File`
struct Row{threaded} <: Tables.AbstractRow
names::Vector{Symbol}
columns::Vector{AbstractVector}
lookup::Dict{Symbol, AbstractVector}
row::Int64
array_index::Int64
array_i::Int64
end
getnames(r::Row) = getfield(r, :names)
getcolumn(r::Row, col::Int) = getfield(r, :columns)[col]
getcolumn(r::Row, col::Symbol) = getfield(r, :lookup)[col]
getrow(r::Row) = getfield(r, :row)
getarrayindex(r::Row) = getfield(r, :array_index)
getarrayi(r::Row) = getfield(r, :array_i)
Tables.columnnames(r::Row) = getnames(r)
# main structure when parsing an entire file and inferring column types
struct File{threaded} <: AbstractVector{Row{threaded}}
name::String
names::Vector{Symbol}
types::Vector{Type}
rows::Int64
cols::Int64
columns::Vector{Union{Column, Column2}}
lookup::Dict{Symbol, Union{Column, Column2}}
end
getname(f::File) = getfield(f, :name)
getnames(f::File) = getfield(f, :names)
gettypes(f::File) = getfield(f, :types)
getrows(f::File) = getfield(f, :rows)
getcols(f::File) = getfield(f, :cols)
getcolumns(f::File) = getfield(f, :columns)
getlookup(f::File) = getfield(f, :lookup)
getcolumn(f::File, col::Int) = getfield(f, :columns)[col]
getcolumn(f::File, col::Symbol) = getfield(f, :lookup)[col]
function Base.show(io::IO, f::File)
println(io, "CSV.File(\"$(getname(f))\"):")
println(io, "Size: $(getrows(f)) x $(getcols(f))")
show(io, Tables.schema(f))
end
Base.IndexStyle(::Type{<:File}) = Base.IndexLinear()
Base.eltype(f::File{threaded}) where {threaded} = Row{threaded}
Base.size(f::File) = (getrows(f),)
function allocate(rowsguess, ncols, typecodes)
tapes = Vector{UInt64}[Vector{UInt64}(undef, usermissing(typecodes[i]) ? 0 : rowsguess) for i = 1:ncols]
poslens = Vector{Vector{UInt64}}(undef, ncols)
for i = 1:ncols
if !user(typecodes[i])
poslens[i] = Vector{UInt64}(undef, rowsguess)
end
end
return tapes, poslens
end
"""
CSV.File(source; kwargs...) => CSV.File
Read a UTF-8 CSV input (a filename given as a String or FilePaths.jl type, or any other IO source), returning a `CSV.File` object.
Opens the file and uses passed arguments to detect the number of columns and column types, unless column types are provided
manually via the `types` keyword argument. Note that passing column types manually can increase performance and reduce the
memory use for each column type provided (column types can be given as a `Vector` for all columns, or specified per column via
name or index in a `Dict`). For text encodings other than UTF-8, see the [StringEncodings.jl](https://github.com/JuliaStrings/StringEncodings.jl)
package for re-encoding a file or IO stream.
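For example, assuming a file with three columns named `col1`, `col2`, and `col3`, column types could be passed like:
```julia
# per-column, by name (or index), via a Dict
f = CSV.File(file; types=Dict(:col1 => Int64, :col3 => Float64))
# or for every column via a Vector
f = CSV.File(file; types=[Int64, String, Float64])
```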
The returned `CSV.File` object supports the [Tables.jl](https://github.com/JuliaData/Tables.jl) interface
and can iterate `CSV.Row`s. `CSV.Row` supports `propertynames` and `getproperty` to access individual row values. `CSV.File`
also supports entire column access like a `DataFrame` via direct property access on the file object, like `f = CSV.File(file); f.col1`.
Note that duplicate column names will be detected and adjusted to ensure uniqueness (duplicate column name `a` will become `a_1`).
For example, one could iterate over a csv file with column names `a`, `b`, and `c` by doing:
```julia
for row in CSV.File(file)
println("a=\$(row.a), b=\$(row.b), c=\$(row.c)")
end
```
By supporting the Tables.jl interface, a `CSV.File` can also be used as a table input to any other table sink function, for example:
```julia
# materialize a csv file as a DataFrame, without copying columns from CSV.File; these columns are read-only
df = CSV.File(file) |> DataFrame!
# load a csv file directly into an sqlite database table
db = SQLite.DB()
tbl = CSV.File(file) |> SQLite.load!(db, "sqlite_table")
```
Supported keyword arguments include:
* File layout options:
* `header=1`: the `header` argument can be an `Int`, indicating the row to parse for column names; or a `Range`, indicating a span of rows to be concatenated together as column names; or an entire `Vector{Symbol}` or `Vector{String}` to use as column names; if a file doesn't have column names, either provide them as a `Vector`, or set `header=0` or `header=false` and column names will be auto-generated (`Column1`, `Column2`, etc.)
* `normalizenames=false`: whether column names should be "normalized" into valid Julia identifier symbols; useful when iterating rows and accessing column values of a row via `getproperty` (e.g. `row.col1`)
* `datarow`: an `Int` argument to specify the row where the data starts in the csv file; by default, the next row after the `header` row is used. If `header=0`, then the 1st row is assumed to be the start of data
* `skipto::Int`: similar to `datarow`, specifies the row number where data parsing should begin
* `footerskip::Int`: number of rows at the end of a file to skip parsing
* `limit`: an `Int` to indicate a limited number of rows to parse in a csv file; use in combination with `skipto` to read a specific, contiguous chunk within a file
* `transpose::Bool`: read a csv file "transposed", i.e. each column is parsed as a row
* `comment`: rows that begin with this `String` will be skipped while parsing
* `use_mmap::Bool=!Sys.iswindows()`: whether the file should be mmapped for reading, which in some cases can be faster
* `ignoreemptylines::Bool=false`: whether empty rows/lines in a file should be ignored (if `false`, each column will be assigned `missing` for that empty row)
* `threaded::Bool`: whether parsing should utilize multiple threads; by default threads are used on large enough files, but multithreaded parsing isn't supported when `transpose=true` or when `limit` is used; only available in Julia 1.3+
* `select`: an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a "selector" function of the form `(i, name) -> keep::Bool`; only columns in the collection or for which the selector function returns `true` will be parsed and accessible in the resulting `CSV.File`. Invalid values in `select` are ignored.
* `drop`: inverse of `select`; an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a "drop" function of the form `(i, name) -> drop::Bool`; columns in the collection or for which the drop function returns `true` will be ignored in the resulting `CSV.File` (see the combined example at the end of this list). Invalid values in `drop` are ignored.
* Parsing options:
* `missingstrings`, `missingstring`: either a `String`, or `Vector{String}` to use as sentinel values that will be parsed as `missing`; by default, only an empty field (two consecutive delimiters) is considered `missing`
* `delim=','`: a `Char` or `String` that indicates how columns are delimited in a file; if no argument is provided, parsing will try to detect the most consistent delimiter on the first 10 rows of the file
* `ignorerepeated::Bool=false`: whether repeated (consecutive) delimiters should be ignored while parsing; useful for fixed-width files with delimiter padding between cells
* `quotechar='"'`, `openquotechar`, `closequotechar`: a `Char` (or different start and end characters) that indicate a quoted field which may contain textual delimiters or newline characters
* `escapechar='"'`: the `Char` used to escape quote characters in a quoted field
* `dateformat::Union{String, Dates.DateFormat, Nothing}`: a date format string to indicate how Date/DateTime columns are formatted for the entire file
* `decimal='.'`: a `Char` indicating how decimals are separated in floats, i.e. `3.14` uses '.', while `3,14` uses a comma ','
* `truestrings`, `falsestrings`: `Vector{String}`s that indicate how `true` or `false` values are represented; by default only `true` and `false` are treated as `Bool`
* Column Type Options:
* `type`: a single type to use for parsing an entire file; i.e. all columns will be treated as the same type; useful for matrix-like data files
* `types`: a Vector or Dict of types to be used for column types; a Dict can map column index `Int`, or name `Symbol` or `String`, to a type for a column, e.g. `Dict(1=>Float64)` will set the first column to `Float64`, `Dict(:column1=>Float64)` will set the column named `column1` to `Float64`, and `Dict("column1"=>Float64)` will do the same; if a `Vector` is provided, its length must match the # of columns provided or detected in `header`
* `typemap::Dict{Type, Type}`: a mapping of a type that should be replaced in every instance with another type, i.e. `Dict(Float64=>String)` would change every detected `Float64` column to be parsed as `String`
* `pool::Union{Bool, Float64}=0.1`: if `true`, *all* columns detected as `String` will be internally pooled; alternatively, the proportion of unique values below which `String` columns should be pooled (by default 0.1, meaning that if the # of unique strings in a column is under 10%, it will be pooled)
* `categorical::Bool=false`: whether pooled columns should be copied as CategoricalArray instead of PooledArray; note that in `CSV.read`, by default, columns are not copied, so pooled columns will have type `CSV.Column{String, PooledString}`; to get `CategoricalArray` columns, also pass `copycols=true`
* `strict::Bool=false`: whether invalid values should throw a parsing error or be replaced with `missing`
* `silencewarnings::Bool=false`: if `strict=false`, whether invalid value warnings should be silenced
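Several of these keyword arguments can be combined; for example, assuming a file with hypothetical columns `id`, `name`, and `notes`:
```julia
# treat "NA" fields as missing, use ';' as the delimiter, and drop the `notes` column entirely
f = CSV.File(file; delim=';', missingstring="NA", drop=[:notes])
```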
"""
function File(source;
# file options
# header can be a row number, range of rows, or actual string vector
header::Union{Integer, Vector{Symbol}, Vector{String}, AbstractVector{<:Integer}}=1,
normalizenames::Bool=false,
# by default, data starts immediately after header or start of file
datarow::Integer=-1,
skipto::Union{Nothing, Integer}=nothing,
footerskip::Integer=0,
limit::Integer=typemax(Int64),
transpose::Bool=false,
comment::Union{String, Nothing}=nothing,
use_mmap::Bool=!Sys.iswindows(),
ignoreemptylines::Bool=false,
threaded::Union{Bool, Nothing}=nothing,
select=nothing,
drop=nothing,
# parsing options
missingstrings=String[],
missingstring="",
delim::Union{Nothing, Char, String}=nothing,
ignorerepeated::Bool=false,
quotechar::Union{UInt8, Char}='"',
openquotechar::Union{UInt8, Char, Nothing}=nothing,
closequotechar::Union{UInt8, Char, Nothing}=nothing,
escapechar::Union{UInt8, Char}='"',
dateformat::Union{String, Dates.DateFormat, Nothing}=nothing,
dateformats::Union{AbstractDict, Nothing}=nothing,
decimal::Union{UInt8, Char}=UInt8('.'),
truestrings::Union{Vector{String}, Nothing}=["true", "True", "TRUE"],
falsestrings::Union{Vector{String}, Nothing}=["false", "False", "FALSE"],
# type options
type=nothing,
types=nothing,
typemap::Dict=Dict{TypeCode, TypeCode}(),
categorical::Union{Bool, Real}=false,
pool::Union{Bool, Real}=0.1,
strict::Bool=false,
silencewarnings::Bool=false,
debug::Bool=false,
parsingdebug::Bool=false,)
h = Header(source, header, normalizenames, datarow, skipto, footerskip, limit, transpose, comment, use_mmap, ignoreemptylines, threaded, select, drop, missingstrings, missingstring, delim, ignorerepeated, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, type, types, typemap, categorical, pool, strict, silencewarnings, debug, parsingdebug, false)
rowsguess, ncols, buf, len, datapos, options, coloptions, positions, typecodes, pool, categorical = h.rowsguess, h.cols, h.buf, h.len, h.datapos, h.options, h.coloptions, h.positions, h.typecodes, h.pool, h.categorical
# determine if we can use threads while parsing
if threaded === nothing && VERSION >= v"1.3-DEV" && Threads.nthreads() > 1 && !transpose && limit == typemax(Int64) && rowsguess > Threads.nthreads() && (rowsguess * ncols) >= 5_000
threaded = true
elseif threaded === true
if VERSION < v"1.3-DEV"
@warn "incompatible julia version for `threaded=true`: $VERSION, requires >= v\"1.3\", setting `threaded=false`"
threaded = false
elseif transpose
@warn "`threaded=true` not supported on transposed files"
threaded = false
elseif limit != typemax(Int64)
@warn "`threaded=true` not supported when limiting # of rows"
threaded = false
end
end
# we now do our parsing pass over the file, starting at datapos
# `tapes` are Vector{UInt64}s, one per column, with length == the estimated # of rows
# all types are treated as UInt64 for type stability while looping over columns and for making "recoding" more convenient
# (e.g. when we "promote" a column type from Int => Float64)
# each column has a sentinel value used to signal a `missing` value; the sentinel is tracked internally and adjusted
# automatically if the sentinel's actual value happens to be parsed from the file
# for strings, we have the following encoding of a single UInt64:
# leftmost bit indicates a sentinel value (`missing`) was detected while parsing
# 2nd leftmost bit indicates if a field was quoted and included escape characters (will have to be unescaped later)
# 42 bits for position (allows for maximum file size of ~4TB)
# 20 bits for field length (allows for maximum field size of ~1M)
# the `poslens` are also Vector{UInt64}s, allocated for each column whose type must be detected; each stores the "string value" UInt64
# of the cell, which allows promoting any column to a string later if needed
# if a column type is promoted to string, the values are stored in the corresponding `tape` instead of `poslens`
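# as a rough illustration: an unquoted, non-missing field starting at byte position 1001 with length 5
# would be stored by `setposlen!` as UInt64(1001) << 20 | UInt64(5) == 0x000000003e900005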
if threaded === true
# multithread
rows, tapes, refs, typecodes, intsentinels = multithreadparse(typecodes, buf, datapos, len, options, coloptions, rowsguess, pool, ncols, typemap, limit, debug)
finalrows = sum(rows)
else
intsentinels = fill(INT_SENTINEL, ncols)
tapes, poslens = allocate(rowsguess, ncols, typecodes)
refs = Vector{Dict{String, UInt64}}(undef, ncols)
lastrefs = zeros(UInt64, ncols)
t = Base.time()
rows, tapes, poslens = parsetape(Val(transpose), ncols, gettypecodes(typemap), tapes, poslens, buf, datapos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options, coloptions)
finalrows = rows
debug && println("time for initial parsing to tape: $(Base.time() - t)")
end
finaltypes = Type[gettype(T) for T in typecodes]
debug && println("types after parsing: $finaltypes, pool = $pool")
finalrefs = Vector{Union{Vector{String}, Nothing}}(undef, ncols)
if pool > 0.0
for i = 1:ncols
if isassigned(refs, i)
finalrefs[i] = map(x->x[1], sort!(collect(refs[i]), by=x->x[2]))
elseif typebits(typecodes[i]) == POOL
# case where user manually specified types, but no rows were parsed
# so the refs never got initialized; initialize them here to empty
finalrefs[i] = String[]
else
finalrefs[i] = nothing
end
end
else
fill!(finalrefs, nothing)
end
if threaded === true
columns = Union{Column, Column2}[Column2{_eltype(finaltypes[i]), finaltypes[i]}([Column{_eltype(finaltypes[i]), finaltypes[i]}(tapes[j][i], rows[j], h.e, categorical, finalrefs[i], buf, finaltypes[i] >: Int64 ? uint64(intsentinels[i]) : sentinelvalue(Base.nonmissingtype(finaltypes[i]))) for j = 1:Threads.nthreads()], finalrows) for i = 1:ncols]
else
columns = Union{Column, Column2}[Column{_eltype(finaltypes[i]), finaltypes[i]}(tapes[i], rows, h.e, categorical, finalrefs[i], buf, finaltypes[i] >: Int64 ? uint64(intsentinels[i]) : sentinelvalue(Base.nonmissingtype(finaltypes[i]))) for i = 1:ncols]
end
deleteat!(h.names, h.todrop)
deleteat!(finaltypes, h.todrop)
ncols -= length(h.todrop)
deleteat!(columns, h.todrop)
lookup = Dict(k => v for (k, v) in zip(h.names, columns))
return File{something(threaded, false)}(h.name, h.names, finaltypes, finalrows, ncols, columns, lookup)
end
function multithreadparse(typecodes, buf, datapos, len, options, coloptions, rowsguess, pool, ncols, typemap, limit, debug)
N = Threads.nthreads()
chunksize = div(len - datapos, N)
ranges = [datapos, (datapos + chunksize * i for i = 1:N)...]
ranges[end] = len
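# e.g. for len = 1000, datapos = 1, and N = 4 threads: chunksize = 249 and
# ranges = [1, 250, 499, 748, 1000] before being adjusted to actual row boundaries below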
debug && println("initial byte positions before adjusting for start of rows: $ranges")
findrowstarts!(buf, len, options, ranges, ncols)
rowchunkguess = cld(rowsguess, N)
debug && println("parsing using $N threads: $rowchunkguess rows chunked at positions: $ranges")
perthreadrows = Vector{Int}(undef, N)
perthreadtapes = Vector{Vector{Vector{UInt64}}}(undef, N)
perthreadposlens = Vector{Vector{Vector{UInt64}}}(undef, N)
perthreadrefs = Vector{Vector{Dict{String, UInt64}}}(undef, N)
perthreadlastrefs = Vector{Vector{UInt64}}(undef, N)
perthreadtypecodes = [copy(typecodes) for i = 1:N]
perthreadintsentinels = Vector{Vector{Int64}}(undef, N)
@sync for i = 1:N
@static if VERSION >= v"1.3-DEV"
Threads.@spawn begin
tt = Base.time()
tl_refs = Vector{Dict{String, UInt64}}(undef, ncols)
tl_lastrefs = zeros(UInt64, ncols)
tl_tapes, tl_poslens = allocate(rowchunkguess, ncols, typecodes)
tl_intsentinels = fill(INT_SENTINEL, ncols)
tl_datapos = ranges[i]
tl_len = ranges[i + 1] - (i != N)
tl_rows, tl_tapes, tl_poslens = parsetape(Val(false), ncols, gettypecodes(typemap), tl_tapes, tl_poslens, buf, tl_datapos, tl_len, limit, Int64[], pool, tl_refs, tl_lastrefs, rowchunkguess, perthreadtypecodes[i], tl_intsentinels, debug, options, coloptions)
debug && println("thread = $(Threads.threadid()): time for parsing: $(Base.time() - tt)")
perthreadrows[i] = tl_rows
perthreadtapes[i] = tl_tapes
perthreadposlens[i] = tl_poslens
perthreadrefs[i] = tl_refs
perthreadlastrefs[i] = tl_lastrefs
perthreadintsentinels[i] = tl_intsentinels
end
end # @static if VERSION >= v"1.3-DEV"
end
intsentinels = perthreadintsentinels[1]
anyintrecode = false
# promote typecodes from each thread
for col = 1:ncols
for i = 1:N
@inbounds typecodes[col] = promote_typecode(typecodes[col], perthreadtypecodes[i][col])
@inbounds if perthreadintsentinels[i][col] != INT_SENTINEL
intsentinels[col] = perthreadintsentinels[i][col]
anyintrecode = true
end
end
end
# if we need to recode any int column sentinels, we need to check that the other threads' tapes
# don't already contain the chosen sentinel as an actual parsed value
if anyintrecode
for col = 1:ncols
while true
foundsent = false
intsent = uint64(intsentinels[col])
for i = 1:N
if uint64(perthreadintsentinels[i][col]) != intsent
tape = perthreadtapes[i][col]
for j = 1:perthreadrows[i]
@inbounds z = tape[j]
if z == intsent
foundsent = true
break
end
end
foundsent && break
end
end
if foundsent
intsentinels[col] = sentinelvalue(Int64)
else
break
end
end
end
end
# merge refs for pooled columns from each thread and recode if needed
# take care of any column promoting that needs to happen as well between threads
refs = Vector{Dict{String, UInt64}}(undef, ncols)
lastrefs = zeros(UInt64, ncols)
for i = 1:N
tlrefs = perthreadrefs[i]
tllastrefs = perthreadlastrefs[i]
tltypecodes = perthreadtypecodes[i]
tltapes = perthreadtapes[i]
tlposlens = perthreadposlens[i]
tlrows = perthreadrows[i]
for col = 1:ncols
@inbounds T = typecodes[col]
@inbounds TL = tltypecodes[col]
if T == MISSINGTYPE
unset!(tltapes, col, tlrows, 1)
tltapes[col] = UInt64[]
elseif !stringtype(TL) && stringtype(T)
# promoting non-string to string column
copyto!(tltapes[col], 1, tlposlens[col], 1, tlrows)
unset!(tlposlens, col, tlrows, 1)
elseif TL == MISSINGTYPE && T == (INT | MISSING)
fill!(tltapes[col], uint64(intsentinels[col]))
elseif TL == MISSINGTYPE && pooled(T)
fill!(tltapes[col], 0)
elseif TL == MISSINGTYPE && missingtype(T)
fill!(tltapes[col], sentinelvalue(TYPECODES[T & ~MISSING]))
elseif TL == INT && (T == FLOAT || T == (FLOAT | MISSING))
tape = tltapes[col]
for j = 1:tlrows
@inbounds tape[j] = uint64(Float64(int64(tape[j])))
end
elseif TL == (INT | MISSING) && T == (FLOAT | MISSING)
tape = tltapes[col]
intsent = intsentinels[col]
for j = 1:tlrows
@inbounds z = int64(tape[j])
@inbounds tape[j] = ifelse(z == intsent, sentinelvalue(Float64), uint64(Float64(z)))
end
elseif TL == (INT | MISSING) && T == (INT | MISSING)
# synchronize int sentinel if needed
if perthreadintsentinels[i][col] != intsentinels[col]
tape = tltapes[col]
newintsent = intsentinels[col]
oldintsent = perthreadintsentinels[i][col]
for j = 1:tlrows
@inbounds z = tape[j]
if z == oldintsent
@inbounds tape[j] = newintsent
end
end
end
elseif pooled(T)
# synchronize pooled refs from each thread
if !isassigned(refs, col)
refs[col] = tlrefs[col]
lastrefs[col] = tllastrefs[col]
else
refrecodes = collect(UInt64(0):tllastrefs[col])
colrefs = refs[col]
recode = false
for (k, v) in tlrefs[col]
refvalue = get(colrefs, k, UInt64(0))
if refvalue != v
recode = true
if refvalue == 0
refvalue = (lastrefs[col] += UInt64(1))
end
colrefs[k] = refvalue
refrecodes[v + 1] = refvalue
end
end
if recode
tape = tltapes[col]
for j = 1:tlrows
tape[j] = refrecodes[tape[j] + 1]
end
end
end
end
end
end
return perthreadrows, perthreadtapes, refs, typecodes, intsentinels
end
function parsetape(TR::Val{transpose}, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options::Parsers.Options{ignorerepeated}, coloptions) where {transpose, ignorerepeated}
row = 0
startpos = pos
if pos <= len && len > 0
while row < limit
row += 1
pos = parserow(row, TR, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options, coloptions)
pos > len && break
# if our initial row estimate was too low, we need to reallocate our tapes/poslens to read the rest of the file
if row + 1 > rowsguess
# (bytes left in file) / (avg bytes per row) == estimated rows left in file (+ 10 for kicks)
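# e.g. if 100 rows have consumed 10_000 bytes so far (100 bytes per row on average) and
# 990_000 bytes remain, we grow by ceil(990_000 / 100 + 10) == 9_910 additional rows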
estimated_rows_left = ceil(Int64, (len - pos) / ((pos - startpos) / row) + 10.0)
newrowsguess = rowsguess + estimated_rows_left
debug && reallocatetape(row, rowsguess, newrowsguess)
newtapes = Vector{Vector{UInt64}}(undef, ncols)
newposlens = Vector{Vector{UInt64}}(undef, ncols)
for i = 1:ncols
if usermissing(typecodes[i])
newtapes[i] = tapes[i]
else
newtapes[i] = Mmap.mmap(Vector{UInt64}, newrowsguess)
copyto!(newtapes[i], 1, tapes[i], 1, row)
# safe to finalize, even when multithreaded, since each thread has its own set of tapes/poslens
unset!(tapes, i, row, 5)
if isassigned(poslens, i)
newposlens[i] = Mmap.mmap(Vector{UInt64}, newrowsguess)
copyto!(newposlens[i], 1, poslens[i], 1, row)
unset!(poslens, i, row, 6)
end
end
end
tapes = newtapes
poslens = newposlens
rowsguess = newrowsguess
end
end
end
return row, tapes, poslens
end
@noinline reallocatetape(row, old, new) = println("thread = $(Threads.threadid()) warning: didn't pre-allocate enough tape while parsing on row $row, re-allocating from $old to $new...")
@noinline notenoughcolumns(cols, ncols, row) = println("thread = $(Threads.threadid()) warning: only found $cols / $ncols columns on data row: $row. Filling remaining columns with `missing`")
@noinline toomanycolumns(cols, row) = println("thread = $(Threads.threadid()) warning: parsed the expected $cols columns, but didn't reach the end of the line on data row: $row. Ignoring any extra columns on this row")
@noinline stricterror(T, buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) error parsing $T on row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))"))
@noinline warning(T, buf, pos, len, code, row, col) = println("thread = $(Threads.threadid()) warning: error parsing $T on row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))")
@noinline fatalerror(buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) fatal error, encountered an invalidly quoted field while parsing on row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code)), check your `quotechar` arguments or manually fix the field in the file itself"))
@inline function parserow(row, ::Val{transpose}, ncols, typemap, tapes, poslens, buf, pos, len, limit, positions, pool, refs, lastrefs, rowsguess, typecodes, intsentinels, debug, options::Parsers.Options{ignorerepeated}, coloptions) where {transpose, ignorerepeated}
for col = 1:ncols
if transpose
@inbounds pos = positions[col]
end
@inbounds T = typecodes[col]
@inbounds tape = tapes[col]
type = typebits(T)
opts = coloptions === nothing ? options : coloptions[col]
if usermissing(T)
pos, code = parsemissing!(buf, pos, len, opts, row, col)
elseif type === EMPTY
pos, code = detect(tape, buf, pos, len, opts, row, col, typemap, pool, refs, lastrefs, intsentinels, debug, typecodes, poslens)
elseif type === MISSINGTYPE
pos, code = detect(tape, buf, pos, len, opts, row, col, typemap, pool, refs, lastrefs, intsentinels, debug, typecodes, poslens)
elseif type === INT
pos, code = parseint!(T, tape, buf, pos, len, opts, row, col, typecodes, poslens, intsentinels)
elseif type === FLOAT
pos, code = parsevalue!(Float64, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
elseif type === DATE
pos, code = parsevalue!(Date, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
elseif type === DATETIME
pos, code = parsevalue!(DateTime, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
elseif type === TIME
pos, code = parsevalue!(Time, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
elseif type === BOOL
pos, code = parsevalue!(Bool, T, tape, buf, pos, len, opts, row, col, typecodes, poslens)
elseif type === POOL
pos, code = parsepooled!(T, tape, buf, pos, len, opts, row, col, rowsguess, pool, refs, lastrefs, typecodes, poslens)
else # STRING
pos, code = parsestring!(T, tape, buf, pos, len, opts, row, col, typecodes)
end
if transpose
@inbounds positions[col] = pos
else
if col < ncols
if Parsers.newline(code) || pos > len
options.silencewarnings || notenoughcolumns(col, ncols, row)
for j = (col + 1):ncols
# put in dummy missing values on the tape for missing columns
if !usermissing(typecodes[j])
@inbounds tape = tapes[j]
T = typebits(typecodes[j])
tape[row] = T == POOL ? 0 : T == INT ? uint64(intsentinels[j]) : sentinelvalue(TYPECODES[T])
if isassigned(poslens, j)
setposlen!(poslens[j], row, Parsers.SENTINEL, pos, UInt64(0))
end
if T > MISSINGTYPE
typecodes[j] |= MISSING
end
end
end
break # from for col = 1:ncols
end
else
if pos <= len && !Parsers.newline(code)
options.silencewarnings || toomanycolumns(ncols, row)
# ignore the rest of the line
pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
end
end
end
end
return pos
end
@inline function setposlen!(tape, row, code, pos, len)
pos = Core.bitcast(UInt64, pos) << 20
pos |= ifelse(Parsers.sentinel(code), MISSING_BIT, UInt64(0))
pos |= ifelse(Parsers.escapedstring(code), ESCAPE_BIT, UInt64(0))
@inbounds tape[row] = pos | Core.bitcast(UInt64, len)
return
end
function detect(tape, buf, pos, len, options, row, col, typemap, pool, refs, lastrefs, intsentinels, debug, typecodes, poslens)
int, code, vpos, vlen, tlen = Parsers.xparse(Int64, buf, pos, len, options)
if Parsers.invalidquotedfield(code)
fatalerror(buf, pos, tlen, code, row, col)
end
if Parsers.sentinel(code) && code > 0
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
if typecodes[col] == EMPTY
@inbounds typecodes[col] = MISSINGTYPE
end
# return; parsing will continue to detect until a non-missing value is parsed
@goto finaldone
end
if Parsers.ok(code) && !haskey(typemap, INT)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
@inbounds tape[row] = uint64(int)
if int == intsentinels[col]
intsentinels[col] = sentinelvalue(Int64)
end
newT = INT
@goto done
end
float, code, vpos, vlen, tlen = Parsers.xparse(Float64, buf, pos, len, options)
if Parsers.ok(code) && !haskey(typemap, FLOAT)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
@inbounds tape[row] = uint64(float)
newT = FLOAT
@goto done
end
if options.dateformat === nothing
try
date, code, vpos, vlen, tlen = Parsers.xparse(Date, buf, pos, len, options)
if Parsers.ok(code) && !haskey(typemap, DATE)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
@inbounds tape[row] = uint64(date)
newT = DATE
@goto done
end
catch e
end
try
datetime, code, vpos, vlen, tlen = Parsers.xparse(DateTime, buf, pos, len, options)
if Parsers.ok(code) && !haskey(typemap, DATETIME)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
@inbounds tape[row] = uint64(datetime)
newT = DATETIME
@goto done
end
catch e
end
try
time, code, vpos, vlen, tlen = Parsers.xparse(Time, buf, pos, len, options)
if Parsers.ok(code) && !haskey(typemap, TIME)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
@inbounds tape[row] = uint64(time)
newT = TIME
@goto done
end
catch e
end
else
try
# use user-provided dateformat
DT = timetype(options.dateformat)
dt, code, vpos, vlen, tlen = Parsers.xparse(DT, buf, pos, len, options)
if Parsers.ok(code)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
@inbounds tape[row] = uint64(dt)
newT = DT == Date ? DATE : DT == DateTime ? DATETIME : TIME
@goto done
end
catch e
end
end
bool, code, vpos, vlen, tlen = Parsers.xparse(Bool, buf, pos, len, options)
if Parsers.ok(code) && !haskey(typemap, BOOL)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
@inbounds tape[row] = uint64(bool)
newT = BOOL
@goto done
end
_, code, vpos, vlen, tlen = Parsers.xparse(String, buf, pos, len, options)
setposlen!(tape, row, code, vpos, vlen)
if pool > 0.0
r = Dict{String, UInt64}()
@inbounds refs[col] = r
ref = getref!(r, PointerString(pointer(buf, vpos), vlen), lastrefs, col, code, options)
@inbounds poslens[col][row] = tape[row]
@inbounds tape[row] = ref
newT = POOL
else
newT = STRING
end
@label done
# if we're here, that means we found a non-missing value, so we need to update typecodes
if typecodes[col] == MISSINGTYPE
# we previously parsed missing values for this column before discovering a non-missing value,
# so now we fill in the tape w/ the appropriate type-specific sentinel value
if newT == STRING
# for strings, we just want to set the tape values to the poslens
copyto!(tape, 1, poslens[col], 1, row - 1)
unset!(poslens, col, row, 1)
elseif newT == POOL
for i = 1:(row - 1)
@inbounds tape[i] = 0
end
elseif newT == INT
intsent = uint64(intsentinels[col])
for i = 1:(row - 1)
@inbounds tape[i] = intsent
end
else
sent = sentinelvalue(TYPECODES[newT])
for i = 1:(row - 1)
@inbounds tape[i] = sent
end
end
@inbounds typecodes[col] = newT | MISSING
else
@inbounds typecodes[col] = newT
end
@label finaldone
return pos + tlen, code
end
function parseint!(T, tape, buf, pos, len, options, row, col, typecodes, poslens, intsentinels)
x, code, vpos, vlen, tlen = Parsers.xparse(Int64, buf, pos, len, options)
if code > 0
if !Parsers.sentinel(code)
@inbounds tape[row] = uint64(x)
@inbounds if missingtype(T) && x == intsentinels[col]
oldintsent = uint64(intsentinels[col])
newintsent = uint64(sentinelvalue(Int64))
while true
foundnewsent = false
for i = 1:(row - 1)
@inbounds z = tape[i]
if z == newintsent
foundnewsent = true
break
end
end
!foundnewsent && break
newintsent = uint64(sentinelvalue(Int64))
end
intsentinels[col] = int64(newintsent)
for i = 1:(row - 1)
@inbounds z = tape[i]
if z == oldintsent
tape[i] = newintsent
end
end
end
else
@inbounds typecodes[col] = T | MISSING
@inbounds tape[row] = uint64(intsentinels[col])
end
if !user(T)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
end
else
if Parsers.invalidquotedfield(code)
# this usually means parsing is borked because of an invalidly quoted field, hard error
fatalerror(buf, pos, tlen, code, row, col)
end
if user(T)
if !options.strict
options.silencewarnings || warning(Int64, buf, pos, tlen, code, row, col)
@inbounds typecodes[col] = T | MISSING
@inbounds tape[row] = uint64(intsentinels[col])
else
stricterror(Int64, buf, pos, tlen, code, row, col)
end
else
y, code, vpos, vlen, tlen = Parsers.xparse(Float64, buf, pos, len, options)
if code > 0
# recode previously parsed Int64 values to Float64
intsent = uint64(intsentinels[col])
for i = 1:(row - 1)
@inbounds z = tape[i]
@inbounds tape[i] = ifelse(z == intsent, sentinelvalue(Float64), uint64(Float64(int64(z))))
end
@inbounds tape[row] = uint64(y)
@inbounds typecodes[col] = ifelse(missingtype(T), FLOAT | MISSING, FLOAT)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
else
_, code, vpos, vlen, tlen = Parsers.xparse(String, buf, pos, len, options)
# recode tape w/ poslen
copyto!(tape, 1, poslens[col], 1, row - 1)
unset!(poslens, col, row, 2)
setposlen!(tape, row, code, vpos, vlen)
@inbounds typecodes[col] = ifelse(missingtype(T), STRING | MISSING, STRING)
end
end
end
return pos + tlen, code
end
function parsevalue!(::Type{type}, T, tape, buf, pos, len, options, row, col, typecodes, poslens) where {type}
x, code, vpos, vlen, tlen = Parsers.xparse(type, buf, pos, len, options)
if code > 0
if !Parsers.sentinel(code)
@inbounds tape[row] = uint64(x)
else
@inbounds typecodes[col] = T | MISSING
@inbounds tape[row] = sentinelvalue(type)
end
if !user(T)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
end
else
if Parsers.invalidquotedfield(code)
# this usually means parsing is borked because of an invalidly quoted field, hard error
fatalerror(buf, pos, tlen, code, row, col)
end
if user(T)
if !options.strict
code |= Parsers.SENTINEL
options.silencewarnings || warning(type, buf, pos, tlen, code, row, col)
@inbounds typecodes[col] = T | MISSING
@inbounds tape[row] = sentinelvalue(type)
else
stricterror(type, buf, pos, tlen, code, row, col)
end
else
_, code, vpos, vlen, tlen = Parsers.xparse(String, buf, pos, len, options)
# recode tape w/ poslen
copyto!(tape, 1, poslens[col], 1, row - 1)
unset!(poslens, col, row, 3)
setposlen!(tape, row, code, vpos, vlen)
@inbounds typecodes[col] = ifelse(missingtype(T), STRING | MISSING, STRING)
end
end
return pos + tlen, code
end
@inline function parsestring!(T, tape, buf, pos, len, options, row, col, typecodes)
x, code, vpos, vlen, tlen = Parsers.xparse(String, buf, pos, len, options)
setposlen!(tape, row, code, vpos, vlen)
if Parsers.invalidquotedfield(code)
# this usually means parsing is borked because of an invalidly quoted field, hard error
fatalerror(buf, pos, tlen, code, row, col)
end
if Parsers.sentinel(code)
@inbounds typecodes[col] = STRING | MISSING
end
return pos + tlen, code
end
function parsemissing!(buf, pos, len, options, row, col)
x, code, vpos, vlen, tlen = Parsers.xparse(String, buf, pos, len, options)
if Parsers.invalidquotedfield(code)
# this usually means parsing is borked because of an invalidly quoted field, hard error
fatalerror(buf, pos, tlen, code, row, col)
end
return pos + tlen, code
end
function getref!(x::Dict, key::PointerString, lastrefs, col, code, options)
if Parsers.escapedstring(code)
key2 = unescape(key, options.e)
index = Base.ht_keyindex2!(x, key2)
else
index = Base.ht_keyindex2!(x, key)
end
if index > 0
@inbounds found_key = x.vals[index]
return found_key::UInt64
else
@inbounds new = (lastrefs[col] += UInt64(1))
@inbounds Base._setindex!(x, new, Parsers.escapedstring(code) ? key2 : String(key), -index)
return new
end
end
function parsepooled!(T, tape, buf, pos, len, options, row, col, rowsguess, pool, refs, lastrefs, typecodes, poslens)
x, code, vpos, vlen, tlen = Parsers.xparse(String, buf, pos, len, options)
if Parsers.invalidquotedfield(code)
# this usually means parsing is borked because of an invalidly quoted field, hard error
fatalerror(buf, pos, tlen, code, row, col)
end
if !isassigned(refs, col)
r = Dict{String, UInt64}()
@inbounds refs[col] = r
else
@inbounds r = refs[col]
end
if Parsers.sentinel(code)
T |= MISSING
@inbounds typecodes[col] = T
ref = UInt64(0)
else
ref = getref!(r, PointerString(pointer(buf, vpos), vlen), lastrefs, col, code, options)
end
if !user(T) && (length(refs[col]) / rowsguess) > pool
# promote to string
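# e.g. with the default pool=0.1 and rowsguess=10_000, the column is promoted to String
# once more than 1_000 unique values have been seen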
copyto!(tape, 1, poslens[col], 1, row - 1)
unset!(poslens, col, row, 4)
setposlen!(tape, row, code, vpos, vlen)
@inbounds typecodes[col] = ifelse(missingtype(T), STRING | MISSING, STRING)
else
if !user(T)
@inbounds setposlen!(poslens[col], row, code, vpos, vlen)
end
@inbounds tape[row] = ref
end
return pos + tlen, code
end