diff --git a/common/persistence/pinot/pinot_visibility_store.go b/common/persistence/pinot/pinot_visibility_store.go index 7ab433f5984..6b9fd3695bd 100644 --- a/common/persistence/pinot/pinot_visibility_store.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -762,7 +762,6 @@ func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string, // need to add Domain ID query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) requestQuery := strings.TrimSpace(request.Query) @@ -801,7 +800,6 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s // need to add Domain ID query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) requestQuery := strings.TrimSpace(request.Query) @@ -960,7 +958,6 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano @@ -988,7 +985,6 @@ func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalL query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) query.filters.addEqual(WorkflowType, request.WorkflowTypeName) earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano @@ -1024,7 +1020,6 @@ func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.Int query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) query.filters.addEqual(WorkflowID, request.WorkflowID) earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano @@ -1060,7 +1055,6 @@ func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.Interna query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) status := 0 switch request.Status.String() { @@ -1103,7 +1097,6 @@ func getGetClosedWorkflowExecutionQuery(tableName string, request *p.InternalGet query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) - query.filters.addEqual(IsDeleted, false) query.filters.addGte(CloseStatus, 0) query.filters.addEqual(WorkflowID, request.Execution.GetWorkflowID()) diff --git a/common/persistence/pinot/pinot_visibility_store_test.go b/common/persistence/pinot/pinot_visibility_store_test.go index 1334a57e01e..904f456a0cf 100644 --- a/common/persistence/pinot/pinot_visibility_store_test.go +++ b/common/persistence/pinot/pinot_visibility_store_test.go @@ -1086,7 +1086,6 @@ func TestGetCountWorkflowExecutionsQuery(t *testing.T) { expectEmptyQueryResult := fmt.Sprintf(`SELECT COUNT(*) FROM test-table-name WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false `) request := &p.CountWorkflowExecutionsRequest{ @@ -1098,7 +1097,6 @@ AND IsDeleted = false expectResult := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND WorkflowID = 'wfid' `, testTableName) @@ -1174,7 +1172,6 @@ func TestGetListWorkflowExecutionQuery(t *testing.T) { `SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) Order BY StartTime DESC LIMIT 0, 10 @@ -1193,7 +1190,6 @@ LIMIT 0, 10 `SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND JSON_MATCH(Attr, '"$.CustomIntField"=''2''') and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''Update2''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''Update2''')) order by CustomDatetimeField DESC LIMIT 0, 10 @@ -1211,7 +1207,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String and or order by*')) Order BY StartTime DESC LIMIT 0, 10 @@ -1229,7 +1224,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'Or*')) or (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'and*'))) Order by StartTime DESC LIMIT 0, 10 @@ -1247,7 +1241,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND WorkflowID = 'wid' and ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'custom and custom2 or custom3 order by*')) or (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) >= 1 AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10)) Order BY StartTime DESC LIMIT 0, 10 @@ -1265,7 +1258,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND (JSON_MATCH(Attr, '"$.CustomIntField"=''1''') or JSON_MATCH(Attr, '"$.CustomIntField"=''2''')) Order BY StartTime DESC LIMIT 0, 10 @@ -1283,7 +1275,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseTime = -1 and WorkflowType = 'some-test-workflow' Order BY StartTime DESC LIMIT 0, 10 @@ -1301,7 +1292,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus < 0 and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String field is for text*')) Order by DomainID Desc LIMIT 11, 10 @@ -1319,7 +1309,6 @@ LIMIT 11, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false Order by DomainId Desc LIMIT 0, 10 `, testTableName), @@ -1336,7 +1325,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus < 0 Order BY StartTime DESC LIMIT 0, 10 @@ -1354,7 +1342,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false LIMIT 0, 10 `, testTableName), }, @@ -1364,7 +1351,6 @@ LIMIT 0, 10 expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = '' -AND IsDeleted = false LIMIT 0, 0 `, testTableName), }, @@ -1409,7 +1395,6 @@ func TestGetListWorkflowExecutionsQuery(t *testing.T) { expectCloseResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseTime BETWEEN 1547596871371 AND 2547596873371 AND CloseStatus >= 0 Order BY StartTime DESC @@ -1418,7 +1403,6 @@ LIMIT 0, 10 expectOpenResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND StartTime BETWEEN 1547596871371 AND 2547596873371 AND CloseStatus < 0 AND CloseTime = -1 @@ -1454,7 +1438,6 @@ func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) { expectCloseResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND WorkflowType = 'test-wf-type' AND CloseTime BETWEEN 1547596871371 AND 2547596873371 AND CloseStatus >= 0 @@ -1464,7 +1447,6 @@ LIMIT 0, 10 expectOpenResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND WorkflowType = 'test-wf-type' AND StartTime BETWEEN 1547596871371 AND 2547596873371 AND CloseStatus < 0 @@ -1501,7 +1483,6 @@ func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) { expectCloseResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND WorkflowID = 'test-wid' AND CloseTime BETWEEN 1547596871371 AND 2547596873371 AND CloseStatus >= 0 @@ -1511,7 +1492,6 @@ LIMIT 0, 10 expectOpenResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND WorkflowID = 'test-wid' AND StartTime BETWEEN 1547596871371 AND 2547596873371 AND CloseStatus < 0 @@ -1555,7 +1535,6 @@ func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) { expectResult: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus = 0 AND CloseTime BETWEEN 1547596872371 AND 2547596872371 Order BY StartTime DESC @@ -1578,7 +1557,6 @@ LIMIT 0, 10 expectResult: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus = 1 AND CloseTime BETWEEN 1547596872371 AND 2547596872371 Order BY StartTime DESC @@ -1601,7 +1579,6 @@ LIMIT 0, 10 expectResult: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus = 2 AND CloseTime BETWEEN 1547596872371 AND 2547596872371 Order BY StartTime DESC @@ -1624,7 +1601,6 @@ LIMIT 0, 10 expectResult: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus = 3 AND CloseTime BETWEEN 1547596872371 AND 2547596872371 Order BY StartTime DESC @@ -1647,7 +1623,6 @@ LIMIT 0, 10 expectResult: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus = 4 AND CloseTime BETWEEN 1547596872371 AND 2547596872371 Order BY StartTime DESC @@ -1670,7 +1645,6 @@ LIMIT 0, 10 expectResult: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus = 5 AND CloseTime BETWEEN 1547596872371 AND 2547596872371 Order BY StartTime DESC @@ -1706,7 +1680,6 @@ func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus >= 0 AND WorkflowID = 'test-wid' `, testTableName), @@ -1724,7 +1697,6 @@ AND WorkflowID = 'test-wid' expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false AND CloseStatus >= 0 AND WorkflowID = 'test-wid' AND RunID = 'runid' @@ -1736,7 +1708,6 @@ AND RunID = 'runid' expectedOutput: fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = '' -AND IsDeleted = false AND CloseStatus >= 0 AND WorkflowID = '' `, testTableName), diff --git a/docker/buildkite/docker-compose-pinot.yml b/docker/buildkite/docker-compose-pinot.yml index fccf0c217db..3c24e5b023d 100644 --- a/docker/buildkite/docker-compose-pinot.yml +++ b/docker/buildkite/docker-compose-pinot.yml @@ -82,7 +82,7 @@ services: - zookeeper pinot-controller: - image: apachepinot/pinot:0.12.1 + image: apachepinot/pinot:1.1.0 command: "StartController -zkAddress zookeeper:2181 -controllerPort 9001" container_name: pinot-controller restart: unless-stopped @@ -97,7 +97,7 @@ services: aliases: - pinot-controller pinot-broker: - image: apachepinot/pinot:0.12.1 + image: apachepinot/pinot:1.1.0 command: "StartBroker -zkAddress zookeeper:2181" restart: unless-stopped container_name: "pinot-broker" @@ -112,7 +112,7 @@ services: aliases: - pinot-broker pinot-server: - image: apachepinot/pinot:0.12.1 + image: apachepinot/pinot:1.1.0 command: "StartServer -zkAddress zookeeper:2181" restart: unless-stopped container_name: "pinot-server" diff --git a/docker/dev/cassandra-pinot-kafka.yml b/docker/dev/cassandra-pinot-kafka.yml index 7b77b67b5c8..31091ed4d66 100644 --- a/docker/dev/cassandra-pinot-kafka.yml +++ b/docker/dev/cassandra-pinot-kafka.yml @@ -16,7 +16,7 @@ services: - ZOOKEEPER_CLIENT_PORT=2181 - ZOOKEEPER_TICK_TIME=2000 pinot-controller: - image: apachepinot/pinot:0.12.1 + image: apachepinot/pinot:1.1.0 command: "StartController -zkAddress zookeeper:2181 -controllerPort 9001" container_name: pinot-controller restart: unless-stopped @@ -27,7 +27,7 @@ services: depends_on: - zookeeper pinot-broker: - image: apachepinot/pinot:0.12.1 + image: apachepinot/pinot:1.1.0 command: "StartBroker -zkAddress zookeeper:2181" restart: unless-stopped container_name: "pinot-broker" @@ -38,7 +38,7 @@ services: depends_on: - pinot-controller pinot-server: - image: apachepinot/pinot:0.12.1 + image: apachepinot/pinot:1.1.0 command: "StartServer -zkAddress zookeeper:2181" restart: unless-stopped container_name: "pinot-server" diff --git a/docker/docker-compose-pinot.yml b/docker/docker-compose-pinot.yml index 6bd7be3dcf2..ea696b9e148 100644 --- a/docker/docker-compose-pinot.yml +++ b/docker/docker-compose-pinot.yml @@ -40,7 +40,7 @@ services: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 pinot-controller: - image: apachepinot/pinot:0.11.0 + image: apachepinot/pinot:1.1.0 command: "StartController -zkAddress zookeeper:2181" container_name: pinot-controller restart: unless-stopped @@ -51,7 +51,7 @@ services: depends_on: - zookeeper pinot-broker: - image: apachepinot/pinot:0.11.0 + image: apachepinot/pinot:1.1.0 command: "StartBroker -zkAddress zookeeper:2181" restart: unless-stopped container_name: "pinot-broker" @@ -62,7 +62,7 @@ services: depends_on: - pinot-controller pinot-server: - image: apachepinot/pinot:0.11.0 + image: apachepinot/pinot:1.1.0 command: "StartServer -zkAddress zookeeper:2181" restart: unless-stopped container_name: "pinot-server" diff --git a/schema/pinot/cadence-visibility-config.json b/schema/pinot/cadence-visibility-config.json index b78d59e152b..99639c34741 100644 --- a/schema/pinot/cadence-visibility-config.json +++ b/schema/pinot/cadence-visibility-config.json @@ -27,19 +27,18 @@ "stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.zk.broker.url": "zookeeper:2181/kafka", - "stream.kafka.broker.list": "kafka:9093", - "realtime.segment.flush.threshold.size": 30, - "realtime.segment.flush.threshold.rows": 30, - "realtime.segment.flush.threshold.time": "24h", - "realtime.segment.flush.threshold.segment.size": "50M" + "stream.kafka.broker.list": "kafka:9093" } }, "routing": { "instanceSelectorType": "strictReplicaGroup" }, "upsertConfig": { - "mode": "FULL" + "mode": "FULL", + "deleteRecordColumn": "IsDeleted", + "deletedKeysTTL": 86400, + "hashFunction": "NONE", + "enableSnapshot": false }, "metadata": {} } - diff --git a/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go b/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go index 5598c734f4b..ddf48d888fe 100644 --- a/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go +++ b/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go @@ -176,7 +176,6 @@ func (w *Workflow) getDomainWorkflowTypeCountPinotQuery(domainName string) (stri SELECT WorkflowType, COUNT(*) AS count FROM %s WHERE DomainID = '%s' -AND IsDeleted = false AND CloseStatus = -1 AND StartTime > 0 GROUP BY WorkflowType