Skip to content

Commit

Permalink
GEODE-6763: Send GatewayReceiver events received to Micrometer (apach…
Browse files Browse the repository at this point in the history
…e#3573)

Updated GatewayReceiverStats to add a
cache.gatewayreceiver.events.received meter to the registry.

The meter is a FunctionCounter that retrieves its value from the
"eventsReceived" stat.

Co-Authored-By: Aaron Lindsey <[email protected]>
Co-Authored-By: Michael Oleske <[email protected]>
Co-authored-by: Dale Emery <[email protected]>
Co-authored-by: Kirk Lund <[email protected]>
  • Loading branch information
4 people committed Jun 10, 2019
1 parent 7693689 commit 3d7113f
Show file tree
Hide file tree
Showing 8 changed files with 660 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.metrics;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

import io.micrometer.core.instrument.FunctionCounter;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.test.compiler.ClassBuilder;
import org.apache.geode.test.junit.categories.MetricsTest;
import org.apache.geode.test.junit.rules.gfsh.GfshRule;

@Category(MetricsTest.class)
public class GatewayReceiverMetricsTest {

@Rule
public GfshRule gfshRule = new GfshRule();

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

private static final String SENDER_LOCATOR_NAME = "sender-locator";
private static final String RECEIVER_LOCATOR_NAME = "receiver-locator";
private static final String SENDER_SERVER_NAME = "sender-server";
private static final String RECEIVER_SERVER_NAME = "receiver-server";
private static final String REGION_NAME = "region";
private static final String GFSH_COMMAND_SEPARATOR = " ";
private String senderLocatorFolder;
private String receiverLocatorFolder;
private String senderServerFolder;
private String receiverServerFolder;
private int receiverLocatorPort;
private int senderLocatorPort;

@Before
public void startClusters() throws IOException {
int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(8);

receiverLocatorPort = ports[0];
senderLocatorPort = ports[1];
int senderServerPort = ports[2];
int receiverServerPort = ports[3];
int senderLocatorJmxPort = ports[4];
int receiverLocatorJmxPort = ports[5];
int senderLocatorHttpPort = ports[6];
int receiverLocatorHttpPort = ports[7];

int senderSystemId = 2;
int receiverSystemId = 1;

senderLocatorFolder = newFolder(SENDER_LOCATOR_NAME);
receiverLocatorFolder = newFolder(RECEIVER_LOCATOR_NAME);
senderServerFolder = newFolder(SENDER_SERVER_NAME);
receiverServerFolder = newFolder(RECEIVER_SERVER_NAME);

String startSenderLocatorCommand = String.join(GFSH_COMMAND_SEPARATOR,
"start locator",
"--name=" + SENDER_LOCATOR_NAME,
"--dir=" + senderLocatorFolder,
"--port=" + senderLocatorPort,
"--locators=localhost[" + senderLocatorPort + "]",
"--J=-Dgemfire.remote-locators=localhost[" + receiverLocatorPort + "]",
"--J=-Dgemfire.distributed-system-id=" + senderSystemId,
"--J=-Dgemfire.jmx-manager-start=true",
"--J=-Dgemfire.jmx-manager-http-port=" + senderLocatorHttpPort,
"--J=-Dgemfire.jmx-manager-port=" + senderLocatorJmxPort);

String startReceiverLocatorCommand = String.join(GFSH_COMMAND_SEPARATOR,
"start locator",
"--name=" + RECEIVER_LOCATOR_NAME,
"--dir=" + receiverLocatorFolder,
"--port=" + receiverLocatorPort,
"--locators=localhost[" + receiverLocatorPort + "]",
"--J=-Dgemfire.remote-locators=localhost[" + senderLocatorPort + "]",
"--J=-Dgemfire.distributed-system-id=" + receiverSystemId,
"--J=-Dgemfire.jmx-manager-start=true ",
"--J=-Dgemfire.jmx-manager-http-port=" + receiverLocatorHttpPort,
"--J=-Dgemfire.jmx-manager-port=" + receiverLocatorJmxPort);

String startSenderServerCommand = String.join(GFSH_COMMAND_SEPARATOR,
"start server",
"--name=" + SENDER_SERVER_NAME,
"--dir=" + senderServerFolder,
"--locators=localhost[" + senderLocatorPort + "]",
"--server-port=" + senderServerPort,
"--J=-Dgemfire.distributed-system-id=" + senderSystemId);

String metricsPublishingServiceJarPath =
newJarForMetricsPublishingServiceClass(SimpleMetricsPublishingService.class,
"metrics-publishing-service.jar");

String startReceiverServerCommand = String.join(GFSH_COMMAND_SEPARATOR,
"start server",
"--name=" + RECEIVER_SERVER_NAME,
"--dir=" + receiverServerFolder,
"--locators=localhost[" + receiverLocatorPort + "]",
"--server-port=" + receiverServerPort,
"--classpath=" + metricsPublishingServiceJarPath,
"--J=-Dgemfire.distributed-system-id=" + receiverSystemId);

gfshRule.execute(startSenderLocatorCommand, startReceiverLocatorCommand,
startSenderServerCommand, startReceiverServerCommand);

String gatewaySenderId = "gs";

String connectToSenderLocatorCommand = "connect --locator=localhost[" + senderLocatorPort + "]";

String startGatewaySenderCommand = String.join(GFSH_COMMAND_SEPARATOR,
"create gateway-sender",
"--id=" + gatewaySenderId,
"--parallel=false",
"--remote-distributed-system-id=" + receiverSystemId);

String createSenderRegionCommand = String.join(GFSH_COMMAND_SEPARATOR,
"create region",
"--name=" + REGION_NAME,
"--type=" + RegionShortcut.REPLICATE.name(),
"--gateway-sender-id=" + gatewaySenderId);

gfshRule.execute(connectToSenderLocatorCommand, startGatewaySenderCommand);

// There is a bug in the GFSH create gateway-sender command where it returns before the system
// has recognized the creation status: GEODE-6777. We can remove the following when that bug is
// fixed.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Don't care
}

gfshRule.execute(connectToSenderLocatorCommand, createSenderRegionCommand);

String connectToReceiverLocatorCommand =
"connect --locator=localhost[" + receiverLocatorPort + "]";
String startGatewayReceiverCommand = "create gateway-receiver";
String createReceiverRegionCommand = String.join(GFSH_COMMAND_SEPARATOR,
"create region",
"--name=" + REGION_NAME,
"--type=" + RegionShortcut.REPLICATE.name());

gfshRule.execute(connectToReceiverLocatorCommand, startGatewayReceiverCommand,
createReceiverRegionCommand);

// Deploy function to members
String functionJarPath =
newJarForFunctionClass(GetEventsReceivedCountFunction.class, "function.jar");
String deployCommand = "deploy --jar=" + functionJarPath;
String listFunctionsCommand = "list functions";

gfshRule.execute(connectToReceiverLocatorCommand, deployCommand, listFunctionsCommand);
}

@After
public void stopClusters() {
String stopReceiverServerCommand = "stop server --dir=" + receiverServerFolder;
String stopSenderServerCommand = "stop server --dir=" + senderServerFolder;
String stopReceiverLocatorCommand = "stop locator --dir=" + receiverLocatorFolder;
String stopSenderLocatorCommand = "stop locator --dir=" + senderLocatorFolder;

gfshRule.execute(stopReceiverServerCommand, stopSenderServerCommand, stopReceiverLocatorCommand,
stopSenderLocatorCommand);
}

@Test
public void whenPerformingOperations_thenGatewayReceiverEventsReceivedIncreases() {
String connectToSenderLocatorCommand = "connect --locator=localhost[" + senderLocatorPort + "]";

String doPutCommand = String.join(GFSH_COMMAND_SEPARATOR,
"put",
"--region=" + REGION_NAME,
"--key=foo",
"--value=bar");

String doRemoveCommand = String.join(GFSH_COMMAND_SEPARATOR,
"remove",
"--region=" + REGION_NAME,
"--key=foo");

String doCreateRegionCommand = String.join(GFSH_COMMAND_SEPARATOR,
"create region",
"--name=blah",
"--type=" + RegionShortcut.REPLICATE.name());

gfshRule.execute(connectToSenderLocatorCommand, doPutCommand, doRemoveCommand,
doCreateRegionCommand);

String connectToReceiverLocatorCommand =
"connect --locator=localhost[" + receiverLocatorPort + "]";
String executeFunctionCommand = "execute function --id=" + GetEventsReceivedCountFunction.ID;

Collection<String> gatewayEventsExpectedToReceive =
Arrays.asList(doPutCommand, doRemoveCommand);

await().untilAsserted(() -> {
String output =
gfshRule.execute(connectToReceiverLocatorCommand, executeFunctionCommand).getOutputText();

assertThat(output.trim())
.as("Returned count of events received.")
.endsWith("[" + gatewayEventsExpectedToReceive.size() + ".0]");
});
}

private String newFolder(String folderName) throws IOException {
return temporaryFolder.newFolder(folderName).getAbsolutePath();
}

private String newJarForFunctionClass(Class clazz, String jarName) throws IOException {
File jar = temporaryFolder.newFile(jarName);
new ClassBuilder().writeJarFromClass(clazz, jar);
return jar.getAbsolutePath();
}

private String newJarForMetricsPublishingServiceClass(Class clazz, String jarName)
throws IOException {
File jar = temporaryFolder.newFile(jarName);

String className = clazz.getName();
String classAsPath = className.replace('.', '/') + ".class";
InputStream stream = clazz.getClassLoader().getResourceAsStream(classAsPath);
byte[] bytes = IOUtils.toByteArray(stream);
try (FileOutputStream out = new FileOutputStream(jar)) {
JarOutputStream jarOutputStream = new JarOutputStream(out);

// Add the class file to the JAR file
JarEntry classEntry = new JarEntry(classAsPath);
classEntry.setTime(System.currentTimeMillis());
jarOutputStream.putNextEntry(classEntry);
jarOutputStream.write(bytes);
jarOutputStream.closeEntry();

String metaInfPath = "META-INF/services/org.apache.geode.metrics.MetricsPublishingService";

JarEntry metaInfEntry = new JarEntry(metaInfPath);
metaInfEntry.setTime(System.currentTimeMillis());
jarOutputStream.putNextEntry(metaInfEntry);
jarOutputStream.write(className.getBytes());
jarOutputStream.closeEntry();

jarOutputStream.close();
}

return jar.getAbsolutePath();
}

public static class GetEventsReceivedCountFunction implements Function<Void> {
static final String ID = "GetEventsReceivedCountFunction";

@Override
public void execute(FunctionContext<Void> context) {
FunctionCounter eventsReceivedCounter = SimpleMetricsPublishingService.getRegistry()
.find("cache.gatewayreceiver.events.received")
.functionCounter();

Object result = eventsReceivedCounter == null
? "Meter not found."
: eventsReceivedCounter.count();

context.getResultSender().lastResult(result);
}

@Override
public String getId() {
return ID;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.metrics;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class SimpleMetricsPublishingService implements MetricsPublishingService {

private static final MeterRegistry registry = new SimpleMeterRegistry();

private volatile MetricsSession session;

public static MeterRegistry getRegistry() {
return registry;
}

@Override
public void start(MetricsSession session) {
this.session = session;

// add your registry as a sub-registry to the cache's composite registry
session.addSubregistry(registry);
}

@Override
public void stop() {
// clean up any resources used by your meter registry
session.removeSubregistry(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.Properties;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -69,7 +70,9 @@ public void setUp() throws Exception {
new CacheFactory().create();

StatisticsFactory statisticsFactory = system.getStatisticsManager();
receiverStats = createGatewayReceiverStats(statisticsFactory, "Test Sock Name");
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
receiverStats = createGatewayReceiverStats(statisticsFactory, "Test Sock Name",
meterRegistry);

GatewayReceiver gatewayReceiver = mock(GatewayReceiver.class);
InternalCacheServer receiverServer = mock(InternalCacheServer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import io.micrometer.core.instrument.MeterRegistry;
import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
Expand Down Expand Up @@ -597,7 +598,9 @@ public class AcceptorImpl implements Acceptor, Runnable {
StatisticsFactory statisticsFactory =
internalCache.getInternalDistributedSystem().getStatisticsManager();
if (isGatewayReceiver()) {
stats = GatewayReceiverStats.createGatewayReceiverStats(statisticsFactory, sockName);
MeterRegistry meterRegistry = internalCache.getMeterRegistry();
stats = GatewayReceiverStats.createGatewayReceiverStats(statisticsFactory, sockName,
meterRegistry);
} else {
stats = new CacheServerStats(statisticsFactory, sockName);
}
Expand Down
Loading

0 comments on commit 3d7113f

Please sign in to comment.