Skip to content

Commit

Permalink
KAFKA-5651; Follow-up: add with method to Materialized
Browse files Browse the repository at this point in the history
Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where people don't care about the state store name.

Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]>, Matthias J. Sax <[email protected]>

Closes apache#4009 from dguy/materialized
  • Loading branch information
dguy authored and guozhangwang committed Oct 6, 2017
1 parent 105ab47 commit 23a0140
Show file tree
Hide file tree
Showing 24 changed files with 330 additions and 154 deletions.
31 changes: 16 additions & 15 deletions streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(consumed),
new MaterializedInternal<>(materialized));
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
}

/**
Expand Down Expand Up @@ -273,11 +273,9 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(consumed),
new MaterializedInternal<>(
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
internalStreamsBuilder.newStoreName(topic))
.withKeySerde(consumed.keySerde)
.withValueSerde(consumed.valueSerde),
false));
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
internalStreamsBuilder,
topic));
}

/**
Expand All @@ -301,11 +299,12 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
new MaterializedInternal<>(materialized));
materializedInternal);
}

/**
Expand All @@ -329,11 +328,12 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
internalStreamsBuilder.newStoreName(topic))
.withKeySerde(consumed.keySerde)
.withValueSerde(consumed.valueSerde),
false);
new MaterializedInternal<>(
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
internalStreamsBuilder,
topic);


return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
}

Expand Down Expand Up @@ -399,7 +399,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(consumed),
new MaterializedInternal<>(materialized));
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
}

/**
Expand Down Expand Up @@ -431,7 +431,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
* @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
Expand Down Expand Up @@ -1346,7 +1346,7 @@ <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> i
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
* {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
* {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
Expand Down Expand Up @@ -1509,7 +1509,7 @@ <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
* {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
* {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
* @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
* @return a new unique name
*/
public String newName(final String prefix) {
return internalStreamsBuilder.newName(prefix);
return internalStreamsBuilder.newProcessorName(prefix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final K
return new Materialized<>(supplier);
}

/**
* Materialize a {@link StateStore} with the provided key and value {@link Serde}s.
* An internal name will be used for the store.
*
* @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key
* serde from configs will be used
* @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
* serde from configs will be used
* @param <K> key type
* @param <V> value type
* @param <S> store type
* @return a new {@link Materialized} instance with the given key and value serdes
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Materialized<K, V, S>((String) null).withKeySerde(keySerde).withValueSerde(valueSerde);
}

/**
* Set the valueSerde the materialized {@link StateStore} will use.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ <T> KTable<K, T> build(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSu
final String functionName,
final StoreBuilder storeBuilder,
final boolean isQueryable) {
final String aggFunctionName = builder.newName(functionName);
final String aggFunctionName = builder.newProcessorName(functionName);
final String sourceName = repartitionIfRequired(storeBuilder.name());
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

public interface InternalNameProvider {
String newProcessorName(String prefix);

String newStoreName(String prefix);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

public class InternalStreamsBuilder {
public class InternalStreamsBuilder implements InternalNameProvider {

final InternalTopologyBuilder internalTopologyBuilder;

Expand All @@ -44,7 +44,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil

public <K, V> KStream<K, V> stream(final Collection<String> topics,
final ConsumedInternal<K, V> consumed) {
final String name = newName(KStreamImpl.SOURCE_NAME);
final String name = newProcessorName(KStreamImpl.SOURCE_NAME);

internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
Expand All @@ -57,7 +57,7 @@ public <K, V> KStream<K, V> stream(final Collection<String> topics,
}

public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) {
final String name = newName(KStreamImpl.SOURCE_NAME);
final String name = newProcessorName(KStreamImpl.SOURCE_NAME);

internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
Expand All @@ -74,8 +74,8 @@ public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final String source = newName(KStreamImpl.SOURCE_NAME);
final String name = newName(KTableImpl.SOURCE_NAME);
final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
final String name = newProcessorName(KTableImpl.SOURCE_NAME);

final KTable<K, V> kTable = createKTable(consumed,
topic,
Expand All @@ -94,10 +94,11 @@ public <K, V> KTable<K, V> table(final String topic,
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized)
.materialize();

final String source = newName(KStreamImpl.SOURCE_NAME);
final String name = newName(KTableImpl.SOURCE_NAME);
final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
final String name = newProcessorName(KTableImpl.SOURCE_NAME);
final KTable<K, V> kTable = createKTable(consumed,
topic,
storeBuilder.name(),
Expand Down Expand Up @@ -141,8 +142,8 @@ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
// explicitly disable logging for global stores
materialized.withLoggingDisabled();
final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
final String sourceName = newName(KStreamImpl.SOURCE_NAME);
final String processorName = newName(KTableImpl.SOURCE_NAME);
final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());


Expand All @@ -160,10 +161,12 @@ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
}

public String newName(final String prefix) {
@Override
public String newProcessorName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}

@Override
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public KTable<K, V> reduce(final Reducer<V> reducer,
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
= new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
return doAggregate(
new KStreamReduce<K, V>(materializedInternal.storeName(), reducer),
REDUCE_NAME,
Expand Down Expand Up @@ -171,7 +171,7 @@ private <VR> KTable<K, VR> aggregateMaterialized(final Initializer<VR> initializ
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
= new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
return doAggregate(
new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
AGGREGATE_NAME,
Expand All @@ -183,10 +183,10 @@ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
final String storeName = builder.newStoreName(AGGREGATE_NAME);

MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false);
new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>with(keySerde, null),
builder,
AGGREGATE_NAME);
return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
AGGREGATE_NAME,
materializedInternal);
Expand Down Expand Up @@ -277,7 +277,7 @@ public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStore
public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
= new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.valueSerde() == null) {
materialized.withValueSerde(Serdes.Long());
}
Expand Down Expand Up @@ -495,7 +495,7 @@ private <T> KTable<K, T> doAggregate(
final String functionName,
final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) {

final String aggFunctionName = builder.newName(functionName);
final String aggFunctionName = builder.newProcessorName(functionName);

final String sourceName = repartitionIfRequired(storeSupplier.name());

Expand Down
Loading

0 comments on commit 23a0140

Please sign in to comment.