Skip to content

Commit

Permalink
Check message encryption when producer connect and publish (apache#904)
Browse files Browse the repository at this point in the history
  • Loading branch information
nkurihar authored and merlimat committed Jan 2, 2018
1 parent 039932a commit 663e8f4
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1533,5 +1533,51 @@ private void validatePeerClusterConflict(String clusterName, Set<String> replica
}
}

@POST
@Path("/{property}/{cluster}/{namespace}/encryptionRequired")
@ApiOperation(value = "Message encryption is required or not for all topics in a namespace")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
})
public void modifyEncryptionRequired(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, boolean encryptionRequired) {
validateAdminAccessOnProperty(property);
validatePoliciesReadOnlyAccess();

NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
Entry<Policies, Stat> policiesNode = null;

try {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().encryption_required = encryptionRequired;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, property, cluster, namespace),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, property, cluster, namespace));

log.info("[{}] Successfully {} on namespace {}/{}/{}", clientAppId(),
encryptionRequired ? "true" : "false", property, cluster, namespace);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to modify encryption required status for namespace {}/{}/{}: does not exist", clientAppId(),
property, cluster, namespace);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to modify encryption required status on namespace {}/{}/{} expected policy node version={} : concurrent modification",
clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to modify encryption required status on namespace {}/{}/{}", clientAppId(), property,
cluster, namespace, e);
throw new RestException(e);
}
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
Expand Down Expand Up @@ -72,8 +73,9 @@ public class Producer {
private final boolean isRemote;
private final String remoteCluster;
private final boolean isNonPersistentTopic;
private final boolean isEncrypted;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId) {
public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
Expand All @@ -93,6 +95,8 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
this.isRemote = producerName
.startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null;

this.isEncrypted = isEncrypted;
}

@Override
Expand Down Expand Up @@ -130,6 +134,24 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP
return;
}

if (topic.isEncryptionRequired()) {

headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();

// Check whether the message is encrypted or not
if (msgMetadata.getEncryptionKeysCount() < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.completedSendOperation(isNonPersistentTopic);
});
return;
}
}

startPublishOperation();
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize));
Expand Down Expand Up @@ -440,6 +462,14 @@ public void checkPermissions() {
}
}

public void checkEncryption() {
if (topic.isEncryptionRequired() && !isEncrypted) {
log.info("[{}] [{}] Unencrypted producer is not allowed to produce from destination [{}] anymore",
producerId, producerName, topic.getName());
disconnect();
}
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
final String topicName = cmdProducer.getTopic();
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
final boolean isEncrypted = cmdProducer.getEncrypted();
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -522,9 +523,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
return;
}

// Check whether the producer will publish encrypted messages or not
if (topic.isEncryptionRequired() && !isEncrypted) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
return;
}

disableTcpNoDelayIfNeeded(topicName, producerName);

Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole);
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted);

try {
topic.addProducer(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, lo

boolean isBacklogQuotaExceeded(String producerName);

boolean isEncryptionRequired();

BacklogQuota getBacklogQuota();

void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ protected TopicStats initialValue() {
}
};

// Whether messages published must be encrypted or not in this topic
private volatile boolean isEncryptionRequired = false;

private static class TopicStats {
public double averageMsgSize;
public double aggMsgRateIn;
Expand Down Expand Up @@ -164,6 +167,16 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
USAGE_COUNT_UPDATER.set(this, 0);

this.lastActive = System.nanoTime();

try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}
}

@Override
Expand Down Expand Up @@ -845,7 +858,14 @@ public void checkGC(int gcIntervalInSeconds) {

@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
producers.forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
return checkReplicationAndRetryOnFailure();
}
Expand All @@ -870,6 +890,11 @@ public boolean isBacklogQuotaExceeded(String producerName) {
return false;
}

@Override
public boolean isEncryptionRequired() {
return isEncryptionRequired;
}

@Override
public CompletableFuture<Void> unsubscribe(String subName) {
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {

private final MessageDeduplication messageDeduplication;

// Whether messages published must be encrypted or not in this topic
private volatile boolean isEncryptionRequired = false;

private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
Expand Down Expand Up @@ -218,6 +221,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.lastActive = System.nanoTime();

this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);

try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}
}

@Override
Expand Down Expand Up @@ -1327,7 +1340,14 @@ private boolean shouldTopicBeRetained() {

@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
producers.forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
Expand Down Expand Up @@ -1373,6 +1393,11 @@ public boolean isBacklogQuotaExceeded(String producerName) {
return false;
}

@Override
public boolean isEncryptionRequired() {
return isEncryptionRequired;
}

public CompletableFuture<MessageId> terminate() {
CompletableFuture<MessageId> future = new CompletableFuture<>();
ledger.asyncTerminate(new TerminateCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public void testAddRemoveProducer() throws Exception {

String role = "appid1";
// 1. simple add producer
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role);
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
topic.addProducer(producer);
assertEquals(topic.getProducers().size(), 1);

Expand All @@ -337,7 +337,7 @@ public void testAddRemoveProducer() throws Exception {

// 3. add producer for a different topic
PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService);
Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", role);
Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", role, false);
try {
topic.addProducer(failProducer);
fail("should have failed");
Expand Down Expand Up @@ -480,7 +480,7 @@ public void testDeleteTopic() throws Exception {

// 2. delete topic with producer
topic = (PersistentTopic) brokerService.getTopic(successTopicName).get();
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role);
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
topic.addProducer(producer);

assertTrue(topic.delete().isCompletedExceptionally());
Expand Down Expand Up @@ -635,7 +635,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
try {
String role = "appid1";
Thread.sleep(10); /* delay to ensure that the delete gets executed first */
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role);
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
topic.addProducer(producer);
fail("Should have failed");
} catch (BrokerServiceException e) {
Expand Down
Loading

0 comments on commit 663e8f4

Please sign in to comment.