Skip to content

Commit

Permalink
NIFI-5241: Updated EventSumValue to use synchronized methods instead …
Browse files Browse the repository at this point in the history
…of many atomic values. This is more efficient and uses less heap. Also noticed that the Logger instance in ProcessorNode was not used so removed it, and in testing this also noticed that the default connection pool size for OkHttpReplicationClient was only 5, which can cause a lot of unnecessary HTTP connections to be created so adjusted the pool size

NIFI-5241: Extended timeout that Jetty uses before closing an active HTTP connection. Because the UI refreshes every 30 seconds by default, and the Jetty connection pool times out every 30 seconds by default, we very frequently saw new HTTP connections being created for the UI refreshes. This resulted in 4 new connections and 4 SSL handshakes occurring every 30 seconds. By extending the timeout, we now see those connections being reused and SSL Handshakes no longer occurring frequently
NIFI-5241: Set Jetty idle timeout to double the amount of time for browser to refresh
NIFI-5241: Fixed synchronization issue with EventSumValue
This closes apache#2752
  • Loading branch information
markap14 authored and mcgilman committed Jun 7, 2018
1 parent f7f809c commit ff00050
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;

import okhttp3.Call;
import okhttp3.ConnectionPool;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
Expand Down Expand Up @@ -289,6 +290,8 @@ private OkHttpClient createOkHttpClient(final NiFiProperties properties, final H
okHttpClientBuilder.connectTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS);
okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
okHttpClientBuilder.followRedirects(true);
final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests();
okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));

