forked from zenml-io/zenml
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmoke_test.py
89 lines (74 loc) · 2.7 KB
/
smoke_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from zenml.core.datasources.csv_datasource import CSVDatasource
from zenml.core.pipelines.training_pipeline import TrainingPipeline
from zenml.core.repo.repo import Repository
from zenml.core.steps.evaluator.tfma_evaluator import TFMAEvaluator
from zenml.core.steps.preprocesser.standard_preprocesser \
.standard_preprocesser import StandardPreprocesser
from zenml.core.steps.split.random_split import RandomSplit
from zenml.core.steps.trainer.tensorflow_trainers.tf_ff_trainer import \
FeedForwardTrainer
#########################
# CREATE FIRST PIPELINE #
#########################
# Each step is built as a named object and then attached to the pipeline,
# in this order: datasource, split, preprocesser, trainer, evaluator.
training_pipeline = TrainingPipeline(name='Experiment 1')

# Add a datasource. This will automatically track and version it.
ds = CSVDatasource(
    name='Pima Diabetes',
    path='gs://zenml_quickstart/diabetes.csv',
)
training_pipeline.add_datasource(ds)

# Add a split: split_map assigns 0.7 to 'train' and 0.3 to 'eval'.
random_split = RandomSplit(split_map={'train': 0.7, 'eval': 0.3})
training_pipeline.add_split(random_split)

# Add a preprocessing unit: eight feature columns, one label column, and an
# explicit 'no_transform' override for the label.
feature_columns = [
    'times_pregnant', 'pgc', 'dbp', 'tst', 'insulin', 'bmi',
    'pedigree', 'age',
]
label_override = {
    'has_diabetes': {
        'transform': [{'method': 'no_transform', 'parameters': {}}],
    },
}
preprocesser = StandardPreprocesser(
    features=feature_columns,
    labels=['has_diabetes'],
    overwrite=label_override,
)
training_pipeline.add_preprocesser(preprocesser)

# Add a trainer: single sigmoid output unit, binary cross-entropy loss,
# 11 epochs.
trainer = FeedForwardTrainer(
    loss='binary_crossentropy',
    last_activation='sigmoid',
    output_units=1,
    metrics=['accuracy'],
    epochs=11,
)
training_pipeline.add_trainer(trainer)

# Add an evaluator: sliced on the label column, with two metrics keyed by
# the label name.
evaluator = TFMAEvaluator(
    slices=[['has_diabetes']],
    metrics={'has_diabetes': ['binary_crossentropy', 'binary_accuracy']},
)
training_pipeline.add_evaluator(evaluator)

# Run the pipeline locally
training_pipeline.run()
######################
# DO SOME EVALUATION #
######################
# Sample the transformed data from the first pipeline, then print its shape
# followed by its describe() summary.
df = training_pipeline.sample_transformed_data()
sample_shape = df.shape
print(sample_shape)
sample_summary = df.describe()
print(sample_summary)

# Print whatever view_schema() returns for the pipeline
# (original note: "See schema of data and detect drift").
schema_view = training_pipeline.view_schema()
print(schema_view)
##########################
# CREATE SECOND PIPELINE #
##########################
# Copy the first pipeline under the name 'Experiment 2', add a trainer with
# 15 epochs to the copy, and run it.
training_pipeline_2 = training_pipeline.copy('Experiment 2')

second_trainer_config = dict(
    loss='binary_crossentropy',
    last_activation='sigmoid',
    output_units=1,
    metrics=['accuracy'],
    epochs=15,
)
training_pipeline_2.add_trainer(FeedForwardTrainer(**second_trainer_config))
training_pipeline_2.run()
############################
# DO SOME REPOSITORY STUFF #
############################
# Obtain the Repository through its get_instance() accessor.
repo: Repository = Repository.get_instance()

# Check pipelines: the script created 'Experiment 1' and 'Experiment 2', so
# exactly two are expected. The check is an explicit raise rather than a bare
# `assert` so it still runs under `python -O` and reports the actual count.
pipelines = repo.get_pipelines()
if len(pipelines) != 2:
    raise AssertionError(
        f'Expected 2 pipelines in the repository, found {len(pipelines)}')

# Check datasources: only 'Pima Diabetes' was added, so exactly one is
# expected.
datasources = repo.get_datasources()
if len(datasources) != 1:
    raise AssertionError(
        f'Expected 1 datasource in the repository, found {len(datasources)}')

# Look the datasource up by the name it was registered with and print
# whatever its view_schema() returns.
datasource: CSVDatasource = repo.get_datasource_by_name('Pima Diabetes')
print(datasource.view_schema())