Skip to content

Commit

Permalink
Add support for pandas
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejlach committed Oct 22, 2014
1 parent 240826c commit 8aa5cb1
Show file tree
Hide file tree
Showing 12 changed files with 617 additions and 10 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
------------------------------------------------------------------------------
qPython 1.0 RC1 [2014.10.22]
------------------------------------------------------------------------------

- Introduce support for pandas

------------------------------------------------------------------------------
qPython 1.0 Beta 6 [2014.10.16]
------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions doc/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
sphinx>=1.2.3
mock>=1.0.1
1 change: 1 addition & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Welcome to qPython's documentation!
connection
queries
type-conversion
pandas
usage-examples


Expand Down
89 changes: 89 additions & 0 deletions doc/source/pandas.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
Pandas integration
==================

`qPython` allows users to use ``pandas.DataFrame`` and ``pandas.Series``
instead of ``numpy.recarray`` and ``numpy.ndarray`` to represent ``q`` tables
and vectors.

In order to instruct `qPython` to use `pandas`_ data types, the user has to set
the ``pandas`` flag while:

- creating :class:`.qconnection.QConnection` instance,
- executing synchronous query: :meth:`~qpython.qconnection.QConnection.sync`,
- or retrieving data from q: :meth:`~qpython.qconnection.QConnection.receive`.

For example:
::

>>> with qconnection.QConnection(host = 'localhost', port = 5000, pandas = True) as q:
>>> ds = q('(1i;0Ni;3i)', pandas = True)
>>> print ds
0 1
1 NaN
2 3
dtype: float64
>>> print ds.meta
metadata(qtype=6)

>>> df = q('flip `name`iq`fullname!(`Dent`Beeblebrox`Prefect;98 42 126;("Arthur Dent"; "Zaphod Beeblebrox"; "Ford Prefect"))')
>>> print df
name iq fullname
0 Dent 98 Arthur Dent
1 Beeblebrox 42 Zaphod Beeblebrox
2 Prefect 126 Ford Prefect
>>> print df.meta
metadata(iq=7, fullname=0, qtype=98, name=11)
>>> print q('type', df)
98

>>> df = q('([eid:1001 0N 1003;sym:`foo`bar`] pos:`d1`d2`d3;dates:(2001.01.01;2000.05.01;0Nd))')
>>> print df
pos dates
eid sym
1001 foo d1 2001-01-01
NaN bar d2 2000-05-01
1003 d3 NaT
>>> print df.meta
metadata(dates=14, qtype=99, eid=7, sym=11, pos=11)
>>> print q('type', df)
99


Data conversions
****************

If the ``pandas`` flag is set, `qPython` converts the data according to the following
rules:

- ``q`` vectors are represented as ``pandas.Series``:

- ``pandas.Series`` is initialized with ``numpy.ndarray`` being result of
parsing with ``numpy_temporals`` flag set to ``True`` (to ensure that
temporal vectors are represented as numpy ``datetime64``/``timedelta64``
arrays).
- q nulls are replaced with ``numpy.NaN``. This can result in type promotion
as described in `pandas documentation <http://pandas.pydata.org/pandas-docs/stable/gotchas.html#support-for-integer-na>`_.
- ``pandas.Series`` is enriched with custom attribute ``meta``
(:class:`qpython.MetaData`), which contains the `qtype` of the vector. Note
that this information is used while serializing a ``pandas.Series`` instance
to the IPC protocol.


- tables are represented as ``pandas.DataFrame`` instances:

- individual columns are represented as ``pandas.Series``.
- ``pandas.DataFrame`` is enriched with custom attribute ``meta``
(:class:`qpython.MetaData`), which lists `qtype` for each column in table.
Note that this information is used during ``pandas.DataFrame`` serialization.

- keyed tables are represented as ``pandas.DataFrame`` instances as well:

- index for ``pandas.DataFrame`` is created from key columns.
- ``pandas.DataFrame`` is enriched with custom attribute ``meta``
(:class:`qpython.MetaData`), which lists `qtype` for each column in table,
including index ones. Note that this information is used during
``pandas.DataFrame`` serialization.


.. _pandas: http://pandas.pydata.org/

