Skip to content

Commit

Permalink
feat: add network latency for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
shivanshuraj1333 committed Aug 26, 2024
1 parent c2f607a commit 5cdcbef
Show file tree
Hide file tree
Showing 5 changed files with 444 additions and 1 deletion.
76 changes: 76 additions & 0 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,10 +2496,86 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth

kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)

// for other messaging queues, add SubRouters here
}

// getNetworkData handles the Kafka "/network-latency" route.
//
// It runs two query-range requests in sequence and returns both result sets,
// converted to table form, in a single v3.QueryRangeResponse:
//
//  1. A "throughput" query built from the request body. Its rows supply the
//     (consumer_id, serviceName) pairs collected into attributeCache.
//  2. A "fetch-latency" query built from the same request body plus the
//     collected pairs.
//
// Any failure while building, validating or running either query is reported
// to the client as a bad-data API error.
func (aH *APIHandler) getNetworkData(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	// Past this point apiErr is known to be nil, so it must never be passed to
	// RespondError again; wrap the concrete error instead.
	respondBadData := func(err error) {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
	}

	attributeCache := make([]mq.Clients, 0)
	queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache)
	if err != nil {
		respondBadData(err)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		respondBadData(err)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
	if err != nil {
		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
		RespondError(w, apiErrObj, errQueriesByName)
		return
	}
	result = postprocess.TransformToTableForClickHouseQueries(result)

	// Collect the consumer id / service name pair of every throughput row; the
	// fetch-latency query is built from these pairs.
	for _, res := range result {
		if res == nil || res.Table == nil {
			continue
		}
		for _, row := range res.Table.Rows {
			// Comma-ok assertions skip rows whose values are missing or not
			// strings instead of panicking on them.
			consumerID, ok := row.Data["consumer_id"].(string)
			if !ok {
				continue
			}
			serviceName, ok := row.Data["serviceName"].(string)
			if !ok {
				continue
			}
			attributeCache = append(attributeCache, mq.Clients{ConsumerId: consumerID, ServiceName: serviceName})
		}
	}

	queryRangeParams, err = mq.BuildQRParamsNetwork(messagingQueue, "fetch-latency", attributeCache)
	if err != nil {
		respondBadData(err)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		respondBadData(err)
		return
	}

	resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
	if err != nil {
		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
		RespondError(w, apiErrObj, errQueriesByNameFetchLatency)
		return
	}
	resultFetchLatency = postprocess.TransformToTableForClickHouseQueries(resultFetchLatency)

	// Throughput tables come first, followed by the fetch-latency tables.
	result = append(result, resultFetchLatency...)

	resp := v3.QueryRangeResponse{
		Result: result,
	}
	aH.Respond(w, resp)
}

func (aH *APIHandler) getProducerData(
w http.ResponseWriter, r *http.Request,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,231 @@ response in query range format `table`
}
```

### 4) Network Fetch Latency:


API endpoint:

```
POST /api/v1/messaging-queues/kafka/consumer-lag/network-latency
```

```json
{
"start": 1721174400000000000,
"end": 1722470400000000000,
"variables": {
"consumer_group": "cg1"
}
}
```

response in query range format `table`
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "consumer_id",
"queryName": "",
"isValueColumn": false
},
{
"name": "instance_id",
"queryName": "",
"isValueColumn": false
},
{
"name": "serviceName",
"queryName": "",
"isValueColumn": false
},
{
"name": "throughput",
"queryName": "throughput",
"isValueColumn": true
}
],
"rows": [
{
"data": {
"consumer_id": "consumer-cg1-1",
"instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8",
"serviceName": "consumer-svc",
"throughput": 0.00035
}
},
{
"data": {
"consumer_id": "consumer-cg1-1",
"instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f",
"serviceName": "consumer-svc",
"throughput": 0.00027
}
},
{
"data": {
"consumer_id": "consumer-cg1-1",
"instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d",
"serviceName": "consumer-svc-2",
"throughput": 0.00019
}
},
{
"data": {
"consumer_id": "consumer-cg1-1",
"instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400",
"serviceName": "consumer-svc",
"throughput": 0.00008
}
}
]
}
},
{
"table": {
"columns": [
{
"name": "service_name",
"queryName": "",
"isValueColumn": false
},
{
"name": "service_instance_id",
"queryName": "",
"isValueColumn": false
},
{
"name": "latency_0",
"queryName": "latency_0",
"isValueColumn": true
},
{
"name": "latency_1",
"queryName": "latency_1",
"isValueColumn": true
},
{
"name": "latency_2",
"queryName": "latency_2",
"isValueColumn": true
},
{
"name": "latency_3",
"queryName": "latency_3",
"isValueColumn": true
}
],
"rows": [
{
"data": {
"latency_0": 3230.1,
"latency_1": "n/a",
"latency_2": "n/a",
"latency_3": "n/a",
"service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": 503,
"latency_1": "n/a",
"latency_2": "n/a",
"latency_3": "n/a",
"service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": 502.62,
"latency_1": "n/a",
"latency_2": "n/a",
"latency_3": "n/a",
"service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": "n/a",
"latency_1": 3230.1,
"latency_2": "n/a",
"latency_3": "n/a",
"service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": "n/a",
"latency_1": 503,
"latency_2": "n/a",
"latency_3": "n/a",
"service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": "n/a",
"latency_1": 502.62,
"latency_2": "n/a",
"latency_3": "n/a",
"service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": "n/a",
"latency_1": "n/a",
"latency_2": 502.81,
"latency_3": "n/a",
"service_instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d",
"service_name": "consumer-svc-2"
}
},
{
"data": {
"latency_0": "n/a",
"latency_1": "n/a",
"latency_2": "n/a",
"latency_3": 3230.1,
"service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": "n/a",
"latency_1": "n/a",
"latency_2": "n/a",
"latency_3": 503,
"service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8",
"service_name": "consumer-svc"
}
},
{
"data": {
"latency_0": "n/a",
"latency_1": "n/a",
"latency_2": "n/a",
"latency_3": 502.62,
"service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400",
"service_name": "consumer-svc"
}
}
]
}
}
]
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ type MessagingQueue struct {
End int64 `json:"end"`
Variables map[string]string `json:"variables,omitempty"`
}

// Clients pairs a consumer's client id with the name of the service it was
// observed under. The network-latency handler collects one entry per
// throughput result row and passes the collected slice on when building the
// fetch-latency query.
type Clients struct {
// ConsumerId is taken from the "consumer_id" column of a throughput row.
ConsumerId string
// ServiceName is taken from the "serviceName" column of a throughput row.
ServiceName string
}
64 changes: 64 additions & 0 deletions pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,67 @@ ORDER BY
`, start, end, queueType, topic, partition, timeRange)
return query
}

