Skip to content

Commit

Permalink
KAFKA-12809: Remove deprecated methods of Stores factory (apache#10729)
Browse files Browse the repository at this point in the history
Removes methods deprecated via KIP-319 and KIP-358.

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
jlprat authored May 19, 2021
1 parent 0af3773 commit e23ede1
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 67 deletions.
64 changes: 4 additions & 60 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,46 +175,6 @@ public String metricsScope() {
};
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param numSegments number of db segments (cannot be zero or negative)
* @param windowSize size of the windows that are stored (cannot be negative). Note: the window size
* is not stored with the records, so this value is used to compute the keys that
* the store returns. No effort is made to validate this parameter, so you must be
* careful to set it the same as the windowed keys you're actually storing.
* @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
* caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
@Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final int numSegments,
final long windowSize,
final boolean retainDuplicates) {
if (numSegments < 2) {
throw new IllegalArgumentException("numSegments cannot be smaller than 2");
}

final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L);

return persistentWindowStore(
name,
retentionPeriod,
windowSize,
retainDuplicates,
legacySegmentInterval,
false
);
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* <p>
Expand Down Expand Up @@ -364,38 +324,22 @@ public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
* Create a persistent {@link SessionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriodMs length of time to retain data in the store (cannot be negative)
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least as long enough to
* contain the inactivity gap of the session and the entire grace period.)
* @return an instance of a {@link SessionBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
*/
@Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final long retentionPeriodMs) {
final Duration retentionPeriod) {
Objects.requireNonNull(name, "name cannot be null");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionPeriodMs = validateMillisecondDuration(retentionPeriod, msgPrefix);
if (retentionPeriodMs < 0) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs);
}

/**
* Create a persistent {@link SessionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least as long enough to
* contain the inactivity gap of the session and the entire grace period.)
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
@SuppressWarnings("deprecation") // removing #persistentSessionStore(String name, long retentionPeriodMs) will fix this
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
return persistentSessionStore(name, validateMillisecondDuration(retentionPeriod, msgPrefix));
}

/**
* Create an in-memory {@link SessionBytesStoreSupplier}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,6 @@ public void shouldThrowIfIPersistentTimestampedWindowStoreRetentionPeriodIsNegat
assertEquals("retentionPeriod cannot be negative", e.getMessage());
}

@Deprecated
@Test
public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", 0L, 1, 0L, false));
assertEquals("numSegments cannot be smaller than 2", e.getMessage());
}

@Test
public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false));
Expand Down

0 comments on commit e23ede1

Please sign in to comment.