3 changes: 3 additions & 0 deletions qpython/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def __getattr__(self, attr):
def __getitem__(self, key):
    '''
    Item-style read access to a stored attribute.

    Looks `key` up in the instance dictionary and yields ``None`` instead of
    raising when the key is not present.
    '''
    attributes = vars(self)
    return attributes.get(key)

def __setitem__(self, key, value):
    '''
    Item-style write access: stores `value` under `key` directly in the
    instance dictionary.
    '''
    attributes = vars(self)
    attributes[key] = value

def as_dict(self):
    '''
    Returns a shallow copy of the instance dictionary, so changes made to the
    returned ``dict`` do not affect this object.
    '''
    return dict(vars(self))

Expand Down
184 changes: 184 additions & 0 deletions qpython/_pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#
# Copyright (c) 2011-2014 Exxeleron GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pandas
import struct

from collections import OrderedDict

from qpython import MetaData
from qpython.qreader import QReader, READER_CONFIGURATION, QReaderException
from qpython.qcollection import QDictionary, qlist
from qpython.qwriter import QWriter, QWriterException
from qpython.qtype import *



class PandasQReader(QReader):
    '''
    :class:`QReader` specialization producing `pandas` data structures.

    When the ``pandas`` option is set, q vectors are returned as
    ``pandas.Series`` and q tables (plain and keyed) as ``pandas.DataFrame``.
    Each returned object carries a ``meta`` attribute (:class:`MetaData`)
    recording the q types. When the option is not set, every method delegates
    to the corresponding :class:`QReader` implementation.
    '''

    parse = Mapper(QReader._reader_map)

    @parse(QDICTIONARY)
    def _read_dictionary(self, qtype = QDICTIONARY, options = READER_CONFIGURATION):
        '''
        Reads a q dictionary.

        With the ``pandas`` option set, a dictionary whose keys were read as a
        ``pandas.DataFrame`` is treated as a keyed table: value columns are
        appended to the key frame and the key columns become its index. Any
        other dictionary is returned as a :class:`QDictionary`.

        :raises: :class:`QReaderException` if the keys are a data frame but
                 the values are not
        '''
        if not options.pandas:
            return QReader._read_dictionary(self, qtype = qtype, options = options)

        keys = self._read_object(options = options)
        values = self._read_object(options = options)

        if not isinstance(keys, pandas.DataFrame):
            # plain dictionary: unwrap pandas.Series into the underlying
            # numpy array before building the QDictionary
            if isinstance(keys, pandas.Series):
                keys = keys.values
            if isinstance(values, pandas.Series):
                values = values.values
            return QDictionary(keys, values)

        if not isinstance(values, pandas.DataFrame):
            raise QReaderException('Keyed table creation: values are expected to be of type pandas.DataFrame. Actual: %s' % type(values))

        # snapshot the key column names before value columns are appended
        key_columns = list(keys.columns)

        # the key frame is extended in place and becomes the resulting table
        table = keys
        table.meta.qtype = QKEYED_TABLE

        for column in values.columns:
            table[column] = values[column]
            table.meta[column] = values.meta[column]

        table.set_index(key_columns, inplace = True)
        return table


    @parse(QTABLE)
    def _read_table(self, qtype = QTABLE, options = READER_CONFIGURATION):
        '''
        Reads a q table.

        With the ``pandas`` option set, the table is returned as a
        ``pandas.DataFrame`` whose ``meta`` attribute lists the q type of
        each column (and ``qtype = QTABLE`` for the frame itself).
        '''
        if not options.pandas:
            return QReader._read_table(self, qtype = qtype, options = options)

        self._buffer.skip() # ignore attributes
        self._buffer.skip() # ignore dict type stamp

        columns = self._read_object(options = options)
        data = self._read_object(options = options)

        odict = OrderedDict()
        meta = MetaData(qtype = QTABLE)
        for i in range(len(columns)):
            name = columns[i]
            column = data[i]

            if isinstance(column, str):
                # convert character list (represented as string) to numpy representation
                meta[name] = QSTRING
                odict[name] = numpy.array(list(column), dtype = str)
            elif isinstance(column, (list, tuple)):
                # general (mixed) list: wrap the items in a numpy array
                meta[name] = QGENERAL_LIST
                odict[name] = numpy.array(list(column))
            else:
                # typed vector: the q type is taken from the column's own metadata
                meta[name] = column.meta.qtype
                odict[name] = column

        df = pandas.DataFrame(odict)
        df.meta = meta
        return df


    def _read_list(self, qtype, options):
        '''
        Reads a typed q list.

        With the ``pandas`` option set, the list is returned as a
        ``pandas.Series`` with ``meta.qtype`` set to `qtype`; for every list
        type except symbol lists, values equal to the q null of that type are
        replaced with NaN.

        Note: setting the ``pandas`` option switches ``numpy_temporals`` on in
        the passed `options` object.
        '''
        if options.pandas:
            options.numpy_temporals = True

        data = QReader._read_list(self, qtype = qtype, options = options)

        if not options.pandas:
            return data

        if qtype != QSYMBOL_LIST:
            null = QNULLMAP[-abs(qtype)][1]
            ps = pandas.Series(data = data).replace(null, numpy.nan)
        else:
            ps = pandas.Series(data = data)

        ps.meta = MetaData(qtype = qtype)
        return ps



