Skip to content

Commit

Permalink
Remove IsDeleted filter to improve pinot query performance (cadence-w…
Browse files Browse the repository at this point in the history
  • Loading branch information
neil-xie authored Aug 8, 2024
1 parent d9c7481 commit d5814da
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 53 deletions.
7 changes: 0 additions & 7 deletions common/persistence/pinot/pinot_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,6 @@ func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string,

// need to add Domain ID
query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)

requestQuery := strings.TrimSpace(request.Query)

Expand Down Expand Up @@ -801,7 +800,6 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s

// need to add Domain ID
query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)

requestQuery := strings.TrimSpace(request.Query)

Expand Down Expand Up @@ -960,7 +958,6 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor

query := NewPinotQuery(tableName)
query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)

earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano
latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano
Expand Down Expand Up @@ -988,7 +985,6 @@ func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalL
query := NewPinotQuery(tableName)

query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)
query.filters.addEqual(WorkflowType, request.WorkflowTypeName)
earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano
latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano
Expand Down Expand Up @@ -1024,7 +1020,6 @@ func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.Int
query := NewPinotQuery(tableName)

query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)
query.filters.addEqual(WorkflowID, request.WorkflowID)
earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano
latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano
Expand Down Expand Up @@ -1060,7 +1055,6 @@ func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.Interna
query := NewPinotQuery(tableName)

query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)

