From f508ee7521f76532dcb1c2f050d32036a2a3745a Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 26 Aug 2024 20:28:11 +0530 Subject: [PATCH] chore: query, response update --- pkg/query-service/app/http_handler.go | 32 +- .../messagingQueues/kafka/consumerLag.md | 336 ++++-------------- .../integrations/messagingQueues/kafka/sql.go | 7 +- 3 files changed, 93 insertions(+), 282 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index d51b9e2a64..229cb7195b 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2534,17 +2534,16 @@ func (aH *APIHandler) getNetworkData( RespondError(w, apiErrObj, errQuriesByName) return } - result = postprocess.TransformToTableForClickHouseQueries(result) - // iterate over the result and extract messaging.client_id for _, res := range result { - table := res.Table - for _, row := range table.Rows { - if row.Data["client_id"] != nil && row.Data["service_instance_id"] != nil && row.Data["service_name"] != nil { - clientID := row.Data["client_id"].(string) - serviceInstanceId := row.Data["service_instance_id"].(string) - ServiceName := row.Data["service_name"].(string) - attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, ServiceInstanceID: serviceInstanceId, ServiceName: ServiceName}) + for _, series := range res.Series { + clientID, clientIDOk := series.Labels["client_id"] + serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] + serviceName, serviceNameOk := series.Labels["service_name"] + if clientIDOk && serviceInstanceIDOk && serviceNameOk { + attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, + ServiceInstanceID: serviceInstanceID, + ServiceName: serviceName}) } } } @@ -2567,12 +2566,21 @@ func (aH *APIHandler) getNetworkData( RespondError(w, apiErrObj, errQuriesByNameFetchLatency) return } - resultFetchLatency = postprocess.TransformToTableForClickHouseQueries(resultFetchLatency) - result = append(result, resultFetchLatency...) + latencyColoumn := &v3.Result{QueryName: "latency"} + var latencySeries []*v3.Series + for _, res := range resultFetchLatency { + for _, series := range res.Series { + latencySeries = append(latencySeries, series) + } + } + latencyColoumn.Series = latencySeries + result = append(result, latencyColoumn) + + resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) resp := v3.QueryRangeResponse{ - Result: result, + Result: resultFetchLatency, } aH.Respond(w, resp) } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 1d0f8c93d1..cd0e1f6324 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -1,11 +1,6 @@ ## Consumer Lag feature break down -### 1) Consumer Lag Graph - - ---- - -### 2) Consumer Group Details +### 1) Consumer Group Details API endpoint: @@ -13,94 +8,19 @@ API endpoint: POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details ``` +Request-Body ```json { - "start": 1720685296000000000, - "end": 1721290096000000000, - "variables": { - "partition": "0", - "topic": "topic1", - "consumer_group": "cg1" - } -} -``` - -response in query range format `series` -```json -{ - "status": "success", - "data": { - "resultType": "", - "result": [ - { - "table": { - "columns": [ - { - "name": "service_name", - "queryName": "", - "isValueColumn": false - }, - { - "name": "p99", - "queryName": "", - "isValueColumn": false - }, - { - "name": "error_rate", - "queryName": "", - "isValueColumn": false - }, - { - "name": "throughput", - "queryName": "", - "isValueColumn": false - }, - { - "name": "avg_msg_size", - "queryName": "", - "isValueColumn": false - } - ], - "rows": [ - { - "data": { - "avg_msg_size": "0", - "error_rate": "0", - "p99": "0.2942205100000016", - "service_name": "consumer-svc", - "throughput": "0.00016534391534391533" - } - } - ] - } - } - ] - } -} -``` - - - -### 3) Producer Details - -API endpoint: - -``` -POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details -``` - -```json -{ - "start": 1720685296000000000, - "end": 1721290096000000000, + "start": 1724429217000000000, + "end": 1724431017000000000, "variables": { - "partition": "0", - "topic": "topic1" + "partition": "0", + "topic": "topic1", + "consumer_group": "cg1" } } ``` - -response in query range format `series` +Response in query range `table` format ```json { "status": "success", @@ -116,7 +36,7 @@ response in query range format `series` "isValueColumn": false }, { - "name": "p99_query.p99", + "name": "p99", "queryName": "", "isValueColumn": false }, @@ -126,7 +46,12 @@ response in query range format `series` "isValueColumn": false }, { - "name": "rps", + "name": "throughput", + "queryName": "", + "isValueColumn": false + }, + { + "name": "avg_msg_size", "queryName": "", "isValueColumn": false } @@ -134,10 +59,11 @@ response in query range format `series` "rows": [ { "data": { + "avg_msg_size": "15", "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", - "service_name": "producer-svc" + "p99": "0.47993265000000035", + "service_name": "consumer-svc", + "throughput": "39.86888888888889" } } ] @@ -147,7 +73,30 @@ response in query range format `series` } } ``` -response in query range format `table` + +---- + +### 2) Producer Details + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details +``` + +Request-Body +```json +{ + "start": 1724429217000000000, + "end": 1724431017000000000, + "variables": { + "partition": "0", + "topic": "topic1" + } +} +``` + +Response in query range `table` format ```json { "status": "success", @@ -163,12 +112,12 @@ response in query range format `table` "isValueColumn": false }, { - "name": "p99_query.p99", + "name": "p99", "queryName": "", "isValueColumn": false }, { - "name": "error_rate", + "name": "error_percentage", "queryName": "", "isValueColumn": false }, @@ -181,9 +130,9 @@ response in query range format `table` "rows": [ { "data": { - "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", + "error_percentage": "0", + "p99": "5.51359028", + "rps": "39.86888888888889", "service_name": "producer-svc" } } @@ -195,8 +144,7 @@ response in query range format `table` } ``` -### 4) Network Fetch Latency: - +### 3) Network Fetch Latency: API endpoint: @@ -204,17 +152,18 @@ API endpoint: POST /api/v1/messaging-queues/kafka/consumer-lag/network-latency ``` +Request-Body ```json { - "start": 1721174400000000000, - "end": 1722470400000000000, + "start": 1724673937000000000, + "end": 1724675737000000000, "variables": { "consumer_group": "cg1" } } ``` -response in query range format `series` +Response in query range `table` format ```json { "status": "success", @@ -225,67 +174,12 @@ response in query range format `series` "table": { "columns": [ { - "name": "client_id", - "queryName": "", - "isValueColumn": false - }, - { - "name": "instance_id", - "queryName": "", - "isValueColumn": false - }, - { - "name": "serviceName", + "name": "service_name", "queryName": "", "isValueColumn": false }, { - "name": "throughput", - "queryName": "throughput", - "isValueColumn": true - } - ], - "rows": [ - { - "data": { - "client_id": "consumer-cg1-1", - "instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "serviceName": "consumer-svc", - "throughput": 0.00035 - } - }, - { - "data": { - "client_id": "consumer-cg1-1", - "instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "serviceName": "consumer-svc", - "throughput": 0.00027 - } - }, - { - "data": { - "client_id": "consumer-cg1-1", - "instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", - "serviceName": "consumer-svc-2", - "throughput": 0.00019 - } - }, - { - "data": { - "client_id": "consumer-cg1-1", - "instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "serviceName": "consumer-svc", - "throughput": 0.00008 - } - } - ] - } - }, - { - "table": { - "columns": [ - { - "name": "service_name", + "name": "client_id", "queryName": "", "isValueColumn": false }, @@ -295,125 +189,33 @@ response in query range format `series` "isValueColumn": false }, { - "name": "latency_0", - "queryName": "latency_0", - "isValueColumn": true - }, - { - "name": "latency_1", - "queryName": "latency_1", + "name": "latency", + "queryName": "latency", "isValueColumn": true }, { - "name": "latency_2", - "queryName": "latency_2", - "isValueColumn": true - }, - { - "name": "latency_3", - "queryName": "latency_3", + "name": "throughput", + "queryName": "throughput", "isValueColumn": true } ], "rows": [ { "data": { - "latency_0": 3230.1, - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": 503, - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": 502.62, - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": 3230.1, - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": 503, - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": 502.62, - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": 502.81, - "latency_3": "n/a", - "service_instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", - "service_name": "consumer-svc-2" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": 3230.1, - "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": 503, - "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "service_name": "consumer-svc" + "client_id": "consumer-cg1-1", + "latency": 25.21, + "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", + "service_name": "consumer-svc", + "throughput": 24.91 } }, { "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": 502.62, - "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "service_name": "consumer-svc" + "client_id": "consumer-cg1-1", + "latency": 49.68, + "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", + "service_name": "consumer-svc", + "throughput": 14.97 } } ] diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 49e50a3d97..7b38f128bd 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -76,14 +76,15 @@ ORDER BY } func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string { + timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` --- Subquery for RPS calculation, desc sorted by rps SELECT stringTagMap['messaging.client_id'] AS client_id, stringTagMap['service.instance.id'] AS service_instance_id, serviceName AS service_name, - count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds -FROM signoz_traces.signoz_index_v2 + count(*) / %d AS rps -- Convert nanoseconds to seconds +FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' AND timestamp <= '%d' @@ -92,6 +93,6 @@ WHERE AND stringTagMap['messaging.kafka.consumer.group'] = '%s' GROUP BY service_name, client_id, service_instance_id ORDER BY rps DESC -`, end, start, start, end, queueType, consumerGroup) +`, timeRange, start, end, queueType, consumerGroup) return query }