Skip to content

Commit

Permalink
Add KinesisDataset support for tensorflow Dataset (tensorflow#19712)
Browse files Browse the repository at this point in the history
* Add KinesisDataset support for tensorflow Dataset

This fix is an attempt to add Kinesis support
for tensorflow's Dataset. Kinesis is provided by
AWS as a managed data streaming service. It is
similiar to Apache Kafka, often used in places
where maintaining a independent Kafka cluster on AWS
is not desirable or possible.

This fix adds the Kinesis support for tensorflow Dataset.
Similiar to the Kafka integration in tensorflow,
KinesisDataset outputs tf.string for records.

Test cases have also been added, which could be invoked manually.

Signed-off-by: Yong Tang <[email protected]>

* Expose KinesisDataset in dataset_ops.cc

Signed-off-by: Yong Tang <[email protected]>

* Expose KinesisDataset in python wrapper

Signed-off-by: Yong Tang <[email protected]>

* Add test cases for KinesisDataset

Signed-off-by: Yong Tang <[email protected]>

* Update AWS library include files

Signed-off-by: Yong Tang <[email protected]>

* Add Bazel BUILD files

Signed-off-by: Yong Tang <[email protected]>

* Rename s3_crypto to aws_crypto

Signed-off-by: Yong Tang <[email protected]>

* Rename with_s3_support to with_aws_support

Signed-off-by: Yong Tang <[email protected]>

* Selectively add kinesis to tensorflow/contrib/BUILD

Signed-off-by: Yong Tang <[email protected]>

* Set different partition key and pylint fix.

Signed-off-by: Yong Tang <[email protected]>

* Add missing modules in cmake's python_modules.txt

Signed-off-by: Yong Tang <[email protected]>

* Address review feedback

Signed-off-by: Yong Tang <[email protected]>
  • Loading branch information
yongtang authored and mrry committed Jul 2, 2018
1 parent 7e8927e commit a7b7aa8
Show file tree
Hide file tree
Showing 17 changed files with 866 additions and 40 deletions.
6 changes: 3 additions & 3 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -1449,7 +1449,7 @@ def main():
setup_python(environ_cp)

if is_windows():
environ_cp['TF_NEED_S3'] = '0'
environ_cp['TF_NEED_AWS'] = '0'
environ_cp['TF_NEED_GCP'] = '0'
environ_cp['TF_NEED_HDFS'] = '0'
environ_cp['TF_NEED_JEMALLOC'] = '0'
Expand All @@ -1473,8 +1473,8 @@ def main():
'with_gcp_support', True, 'gcp')
set_build_var(environ_cp, 'TF_NEED_HDFS', 'Hadoop File System',
'with_hdfs_support', True, 'hdfs')
set_build_var(environ_cp, 'TF_NEED_S3', 'Amazon S3 File System',
'with_s3_support', True, 's3')
set_build_var(environ_cp, 'TF_NEED_AWS', 'Amazon AWS Platform',
'with_aws_support', True, 'aws')
set_build_var(environ_cp, 'TF_NEED_KAFKA', 'Apache Kafka Platform',
'with_kafka_support', True, 'kafka')
set_build_var(environ_cp, 'TF_ENABLE_XLA', 'XLA JIT', 'with_xla_support',
Expand Down
16 changes: 8 additions & 8 deletions tensorflow/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ config_setting(
)

config_setting(
name = "with_s3_support",
define_values = {"with_s3_support": "true"},
name = "with_aws_support",
define_values = {"with_aws_support": "true"},
visibility = ["//visibility:public"],
)

Expand All @@ -244,8 +244,8 @@ config_setting(
)

config_setting(
name = "with_s3_support_windows_override",
define_values = {"with_s3_support": "true"},
name = "with_aws_support_windows_override",
define_values = {"with_aws_support": "true"},
values = {"cpu": "x64_windows"},
visibility = ["//visibility:public"],
)
Expand Down Expand Up @@ -279,8 +279,8 @@ config_setting(
)

config_setting(
name = "with_s3_support_android_override",
define_values = {"with_s3_support": "true"},
name = "with_aws_support_android_override",
define_values = {"with_aws_support": "true"},
values = {"crosstool_top": "//external:android/crosstool"},
visibility = ["//visibility:public"],
)
Expand All @@ -300,8 +300,8 @@ config_setting(
)

config_setting(
name = "with_s3_support_ios_override",
define_values = {"with_s3_support": "true"},
name = "with_aws_support_ios_override",
define_values = {"with_aws_support": "true"},
values = {"crosstool_top": "//tools/osx/crosstool:crosstool"},
visibility = ["//visibility:public"],
)
Expand Down
18 changes: 18 additions & 0 deletions tensorflow/contrib/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ py_library(
"//tensorflow/contrib/kafka",
],
"//conditions:default": [],
}) + select({
"//tensorflow:with_aws_support_windows_override": [],
"//tensorflow:with_aws_support": [
"//tensorflow/contrib/kinesis",
],
"//conditions:default": [],
}) + if_not_windows_cuda([
"//tensorflow/contrib/fused_conv:fused_conv_py", # unresolved symbols, need to export more symbols
]) + if_not_windows([
Expand Down Expand Up @@ -157,6 +163,12 @@ cc_library(
"//tensorflow/contrib/kafka:dataset_kernels",
],
"//conditions:default": [],
}) + select({
"//tensorflow:with_aws_support_windows_override": [],
"//tensorflow:with_aws_support": [
"//tensorflow/contrib/kinesis:dataset_kernels",
],
"//conditions:default": [],
}),
)

