diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index ceb8f4e8f0..8a6066fc4c 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2496,10 +2496,86 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } +func (aH *APIHandler) getNetworkData( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + attributeCache := make([]mq.Clients, 0) + queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + // iterate over the result and extract messaging.client_id + for _, res := range result { + table := res.Table + for _, row := range table.Rows { + if row.Data["consumer_id"] != nil && row.Data["serviceName"] != nil { + consumerId := row.Data["consumer_id"].(string) + serviceName := 
row.Data["serviceName"].(string) + attributeCache = append(attributeCache, mq.Clients{ConsumerId: consumerId, ServiceName: serviceName}) + } + } + } + + queryRangeParams, err = mq.BuildQRParamsNetwork(messagingQueue, "fetch-latency", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + resultFetchLatency, errQuriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByNameFetchLatency) + return + } + resultFetchLatency = postprocess.TransformToTableForClickHouseQueries(resultFetchLatency) + + result = append(result, resultFetchLatency...) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + func (aH *APIHandler) getProducerData( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index c34bc7ad64..671d250fe9 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -195,3 +195,231 @@ response in query range format `table` } ``` +### 4) Network Fetch Latency: + + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/network-latency +``` + +```json +{ + "start": 1721174400000000000, + "end": 1722470400000000000, + "variables": { + "consumer_group": "cg1" + } +} +``` + +response in query range format `series` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "consumer_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": 
"instance_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": "serviceName", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "throughput", + "isValueColumn": true + } + ], + "rows": [ + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "serviceName": "consumer-svc", + "throughput": 0.00035 + } + }, + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "serviceName": "consumer-svc", + "throughput": 0.00027 + } + }, + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", + "serviceName": "consumer-svc-2", + "throughput": 0.00019 + } + }, + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "serviceName": "consumer-svc", + "throughput": 0.00008 + } + } + ] + } + }, + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "service_instance_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": "latency_0", + "queryName": "latency_0", + "isValueColumn": true + }, + { + "name": "latency_1", + "queryName": "latency_1", + "isValueColumn": true + }, + { + "name": "latency_2", + "queryName": "latency_2", + "isValueColumn": true + }, + { + "name": "latency_3", + "queryName": "latency_3", + "isValueColumn": true + } + ], + "rows": [ + { + "data": { + "latency_0": 3230.1, + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": 503, + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": 502.62, + "latency_1": "n/a", + "latency_2": 
"n/a", + "latency_3": "n/a", + "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": 3230.1, + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": 503, + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": 502.62, + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": 502.81, + "latency_3": "n/a", + "service_instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", + "service_name": "consumer-svc-2" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": 3230.1, + "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": 503, + "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": 502.62, + "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "service_name": "consumer-svc" + } + } + ] + } + } + ] + } +} +``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index b24734cf48..2d5a55dd5f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -7,3 +7,8 @@ type 
// escapeClickHouseStringLiteral makes s safe to embed between single quotes
// in a ClickHouse SQL string literal by prefixing every backslash and single
// quote with a backslash. Strings containing neither are returned unchanged.
//
// Working byte-wise is safe for UTF-8 input: both escaped characters are
// ASCII and never occur inside a multi-byte sequence.
func escapeClickHouseStringLiteral(s string) string {
	escaped := make([]byte, 0, len(s))
	for i := 0; i < len(s); i++ {
		if c := s[i]; c == '\\' || c == '\'' {
			escaped = append(escaped, '\\')
		}
		escaped = append(escaped, s[i])
	}
	return string(escaped)
}

