Skip to content

Commit

Permalink
Metrics: Introduce the Scheduled Witness and Scheduler
Browse files Browse the repository at this point in the history
Introduce the Scheduled Witness and Scheduler
- A Witness that is self populated on a schedule
- load average will come in future commit

Change default scheduled interval from 5 to 10 seconds.

Un-deprecate LazyDelegatingGaguge
 - It is the strategy for Ruby based plugins that allows them to continue to duck type.

 Minor test coverage fixes.

Note - this is dead code until the Witness API is wired in via Ruby

Part of elastic#7788

Fixes elastic#8091
  • Loading branch information
jakelandis committed Oct 27, 2017
1 parent fcd0465 commit 5dd2fe9
Show file tree
Hide file tree
Showing 10 changed files with 649 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
* A lazy proxy to a more specific typed {@link GaugeMetric}. The metric will only be initialized if the initial value is set, or once the {@code set} operation is called.
* <p><strong>Intended only for use with Ruby's duck typing, Java consumers should use the specific typed {@link GaugeMetric}</strong></p>
*
* @deprecated - there are no plans to replace this.
*/
@Deprecated
public class LazyDelegatingGauge extends AbstractMetric<Object> implements GaugeMetric<Object, Object> {

private static final Logger LOGGER = LogManager.getLogger(LazyDelegatingGauge.class);
Expand All @@ -26,9 +24,8 @@ public class LazyDelegatingGauge extends AbstractMetric<Object> implements Gauge
* Constructor - null initial value
*
* @param key The key <i>(with in the namespace)</i> for this metric
* @deprecated - there are no plans to replace this
*/
@Deprecated

public LazyDelegatingGauge(final String key) {
this(key, null);
}
Expand All @@ -38,10 +35,8 @@ public LazyDelegatingGauge(final String key) {
*
* @param key The key for this metric
* @param initialValue The initial value for this {@link GaugeMetric}, may be null
* @deprecated - there are no plans to replace this
*/
@Deprecated
public LazyDelegatingGauge(String key, Object initialValue) {
public LazyDelegatingGauge(String key, Object initialValue) {
super(key);
this.key = key;
if (initialValue != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.logstash.instrument.witness.pipeline.PipelineWitness;
import org.logstash.instrument.witness.pipeline.PipelinesWitness;
import org.logstash.instrument.witness.pipeline.ReloadWitness;
import org.logstash.instrument.witness.process.ProcessWitness;
import org.logstash.instrument.witness.schedule.WitnessScheduler;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -28,8 +30,10 @@ public final class Witness implements SerializableWitness {
private final ReloadWitness reloadWitness;
private final EventsWitness eventsWitness;
private final PipelinesWitness pipelinesWitness;
private final ProcessWitness processWitness;
private final WitnessScheduler processWitnessScheduler;

private static Witness _instance;
private static Witness instance;

/**
* Constructor. Consumers should use {@link #instance()} method to obtain an instance of this class.
Expand All @@ -39,17 +43,28 @@ public Witness() {
this.reloadWitness = new ReloadWitness();
this.eventsWitness = new EventsWitness();
this.pipelinesWitness = new PipelinesWitness();
this.processWitness = new ProcessWitness();
this.processWitnessScheduler = new WitnessScheduler(processWitness);
}

/**
* This is a dirty hack since the {@link Witness} needs to mirror the Ruby agent's lifecycle which, at least for testing, can mean more then 1 instance per JVM, but only 1
* active instance at any time. Exposing this allows Ruby to create the instance for use in it's agent constructor, then set it here for all to use as a singleton.
* <p>THIS IS ONLY TO BE USED BY THE RUBY AGENT</p>
*
* @param __instance The instance of the {@link Witness} to use as the singleton instance that mirror's the agent's lifecycle.
* @param newInstance The instance of the {@link Witness} to use as the singleton instance that mirror's the agent's lifecycle.
*/
public static void setInstance(Witness __instance) {
_instance = __instance;
public static synchronized void setInstance(Witness newInstance) {
//Ruby agent restart
if (instance != null) {
instance.processWitnessScheduler.shutdown();
}

instance = newInstance;

if (instance != null) {
instance.processWitnessScheduler.schedule();
}
}

/**
Expand All @@ -59,10 +74,10 @@ public static void setInstance(Witness __instance) {
* @throws IllegalStateException if attempted to be used before being set.
*/
public static Witness instance() {
if (_instance == null) {
if (instance == null) {
throw new IllegalStateException("The stats witness instance must be set before it used. Called from: " + Arrays.toString(new Throwable().getStackTrace()));
}
return _instance;
return instance;
}

public EventsWitness events() {
Expand All @@ -87,6 +102,15 @@ public PipelinesWitness pipelines() {
return pipelinesWitness;
}

/**
* Obtain a reference to the associated process witness.
*
* @return The associated {@link ProcessWitness}
*/
public ProcessWitness process() {
return processWitness;
}

/**
* Shortcut method for {@link PipelinesWitness#pipeline(String)}
*
Expand Down Expand Up @@ -133,6 +157,7 @@ public void serialize(Witness witness, JsonGenerator gen, SerializerProvider pro
}

static void innerSerialize(Witness witness, JsonGenerator gen, SerializerProvider provider) throws IOException {
witness.process().genJson(gen, provider);
witness.events().genJson(gen, provider);
witness.reloads().genJson(gen, provider);
witness.pipelinesWitness.genJson(gen, provider);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package org.logstash.instrument.witness.process;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.sun.management.UnixOperatingSystemMXBean;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.gauge.NumberGauge;
import org.logstash.instrument.witness.MetricSerializer;
import org.logstash.instrument.witness.SerializableWitness;
import org.logstash.instrument.witness.schedule.ScheduledWitness;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.TimeUnit;

/**
* A scheduled witness for process metrics
*/
@JsonSerialize(using = ProcessWitness.Serializer.class)
public class ProcessWitness implements SerializableWitness, ScheduledWitness {

private static final OperatingSystemMXBean osMxBean;
private static final String KEY = "process";
public static final boolean isUnix;
private static final UnixOperatingSystemMXBean unixOsBean;
private final NumberGauge openFileDescriptors;
private final NumberGauge peakOpenFileDescriptors;
private final NumberGauge maxFileDescriptors;
private final Cpu cpu;
private final Memory memory;
private final Snitch snitch;

static {
osMxBean = ManagementFactory.getOperatingSystemMXBean();
isUnix = osMxBean instanceof UnixOperatingSystemMXBean;
unixOsBean = isUnix ? (UnixOperatingSystemMXBean) osMxBean : null;
}

/**
* Constructor
*/
public ProcessWitness() {
this.openFileDescriptors = new NumberGauge("open_file_descriptors", -1);
this.maxFileDescriptors = new NumberGauge("max_file_descriptors", -1);
this.peakOpenFileDescriptors = new NumberGauge("peak_open_file_descriptors", -1);
this.cpu = new Cpu();
this.memory = new Memory();
this.snitch = new Snitch(this);
}

@Override
public void refresh() {
if (isUnix) {
long currentOpen = unixOsBean.getOpenFileDescriptorCount();
openFileDescriptors.set(currentOpen);
if (maxFileDescriptors.getValue() == null || peakOpenFileDescriptors.getValue().longValue() < currentOpen) {
peakOpenFileDescriptors.set(currentOpen);
}
maxFileDescriptors.set(unixOsBean.getMaxFileDescriptorCount());
}
cpu.refresh();
memory.refresh();
}

/**
* Get a reference to associated snitch to get discrete metric values.
*
* @return the associate {@link Snitch}
*/
public Snitch snitch() {
return snitch;
}

/**
* An inner witness for the process / cpu metrics
*/
public class Cpu implements ScheduledWitness {
private static final String KEY = "cpu";
private final NumberGauge cpuProcessPercent;
private final NumberGauge cpuTotalInMillis;

private Cpu() {
this.cpuProcessPercent = new NumberGauge("percent", -1);
this.cpuTotalInMillis = new NumberGauge("total_in_millis", -1);
}

@Override
public void refresh() {
cpuProcessPercent.set(scaleLoadToPercent(unixOsBean.getProcessCpuLoad()));
cpuTotalInMillis.set(TimeUnit.MILLISECONDS.convert(unixOsBean.getProcessCpuTime(), TimeUnit.NANOSECONDS));
}
}

/**
* An inner witness for the the process / memory metrics
*/
public class Memory implements ScheduledWitness {
private static final String KEY = "mem";
private final NumberGauge memTotalVirtualInBytes;

private Memory() {
memTotalVirtualInBytes = new NumberGauge("total_virtual_in_bytes", -1);
}

@Override
public void refresh() {
memTotalVirtualInBytes.set(unixOsBean.getCommittedVirtualMemorySize());
}
}

@Override
public void genJson(JsonGenerator gen, SerializerProvider provider) throws IOException {
Serializer.innerSerialize(this, gen);
}

/**
* The Jackson serializer.
*/
public static final class Serializer extends StdSerializer<ProcessWitness> {
/**
* Default constructor - required for Jackson
*/
public Serializer() {
this(ProcessWitness.class);
}

/**
* Constructor
*
* @param t the type to serialize
*/
protected Serializer(Class<ProcessWitness> t) {
super(t);
}

@Override
public void serialize(ProcessWitness witness, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeStartObject();
innerSerialize(witness, gen);
gen.writeEndObject();
}

static void innerSerialize(ProcessWitness witness, JsonGenerator gen) throws IOException {
MetricSerializer<Metric<Number>> numberSerializer = MetricSerializer.Get.numberSerializer(gen);
gen.writeObjectFieldStart(KEY);
numberSerializer.serialize(witness.openFileDescriptors);
numberSerializer.serialize(witness.peakOpenFileDescriptors);
numberSerializer.serialize(witness.maxFileDescriptors);
//memory
gen.writeObjectFieldStart(Memory.KEY);
numberSerializer.serialize(witness.memory.memTotalVirtualInBytes);
gen.writeEndObject();
//cpu
gen.writeObjectFieldStart(Cpu.KEY);
numberSerializer.serialize(witness.cpu.cpuTotalInMillis);
numberSerializer.serialize(witness.cpu.cpuProcessPercent);

//TODO: jake load average

gen.writeEndObject();
gen.writeEndObject();
}
}

/**
* The Process snitch. Provides a means to get discrete metric values.
*/
public static final class Snitch {

private final ProcessWitness witness;

private Snitch(ProcessWitness witness) {
this.witness = witness;
}

/**
* Get the number of open file descriptors for this process
*
* @return the open file descriptors
*/
public long openFileDescriptors() {
return witness.openFileDescriptors.getValue().longValue();
}

/**
* Get the max file descriptors for this process
*
* @return the max file descriptors
*/
public long maxFileDescriptors() {
return witness.maxFileDescriptors.getValue().longValue();
}

/**
* Get the high water number of open file descriptors for this process
*
* @return the high water/ peak of the seen open file descriptors
*/
public long peakOpenFileDescriptors() {
return witness.peakOpenFileDescriptors.getValue().longValue();
}

/**
* Get the cpu percent for this process
*
* @return the cpu percent
*/
public short cpuProcessPercent() {
return witness.cpu.cpuProcessPercent.getValue().shortValue();
}

/**
* Get the total time of the cpu in milliseconds for this process
*
* @return the cpu total in milliseconds
*/
public long cpuTotalInMillis() {
return witness.cpu.cpuTotalInMillis.getValue().longValue();
}

/**
* Get the committed (virtual) memory for this process
*
* @return the committed memory
*/
public long memTotalVirtualInBytes() {
return witness.memory.memTotalVirtualInBytes.getValue().longValue();
}

}

private short scaleLoadToPercent(double load) {
if (isUnix && load >= 0) {
return Double.valueOf(Math.floor(load * 100)).shortValue();
} else {
return (short) -1;
}
}
}
Loading

0 comments on commit 5dd2fe9

Please sign in to comment.