Skip to content

Commit

Permalink
KAFKA-3522: Add public interfaces for timestamped stores (apache#6175)
Browse files Browse the repository at this point in the history
Reviewers: Bill Bejeck <[email protected]>, John Roesler <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
mjsax authored Mar 6, 2019
1 parent 4788863 commit ab00c51
Show file tree
Hide file tree
Showing 17 changed files with 1,300 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public interface QueryableStoreType<T> {
*
* @param storeProvider provides access to all the underlying StateStore instances
* @param storeName The name of the Store
* @return a read-only interface over a {@code StateStore} (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
* @return a read-only interface over a {@code StateStore}
* (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
*/
T create(final StateStoreProvider storeProvider, final String storeName);
T create(final StateStoreProvider storeProvider,
final String storeName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueSto
return new KeyValueStoreType<>();
}

/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>}.
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.TimestampedKeyValueStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore() {
return new TimestampedKeyValueStoreType<>();
}

/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
*
Expand All @@ -53,6 +64,17 @@ public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore()
return new WindowStoreType<>();
}

/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore<K, ValueAndTimestamp<V>>}.
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.TimestampedWindowStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore() {
return new TimestampedWindowStoreType<>();
}

/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
*
Expand All @@ -79,7 +101,8 @@ public boolean accepts(final StateStore stateStore) {
}
}

static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
public static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {

KeyValueStoreType() {
super(ReadOnlyKeyValueStore.class);
}
Expand All @@ -92,7 +115,22 @@ public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider

}

static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
private static class TimestampedKeyValueStoreType<K, V>
extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> {

TimestampedKeyValueStoreType() {
super(ReadOnlyKeyValueStore.class);
}

@Override
public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
}
}

public static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {

WindowStoreType() {
super(ReadOnlyWindowStore.class);
}
Expand All @@ -104,14 +142,31 @@ public ReadOnlyWindowStore<K, V> create(final StateStoreProvider storeProvider,
}
}

static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
private static class TimestampedWindowStoreType<K, V>
extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> {

TimestampedWindowStoreType() {
super(ReadOnlyWindowStore.class);
}

@Override
public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
}
}

public static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {

SessionStoreType() {
super(ReadOnlySessionStore.class);
}

@Override
public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlySessionStore<>(storeProvider, this, storeName);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;

/**
* Interface for storing the aggregated values of sessions
* Interface for storing the aggregated values of sessions.
* <p>
* The key is internally represented as {@link Windowed Windowed&lt;K&gt;} that comprises the plain key
* and the {@link Window} that represents window start- and end-timestamp.
* <p>
* If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store
* while the two old sessions must be deleted.
*
* @param <K> type of the record keys
* @param <AGG> type of the aggregated values
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.kstream.Windowed;

/**
* Interface for storing the aggregated values of fixed-size time windows.
* <p>
* Note, that the stores's physical key type is {@link Windowed Windowed&lt;K&gt;}.
* In contrast to a {@link WindowStore} that stores plain windowedKeys-value pairs,
* a {@code TimestampedWindowStore} stores windowedKeys-(value/timestamp) pairs.
* <p>
* While the window start- and end-timestamp are fixed per window, the value-side timestamp is used
* to store the last update timestamp of the corresponding window.
*
* @param <K> Type of keys
* @param <V> Type of values
*/
public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> { }
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ private ValueAndTimestamp(final V value,
this.timestamp = timestamp;
}

/**
* Create a new {@link ValueAndTimestamp} instance if the provide {@code value} is not {@code null}.
*
* @param value the value
* @param timestamp the timestamp
* @param <V> the type of the value
* @return a new {@link ValueAndTimestamp} instance if the provide {@code value} is not {@code null};
* otherwise {@code null} is returned
*/
public static <V> ValueAndTimestamp<V> make(final V value,
final long timestamp) {
return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
Expand Down Expand Up @@ -71,4 +80,4 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(value, timestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;

/**
* A windowed store interface extending {@link StateStore}.
* Interface for storing the aggregated values of fixed-size time windows.
* <p>
* Note, that the stores's physical key type is {@link Windowed Windowed&lt;K&gt;}.
*
* @param <K> Type of keys
* @param <V> Type of values
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;

public KeyValueIteratorFacade(final KeyValueIterator<K, ValueAndTimestamp<V>> iterator) {
innerIterator = iterator;
}

@Override
public boolean hasNext() {
return innerIterator.hasNext();
}

@Override
public K peekNextKey() {
return innerIterator.peekNextKey();
}

@Override
public KeyValue<K, V> next() {
final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue = innerIterator.next();
return KeyValue.pair(innerKeyValue.key, innerKeyValue.value.value());
}

@Override
public void close() {
innerIterator.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class ReadOnlyKeyValueStoreFacade<K, V> implements ReadOnlyKeyValueStore<K, V> {
protected final TimestampedKeyValueStore<K, V> inner;

protected ReadOnlyKeyValueStoreFacade(final TimestampedKeyValueStore<K, V> store) {
inner = store;
}

@Override
public V get(final K key) {
final ValueAndTimestamp<V> valueAndTimestamp = inner.get(key);
return valueAndTimestamp == null ? null : valueAndTimestamp.value();
}

@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
return new KeyValueIteratorFacade<>(inner.range(from, to));
}

@Override
public KeyValueIterator<K, V> all() {
return new KeyValueIteratorFacade<>(inner.all());
}

@Override
public long approximateNumEntries() {
return inner.approximateNumEntries();
}
}
Loading

0 comments on commit ab00c51

Please sign in to comment.