class PandasQWriter(QWriter):
    '''
    :class:`QWriter` specialization able to serialize ``pandas.Series`` and
    ``pandas.DataFrame`` instances.
    '''

    serialize = Mapper(QWriter._writer_map)

    @serialize(pandas.Series)
    def _write_pandas_series(self, data, qtype = None):
        '''
        Serializes a ``pandas.Series`` as a q list.

        The q type is resolved in the following order: the explicit `qtype`
        argument, ``data.meta.qtype``, the ``Q_TYPE`` entry for the series
        dtype, the ``TEMPORAL_PY_TYPE`` entry for ``datetime64``/``timedelta64``
        dtypes and finally the Python type of the first element (general list
        if that type is unknown or the series is empty). A series of dtype
        ``'|S1'`` is always written as a character list.

        :raises: :class:`QWriterException` if the q type cannot be determined
        '''
        if qtype is not None:
            qtype = -abs(qtype)

        if qtype is None and hasattr(data, 'meta'):
            qtype = -abs(data.meta.qtype)

        if data.dtype == '|S1':
            qtype = QCHAR

        if qtype is None:
            qtype = Q_TYPE.get(data.dtype.type, None)

        if qtype is None and data.dtype.type in (numpy.datetime64, numpy.timedelta64):
            qtype = TEMPORAL_PY_TYPE.get(str(data.dtype), None)

        if qtype is None:
            # determine the type based on the first element; access is
            # positional because the series index need not contain label 0,
            # and an empty series has no element to inspect
            if len(data) > 0:
                qtype = Q_TYPE.get(type(data.iloc[0]), QGENERAL_LIST)
            else:
                qtype = QGENERAL_LIST

        if qtype is None:
            raise QWriterException('Unable to serialize pandas series %s' % data)

        is_temporal = data.dtype.type in (numpy.datetime64, numpy.timedelta64)

        if qtype == QGENERAL_LIST:
            self._write_generic_list(data.values)
        elif qtype == QCHAR:
            self._write_string(data.values.astype(numpy.string_).tostring())
        elif not is_temporal:
            # missing values go back to the q null of the target type
            raw = data.fillna(QNULLMAP[-abs(qtype)][1]).values

            if PY_TYPE[qtype] != raw.dtype:
                raw = raw.astype(PY_TYPE[qtype])

            self._write_list(raw, qtype = qtype)
        else:
            raw = data.values.astype(TEMPORAL_Q_TYPE[qtype])
            self._write_list(raw, qtype = qtype)


    @serialize(pandas.DataFrame)
    def _write_pandas_data_frame(self, data, qtype = None):
        '''
        Serializes a ``pandas.DataFrame`` as a q table.

        If ``data.meta.qtype`` is ``QKEYED_TABLE``, the frame is written as a
        keyed table: the index levels form the key table and the columns form
        the value table. Column q types are taken from ``data.meta`` when
        that attribute is present.

        Note: while the key columns are written the index of `data` is
        temporarily reset in place; it is restored afterwards, also when the
        serialization of a key column fails.
        '''
        data_columns = data.columns.values
        meta = getattr(data, 'meta', None)

        if meta is not None and meta.qtype == QKEYED_TABLE:
            # data frame represents keyed table
            self._buffer.write(struct.pack('=b', QDICTIONARY))
            self._buffer.write(struct.pack('=bxb', QTABLE, QDICTIONARY))
            index_columns = list(data.index.names)
            self._write(qlist(numpy.array(index_columns), qtype = QSYMBOL_LIST))

            data.reset_index(inplace = True)
            try:
                self._buffer.write(struct.pack('=bxi', QGENERAL_LIST, len(index_columns)))
                for column in index_columns:
                    self._write_pandas_series(data[column], qtype = meta[column])
            finally:
                # never leave the caller's frame with its index stripped
                data.set_index(index_columns, inplace = True)

        self._buffer.write(struct.pack('=bxb', QTABLE, QDICTIONARY))
        self._write(qlist(numpy.array(data_columns), qtype = QSYMBOL_LIST))
        self._buffer.write(struct.pack('=bxi', QGENERAL_LIST, len(data_columns)))
        for column in data_columns:
            self._write_pandas_series(data[column], qtype = meta[column] if meta is not None else None)

