Updated to subprocess.run to avoid zombie threading issues
dgildeh committed Feb 27, 2018
1 parent 11365df commit fb00ed7
Showing 3 changed files with 114 additions and 15 deletions.
14 changes: 12 additions & 2 deletions python/jmxquery/__init__.py
@@ -16,6 +16,8 @@
 JAR_PATH = os.path.dirname(os.path.realpath(__file__)) + '/JMXQuery-0.1.7.jar'
 # Default Java path
 DEFAULT_JAVA_PATH = 'java'
+# Default timeout for running jar in seconds
+DEFAULT_JAR_TIMEOUT = 10

 logger = logging.getLogger(__name__)

@@ -151,12 +153,20 @@ def __run_jar(self, queries: List[JMXQuery]) -> List[JMXQuery]:

         jsonOutput = "[]"
         try:
-            jsonOutput = subprocess.check_output(command)
+            output = subprocess.run(command,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.PIPE,
+                                    timeout=DEFAULT_JAR_TIMEOUT,
+                                    check=True)
+
+            jsonOutput = output.stdout.decode('utf-8')
+        except subprocess.TimeoutExpired as err:
+            logger.error(f"Error calling JMX, Timeout of {err.timeout} Expired: " + err.output.decode('utf-8'))
         except subprocess.CalledProcessError as err:
             logger.error("Error calling JMX: " + err.output.decode('utf-8'))
             raise err

-        logger.debug("JSON Output Received: " + jsonOutput.decode('utf-8'))
+        logger.debug("JSON Output Received: " + jsonOutput)
         metrics = self.__load_from_json(jsonOutput)
         return metrics

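For reference, a minimal standalone sketch of the pattern this commit switches to: subprocess.run with a timeout and check=True, so a hung JVM child is killed instead of being left behind as a zombie. The helper name and the command in the usage comment are placeholders, not part of the diff, and unlike the code above the sketch guards against TimeoutExpired.output being None (which it can be when the child produced no output before it was killed).

    import logging
    import subprocess

    logger = logging.getLogger(__name__)

    def run_with_timeout(command, timeout=10):
        """Run a child process, capture stdout/stderr, and never block forever."""
        try:
            # check=True raises CalledProcessError on a non-zero exit code;
            # timeout= kills the child and raises TimeoutExpired if it hangs.
            result = subprocess.run(command,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    timeout=timeout,
                                    check=True)
            return result.stdout.decode('utf-8')
        except subprocess.TimeoutExpired as err:
            partial = err.output.decode('utf-8') if err.output else ''
            logger.error("Timed out after %ss, partial output: %s", err.timeout, partial)
            raise
        except subprocess.CalledProcessError as err:
            logger.error("Command failed with code %s: %s", err.returncode, err.output.decode('utf-8'))
            raise

    # Hypothetical usage:
    # print(run_with_timeout(['java', '-version'], timeout=5))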
2 changes: 1 addition & 1 deletion python/setup.py
@@ -10,7 +10,7 @@
 setup(
     name = 'jmxquery',
     packages = ['jmxquery'],
-    version = '0.2.0',
+    version = '0.3.0',
     description = 'A JMX Interface for Python to Query runtime metrics in a JVM',
     long_description=long_description,
     author = 'David Gildeh',
113 changes: 101 additions & 12 deletions python/tests/test_JMXQuery.py
@@ -6,6 +6,7 @@
"""
 import logging, sys
+import threading
 from nose.tools import assert_greater_equal

 from jmxquery import JMXConnection, JMXQuery, MetricType
@@ -16,23 +17,111 @@
                     level=logging.DEBUG)

 CONNECTION_URL = "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi"
-jmxConnection = JMXConnection(CONNECTION_URL)

-# def test_wildcard_query():
-#
-#     jmxQuery = [JMXQuery("*:*")]
-#     metrics = jmxConnection.query(jmxQuery)
-#     printMetrics(metrics)
-#     assert_greater_equal(len(metrics), 4699)
+def test_wildcard_query():

-def test_specific_queries():
+    jmxConnection = JMXConnection(CONNECTION_URL)
+    jmxQuery = [JMXQuery("*:*")]
+    metrics = jmxConnection.query(jmxQuery)
+    printMetrics(metrics)
+    assert_greater_equal(len(metrics), 4699)
+
+def test_kafka_plugin():
+
+    jmxConnection = JMXConnection(CONNECTION_URL)
+    jmxQuery = [
+        # UnderReplicatedPartitions
+        JMXQuery("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions/Value",
+                 metric_name="kafka_server_ReplicaManager_UnderReplicatedPartitions"),
+
+        # OfflinePartitionsCount
+        JMXQuery("kafka.controller:type=KafkaController,name=OfflinePartitionsCount/Value",
+                 metric_name="kafka_controller_KafkaController_OfflinePartitionsCount"),
+
+        # ActiveControllerCount
+        JMXQuery("kafka.controller:type=KafkaController,name=ActiveControllerCount/Value",
+                 metric_name="kafka_controller_KafkaController_ActiveControllerCount"),
+
+        # MessagesInPerSec
+        JMXQuery("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec/Count",
+                 metric_name="kafka_server_BrokerTopicMetrics_MessagesInPerSec_Count"),
+
+        # BytesInPerSec
+        JMXQuery("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec/Count",
+                 metric_name="kafka_server_BrokerTopicMetrics_BytesInPerSec_Count"),
+
+        # BytesOutPerSec
+        JMXQuery("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec/Count",
+                 metric_name="kafka_server_BrokerTopicMetrics_BytesOutPerSec_Count"),
+
+        # RequestsPerSec
+        JMXQuery("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*/Count",
+                 metric_name="kafka_network_RequestMetrics_RequestsPerSec_Count",
+                 metric_labels={"request": "{request}"}),
+
+        # TotalTimeMs
+        JMXQuery("kafka.network:type=RequestMetrics,name=TotalTimeMs,request=*",
+                 metric_name="kafka_network_RequestMetrics_TotalTimeMs_{attribute}",
+                 metric_labels={"request": "{request}"}),
+
+        # LeaderElectionsPerSec
+        JMXQuery("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs/Count",
+                 metric_name="kafka_cluster_ControllerStats_LeaderElectionRateAndTimeMs_Count"),
+
+        # UncleanLeaderElectionsPerSec
+        JMXQuery("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec/Count",
+                 metric_name="kafka_cluster_ControllerStats_UncleanLeaderElectionsPerSec_Count"),
+
+        # PartitionCount
+        JMXQuery("kafka.server:type=ReplicaManager,name=PartitionCount/Value",
+                 metric_name="kafka_server_ReplicaManager_PartitionCount"),
+
+        # ISRShrinkRate
+        JMXQuery("kafka.server:type=ReplicaManager,name=IsrShrinksPerSec",
+                 metric_name="kafka_server_ReplicaManager_IsrShrinksPerSec_{attribute}"),
+
+        # ISRExpandRate
+        JMXQuery("kafka.server:type=ReplicaManager,name=IsrExpandsPerSec",
+                 metric_name="kafka_server_ReplicaManager_IsrExpandsPerSec_{attribute}"),
+
+        # NetworkProcessorAvgIdlePercent
+        JMXQuery("kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent/Value",
+                 metric_name="kafka_network_SocketServer_NetworkProcessorAvgIdlePercent"),
+
+        # RequestHandlerAvgIdlePercent
+        JMXQuery("kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent",
+                 metric_name="kafka_server_KafkaRequestHandlerPool_RequestHandlerAvgIdlePercent_{attribute}"),
+
+        # ZooKeeperDisconnectsPerSec
+        JMXQuery("kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec",
+                 metric_name="kafka_server_SessionExpireListener_ZooKeeperDisconnectsPerSec_{attribute}"),
+
+        # ZooKeeperExpiresPerSec
+        JMXQuery("kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec",
+                 metric_name="kafka_server_SessionExpireListener_ZooKeeperExpiresPerSec_{attribute}"),
+
+    ]

-    jmxQuery = [JMXQuery("kafka.cluster:type=*,name=*,topic=*,partition=*",
-                         metric_name="kafka_cluster_{type}_{name}",
-                         metric_labels={"topic" : "{topic}", "partition" : "{partition}"})]
-    print(jmxQuery[0].to_query_string())
     metrics = jmxConnection.query(jmxQuery)
     printMetrics(metrics)
     assert_greater_equal(len(metrics), 525)

+def test_threading():
+    """
+    This test is to check that we don't get any threading issues as the module will be
+    run in concurrent plugin threads
+    """
+    list_threads = []
+    for x in range(0, 10):
+        t = threading.Thread(target=test_kafka_plugin)
+        t.daemon = True
+        list_threads.append(t)
+        t.start()
+    sys.stdout.write("All threads started.\n")
+
+    for t in list_threads:
+        t.join()  # Wait until thread terminates its task
+    sys.stdout.write("All threads completed.\n")

 def printMetrics(metrics):
     for metric in metrics:
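The test_threading check above starts daemon threads by hand and joins them, so an exception raised inside a worker thread is printed but does not fail the test. A possible alternative, shown only as a sketch and not part of this commit, is concurrent.futures, where future.result() re-raises anything a worker raised:

    from concurrent.futures import ThreadPoolExecutor

    def test_threading_with_executor():
        # Hypothetical variant of test_threading: run the same workload from
        # 10 worker threads at once and propagate any worker exception.
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(test_kafka_plugin) for _ in range(10)]
            for future in futures:
                future.result()  # raises here if the worker raised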