// escapeSQLStringLiteral makes s safe to place between single quotes in a SQL
// string literal by prefixing every single quote and backslash with a
// backslash. Both characters are ASCII, so scanning byte by byte cannot split
// a multi-byte UTF-8 sequence.
func escapeSQLStringLiteral(s string) string {
	escaped := make([]byte, 0, len(s))
	for i := 0; i < len(s); i++ {
		if c := s[i]; c == '\'' || c == '\\' {
			escaped = append(escaped, '\\')
		}
		escaped = append(escaped, s[i])
	}
	return string(escaped)
}

// generateNetworkLatencyThroughputSQL returns a query over
// signoz_traces.signoz_index_v2 that counts spans with kind = 5 for the given
// messaging system and consumer group, grouped by service name, client id and
// service instance id, and divides each count by the window length in seconds.
//
// start and end are written into the query verbatim as the timestamp bounds
// (inclusive on both sides) and are treated as nanoseconds when computing the
// window length; note that start == end makes that divisor zero.
// consumerGroup and queueType are escaped before being embedded as string
// literals, so quotes or backslashes in them cannot change the query's shape.
func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string {
	query := fmt.Sprintf(`
--- Subquery for RPS calculation, desc sorted by rps
SELECT
stringTagMap['messaging.client_id'] AS consumer_id,
stringTagMap['service.instance.id'] AS instance_id,
serviceName,
count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds
FROM signoz_traces.signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
GROUP BY serviceName, consumer_id, instance_id
ORDER BY rps DESC
`, end, start, start, end, escapeSQLStringLiteral(queueType), escapeSQLStringLiteral(consumerGroup))
	return query
}

// generateNetworkLatencyFetchSQL returns a query that averages the
// kafka_consumer_fetch_latency_avg metric per (service_name,
// service_instance_id) for one service/client pair.
//
// The query first selects the matching time series from
// signoz_metrics.time_series_v4_1day, then averages their samples from
// signoz_metrics.distributed_samples_v4 into buckets of step seconds, and
// finally averages the non-NaN bucket values per service instance, sorted by
// value descending.
//
// start and end are written verbatim as the unix_milli bounds (start
// inclusive, end exclusive) in both stages. clientId and serviceName are
// escaped before being embedded as string literals.
func generateNetworkLatencyFetchSQL(step, start, end int64, clientId, serviceName string) string {
	query := fmt.Sprintf(`
--- metrics aggregation, desc sorted by value
WITH filtered_time_series AS (
SELECT DISTINCT
JSONExtractString(labels, 'service_instance_id') as service_instance_id,
JSONExtractString(labels, 'service_name') as service_name,
fingerprint
FROM signoz_metrics.time_series_v4_1day
WHERE metric_name = 'kafka_consumer_fetch_latency_avg'
AND temporality = 'Unspecified'
AND unix_milli >= '%d'
AND unix_milli < '%d'
AND JSONExtractString(labels, 'service_name') = '%s'
AND JSONExtractString(labels, 'client_id') = '%s'
),
aggregated_data AS (
SELECT
fingerprint,
any(service_instance_id) as service_instance_id,
any(service_name) as service_name,
toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL '%d' SECOND) as ts,
avg(value) as per_series_value
FROM signoz_metrics.distributed_samples_v4
INNER JOIN filtered_time_series USING fingerprint
WHERE metric_name = 'kafka_consumer_fetch_latency_avg'
AND unix_milli >= '%d'
AND unix_milli < '%d'
GROUP BY fingerprint, ts
ORDER BY fingerprint, ts
)
SELECT
service_name,
service_instance_id,
avg(per_series_value) as value
FROM aggregated_data
WHERE isNaN(per_series_value) = 0
GROUP BY service_name, service_instance_id
ORDER BY value DESC
`, start, end, escapeSQLStringLiteral(serviceName), escapeSQLStringLiteral(clientId), step, start, end)
	return query
}
Loading

0 comments on commit 5cdcbef

Please sign in to comment.