Skip to content

Commit 7de5efa

Browse files
Merge branch '7.9.x' into master by armittal-0077
2 parents 00bafcc + f065940 commit 7de5efa

File tree

8 files changed

+85
-17
lines changed

8 files changed

+85
-17
lines changed

ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java

+15-7
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,22 @@ public Set<String> listTopicNames() {
205205
}
206206

207207
@Override
208-
public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames) {
208+
public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames,
209+
final Boolean skipRetriesOnFailure) {
209210
try {
210-
return ExecutorUtil.executeWithRetries(
211-
() -> adminClient.get().describeTopics(
212-
topicNames,
213-
new DescribeTopicsOptions().includeAuthorizedOperations(true)
214-
).allTopicNames().get(),
215-
ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
211+
if (skipRetriesOnFailure) {
212+
return adminClient.get().describeTopics(
213+
topicNames,
214+
new DescribeTopicsOptions().includeAuthorizedOperations(true)
215+
).allTopicNames().get();
216+
} else {
217+
return ExecutorUtil.executeWithRetries(
218+
() -> adminClient.get().describeTopics(
219+
topicNames,
220+
new DescribeTopicsOptions().includeAuthorizedOperations(true)
221+
).allTopicNames().get(),
222+
ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
223+
}
216224
} catch (final ExecutionException e) {
217225
throw new KafkaResponseGetFailedException(
218226
"Failed to Describe Kafka Topic(s): " + topicNames, e.getCause());

ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map;
3232
import java.util.Objects;
3333
import java.util.Optional;
34+
import java.util.Set;
3435
import java.util.function.Function;
3536
import java.util.function.Supplier;
3637
import java.util.stream.Collectors;
@@ -63,7 +64,9 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate,
6364
.forward("isTopicExists", methodParams(String.class), sandbox)
6465
.forward("describeTopic", methodParams(String.class), sandbox)
6566
.forward("getTopicConfig", methodParams(String.class), sandbox)
67+
.forward("describeTopic", methodParams(String.class, Boolean.class), sandbox)
6668
.forward("describeTopics", methodParams(Collection.class), sandbox)
69+
.forward("describeTopics", methodParams(Collection.class, Boolean.class), sandbox)
6770
.forward("deleteTopics", methodParams(Collection.class), sandbox)
6871
.forward("listTopicsStartOffsets", methodParams(Collection.class), sandbox)
6972
.forward("listTopicsEndOffsets", methodParams(Collection.class), sandbox)
@@ -159,21 +162,32 @@ public TopicDescription describeTopic(final String topicName) {
159162
return describeTopics(ImmutableList.of(topicName)).get(topicName);
160163
}
161164

165+
public TopicDescription describeTopic(final String topicName,
166+
final Boolean skipRetriesOnFailure) {
167+
return describeTopics(ImmutableList.of(topicName), skipRetriesOnFailure).get(topicName);
168+
}
169+
162170
private Map<String, TopicDescription> describeTopics(final Collection<String> topicNames) {
171+
return describeTopics(topicNames, false);
172+
}
173+
174+
private Map<String, TopicDescription> describeTopics(final Collection<String> topicNames,
175+
final Boolean skipRetriesOnFailure) {
163176
final Map<String, TopicDescription> descriptions = topicNames.stream()
164177
.map(createdTopics::get)
165178
.filter(Objects::nonNull)
166179
.collect(Collectors.toMap(TopicDescription::name, Function.identity()));
167180

168-
final HashSet<String> remaining = new HashSet<>(topicNames);
169-
remaining.removeAll(descriptions.keySet());
170-
if (remaining.isEmpty()) {
181+
final Set<String> topicsToFetch = new HashSet<>(topicNames);
182+
topicsToFetch.removeAll(descriptions.keySet());
183+
if (topicsToFetch.isEmpty()) {
171184
return descriptions;
172185
}
173186

174-
final Map<String, TopicDescription> fromKafka = delegate.describeTopics(remaining);
187+
final Map<String, TopicDescription> remainingTopicDescriptionMap =
188+
delegate.describeTopics(topicsToFetch, skipRetriesOnFailure);
175189

176-
descriptions.putAll(fromKafka);
190+
descriptions.putAll(remainingTopicDescriptionMap);
177191
return descriptions;
178192
}
179193

ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ public Set<String> listTopicNames() {
167167
}
168168

169169
@Override
170-
public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames) {
170+
public Map<String, TopicDescription> describeTopics(Collection<String> topicNames,
171+
Boolean skipRetriesOnFailure) {
171172
return topicNames.stream()
172173
.collect(Collectors.toMap(Function.identity(), this::describeTopic));
173174
}

ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,23 @@ public void shouldNotRetryIsTopicExistsOnUnknownTopicException() {
923923
verify(adminClient, times(1)).describeTopics(anyCollection(), any());
924924
}
925925

926+
@Test
927+
public void shouldNotRetryDescribeTopicsExistsAnyException() {
928+
// When
929+
when(adminClient.describeTopics(anyCollection(), any()))
930+
.thenAnswer(describeTopicsResult(new UnknownTopicOrPartitionException("meh")));
931+
final String topicName = "foobar";
932+
final Exception e = assertThrows(
933+
KafkaResponseGetFailedException.class,
934+
() -> kafkaTopicClient.describeTopic(topicName, true)
935+
);
936+
937+
// Then
938+
verify(adminClient, times(1)).describeTopics(anyCollection(), any());
939+
assertThat(e.getMessage(),
940+
containsString("Failed to Describe Kafka Topic(s): [" + topicName + "]"));
941+
}
942+
926943
@Test
927944
public void shouldThrowIsTopicExistsOnAuthorizationException() {
928945
// Given

ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public static Collection<TestCase<KafkaTopicClient>> getMethodsToTest() {
9090
.ignore("isTopicExists", String.class)
9191
.ignore("describeTopic", String.class)
9292
.ignore("getTopicConfig", String.class)
93+
.ignore("describeTopic", String.class, Boolean.class)
9394
.ignore("describeTopics", Collection.class)
95+
.ignore("describeTopics", Collection.class, Boolean.class)
9496
.ignore("deleteTopics", Collection.class)
9597
.ignore("listTopicsStartOffsets", Collection.class)
9698
.ignore("listTopicsEndOffsets", Collection.class)
@@ -461,7 +463,7 @@ private void givenTopicExists(
461463
final int numReplicas
462464
) {
463465
when(delegate.isTopicExists(topic)).thenReturn(true);
464-
when(delegate.describeTopics(Collections.singleton(topic)))
466+
when(delegate.describeTopics(Collections.singleton(topic), false))
465467
.thenReturn(Collections.singletonMap(
466468
topic,
467469
new TopicDescription(topic, false, topicPartitions(numPartitions, numReplicas))));

ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,20 @@ boolean createTopic(
154154
* @return map of topic name to description.
155155
* @throws KafkaTopicExistsException if any of the topic do not exist.
156156
*/
157-
Map<String, TopicDescription> describeTopics(Collection<String> topicNames);
157+
default Map<String, TopicDescription> describeTopics(Collection<String> topicNames) {
158+
return describeTopics(topicNames, false);
159+
}
160+
161+
/**
162+
* Call to get one or more topic's description, considering if we want to skip retries on failure.
163+
*
164+
* @param topicNames topicNames to describe
165+
* @param skipRetriesOnFailure whether to skip retries on failure
166+
* @return map of topic name to description.
167+
* @throws KafkaTopicExistsException if any of the topic do not exist.
168+
*/
169+
Map<String, TopicDescription> describeTopics(Collection<String> topicNames,
170+
Boolean skipRetriesOnFailure);
158171

159172
/**
160173
* Call to get a one topic's description.
@@ -167,6 +180,18 @@ default TopicDescription describeTopic(final String topicName) {
167180
return describeTopics(ImmutableList.of(topicName)).get(topicName);
168181
}
169182

183+
/**
184+
* Call to get one topic's description, considering if we want to skip retries on failure.
185+
*
186+
* @param topicName topicName to describe
187+
* @param skipRetriesOnFailure whether to skip retries on failure
188+
* @return the description if the topic
189+
* @throws KafkaTopicExistsException if the topic does not exist.
190+
*/
191+
default TopicDescription describeTopic(final String topicName, Boolean skipRetriesOnFailure) {
192+
return describeTopics(ImmutableList.of(topicName), skipRetriesOnFailure).get(topicName);
193+
}
194+
170195
/**
171196
* Call to get the config of a topic.
172197
*

ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ private static SourceDescriptionWithWarnings describeSource(
295295
final List<KsqlWarning> warnings = new LinkedList<>();
296296
try {
297297
topicDescription = Optional.of(
298-
serviceContext.getTopicClient().describeTopic(dataSource.getKafkaTopicName())
298+
serviceContext.getTopicClient().describeTopic(dataSource.getKafkaTopicName(), true)
299299
);
300300
sourceConstraints = getSourceConstraints(name, ksqlExecutionContext.getMetaStore());
301301
} catch (final KafkaException | KafkaResponseGetFailedException e) {

ksqldb-testing-tool/src/main/java/io/confluent/ksql/tools/test/stubs/StubKafkaTopicClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ public Set<String> listTopicNames() {
141141
}
142142

143143
@Override
144-
public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames) {
144+
public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames,
145+
final Boolean skipRetriesOnFailure) {
145146
return topicNames.stream()
146147
.collect(Collectors.toMap(Function.identity(), this::describeTopic));
147148
}

0 commit comments

Comments
 (0)