
Commit 92b3987: initial
Michael Gregory committed Feb 7, 2020 (0 parents)
Showing 51 changed files with 184,551 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
.*
R
!.gitignore
25 changes: 25 additions & 0 deletions README.md
@@ -0,0 +1,25 @@
# Cloudera Machine Learning Demonstration

Setup:
In the CML UI, go to Admin -> Engine and add an Engine Profile with 2 CPU / 8 GB memory.


Start a Python 3 session (at least 8 GB memory) and run utils/setup.py,
which installs all requirements for the code below.
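
For reference, a minimal sketch of what a setup script like utils/setup.py typically does, assuming it simply installs the pinned requirements file used by the demo scripts (the actual script in this repository may do more):

```python
# Hypothetical sketch only -- the real utils/setup.py may differ.
import subprocess
import sys

# Install the demo dependencies into the current session's Python environment.
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/home/cdsw/utils/requirements3.txt",  # requirements file referenced by the demo scripts
])
```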


Alternatively, you can create a custom Docker engine using utils/Dockerfile.
For the CML trials, contact Cloudera for a link to the engine image.
As a CML admin, create a new engine with the following editors:

- RStudio: /usr/sbin/rstudio-server start

- Jupyter Notebook: /usr/local/bin/jupyter-notebook --no-browser --ip=127.0.0.1 --port=${CDSW_APP_PORT} --NotebookApp.token= --NotebookApp.allow_remote_access=True --log-level=ERROR


Demos
0. Setup CDP Environment and CML Workspace
1. Batch and online scoring of images with TensorFlow on Spark
2. Experiment tracking and NLP model training for sentiment analysis of Twitter feeds
3. RStudio analysis of airline data from CDW or HMS/external tables and IDBroker
127 changes: 127 additions & 0 deletions demo0/HMS-test.py
@@ -0,0 +1,127 @@
# # Spark-SQL from PySpark
#
# This example shows how to send SQL queries to Spark.


# NOTE: In CDP, find the HMS warehouse directory and external table directory by browsing to:
#   Environment -> <env name> -> Data Lake Cluster -> Cloud Storage

# Data taken from http://stat-computing.org/dataexpo/2009/the-data.html
# Download and stage it with, e.g.:
#!for i in `seq 1987 2008`; do wget http://stat-computing.org/dataexpo/2009/$i.csv.bz2; bunzip2 $i.csv.bz2; sed -i '1d' $i.csv; aws s3 cp $i.csv s3://ml-field/demo/flight-analysis/data/flights_csv/; rm $i.csv; done


from __future__ import print_function
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.executor.memory", "4g")\
    .config("spark.executor.instances", 2)\
    .config("spark.driver.maxResultSize", "4g")\
    .config("fs.s3a.metadatastore.impl", "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore")\
    .getOrCreate()

# .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-west-2")\
# .config("spark.yarn.access.hadoopFileSystems","s3a://ml-field/demo/flight-analysis/data/")\

spark.sql("SHOW databases").show()
spark.sql("USE default")
spark.sql("SHOW tables").show()

#spark.sql("SELECT COUNT(*) FROM `default`.`flights`").show()
spark.sql("SELECT * FROM `default`.`flights` LIMIT 10").take(5)
spark.sql("SELECT DepDelay FROM `default`.`flights` WHERE DepDelay > 0.0").take(5)

#spark.sql("SELECT COUNT(*) FROM `default`.`airports`").show()
spark.sql("SELECT * FROM `default`.`airports` LIMIT 10").show()









#spark.sql("DROP TABLE IF EXISTS flights").show()
statement = '''
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`flights` (
`Year` int ,
`Month` int ,
`DayofMonth` int ,
`DayOfWeek` int ,
`DepTime` int ,
`CRSDepTime` int ,
`ArrTime` int ,
`CRSArrTime` int ,
`UniqueCarrier` string ,
`FlightNum` int ,
`TailNum` string ,
`ActualElapsedTime` int ,
`CRSElapsedTime` int ,
`AirTime` int ,
`ArrDelay` int ,
`DepDelay` int ,
`Origin` string ,
`Dest` string ,
`Distance` int ,
`TaxiIn` int ,
`TaxiOut` int ,
`Cancelled` int ,
`CancellationCode` string ,
`Diverted` string ,
`CarrierDelay` int ,
`WeatherDelay` int ,
`NASDelay` int ,
`SecurityDelay` int ,
`LateAircraftDelay` int )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TextFile
LOCATION 's3a://ml-field/demo/flight-analysis/data/flights_csv/'
'''
spark.sql(statement)

#spark.sql("DROP TABLE IF EXISTS airports").show()
statement = '''
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`airports` (
`iata` string ,
`airport` string ,
`city` string ,
`state` string ,
`country` string ,
`lat` double ,
`long` double )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TextFile
LOCATION 's3a://ml-field/demo/flight-analysis/data/airports_csv/'
'''
spark.sql(statement)


#spark.sql("DROP TABLE IF EXISTS airports_extended").show()
statement = '''
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`airports_extended` (
`ident` string ,
`type` string ,
`name` string ,
`elevation_ft` string ,
`continent` string ,
`iso_country` string ,
`iso_region` string ,
`municipality` string ,
`gps_code` string ,
`iata_code` string ,
`local_code` string ,
`coordinates` string )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TextFile
LOCATION 's3a://ml-field/demo/flight-analysis/data/airports-extended/'
'''
spark.sql(statement)
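
# Optional sanity check (illustrative, not in the original script): confirm the
# external tables are registered and inspect their storage locations.
spark.sql("SHOW TABLES IN `default`").show()
spark.sql("DESCRIBE FORMATTED `default`.`flights`").show(truncate=False)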


#spark.stop()
168 changes: 168 additions & 0 deletions demo1/TF_image_online_score.py
@@ -0,0 +1,168 @@
# TensorFlow on Spark
#
# Before running:
#   pip install -r /home/cdsw/utils/requirements3.txt
# then close the session and start a new session in the same project.

MODEL_URL = 'https://ml-field.s3-us-west-2.amazonaws.com/demo/tensorflow/model/inception-2015-12-05.tgz'
MODEL_DIR = '/tmp/imagenet'
IMAGE_DIR = 's3://ml-field/demo/tensorflow/data'

num_top_predictions = 1

import numpy as np
import random
import tensorflow as tf
import os
from tensorflow.python.platform import gfile
import os.path
import re
import sys
import tarfile
from subprocess import Popen, PIPE, STDOUT
from six.moves import urllib

# Download the Inception model archive if it is not already present, then unpack it.
dest_directory = MODEL_DIR
if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
filename = MODEL_URL.split('/')[-1]
filepath = os.path.join(dest_directory, filename)
if not os.path.exists(filepath):
    filepath2, _ = urllib.request.urlretrieve(MODEL_URL, filepath)
    statinfo = os.stat(filepath)
tarfile.open(filepath, 'r:gz').extractall(dest_directory)


model_path = os.path.join(MODEL_DIR, 'classify_image_graph_def.pb')
with tf.gfile.GFile(model_path, 'rb') as f:
    model_data = f.read()

class NodeLookup(object):
    """Converts integer node IDs to human readable labels."""

    def __init__(self,
                 label_lookup_path=None,
                 uid_lookup_path=None):
        if not label_lookup_path:
            label_lookup_path = os.path.join(
                MODEL_DIR, 'imagenet_2012_challenge_label_map_proto.pbtxt')
        if not uid_lookup_path:
            uid_lookup_path = os.path.join(
                MODEL_DIR, 'imagenet_synset_to_human_label_map.txt')
        self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

    def load(self, label_lookup_path, uid_lookup_path):
        """Loads a human readable English name for each softmax node.

        Args:
          label_lookup_path: string UID to integer node ID.
          uid_lookup_path: string UID to human-readable string.
        Returns:
          dict from integer node ID to human-readable string.
        """
        if not gfile.Exists(uid_lookup_path):
            tf.logging.fatal('File does not exist %s', uid_lookup_path)
        if not gfile.Exists(label_lookup_path):
            tf.logging.fatal('File does not exist %s', label_lookup_path)

        # Loads mapping from string UID to human-readable string.
        proto_as_ascii_lines = gfile.GFile(uid_lookup_path).readlines()
        uid_to_human = {}
        p = re.compile(r'[n\d]*[ \S,]*')
        for line in proto_as_ascii_lines:
            parsed_items = p.findall(line)
            uid = parsed_items[0]
            human_string = parsed_items[2]
            uid_to_human[uid] = human_string

        # Loads mapping from string UID to integer node ID.
        node_id_to_uid = {}
        proto_as_ascii = gfile.GFile(label_lookup_path).readlines()
        for line in proto_as_ascii:
            if line.startswith('  target_class:'):
                target_class = int(line.split(': ')[1])
            if line.startswith('  target_class_string:'):
                target_class_string = line.split(': ')[1]
                node_id_to_uid[target_class] = target_class_string[1:-2]

        # Loads the final mapping of integer node ID to human-readable string.
        node_id_to_name = {}
        for key, val in node_id_to_uid.items():
            if val not in uid_to_human:
                tf.logging.fatal('Failed to locate: %s', val)
            name = uid_to_human[val]
            node_id_to_name[key] = name

        return node_id_to_name

    def id_to_string(self, node_id):
        if node_id not in self.node_lookup:
            return ''
        return self.node_lookup[node_id]

node_lookup = NodeLookup().node_lookup

def run(cmd):
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    return p.stdout.read()

def run_inference_on_image(sess, img_id, img_url, node_lookup):
    """Download an image and run inference on it.

    Args:
      img_id: identifier that is passed through unchanged.
      img_url: URL of the image file to score.
    Returns:
      (image ID, image URL, scores),
      where scores is a list of (human-readable node name, score) pairs,
      or None for scores if the image could not be fetched or decoded.
    """
    from six.moves import urllib
    try:
        image_data = urllib.request.urlopen(img_url, timeout=1.0).read()
    except Exception:
        # Could not download the image.
        return (img_id, img_url, None)
    # Some useful tensors:
    # 'softmax:0': A tensor containing the normalized prediction across
    #   1000 labels.
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048
    #   float description of the image.
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
    #   encoding of the image.
    # Runs the softmax tensor by feeding the image_data as input to the graph.
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
    try:
        predictions = sess.run(softmax_tensor,
                               {'DecodeJpeg/contents:0': image_data})
    except Exception:
        # Handle problems with malformed JPEG files.
        return (img_id, img_url, None)
    predictions = np.squeeze(predictions)
    top_k = predictions.argsort()[-num_top_predictions:][::-1]
    scores = []
    for node_id in top_k:
        if node_id not in node_lookup:
            human_string = ''
        else:
            human_string = node_lookup[node_id]
        score = predictions[node_id]
        scores.append((human_string, score))
    return (img_id, img_url, scores)

def apply_inference_online(args):
    img_url = args.get('img_url')
    with tf.Graph().as_default() as g:
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(model_data)
        tf.import_graph_def(graph_def, name='')
        with tf.Session() as sess:
            label = run_inference_on_image(sess, "", img_url, node_lookup)
            return label[2]


# test
# x={"img_url" : "https://ml-field.s3-us-west-2.amazonaws.com/demo/tensorflow/data/IMG_3587.JPG"}
# apply_inference_online(x)
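
# When this script is deployed through the CML/CDSW Models feature with
# `apply_inference_online` as the target function, the endpoint can be called over
# HTTP. A hedged sketch only -- the host name, access key and exact response shape
# depend on your workspace and are placeholders here:
#
#   import requests
#   resp = requests.post(
#       "https://modelservice.<your-cml-domain>/model",
#       json={"accessKey": "<model-access-key>",
#             "request": {"img_url": "https://ml-field.s3-us-west-2.amazonaws.com/demo/tensorflow/data/IMG_3587.JPG"}},
#   )
#   print(resp.json())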

