Skip to content

Commit

Permalink
BUG: fixed an issue with extracting bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
fredfortier committed Mar 8, 2018
1 parent 5de8954 commit fc9837b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 29 deletions.
56 changes: 30 additions & 26 deletions catalyst/marketplace/marketplace.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from catalyst.marketplace.utils.path_utils import get_bundle_folder, \
get_data_source_folder, get_marketplace_folder, \
get_user_pubaddr, get_temp_bundles_folder, extract_bundle
from catalyst.utils.paths import ensure_directory

if sys.version_info.major < 3:
import urllib
Expand Down Expand Up @@ -154,14 +155,14 @@ def sign_transaction(self, tx):
'Gas Price:\t\t[Accept the default value]\n'
'Nonce:\t\t\t{nonce}\n'
'Data:\t\t\t{data}\n'.format(
url=url,
_from=tx['from'],
to=tx['to'],
value=tx['value'],
gas=tx['gas'],
nonce=tx['nonce'],
data=tx['data'], )
)
url=url,
_from=tx['from'],
to=tx['to'],
value=tx['value'],
gas=tx['gas'],
nonce=tx['nonce'],
data=tx['data'], )
)

webbrowser.open_new(url)

Expand Down Expand Up @@ -221,16 +222,16 @@ def subscribe(self, dataset=None):
print(df_sets)
dataset_num = input('Choose the dataset you want to '
'subscribe to [0..{}]: '.format(
df_sets.size-1))
df_sets.size - 1))
try:
dataset_num = int(dataset_num)
except ValueError:
print('Enter a number between 0 and {}'.format(
df_sets.size-1))
df_sets.size - 1))
else:
if dataset_num not in range(0, df_sets.size):
print('Enter a number between 0 and {}'.format(
df_sets.size-1))
df_sets.size - 1))
else:
dataset = df_sets.iloc[dataset_num]['dataset']
break
Expand Down Expand Up @@ -296,14 +297,14 @@ def subscribe(self, dataset=None):
'buy: {} ENG. Get enough ENG to cover the costs of the '
'monthly\nsubscription for what you are trying to buy, '
'and try again.'.format(
address, from_grains(balance), price))
address, from_grains(balance), price))
return

while True:
agree_pay = input('Please confirm that you agree to pay {} ENG '
'for a monthly subscription to the dataset "{}" '
'starting today. [default: Y] '.format(
price, dataset)) or 'y'
price, dataset)) or 'y'
if agree_pay.lower() not in ('y', 'n'):
print("Please answer Y or N.")
else:
Expand Down Expand Up @@ -365,10 +366,10 @@ def subscribe(self, dataset=None):
'Now processing second transaction.')

tx = self.mkt_contract.functions.subscribe(
Web3.toHex(dataset),
).buildTransaction({
'from': address,
'nonce': self.web3.eth.getTransactionCount(address)})
Web3.toHex(dataset),
).buildTransaction({
'from': address,
'nonce': self.web3.eth.getTransactionCount(address)})

if 'ropsten' in ETH_REMOTE_NODE:
tx['gas'] = min(int(tx['gas'] * 1.5), 4700000)
Expand Down Expand Up @@ -408,7 +409,7 @@ def subscribe(self, dataset=None):
'You can now ingest this dataset anytime during the '
'next month by running the following command:\n'
'catalyst marketplace ingest --dataset={}'.format(
dataset, address, dataset))
dataset, address, dataset))

