Skip to content

Commit

Permalink
Delete schema when deleting inactive topic (apache#4262)
Browse files Browse the repository at this point in the history
Currently, we ignore the schema defined for a topic even the topic has been deleted as it's not active any more, this PR will delete the related schema as well when deleting an inactive topic. And also, remove a Subscription for NonPersistentTopic if unsubscribe called.
  • Loading branch information
liketic authored and jiazhai committed May 26, 2019
1 parent f8349e2 commit feca5bb
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
*/
CompletableFuture<SchemaVersion> addSchema(SchemaData schema);

/**
* Delete the schema if this topic has a schema defined for it.
*/
CompletableFuture<SchemaVersion> deleteSchema();

/**
* Check if schema is compatible with current topic schema.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -154,7 +155,7 @@ private static class TopicStats {
public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats;

public TopicStats() {
remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();
remotePublishersStats = new ObjectObjectHashMap<>();
reset();
}

Expand Down Expand Up @@ -221,7 +222,7 @@ public void publishMessage(ByteBuf data, PublishContext callback) {
Entry entry = create(0L, 0L, duplicateBuffer);
// entry internally retains data so, duplicateBuffer should be release here
duplicateBuffer.release();
((NonPersistentReplicator) replicator).sendMessage(entry);
replicator.sendMessage(entry);
});
}
}
Expand Down Expand Up @@ -401,13 +402,9 @@ public CompletableFuture<Subscription> createSubscription(String subscriptionNam
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}

void removeSubscription(String subscriptionName) {
subscriptions.remove(subscriptionName);
}

@Override
public CompletableFuture<Void> delete() {
return delete(false, false);
return delete(false, false, false);
}

/**
Expand All @@ -417,10 +414,12 @@ public CompletableFuture<Void> delete() {
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true);
return delete(false, true, false);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) {
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand Down Expand Up @@ -465,7 +464,9 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}

if (deleteSchema) {
futures.add(deleteSchema().thenApply(schemaVersion -> null));
}
FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
Expand Down Expand Up @@ -930,7 +931,7 @@ public void checkGC(int gcIntervalInSeconds) {
gcIntervalInSeconds);
}

stopReplProducers().thenCompose(v -> delete(true, false))
stopReplProducers().thenCompose(v -> delete(true, false, true))
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
if (e.getCause() instanceof TopicBusyException) {
Expand All @@ -939,8 +940,7 @@ public void checkGC(int gcIntervalInSeconds) {
log.debug("[{}] Did not delete busy topic: {}", topic,
e.getCause().getMessage());
}
replicators.forEach((region, replicator) -> ((NonPersistentReplicator) replicator)
.startProducer());
replicators.forEach((region, replicator) -> replicator.startProducer());
} else {
log.warn("[{}] Inactive topic deletion failed", topic, e);
}
Expand Down Expand Up @@ -1009,8 +1009,8 @@ public boolean isReplicated() {
}

@Override
public CompletableFuture<Void> unsubscribe(String subName) {
// No-op
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
subscriptions.remove(subscriptionName);
return CompletableFuture.completedFuture(null);
}

Expand Down Expand Up @@ -1047,6 +1047,21 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
}

@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return schemaRegistryService.getSchema(id)
.thenCompose(schema -> {
if (schema != null) {
return schemaRegistryService.deleteSchema(id, "");
} else {
return CompletableFuture.completedFuture(null);
}
});
}

@Override
public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
Expand Down Expand Up @@ -697,7 +698,6 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
return subscriptionFuture;
}

@SuppressWarnings("unchecked")
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
return getDurableSubscription(subscriptionName, initialPosition, replicateSubscriptionState);
Expand Down Expand Up @@ -750,11 +750,11 @@ void removeSubscription(String subscriptionName) {
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false);
return delete(false, false);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
return delete(failIfHasSubscriptions, false);
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean deleteSchema) {
return delete(failIfHasSubscriptions, false, deleteSchema);
}

/**
Expand All @@ -766,7 +766,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true);
return delete(false, true, false);
}

/**
Expand All @@ -779,11 +779,15 @@ public CompletableFuture<Void> deleteForcefully() {
* @param closeIfClientsConnected
* Flag indicate whether explicitly close connected producers/consumers/replicators before trying to delete topic. If
* any client is connected to a topic and if this flag is disable then this operation fails.
* @param deleteSchema
* Flag indicating whether delete the schema defined for topic if exist.
*
* @return Completable future indicating completion of delete operation Completed exceptionally with:
* IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails
*/
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) {
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand Down Expand Up @@ -826,6 +830,9 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}
if (deleteSchema) {
futures.add(deleteSchema().thenApply(schemaVersion -> null));
}

FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -1503,7 +1510,7 @@ public PersistentTopicInternalStats getInternalStats() {
stats.pendingAddEntriesCount = ml.getPendingAddEntriesCount();

stats.lastConfirmedEntry = ml.getLastConfirmedEntry().toString();
stats.state = ml.getState().toString();
stats.state = ml.getState();

stats.ledgers = Lists.newArrayList();
ml.getLedgersInfo().forEach((id, li) -> {
Expand Down Expand Up @@ -1594,7 +1601,7 @@ public void checkGC(int gcIntervalInSeconds) {
replCloseFuture.complete(null);
}

replCloseFuture.thenCompose(v -> delete(true))
replCloseFuture.thenCompose(v -> delete(true, true))
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
if (e.getCause() instanceof TopicBusyException) {
Expand Down Expand Up @@ -1949,6 +1956,21 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
}

@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return schemaRegistryService.getSchema(id)
.thenCompose(schema -> {
if (schema != null) {
return schemaRegistryService.deleteSchema(id, "");
} else {
return CompletableFuture.completedFuture(null);
}
});
}

@Override
public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.Optional;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


@Test
public class NonPersistentTopicE2ETest extends BrokerTestBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Data
@ToString
@EqualsAndHashCode
private static class Foo {
private String field1;
private String field2;
private int field3;
}

private Optional<Topic> getTopic(String topicName) {
return pulsar.getBrokerService().getTopicReference(topicName);
}

private boolean topicHasSchema(String topicName) {
String base = TopicName.get(topicName).getPartitionedTopicName();
String schemaName = TopicName.get(base).getSchemaName();
SchemaRegistry.SchemaAndMetadata result = pulsar.getSchemaRegistryService().getSchema(schemaName).join();
return result != null && !result.schema.isDeleted();
}

@Test
public void testGCWillDeleteSchema() throws Exception {
// 1. Simple successful GC
String topicName = "non-persistent://prop/ns-abc/topic-1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();

Optional<Topic> topic = getTopic(topicName);
assertTrue(topic.isPresent());

byte[] data = JSONSchema.of(SchemaDefinition.builder()
.withPojo(Foo.class).build()).getSchemaInfo().getSchema();
SchemaData schemaData = SchemaData.builder()
.data(data)
.type(SchemaType.BYTES)
.user("foo").build();
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
runGC();

topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));

// 2. Topic is not GCed with live connection
topicName = "non-persistent://prop/ns-abc/topic-2";
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));

runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
assertTrue(topicHasSchema(topicName));

// 3. Topic with subscription is not GCed even with no connections
consumer.close();

runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
assertTrue(topicHasSchema(topicName));

// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);

runGC();
topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));
}

}
Loading

0 comments on commit feca5bb

Please sign in to comment.