Skip to content

Commit

Permalink
[cleanup] Optional usage (apache#6613)
Browse files Browse the repository at this point in the history
This PR aims to fix or simplify `Optional` type usage logic:
- Null value used for optional type.
- Optional.isPresent() can be replaced with functional style expression
- Optional call chain that can be simplified.
  • Loading branch information
yjshen authored Mar 28, 2020
1 parent c9a8d58 commit acc161b
Show file tree
Hide file tree
Showing 21 changed files with 72 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ public static String createToken(Key signingKey, String subject, Optional<Date>
.setSubject(subject)
.signWith(signingKey);

if (expiryTime.isPresent()) {
builder.setExpiration(expiryTime.get());
}
expiryTime.ifPresent(builder::setExpiration);

return builder.compact();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,9 +897,7 @@ public Map<String, FailureDomain> getFailureDomains(
try {
Optional<FailureDomain> domain = failureDomainCache()
.get(joinPath(failureDomainRootPath, domainName));
if (domain.isPresent()) {
domains.put(domainName, domain.get());
}
domain.ifPresent(failureDomain -> domains.put(domainName, failureDomain));
} catch (Exception e) {
log.warn("Failed to get domain {}", domainName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,18 @@ private Path getNicSpeedPath(String nic) {
}

private double getTotalNicLimitKbps(List<String> nics) {
if (overrideBrokerNicSpeedGbps.isPresent()) {
// Use the override value as configured. Return the total max speed across all available NICs, converted
// from Gbps into Kbps
return overrideBrokerNicSpeedGbps.get() * nics.size() * 1024 * 1024;
}

// Nic speed is in Mbits/s, return kbits/s
return nics.stream().mapToDouble(s -> {
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(s))));
} catch (IOException e) {
LOG.error("Failed to read speed for nic " + s, e);
return 0d;
}
}).sum() * 1024;
// Use the override value as configured. Return the total max speed across all available NICs, converted
// from Gbps into Kbps
return overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * nics.size() * 1024 * 1024)
.orElseGet(() -> nics.stream().mapToDouble(s -> {
// Nic speed is in Mbits/s, return kbits/s
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(s))));
} catch (IOException e) {
LOG.error("Failed to read speed for nic " + s, e);
return 0d;
}
}).sum() * 1024);
}

