Skip to content

Commit

Permalink
[fix][misc] do not require encryption on system topics (apache#18898)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Dec 14, 2022
1 parent 050b310 commit 4129583
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -1326,7 +1327,9 @@ protected void handleProducer(final CommandProducer cmdProducer) {

backlogQuotaCheckFuture.thenRun(() -> {
// Check whether the producer will publish encrypted messages or not
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer)
&& !isEncrypted
&& !SystemTopicNames.isSystemTopic(topicName)) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public boolean isCompactionEnabled() {
// even though is not explicitly set in the policies.
return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
}

@Override
public boolean isEncryptionRequired() {
// System topics are only written by the broker that can't know the encryption context.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -3109,4 +3110,17 @@ public void testGetTopicPoliciesWhenDeleteTopicPolicy() throws Exception {
assertNull(topicPolicies);
}

@Test
public void testProduceChangesWithEncryptionRequired() throws Exception {
final String beforeLac = admin.topics().getInternalStats(topicPolicyEventsTopic).lastConfirmedEntry;
admin.namespaces().setEncryptionRequiredStatus(myNamespace, true);
// just an update to trigger writes on __change_events
admin.topicPolicies().setMaxConsumers(testTopic, 5);
Awaitility.await()
.untilAsserted(() -> {
final PersistentTopicInternalStats newLac = admin.topics().getInternalStats(topicPolicyEventsTopic);
assertNotEquals(newLac, beforeLac);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1617,4 +1617,29 @@ public void testGetTxnState() throws Exception {
Transaction abortingTxn = transaction;
Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
}


@Test
public void testEncryptionRequired() throws Exception {
final String namespace = "tnx/ns-prechecks";
final String topic = "persistent://" + namespace + "/test_transaction_topic";
admin.namespaces().createNamespace(namespace);
admin.namespaces().setEncryptionRequiredStatus(namespace, true);
admin.topics().createNonPartitionedTopic(topic);

@Cleanup
Producer<byte[]> producer = this.pulsarClient.newProducer()
.topic(topic)
.sendTimeout(5, TimeUnit.SECONDS)
.addEncryptionKey("my-app-key")
.defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
.create();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
producer.newMessage(txn)
.value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
.send();
txn.commit();
}
}

0 comments on commit 4129583

Please sign in to comment.