Skip to content

Commit

Permalink
ENH: use cython bin groupers, fix bug in DatetimeIndex.__getitem caus…
Browse files Browse the repository at this point in the history
…ing slowness, some timeseries vbenches
  • Loading branch information
wesm committed Apr 5, 2012
1 parent 30dd412 commit 91d8453
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 156 deletions.
35 changes: 34 additions & 1 deletion pandas/core/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,39 @@ def agg_series(self, obj, func):
grouper = lib.SeriesBinGrouper(obj, func, self.bins, dummy)
return grouper.get_result()

#----------------------------------------------------------------------
# cython aggregation

# Dispatch table: aggregation name -> bin-based Cython kernel in `lib`.
# `aggregate` looks the kernel up by its `how` argument.
_cython_functions = {
'add' : lib.group_add_bin,
'mean' : lib.group_mean_bin,
'var' : lib.group_var_bin,
# NOTE(review): 'std' shares the variance kernel; presumably a sqrt is
# applied afterwards through `_cython_transforms` -- confirm.
'std' : lib.group_var_bin
}

def aggregate(self, values, how):
    """
    Aggregate `values` within each bin using a Cython kernel.

    Parameters
    ----------
    values : ndarray
        Data to aggregate. A 1-dimensional input is reshaped to a single
        column before being handed to the kernel.
    how : str
        Key into ``self._cython_functions`` selecting the kernel; also used
        to look up an optional post-processing function in
        ``self._cython_transforms``.

    Returns
    -------
    result : float64 ndarray
        Shape ``(ngroups,)`` when `values` was 1-dimensional, otherwise
        ``(ngroups, values.shape[1])``.
    counts : int32 ndarray of length ``ngroups``
        Zero-initialised here and passed to the kernel to be filled in.

    Raises
    ------
    KeyError
        If `how` is not a key of ``self._cython_functions``.
    """
    agg_func = self._cython_functions[how]

    # The kernels only accept 2-d input, so view a 1-d array as one column
    # and remember to undo that on the way out.
    squeeze = values.ndim == 1
    if squeeze:
        values = values[:, None]
    out_shape = (self.ngroups, values.shape[1])

    # No registered transform means the kernel output is returned as-is.
    trans_func = self._cython_transforms.get(how)

    # will be filled in Cython function
    result = np.empty(out_shape, dtype=np.float64)
    counts = np.zeros(self.ngroups, dtype=np.int32)

    agg_func(result, counts, values, self.bins)
    if trans_func is not None:
        result = trans_func(result)

    if squeeze:
        # Drop only the column axis added above. A bare ``squeeze()`` would
        # also drop the group axis when ngroups == 1 and return a 0-d array.
        result = result[:, 0]

    return result, counts

class Grouping(object):
"""
Expand Down Expand Up @@ -1901,7 +1934,7 @@ def picker(arr):
return arr[-1] if arr is not None and len(arr) else np.nan
return picker

raise ValueError("Unrecognized method: %s" % how)
return how


from pandas.util import py3compat
Expand Down
31 changes: 20 additions & 11 deletions pandas/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ class DatetimeIndex(Int64Index):
def __new__(cls, data=None,
freq=None, start=None, end=None, periods=None,
dtype=None, copy=False, name=None, tz=None,
**kwds):
verify_integrity=True, **kwds):

warn = False
if 'offset' in kwds and kwds['offset']:
Expand Down Expand Up @@ -1292,11 +1292,12 @@ def __new__(cls, data=None,
# TODO: this is horribly inefficient. If user passes data + offset, we
# need to make sure data points conform. Punting on this

if offset is not None:
for i, ts in enumerate(subarr):
if not offset.onOffset(Timestamp(ts)):
val = Timestamp(offset.rollforward(ts)).value
subarr[i] = val
if verify_integrity:
if offset is not None:
for i, ts in enumerate(subarr):
if not offset.onOffset(Timestamp(ts)):
val = Timestamp(offset.rollforward(ts)).value
subarr[i] = val

subarr = subarr.view(cls)
subarr.name = name
Expand All @@ -1305,6 +1306,15 @@ def __new__(cls, data=None,

return subarr

@classmethod
def _simple_new(cls, values, name, offset, tz):
result = values.view(cls)
result.name = name
result.offset = offset
result.tz = tz

return result

@property
def tzinfo(self):
"""
Expand Down Expand Up @@ -1740,16 +1750,15 @@ def __getitem__(self, key):
if result.ndim > 1:
return result