// generateNetworkLatencyThroughputSQL returns the ClickHouse query that lists,
// for one consumer group, every (serviceName, consumer_id, instance_id)
// combination together with its span rate ("rps"), highest rate first.
//
// start and end are substituted both into the timestamp filter and into the
// rate denominator, which divides their difference by 1e9 (nanoseconds to
// seconds). consumerGroup and queueType are escaped before being placed in
// quoted literals, because consumerGroup is taken from the request body.
func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string {
	query := fmt.Sprintf(`
--- Subquery for RPS calculation, desc sorted by rps
SELECT
    stringTagMap['messaging.client_id'] AS consumer_id,
    stringTagMap['service.instance.id'] AS instance_id,
    serviceName,
    count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds
FROM signoz_traces.signoz_index_v2
WHERE
    timestamp >= '%d'
    AND timestamp <= '%d'
    AND kind = 5
    AND msgSystem = '%s'
    AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
GROUP BY serviceName, consumer_id, instance_id
ORDER BY rps DESC
`, end, start, start, end,
		escapeClickHouseStringLiteral(queueType),
		escapeClickHouseStringLiteral(consumerGroup))
	return query
}

// generateNetworkLatencyFetchSQL returns the ClickHouse query that averages
// the kafka_consumer_fetch_latency_avg metric per (service_name,
// service_instance_id) for a single client, highest value first.
//
// start and end bound unix_milli in both CTEs; step is the bucket width in
// seconds used by toStartOfInterval. clientId and serviceName are escaped
// before being placed in quoted literals, since the caller fills them from
// query results rather than from constants.
func generateNetworkLatencyFetchSQL(step, start, end int64, clientId, serviceName string) string {
	query := fmt.Sprintf(`
--- metrics aggregation, desc sorted by value
WITH filtered_time_series AS (
    SELECT DISTINCT
        JSONExtractString(labels, 'service_instance_id') as service_instance_id,
        JSONExtractString(labels, 'service_name') as service_name,
        fingerprint
    FROM signoz_metrics.time_series_v4_1day
    WHERE metric_name = 'kafka_consumer_fetch_latency_avg'
        AND temporality = 'Unspecified'
        AND unix_milli >= '%d'
        AND unix_milli < '%d'
        AND JSONExtractString(labels, 'service_name') = '%s'
        AND JSONExtractString(labels, 'client_id') = '%s'
),
aggregated_data AS (
    SELECT
        fingerprint,
        any(service_instance_id) as service_instance_id,
        any(service_name) as service_name,
        toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL '%d' SECOND) as ts,
        avg(value) as per_series_value
    FROM signoz_metrics.distributed_samples_v4
    INNER JOIN filtered_time_series USING fingerprint
    WHERE metric_name = 'kafka_consumer_fetch_latency_avg'
        AND unix_milli >= '%d'
        AND unix_milli < '%d'
    GROUP BY fingerprint, ts
    ORDER BY fingerprint, ts
)
SELECT
    service_name,
    service_instance_id,
    avg(per_series_value) as value
FROM aggregated_data
WHERE isNaN(per_series_value) = 0
GROUP BY service_name, service_instance_id
ORDER BY value DESC
`, start, end,
		escapeClickHouseStringLiteral(serviceName),
		escapeClickHouseStringLiteral(clientId),
		step, start, end)
	return query
}
+ start := messagingQueue.Start + end := messagingQueue.End + + for i, clientInfo := range attributeCache { + query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName) + chq := &v3.ClickHouseQuery{ + Query: query, + } + index := fmt.Sprintf("latency_%d", i) + cq[index] = chq + } + + return cq, nil +} + +func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) { + + // ToDo: propagate this through APIs when there are different handlers + queueType := kafkaQueue + + var cq *v3.CompositeQuery + + if queryContext == "throughput" { + chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType) + + if err != nil { + return nil, err + } + + cq, err = buildCompositeQuery(chq, queryContext) + } else if queryContext == "fetch-latency" { + chq, err := buildClickHouseQueriesNetwork(messagingQueue, attributeCache) + if err != nil { + return nil, err + } + cq = &v3.CompositeQuery{ + QueryType: v3.QueryTypeClickHouseSQL, + ClickHouseQueries: chq, + PanelType: v3.PanelTypeTable, + } + } + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: messagingQueue.Start, + End: messagingQueue.End, + Step: defaultStepInterval, + CompositeQuery: cq, + Version: "v4", + FormatForWeb: true, + } + + return queryRangeParams, nil +} + func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End