final Tuple<SSLSocketFactory, X509TrustManager> tuple = createSslSocketFactory(properties);
if (tuple != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ProcessorNode extends AbstractComponentNode implements Connectable {

private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class);

protected final AtomicReference<ScheduledState> scheduledState;

public ProcessorNode(final String id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,31 @@
package org.apache.nifi.controller.repository.metrics;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.nifi.controller.repository.FlowFileEvent;

public class EventSumValue {

private final AtomicInteger flowFilesIn = new AtomicInteger(0);
private final AtomicInteger flowFilesOut = new AtomicInteger(0);
private final AtomicInteger flowFilesRemoved = new AtomicInteger(0);
private final AtomicInteger flowFilesReceived = new AtomicInteger(0);
private final AtomicInteger flowFilesSent = new AtomicInteger(0);

private final AtomicLong contentSizeIn = new AtomicLong(0L);
private final AtomicLong contentSizeOut = new AtomicLong(0L);
private final AtomicLong contentSizeRemoved = new AtomicLong(0L);
private final AtomicLong bytesRead = new AtomicLong(0L);
private final AtomicLong bytesWritten = new AtomicLong(0L);

private final AtomicLong bytesReceived = new AtomicLong(0L);
private final AtomicLong bytesSent = new AtomicLong(0L);
private final AtomicLong processingNanos = new AtomicLong(0L);
private final AtomicLong aggregateLineageMillis = new AtomicLong(0L);
private final AtomicInteger invocations = new AtomicInteger(0);
private final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>();
private int flowFilesIn = 0;
private int flowFilesOut = 0;
private int flowFilesRemoved = 0;
private int flowFilesReceived = 0;
private int flowFilesSent = 0;

private long contentSizeIn = 0;
private long contentSizeOut = 0;
private long contentSizeRemoved = 0;
private long bytesRead = 0;
private long bytesWritten = 0;

private long bytesReceived = 0;
private long bytesSent = 0;
private long processingNanos = 0;
private long aggregateLineageMillis = 0;
private int invocations = 0;
private Map<String, Long> counters;

private final long minuteTimestamp;
private final long millisecondTimestamp;
Expand All @@ -56,152 +53,97 @@ public EventSumValue() {
this.minuteTimestamp = millisecondTimestamp / 60000;
}

public void add(final FlowFileEvent flowFileEvent) {
this.aggregateLineageMillis.addAndGet(flowFileEvent.getAggregateLineageMillis());
this.bytesRead.addAndGet(flowFileEvent.getBytesRead());
this.bytesReceived.addAndGet(flowFileEvent.getBytesReceived());
this.bytesSent.addAndGet(flowFileEvent.getBytesSent());
this.bytesWritten.addAndGet(flowFileEvent.getBytesWritten());
this.contentSizeIn.addAndGet(flowFileEvent.getContentSizeIn());
this.contentSizeOut.addAndGet(flowFileEvent.getContentSizeOut());
this.contentSizeRemoved.addAndGet(flowFileEvent.getContentSizeRemoved());
this.flowFilesIn.addAndGet(flowFileEvent.getFlowFilesIn());
this.flowFilesOut.addAndGet(flowFileEvent.getFlowFilesOut());
this.flowFilesReceived.addAndGet(flowFileEvent.getFlowFilesReceived());
this.flowFilesRemoved.addAndGet(flowFileEvent.getFlowFilesRemoved());
this.flowFilesSent.addAndGet(flowFileEvent.getFlowFilesSent());
this.invocations.addAndGet(flowFileEvent.getInvocations());
this.processingNanos.addAndGet(flowFileEvent.getProcessingNanoseconds());
public synchronized void add(final FlowFileEvent flowFileEvent) {
this.aggregateLineageMillis += flowFileEvent.getAggregateLineageMillis();
this.bytesRead += flowFileEvent.getBytesRead();
this.bytesReceived += flowFileEvent.getBytesReceived();
this.bytesSent += flowFileEvent.getBytesSent();
this.bytesWritten += flowFileEvent.getBytesWritten();
this.contentSizeIn += flowFileEvent.getContentSizeIn();
this.contentSizeOut += flowFileEvent.getContentSizeOut();
this.contentSizeRemoved += flowFileEvent.getContentSizeRemoved();
this.flowFilesIn += flowFileEvent.getFlowFilesIn();
this.flowFilesOut += flowFileEvent.getFlowFilesOut();
this.flowFilesReceived += flowFileEvent.getFlowFilesReceived();
this.flowFilesRemoved += flowFileEvent.getFlowFilesRemoved();
this.flowFilesSent += flowFileEvent.getFlowFilesSent();
this.invocations += flowFileEvent.getInvocations();
this.processingNanos += flowFileEvent.getProcessingNanoseconds();

final Map<String, Long> eventCounters = flowFileEvent.getCounters();
if (eventCounters != null) {
for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
final String counterName = entry.getKey();
final Long counterValue = entry.getValue();

if (counters == null) {
counters = new HashMap<>();
}
counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
}
}
}

public FlowFileEvent toFlowFileEvent(final String componentId) {
public synchronized FlowFileEvent toFlowFileEvent(final String componentId) {
final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId);
event.setAggregateLineageMillis(getAggregateLineageMillis());
event.setBytesRead(getBytesRead());
event.setBytesReceived(getBytesReceived());
event.setBytesSent(getBytesSent());
event.setBytesWritten(getBytesWritten());
event.setContentSizeIn(getContentSizeIn());
event.setContentSizeOut(getContentSizeOut());
event.setContentSizeRemoved(getContentSizeRemoved());
event.setFlowFilesIn(getFlowFilesIn());
event.setFlowFilesOut(getFlowFilesOut());
event.setFlowFilesReceived(getFlowFilesReceived());
event.setFlowFilesRemoved(getFlowFilesRemoved());
event.setFlowFilesSent(getFlowFilesSent());
event.setInvocations(getInvocations());
event.setProcessingNanos(getProcessingNanoseconds());
event.setCounters(Collections.unmodifiableMap(this.counters));
event.setAggregateLineageMillis(aggregateLineageMillis);
event.setBytesRead(bytesRead);
event.setBytesReceived(bytesReceived);
event.setBytesSent(bytesSent);
event.setBytesWritten(bytesWritten);
event.setContentSizeIn(contentSizeIn);
event.setContentSizeOut(contentSizeOut);
event.setContentSizeRemoved(contentSizeRemoved);
event.setFlowFilesIn(flowFilesIn);
event.setFlowFilesOut(flowFilesOut);
event.setFlowFilesReceived(flowFilesReceived);
event.setFlowFilesRemoved(flowFilesRemoved);
event.setFlowFilesSent(flowFilesSent);
event.setInvocations(invocations);
event.setProcessingNanos(processingNanos);
event.setCounters(this.counters == null ? Collections.emptyMap() : Collections.unmodifiableMap(this.counters));
return event;
}

public void add(final EventSumValue other) {
this.aggregateLineageMillis.addAndGet(other.getAggregateLineageMillis());
this.bytesRead.addAndGet(other.getBytesRead());
this.bytesReceived.addAndGet(other.getBytesReceived());
this.bytesSent.addAndGet(other.getBytesSent());
this.bytesWritten.addAndGet(other.getBytesWritten());
this.contentSizeIn.addAndGet(other.getContentSizeIn());
this.contentSizeOut.addAndGet(other.getContentSizeOut());
this.contentSizeRemoved.addAndGet(other.getContentSizeRemoved());
this.flowFilesIn.addAndGet(other.getFlowFilesIn());
this.flowFilesOut.addAndGet(other.getFlowFilesOut());
this.flowFilesReceived.addAndGet(other.getFlowFilesReceived());
this.flowFilesRemoved.addAndGet(other.getFlowFilesRemoved());
this.flowFilesSent.addAndGet(other.getFlowFilesSent());
this.invocations.addAndGet(other.getInvocations());
this.processingNanos.addAndGet(other.getProcessingNanoseconds());

final Map<String, Long> eventCounters = other.getCounters();
if (eventCounters != null) {
for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
final String counterName = entry.getKey();
final Long counterValue = entry.getValue();

counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
public synchronized void add(final EventSumValue other) {
synchronized (other) {
this.aggregateLineageMillis += other.aggregateLineageMillis;
this.bytesRead += other.bytesRead;
this.bytesReceived += other.bytesReceived;
this.bytesSent += other.bytesSent;
this.bytesWritten += other.bytesWritten;
this.contentSizeIn += other.contentSizeIn;
this.contentSizeOut += other.contentSizeOut;
this.contentSizeRemoved += other.contentSizeRemoved;
this.flowFilesIn += other.flowFilesIn;
this.flowFilesOut += other.flowFilesOut;
this.flowFilesReceived += other.flowFilesReceived;
this.flowFilesRemoved += other.flowFilesRemoved;
this.flowFilesSent += other.flowFilesSent;
this.invocations += other.invocations;
this.processingNanos += other.processingNanos;

final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) {
if (counters == null) {
counters = new HashMap<>();
}

for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
final String counterName = entry.getKey();
final Long counterValue = entry.getValue();

counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
}
}
}
}

public long getTimestamp() {
return millisecondTimestamp;
}

public long getMinuteTimestamp() {
return minuteTimestamp;
}

public long getBytesRead() {
return bytesRead.get();
}

public long getBytesWritten() {
return bytesWritten.get();
}

public int getFlowFilesIn() {
return flowFilesIn.get();
}

public int getFlowFilesOut() {
return flowFilesOut.get();
}

public long getContentSizeIn() {
return contentSizeIn.get();
}

public long getContentSizeOut() {
return contentSizeOut.get();
}

public int getFlowFilesRemoved() {
return flowFilesRemoved.get();
}

public long getContentSizeRemoved() {
return contentSizeRemoved.get();
}

public long getProcessingNanoseconds() {
return processingNanos.get();
}

public int getInvocations() {
return invocations.get();
}

public long getAggregateLineageMillis() {
return aggregateLineageMillis.get();
}

public int getFlowFilesReceived() {
return flowFilesReceived.get();
}

public int getFlowFilesSent() {
return flowFilesSent.get();
}

public long getBytesReceived() {
return bytesReceived.get();
}

public long getBytesSent() {
return bytesSent.get();
}

public Map<String, Long> getCounters() {
return counters;
public long getTimestamp() {
return millisecondTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.apache.nifi.services.FlowService;
import org.apache.nifi.ui.extension.UiExtension;
import org.apache.nifi.ui.extension.UiExtensionMapping;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.ContentAccess;
import org.apache.nifi.web.NiFiWebConfigurationContext;
Expand Down Expand Up @@ -680,6 +682,13 @@ private void configureGenericConnector(Server server, HttpConfiguration configur

final List<Connector> serverConnectors = Lists.newArrayList();

// Calculate Idle Timeout as twice the auto-refresh interval. This ensures that even with some variance in timing,
// we are able to avoid closing connections from users' browsers most of the time. This can make a significant difference
// in HTTPS connections, as each HTTPS connection that is established must perform the SSL handshake.
final String autoRefreshInterval = props.getAutoRefreshInterval();
final long autoRefreshMillis = autoRefreshInterval == null ? 30000L : FormatUtils.getTimeDuration(autoRefreshInterval, TimeUnit.MILLISECONDS);
final long idleTimeout = autoRefreshMillis * 2;

// If the interfaces collection is empty or each element is empty
if (networkInterfaces.isEmpty() || networkInterfaces.values().stream().filter(value -> !Strings.isNullOrEmpty(value)).collect(Collectors.toList()).isEmpty()) {
final ServerConnector serverConnector = serverConnectorCreator.create(server, configuration);
Expand All @@ -689,6 +698,7 @@ private void configureGenericConnector(Server server, HttpConfiguration configur
serverConnector.setHost(hostname);
}
serverConnector.setPort(port);
serverConnector.setIdleTimeout(idleTimeout);
serverConnectors.add(serverConnector);
} else {
// Add connectors for all IPs from network interfaces
Expand All @@ -710,6 +720,8 @@ private void configureGenericConnector(Server server, HttpConfiguration configur
// Set host and port
serverConnector.setHost(inetAddress.getHostAddress());
serverConnector.setPort(port);
serverConnector.setIdleTimeout(idleTimeout);

return serverConnector;
}).collect(Collectors.toList())));
}
Expand Down

0 comments on commit ff00050

Please sign in to comment.