
Merge pull request cdapio#7993 from caskdata/release/4.1
Merge release/4.1 to develop
anwar6953 authored Feb 15, 2017
2 parents b7251d8 + 724596f commit d66a4d8
Showing 641 changed files with 16,597 additions and 12,425 deletions.
32 changes: 17 additions & 15 deletions README.rst
@@ -1,6 +1,6 @@
.. meta::
:author: Cask Data, Inc.
:copyright: Copyright © 2015 Cask Data, Inc.
:copyright: Copyright © 2015-2017 Cask Data, Inc.

=====================================
Cask Data Application Platform - CDAP
@@ -25,15 +25,17 @@ essential capabilities:
- Abstraction of data in the Hadoop environment through logical representations of underlying data;
- Portability of applications through decoupling underlying infrastructures;
- Services and tools that enable faster application creation in development;
- Integration of the components of the Hadoop ecosystem into a single platform; and
- Higher degrees of operational control in production through enterprise best practices.
- Integration of the components of the Hadoop ecosystem into a single platform;
- Metadata management that automatically captures metadata and lineage;
- CDAP pipelines with an integrated UI for click-and-drag development; and
- Higher degrees of operational control in production through enterprise best-practices.

CDAP exposes developer APIs (Application Programming Interfaces) for creating applications
and accessing core CDAP services. CDAP defines and implements a diverse collection of
services that land applications and data on existing Hadoop infrastructure such as HBase,
HDFS, YARN, MapReduce, Hive, and Spark.

You can run applications ranging from simple MapReduce Jobs through complete ETL (extract,
You can run applications ranging from simple MapReduce Jobs and complete ETL (extract,
transform, and load) pipelines all the way up to complex, enterprise-scale data-intensive
applications.

@@ -54,7 +56,7 @@ Prerequisites
To install and use CDAP and its included examples, there are a few simple prerequisites:

1. JDK 7+ (required to run CDAP; note that $JAVA_HOME should be set)
#. `Node.js <https://nodejs.org/>`__ (required to run the CDAP UI; we recommend any version greater than v0.10.0)
#. `Node.js <https://nodejs.org/>`__ (required to run the CDAP UI; we recommend any version greater than v4.5.0)
#. Apache Maven 3.0+ (required to build the example applications; 3.1+ to build CDAP itself)

Build
@@ -66,13 +68,13 @@ You can get started with CDAP by building directly from the latest source code::
cd cdap
mvn clean package

After the build completes, you will have a distribution of the CDAP standalone under the
``cdap-distribution/target/`` directory.

Take the ``cdap-<version>.tar.gz`` file and unzip it into a suitable location.
After the build completes, you will have built all modules for CDAP.

For more build options, please refer to the `build instructions <BUILD.rst>`__.

.. image:: https://travis-ci.org/caskdata/cdap.svg?branch=release/4.1
:target: https://travis-ci.org/caskdata/cdap


Introductory Tutorial
=====================
@@ -92,6 +94,7 @@ Now that you've had a look at the CDAP SDK, take a look at:
(demonstrating basic features of the CDAP) are located on-line; and
- Developers' Manual, located in the source distribution in ``cdap-docs/developers-manual/source``
or `online <http://docs.cask.co/cdap/current/en/developers-manual/index.html>`__.
- `CDAP Releases and timeline <http://docs.cask.co/cdap/index.html>`__


How to Contribute
@@ -106,7 +109,10 @@ For quick guide to getting your system setup to contribute to CDAP, take a look

Filing Issues: Bug Reports & Feature Requests
---------------------------------------------
Bugs and suggestions should be made by `filing an Issue <https://issues.cask.co/browse/cdap>`__.
Bugs and suggestions should be made by `filing an issue <https://issues.cask.co/browse/cdap>`__.

Existing issues can be browsed at `the CDAP project issues
<https://issues.cask.co/browse/CDAP-8373?jql=project%20%3D%20CDAP>`__.

Pull Requests
-------------
@@ -145,15 +151,11 @@ on the product, and should be used for all our design, architecture and technical
discussions moving forward. This mailing list will also receive all JIRA and GitHub
notifications.

IRC Channel
-----------
CDAP IRC Channel: #cdap on irc.freenode.net


License and Trademarks
======================

Copyright © 2014-2015 Cask Data, Inc.
Copyright © 2014-2017 Cask Data, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
DefaultStreamBatchWriter.java

@@ -18,7 +18,7 @@

import co.cask.cdap.api.data.stream.StreamBatchWriter;
import co.cask.cdap.common.io.ByteBuffers;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.id.StreamId;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.io.IOException;
@@ -33,10 +33,10 @@ public class DefaultStreamBatchWriter implements StreamBatchWriter {

private final HttpURLConnection connection;
private final OutputStream outputStream;
private final Id.Stream stream;
private final StreamId stream;
private boolean open;

public DefaultStreamBatchWriter(HttpURLConnection connection, Id.Stream stream) throws IOException {
public DefaultStreamBatchWriter(HttpURLConnection connection, StreamId stream) throws IOException {
this.connection = connection;
this.outputStream = connection.getOutputStream();
this.stream = stream;
DefaultStreamWriter.java

@@ -19,66 +19,62 @@
import co.cask.cdap.api.data.stream.StreamBatchWriter;
import co.cask.cdap.api.data.stream.StreamWriter;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.discovery.EndpointStrategy;
import co.cask.cdap.common.discovery.RandomEndpointStrategy;
import co.cask.cdap.common.http.DefaultHttpRequestConfig;
import co.cask.cdap.common.internal.remote.RemoteClient;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.writer.LineageWriter;
import co.cask.cdap.data2.registry.RuntimeUsageRegistry;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.security.spi.authentication.AuthenticationContext;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import com.google.common.base.Charsets;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.net.HttpHeaders;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* Default implementation of {@link StreamWriter}
*/
public class DefaultStreamWriter implements StreamWriter {
private final EndpointStrategy endpointStrategy;
private final ConcurrentMap<Id.Stream, Boolean> isStreamRegistered;
private final ConcurrentMap<StreamId, Boolean> isStreamRegistered;
private final RuntimeUsageRegistry runtimeUsageRegistry;

/**
* The namespace that this {@link StreamWriter} belongs to.
*/
private final Id.Namespace namespace;
private final NamespaceId namespace;
/**
* The owners of this {@link StreamWriter}.
*/
private final Iterable<? extends EntityId> owners;
private final Id.Run run;
private final ProgramRunId run;
private final LineageWriter lineageWriter;
private final AuthenticationContext authenticationContext;
private final boolean authorizationEnabled;
private final RetryStrategy retryStrategy;
private final RemoteClient remoteClient;

@Inject
public DefaultStreamWriter(@Assisted("run") Id.Run run,
@@ -89,50 +85,25 @@ public DefaultStreamWriter(@Assisted("run") Id.Run run,
DiscoveryServiceClient discoveryServiceClient,
AuthenticationContext authenticationContext,
CConfiguration cConf) {
this.run = run;
this.namespace = run.getNamespace();
this.run = run.toEntityId();
this.namespace = run.getNamespace().toEntityId();
this.owners = owners;
this.lineageWriter = lineageWriter;
this.endpointStrategy = new RandomEndpointStrategy(discoveryServiceClient.discover(Constants.Service.STREAMS));
this.isStreamRegistered = Maps.newConcurrentMap();
this.runtimeUsageRegistry = runtimeUsageRegistry;
this.authenticationContext = authenticationContext;
this.authorizationEnabled = cConf.getBoolean(Constants.Security.Authorization.ENABLED);
this.retryStrategy = retryStrategy;
this.remoteClient = new RemoteClient(
discoveryServiceClient, Constants.Service.STREAMS, new DefaultHttpRequestConfig(false),
String.format("%s/namespaces/%s/streams/", Constants.Gateway.API_VERSION_3, namespace.getNamespace()));
}

private URL getStreamURL(String stream) throws IOException {
return getStreamURL(stream, false);
}

private URL getStreamURL(String stream, boolean batch) throws IOException {
Discoverable discoverable = Retries.supplyWithRetries(new Supplier<Discoverable>() {
@Override
public Discoverable get() {
Discoverable discoverable = endpointStrategy.pick();
if (discoverable == null) {
throw new ServiceUnavailableException(Constants.Service.STREAMS);
}
return discoverable;
}
}, retryStrategy);

InetSocketAddress address = discoverable.getSocketAddress();
String scheme = Arrays.equals(Constants.Security.SSL_URI_SCHEME.getBytes(), discoverable.getPayload()) ?
Constants.Security.SSL_URI_SCHEME : Constants.Security.URI_SCHEME;
String path = String.format("%s%s:%d%s/namespaces/%s/streams/%s", scheme, address.getHostName(), address.getPort(),
Constants.Gateway.API_VERSION_3, namespace.getId(), stream);
if (batch) {
path = String.format("%s/batch", path);
}
return new URL(path);
}
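The removed `getStreamURL` built the endpoint URL by hand from a discovered socket address, choosing the scheme from the endpoint's payload; this is exactly the logic that `RemoteClient` now encapsulates. Stripped of the CDAP discovery types, the construction can be sketched as follows (the literal scheme strings, the port, and the `/v3` API prefix are assumptions for illustration, standing in for `Constants.Security.SSL_URI_SCHEME`/`URI_SCHEME` and `Constants.Gateway.API_VERSION_3`):

```java
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;

public class EndpointUrlSketch {
    // Mirrors the removed getStreamURL: pick http or https based on a
    // payload marker attached to the discovered endpoint, then format the
    // stream path. The "https://" marker value is an assumption here.
    static URL streamUrl(InetSocketAddress addr, byte[] payload,
                         String namespace, String stream) throws MalformedURLException {
        String scheme = Arrays.equals("https://".getBytes(), payload) ? "https://" : "http://";
        return new URL(String.format("%s%s:%d/v3/namespaces/%s/streams/%s",
                                     scheme, addr.getHostName(), addr.getPort(), namespace, stream));
    }

    public static void main(String[] args) throws Exception {
        // Empty payload -> no SSL marker -> plain http, as in the original check.
        URL url = streamUrl(InetSocketAddress.createUnresolved("example.org", 11015),
                            new byte[0], "default", "purchases");
        System.out.println(url);
    }
}
```

Centralizing this in `RemoteClient` means the scheme selection and base path are built once in the constructor instead of on every write.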

private void writeToStream(Id.Stream stream, HttpRequest.Builder builder) throws IOException {
private void writeToStream(StreamId stream, HttpRequest.Builder builder) throws IOException {
if (authorizationEnabled) {
builder.addHeader(Constants.Security.Headers.USER_ID, authenticationContext.getPrincipal().getName());
}
HttpResponse response = HttpRequests.execute(builder.build(), new DefaultHttpRequestConfig(false));
HttpResponse response = remoteClient.execute(builder.build());
int responseCode = response.getResponseCode();
if (responseCode == HttpResponseStatus.NOT_FOUND.getCode()) {
throw new IOException(String.format("Stream %s not found", stream));
@@ -148,13 +119,18 @@ private void writeToStream(Id.Stream stream, HttpRequest.Builder builder) throws
}
}

private void write(String stream, ByteBuffer data, Map<String, String> headers) throws IOException {
URL streamURL = getStreamURL(stream);
HttpRequest.Builder requestBuilder = HttpRequest.post(streamURL).withBody(data);
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(stream + "." + header.getKey(), header.getValue());
}
writeToStream(Id.Stream.from(namespace, stream), requestBuilder);
private void write(final String stream, final ByteBuffer data, final Map<String, String> headers) throws IOException {
Retries.callWithRetries(new Retries.Callable<Void, IOException>() {
@Override
public Void call() throws IOException {
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.POST, stream).withBody(data);
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(stream + "." + header.getKey(), header.getValue());
}
writeToStream(namespace.stream(stream), requestBuilder);
return null;
}
}, retryStrategy);
}
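The rewritten `write` wraps the whole HTTP request in `Retries.callWithRetries` with the injected `RetryStrategy`, where the old code retried only endpoint discovery. Independent of CDAP's `Retries` and `RetryStrategy` classes (not shown in full here), the underlying pattern is a reattempt loop around a checked-exception callable; a minimal sketch under simplified assumptions (fixed retry count, retrying only `IOException`, no backoff) looks like:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetrySketch {
    // Minimal stand-in for a retry helper; CDAP's RetryStrategy supports
    // richer policies (backoff, time limits) than a bare attempt counter.
    static <T> T callWithRetries(Callable<T> callable, int maxRetries) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return callable.call();
            } catch (IOException e) {
                if (++attempt > maxRetries) {
                    throw e; // retries exhausted; propagate the last failure
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice with IOException, then succeeds -- mimicking a stream
        // service that is briefly unavailable during discovery.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IOException("service unavailable");
            }
            return "written after " + calls[0] + " attempts";
        }, 5);
        System.out.println(result);
    }
}
```

Because the retry now surrounds the request itself, a write that reaches a briefly unavailable stream service is retried rather than failing immediately.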

@Override
@@ -178,16 +154,27 @@ public void write(String stream, StreamEventData data) throws IOException {
}

@Override
public void writeFile(String stream, File file, String contentType) throws IOException {
URL url = getStreamURL(stream, true);
HttpRequest.Builder requestBuilder = HttpRequest.post(url).withBody(file).addHeader(
HttpHeaders.CONTENT_TYPE, contentType);
writeToStream(Id.Stream.from(namespace, stream), requestBuilder);
public void writeFile(final String stream, final File file, final String contentType) throws IOException {
Retries.callWithRetries(new Retries.Callable<Void, IOException>() {
@Override
public Void call() throws IOException {
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.POST, stream + "/batch")
.withBody(file)
.addHeader(HttpHeaders.CONTENT_TYPE, contentType);
writeToStream(namespace.stream(stream), requestBuilder);
return null;
}
}, retryStrategy);
}

@Override
public StreamBatchWriter createBatchWriter(String stream, String contentType) throws IOException {
URL url = getStreamURL(stream, true);
public StreamBatchWriter createBatchWriter(final String stream, String contentType) throws IOException {
URL url = Retries.callWithRetries(new Retries.Callable<URL, IOException>() {
@Override
public URL call() throws IOException {
return remoteClient.resolve(stream + "/batch");
}
}, retryStrategy);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod(HttpMethod.POST.name());
connection.setReadTimeout(15000);
@@ -200,7 +187,7 @@ public StreamBatchWriter createBatchWriter(String stream, String contentType) th
connection.setChunkedStreamingMode(0);
connection.connect();
try {
Id.Stream streamId = Id.Stream.from(namespace, stream);
StreamId streamId = namespace.stream(stream);
registerStream(streamId);
return new DefaultStreamBatchWriter(connection, streamId);
} catch (IOException e) {
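`createBatchWriter` configures the `HttpURLConnection` for chunked streaming so a batch of arbitrary size can be sent without being buffered in memory. A standalone version of that connection setup, against a hypothetical local endpoint and stopping short of actually connecting, might look like:

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class BatchConnectionSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; mirrors the setup in createBatchWriter.
        URL url = new URL("http://localhost:11015/v3/namespaces/default/streams/events/batch");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setReadTimeout(15000);
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "text/plain");
        // Chunk size 0 selects the default chunk length; the request body is
        // streamed in chunks rather than held in memory in full.
        conn.setChunkedStreamingMode(0);
        System.out.println(conn.getRequestMethod());
        // conn.connect() would open the socket; omitted since no server is running.
    }
}
```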
@@ -209,14 +196,14 @@ public StreamBatchWriter createBatchWriter(String stream, String contentType) th
}
}

private void registerStream(Id.Stream stream) {
private void registerStream(StreamId stream) {
// prone to being entered multiple times, but OK since usageRegistry.register is not an expensive operation
if (!isStreamRegistered.containsKey(stream)) {
runtimeUsageRegistry.registerAll(owners, stream.toEntityId());
runtimeUsageRegistry.registerAll(owners, stream);
isStreamRegistered.put(stream, true);
}

// Lineage writer handles duplicate accesses internally
lineageWriter.addAccess(run.toEntityId(), stream.toEntityId(), AccessType.WRITE);
lineageWriter.addAccess(run, stream, AccessType.WRITE);
}
}
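`registerStream` above uses a check-then-act guard on a `ConcurrentMap`: two threads can both pass the `containsKey` test, which the inline comment accepts because registration is cheap and idempotent. A self-contained sketch of the same idiom (with a hypothetical counter standing in for `runtimeUsageRegistry.registerAll`):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RegisterOnce {
    private final ConcurrentMap<String, Boolean> registered = new ConcurrentHashMap<>();
    private int registrations = 0;

    void register(String streamName) {
        // Check-then-act: not atomic, so the registration itself must be
        // safe to repeat -- as the diff's comment notes for register.
        if (!registered.containsKey(streamName)) {
            registrations++; // stand-in for the real registration call
            registered.put(streamName, true);
        }
    }

    public static void main(String[] args) {
        RegisterOnce r = new RegisterOnce();
        r.register("purchases");
        r.register("purchases"); // second call is a no-op
        System.out.println(r.registrations);
    }
}
```

If registration were expensive or non-idempotent, `putIfAbsent` (or `computeIfAbsent`) would be the stricter alternative; the original trades that atomicity for simplicity.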
@@ -17,8 +17,8 @@
package co.cask.cdap.gateway.handlers;

import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.kerberos.ImpersonationInfo;
import co.cask.cdap.security.TokenSecureStoreUpdater;
import co.cask.cdap.security.impersonation.ImpersonationInfo;
import co.cask.cdap.security.impersonation.ImpersonationUtils;
import co.cask.cdap.security.impersonation.UGIProvider;
import co.cask.http.AbstractHttpHandler;
(Diff truncated; the remaining changed files are not shown.)
