Skip to content

Commit

Permalink
Add logic in the dashboard to handle cases where Bundle,Topic,Subscri…
Browse files Browse the repository at this point in the history
…ption,Consumer already exists

### Motivation

Related Issue : apache#3226 

Add logic in the dashboard to handle cases where Bundle,Topic,Subscription,Consumer already exists

### Modifications

Update Consumer, Bundle,Subscription,Topic if exist
  • Loading branch information
tuteng authored and sijie committed Jan 5, 2019
1 parent 1d22571 commit 2103792
Showing 1 changed file with 167 additions and 82 deletions.
249 changes: 167 additions & 82 deletions dashboard/django/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):

clusters = dict( (cluster.name, cluster) for cluster in Cluster.objects.all() )

db_bundles = []
db_topics = []
db_subscriptions = []
db_consumers = []
db_create_bundles = []
db_update_bundles = []
db_create_topics = []
db_update_topics = []
db_create_subscriptions = []
db_update_subscriptions = []
db_create_consumers = []
db_update_consumers = []
db_replication = []

for namespace_name, bundles_stats in topics_stats.items():
Expand All @@ -94,73 +98,137 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
namespace.save()

for bundle_range, topics_stats in bundles_stats.items():
bundle = Bundle(
broker = broker,
namespace = namespace,
range = bundle_range,
cluster = cluster,
timestamp = timestamp)
db_bundles.append(bundle)

bundle = Bundle.objects.filter(
cluster_id=cluster.id,
namespace_id=namespace.id,
range=bundle_range)
if bundle:
temp_bundle = bundle.first()
temp_bundle.timestame = timestamp
db_update_bundles.append(temp_bundle)
bundle = temp_bundle
else:
bundle = Bundle(
broker = broker,
namespace = namespace,
range = bundle_range,
cluster = cluster,
timestamp = timestamp)
db_create_bundles.append(bundle)

for topic_name, stats in topics_stats['persistent'].items():
topic = Topic(
broker = broker,
active_broker = active_broker,
name = topic_name,
namespace = namespace,
bundle = bundle,
cluster = cluster,
timestamp = timestamp,
averageMsgSize = stats['averageMsgSize'],
msgRateIn = stats['msgRateIn'],
msgRateOut = stats['msgRateOut'],
msgThroughputIn = stats['msgThroughputIn'],
msgThroughputOut = stats['msgThroughputOut'],
pendingAddEntriesCount = stats['pendingAddEntriesCount'],
producerCount = stats['producerCount'],
storageSize = stats['storageSize']
)

db_topics.append(topic)
topic = Topic.objects.filter(
cluster_id=cluster.id,
bundle_id=bundle.id,
namespace_id=namespace.id,
broker_id=broker.id,
name=topic_name)
if topic:
temp_topic = topic.first()
temp_topic.timestamp = timestamp
temp_topic.averageMsgSize = stats['averageMsgSize']
temp_topic.msgRateIn = stats['msgRateIn']
temp_topic.msgRateOut = stats['msgRateOut']
temp_topic.msgThroughputIn = stats['msgThroughputIn']
temp_topic.msgThroughputOut = stats['msgThroughputOut']
temp_topic.pendingAddEntriesCount = stats['pendingAddEntriesCount']
temp_topic.producerCount = stats['producerCount']
temp_topic.storageSize = stats['storageSize']
db_update_topics.append(temp_topic)
topic = temp_topic
else:
topic = Topic(
broker = broker,
active_broker = active_broker,
name = topic_name,
namespace = namespace,
bundle = bundle,
cluster = cluster,
timestamp = timestamp,
averageMsgSize = stats['averageMsgSize'],
msgRateIn = stats['msgRateIn'],
msgRateOut = stats['msgRateOut'],
msgThroughputIn = stats['msgThroughputIn'],
msgThroughputOut = stats['msgThroughputOut'],
pendingAddEntriesCount = stats['pendingAddEntriesCount'],
producerCount = stats['producerCount'],
storageSize = stats['storageSize']
)
db_create_topics.append(topic)
totalBacklog = 0
numSubscriptions = 0
numConsumers = 0

for subscription_name, subStats in stats['subscriptions'].items():
numSubscriptions += 1
subscription = Subscription(
topic = topic,
name = subscription_name,
namespace = namespace,
timestamp = timestamp,
msgBacklog = subStats['msgBacklog'],
msgRateExpired = subStats['msgRateExpired'],
msgRateOut = subStats['msgRateOut'],
msgRateRedeliver = subStats.get('msgRateRedeliver', 0),
msgThroughputOut = subStats['msgThroughputOut'],
subscriptionType = subStats['type'][0],
unackedMessages = subStats.get('unackedMessages', 0),
)
db_subscriptions.append(subscription)
subscription = Subscription.objects.filter(
topic_id=topic.id,
namespace_id=namespace.id,
name=subscription_name)
if subscription:
temp_subscription = subscription.first()
temp_subscription.timestamp = timestamp
temp_subscription.msgBacklog = subStats['msgBacklog']
temp_subscription.msgRateExpired = subStats['msgRateExpired']
temp_subscription.msgRateOut = subStats['msgRateOut']
temp_subscription.msgRateRedeliver = subStats.get('msgRateRedeliver', 0)
temp_subscription.msgThroughputOut = subStats['msgThroughputOut']
temp_subscription.subscriptionType = subStats['type'][0]
temp_subscription.unackedMessages = subStats.get('unackedMessages', 0)
db_update_subscriptions.append(temp_subscription)
subscription = temp_subscription
else:
subscription = Subscription(
topic = topic,
name = subscription_name,
namespace = namespace,
timestamp = timestamp,
msgBacklog = subStats['msgBacklog'],
msgRateExpired = subStats['msgRateExpired'],
msgRateOut = subStats['msgRateOut'],
msgRateRedeliver = subStats.get('msgRateRedeliver', 0),
msgThroughputOut = subStats['msgThroughputOut'],
subscriptionType = subStats['type'][0],
unackedMessages = subStats.get('unackedMessages', 0),
)
db_create_subscriptions.append(subscription)

