Commit
Merge pull request deepchem#899 from lilleswing/hdf5-diskdataset-metadata

Store MetaData for DiskDatasets as hd5 File
rbharath authored Nov 2, 2017
2 parents f9d8647 + b74bbad commit c790a3b
Showing 4 changed files with 109 additions and 66 deletions.
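
For orientation, here is a minimal sketch (not part of the diff, with invented shard contents and task name) of the DiskDataset round trip this change touches: create_dataset now persists tasks.json and metadata.csv.gzip in the data directory instead of metadata.joblib, and the constructor reloads them through load_metadata, as the diff below shows.

import os
import tempfile

import numpy as np

from deepchem.data import DiskDataset


def shard_generator():
  # Two tiny made-up shards of (X, y, w, ids), just to exercise the code path.
  for start in (0, 5):
    X = np.random.rand(5, 3)
    y = np.random.rand(5, 1)
    w = np.ones((5, 1))
    ids = np.arange(start, start + 5)
    yield X, y, w, ids


data_dir = tempfile.mkdtemp()
dataset = DiskDataset.create_dataset(shard_generator(), data_dir, tasks=["task0"])

# After this change the directory holds tasks.json and metadata.csv.gzip
# (plus the per-shard .joblib files) rather than metadata.joblib.
print(sorted(os.listdir(data_dir)))

# Reloading goes through the new DiskDataset.load_metadata().
reloaded = DiskDataset(data_dir)
print(reloaded.tasks)
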
49 changes: 33 additions & 16 deletions deepchem/data/datasets.py
@@ -4,14 +4,16 @@
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
import json
import os
import math
import numpy as np
import pandas as pd
import random
from deepchem.utils.save import save_to_disk
from deepchem.utils.save import save_to_disk, save_metadata
from deepchem.utils.save import load_from_disk
from deepchem.utils.save import log
from pandas import read_hdf
import tempfile
import time
import shutil
@@ -444,11 +446,7 @@ def __init__(self, data_dir, verbose=True):
self.verbose = verbose

log("Loading dataset from disk.", self.verbose)
if os.path.exists(self._get_metadata_filename()):
(self.tasks,
self.metadata_df) = load_from_disk(self._get_metadata_filename())
else:
raise ValueError("No metadata found on disk.")
self.tasks, self.metadata_df = self.load_metadata()

@staticmethod
def create_dataset(shard_generator, data_dir=None, tasks=[], verbose=True):
@@ -477,20 +475,40 @@ def create_dataset(shard_generator, data_dir=None, tasks=[], verbose=True):
DiskDataset.write_data_to_disk(data_dir, basename, tasks, X, y, w,
ids))
metadata_df = DiskDataset._construct_metadata(metadata_rows)
metadata_filename = os.path.join(data_dir, "metadata.joblib")
save_to_disk((tasks, metadata_df), metadata_filename)
save_metadata(tasks, metadata_df, data_dir)
time2 = time.time()
log("TIMING: dataset construction took %0.3f s" % (time2 - time1), verbose)
return DiskDataset(data_dir, verbose=verbose)

def load_metadata(self):
try:
tasks_filename, metadata_filename = self._get_metadata_filename()
with open(tasks_filename) as fin:
tasks = json.load(fin)
metadata_df = pd.read_csv(metadata_filename, compression='gzip')
metadata_df = metadata_df.where((pd.notnull(metadata_df)), None)
return tasks, metadata_df
except Exception as e:
pass

# Load obsolete format -> save in new format
metadata_filename = os.path.join(self.data_dir, "metadata.joblib")
if os.path.exists(metadata_filename):
tasks, metadata_df = load_from_disk(metadata_filename)
del metadata_df['task_names']
del metadata_df['basename']
save_metadata(tasks, metadata_df, self.data_dir)
return tasks, metadata_df
raise ValueError("No Metadata Found On Disk")

@staticmethod
def _construct_metadata(metadata_entries):
"""Construct a dataframe containing metadata.
metadata_entries should have elements returned by write_data_to_disk
above.
"""
columns = ('basename', 'task_names', 'ids', 'X', 'y', 'w')
columns = ('ids', 'X', 'y', 'w')
metadata_df = pd.DataFrame(metadata_entries, columns=columns)
return metadata_df

@@ -527,11 +545,11 @@ def write_data_to_disk(data_dir,
out_ids = None

# note that this corresponds to the _construct_metadata column order
return [basename, tasks, out_ids, out_X, out_y, out_w]
return [out_ids, out_X, out_y, out_w]

def save_to_disk(self):
"""Save dataset to disk."""
save_to_disk((self.tasks, self.metadata_df), self._get_metadata_filename())
save_metadata(self.tasks, self.metadata_df, self.data_dir)

def move(self, new_data_dir):
"""Moves dataset to new directory."""
@@ -603,8 +621,9 @@ def _get_metadata_filename(self):
"""
Get standard location for metadata file.
"""
metadata_filename = os.path.join(self.data_dir, "metadata.joblib")
return metadata_filename
metadata_filename = os.path.join(self.data_dir, "metadata.csv.gzip")
tasks_filename = os.path.join(self.data_dir, "tasks.json")
return tasks_filename, metadata_filename

def get_number_shards(self):
"""
@@ -945,14 +964,12 @@ def shuffle_each_shard(self):
n_rows = len(self.metadata_df.index)
for i in range(n_rows):
row = self.metadata_df.iloc[i]
basename = row["basename"]
X, y, w, ids = self.get_shard(i)
n = X.shape[0]
permutation = np.random.permutation(n)
X, y, w, ids = (X[permutation], y[permutation], w[permutation],
ids[permutation])
DiskDataset.write_data_to_disk(self.data_dir, basename, tasks, X, y, w,
ids)
DiskDataset.write_data_to_disk(self.data_dir, "", tasks, X, y, w, ids)

def shuffle_shards(self):
"""Shuffles the order of the shards for this dataset."""
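Because the new on-disk format is plain JSON plus gzip-compressed CSV, the metadata can also be read back without deepchem. A small sketch of the raw round trip, mirroring the save_metadata/load_metadata pair in this diff (directory and shard file names are made up):

import json
import os
import tempfile

import pandas as pd

data_dir = tempfile.mkdtemp()
tasks = ["task0"]
metadata_df = pd.DataFrame(
    [["shard-0-ids.joblib", "shard-0-X.joblib", "shard-0-y.joblib",
      "shard-0-w.joblib"]],
    columns=('ids', 'X', 'y', 'w'))

# Save side: tasks as JSON, the metadata frame as gzip-compressed CSV.
with open(os.path.join(data_dir, "tasks.json"), 'w') as fout:
  json.dump(tasks, fout)
metadata_df.to_csv(
    os.path.join(data_dir, "metadata.csv.gzip"), index=False, compression='gzip')

# Load side: the inverse, with missing values mapped back to None as in
# load_metadata above.
with open(os.path.join(data_dir, "tasks.json")) as fin:
  tasks_back = json.load(fin)
metadata_back = pd.read_csv(
    os.path.join(data_dir, "metadata.csv.gzip"), compression='gzip')
metadata_back = metadata_back.where(pd.notnull(metadata_back), None)
print(tasks_back, list(metadata_back.columns))
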
97 changes: 50 additions & 47 deletions deepchem/models/tensorgraph/models/test_graph_models.py
@@ -1,3 +1,5 @@
import unittest

import numpy as np

import deepchem
@@ -7,66 +9,67 @@
from deepchem.molnet.load_function.delaney_datasets import load_delaney


def get_dataset(mode='classification', featurizer='GraphConv'):
data_points = 10
tasks, all_dataset, transformers = load_delaney(featurizer)
train, valid, test = all_dataset

if mode == 'classification':
y = np.random.randint(0, 2, size=(data_points, len(tasks)))
metric = deepchem.metrics.Metric(
deepchem.metrics.roc_auc_score, np.mean, mode="classification")
if mode == 'regression':
y = np.random.normal(size=(data_points, len(tasks)))
metric = deepchem.metrics.Metric(
deepchem.metrics.mean_absolute_error, mode="regression")

ds = NumpyDataset(train.X[:10], y, train.w[:10], train.ids[:10])
class TestGraphModels(unittest.TestCase):

return tasks, ds, transformers, metric
def get_dataset(self, mode='classification', featurizer='GraphConv'):
data_points = 10
tasks, all_dataset, transformers = load_delaney(featurizer)
train, valid, test = all_dataset

if mode == 'classification':
y = np.random.randint(0, 2, size=(data_points, len(tasks)))
metric = deepchem.metrics.Metric(
deepchem.metrics.roc_auc_score, np.mean, mode="classification")
if mode == 'regression':
y = np.random.normal(size=(data_points, len(tasks)))
metric = deepchem.metrics.Metric(
deepchem.metrics.mean_absolute_error, mode="regression")

def test_graph_conv_model():
tasks, dataset, transformers, metric = get_dataset('classification',
'GraphConv')
ds = NumpyDataset(train.X[:10], y, train.w[:10], train.ids[:10])

batch_size = 50
model = GraphConvTensorGraph(
len(tasks), batch_size=batch_size, mode='classification')
return tasks, ds, transformers, metric

model.fit(dataset, nb_epoch=1)
scores = model.evaluate(dataset, [metric], transformers)
def test_graph_conv_model(self):
tasks, dataset, transformers, metric = self.get_dataset(
'classification', 'GraphConv')

model.save()
model = TensorGraph.load_from_dir(model.model_dir)
scores = model.evaluate(dataset, [metric], transformers)
batch_size = 50
model = GraphConvTensorGraph(
len(tasks), batch_size=batch_size, mode='classification')

model.fit(dataset, nb_epoch=1)
scores = model.evaluate(dataset, [metric], transformers)

def test_graph_conv_regression_model():
tasks, dataset, transformers, metric = get_dataset('regression', 'GraphConv')
model.save()
model = TensorGraph.load_from_dir(model.model_dir)
scores = model.evaluate(dataset, [metric], transformers)

batch_size = 50
model = GraphConvTensorGraph(
len(tasks), batch_size=batch_size, mode='regression')
def test_graph_conv_regression_model(self):
tasks, dataset, transformers, metric = self.get_dataset(
'regression', 'GraphConv')

model.fit(dataset, nb_epoch=1)
scores = model.evaluate(dataset, [metric], transformers)
batch_size = 50
model = GraphConvTensorGraph(
len(tasks), batch_size=batch_size, mode='regression')

model.save()
model = TensorGraph.load_from_dir(model.model_dir)
scores = model.evaluate(dataset, [metric], transformers)
model.fit(dataset, nb_epoch=1)
scores = model.evaluate(dataset, [metric], transformers)

model.save()
model = TensorGraph.load_from_dir(model.model_dir)
scores = model.evaluate(dataset, [metric], transformers)

def test_graph_conv_error_bars():
tasks, dataset, transformers, metric = get_dataset('regression', 'GraphConv')
def test_graph_conv_error_bars(self):
tasks, dataset, transformers, metric = self.get_dataset(
'regression', 'GraphConv')

batch_size = 50
model = GraphConvTensorGraph(
len(tasks), batch_size=batch_size, mode='regression')
batch_size = 50
model = GraphConvTensorGraph(
len(tasks), batch_size=batch_size, mode='regression')

model.fit(dataset, nb_epoch=1)
model.fit(dataset, nb_epoch=1)

mu, sigma = model.bayesian_predict(
dataset, transformers, untransform=True, n_passes=24)
assert mu.shape == (len(dataset), len(tasks))
assert sigma.shape == (len(dataset), len(tasks))
mu, sigma = model.bayesian_predict(
dataset, transformers, untransform=True, n_passes=24)
assert mu.shape == (len(dataset), len(tasks))
assert sigma.shape == (len(dataset), len(tasks))
4 changes: 2 additions & 2 deletions deepchem/splits/tests/test_splitter.py
@@ -292,7 +292,7 @@ def test_singletask_stratified_column_indices_mask(self):
y[:n_positives] = 1
w = np.ones((n_samples, n_tasks))
# Set half the positives to have zero weight
w[:n_positives / 2] = 0
w[:n_positives // 2] = 0
ids = np.arange(n_samples)

stratified_splitter = dc.splits.RandomStratifiedSplitter()
@@ -340,7 +340,7 @@ def test_multitask_stratified_column_indices_masked(self):
y = np.random.binomial(1, p, size=(n_samples, n_tasks))
w = np.ones((n_samples, n_tasks))
# Mask half the examples
w[:n_samples / 2] = 0
w[:n_samples // 2] = 0

stratified_splitter = dc.splits.RandomStratifiedSplitter()
split_indices = stratified_splitter.get_task_split_indices(
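The test_splitter.py change is a Python 3 fix rather than a behavior change: under true division n_positives / 2 is a float, which is not a valid slice index, while // keeps it an integer. A tiny illustration:

import numpy as np

w = np.ones((10, 3))
n_positives = 5

# w[:n_positives / 2] = 0   # fails on Python 3: 2.5 is not a valid slice index
w[:n_positives // 2] = 0    # floor division keeps the slice index an int
print(w[:3, 0])             # [0. 0. 1.]
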
25 changes: 24 additions & 1 deletion deepchem/utils/save.py
@@ -9,6 +9,7 @@
import joblib
from sklearn.externals import joblib as old_joblib
import gzip
import json
import pickle
import pandas as pd
import numpy as np
@@ -103,6 +104,28 @@ def load_csv_files(filenames, shard_size=None, verbose=True):
yield df


def save_metadata(tasks, metadata_df, data_dir):
"""
Saves the metadata for a DiskDataset
Parameters
----------
tasks: list of str
Tasks of DiskDataset
metadata_df: pd.DataFrame
data_dir: str
Directory to store metadata
Returns
-------
"""
if isinstance(tasks, np.ndarray):
tasks = tasks.tolist()
metadata_filename = os.path.join(data_dir, "metadata.csv.gzip")
tasks_filename = os.path.join(data_dir, "tasks.json")
with open(tasks_filename, 'w') as fout:
json.dump(tasks, fout)
metadata_df.to_csv(metadata_filename, index=False, compression='gzip')


def load_from_disk(filename):
"""Load a dataset from file."""
name = filename
@@ -142,7 +165,7 @@ def load_sharded_csv(filenames):
else:
raise ValueError("Unrecognized filetype for %s" % filename)

#combine dataframes
# combine dataframes
combined_df = dataframes[0]
for i in range(0, len(dataframes) - 1):
combined_df = combined_df.append(dataframes[i + 1])
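One detail in the new save_metadata above: task names that arrive as a NumPy array are converted with tolist() first, since the json module cannot encode an ndarray. A quick illustration of why that check is there:

import json

import numpy as np

tasks = np.array(["task0", "task1"])

try:
  json.dumps(tasks)            # TypeError: ndarray is not JSON serializable
except TypeError:
  tasks = tasks.tolist()

print(json.dumps(tasks))       # ["task0", "task1"]
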