private Path getNicTxPath(String nic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,8 @@ public LoadManagerReport generateLoadReport() {
@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
if (leastLoadedBroker.isPresent()) {
return Optional.of(new SimpleResourceUnit(getBrokerWebServiceUrl(leastLoadedBroker.get()),
new PulsarResourceDescription()));
} else {
return Optional.empty();
}
return leastLoadedBroker.map(s -> new SimpleResourceUnit(getBrokerWebServiceUrl(s),
new PulsarResourceDescription()));
}

private String getBrokerWebServiceUrl(String broker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ private Optional<NamespaceIsolationPolicies> getIsolationPolicies(String cluster
public boolean areIsolationPoliciesPresent(NamespaceName namespace) {
try {
Optional<NamespaceIsolationPolicies> policies = getIsolationPolicies(pulsar.getConfiguration().getClusterName());
if (policies.isPresent()) {
return policies.get().getPolicyByNamespace(namespace) != null;
} else {
return false;
}
return policies.filter(isolationPolicies -> isolationPolicies.getPolicyByNamespace(namespace) != null).isPresent();
} catch (Exception e) {
LOG.warn("IsIsolationPoliciesPresent: Unable to get the namespaceIsolationPolicies", e);
return false;
Expand All @@ -76,11 +72,8 @@ public boolean areIsolationPoliciesPresent(NamespaceName namespace) {
private Optional<NamespaceIsolationPolicy> getNamespaceIsolationPolicy(NamespaceName namespace) {
try {
Optional<NamespaceIsolationPolicies> policies =getIsolationPolicies(pulsar.getConfiguration().getClusterName());
if (!policies.isPresent()) {
return Optional.empty();
}
return policies.map(isolationPolicies -> isolationPolicies.getPolicyByNamespace(namespace));

return Optional.ofNullable(policies.get().getPolicyByNamespace(namespace));
} catch (Exception e) {
LOG.warn("Unable to get the namespaceIsolationPolicies", e);
return Optional.empty();
Expand All @@ -100,11 +93,8 @@ public boolean isSecondaryBroker(NamespaceName namespace, String broker) {
public boolean isSharedBroker(String broker) {
try {
Optional<NamespaceIsolationPolicies> policies = getIsolationPolicies(pulsar.getConfiguration().getClusterName());
if (!policies.isPresent()) {
return true;
}
return policies.map(isolationPolicies -> isolationPolicies.isSharedBroker(broker)).orElse(true);

return policies.get().isSharedBroker(broker);
} catch (Exception e) {
LOG.warn("isPrimaryForAnyNamespace", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1186,9 +1186,7 @@ private void refreshBrokerPublishRate() {
public void forEachTopic(Consumer<Topic> consumer) {
topics.forEach((n, t) -> {
Optional<Topic> topic = extractTopic(t);
if (topic.isPresent()) {
consumer.accept(topic.get());
}
topic.ifPresent(consumer::accept);
});
}

Expand Down Expand Up @@ -1691,29 +1689,27 @@ private void updateDynamicServiceConfiguration() {
} catch (Exception e) {
log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
}
if (configCache.isPresent()) {
configCache.get().forEach((key, value) -> {
// validate field
if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) {
if (!dynamicConfigurationMap.get(key).validator.test(value)) {
log.error("Failed to validate dynamic config {} with value {}", key, value);
throw new IllegalArgumentException(
String.format("Failed to validate dynamic-config %s/%s", key, value));
}
configCache.ifPresent(stringStringMap -> stringStringMap.forEach((key, value) -> {
// validate field
if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) {
if (!dynamicConfigurationMap.get(key).validator.test(value)) {
log.error("Failed to validate dynamic config {} with value {}", key, value);
throw new IllegalArgumentException(
String.format("Failed to validate dynamic-config %s/%s", key, value));
}
// update field value
try {
Field field = ServiceConfiguration.class.getDeclaredField(key);
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
field.set(pulsar().getConfiguration(), FieldParser.value(value, field));
log.info("Successfully updated {}/{}", key, value);
}
} catch (Exception e) {
log.warn("Failed to update service configuration {}/{}, {}", key, value, e.getMessage());
}
// update field value
try {
Field field = ServiceConfiguration.class.getDeclaredField(key);
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
field.set(pulsar().getConfiguration(), FieldParser.value(value, field));
log.info("Successfully updated {}/{}", key, value);
}
});
}
} catch (Exception e) {
log.warn("Failed to update service configuration {}/{}, {}", key, value, e.getMessage());
}
}));
// register a listener: it updates field value and triggers appropriate registered field-listener only if
// field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,9 @@ public CompletableFuture<Void> close() {
this.delayedDeliveryTracker = Optional.empty();
}

if (delayedDeliveryTracker.isPresent()) {
delayedDeliveryTracker.get().close();
}
delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::close);

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

return disconnectAllConsumers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,7 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessages, totalBytes);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessages, totalBytes);
}
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(totalMessages, totalBytes));
}

// Schedule a new read batch operation only after the previous batch has been written to the
Expand Down Expand Up @@ -541,9 +539,7 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
return disconnectAllConsumers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
continue;
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(1, entry.getLength());
}
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));

// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
Expand Down Expand Up @@ -756,9 +754,7 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
final CompletableFuture<Void> future = new CompletableFuture<>();

super.disconnect(failIfHasBacklog).thenRun(() -> {
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
future.complete(null);
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,13 +894,9 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

if (subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().close();
}
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
Expand Down Expand Up @@ -982,18 +978,11 @@ public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
brokerService.removeTopicFromCache(topic);

ReplicatedSubscriptionsController ctrl = replicatedSubscriptionsController.orElse(null);
if (ctrl != null) {
ctrl.close();
}
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

if (subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().close();
}
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

log.info("[{}] Topic closed", topic);
closeFuture.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuild
// Constructor for incoming message
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
ByteBuf payload, ClientCnx cnx, Schema<T> schema) {
this(topic, messageId, msgMetadata, payload, null, cnx, schema);
this(topic, messageId, msgMetadata, payload, Optional.empty(), cnx, schema);
}

MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,9 +1108,7 @@ public static ByteBuf newGetSchema(long requestId, String topic, Optional<Schema
CommandGetSchema.Builder schema = CommandGetSchema.newBuilder()
.setRequestId(requestId);
schema.setTopic(topic);
if (version.isPresent()) {
schema.setSchemaVersion(ByteString.copyFrom(version.get().bytes()));
}
version.ifPresent(schemaVersion -> schema.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes())));

CommandGetSchema getSchema = schema.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ private static ByteBuf newMessage(MarkerType markerType, Optional<String> restri
msgMetadataBuilder.setSequenceId(0);
msgMetadataBuilder.setMarkerType(markerType.getNumber());

if (restrictToCluster.isPresent()) {
msgMetadataBuilder.addReplicateTo(restrictToCluster.get());
}
restrictToCluster.ifPresent(msgMetadataBuilder::addReplicateTo);

MessageMetadata msgMetadata = msgMetadataBuilder.build();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void setWebServicePortTls(Optional<Integer> webServicePortTls) {

@Deprecated
public boolean isTlsEnabled() {
return tlsEnabled || webServicePortTls != null || servicePortTls != null;
return tlsEnabled || webServicePortTls.isPresent() || servicePortTls.isPresent();
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,7 @@ public void write(Record<T> record) {
} else {
// It is coming from some source
Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
if (eventTime.isPresent()) {
msg.eventTime(eventTime.get());
}
eventTime.ifPresent(msg::eventTime);
}

pulsarSinkProcessor.sendOutputMessage(msg, record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ public String process(String input, Context context) {
String key = "config-key";
Optional<Object> appendValue = context.getUserConfigValue(key);

if (appendValue.isPresent()) {
return input + appendValue.get();
} else {
return input + "!";
}
return appendValue.map(o -> input + (String) o).orElseGet(() -> input + "!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
public final class FunctionAuthUtils {

public static final FunctionAuthData getFunctionAuthData(Optional<Function.FunctionAuthenticationSpec> functionAuthenticationSpec) {
if (functionAuthenticationSpec.isPresent()) {
return FunctionAuthData.builder().data(functionAuthenticationSpec.get().getData().toByteArray()).build();
}
return null;
return functionAuthenticationSpec
.map(authenticationSpec -> FunctionAuthData.builder().data(authenticationSpec.getData().toByteArray()).build())
.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,7 @@ public Optional<FunctionAuthData> updateAuthData(Function.FunctionDetails funcDe
AuthenticationDataSource authenticationDataSource) throws Exception {

String secretId;
if (existingFunctionAuthData.isPresent()) {
secretId = new String(existingFunctionAuthData.get().getData());
} else {
secretId = RandomStringUtils.random(5, true, true).toLowerCase();
}
secretId = existingFunctionAuthData.map(functionAuthData -> new String(functionAuthData.getData())).orElseGet(() -> RandomStringUtils.random(5, true, true).toLowerCase());

String token;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,10 @@ public void registerFunction(final String tenant,
Optional<FunctionAuthData> functionAuthData = functionAuthProvider
.cacheAuthData(finalFunctionDetails, clientAuthenticationDataHttps);

if (functionAuthData.isPresent()) {
functionMetaDataBuilder.setFunctionAuthSpec(
Function.FunctionAuthenticationSpec.newBuilder()
.setData(ByteString.copyFrom(functionAuthData.get().getData()))
.build());
}
functionAuthData.ifPresent(authData -> functionMetaDataBuilder.setFunctionAuthSpec(
Function.FunctionAuthenticationSpec.newBuilder()
.setData(ByteString.copyFrom(authData.getData()))
.build()));
} catch (Exception e) {
log.error("Error caching authentication data for {} {}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
Expand Down
Loading

0 comments on commit acc161b

Please sign in to comment.