Skip to content

Commit

Permalink
[FLINK-9486][state] Introduce InternalPriorityQueue as state in keyed…
Browse files Browse the repository at this point in the history
… state backends

This commit does not include the integration with checkpointing.

This closes apache#6276.
  • Loading branch information
StefanRRichter committed Jul 9, 2018
1 parent b12acea commit 79b38f8
Show file tree
Hide file tree
Showing 39 changed files with 1,234 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
Expand Down Expand Up @@ -74,9 +75,12 @@ public void testListSerialization() throws Exception {
columnFamilyOptions,
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1, new KeyGroupRange(0, 0),
new ExecutionConfig(), false,
TestLocalRecoveryConfig.disabled()
1,
new KeyGroupRange(0, 0),
new ExecutionConfig(),
false,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP
);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
Expand Down Expand Up @@ -112,10 +116,12 @@ public void testMapSerialization() throws Exception {
columnFamilyOptions,
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1, new KeyGroupRange(0, 0),
1,
new KeyGroupRange(0, 0),
new ExecutionConfig(),
false,
TestLocalRecoveryConfig.disabled());
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
Expand Down Expand Up @@ -185,18 +186,19 @@ public void testDeserializeValueTooMany2() throws Exception {
@Test
public void testListSerialization() throws Exception {
final long key = 0L;

final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
new HeapKeyedStateBackend<>(
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
1,
new KeyGroupRange(0, 0),
keyGroupRange.getNumberOfKeyGroups(),
keyGroupRange,
async,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled()
TestLocalRecoveryConfig.disabled(),
new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
);
longHeapKeyedStateBackend.setCurrentKey(key);

Expand Down Expand Up @@ -292,18 +294,19 @@ public void testDeserializeListTooShort2() throws Exception {
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;

final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
new HeapKeyedStateBackend<>(
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
1,
new KeyGroupRange(0, 0),
keyGroupRange.getNumberOfKeyGroups(),
keyGroupRange,
async,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled()
TestLocalRecoveryConfig.disabled(),
new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
);
longHeapKeyedStateBackend.setCurrentKey(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import javax.annotation.Nullable;

import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Interface for collection that gives in order access to elements w.r.t their priority.
Expand All @@ -35,6 +37,16 @@
@Internal
public interface InternalPriorityQueue<T> {

/**
* Polls from the top of the queue as long as the the queue is not empty and passes the elements to
* {@link Consumer} until a {@link Predicate} rejects an offered element. The rejected element is not
* removed from the queue and becomes the new head.
*
* @param canConsume bulk polling ends once this returns false. The rejected element is nor removed and not consumed.
* @param consumer consumer function for elements accepted by canConsume.
*/
void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer);

/**
* Retrieves and removes the first element (w.r.t. the order) of this set,
* or returns {@code null} if this set is empty.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import javax.annotation.Nonnull;

import java.util.Set;

/**
* This interface exists as (temporary) adapter between the new {@link InternalPriorityQueue} and the old way in which
* timers are written in a snapshot. This interface can probably go away once timer state becomes part of the
* keyed state backend snapshot.
*/
public interface KeyGroupedInternalPriorityQueue<T> extends InternalPriorityQueue<T> {

/**
* Returns the subset of elements in the priority queue that belongs to the given key-group, within the operator's
* key-group range.
*/
@Nonnull
Set<T> getSubsetForKeyGroup(int keyGroupId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
*
* @param <K> The key by which state is keyed.
*/
public interface KeyedStateBackend<K> extends InternalKeyContext<K>, KeyedStateFactory, Disposable {
public interface KeyedStateBackend<K>
extends InternalKeyContext<K>, KeyedStateFactory, PriorityQueueSetFactory, Disposable {

/**
* Sets the current key that is used for partitioned state.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

/**
* This interface works similar to {@link Comparable} and is used to prioritize between two objects. The main difference
* between this interface and {@link Comparable} is it is not require to follow the usual contract between that
* {@link Comparable#compareTo(Object)} and {@link Object#equals(Object)}. The contract of this interface is:
* When two objects are equal, they indicate the same priority, but indicating the same priority does not require that
* both objects are equal.
*
* @param <T> type of the compared objects.
*/
@FunctionalInterface
public interface PriorityComparator<T> {

/**
* Compares two objects for priority. Returns a negative integer, zero, or a positive integer as the first
* argument has lower, equal to, or higher priority than the second.
* @param left left operand in the comparison by priority.
* @param right left operand in the comparison by priority.
* @return a negative integer, zero, or a positive integer as the first argument has lower, equal to, or higher
* priority than the second.
*/
int comparePriority(T left, T right);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;

import javax.annotation.Nonnull;

/**
* Factory for {@link KeyGroupedInternalPriorityQueue} instances.
*/
public interface PriorityQueueSetFactory {

/**
* Creates a {@link KeyGroupedInternalPriorityQueue}.
*
* @param stateName unique name for associated with this queue.
* @param byteOrderedElementSerializer a serializer that with a format that is lexicographically ordered in
* alignment with elementPriorityComparator.
* @param <T> type of the stored elements.
* @return the queue with the specified unique name.
*/
@Nonnull
<T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
@Nonnull String stateName,
@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
@Nonnull PriorityComparator<T> elementPriorityComparator,
@Nonnull KeyExtractorFunction<T> keyExtractor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.FlinkRuntimeException;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Comparator;

/**
* This class is an adapter between {@link PriorityComparator} and a full {@link Comparator} that respects the
* contract between {@link Comparator#compare(Object, Object)} and {@link Object#equals(Object)}. This is currently
* needed for implementations of
* {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache} that are implemented
* on top of a data structure that relies on the this contract, e.g. a tree set. We should replace this in the near
* future.
*
* @param <T> type of the compared elements.
*/
public class TieBreakingPriorityComparator<T> implements Comparator<T>, PriorityComparator<T> {

/** The {@link PriorityComparator} to which we delegate in a first step. */
@Nonnull
private final PriorityComparator<T> priorityComparator;

/** Serializer for instances of the compared objects. */
@Nonnull
private final TypeSerializer<T> serializer;

/** Stream that we use in serialization. */
@Nonnull
private final ByteArrayOutputStreamWithPos outStream;

/** {@link org.apache.flink.core.memory.DataOutputView} around outStream. */
@Nonnull
private final DataOutputViewStreamWrapper outView;

public TieBreakingPriorityComparator(
@Nonnull PriorityComparator<T> priorityComparator,
@Nonnull TypeSerializer<T> serializer,
@Nonnull ByteArrayOutputStreamWithPos outStream,
@Nonnull DataOutputViewStreamWrapper outView) {

this.priorityComparator = priorityComparator;
this.serializer = serializer;
this.outStream = outStream;
this.outView = outView;
}

@SuppressWarnings("unchecked")
@Override
public int compare(T o1, T o2) {

// first we compare priority, this should be the most commonly hit case
int cmp = priorityComparator.comparePriority(o1, o2);

if (cmp != 0) {
return cmp;
}

// here we start tie breaking and do our best to comply with the compareTo/equals contract, first we try
// to simply find an existing way to fully compare.
if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
return ((Comparable<T>) o1).compareTo(o2);
}

// we catch this case before moving to more expensive tie breaks.
if (o1.equals(o2)) {
return 0;
}

// if objects are not equal, their serialized form should somehow differ as well. this can be costly, and...
// TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality.
try {
outStream.reset();
serializer.serialize(o1, outView);
int leftLen = outStream.getPosition();
serializer.serialize(o2, outView);
int rightLen = outStream.getPosition() - leftLen;
return compareBytes(outStream.getBuf(), 0, leftLen, leftLen, rightLen);
} catch (IOException ex) {
throw new FlinkRuntimeException("Serializer problem in comparator.", ex);
}
}

@Override
public int comparePriority(T left, T right) {
return priorityComparator.comparePriority(left, right);
}

public static int compareBytes(byte[] bytes, int offLeft, int leftLen, int offRight, int rightLen) {
int maxLen = Math.min(leftLen, rightLen);
for (int i = 0; i < maxLen; ++i) {
int cmp = bytes[offLeft + i] - bytes[offRight + i];
if (cmp != 0) {
return cmp;
}
}
return leftLen - rightLen;
}
}
Loading

0 comments on commit 79b38f8

Please sign in to comment.