Skip to content

Commit

Permalink
TST: Adds tests for running multiple columns in one query
Browse files Browse the repository at this point in the history
  • Loading branch information
llllllllll committed Jan 21, 2016
1 parent 97298d1 commit 517ad7a
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions tests/pipeline/test_blaze.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,43 @@ def test_custom_query_time_tz(self):
))
assert_frame_equal(result, expected, check_dtype=False)

def test_id_multiple_columns(self):
    """Run a pipeline that pulls two columns ('value' and a derived
    'other' = value + 1) out of a single blaze expression in one query,
    and check the result against the source frame.
    """
    frame = self.df.copy()
    frame['other'] = frame.value + 1
    # Extend the dataset's measure with the new column; 'other' shares
    # the dtype of 'value'.
    measure_fields = OrderedDict(self.dshape.measure.fields)
    measure_fields['other'] = measure_fields['value']
    expr = bz.Data(frame, name='expr', dshape=var * Record(measure_fields))
    loader = BlazeLoader()
    ds = from_blaze(
        expr,
        loader=loader,
        no_deltas_rule='ignore',
    )
    p = Pipeline()
    for column in ('value', 'other'):
        p.add(getattr(ds, column).latest, column)
    dates = self.dates

    with tmp_asset_finder() as finder:
        result = SimplePipelineEngine(
            loader,
            dates,
            finder,
        ).run_pipeline(p, dates[0], dates[-1])

    expected = frame.drop('asof_date', axis=1).set_index(
        ['timestamp', 'sid'],
    ).sort_index(axis=1)
    # Replace raw sids in the index with actual Asset objects so the
    # frame lines up with what the engine returns.
    ts_level, sid_level = expected.index.levels
    expected.index = pd.MultiIndex.from_product(
        (ts_level, finder.retrieve_all(sid_level)),
    )
    assert_frame_equal(
        result,
        expected.sort_index(axis=1),
        check_dtype=False,
    )

def test_id_macro_dataset(self):
expr = bz.Data(self.macro_df, name='expr', dshape=self.macro_dshape)
loader = BlazeLoader()
Expand Down Expand Up @@ -391,6 +428,47 @@ def test_id_macro_dataset(self):
)
assert_frame_equal(result, expected, check_dtype=False)

def test_id_macro_dataset_multiple_columns(self):
    """Run a pipeline that pulls two columns ('value' and a derived
    'other' = value + 1) out of a single macro (sid-less) blaze
    expression in one query; the macro values are broadcast across all
    assets in the finder.
    """
    df = self.macro_df.copy()
    df['other'] = df.value + 1
    # Extend the dataset's measure with the new column; 'other' shares
    # the dtype of 'value'.
    fields = OrderedDict(self.macro_dshape.measure.fields)
    fields['other'] = fields['value']
    expr = bz.Data(df, name='expr', dshape=var * Record(fields))
    loader = BlazeLoader()
    ds = from_blaze(
        expr,
        loader=loader,
        no_deltas_rule='ignore',
    )
    p = Pipeline()
    p.add(ds.value.latest, 'value')
    # Label the output column 'other' to match the dataset column name
    # (was mislabeled 'latest', inconsistent with
    # test_id_multiple_columns above).
    p.add(ds.other.latest, 'other')
    dates = self.dates

    asset_info = asset_infos[0][0]
    with tmp_asset_finder(asset_info) as finder:
        result = SimplePipelineEngine(
            loader,
            dates,
            finder,
        ).run_pipeline(p, dates[0], dates[-1])

    # Each macro row (value, other) is repeated once per asset.
    expected = pd.DataFrame(
        np.array([[0, 1],
                  [1, 2],
                  [2, 3]]).repeat(3, axis=0),
        index=pd.MultiIndex.from_product((
            df.timestamp,
            finder.retrieve_all(asset_info.index),
        )),
        columns=('value', 'other'),
    ).sort_index(axis=1)
    assert_frame_equal(
        result,
        expected.sort_index(axis=1),
        check_dtype=False,
    )

def _run_pipeline(self,
expr,
deltas,
Expand Down

0 comments on commit 517ad7a

Please sign in to comment.