4 changes: 2 additions & 2 deletions qpython/qcollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ class QDictionary(object):
- `values` (`QList`, `QTable`, `tuple` or `list`) - dictionary values
'''
def __init__(self, keys, values):
if not isinstance(keys, (QList, tuple, list)):
if not isinstance(keys, (QList, tuple, list, numpy.ndarray)):
raise ValueError('%s expects keys to be of type: QList, tuple or list. Actual type: %s' % (self.__class__.__name__, type(keys)))
if not isinstance(values, (QTable, QList, tuple, list)):
if not isinstance(values, (QTable, QList, tuple, list, numpy.ndarray)):
raise ValueError('%s expects values to be of type: QTable, QList, tuple or list. Actual type: %s' % (self.__class__.__name__, type(values)))
if len(keys) != len(values):
raise ValueError('Number of keys: %d doesn`t match number of values: %d' % (len(keys), len(values)))
Expand Down
23 changes: 19 additions & 4 deletions qpython/qreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@


READER_CONFIGURATION = MetaData(raw = False,
numpy_temporals = False)
numpy_temporals = False,
pandas = False)



Expand Down Expand Up @@ -106,6 +107,19 @@ class QReader(object):
_reader_map = {}
parse = Mapper(_reader_map)


def __new__(cls, *args, **kwargs):
    '''
    Allocates the reader instance.

    When ``QReader`` itself is requested and the optional pandas binding can
    be imported, an instance of ``PandasQReader`` is allocated instead;
    subclasses are always allocated as requested.
    '''
    target = cls

    if cls is QReader:
        # try to load optional pandas binding; only the import is guarded so
        # that unrelated errors are not mistaken for a missing binding
        try:
            from qpython._pandas import PandasQReader
        except ImportError:
            pass
        else:
            target = PandasQReader

    # object.__new__ rejects extra arguments when __new__ is overridden;
    # the constructor arguments reach __init__ through the normal
    # instantiation protocol, so they must not be forwarded here
    return super(QReader, cls).__new__(target)


def __init__(self, stream):
self._stream = stream
self._buffer = QReader.BytesBuffer()
Expand Down Expand Up @@ -188,7 +202,7 @@ def read_data(self, message_size, is_compressed = False, **options):
:returns: read data (parsed or raw byte form)
'''
options = MetaData(**READER_CONFIGURATION.union_dict(**options))

if is_compressed:
if self._stream:
self._buffer.wrap(self._read_bytes(4))
Expand Down Expand Up @@ -290,10 +304,10 @@ def _read_list(self, qtype, options):
data = numpy.fromstring(raw, dtype = conversion)
if not self._is_native:
data.byteswap(True)

if qtype >= QTIMESTAMP_LIST and qtype <= QTIME_LIST and options.numpy_temporals:
data = array_from_raw_qtemporal(data, qtype)

return qlist(data, qtype = qtype, adjust_dtype = False)
else:
raise QReaderException('Unable to deserialize q type: %s' % hex(qtype))
Expand Down Expand Up @@ -536,3 +550,4 @@ def get_symbols(self, count):

return raw.split('\x00')


Loading

0 comments on commit 8aa5cb1

Please sign in to comment.