
Commit

feat(components): Keras - Added the Train_classifier component (kubeflow#4274)

* Components - keras - Added the Train_classifier component

* Added a sample pipeline
Ark-kun authored Aug 11, 2020
1 parent 6617130 commit 5f7afe3
Showing 3 changed files with 372 additions and 0 deletions.
51 changes: 51 additions & 0 deletions components/keras/Train_classifier/_samples/sample_pipeline.py
@@ -0,0 +1,51 @@
import keras
import kfp
from kfp import components


chicago_taxi_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e3337b8bdcd63636934954e592d4b32c95b49129/components/datasets/Chicago%20Taxi/component.yaml')
pandas_transform_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml')
keras_train_classifier_from_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/f6aabf7f10b1f545f1fd5079aa8071845224f8e7/components/keras/Train_classifier/from_CSV/component.yaml')

number_of_classes = 2

# Creating the network
dense_network_with_sigmoid = keras.Sequential(layers=[
    keras.layers.Dense(10, activation=keras.activations.sigmoid),
    keras.layers.Dense(number_of_classes, activation=keras.activations.sigmoid),
])


def keras_classifier_pipeline():
    training_data_in_csv = chicago_taxi_dataset_op(
        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
        limit=1000,
    ).output

    training_data_for_classification_in_csv = pandas_transform_csv_op(
        table=training_data_in_csv,
        transform_code='''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]; df = df.fillna(0)''',
    ).output

    features_in_csv = pandas_transform_csv_op(
        table=training_data_for_classification_in_csv,
        transform_code='''df = df.drop(columns=["was_tipped"])''',
    ).output

    labels_in_csv = pandas_transform_csv_op(
        table=training_data_for_classification_in_csv,
        transform_code='''df = df["was_tipped"] * 1''',
    ).output

    keras_train_classifier_from_csv_op(
        training_features=features_in_csv,
        training_labels=labels_in_csv,
        network_json=dense_network_with_sigmoid.to_json(),
        learning_rate=0.1,
        num_epochs=100,
    )


if __name__ == '__main__':
    kfp_endpoint = None
    kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(keras_classifier_pipeline, arguments={})
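
For reference, the same pipeline function can also be compiled into a package instead of being submitted to an endpoint directly; a minimal sketch using the standard KFP v1 compiler API (the output filename is an assumption, not part of this commit):

import kfp
# Compile the pipeline function into a workflow package for later upload.
kfp.compiler.Compiler().compile(keras_classifier_pipeline, 'keras_classifier_pipeline.yaml')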
90 changes: 90 additions & 0 deletions components/keras/Train_classifier/from_CSV/component.py
@@ -0,0 +1,90 @@
from typing import NamedTuple
from kfp.components import create_component_from_func, InputPath, OutputPath

def keras_train_classifier_from_csv(
    training_features_path: InputPath('CSV'),
    training_labels_path: InputPath('CSV'),
    network_json_path: InputPath('KerasModelJson'),
    model_path: OutputPath('KerasModelHdf5'),
    loss_name: str = 'categorical_crossentropy',
    num_classes: int = None,
    optimizer: str = 'rmsprop',
    optimizer_config: dict = None,
    learning_rate: float = 0.01,
    num_epochs: int = 100,
    batch_size: int = 32,
    metrics: list = ['accuracy'],
    random_seed: int = 0,
) -> NamedTuple('Outputs', [
    ('final_loss', float),
    ('final_metrics', dict),
    ('metrics_history', dict),
]):
    '''Trains classifier model using Keras.

    Annotations:
        author: Alexey Volkov <[email protected]>
    '''
    from pathlib import Path

    import keras
    import numpy
    import pandas
    import tensorflow

    tensorflow.random.set_seed(random_seed)
    numpy.random.seed(random_seed)

    training_features_df = pandas.read_csv(training_features_path)
    training_labels_df = pandas.read_csv(training_labels_path)

    x_train = training_features_df.to_numpy()
    y_train_labels = training_labels_df.to_numpy()
    print('Training features shape:', x_train.shape)
    print('Number of training samples:', x_train.shape[0])

    # Convert class vectors to binary class matrices.
    y_train_one_hot = keras.utils.to_categorical(y_train_labels, num_classes)

    model_json_str = Path(network_json_path).read_text()
    model = keras.models.model_from_json(model_json_str)

    model.add(keras.layers.Activation('softmax'))

    # Initializing the optimizer
    optimizer_config = optimizer_config or {}
    optimizer_config['learning_rate'] = learning_rate
    optimizer = keras.optimizers.deserialize({
        'class_name': optimizer,
        'config': optimizer_config,
    })

    model.compile(
        loss=loss_name,
        optimizer=optimizer,
        metrics=metrics,
    )

    history = model.fit(
        x_train,
        y_train_one_hot,
        batch_size=batch_size,
        epochs=num_epochs,
        shuffle=True
    )

    model.save(model_path)

    metrics_history = {name: [float(value) for value in values] for name, values in history.history.items()}
    final_metrics = {name: values[-1] for name, values in metrics_history.items()}
    final_loss = final_metrics['loss']
    return (final_loss, final_metrics, metrics_history)


if __name__ == '__main__':
    keras_train_classifier_from_csv_op = create_component_from_func(
        keras_train_classifier_from_csv,
        base_image='tensorflow/tensorflow:2.2.0',
        packages_to_install=['keras==2.3.1', 'pandas==1.0.5'],
        output_component_file='component.yaml',
    )
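
Once the block above has written component.yaml, the component can also be loaded locally rather than from a GitHub raw URL; a minimal sketch (the relative path is an assumption, not part of this commit):

from kfp import components

# Load the trainer component from the locally generated specification file.
keras_train_classifier_from_csv_op = components.load_component_from_file(
    'components/keras/Train_classifier/from_CSV/component.yaml')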
231 changes: 231 additions & 0 deletions components/keras/Train_classifier/from_CSV/component.yaml
@@ -0,0 +1,231 @@
name: Keras train classifier from csv
description: |-
  Trains classifier model using Keras.

  Annotations:
      author: Alexey Volkov <[email protected]>
inputs:
- {name: training_features, type: CSV}
- {name: training_labels, type: CSV}
- {name: network_json, type: KerasModelJson}
- {name: loss_name, type: String, default: categorical_crossentropy, optional: true}
- {name: num_classes, type: Integer, optional: true}
- {name: optimizer, type: String, default: rmsprop, optional: true}
- {name: optimizer_config, type: JsonObject, optional: true}
- {name: learning_rate, type: Float, default: '0.01', optional: true}
- {name: num_epochs, type: Integer, default: '100', optional: true}
- {name: batch_size, type: Integer, default: '32', optional: true}
- {name: metrics, type: JsonArray, default: '["accuracy"]', optional: true}
- {name: random_seed, type: Integer, default: '0', optional: true}
outputs:
- {name: model, type: KerasModelHdf5}
- {name: final_loss, type: Float}
- {name: final_metrics, type: JsonObject}
- {name: metrics_history, type: JsonObject}
implementation:
  container:
    image: tensorflow/tensorflow:2.2.0
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'keras==2.3.1' 'pandas==1.0.5' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m
      pip install --quiet --no-warn-script-location 'keras==2.3.1' 'pandas==1.0.5'
      --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def keras_train_classifier_from_csv(
          training_features_path,
          training_labels_path,
          network_json_path,
          model_path,
          loss_name = 'categorical_crossentropy',
          num_classes = None,
          optimizer = 'rmsprop',
          optimizer_config = None,
          learning_rate = 0.01,
          num_epochs = 100,
          batch_size = 32,
          metrics = ['accuracy'],
          random_seed = 0,
      ):
          '''Trains classifier model using Keras.

          Annotations:
              author: Alexey Volkov <[email protected]>
          '''
          from pathlib import Path

          import keras
          import numpy
          import pandas
          import tensorflow

          tensorflow.random.set_seed(random_seed)
          numpy.random.seed(random_seed)

          training_features_df = pandas.read_csv(training_features_path)
          training_labels_df = pandas.read_csv(training_labels_path)

          x_train = training_features_df.to_numpy()
          y_train_labels = training_labels_df.to_numpy()
          print('Training features shape:', x_train.shape)
          print('Number of training samples:', x_train.shape[0])

          # Convert class vectors to binary class matrices.
          y_train_one_hot = keras.utils.to_categorical(y_train_labels, num_classes)

          model_json_str = Path(network_json_path).read_text()
          model = keras.models.model_from_json(model_json_str)

          model.add(keras.layers.Activation('softmax'))

          # Initializing the optimizer
          optimizer_config = optimizer_config or {}
          optimizer_config['learning_rate'] = learning_rate
          optimizer = keras.optimizers.deserialize({
              'class_name': optimizer,
              'config': optimizer_config,
          })

          model.compile(
              loss=loss_name,
              optimizer=optimizer,
              metrics=metrics,
          )

          history = model.fit(
              x_train,
              y_train_one_hot,
              batch_size=batch_size,
              epochs=num_epochs,
              shuffle=True
          )

          model.save(model_path)

          metrics_history = {name: [float(value) for value in values] for name, values in history.history.items()}
          final_metrics = {name: values[-1] for name, values in metrics_history.items()}
          final_loss = final_metrics['loss']
          return (final_loss, final_metrics, metrics_history)

      import json

      def _serialize_float(float_value: float) -> str:
          if isinstance(float_value, str):
              return float_value
          if not isinstance(float_value, (float, int)):
              raise TypeError('Value "{}" has type "{}" instead of float.'.format(str(float_value), str(type(float_value))))
          return str(float_value)

      def _serialize_json(obj) -> str:
          if isinstance(obj, str):
              return obj
          import json
          def default_serializer(obj):
              if hasattr(obj, 'to_struct'):
                  return obj.to_struct()
              else:
                  raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__)
          return json.dumps(obj, default=default_serializer, sort_keys=True)

      import argparse
      _parser = argparse.ArgumentParser(prog='Keras train classifier from csv', description='Trains classifier model using Keras.\n\n Annotations:\n author: Alexey Volkov <[email protected]>')
      _parser.add_argument("--training-features", dest="training_features_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--training-labels", dest="training_labels_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--network-json", dest="network_json_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--loss-name", dest="loss_name", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--num-classes", dest="num_classes", type=int, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--optimizer", dest="optimizer", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--optimizer-config", dest="optimizer_config", type=json.loads, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--learning-rate", dest="learning_rate", type=float, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--num-epochs", dest="num_epochs", type=int, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--batch-size", dest="batch_size", type=int, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--metrics", dest="metrics", type=json.loads, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--model", dest="model_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=3)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = keras_train_classifier_from_csv(**_parsed_args)

      _output_serializers = [
          _serialize_float,
          _serialize_json,
          _serialize_json,
      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --training-features
    - {inputPath: training_features}
    - --training-labels
    - {inputPath: training_labels}
    - --network-json
    - {inputPath: network_json}
    - if:
        cond: {isPresent: loss_name}
        then:
        - --loss-name
        - {inputValue: loss_name}
    - if:
        cond: {isPresent: num_classes}
        then:
        - --num-classes
        - {inputValue: num_classes}
    - if:
        cond: {isPresent: optimizer}
        then:
        - --optimizer
        - {inputValue: optimizer}
    - if:
        cond: {isPresent: optimizer_config}
        then:
        - --optimizer-config
        - {inputValue: optimizer_config}
    - if:
        cond: {isPresent: learning_rate}
        then:
        - --learning-rate
        - {inputValue: learning_rate}
    - if:
        cond: {isPresent: num_epochs}
        then:
        - --num-epochs
        - {inputValue: num_epochs}
    - if:
        cond: {isPresent: batch_size}
        then:
        - --batch-size
        - {inputValue: batch_size}
    - if:
        cond: {isPresent: metrics}
        then:
        - --metrics
        - {inputValue: metrics}
    - if:
        cond: {isPresent: random_seed}
        then:
        - --random-seed
        - {inputValue: random_seed}
    - --model
    - {outputPath: model}
    - '----output-paths'
    - {outputPath: final_loss}
    - {outputPath: final_metrics}
    - {outputPath: metrics_history}
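
As a usage note, each optional input above surfaces as a keyword argument on the loaded op, with JsonObject inputs passed as Python dicts; a hedged sketch with a non-default optimizer (the Adam settings shown are illustrative assumptions, not values from this commit):

keras_train_classifier_from_csv_op(
    training_features=features_in_csv,
    training_labels=labels_in_csv,
    network_json=dense_network_with_sigmoid.to_json(),
    optimizer='Adam',
    optimizer_config={'beta_1': 0.9, 'beta_2': 0.999},
    learning_rate=0.001,
    num_epochs=10,
)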
