Skip to content

Commit

Permalink
[FLINK-6878] [runtime] Activate checkstyle for runtime/query
Browse files Browse the repository at this point in the history
This closes apache#4096.
  • Loading branch information
greghogan authored and zentol committed Jul 10, 2017
1 parent f21915b commit d50076f
Show file tree
Hide file tree
Showing 25 changed files with 223 additions and 144 deletions.
1 change: 0 additions & 1 deletion flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,6 @@ under the License.
**/runtime/messages/**,
**/runtime/minicluster/**,
**/runtime/operators/**,
**/runtime/query/**,
**/runtime/registration/**,
**/runtime/resourcemanager/**,
**/runtime/rpc/**,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,30 @@

package org.apache.flink.runtime.query;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.Preconditions;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

import java.util.UUID;
import java.util.concurrent.Callable;

import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/**
* Akka-based {@link KvStateLocationLookupService} that retrieves the current
* JobManager address and uses it for lookups.
Expand All @@ -48,7 +50,7 @@ class AkkaKvStateLocationLookupService implements KvStateLocationLookupService,

private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class);

/** Future returned when no JobManager is available */
/** Future returned when no JobManager is available. */
private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());

/** Leader retrieval service to retrieve the current job manager. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class KvStateLocation implements Serializable {
private int numRegisteredKeyGroups;

/**
* Creates the location information
* Creates the location information.
*
* @param jobId JobID the KvState instances belong to
* @param jobVertexId JobVertexID the KvState instances belong to
Expand Down Expand Up @@ -220,16 +220,31 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

KvStateLocation that = (KvStateLocation) o;

if (numKeyGroups != that.numKeyGroups) { return false; }
if (!jobId.equals(that.jobId)) { return false; }
if (!jobVertexId.equals(that.jobVertexId)) { return false; }
if (!registrationName.equals(that.registrationName)) { return false; }
if (!Arrays.equals(kvStateIds, that.kvStateIds)) { return false; }
if (numKeyGroups != that.numKeyGroups) {
return false;
}
if (!jobId.equals(that.jobId)) {
return false;
}
if (!jobVertexId.equals(that.jobVertexId)) {
return false;
}
if (!registrationName.equals(that.registrationName)) {
return false;
}
if (!Arrays.equals(kvStateIds, that.kvStateIds)) {
return false;
}

return Arrays.equals(kvStateAddresses, that.kvStateAddresses);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.query;

import org.apache.flink.api.common.JobID;

import scala.concurrent.Future;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
Expand All @@ -35,6 +35,9 @@ public interface KvStateMessage extends Serializable {
// Lookup
// ------------------------------------------------------------------------

/**
* Actor message for looking up {@link KvStateLocation}.
*/
class LookupKvStateLocation implements KvStateMessage {

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -88,6 +91,9 @@ public String toString() {
// Registration
// ------------------------------------------------------------------------

/**
* Actor message for notification of {@code KvState} registration.
*/
class NotifyKvStateRegistered implements KvStateMessage {

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -147,7 +153,7 @@ public JobID getJobId() {
}

/**
* Returns the JobVertexID the KvState instance belongs to
* Returns the JobVertexID the KvState instance belongs to.
*
* @return JobVertexID the KvState instance belongs to
*/
Expand Down Expand Up @@ -204,6 +210,9 @@ public String toString() {
}
}

/**
* Actor message for notification of {@code KvState} unregistration.
*/
class NotifyKvStateUnregistered implements KvStateMessage {

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -251,7 +260,7 @@ public JobID getJobId() {
}

/**
* Returns the JobVertexID the KvState instance belongs to
* Returns the JobVertexID the KvState instance belongs to.
*
* @return JobVertexID the KvState instance belongs to
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskmanager.Task;

import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -60,7 +60,7 @@ public void registerListener(KvStateRegistryListener listener) {
}

/**
* Unregisters the listener with the registry
* Unregisters the listener with the registry.
*/
public void unregisterListener() {
listener.set(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.state.KeyGroupRange;

/**
* A gateway to listen for {@code KvState} registrations.
*/
public interface KvStateRegistryGateway extends RpcGateway {
/**
* Notifies the listener about a registered KvState instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ public int getPort() {

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

KvStateServerAddress that = (KvStateServerAddress) o;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

package org.apache.flink.runtime.query;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
Expand Down Expand Up @@ -49,8 +45,19 @@
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
import org.apache.flink.util.Preconditions;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import scala.Option;
import scala.Some;
import scala.Tuple2;
Expand All @@ -59,11 +66,6 @@
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Client for queryable state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

package org.apache.flink.runtime.query.netty;

import org.apache.flink.util.Preconditions;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.util.Preconditions;

/**
* A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
Expand All @@ -32,16 +33,16 @@
*/
class ChunkedByteBuf implements ChunkedInput<ByteBuf> {

/** The buffer to chunk */
/** The buffer to chunk. */
private final ByteBuf buf;

/** Size of chunks */
/** Size of chunks. */
private final int chunkSize;

/** Closed flag */
/** Closed flag. */
private boolean isClosed;

/** End of input flag */
/** End of input flag. */
private boolean isEndOfInput;

public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

package org.apache.flink.runtime.query.netty;

import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.util.Preconditions;

import akka.dispatch.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
Expand All @@ -33,13 +39,6 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.util.Preconditions;
import scala.concurrent.Future;
import scala.concurrent.Promise;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
Expand All @@ -51,6 +50,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import scala.concurrent.Future;
import scala.concurrent.Promise;

/**
* Netty-based client querying {@link KvStateServer} instances.
*
Expand All @@ -74,7 +76,7 @@ public class KvStateClient {
/** Netty's Bootstrap. */
private final Bootstrap bootstrap;

/** Statistics tracker */
/** Statistics tracker. */
private final KvStateRequestStats stats;

/** Established connections. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.flink.runtime.query.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Loading

0 comments on commit d50076f

Please sign in to comment.