Skip to content

Commit

Permalink
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
Browse files Browse the repository at this point in the history
- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings

For the future:

- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas

## How was this patch tested?

Existing tests

Author: Sean Owen <[email protected]>

Closes apache#16871 from srowen/SPARK-19493.
  • Loading branch information
srowen committed Feb 16, 2017
1 parent 3871d94 commit 0e24054
Show file tree
Hide file tree
Showing 101 changed files with 513 additions and 1,186 deletions.
1 change: 1 addition & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>dist</id>
Expand Down
8 changes: 2 additions & 6 deletions build/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# Preserve the calling directory
_CALLING_DIR="$(pwd)"
# Options used during compilation
_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"

# Installs any application tarball given a URL, the expected tarball name,
# and, optionally, a checkable binary path to determine if the binary has
Expand Down Expand Up @@ -141,13 +141,9 @@ cd "${_CALLING_DIR}"
# Now that zinc is ensured to be installed, check its status and, if its
# not running or just installed, start it
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
ZINC_JAVA_HOME=
if [ -n "$JAVA_7_HOME" ]; then
ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
fi
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
$ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
"${ZINC_BIN}" -start -port ${ZINC_PORT} \
-scala-compiler "${SCALA_COMPILER}" \
-scala-library "${SCALA_LIBRARY}" &>/dev/null
fi
Expand Down
2 changes: 1 addition & 1 deletion build/sbt-launch-lib.bash
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ get_mem_opts () {
(( $perm < 4096 )) || perm=4096
local codecache=$(( $perm / 2 ))

echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
}

require_arg () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,40 +131,36 @@ public void setClientId(String id) {
*/
public void fetchChunk(
long streamId,
final int chunkIndex,
final ChunkReceivedCallback callback) {
final long startTime = System.currentTimeMillis();
int chunkIndex,
ChunkReceivedCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}

final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);

channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
try {
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
}
});
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
try {
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
});
}

/**
Expand All @@ -175,8 +169,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
* @param streamId The stream to fetch.
* @param callback Object to call with the stream data.
*/
public void stream(final String streamId, final StreamCallback callback) {
final long startTime = System.currentTimeMillis();
public void stream(String streamId, StreamCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
}
Expand All @@ -186,29 +180,25 @@ public void stream(final String streamId, final StreamCallback callback) {
// when responses arrive.
synchronized (this) {
handler.addStreamCallback(callback);
channel.writeAndFlush(new StreamRequest(streamId)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request for {} to {} took {} ms", streamId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request for {} to {} took {} ms", streamId,
getRemoteAddress(channel), timeTaken);
}
});
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
});
}
}

Expand All @@ -220,19 +210,17 @@ public void operationComplete(ChannelFuture future) throws Exception {
* @param callback Callback to handle the RPC's reply.
* @return The RPC's id.
*/
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
final long startTime = System.currentTimeMillis();
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}

final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);

channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
.addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
Expand All @@ -251,8 +239,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
});

return requestId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.Key;
import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -37,7 +32,6 @@
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

/**
Expand Down Expand Up @@ -103,20 +97,18 @@ public void doBootstrap(TransportClient client, Channel channel) {
private void doSparkAuth(TransportClient client, Channel channel)
throws GeneralSecurityException, IOException {

AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf);
try {
String secretKey = secretKeyHolder.getSecretKey(authUser);
try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);

ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(),
conf.authRTTimeoutMs());
ByteBuffer responseData =
client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
ServerResponse response = ServerResponse.decodeMessage(responseData);

engine.validate(response);
engine.sessionCipher().addToChannel(channel);
} finally {
engine.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.spark.network.crypto;

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.security.sasl.Sasl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
Expand All @@ -35,7 +33,6 @@
import org.apache.spark.network.sasl.SaslRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -189,21 +187,16 @@ private void processOneWayMessage(OneWayMessage req) {
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
*/
private void respond(final Encodable result) {
final SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
}
private void respond(Encodable result) {
SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(future -> {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.spark.network.crypto;

import java.util.Arrays;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.collect.ImmutableMap;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,8 @@ private ShuffleMetrics() {
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
@Override
public Integer getValue() {
return blockManager.getRegisteredExecutorsSize();
}
});
allMetrics.put("registeredExecutorsSize",
(Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);

// Execute the actual deletion in a different thread, as it may take some time.
directoryCleaner.execute(new Runnable() {
@Override
public void run() {
deleteExecutorDirs(executor.localDirs);
}
});
directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,19 @@ public void init(String appId) {

@Override
public void fetchBlocks(
final String host,
final int port,
final String execId,
String host,
int port,
String execId,
String[] blockIds,
BlockFetchingListener listener) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
new RetryingBlockFetcher.BlockFetchStarter() {
@Override
public void createAndStart(String[] blockIds, BlockFetchingListener listener)
throws IOException, InterruptedException {
(blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
}
};
new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
};

int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
Expand Down Expand Up @@ -131,12 +127,9 @@ public void registerWithShuffleServer(
String execId,
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
TransportClient client = clientFactory.createUnmanagedClient(host, port);
try {
try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
} finally {
client.close();
}
}

Expand Down
Loading

0 comments on commit 0e24054

Please sign in to comment.