Skip to content

Commit

Permalink
KAFKA-4651: improve test coverage of stores (apache#4555)
Browse files Browse the repository at this point in the history
Working on increasing the coverage of stores in unit tests.  
Started with `InMemoryKeyValueLoggedStore` 

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
bbejeck authored and guozhangwang committed Feb 20, 2018
1 parent eaafbde commit 256708d
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
Expand All @@ -38,33 +37,6 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState
protected StateSerdes<K, AGG> serdes;
protected String topic;

// this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
private static class RocksDBSessionBytesStore extends RocksDBSessionStore<Bytes, byte[]> {
RocksDBSessionBytesStore(final SegmentedBytesStore inner) {
super(inner, Serdes.Bytes(), Serdes.ByteArray());
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(key, earliestSessionEndTime, latestSessionStartTime);
return WrappedSessionStoreIterator.bytesIterator(bytesIterator, serdes);
}

@Override
public void remove(final Windowed<Bytes> key) {
bytesStore.remove(SessionKeySerde.bytesToBinary(key));
}

@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
bytesStore.put(SessionKeySerde.bytesToBinary(sessionKey), aggregate);
}
}

static RocksDBSessionStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner) {
return new RocksDBSessionBytesStore(inner);
}

RocksDBSessionStore(final SegmentedBytesStore bytesStore,
final Serde<K> keySerde,
final Serde<AGG> aggSerde) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,28 +363,6 @@ public synchronized KeyValueIterator<Bytes, byte[]> all() {
return rocksDbIterator;
}

public synchronized KeyValue<Bytes, byte[]> first() {
validateStoreOpen();

final RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
innerIter.close();

return pair;
}

public synchronized KeyValue<Bytes, byte[]> last() {
validateStoreOpen();

final RocksIterator innerIter = db.newIterator();
innerIter.seekToLast();
final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
innerIter.close();

return pair;
}

/**
* Return an approximate count of key-value mappings in this store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public abstract class AbstractKeyValueStoreTest {
Expand Down Expand Up @@ -341,4 +347,31 @@ public void testSize() {
store.flush();
assertEquals(5, store.approximateNumEntries());
}

@Test
public void shouldPutAll() {
List<KeyValue<Integer, String>> entries = new ArrayList<>();
entries.add(new KeyValue<>(1, "one"));
entries.add(new KeyValue<>(2, "two"));

store.putAll(entries);

final List<KeyValue<Integer, String>> allReturned = new ArrayList<>();
final List<KeyValue<Integer, String>> expectedReturned = Arrays.asList(KeyValue.pair(1, "one"), KeyValue.pair(2, "two"));
final Iterator<KeyValue<Integer, String>> iterator = store.all();

while (iterator.hasNext()) {
allReturned.add(iterator.next());
}
assertThat(allReturned, equalTo(expectedReturned));

}

@Test
public void shouldDeleteFromStore() {
store.put(1, "one");
store.put(2, "two");
store.delete(2);
assertNull(store.get(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,28 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {

@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde())
.withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde())
.withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));

final StateStore store = storeBuilder.build();
store.init(context, store);

return (KeyValueStore<K, V>) store;
}

@Test
public void shouldPutAll() {
List<KeyValue<Integer, String>> entries = new ArrayList<>();
entries.add(new KeyValue<>(1, "1"));
entries.add(new KeyValue<>(2, "2"));
store.putAll(entries);
assertEquals(store.get(1), "1");
assertEquals(store.get(2), "2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
EasyMock.verify(inner);
}

@Test
public void shouldFlushInnerWhenFlushTimeRecords() {
inner.flush();
EasyMock.expectLastCall().once();
init();

metered.flush();

final KafkaMetric metric = metric("flush-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}


private KafkaMetric metric(final MetricName metricName) {
return this.metrics.metric(metricName);
}
Expand Down
Loading

0 comments on commit 256708d

Please sign in to comment.