def process_temp_bundle(self, ds_name, path):
"""
Expand All @@ -425,7 +426,10 @@ def process_temp_bundle(self, ds_name, path):
"""
tmp_bundle = extract_bundle(path)
bundle_folder = get_data_source_folder(ds_name)
bundle_folder = os.path.join(
get_data_source_folder(ds_name), 'bundle'
)
ensure_directory(bundle_folder)
if os.listdir(bundle_folder):
zsource = bcolz.ctable(rootdir=tmp_bundle, mode='r')
ztarget = bcolz.ctable(rootdir=bundle_folder, mode='r')
Expand All @@ -450,16 +454,16 @@ def ingest(self, ds_name=None, start=None, end=None, force_download=False):
print(df_sets)
dataset_num = input('Choose the dataset you want to '
'ingest [0..{}]: '.format(
df_sets.size-1))
df_sets.size - 1))
try:
dataset_num = int(dataset_num)
except ValueError:
print('Enter a number between 0 and {}'.format(
df_sets.size-1))
df_sets.size - 1))
else:
if dataset_num not in range(0, df_sets.size):
print('Enter a number between 0 and {}'.format(
df_sets.size-1))
df_sets.size - 1))
else:
ds_name = df_sets.iloc[dataset_num]['dataset']
break
Expand Down Expand Up @@ -491,10 +495,10 @@ def ingest(self, ds_name=None, start=None, end=None, force_download=False):
print('Your subscription to dataset "{}" expired on {} UTC.'
'Please renew your subscription by running:\n'
'catalyst marketplace subscribe --dataset={}'.format(
ds_name,
pd.to_datetime(check_sub[4], unit='s', utc=True),
ds_name)
)
ds_name,
pd.to_datetime(check_sub[4], unit='s', utc=True),
ds_name)
)

if 'key' in self.addresses[address_i]:
key = self.addresses[address_i]['key']
Expand Down
54 changes: 54 additions & 0 deletions catalyst/marketplace/utils/bundle_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import os
import random
import re
import shutil

import bcolz
import numpy as np
import pandas as pd
from six import string_types


def merge_bundles(zsource, ztarget):
Expand All @@ -27,10 +31,60 @@ def merge_bundles(zsource, ztarget):
df.drop_duplicates(inplace=True)
df.set_index(['date', 'symbol'], drop=False, inplace=True)

sanitize_df(df)

dirname = os.path.basename(ztarget.rootdir)
bak_dir = ztarget.rootdir.replace(dirname, '.{}'.format(dirname))
shutil.move(ztarget.rootdir, bak_dir)

z = bcolz.ctable.fromdataframe(df=df, rootdir=ztarget.rootdir)
shutil.rmtree(bak_dir)
return z


def sanitize_df(df):
# Using a sampling method to identify dates for efficiency with
# large datasets
if len(df) > 100:
indexes = random.sample(range(0, len(df) - 1), 100)
else:
indexes = range(0, len(df) - 1)

for column in df.columns:
is_date = False
for index in indexes:
value = df[column].iloc[index]
if not isinstance(value, string_types):
continue

# TODO: assuming that the date is at least daily
exp = re.compile(r'^\d{4}-\d{2}-\d{2}.*$')
matches = exp.findall(value)

if matches:
is_date = True
break

if is_date:
df[column] = pd.to_datetime(df[column])

else:
try:
ser = safely_reduce_dtype(df[column])
df[column] = ser
except Exception:
pass

return df


def safely_reduce_dtype(ser): # pandas.Series or numpy.array
orig_dtype = "".join(
[x for x in ser.dtype.name if x.isalpha()]) # float/int
mx = 1
for val in ser.values:
new_itemsize = np.min_scalar_type(val).itemsize
if mx < new_itemsize:
mx = new_itemsize
new_dtype = orig_dtype + str(mx * 8)
return ser.astype(new_dtype)
5 changes: 2 additions & 3 deletions tests/marketplace/test_marketplace.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from catalyst.marketplace.marketplace import Marketplace
from catalyst.testing.fixtures import WithLogger, ZiplineTestCase
import pandas as pd


class TestMarketplace(WithLogger, ZiplineTestCase):
Expand All @@ -16,12 +15,12 @@ def test_register(self):

def test_subscribe(self):
marketplace = Marketplace()
marketplace.subscribe('marketcap2222')
marketplace.subscribe('marketcap')
pass

def test_ingest(self):
marketplace = Marketplace()
ds_def = marketplace.ingest('github')
ds_def = marketplace.ingest('marketcap')
pass

def test_publish(self):
Expand Down

0 comments on commit fc9837b

Please sign in to comment.