From d78a22f94c7413a093b0a2a20087131f1e6e85ad Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 26 Feb 2018 18:26:38 -0800 Subject: [PATCH] [DataFrame] Implement IO for ray_df (#1599) * Add parquet-cpp to gitignore * Add read_csv and read_parquet * Gitignore pytest_cache * Fix flake8 * Add io to __init__ * Changing Index. Currently running tests, but so far untested. * Removing issue of reassigning DF in from_pandas * Fixing lint * Fix bug * Fix bug * Fix bug * Better performance * Fixing index issue with sum * Address comments * Update io with index * Updating performance and implementation. Adding tests * Fixing off-by-1 * Fix lint * Address Comments * Make pop compatible with new to_pandas * Format Code * Cleanup some index issue * Bug fix: assigned reset_index back * Remove unused debug line --- .gitignore | 4 + python/ray/dataframe/__init__.py | 28 ++- python/ray/dataframe/dataframe.py | 59 ++++-- python/ray/dataframe/io.py | 262 +++++++++++++++++++++++++++ python/ray/dataframe/test/test_io.py | 91 ++++++++++ 5 files changed, 427 insertions(+), 17 deletions(-) create mode 100644 python/ray/dataframe/io.py create mode 100644 python/ray/dataframe/test/test_io.py diff --git a/.gitignore b/.gitignore index efdeba1bb258..924bbe4c7684 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ /src/thirdparty/boost_1_60_0/ /src/thirdparty/catapult/ /src/thirdparty/flatbuffers/ +/src/thirdparty/parquet-cpp # Files generated by flatc should be ignored /src/common/format/*.py @@ -137,3 +138,6 @@ build /site/Gemfile.lock /site/.sass-cache /site/_site + +# Pytest Cache +**/.pytest_cache diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 8f315be6ad07..6ba12b91ab32 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -2,9 +2,27 @@ from __future__ import division from __future__ import print_function -from .dataframe import DataFrame -from .dataframe import from_pandas -from .dataframe import to_pandas -from .series import Series +DEFAULT_NPARTITIONS = 10 -__all__ = ["DataFrame", "from_pandas", "to_pandas", "Series"] + +def set_npartition_default(n): + global DEFAULT_NPARTITIONS + DEFAULT_NPARTITIONS = n + + +def get_npartitions(): + return DEFAULT_NPARTITIONS + + +# We import these file after above two function +# because they depend on npartitions. +from .dataframe import DataFrame # noqa: 402 +from .dataframe import from_pandas # noqa: 402 +from .dataframe import to_pandas # noqa: 402 +from .series import Series # noqa: 402 +from .io import (read_csv, read_parquet) # noqa: 402 + +__all__ = [ + "DataFrame", "from_pandas", "to_pandas", "Series", "read_csv", + "read_parquet" +] diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 0db5a4e4031b..be1d3b9f40ed 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -373,16 +373,29 @@ def transpose(self, *args, **kwargs): temp_index = [idx for _ in range(len(self._df)) for idx in self.columns] - temp_columns = self.index local_transpose = self._map_partitions( lambda df: df.transpose(*args, **kwargs), index=temp_index) local_transpose.columns = temp_columns # Sum will collapse the NAs from the groupby - return local_transpose.reduce_by_index( + df = local_transpose.reduce_by_index( lambda df: df.apply(lambda x: x), axis=1) + # Reassign the columns within partition to self.index. 
+ # We have to use _depoly_func instead of _map_partition due to + # new_labels argument + def _reassign_columns(df, new_labels): + df.columns = new_labels + return df + df._df = [ + _deploy_func.remote( + _reassign_columns, + part, + self.index) for part in df._df] + + return df + T = property(transpose) def dropna(self, axis, how, thresh=None, subset=[], inplace=False): @@ -563,9 +576,15 @@ def count(self, axis=0, level=None, numeric_only=False): for _ in range(len(self._df)) for idx in self.columns] - return sum(ray.get(self._map_partitions(lambda df: df.count( - axis=axis, level=level, numeric_only=numeric_only - ), index=temp_index)._df)) + collapsed_df = sum( + ray.get( + self._map_partitions( + lambda df: df.count( + axis=axis, + level=level, + numeric_only=numeric_only), + index=temp_index)._df)) + return collapsed_df def cov(self, min_periods=None): raise NotImplementedError("Not Yet implemented.") @@ -865,7 +884,9 @@ def iterrows(self): iters = ray.get([ _deploy_func.remote( lambda df: list(df.iterrows()), part) for part in self._df]) - return itertools.chain.from_iterable(iters) + iters = itertools.chain.from_iterable(iters) + series = map(lambda idx_series_tuple: idx_series_tuple[1], iters) + return zip(self.index, series) def items(self): """Iterator over (column name, Series) pairs. @@ -884,6 +905,7 @@ def items(self): def concat_iters(iterables): for partitions in zip(*iterables): series = pd.concat([_series for _, _series in partitions]) + series.index = self.index yield (series.name, series) return concat_iters(iters) @@ -919,7 +941,20 @@ def itertuples(self, index=True, name='Pandas'): _deploy_func.remote( lambda df: list(df.itertuples(index=index, name=name)), part) for part in self._df]) - return itertools.chain.from_iterable(iters) + iters = itertools.chain.from_iterable(iters) + + def _replace_index(row_tuple, idx): + # We need to use try-except here because + # isinstance(row_tuple, namedtuple) won't work. + try: + row_tuple = row_tuple._replace(Index=idx) + except AttributeError: # Tuple not namedtuple + row_tuple = (idx,) + row_tuple[1:] + return row_tuple + + if index: + iters = itertools.starmap(_replace_index, zip(iters, self.index)) + return iters def join(self, other, on=None, how='left', lsuffix='', rsuffix='', sort=False): @@ -1100,8 +1135,7 @@ def pop(self, item): popped = to_pandas(self._map_partitions( lambda df: df.pop(item))) self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df - self.columns = [col for col in self.columns if col != item] - + self.columns = self.columns.drop(item) return popped def pow(self, other, axis='columns', level=None, fill_value=None): @@ -1949,13 +1983,14 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True): while len(temp_df) > chunksize: t_df = temp_df[:chunksize] lengths.append(len(t_df)) - # reindex here because we want a pd.RangeIndex within the partitions. - # It is smaller and sometimes faster. - t_df.reindex() + # reset_index here because we want a pd.RangeIndex + # within the partitions. It is smaller and sometimes faster. 
+ t_df = t_df.reset_index(drop=True) top = ray.put(t_df) dataframes.append(top) temp_df = temp_df[chunksize:] else: + temp_df = temp_df.reset_index(drop=True) dataframes.append(ray.put(temp_df)) lengths.append(len(temp_df)) diff --git a/python/ray/dataframe/io.py b/python/ray/dataframe/io.py new file mode 100644 index 000000000000..7fa49ebb242b --- /dev/null +++ b/python/ray/dataframe/io.py @@ -0,0 +1,262 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from itertools import chain +from io import BytesIO +import os +import re + +from pyarrow.parquet import ParquetFile +import pandas as pd + +from .dataframe import ray, DataFrame +from . import get_npartitions + + +# Parquet +def read_parquet(path, engine='auto', columns=None, **kwargs): + """Load a parquet object from the file path, returning a DataFrame. + Ray DataFrame only supports pyarrow engine for now. + + Args: + path: The filepath of the parquet file. + We only support local files for now. + engine: Ray only support pyarrow reader. + This argument doesn't do anything for now. + kwargs: Pass into parquet's read_row_group function. + """ + pf = ParquetFile(path) + + n_rows = pf.metadata.num_rows + chunksize = n_rows // get_npartitions() + n_row_groups = pf.metadata.num_row_groups + + idx_regex = re.compile('__index_level_\d+__') + columns = [ + name for name in pf.metadata.schema.names if not idx_regex.match(name) + ] + + df_from_row_groups = [ + _read_parquet_row_group.remote(path, columns, i, kwargs) + for i in range(n_row_groups) + ] + splited_dfs = ray.get( + [_split_df.remote(df, chunksize) for df in df_from_row_groups]) + df_remotes = list(chain.from_iterable(splited_dfs)) + + return DataFrame(df_remotes, columns) + + +@ray.remote +def _read_parquet_row_group(path, columns, row_group_id, kwargs={}): + """Read a parquet row_group given file_path. + """ + pf = ParquetFile(path) + df = pf.read_row_group(row_group_id, columns=columns, **kwargs).to_pandas() + return df + + +@ray.remote +def _split_df(pd_df, chunksize): + """Split a pd_df into partitions. + + Returns: + remote_df_ids ([ObjectID]) + """ + dataframes = [] + + while len(pd_df) > chunksize: + t_df = pd_df[:chunksize] + t_df.reset_index(drop=True) + top = ray.put(t_df) + dataframes.append(top) + pd_df = pd_df[chunksize:] + else: + pd_df = pd_df.reset_index(drop=True) + dataframes.append(ray.put(pd_df)) + + return dataframes + + +# CSV +def _compute_offset(fn, npartitions): + """ + Calculate the currect bytes offsets for a csv file. + Return a list of (start, end) tuple where the end == \n or EOF. + """ + total_bytes = os.path.getsize(fn) + chunksize = total_bytes // npartitions + if chunksize == 0: + chunksize = 1 + + bio = open(fn, 'rb') + + offsets = [] + start = 0 + while start <= total_bytes: + bio.seek(chunksize, 1) # Move forward {chunksize} bytes + extend_line = bio.readline() # Move after the next \n + total_offset = chunksize + len(extend_line) + # The position of the \n we just crossed. 
+ new_line_cursor = start + total_offset - 1 + offsets.append((start, new_line_cursor)) + start = new_line_cursor + 1 + + bio.close() + return offsets + + +def _get_firstline(file_path): + bio = open(file_path, 'rb') + first = bio.readline() + bio.close() + return first + + +def _infer_column(first_line): + return pd.read_csv(BytesIO(first_line)).columns + + +@ray.remote +def _read_csv_with_offset(fn, start, end, header=b'', kwargs={}): + bio = open(fn, 'rb') + bio.seek(start) + to_read = header + bio.read(end - start) + bio.close() + return pd.read_csv(BytesIO(to_read), **kwargs) + + +def read_csv(filepath, + sep=',', + delimiter=None, + header='infer', + names=None, + index_col=None, + usecols=None, + squeeze=False, + prefix=None, + mangle_dupe_cols=True, + dtype=None, + engine=None, + converters=None, + true_values=None, + false_values=None, + skipinitialspace=False, + skiprows=None, + nrows=None, + na_values=None, + keep_default_na=True, + na_filter=True, + verbose=False, + skip_blank_lines=True, + parse_dates=False, + infer_datetime_format=False, + keep_date_col=False, + date_parser=None, + dayfirst=False, + iterator=False, + chunksize=None, + compression='infer', + thousands=None, + decimal=b'.', + lineterminator=None, + quotechar='"', + quoting=0, + escapechar=None, + comment=None, + encoding=None, + dialect=None, + tupleize_cols=None, + error_bad_lines=True, + warn_bad_lines=True, + skipfooter=0, + skip_footer=0, + doublequote=True, + delim_whitespace=False, + as_recarray=None, + compact_ints=None, + use_unsigned=None, + low_memory=True, + buffer_lines=None, + memory_map=False, + float_precision=None): + """Read csv file from local disk. + + Args: + filepath: + The filepath of the csv file. + We only support local files for now. + kwargs: Keyword arguments in pandas::from_csv + """ + kwargs = dict( + sep=sep, + delimiter=delimiter, + header=header, + names=names, + index_col=index_col, + usecols=usecols, + squeeze=squeeze, + prefix=prefix, + mangle_dupe_cols=mangle_dupe_cols, + dtype=dtype, + engine=engine, + converters=converters, + true_values=true_values, + false_values=false_values, + skipinitialspace=skipinitialspace, + skiprows=skiprows, + nrows=nrows, + na_values=na_values, + keep_default_na=keep_default_na, + na_filter=na_filter, + verbose=verbose, + skip_blank_lines=skip_blank_lines, + parse_dates=parse_dates, + infer_datetime_format=infer_datetime_format, + keep_date_col=keep_date_col, + date_parser=date_parser, + dayfirst=dayfirst, + iterator=iterator, + chunksize=chunksize, + compression=compression, + thousands=thousands, + decimal=decimal, + lineterminator=lineterminator, + quotechar=quotechar, + quoting=quoting, + escapechar=escapechar, + comment=comment, + encoding=encoding, + dialect=dialect, + tupleize_cols=tupleize_cols, + error_bad_lines=error_bad_lines, + warn_bad_lines=warn_bad_lines, + skipfooter=skipfooter, + skip_footer=skip_footer, + doublequote=doublequote, + delim_whitespace=delim_whitespace, + as_recarray=as_recarray, + compact_ints=compact_ints, + use_unsigned=use_unsigned, + low_memory=low_memory, + buffer_lines=buffer_lines, + memory_map=memory_map, + float_precision=float_precision) + + offsets = _compute_offset(filepath, get_npartitions()) + + first_line = _get_firstline(filepath) + columns = _infer_column(first_line) + + df_obj_ids = [] + for start, end in offsets: + if start != 0: + df = _read_csv_with_offset.remote( + filepath, start, end, header=first_line, kwargs=kwargs) + else: + df = _read_csv_with_offset.remote( + filepath, start, end, 
kwargs=kwargs) + df_obj_ids.append(df) + + return DataFrame(df_obj_ids, columns) diff --git a/python/ray/dataframe/test/test_io.py b/python/ray/dataframe/test/test_io.py new file mode 100644 index 000000000000..64ea2c0ff41f --- /dev/null +++ b/python/ray/dataframe/test/test_io.py @@ -0,0 +1,91 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import numpy as np +import pandas as pd +import ray +import ray.dataframe as rdf +import ray.dataframe.io as io +import os + +TEST_PARQUET_FILENAME = 'test.parquet' +TEST_CSV_FILENAME = 'test.csv' +SMALL_ROW_SIZE = 2000 +LARGE_ROW_SIZE = 7e6 + + +@pytest.fixture +def ray_df_equals_pandas(ray_df, pandas_df): + return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) + + +@pytest.fixture +def setup_parquet_file(row_size, force=False): + if os.path.exists(TEST_PARQUET_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_parquet(TEST_PARQUET_FILENAME) + + +@pytest.fixture +def teardown_parquet_file(): + if os.path.exists(TEST_PARQUET_FILENAME): + os.remove(TEST_PARQUET_FILENAME) + + +@pytest.fixture +def setup_csv_file(row_size, force=False): + if os.path.exists(TEST_CSV_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_csv(TEST_CSV_FILENAME) + + +@pytest.fixture +def teardown_csv_file(): + if os.path.exists(TEST_CSV_FILENAME): + os.remove(TEST_CSV_FILENAME) + + +def test_from_parquet_small(): + ray.init() + + setup_parquet_file(SMALL_ROW_SIZE) + + pd_df = pd.read_parquet(TEST_PARQUET_FILENAME) + ray_df = io.read_parquet(TEST_PARQUET_FILENAME) + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_parquet_file() + + +def test_from_parquet_large(): + setup_parquet_file(LARGE_ROW_SIZE) + + pd_df = pd.read_parquet(TEST_PARQUET_FILENAME) + ray_df = io.read_parquet(TEST_PARQUET_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_parquet_file() + + +def test_from_csv(): + setup_csv_file(SMALL_ROW_SIZE) + + pd_df = pd.read_csv(TEST_CSV_FILENAME) + ray_df = io.read_csv(TEST_CSV_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_csv_file()
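
For reference, below is a minimal usage sketch of the reader API this patch adds; it is not part of the change itself. It assumes Ray has not been initialized elsewhere and that the local files 'example.csv' and 'example.parquet' exist (the file names are hypothetical, and the readers only support local files for now).

# Minimal usage sketch (not part of the patch). Assumes local files
# 'example.csv' and 'example.parquet' exist; both names are hypothetical.
import ray
import ray.dataframe as rdf

ray.init()

# Optionally control how many partitions the readers target.
rdf.set_npartition_default(8)

# Each reader splits the input into roughly get_npartitions() chunks and
# builds a Ray DataFrame from the resulting partitions.
csv_df = rdf.read_csv('example.csv')
parquet_df = rdf.read_parquet('example.parquet')

# Convert back to a plain pandas DataFrame for inspection or comparison,
# as the new tests in test_io.py do.
print(rdf.to_pandas(csv_df).head())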