API redesign: improve custom serializers & deserializers (relates to e…
maciejlach committed Jan 21, 2016
1 parent 669600c commit eece39d
Showing 10 changed files with 253 additions and 48 deletions.
2 changes: 1 addition & 1 deletion doc/source/conf.py
@@ -148,7 +148,7 @@ def __getattr__(cls, name):
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
#html_static_path = ['_static']

# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
40 changes: 10 additions & 30 deletions doc/source/connection.rst
@@ -1,3 +1,5 @@
.. _connection:

Managing connection
===================

@@ -51,41 +53,19 @@ synchronous/asynchronous queries (:meth:`~qpython.qconnection.QConnection.sync`,
(:meth:`~qpython.qconnection.QConnection.receive`).



.. _custom_ipc_mapping:

Custom IPC protocol serializers/deserializers
*********************************************

Default IPC serializers (`.QWriter` and `.PandasQWriter`) and deserializers
(`.QReader` and `.PandasQReader`) can be replaced with custom implementations.
This allows users to override the default mapping between the q types and Python
representation.
Default IPC serializers (:class:`.QWriter` and :class:`._pandas.PandasQWriter`) and
deserializers (:class:`.QReader` and :class:`._pandas.PandasQReader`) can be replaced
with custom implementations. This allows users to override the default mapping
between the q types and Python representation.
::

    q = qconnection.QConnection(host = 'localhost', port = 5000, writer_class = MyQWriter, reader_class = MyQReader)


::

    class MyQReader(QReader):
        # QReader and QWriter use decorators to map data types and corresponding function handlers
        parse = Mapper(QReader._reader_map)

        def _read_list(self, qtype):
            if qtype == QSYMBOL_LIST:
                self._buffer.skip()
                length = self._buffer.get_int()
                symbols = self._buffer.get_symbols(length)
                return [s.decode(self._encoding) for s in symbols]
            else:
                return QReader._read_list(self, qtype = qtype)

        @parse(QSYMBOL)
        def _read_symbol(self, qtype = QSYMBOL):
            return numpy.string_(self._buffer.get_symbol()).decode(self._encoding)


    with qconnection.QConnection(host='localhost', port=5000, reader_class = MyQReader) as q:
        symbols = q.sync('`foo`bar')
        print(symbols, type(symbols), type(symbols[0]))

        symbol = q.sync('`foo')
        print(symbol, type(symbol))

Refer to :ref:`custom_type_mapping` for details.
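
The idea of handing a reader class to the connection can be sketched without a running kdb+ process. The snippet below is only an illustration under stated assumptions: `SketchConnection`, `DefaultReader` and `TextReader` are hypothetical names invented here, and the real `QConnection` may wire its reader differently.

```python
import io


class DefaultReader:
    """Hypothetical default reader: returns the raw bytes from the stream."""

    def __init__(self, stream, encoding='latin-1'):
        self._stream = stream
        self._encoding = encoding

    def read(self):
        return self._stream.read()


class TextReader(DefaultReader):
    """Hypothetical custom reader: decodes the payload into str."""

    def read(self):
        return DefaultReader.read(self).decode(self._encoding)


class SketchConnection:
    """Hypothetical connection that instantiates whichever reader class it is given."""

    def __init__(self, stream, reader_class=None, encoding='latin-1'):
        self._stream = stream
        self._encoding = encoding
        self._reader_class = reader_class or DefaultReader
        self._reader = None

    def open(self):
        # The class, not an instance, is injected; the connection builds the reader.
        self._reader = self._reader_class(self._stream, encoding=self._encoding)
        return self

    def receive(self):
        return self._reader.read()


raw = SketchConnection(io.BytesIO(b'foo')).open().receive()
text = SketchConnection(io.BytesIO(b'foo'), reader_class=TextReader).open().receive()
```

Passing the class rather than an instance lets the connection decide when the reader is constructed and which stream it is bound to.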
7 changes: 3 additions & 4 deletions doc/source/pandas.rst
@@ -1,11 +1,13 @@
.. _pandas:

Pandas integration
==================

The `qPython` allows user to use ``pandas.DataFrame`` and ``pandas.Series``
instead of ``numpy.recarray`` and ``numpy.ndarray`` to represent ``q`` tables
and vectors.

In order to instrument `qPython` to use `pandas`_ data types user has to set
In order to instrument `qPython` to use `pandas <http://pandas.pydata.org/>`_ data types user has to set
``pandas`` flag while:

- creating :class:`.qconnection.QConnection` instance,
@@ -222,6 +224,3 @@ keyed table, use type hinting mechanism to enforce the serialization rules::
# pos | s
# dates| d


.. _pandas: http://pandas.pydata.org/

2 changes: 2 additions & 0 deletions doc/source/queries.rst
@@ -1,3 +1,5 @@
.. _queries:

Queries
=======

57 changes: 55 additions & 2 deletions doc/source/type-conversion.rst
@@ -1,17 +1,21 @@
.. _type-conversion:

Types conversions
=================


Data types supported by q and Python are incompatible and thus require
additional translation. This page describes rules used for converting data types
between q and Python.
additional translation. This page describes default rules used for converting
data types between q and Python.

The translation mechanism used in the qPython library is designed so that:
- a deserialized message from kdb+ can be serialized and sent back to kdb+
  without additional processing,
- the end user can enforce type hinting for translation,
- efficient storage for tables and lists is backed by numpy arrays.

The default type mapping can be overridden by using custom IPC serializer
or deserializer implementations.

Atoms
*****
@@ -403,3 +407,52 @@ The :py:mod:`qtype` provides two utility functions to work with null values:
- :func:`~.qtype.qnull` - retrieves null type for specified q type code,
- :func:`~.qtype.is_null` - checks whether value is considered a null for
specified q type code.


.. _custom_type_mapping:

Custom type mapping
*******************

The default type mapping can be overridden by providing custom implementations
of :class:`.QWriter` and/or :class:`.QReader` and initializing
the connection as described in :ref:`custom_ipc_mapping`.

:class:`.QWriter` and :class:`.QReader` use a parse-time decorator
(:class:`Mapper`) which generates the mapping between q and Python types.
This mapping is stored in a class-level (static) variable: ``QReader._reader_map``
and ``QWriter._writer_map``. If no handler is found in the mapping:

- :class:`.QWriter` tries to find a matching qtype in the ``~qtype.Q_TYPE``
dictionary and serialize data as q atom,
- :class:`.QReader` tries to parse lists and atoms based on the type indicator
in IPC stream.
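
The fallback described in the list above can be sketched without qPython. This is a minimal illustration, not the library's code: `SketchReader`, its handler names and the three type codes are hypothetical stand-ins, and the rule "positive code means list, negative code means atom" is an assumed simplification of the real type-indicator check.

```python
# Hypothetical type codes used only for this sketch (not imported from qPython).
SYMBOL_ATOM = -11
INT_ATOM = -6
INT_LIST = 6


class SketchReader:
    # Explicit mapping: type code -> handler function (filled in below).
    _reader_map = {}

    def read(self, qtype, payload):
        handler = self._reader_map.get(qtype)
        if handler is not None:
            # A registered handler always wins.
            return handler(self, payload)
        # No explicit mapping: fall back on the type indicator itself.
        if qtype > 0:
            return self._read_list(payload)
        return self._read_atom(payload)

    def _read_symbol(self, payload):
        return payload.decode('latin-1')

    def _read_list(self, payload):
        return list(payload)

    def _read_atom(self, payload):
        return payload[0]


# Register one explicit handler; the other type codes stay unmapped on purpose.
SketchReader._reader_map[SYMBOL_ATOM] = SketchReader._read_symbol

reader = SketchReader()
mapped = reader.read(SYMBOL_ATOM, b'foo')        # handled via the mapping
generic_list = reader.read(INT_LIST, [1, 2, 3])  # generic list fallback
generic_atom = reader.read(INT_ATOM, [7])        # generic atom fallback
```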

When subclassing these classes, the user can create a copy of the mapping
from the parent class and use the parse-time decorator:


.. code:: python

    class PandasQWriter(QWriter):
        _writer_map = dict.copy(QWriter._writer_map)    # create copy of default serializer map
        serialize = Mapper(_writer_map)                 # upsert custom mapping

        @serialize(pandas.Series)
        def _write_pandas_series(self, data, qtype = None):
            # serialize pandas.Series into IPC stream
            # ..omitted for readability..
            self._write_list(data, qtype = qtype)


    class PandasQReader(QReader):
        _reader_map = dict.copy(QReader._reader_map)    # create copy of default deserializer map
        parse = Mapper(_reader_map)                     # overwrite default mapping

        @parse(QTABLE)
        def _read_table(self, qtype = QTABLE):
            # parse q table as pandas.DataFrame
            # ..omitted for readability..
            return pandas.DataFrame(data)

Refer to :ref:`sample_custom_reader` for complete example.
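
Why the copy matters can be shown with a self-contained sketch. `SketchMapper`, `BaseReader` and `TextReader` are hypothetical stand-ins written for this illustration; the real :class:`Mapper` may differ in detail.

```python
class SketchMapper:
    """Decorator factory: registers the decorated function under each key."""

    def __init__(self, call_map):
        self.call_map = call_map

    def __call__(self, *keys):
        def register(func):
            for key in keys:
                self.call_map[key] = func
            return func
        return register


class BaseReader:
    _reader_map = {}
    parse = SketchMapper(_reader_map)

    @parse('symbol')
    def _read_symbol(self, raw):
        return raw                      # default: keep bytes


class TextReader(BaseReader):
    # Copy first, so the registrations below do not leak into BaseReader.
    _reader_map = dict(BaseReader._reader_map)
    parse = SketchMapper(_reader_map)

    @parse('symbol')
    def _read_symbol(self, raw):
        return raw.decode('latin-1')    # override: return str


base_handler = BaseReader._reader_map['symbol']
text_handler = TextReader._reader_map['symbol']
base_result = base_handler(BaseReader(), b'foo')
text_result = text_handler(TextReader(), b'foo')
```

Had `TextReader` passed `BaseReader._reader_map` straight to the mapper, its registration would have replaced the base handler for every reader that shares that dictionary.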
76 changes: 76 additions & 0 deletions doc/source/usage-examples.rst
@@ -1,3 +1,5 @@
.. _usage-examples:

Usage examples
==============

@@ -485,4 +487,78 @@ This example shows how to stream data to the kdb+ process using standard tickerp
t.join()
.. _sample_custom_reader:

Custom type IPC deserialization
*******************************

This example shows how to override the standard deserialization type mapping with two different :class:`.QReader` subclasses.
Please refer to :ref:`custom_type_mapping` for implementation details:

.. code:: python

    import numpy

    from qpython import qconnection
    from qpython.qreader import QReader
    from qpython.qtype import QSYMBOL, QSYMBOL_LIST, Mapper


    class StringQReader(QReader):
        # QReader and QWriter use decorators to map data types and corresponding function handlers
        _reader_map = dict.copy(QReader._reader_map)
        parse = Mapper(_reader_map)

        def _read_list(self, qtype):
            if qtype == QSYMBOL_LIST:
                self._buffer.skip()
                length = self._buffer.get_int()
                symbols = self._buffer.get_symbols(length)
                return [s.decode(self._encoding) for s in symbols]
            else:
                return QReader._read_list(self, qtype = qtype)

        @parse(QSYMBOL)
        def _read_symbol(self, qtype = QSYMBOL):
            return numpy.string_(self._buffer.get_symbol()).decode(self._encoding)


    class ReverseStringQReader(QReader):
        # QReader and QWriter use decorators to map data types and corresponding function handlers
        _reader_map = dict.copy(QReader._reader_map)
        parse = Mapper(_reader_map)

        @parse(QSYMBOL_LIST)
        def _read_symbol_list(self, qtype):
            self._buffer.skip()
            length = self._buffer.get_int()
            symbols = self._buffer.get_symbols(length)
            return [s.decode(self._encoding)[::-1] for s in symbols]

        @parse(QSYMBOL)
        def _read_symbol(self, qtype = QSYMBOL):
            return numpy.string_(self._buffer.get_symbol()).decode(self._encoding)[::-1]


    if __name__ == '__main__':
        with qconnection.QConnection(host = 'localhost', port = 5000, reader_class = StringQReader) as q:
            symbols = q.sync('`foo`bar')
            print(symbols, type(symbols), type(symbols[0]))

            symbol = q.sync('`foo')
            print(symbol, type(symbol))

        with qconnection.QConnection(host = 'localhost', port = 5000, reader_class = ReverseStringQReader) as q:
            symbols = q.sync('`foo`bar')
            print(symbols, type(symbols), type(symbols[0]))

            symbol = q.sync('`foo')
            print(symbol, type(symbol))

.. _Twisted: http://twistedmatrix.com/trac/
21 changes: 12 additions & 9 deletions qpython/_pandas.py
@@ -32,7 +32,8 @@

class PandasQReader(QReader):

    parse = Mapper(QReader._reader_map)
    _reader_map = dict.copy(QReader._reader_map)
    parse = Mapper(_reader_map)

    @parse(QDICTIONARY)
    def _read_dictionary(self, qtype = QDICTIONARY):
@@ -106,34 +107,36 @@ def _read_list(self, qtype):
        if self._options.pandas:
            self._options.numpy_temporals = True

        list = QReader._read_list(self, qtype = qtype)
        qlist = QReader._read_list(self, qtype = qtype)

        if self._options.pandas:
            if -abs(qtype) not in [QMONTH, QDATE, QDATETIME, QMINUTE, QSECOND, QTIME, QTIMESTAMP, QTIMESPAN, QSYMBOL]:
                null = QNULLMAP[-abs(qtype)][1]
                ps = pandas.Series(data = list).replace(null, numpy.NaN)
                ps = pandas.Series(data = qlist).replace(null, numpy.NaN)
            else:
                ps = pandas.Series(data = list)
                ps = pandas.Series(data = qlist)

            ps.meta = MetaData(qtype = qtype)
            return ps
        else:
            return list
            return qlist


    @parse(QGENERAL_LIST)
    def _read_general_list(self, qtype = QGENERAL_LIST):
        list = QReader._read_general_list(self, qtype)
        qlist = QReader._read_general_list(self, qtype)
        if self._options.pandas:
            return [numpy.nan if isinstance(element, basestring) and element == b' ' else element for element in list]
            return [numpy.nan if isinstance(element, basestring) and element == b' ' else element for element in qlist]
        else:
            return list
            return qlist



class PandasQWriter(QWriter):

    serialize = Mapper(QWriter._writer_map)
    _writer_map = dict.copy(QWriter._writer_map)
    serialize = Mapper(_writer_map)


    @serialize(pandas.Series)
    def _write_pandas_series(self, data, qtype = None):
10 changes: 9 additions & 1 deletion qpython/qreader.py
@@ -100,6 +100,10 @@ class QReader(object):
    :Parameters:
     - `stream` (`file object` or `None`) - data input stream
     - `encoding` (`string`) - encoding for characters parsing

    :Attributes:
     - `_reader_map` - stores mapping between q types and functions
       responsible for parsing into Python objects
    '''

    _reader_map = {}
@@ -215,7 +219,7 @@ def read_data(self, message_size, is_compressed = False, **options):
    def _read_object(self):
        qtype = self._buffer.get_byte()

        reader = QReader._reader_map.get(qtype, None)
        reader = self._get_reader(qtype)

        if reader:
            return reader(self, qtype)
@@ -227,6 +231,10 @@ def _read_object(self):
        raise QReaderException('Unable to deserialize q type: %s' % hex(qtype))


    def _get_reader(self, qtype):
        return self._reader_map.get(qtype, None)


    @parse(QERROR)
    def _read_error(self, qtype = QERROR):
        raise QException(self._read_symbol())
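
The effect of looking the mapping up through `self` instead of through a fixed class can be illustrated in isolation. The classes below are hypothetical stand-ins, not qPython code: `_get_reader_fixed_class` mimics a lookup pinned to the base class, while `_get_reader` mimics the instance-based lookup.

```python
class BaseReader:
    # Class-level mapping: key -> handler taking (reader, raw bytes).
    _reader_map = {'symbol': lambda self, raw: raw}

    def _get_reader_fixed_class(self, key):
        # Pinned to BaseReader: a subclass mapping is never consulted.
        return BaseReader._reader_map.get(key)

    def _get_reader(self, key):
        # Attribute lookup on self resolves to the nearest class in the MRO
        # that defines _reader_map, so a subclass copy takes effect.
        return self._reader_map.get(key)


class TextReader(BaseReader):
    _reader_map = dict(BaseReader._reader_map)
    _reader_map['symbol'] = lambda self, raw: raw.decode('latin-1')


reader = TextReader()
pinned = reader._get_reader_fixed_class('symbol')(reader, b'foo')
resolved = reader._get_reader('symbol')(reader, b'foo')
```

Routing the lookup through a small method also gives subclasses a single place to override if they need a different lookup strategy.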
10 changes: 9 additions & 1 deletion qpython/qwriter.py
@@ -45,6 +45,10 @@ class QWriter(object):
     - `stream` (`socket` or `None`) - stream for data serialization
     - `protocol_version` (`integer`) - version IPC protocol
     - `encoding` (`string`) - encoding for characters serialization

    :Attributes:
     - `_writer_map` - stores mapping between Python types and functions
       responsible for serializing into IPC representation
    '''

    _writer_map = {}
@@ -102,7 +106,7 @@ def _write(self, data):
            else:
                data_type = type(data)

            writer = self._writer_map.get(data_type, None)
            writer = self._get_writer(data_type)

            if writer:
                writer(self, data)
@@ -115,6 +119,10 @@ def _write(self, data):
                    raise QWriterException('Unable to serialize type: %s' % data.__class__ if isinstance(data, object) else type(data))


    def _get_writer(self, data_type):
        return self._writer_map.get(data_type, None)


    def _write_null(self):
        self._buffer.write(struct.pack('=bx', QNULL))
