Skip to content

Commit

Permalink
ARROW-3856: [Ruby] Support compressed CSV save/load
Browse files Browse the repository at this point in the history
Author: Kouhei Sutou <[email protected]>

Closes apache#3015 from kou/ruby-csv-compressed and squashes the following commits:

b3099ca <Kouhei Sutou>  Support compressed CSV save/load
  • Loading branch information
kou authored and xhochy committed Nov 22, 2018
1 parent eaf8d32 commit 80238f2
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 60 deletions.
37 changes: 37 additions & 0 deletions ruby/red-arrow/lib/arrow/compression-type.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module Arrow
class CompressionType
EXTENSIONS = {}
values.each do |value|
case value
when UNCOMPRESSED
when GZIP
EXTENSIONS["gz"] = value
else
EXTENSIONS[value.nick] = value
end
end

class << self
def resolve_extension(extension)
EXTENSIONS[extension.to_s]
end
end
end
end
24 changes: 20 additions & 4 deletions ruby/red-arrow/lib/arrow/csv-loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def load(path_or_data, **options)
def initialize(path_or_data, **options)
@path_or_data = path_or_data
@options = options
@compression = @options.delete(:compression)
end

def load
Expand Down Expand Up @@ -115,12 +116,25 @@ def reader_options
options
end

def open_input(raw_input)
if @compression
codec = Codec.new(@compression)
CompressedInputStream.open(codec, raw_input) do |input|
yield(input)
end
else
yield(raw_input)
end
end

def load_from_path(path)
options = reader_options
if options
begin
MemoryMappedInputStream.open(path.to_s) do |input|
return CSVReader.new(input, options).read
MemoryMappedInputStream.open(path.to_s) do |raw_input|
open_input(raw_input) do |input|
return CSVReader.new(input, options).read
end
end
rescue Arrow::Error::Invalid
end
Expand All @@ -136,8 +150,10 @@ def load_data(data)
options = reader_options
if options
begin
BufferInputStream.open(Buffer.new(data)) do |input|
return CSVReader.new(input, options).read
BufferInputStream.open(Buffer.new(data)) do |raw_input|
open_input(raw_input) do |input|
return CSVReader.new(input, options).read
end
end
rescue Arrow::Error::Invalid
end
Expand Down
8 changes: 5 additions & 3 deletions ruby/red-arrow/lib/arrow/loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def require_libraries
require "arrow/array-builder"
require "arrow/chunked-array"
require "arrow/column"
require "arrow/compression-type"
require "arrow/csv-loader"
require "arrow/csv-read-options"
require "arrow/data-type"
Expand All @@ -43,8 +44,11 @@ def require_libraries
require "arrow/date64-array"
require "arrow/date64-array-builder"
require "arrow/field"
require "arrow/path-extension"
require "arrow/record"
require "arrow/record-batch"
require "arrow/record-batch-file-reader"
require "arrow/record-batch-stream-reader"
require "arrow/rolling-window"
require "arrow/schema"
require "arrow/slicer"
Expand All @@ -58,9 +62,7 @@ def require_libraries
require "arrow/tensor"
require "arrow/timestamp-array"
require "arrow/timestamp-array-builder"

require "arrow/record-batch-file-reader"
require "arrow/record-batch-stream-reader"
require "arrow/writable"
end

def load_object_info(info)
Expand Down
45 changes: 45 additions & 0 deletions ruby/red-arrow/lib/arrow/path-extension.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module Arrow
class PathExtension
def initialize(path)
@path = path
end

def extract
basename = ::File.basename(@path)
components = basename.split(".")
return {} if components.size == 1

extension = components.last.downcase
if components.size > 2
compression = CompressionType.resolve_extension(extension)
if compression
{
format: components[-2].downcase,
compression: compression,
}
else
{format: extension}
end
else
{format: extension}
end
end
end
end
60 changes: 37 additions & 23 deletions ruby/red-arrow/lib/arrow/table-loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ def load(path, options={})
end

def initialize(path, options={})
path = path.to_path if path.respond_to?(:to_path)
@path = path
@options = options
fill_options
end

def load
path = @path
path = path.to_path if path.respond_to?(:to_path)
format = @options[:format] || guess_format(path) || :arrow

format = @options[:format]
custom_load_method = "load_as_#{format}"
unless respond_to?(custom_load_method, true)
available_formats = []
Expand All @@ -47,17 +46,32 @@ def load
message << "]: #{format.inspect}"
raise ArgumentError, message
end
__send__(custom_load_method, path)
if method(custom_load_method).arity.zero?
__send__(custom_load_method)
else
# For backward compatibility.
__send__(custom_load_method, @path)
end
end

private
def guess_format(path)
extension = ::File.extname(path).gsub(/\A\./, "").downcase
return nil if extension.empty?

return extension if respond_to?("load_as_#{extension}", true)
def fill_options
if @options[:format] and @options.key?(:compression)
return
end

nil
extension = PathExtension.new(@path)
info = extension.extract
format = info[:format]
@options = @options.dup
if respond_to?("load_as_#{format}", true)
@options[:format] ||= format.to_sym
else
@options[:format] ||= :arrow
end
unless @options.key?(:compression)
@options[:compression] = info[:compression]
end
end

def load_raw(input, reader)
Expand All @@ -77,7 +91,7 @@ def load_raw(input, reader)
table
end

def load_as_arrow(path)
def load_as_arrow
input = nil
reader = nil
error = nil
Expand All @@ -86,7 +100,7 @@ def load_as_arrow(path)
RecordBatchStreamReader,
]
reader_class_candidates.each do |reader_class_candidate|
input = MemoryMappedInputStream.new(path)
input = MemoryMappedInputStream.new(@path)
begin
reader = reader_class_candidate.new(input)
rescue Arrow::Error
Expand All @@ -99,21 +113,21 @@ def load_as_arrow(path)
load_raw(input, reader)
end

def load_as_batch(path)
input = MemoryMappedInputStream.new(path)
def load_as_batch
input = MemoryMappedInputStream.new(@path)
reader = RecordBatchFileReader.new(input)
load_raw(input, reader)
end

def load_as_stream(path)
input = MemoryMappedInputStream.new(path)
def load_as_stream
input = MemoryMappedInputStream.new(@path)
reader = RecordBatchStreamReader.new(input)
load_raw(input, reader)
end

if Arrow.const_defined?(:ORCFileReader)
def load_as_orc(path)
input = MemoryMappedInputStream.new(path)
def load_as_orc
input = MemoryMappedInputStream.new(@path)
reader = ORCFileReader.new(input)
field_indexes = @options[:field_indexes]
reader.set_field_indexes(field_indexes) if field_indexes
Expand All @@ -123,14 +137,14 @@ def load_as_orc(path)
end
end

def load_as_csv(path)
def load_as_csv
options = @options.dup
options.delete(:format)
CSVLoader.load(Pathname.new(path), options)
CSVLoader.load(Pathname.new(@path), options)
end

def load_as_feather(path)
input = MemoryMappedInputStream.new(path)
def load_as_feather
input = MemoryMappedInputStream.new(@path)
reader = FeatherFileReader.new(input)
table = reader.read
table.instance_variable_set(:@input, input)
Expand Down
Loading

0 comments on commit 80238f2

Please sign in to comment.