Skip to content

Commit 8d8c367

Browse files
authored
KAFKA-17192 Fix MirrorMaker2 worker config does not pass config.provi… (apache#16678)
Reviewers: Chris Egerton <[email protected]>
1 parent 0eb9ac2 commit 8d8c367

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

clients/src/main/java/org/apache/kafka/common/utils/Utils.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -1648,9 +1648,24 @@ public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String pr
16481648
* @param <V> the type of values stored in the map
16491649
*/
16501650
public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix, boolean strip) {
1651+
return entriesWithPrefix(map, prefix, strip, false);
1652+
}
1653+
1654+
/**
1655+
* Find all key/value pairs whose keys begin with the given prefix, optionally removing that prefix
1656+
* from all resulting keys.
1657+
* @param map the map to filter key/value pairs from
1658+
* @param prefix the prefix to search keys for
1659+
* @param strip whether the keys of the returned map should not include the prefix
1660+
* @param allowMatchingLength whether to include keys that are exactly the same length as the prefix
1661+
* @return a {@link Map} containing a key/value pair for every key/value pair in the {@code map}
1662+
* parameter whose key begins with the given {@code prefix}; may be empty, but never null
1663+
* @param <V> the type of values stored in the map
1664+
*/
1665+
public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix, boolean strip, boolean allowMatchingLength) {
16511666
Map<String, V> result = new HashMap<>();
16521667
for (Map.Entry<String, V> entry : map.entrySet()) {
1653-
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
1668+
if (entry.getKey().startsWith(prefix) && (allowMatchingLength || entry.getKey().length() > prefix.length())) {
16541669
if (strip)
16551670
result.put(entry.getKey().substring(prefix.length()), entry.getValue());
16561671
else

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ private Map<String, String> stringsWithPrefixStripped(String prefix) {
308308
}
309309

310310
private Map<String, String> stringsWithPrefix(String prefix) {
311-
return Utils.entriesWithPrefix(rawProperties, prefix, false);
311+
return Utils.entriesWithPrefix(rawProperties, prefix, false, true);
312312
}
313313

314314
static Map<String, String> clusterConfigsWithPrefix(String prefix, Map<String, String> props) {

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ public void testWorkerConfigs() {
257257
assertEquals("b->a", aProps.get("client.id"));
258258
assertEquals("123", aProps.get("offset.storage.replication.factor"));
259259
assertEquals("__", aProps.get("replication.policy.separator"));
260+
assertEquals("fake", aProps.get("config.providers"));
260261
Map<String, String> bProps = mirrorConfig.workerConfig(b);
261262
assertEquals("a->b", bProps.get("client.id"));
262263
assertEquals("456", bProps.get("status.storage.replication.factor"));

0 commit comments

Comments
 (0)