status := 0
switch request.Status.String() {
Expand Down Expand Up @@ -1103,7 +1097,6 @@ func getGetClosedWorkflowExecutionQuery(tableName string, request *p.InternalGet
query := NewPinotQuery(tableName)

query.filters.addEqual(DomainID, request.DomainUUID)
query.filters.addEqual(IsDeleted, false)
query.filters.addGte(CloseStatus, 0)
query.filters.addEqual(WorkflowID, request.Execution.GetWorkflowID())

Expand Down
29 changes: 0 additions & 29 deletions common/persistence/pinot/pinot_visibility_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,6 @@ func TestGetCountWorkflowExecutionsQuery(t *testing.T) {
expectEmptyQueryResult := fmt.Sprintf(`SELECT COUNT(*)
FROM test-table-name
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
`)

request := &p.CountWorkflowExecutionsRequest{
Expand All @@ -1098,7 +1097,6 @@ AND IsDeleted = false
expectResult := fmt.Sprintf(`SELECT COUNT(*)
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND WorkflowID = 'wfid'
`, testTableName)

Expand Down Expand Up @@ -1174,7 +1172,6 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) {
`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized'''))
Order BY StartTime DESC
LIMIT 0, 10
Expand All @@ -1193,7 +1190,6 @@ LIMIT 0, 10
`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND JSON_MATCH(Attr, '"$.CustomIntField"=''2''') and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''Update2''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''Update2'''))
order by CustomDatetimeField DESC
LIMIT 0, 10
Expand All @@ -1211,7 +1207,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String and or order by*'))
Order BY StartTime DESC
LIMIT 0, 10
Expand All @@ -1229,7 +1224,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'Or*')) or (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'and*')))
Order by StartTime DESC
LIMIT 0, 10
Expand All @@ -1247,7 +1241,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND WorkflowID = 'wid' and ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'custom and custom2 or custom3 order by*')) or (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) >= 1 AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10))
Order BY StartTime DESC
LIMIT 0, 10
Expand All @@ -1265,7 +1258,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND (JSON_MATCH(Attr, '"$.CustomIntField"=''1''') or JSON_MATCH(Attr, '"$.CustomIntField"=''2'''))
Order BY StartTime DESC
LIMIT 0, 10
Expand All @@ -1283,7 +1275,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseTime = -1 and WorkflowType = 'some-test-workflow'
Order BY StartTime DESC
LIMIT 0, 10
Expand All @@ -1301,7 +1292,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus < 0 and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String field is for text*'))
Order by DomainID Desc
LIMIT 11, 10
Expand All @@ -1319,7 +1309,6 @@ LIMIT 11, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
Order by DomainId Desc
LIMIT 0, 10
`, testTableName),
Expand All @@ -1336,7 +1325,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus < 0
Order BY StartTime DESC
LIMIT 0, 10
Expand All @@ -1354,7 +1342,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
LIMIT 0, 10
`, testTableName),
},
Expand All @@ -1364,7 +1351,6 @@ LIMIT 0, 10
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = ''
AND IsDeleted = false
LIMIT 0, 0
`, testTableName),
},
Expand Down Expand Up @@ -1409,7 +1395,6 @@ func TestGetListWorkflowExecutionsQuery(t *testing.T) {
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseTime BETWEEN 1547596871371 AND 2547596873371
AND CloseStatus >= 0
Order BY StartTime DESC
Expand All @@ -1418,7 +1403,6 @@ LIMIT 0, 10
expectOpenResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND StartTime BETWEEN 1547596871371 AND 2547596873371
AND CloseStatus < 0
AND CloseTime = -1
Expand Down Expand Up @@ -1454,7 +1438,6 @@ func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) {
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND WorkflowType = 'test-wf-type'
AND CloseTime BETWEEN 1547596871371 AND 2547596873371
AND CloseStatus >= 0
Expand All @@ -1464,7 +1447,6 @@ LIMIT 0, 10
expectOpenResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND WorkflowType = 'test-wf-type'
AND StartTime BETWEEN 1547596871371 AND 2547596873371
AND CloseStatus < 0
Expand Down Expand Up @@ -1501,7 +1483,6 @@ func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) {
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND WorkflowID = 'test-wid'
AND CloseTime BETWEEN 1547596871371 AND 2547596873371
AND CloseStatus >= 0
Expand All @@ -1511,7 +1492,6 @@ LIMIT 0, 10
expectOpenResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND WorkflowID = 'test-wid'
AND StartTime BETWEEN 1547596871371 AND 2547596873371
AND CloseStatus < 0
Expand Down Expand Up @@ -1555,7 +1535,6 @@ func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) {
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus = 0
AND CloseTime BETWEEN 1547596872371 AND 2547596872371
Order BY StartTime DESC
Expand All @@ -1578,7 +1557,6 @@ LIMIT 0, 10
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus = 1
AND CloseTime BETWEEN 1547596872371 AND 2547596872371
Order BY StartTime DESC
Expand All @@ -1601,7 +1579,6 @@ LIMIT 0, 10
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus = 2
AND CloseTime BETWEEN 1547596872371 AND 2547596872371
Order BY StartTime DESC
Expand All @@ -1624,7 +1601,6 @@ LIMIT 0, 10
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus = 3
AND CloseTime BETWEEN 1547596872371 AND 2547596872371
Order BY StartTime DESC
Expand All @@ -1647,7 +1623,6 @@ LIMIT 0, 10
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus = 4
AND CloseTime BETWEEN 1547596872371 AND 2547596872371
Order BY StartTime DESC
Expand All @@ -1670,7 +1645,6 @@ LIMIT 0, 10
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus = 5
AND CloseTime BETWEEN 1547596872371 AND 2547596872371
Order BY StartTime DESC
Expand Down Expand Up @@ -1706,7 +1680,6 @@ func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) {
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus >= 0
AND WorkflowID = 'test-wid'
`, testTableName),
Expand All @@ -1724,7 +1697,6 @@ AND WorkflowID = 'test-wid'
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND CloseStatus >= 0
AND WorkflowID = 'test-wid'
AND RunID = 'runid'
Expand All @@ -1736,7 +1708,6 @@ AND RunID = 'runid'
expectedOutput: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = ''
AND IsDeleted = false
AND CloseStatus >= 0
AND WorkflowID = ''
`, testTableName),
Expand Down
6 changes: 3 additions & 3 deletions docker/buildkite/docker-compose-pinot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ services:
- zookeeper

pinot-controller:
image: apachepinot/pinot:0.12.1
image: apachepinot/pinot:1.1.0
command: "StartController -zkAddress zookeeper:2181 -controllerPort 9001"
container_name: pinot-controller
restart: unless-stopped
Expand All @@ -97,7 +97,7 @@ services:
aliases:
- pinot-controller
pinot-broker:
image: apachepinot/pinot:0.12.1
image: apachepinot/pinot:1.1.0
command: "StartBroker -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-broker"
Expand All @@ -112,7 +112,7 @@ services:
aliases:
- pinot-broker
pinot-server:
image: apachepinot/pinot:0.12.1
image: apachepinot/pinot:1.1.0
command: "StartServer -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-server"
Expand Down
6 changes: 3 additions & 3 deletions docker/dev/cassandra-pinot-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
pinot-controller:
image: apachepinot/pinot:0.12.1
image: apachepinot/pinot:1.1.0
command: "StartController -zkAddress zookeeper:2181 -controllerPort 9001"
container_name: pinot-controller
restart: unless-stopped
Expand All @@ -27,7 +27,7 @@ services:
depends_on:
- zookeeper
pinot-broker:
image: apachepinot/pinot:0.12.1
image: apachepinot/pinot:1.1.0
command: "StartBroker -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-broker"
Expand All @@ -38,7 +38,7 @@ services:
depends_on:
- pinot-controller
pinot-server:
image: apachepinot/pinot:0.12.1
image: apachepinot/pinot:1.1.0
command: "StartServer -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-server"
Expand Down
6 changes: 3 additions & 3 deletions docker/docker-compose-pinot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ services:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
pinot-controller:
image: apachepinot/pinot:0.11.0
image: apachepinot/pinot:1.1.0
command: "StartController -zkAddress zookeeper:2181"
container_name: pinot-controller
restart: unless-stopped
Expand All @@ -51,7 +51,7 @@ services:
depends_on:
- zookeeper
pinot-broker:
image: apachepinot/pinot:0.11.0
image: apachepinot/pinot:1.1.0
command: "StartBroker -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-broker"
Expand All @@ -62,7 +62,7 @@ services:
depends_on:
- pinot-controller
pinot-server:
image: apachepinot/pinot:0.11.0
image: apachepinot/pinot:1.1.0
command: "StartServer -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-server"
Expand Down
13 changes: 6 additions & 7 deletions schema/pinot/cadence-visibility-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
"stream.kafka.broker.list": "kafka:9093",
"realtime.segment.flush.threshold.size": 30,
"realtime.segment.flush.threshold.rows": 30,
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.segment.size": "50M"
"stream.kafka.broker.list": "kafka:9093"
}
},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
"mode": "FULL"
"mode": "FULL",
"deleteRecordColumn": "IsDeleted",
"deletedKeysTTL": 86400,
"hashFunction": "NONE",
"enableSnapshot": false
},
"metadata": {}
}

Loading

0 comments on commit d5814da

Please sign in to comment.