totalBacklog += subStats['msgBacklog']

for consStats in subStats['consumers']:
numConsumers += 1
consumer = Consumer(
subscription = subscription,
timestamp = timestamp,
address = consStats['address'],
availablePermits = consStats.get('availablePermits', 0),
connectedSince = parse_date(consStats.get('connectedSince')),
consumerName = consStats.get('consumerName'),
msgRateOut = consStats.get('msgRateOut', 0),
msgRateRedeliver = consStats.get('msgRateRedeliver', 0),
msgThroughputOut = consStats.get('msgThroughputOut', 0),
unackedMessages = consStats.get('unackedMessages', 0),
blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False)
)
db_consumers.append(consumer)
consumer = Consumer.objects.filter(
subscription_id=subscription.id,
consumerName=consStats.get('consumerName'))
if consumer:
temp_consumer = consumer.first()
temp_consumer.timestamp = timestamp
temp_consumer.address = consStats['address']
temp_consumer.availablePermits = consStats.get('availablePermits', 0)
temp_consumer.connectedSince = parse_date(consStats.get('connectedSince'))
temp_consumer.msgRateOut = consStats.get('msgRateOut', 0)
temp_consumer.msgRateRedeliver = consStats.get('msgRateRedeliver', 0)
temp_consumer.msgThroughputOut = consStats.get('msgThroughputOut', 0)
temp_consumer.unackedMessages = consStats.get('unackedMessages', 0)
temp_consumer.blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False)
db_update_consumers.append(temp_consumer)
consumer = temp_consumer
else:
consumer = Consumer(
subscription = subscription,
timestamp = timestamp,
address = consStats['address'],
availablePermits = consStats.get('availablePermits', 0),
connectedSince = parse_date(consStats.get('connectedSince')),
consumerName = consStats.get('consumerName'),
msgRateOut = consStats.get('msgRateOut', 0),
msgRateRedeliver = consStats.get('msgRateRedeliver', 0),
msgThroughputOut = consStats.get('msgThroughputOut', 0),
unackedMessages = consStats.get('unackedMessages', 0),
blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False)
)
db_create_consumers.append(consumer)

topic.backlog = totalBacklog
topic.subscriptionCount = numSubscriptions
Expand Down Expand Up @@ -210,46 +278,63 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
topic.localThroughputIn = topic.msgThroughputIn - replicationThroughputIn
topic.localThroughputOut = topic.msgThroughputIn - replicationThroughputOut


if connection.vendor == 'postgresql':
# Bulk insert into db
Bundle.objects.bulk_create(db_bundles, batch_size=10000)
Bundle.objects.bulk_create(db_create_bundles, batch_size=10000)

# Trick to refresh primary keys after previous bulk import
for topic in db_topics: topic.bundle = topic.bundle
Topic.objects.bulk_create(db_topics, batch_size=10000)
for topic in db_create_topics: topic.bundle = topic.bundle
Topic.objects.bulk_create(db_create_topics, batch_size=10000)

for subscription in db_subscriptions: subscription.topic = subscription.topic
Subscription.objects.bulk_create(db_subscriptions, batch_size=10000)
for subscription in db_create_subscriptions: subscription.topic = subscription.topic
Subscription.objects.bulk_create(db_create_subscriptions, batch_size=10000)

for consumer in db_consumers: consumer.subscription = consumer.subscription
Consumer.objects.bulk_create(db_consumers, batch_size=10000)
for consumer in db_create_consumers: consumer.subscription = consumer.subscription
Consumer.objects.bulk_create(db_create_consumers, batch_size=10000)

for replication in db_replication: replication.topic = replication.topic
Replication.objects.bulk_create(db_replication, batch_size=10000)

else:
# For other DB providers we have to insert one by one
# to be able to retrieve the PK of the newly inserted records
for bundle in db_bundles:
bundle.save()

for topic in db_topics:
topic.bundle = topic.bundle
topic.save()
update_or_create_object(
db_update_bundles,
db_update_topics,
db_update_consumers,
db_update_subscriptions)

for subscription in db_subscriptions:
subscription.topic = subscription.topic
subscription.save()

for consumer in db_consumers:
consumer.subscription = consumer.subscription
consumer.save()
else:
update_or_create_object(
db_create_bundles,
db_create_topics,
db_create_consumers,
db_create_subscriptions)
update_or_create_object(
db_update_bundles,
db_update_topics,
db_update_consumers,
db_update_subscriptions)

for replication in db_replication:
replication.topic = replication.topic
replication.save()

def update_or_create_object(db_bundles, db_topics, db_consumers, db_subscriptions):
# For DB providers we have to insert or update one by one
# to be able to retrieve the PK of the newly inserted records
for bundle in db_bundles:
bundle.save()

for topic in db_topics:
topic.bundle = topic.bundle
topic.save()

for subscription in db_subscriptions:
subscription.topic = subscription.topic
subscription.save()

for consumer in db_consumers:
consumer.subscription = consumer.subscription
consumer.save()


def fetch_stats():
timestamp = current_milli_time()
Expand Down

0 comments on commit 2103792

Please sign in to comment.