Expand Down Expand Up @@ -186,5 +198,11 @@ cc_library(
"//tensorflow/contrib/kafka:dataset_ops_op_lib",
],
"//conditions:default": [],
}) + select({
"//tensorflow:with_aws_support_windows_override": [],
"//tensorflow:with_aws_support": [
"//tensorflow/contrib/kinesis:dataset_ops_op_lib",
],
"//conditions:default": [],
}),
)
2 changes: 2 additions & 0 deletions tensorflow/contrib/cmake/python_modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ tensorflow/contrib/keras/api/keras/wrappers/scikit_learn
tensorflow/contrib/kernel_methods
tensorflow/contrib/kernel_methods/python
tensorflow/contrib/kernel_methods/python/mappers
tensorflow/contrib/kinesis/python
tensorflow/contrib/kinesis/python/ops
tensorflow/contrib/kfac
tensorflow/contrib/kfac/examples
tensorflow/contrib/kfac/python
Expand Down
113 changes: 113 additions & 0 deletions tensorflow/contrib/kinesis/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package(default_visibility = ["//tensorflow:internal"])

licenses(["notice"]) # Apache 2.0

exports_files(["LICENSE"])

load(
"//tensorflow:tensorflow.bzl",
"tf_custom_op_library",
"tf_custom_op_py_library",
"tf_gen_op_libs",
"tf_gen_op_wrapper_py",
"tf_kernel_library",
"tf_py_test",
)

py_library(
name = "kinesis",
srcs = ["__init__.py"],
srcs_version = "PY2AND3",
deps = [
":dataset_ops",
],
)

tf_custom_op_library(
name = "_dataset_ops.so",
srcs = ["ops/dataset_ops.cc"],
deps = [":dataset_kernels"],
)

tf_gen_op_libs(
op_lib_names = ["dataset_ops"],
)

cc_library(
name = "dataset_kernels",
srcs = [
"kernels/kinesis_dataset_ops.cc",
],
deps = [
"//tensorflow/core:framework_headers_lib",
"//tensorflow/core/platform/s3:aws_crypto",
"//third_party/eigen3",
"@aws",
"@protobuf_archive//:protobuf_headers",
],
alwayslink = 1,
)

py_library(
name = "dataset_ops",
srcs = [
"python/ops/kinesis_dataset_ops.py",
],
srcs_version = "PY2AND3",
deps = [
":kinesis_op_loader",
"//tensorflow/python:dataset_ops_gen",
"//tensorflow/python:util",
"//tensorflow/python/data/ops:dataset_ops",
"//tensorflow/python/data/util:nest",
],
)

tf_gen_op_wrapper_py(
name = "gen_dataset_ops",
out = "python/ops/gen_dataset_ops.py",
deps = ["//tensorflow/contrib/kinesis:dataset_ops_op_lib"],
)

tf_kernel_library(
name = "dataset_ops_kernels",
deps = [
":dataset_kernels",
"//tensorflow/core:framework",
],
alwayslink = 1,
)

tf_custom_op_py_library(
name = "kinesis_op_loader",
srcs = ["python/ops/kinesis_op_loader.py"],
dso = ["//tensorflow/contrib/kinesis:_dataset_ops.so"],
kernels = [
":dataset_ops_kernels",
"//tensorflow/contrib/kinesis:dataset_ops_op_lib",
],
srcs_version = "PY2AND3",
deps = [
":gen_dataset_ops",
"//tensorflow/contrib/util:util_py",
"//tensorflow/python:platform",
],
)

tf_py_test(
name = "kinesis_test",
srcs = ["python/kernel_tests/kinesis_test.py"],
additional_deps = [
":kinesis",
"//third_party/py/numpy",
"//tensorflow/python:client_testlib",
"//tensorflow/python:framework",
"//tensorflow/python:framework_test_lib",
"//tensorflow/python:platform_test",
],
tags = [
"manual",
"no_windows",
"notap",
],
)
32 changes: 32 additions & 0 deletions tensorflow/contrib/kinesis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Kinesis Dataset.
@@KinesisDataset
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.contrib.kinesis.python.ops.kinesis_dataset_ops import KinesisDataset

from tensorflow.python.util.all_util import remove_undocumented

_allowed_symbols = [
"KinesisDataset",
]

remove_undocumented(__name__)
Loading

0 comments on commit a7b7aa8

Please sign in to comment.