From 21037920be99efdfd24d55d09c38b2b3caf94e14 Mon Sep 17 00:00:00 2001 From: tuteng Date: Sat, 5 Jan 2019 17:36:10 +0800 Subject: [PATCH] Add logic in the dashboard to handle cases where Bundle,Topic,Subscription,Consumer already exists ### Motivation Related Issue : #3226 Add logic in the dashboard to handle cases where Bundle,Topic,Subscription,Consumer already exists ### Modifications Update Consumer, Bundle,Subscription,Topic if exist --- dashboard/django/collector.py | 249 +++++++++++++++++++++++----------- 1 file changed, 167 insertions(+), 82 deletions(-) diff --git a/dashboard/django/collector.py b/dashboard/django/collector.py index c8c3b13cfc5b4..7e43376465c6f 100755 --- a/dashboard/django/collector.py +++ b/dashboard/django/collector.py @@ -77,10 +77,14 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp): clusters = dict( (cluster.name, cluster) for cluster in Cluster.objects.all() ) - db_bundles = [] - db_topics = [] - db_subscriptions = [] - db_consumers = [] + db_create_bundles = [] + db_update_bundles = [] + db_create_topics = [] + db_update_topics = [] + db_create_subscriptions = [] + db_update_subscriptions = [] + db_create_consumers = [] + db_update_consumers = [] db_replication = [] for namespace_name, bundles_stats in topics_stats.items(): @@ -94,73 +98,137 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp): namespace.save() for bundle_range, topics_stats in bundles_stats.items(): - bundle = Bundle( - broker = broker, - namespace = namespace, - range = bundle_range, - cluster = cluster, - timestamp = timestamp) - db_bundles.append(bundle) + bundle = Bundle.objects.filter( + cluster_id=cluster.id, + namespace_id=namespace.id, + range=bundle_range) + if bundle: + temp_bundle = bundle.first() + temp_bundle.timestame = timestamp + db_update_bundles.append(temp_bundle) + bundle = temp_bundle + else: + bundle = Bundle( + broker = broker, + namespace = namespace, + range = bundle_range, + cluster = cluster, + timestamp = timestamp) + db_create_bundles.append(bundle) + for topic_name, stats in topics_stats['persistent'].items(): - topic = Topic( - broker = broker, - active_broker = active_broker, - name = topic_name, - namespace = namespace, - bundle = bundle, - cluster = cluster, - timestamp = timestamp, - averageMsgSize = stats['averageMsgSize'], - msgRateIn = stats['msgRateIn'], - msgRateOut = stats['msgRateOut'], - msgThroughputIn = stats['msgThroughputIn'], - msgThroughputOut = stats['msgThroughputOut'], - pendingAddEntriesCount = stats['pendingAddEntriesCount'], - producerCount = stats['producerCount'], - storageSize = stats['storageSize'] - ) - - db_topics.append(topic) + topic = Topic.objects.filter( + cluster_id=cluster.id, + bundle_id=bundle.id, + namespace_id=namespace.id, + broker_id=broker.id, + name=topic_name) + if topic: + temp_topic = topic.first() + temp_topic.timestamp = timestamp + temp_topic.averageMsgSize = stats['averageMsgSize'] + temp_topic.msgRateIn = stats['msgRateIn'] + temp_topic.msgRateOut = stats['msgRateOut'] + temp_topic.msgThroughputIn = stats['msgThroughputIn'] + temp_topic.msgThroughputOut = stats['msgThroughputOut'] + temp_topic.pendingAddEntriesCount = stats['pendingAddEntriesCount'] + temp_topic.producerCount = stats['producerCount'] + temp_topic.storageSize = stats['storageSize'] + db_update_topics.append(temp_topic) + topic = temp_topic + else: + topic = Topic( + broker = broker, + active_broker = active_broker, + name = topic_name, + namespace = namespace, + bundle = bundle, + cluster = cluster, + timestamp = timestamp, + averageMsgSize = stats['averageMsgSize'], + msgRateIn = stats['msgRateIn'], + msgRateOut = stats['msgRateOut'], + msgThroughputIn = stats['msgThroughputIn'], + msgThroughputOut = stats['msgThroughputOut'], + pendingAddEntriesCount = stats['pendingAddEntriesCount'], + producerCount = stats['producerCount'], + storageSize = stats['storageSize'] + ) + db_create_topics.append(topic) totalBacklog = 0 numSubscriptions = 0 numConsumers = 0 for subscription_name, subStats in stats['subscriptions'].items(): numSubscriptions += 1 - subscription = Subscription( - topic = topic, - name = subscription_name, - namespace = namespace, - timestamp = timestamp, - msgBacklog = subStats['msgBacklog'], - msgRateExpired = subStats['msgRateExpired'], - msgRateOut = subStats['msgRateOut'], - msgRateRedeliver = subStats.get('msgRateRedeliver', 0), - msgThroughputOut = subStats['msgThroughputOut'], - subscriptionType = subStats['type'][0], - unackedMessages = subStats.get('unackedMessages', 0), - ) - db_subscriptions.append(subscription) + subscription = Subscription.objects.filter( + topic_id=topic.id, + namespace_id=namespace.id, + name=subscription_name) + if subscription: + temp_subscription = subscription.first() + temp_subscription.timestamp = timestamp + temp_subscription.msgBacklog = subStats['msgBacklog'] + temp_subscription.msgRateExpired = subStats['msgRateExpired'] + temp_subscription.msgRateOut = subStats['msgRateOut'] + temp_subscription.msgRateRedeliver = subStats.get('msgRateRedeliver', 0) + temp_subscription.msgThroughputOut = subStats['msgThroughputOut'] + temp_subscription.subscriptionType = subStats['type'][0] + temp_subscription.unackedMessages = subStats.get('unackedMessages', 0) + db_update_subscriptions.append(temp_subscription) + subscription = temp_subscription + else: + subscription = Subscription( + topic = topic, + name = subscription_name, + namespace = namespace, + timestamp = timestamp, + msgBacklog = subStats['msgBacklog'], + msgRateExpired = subStats['msgRateExpired'], + msgRateOut = subStats['msgRateOut'], + msgRateRedeliver = subStats.get('msgRateRedeliver', 0), + msgThroughputOut = subStats['msgThroughputOut'], + subscriptionType = subStats['type'][0], + unackedMessages = subStats.get('unackedMessages', 0), + ) + db_create_subscriptions.append(subscription) totalBacklog += subStats['msgBacklog'] for consStats in subStats['consumers']: numConsumers += 1 - consumer = Consumer( - subscription = subscription, - timestamp = timestamp, - address = consStats['address'], - availablePermits = consStats.get('availablePermits', 0), - connectedSince = parse_date(consStats.get('connectedSince')), - consumerName = consStats.get('consumerName'), - msgRateOut = consStats.get('msgRateOut', 0), - msgRateRedeliver = consStats.get('msgRateRedeliver', 0), - msgThroughputOut = consStats.get('msgThroughputOut', 0), - unackedMessages = consStats.get('unackedMessages', 0), - blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False) - ) - db_consumers.append(consumer) + consumer = Consumer.objects.filter( + subscription_id=subscription.id, + consumerName=consStats.get('consumerName')) + if consumer: + temp_consumer = consumer.first() + temp_consumer.timestamp = timestamp + temp_consumer.address = consStats['address'] + temp_consumer.availablePermits = consStats.get('availablePermits', 0) + temp_consumer.connectedSince = parse_date(consStats.get('connectedSince')) + temp_consumer.msgRateOut = consStats.get('msgRateOut', 0) + temp_consumer.msgRateRedeliver = consStats.get('msgRateRedeliver', 0) + temp_consumer.msgThroughputOut = consStats.get('msgThroughputOut', 0) + temp_consumer.unackedMessages = consStats.get('unackedMessages', 0) + temp_consumer.blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False) + db_update_consumers.append(temp_consumer) + consumer = temp_consumer + else: + consumer = Consumer( + subscription = subscription, + timestamp = timestamp, + address = consStats['address'], + availablePermits = consStats.get('availablePermits', 0), + connectedSince = parse_date(consStats.get('connectedSince')), + consumerName = consStats.get('consumerName'), + msgRateOut = consStats.get('msgRateOut', 0), + msgRateRedeliver = consStats.get('msgRateRedeliver', 0), + msgThroughputOut = consStats.get('msgThroughputOut', 0), + unackedMessages = consStats.get('unackedMessages', 0), + blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False) + ) + db_create_consumers.append(consumer) topic.backlog = totalBacklog topic.subscriptionCount = numSubscriptions @@ -210,46 +278,63 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp): topic.localThroughputIn = topic.msgThroughputIn - replicationThroughputIn topic.localThroughputOut = topic.msgThroughputIn - replicationThroughputOut - if connection.vendor == 'postgresql': # Bulk insert into db - Bundle.objects.bulk_create(db_bundles, batch_size=10000) + Bundle.objects.bulk_create(db_create_bundles, batch_size=10000) # Trick to refresh primary keys after previous bulk import - for topic in db_topics: topic.bundle = topic.bundle - Topic.objects.bulk_create(db_topics, batch_size=10000) + for topic in db_create_topics: topic.bundle = topic.bundle + Topic.objects.bulk_create(db_create_topics, batch_size=10000) - for subscription in db_subscriptions: subscription.topic = subscription.topic - Subscription.objects.bulk_create(db_subscriptions, batch_size=10000) + for subscription in db_create_subscriptions: subscription.topic = subscription.topic + Subscription.objects.bulk_create(db_create_subscriptions, batch_size=10000) - for consumer in db_consumers: consumer.subscription = consumer.subscription - Consumer.objects.bulk_create(db_consumers, batch_size=10000) + for consumer in db_create_consumers: consumer.subscription = consumer.subscription + Consumer.objects.bulk_create(db_create_consumers, batch_size=10000) for replication in db_replication: replication.topic = replication.topic Replication.objects.bulk_create(db_replication, batch_size=10000) - else: - # For other DB providers we have to insert one by one - # to be able to retrieve the PK of the newly inserted records - for bundle in db_bundles: - bundle.save() - - for topic in db_topics: - topic.bundle = topic.bundle - topic.save() + update_or_create_object( + db_update_bundles, + db_update_topics, + db_update_consumers, + db_update_subscriptions) - for subscription in db_subscriptions: - subscription.topic = subscription.topic - subscription.save() - - for consumer in db_consumers: - consumer.subscription = consumer.subscription - consumer.save() + else: + update_or_create_object( + db_create_bundles, + db_create_topics, + db_create_consumers, + db_create_subscriptions) + update_or_create_object( + db_update_bundles, + db_update_topics, + db_update_consumers, + db_update_subscriptions) for replication in db_replication: replication.topic = replication.topic replication.save() +def update_or_create_object(db_bundles, db_topics, db_consumers, db_subscriptions): + # For DB providers we have to insert or update one by one + # to be able to retrieve the PK of the newly inserted records + for bundle in db_bundles: + bundle.save() + + for topic in db_topics: + topic.bundle = topic.bundle + topic.save() + + for subscription in db_subscriptions: + subscription.topic = subscription.topic + subscription.save() + + for consumer in db_consumers: + consumer.subscription = consumer.subscription + consumer.save() + def fetch_stats(): timestamp = current_milli_time()