KAFKA-4863; Querying window store may return unwanted keys
Make sure that the iterator returned from `WindowStore.fetch(..)` returns only entries for the requested key, rather than entries for all keys that are a prefix match.

Author: Damian Guy <[email protected]>

Reviewers: Eno Thereska, Guozhang Wang

Closes apache#2662 from dguy/kafka-4863
dguy authored and guozhangwang committed Mar 14, 2017
1 parent d3b8ff0 commit 9e4548d
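
In API terms, the symptom was that `WindowStore.fetch(..)` for one key could also surface values stored under a longer key that shares the same byte prefix (the new `shouldFetchAndIterateOverExactKeys` test below covers exactly this). A rough repro sketch, assuming an initialized `WindowStore<String, String>` is available; the method and variable names here are hypothetical and not part of the patch:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Hypothetical repro: "aa" shares a byte prefix with "a".
void fetchShouldNotReturnPrefixMatches(final WindowStore<String, String> store) {
    store.put("a", "0001", 0L);   // put(key, value, timestamp)
    store.put("aa", "0002", 0L);

    // Before this fix, the cached part of the result could also surface the "aa" entry,
    // because the cache was scanned with a prefix-based binary key range.
    try (final WindowStoreIterator<String> iterator = store.fetch("a", 0L, Long.MAX_VALUE)) {
        while (iterator.hasNext()) {
            final KeyValue<Long, String> next = iterator.next(); // timestamp -> value
            // With this change, only "0001" (the value stored under "a") is returned here.
        }
    }
}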
Showing 7 changed files with 663 additions and 482 deletions.
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -18,7 +18,6 @@

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -31,7 +30,6 @@
import org.apache.kafka.streams.state.StateSerdes;

import java.util.List;
import java.util.NoSuchElementException;


class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
@@ -162,54 +160,4 @@ public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> flushListener)
this.flushListener = flushListener;
}

private static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
private final HasNextCondition hasNextCondition;

FilteredCacheIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
final HasNextCondition hasNextCondition) {
this.cacheIterator = cacheIterator;
this.hasNextCondition = hasNextCondition;
}

@Override
public void close() {
// no-op
}

@Override
public Bytes peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return cacheIterator.peekNextKey();
}

@Override
public boolean hasNext() {
return hasNextCondition.hasNext(cacheIterator);
}

@Override
public KeyValue<Bytes, LRUCacheEntry> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return cacheIterator.next();

}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public KeyValue<Bytes, LRUCacheEntry> peekNext() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return cacheIterator.peekNext();
}
}
}
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -38,6 +38,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final long windowSize;
private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();

private String name;
private ThreadCache cache;
@@ -149,9 +150,16 @@ public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFro
Bytes fromBytes = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
Bytes toBytes = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);

final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key));
final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, fromBytes, toBytes);
return new MergedSortedCacheWindowStoreIterator<>(cacheIterator,

final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes,
timeFrom,
timeTo);
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);

return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
underlyingIterator,
new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
}
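
Why the cache scan needs the extra filter: `fromBytes` and `toBytes` above are composite binary window keys, and `cache.range(..)` is a plain lexicographic byte-range scan, so cache entries for a key that merely shares a byte prefix (e.g. "aa" when fetching "a") can fall inside the range. A minimal, self-contained sketch of that effect, under the assumption that the composite key is laid out as [serialized key][8-byte timestamp][4-byte sequence number]; it is an illustration of the problem, not code from this patch:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch (assumed layout): cache keys are [key bytes][8-byte timestamp][4-byte sequence],
// and range scans compare them as unsigned, lexicographically ordered byte arrays.
public class WindowKeyRangeSketch {

    static byte[] compositeKey(final String key, final long timestamp, final int seq) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + 8 + 4)
                .put(keyBytes)
                .putLong(timestamp)
                .putInt(seq)
                .array();
    }

    // Unsigned lexicographic comparison, mirroring how the byte-range scan orders keys.
    static int compare(final byte[] left, final byte[] right) {
        for (int i = 0; i < Math.min(left.length, right.length); i++) {
            final int cmp = Integer.compare(left[i] & 0xff, right[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(left.length, right.length);
    }

    public static void main(final String[] args) {
        final byte[] from = compositeKey("a", 0L, 0);             // fromBytes for fetch("a", 0, MAX)
        final byte[] to = compositeKey("a", Long.MAX_VALUE, 0);   // toBytes for fetch("a", 0, MAX)
        final byte[] other = compositeKey("aa", 0L, 0);           // an entry for a different key

        // Prints true: "aa"@0 sorts inside ["a"@0, "a"@MAX], so a plain range scan would return it.
        System.out.println(compare(from, other) <= 0 && compare(other, to) <= 0);
    }
}

The `HasNextCondition` built from `keySchema` re-checks the exact key and the [timeFrom, timeTo] bounds for each cache entry, and `FilteredCacheIterator` applies it before the cache results are merged with the underlying store iterator.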
streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;

import java.util.NoSuchElementException;

class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
private final HasNextCondition hasNextCondition;

FilteredCacheIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final HasNextCondition hasNextCondition) {
this.cacheIterator = cacheIterator;
this.hasNextCondition = hasNextCondition;
}

@Override
public void close() {
// no-op
}

@Override
public Bytes peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return cacheIterator.peekNextKey();
}

@Override
public boolean hasNext() {
return hasNextCondition.hasNext(cacheIterator);
}

@Override
public KeyValue<Bytes, LRUCacheEntry> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return cacheIterator.next();

}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public KeyValue<Bytes, LRUCacheEntry> peekNext() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return cacheIterator.peekNext();
}
}
streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -24,12 +24,6 @@
import java.util.List;

class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
return iterator.hasNext();
}
};
private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());

@Override
@@ -49,7 +43,21 @@ public long segmentTimestamp(final Bytes key) {

@Override
public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
return ITERATOR_HAS_NEXT;
return new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
if (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
if (!keyBytes.equals(binaryKey)) {
return false;
}
final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
return time >= from && time <= to;
}
return false;
}
};
}

@Override
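
The new condition is only exercised indirectly in this patch (via `CachingWindowStore` and `FilteredCacheIterator`). A hedged sketch of a direct test, reusing only types and helpers that already appear in these diffs; the class name, test name, and values are hypothetical and not part of the commit:

package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.StateSerdes;
import org.junit.Test;

import java.util.List;

import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class WindowKeySchemaConditionSketchTest {

    @SuppressWarnings("unchecked")
    @Test
    public void shouldOnlyMatchExactKeyWithinTimeRange() throws Exception {
        final StateSerdes<String, String> serdes = new StateSerdes<>("test", Serdes.String(), Serdes.String());
        // The in-memory store iterates in key order, as the FilteredCacheIteratorTest below relies on.
        final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("test", null, null);
        final LRUCacheEntry expectedValue = new LRUCacheEntry("1".getBytes());
        store.putAll(Utils.mkList(
            KeyValue.pair(WindowStoreUtils.toBinaryKey("a", 1, 0, serdes), expectedValue),
            KeyValue.pair(WindowStoreUtils.toBinaryKey("a", 100, 0, serdes), new LRUCacheEntry("2".getBytes())),
            KeyValue.pair(WindowStoreUtils.toBinaryKey("aa", 1, 0, serdes), new LRUCacheEntry("3".getBytes()))));

        final HasNextCondition condition =
            new WindowKeySchema().hasNextCondition(Bytes.wrap(serdes.rawKey("a")), 0, 10);
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filtered =
            new FilteredCacheIterator(new DelegatingPeekingKeyValueIterator<>("test", store.all()), condition);

        // Only the "a"@1 entry is returned: "a"@100 fails the [0, 10] time bound (iteration stops there),
        // and "aa"@1, which merely shares the "a" byte prefix, is never treated as a match.
        final List<KeyValue<Bytes, LRUCacheEntry>> matched = toList(filtered);
        assertThat(matched.size(), equalTo(1));
        assertThat(matched.get(0).value, equalTo(expectedValue));
    }
}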
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
@@ -30,12 +31,17 @@
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -53,13 +59,12 @@ public class CachingWindowStoreTest {
private ThreadCache cache;
private String topic;
private WindowKeySchema keySchema;
private RocksDBWindowStore<Bytes, byte[]> windowStore;

@Before
public void setUp() throws Exception {
keySchema = new WindowKeySchema();
underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema);
windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
cachingStore = new CachingWindowStore<>(windowStore,
Serdes.String(),
@@ -73,6 +78,10 @@ public void setUp() throws Exception {
cachingStore.init(context, cachingStore);
}

@After
public void closeStore() {
cachingStore.close();
}

@Test
public void shouldPutFetchFromCache() throws Exception {
Expand Down Expand Up @@ -179,6 +188,18 @@ public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
cachingStore.put("a", "a");
}

@SuppressWarnings("unchecked")
@Test
public void shouldFetchAndIterateOverExactKeys() throws Exception {
cachingStore.put("a", "0001", 0);
cachingStore.put("aa", "0002", 0);
cachingStore.put("a", "0003", 1);
cachingStore.put("aa", "0004", 1);
cachingStore.put("a", "0005", 60000);

final List<KeyValue<Long, String>> expected = Utils.mkList(KeyValue.pair(0L, "0001"), KeyValue.pair(1L, "0003"), KeyValue.pair(60000L, "0005"));
assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
}

private int addItemsToCache() throws IOException {
int cachedSize = 0;
streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class FilteredCacheIteratorTest {

@SuppressWarnings("unchecked")
private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null);
private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()),
new LRUCacheEntry("1".getBytes()));
private final List<KeyValue<Bytes, LRUCacheEntry>> entries = Utils.mkList(
firstEntry,
KeyValue.pair(Bytes.wrap("b".getBytes()),
new LRUCacheEntry("2".getBytes())),
KeyValue.pair(Bytes.wrap("c".getBytes()),
new LRUCacheEntry("3".getBytes())));

private FilteredCacheIterator allIterator;
private FilteredCacheIterator firstEntryIterator;

@Before
public void before() {
store.putAll(entries);
final HasNextCondition allCondition = new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
return iterator.hasNext();
}
};
allIterator = new FilteredCacheIterator(
new DelegatingPeekingKeyValueIterator<>("",
store.all()), allCondition);

final HasNextCondition firstEntryCondition = new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
return iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key);
}
};
firstEntryIterator = new FilteredCacheIterator(
new DelegatingPeekingKeyValueIterator<>("",
store.all()), firstEntryCondition);

}

@Test
public void shouldAllowEntryMatchingHasNextCondition() throws Exception {
final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(allIterator);
assertThat(keyValues, equalTo(entries));
}

@Test
public void shouldPeekNextKey() throws Exception {
while (allIterator.hasNext()) {
final Bytes nextKey = allIterator.peekNextKey();
final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next();
assertThat(next.key, equalTo(nextKey));
}
}

@Test
public void shouldPeekNext() throws Exception {
while (allIterator.hasNext()) {
final KeyValue<Bytes, LRUCacheEntry> peeked = allIterator.peekNext();
final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next();
assertThat(peeked, equalTo(next));
}
}

@Test
public void shouldNotHaveNextIfHasNextConditionNotMet() throws Exception {
assertTrue(firstEntryIterator.hasNext());
firstEntryIterator.next();
assertFalse(firstEntryIterator.hasNext());
}

@Test
public void shouldFilterEntriesNotMatchingHasNextCondition() throws Exception {
final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(firstEntryIterator);
assertThat(keyValues, equalTo(Utils.mkList(firstEntry)));
}

@Test(expected = UnsupportedOperationException.class)
public void shouldThrowUnsupportedOperationExceptionOnRemove() throws Exception {
allIterator.remove();
}

}