return DatetimeIndex(result, name=self.name, freq=new_offset,
tz=self.tz)
return self._simple_new(result, self.name, new_offset, self.tz)

# Try to run function on index first, and then on elements of index
# Especially important for group-by functionality
def map(self, func_to_map):
def map(self, f):
try:
return func_to_map(self)
return f(self)
except:
return super(DatetimeIndex, self).map(func_to_map)
return Index.map(self, f)

# alias to offset
@property
Expand Down
149 changes: 10 additions & 139 deletions pandas/src/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ def generate_bins_dt64(ndarray[int64_t] values, ndarray[int64_t] binner,
@cython.boundscheck(False)
@cython.wraparound(False)
def group_add_bin(ndarray[float64_t, ndim=2] out,
ndarray[int32_t] counts,
ndarray[float64_t, ndim=2] values,
ndarray[int32_t] bins):
'''
Expand All @@ -503,6 +504,7 @@ def group_add_bin(ndarray[float64_t, ndim=2] out,
if b < ngroups - 1 and i >= bins[b]:
b += 1

counts[b] += 1
for j in range(K):
val = values[i, j]

Expand All @@ -515,6 +517,7 @@ def group_add_bin(ndarray[float64_t, ndim=2] out,
if b < ngroups - 1 and i >= bins[b]:
b += 1

counts[b] += 1
val = values[i, 0]

# not nan
Expand All @@ -532,6 +535,7 @@ def group_add_bin(ndarray[float64_t, ndim=2] out,
@cython.boundscheck(False)
@cython.wraparound(False)
def group_mean_bin(ndarray[float64_t, ndim=2] out,
ndarray[int32_t] counts,
ndarray[float64_t, ndim=2] values,
ndarray[int32_t] bins):
cdef:
Expand All @@ -551,6 +555,7 @@ def group_mean_bin(ndarray[float64_t, ndim=2] out,
if b < ngroups - 1 and i >= bins[b]:
b += 1

counts[b] += 1
for j in range(K):
val = values[i, j]

Expand All @@ -563,6 +568,7 @@ def group_mean_bin(ndarray[float64_t, ndim=2] out,
if b < ngroups - 1 and i >= bins[b]:
b += 1

counts[b] += 1
val = values[i, 0]

# not nan
Expand All @@ -581,6 +587,7 @@ def group_mean_bin(ndarray[float64_t, ndim=2] out,
@cython.boundscheck(False)
@cython.wraparound(False)
def group_var_bin(ndarray[float64_t, ndim=2] out,
ndarray[int32_t] counts,
ndarray[float64_t, ndim=2] values,
ndarray[int32_t] bins):

Expand All @@ -602,6 +609,8 @@ def group_var_bin(ndarray[float64_t, ndim=2] out,
if b < ngroups - 1 and i >= bins[b]:
b += 1

counts[b] += 1

for j in range(K):
val = values[i, j]

Expand All @@ -615,6 +624,7 @@ def group_var_bin(ndarray[float64_t, ndim=2] out,
if b < ngroups - 1 and i >= bins[b]:
b += 1

counts[b] += 1
val = values[i, 0]

# not nan
Expand Down Expand Up @@ -793,145 +803,6 @@ def generate_slices(ndarray[int32_t] labels, Py_ssize_t ngroups):

return starts, ends

'''
def ts_upsample_mean(ndarray[object] indices,
ndarray[object] buckets,
ndarray[float64_t] values,
inclusive=False):
cdef:
Py_ssize_t i, j, nbuckets, nvalues
ndarray[float64_t] output
object next_bound
float64_t the_sum, val, nobs
nbuckets = len(buckets)
nvalues = len(indices)
assert(len(values) == len(indices))
output = np.empty(nbuckets, dtype=float)
output.fill(np.NaN)
j = 0
for i from 0 <= i < nbuckets:
next_bound = buckets[i]
the_sum = 0
nobs = 0
if inclusive:
while j < nvalues and indices[j] <= next_bound:
val = values[j]
# not NaN
if val == val:
the_sum += val
nobs += 1
j += 1
else:
while j < nvalues and indices[j] < next_bound:
cdef:
Py_ssize_t i, j, nbuckets, nvalues
ndarray[float64_t] output
object next_bound
float64_t the_sum, val, nobs
nbuckets = len(buckets)
nvalues = len(indices)
assert(len(values) == len(indices))
output = np.empty(nbuckets, dtype=float)
output.fill(np.NaN)
j = 0
for i from 0 <= i < nbuckets:
next_bound = buckets[i]
the_sum = 0
nobs = 0
if inclusive:
while j < nvalues and indices[j] <= next_bound:
val = values[j]
# not NaN
if val == val:
the_sum += val
nobs += 1
j += 1
else:
while j < nvalues and indices[j] < next_bound:
val = values[j]
# not NaN
if val == val:
the_sum += val
nobs += 1
j += 1
if nobs > 0:
output[i] = the_sum / nobs
if j >= nvalues:
break
return output
val = values[j]
# not NaN
if val == val:
the_sum += val
nobs += 1
j += 1
if nobs > 0:
output[i] = the_sum / nobs
if j >= nvalues:
break
return output
'''

def ts_upsample_generic(ndarray[object] indices,
                        ndarray[object] buckets,
                        ndarray[float64_t] values,
                        object aggfunc,
                        inclusive=False):
    '''
    Aggregate `values` into buckets with an arbitrary function.

    Walks `indices` once from the front. For each bound in `buckets`, the
    run of positions whose index is below the bound (or equal to it when
    `inclusive` is true) is collected, and `aggfunc` is called on the
    matching slice of `values`.

    Parameters
    ----------
    indices : ndarray[object]
        Labels for `values`; must have the same length as `values`.
        NOTE(review): the single forward pass assumes these are sorted
        ascending -- confirm against callers.
    buckets : ndarray[object]
        Upper bound of each bucket, compared against `indices`.
    values : ndarray[float64]
    aggfunc : callable
        Called with a 1-d float64 slice of `values`; its result is stored
        in the output for that bucket.
    inclusive : bool, default False
        If true, an index equal to the bound belongs to that bucket.

    Returns
    -------
    output : ndarray[float64] of length len(buckets)
        NaN for buckets that received no values. Values positioned after
        the last bound are not aggregated.
    '''
    cdef:
        Py_ssize_t i, j, jstart, nbuckets, nvalues
        ndarray[float64_t] output
        object next_bound

    nbuckets = len(buckets)
    nvalues = len(indices)

    assert(len(values) == len(indices))

    output = np.empty(nbuckets, dtype=float)
    output.fill(np.nan)

    j = 0
    for i in range(nbuckets):
        next_bound = buckets[i]

        jstart = j
        if inclusive:
            while j < nvalues and indices[j] <= next_bound:
                j += 1
        else:
            while j < nvalues and indices[j] < next_bound:
                j += 1

        # Only aggregate non-empty buckets. The previous check used an
        # observation counter that was never incremented, so no bucket was
        # ever filled.
        if j > jstart:
            output[i] = aggfunc(values[jstart:j])

        if j >= nvalues:
            break

    return output


def groupby_arrays(ndarray index, ndarray _labels):
cdef:
Expand Down
3 changes: 2 additions & 1 deletion pandas/tests/test_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,9 @@ def test_custom_grouper(self):
idx = idx.append(DatetimeIndex([np.datetime64(dti[-1])]))
expect = Series(arr, index=idx)

# cython returns float for now
result = g.agg(np.sum)
assert_series_equal(result, expect)
assert_series_equal(result, expect.astype(float))

data = np.random.rand(len(dti), 10)
df = DataFrame(data, index=dti)
Expand Down
10 changes: 7 additions & 3 deletions pandas/tests/test_tseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ def test_group_add_bin():
# bin-based group_add
bins = np.array([3, 6], dtype=np.int32)
out = np.zeros((3, 1), np.float64)
lib.group_add_bin(out, obj, bins)
counts = np.empty(len(out), dtype=np.int32)
lib.group_add_bin(out, counts, obj, bins)

assert_almost_equal(out, exp)

Expand All @@ -333,7 +334,8 @@ def test_group_mean_bin():
# bin-based group_mean
bins = np.array([3, 6], dtype=np.int32)
out = np.zeros((3, 1), np.float64)
lib.group_mean_bin(out, obj, bins)
counts = np.empty(len(out), dtype=np.int32)
lib.group_mean_bin(out, counts, obj, bins)

assert_almost_equal(out, exp)

Expand All @@ -349,7 +351,9 @@ def test_group_var_bin():
# bin-based group_var
bins = np.array([3, 6], dtype=np.int32)
out = np.zeros((3, 1), np.float64)
lib.group_var_bin(out, obj, bins)
counts = np.empty(len(out), dtype=np.int32)

lib.group_var_bin(out, counts, obj, bins)

assert_almost_equal(out, exp)

Expand Down
Loading

0 comments on commit 91d8453

Please sign in to comment.