Skip to content

Commit

Permalink
[FLINK-3779] [runtime] Add KvState network client and server
Browse files Browse the repository at this point in the history
- Adds a Netty-based server and client to query KvState instances, which have
  been published to the KvStateRegistry.
  • Loading branch information
uce committed Aug 9, 2016
1 parent 63c9b8e commit af07eed
Show file tree
Hide file tree
Showing 27 changed files with 4,525 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class NettyBufferPool implements ByteBufAllocator {
* @param numberOfArenas Number of arenas (recommended: 2 * number of task
* slots)
*/
NettyBufferPool(int numberOfArenas) {
public NettyBufferPool(int numberOfArenas) {
checkArgument(numberOfArenas >= 1, "Number of arenas");
this.numberOfArenas = numberOfArenas;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.apache.flink.runtime.jobgraph;

import javax.xml.bind.DatatypeConverter;

import org.apache.flink.util.AbstractID;

import javax.xml.bind.DatatypeConverter;

/**
* A class for statistically unique job vertex IDs.
*/
Expand All @@ -32,15 +32,14 @@ public class JobVertexID extends AbstractID {
public JobVertexID() {
super();
}
public JobVertexID(byte[] bytes) {
super(bytes);
}

public JobVertexID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}

public JobVertexID(byte[] bytes) {
super(bytes);
}

public static JobVertexID fromHexString(String hexString) {
return new JobVertexID(DatatypeConverter.parseHexBinary(hexString));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.query;

import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.AbstractID;

/**
* Identifier for {@link KvState} instances.
*
* <p>Assigned when registering state at the {@link KvStateRegistry}.
*/
public class KvStateID extends AbstractID {

private static final long serialVersionUID = 1L;

public KvStateID() {
super();
}

public KvStateID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.query;

import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.net.InetAddress;

/**
* The (host, port)-address of a {@link KvStateServer}.
*/
public class KvStateServerAddress implements Serializable {

private static final long serialVersionUID = 1L;

/** KvStateServer host address. */
private final InetAddress hostAddress;

/** KvStateServer port. */
private final int port;

/**
* Creates a KvStateServerAddress for the given KvStateServer host address
* and port.
*
* @param hostAddress KvStateServer host address
* @param port KvStateServer port
*/
public KvStateServerAddress(InetAddress hostAddress, int port) {
this.hostAddress = Preconditions.checkNotNull(hostAddress, "Host address");
Preconditions.checkArgument(port > 0 && port <= 65535, "Port " + port + " is out of range 1-65535");
this.port = port;
}

/**
* Returns the host address of the KvStateServer.
*
* @return KvStateServer host address
*/
public InetAddress getHost() {
return hostAddress;
}

/**
* Returns the port of the KvStateServer.
*
* @return KvStateServer port
*/
public int getPort() {
return port;
}

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }

KvStateServerAddress that = (KvStateServerAddress) o;

return port == that.port && hostAddress.equals(that.hostAddress);
}

@Override
public int hashCode() {
int result = hostAddress.hashCode();
result = 31 * result + port;
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.query.netty;

import java.util.concurrent.atomic.AtomicLong;

/**
* Atomic {@link KvStateRequestStats} implementation.
*/
public class AtomicKvStateRequestStats implements KvStateRequestStats {

/**
* Number of active connections.
*/
private final AtomicLong numConnections = new AtomicLong();

/**
* Total number of reported requests.
*/
private final AtomicLong numRequests = new AtomicLong();

/**
* Total number of successful requests (<= reported requests).
*/
private final AtomicLong numSuccessful = new AtomicLong();

/**
* Total duration of all successful requests.
*/
private final AtomicLong successfulDuration = new AtomicLong();

/**
* Total number of failed requests (<= reported requests).
*/
private final AtomicLong numFailed = new AtomicLong();

@Override
public void reportActiveConnection() {
numConnections.incrementAndGet();
}

@Override
public void reportInactiveConnection() {
numConnections.decrementAndGet();
}

@Override
public void reportRequest() {
numRequests.incrementAndGet();
}

@Override
public void reportSuccessfulRequest(long durationTotalMillis) {
numSuccessful.incrementAndGet();
successfulDuration.addAndGet(durationTotalMillis);
}

@Override
public void reportFailedRequest() {
numFailed.incrementAndGet();
}

public long getNumConnections() {
return numConnections.get();
}

public long getNumRequests() {
return numRequests.get();
}

public long getNumSuccessful() {
return numSuccessful.get();
}

public long getNumFailed() {
return numFailed.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.query.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.util.Preconditions;

/**
* A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
* respecting the high and low watermarks.
*
* @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
*/
class ChunkedByteBuf implements ChunkedInput<ByteBuf> {

/** The buffer to chunk */
private final ByteBuf buf;

/** Size of chunks */
private final int chunkSize;

/** Closed flag */
private boolean isClosed;

/** End of input flag */
private boolean isEndOfInput;

public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
this.buf = Preconditions.checkNotNull(buf, "Buffer");
Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
this.chunkSize = chunkSize;
}

@Override
public boolean isEndOfInput() throws Exception {
return isClosed || isEndOfInput;
}

@Override
public void close() throws Exception {
if (!isClosed) {
// If we did not consume the whole buffer yet, we have to release
// it here. Otherwise, it's the responsibility of the consumer.
if (!isEndOfInput) {
buf.release();
}

isClosed = true;
}
}

@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
if (isClosed) {
return null;
} else if (buf.readableBytes() <= chunkSize) {
isEndOfInput = true;

// Don't retain as the consumer is responsible to release it
return buf.slice();
} else {
// Return a chunk sized slice of the buffer. The ref count is
// shared with the original buffer. That's why we need to retain
// a reference here.
return buf.readSlice(chunkSize).retain();
}
}

@Override
public String toString() {
return "ChunkedByteBuf{" +
"buf=" + buf +
", chunkSize=" + chunkSize +
", isClosed=" + isClosed +
", isEndOfInput=" + isEndOfInput +
'}';
}
}
Loading

0 comments on commit af07eed

Please sign in to comment.