Skip to content

Commit

Permalink
sql: some minor improvements (cadence-workflow#1432)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Feb 8, 2019
1 parent b39331d commit 06ce2bf
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 57 deletions.
47 changes: 11 additions & 36 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,32 +161,7 @@ func (m *sqlExecutionManager) createWorkflowExecutionTx(tx sqldb.Tx, request *p.
}

func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecutionRequest) (*p.InternalGetWorkflowExecutionResponse, error) {
tx, err := m.db.BeginTx()
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetWorkflowExecution operation failed. Failed to start transaction. Error: %v", err),
}
}
defer tx.Rollback()

// Have to lock next_event_id so that things aren't modified while we are getting
// all the other parts of mutable state
// TODO Replace with repeatable read transaction level

if _, err := lockNextEventID(tx, m.shardID, request.DomainID, *request.Execution.WorkflowId, *request.Execution.RunId); err != nil {
switch err.(type) {
case *workflow.EntityNotExistsError:
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("GetWorkflowExecution operation failed. Error: %v", err),
}
default:
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetWorkflowExecution operation failed. Failed to write-lock executions row. Error: %v", err),
}
}
}

execution, err := tx.SelectFromExecutions(&sqldb.ExecutionsFilter{
execution, err := m.db.SelectFromExecutions(&sqldb.ExecutionsFilter{
ShardID: m.shardID, DomainID: request.DomainID, WorkflowID: *request.Execution.WorkflowId, RunID: *request.Execution.RunId})

if err != nil {
Expand Down Expand Up @@ -281,7 +256,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti
}
{
var err error
state.ActivitInfos, err = getActivityInfoMap(tx,
state.ActivitInfos, err = getActivityInfoMap(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -295,7 +270,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti

{
var err error
state.TimerInfos, err = getTimerInfoMap(tx,
state.TimerInfos, err = getTimerInfoMap(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -309,7 +284,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti

{
var err error
state.ChildExecutionInfos, err = getChildExecutionInfoMap(tx,
state.ChildExecutionInfos, err = getChildExecutionInfoMap(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -323,7 +298,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti

{
var err error
state.RequestCancelInfos, err = getRequestCancelInfoMap(tx,
state.RequestCancelInfos, err = getRequestCancelInfoMap(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -337,7 +312,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti

{
var err error
state.SignalInfos, err = getSignalInfoMap(tx,
state.SignalInfos, err = getSignalInfoMap(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -351,7 +326,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti

{
var err error
state.BufferedEvents, err = getBufferedEvents(tx,
state.BufferedEvents, err = getBufferedEvents(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -365,7 +340,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti

{
var err error
state.BufferedReplicationTasks, err = getBufferedReplicationTasks(tx,
state.BufferedReplicationTasks, err = getBufferedReplicationTasks(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -379,7 +354,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti

{
var err error
state.SignalRequestedIDs, err = getSignalsRequested(tx,
state.SignalRequestedIDs, err = getSignalsRequested(m.db,
m.shardID,
request.DomainID,
*request.Execution.WorkflowId,
Expand All @@ -394,8 +369,8 @@ func (m *sqlExecutionManager) GetWorkflowExecution(request *p.GetWorkflowExecuti
return &p.InternalGetWorkflowExecutionResponse{State: &state}, nil
}

func getBufferedEvents(tx sqldb.Tx, shardID int, domainID string, workflowID string, runID string) (result []*p.DataBlob, err error) {
rows, err := tx.SelectFromBufferedEvents(&sqldb.BufferedEventsFilter{
func getBufferedEvents(db sqldb.Interface, shardID int, domainID string, workflowID string, runID string) (result []*p.DataBlob, err error) {
rows, err := db.SelectFromBufferedEvents(&sqldb.BufferedEventsFilter{
ShardID: shardID,
DomainID: domainID,
WorkflowID: workflowID,
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (m *sqlTaskManager) GetTasks(request *persistence.GetTasksRequest) (*persis
TaskType: int64(request.TaskType),
MinTaskID: &request.ReadLevel,
MaxTaskID: &request.MaxReadLevel,
PageSize: &request.BatchSize,
})
if err != nil {
return nil, &workflow.InternalServiceError{
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/storage/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ task_type = :task_type

getTaskQry = `SELECT workflow_id, run_id, schedule_id, task_id ` +
`FROM tasks ` +
`WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ?`
`WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ? LIMIT ?`

createTaskQry = `INSERT INTO ` +
`tasks(domain_id, workflow_id, run_id, schedule_id, task_list_name, task_type, task_id, expiry_ts) ` +
Expand All @@ -81,7 +81,7 @@ func (mdb *DB) SelectFromTasks(filter *sqldb.TasksFilter) ([]sqldb.TasksRow, err
var err error
var rows []sqldb.TasksRow
err = mdb.conn.Select(&rows, getTaskQry, filter.DomainID,
filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID)
filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/sql/storage/sqldb/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ type (
TaskID *int64
MinTaskID *int64
MaxTaskID *int64
PageSize *int
}

// TaskListsRow represents a row in task_lists table
Expand Down Expand Up @@ -565,7 +566,11 @@ type (
WriteLockShards(filter *ShardsFilter) (int, error)

InsertIntoTasks(rows []TasksRow) (sql.Result, error)
// SelectFromTasks retrieves one or more rows from the tasks table
// Required filter params - {domainID, tasklistName, taskType, minTaskID, maxTaskID, pageSize}
SelectFromTasks(filter *TasksFilter) ([]TasksRow, error)
// DeleteFromTasks deletes a row from tasks table
// Required filter params - {domainID, tasklistName, taskType, taskID}
DeleteFromTasks(filter *TasksFilter) (sql.Result, error)

InsertIntoTaskLists(row *TaskListsRow) (sql.Result, error)
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/sql/workflowStateMaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ func updateActivityInfos(tx sqldb.Tx,
return nil
}

func getActivityInfoMap(tx sqldb.Tx,
func getActivityInfoMap(db sqldb.Interface,
shardID int,
domainID,
workflowID,
runID string) (map[int64]*persistence.InternalActivityInfo, error) {
rows, err := tx.SelectFromActivityInfoMaps(&sqldb.ActivityInfoMapsFilter{
rows, err := db.SelectFromActivityInfoMaps(&sqldb.ActivityInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
Expand Down Expand Up @@ -279,12 +279,12 @@ func updateTimerInfos(tx sqldb.Tx,
return nil
}

func getTimerInfoMap(tx sqldb.Tx,
func getTimerInfoMap(db sqldb.Interface,
shardID int,
domainID,
workflowID,
runID string) (map[string]*persistence.TimerInfo, error) {
rows, err := tx.SelectFromTimerInfoMaps(&sqldb.TimerInfoMapsFilter{
rows, err := db.SelectFromTimerInfoMaps(&sqldb.TimerInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
Expand Down Expand Up @@ -378,12 +378,12 @@ func updateChildExecutionInfos(tx sqldb.Tx,
return nil
}

func getChildExecutionInfoMap(tx sqldb.Tx,
func getChildExecutionInfoMap(db sqldb.Interface,
shardID int,
domainID,
workflowID,
runID string) (map[int64]*persistence.InternalChildExecutionInfo, error) {
rows, err := tx.SelectFromChildExecutionInfoMaps(&sqldb.ChildExecutionInfoMapsFilter{
rows, err := db.SelectFromChildExecutionInfoMaps(&sqldb.ChildExecutionInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
Expand Down Expand Up @@ -490,12 +490,12 @@ func updateRequestCancelInfos(tx sqldb.Tx,
return nil
}

func getRequestCancelInfoMap(tx sqldb.Tx,
func getRequestCancelInfoMap(db sqldb.Interface,
shardID int,
domainID,
workflowID,
runID string) (map[int64]*persistence.RequestCancelInfo, error) {
rows, err := tx.SelectFromRequestCancelInfoMaps(&sqldb.RequestCancelInfoMapsFilter{
rows, err := db.SelectFromRequestCancelInfoMaps(&sqldb.RequestCancelInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
Expand Down Expand Up @@ -592,12 +592,12 @@ func updateSignalInfos(tx sqldb.Tx,
return nil
}

func getSignalInfoMap(tx sqldb.Tx,
func getSignalInfoMap(db sqldb.Interface,
shardID int,
domainID,
workflowID,
runID string) (map[int64]*persistence.SignalInfo, error) {
rows, err := tx.SelectFromSignalInfoMaps(&sqldb.SignalInfoMapsFilter{
rows, err := db.SelectFromSignalInfoMaps(&sqldb.SignalInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
Expand Down Expand Up @@ -700,12 +700,12 @@ func updateBufferedReplicationTasks(tx sqldb.Tx,
return nil
}

func getBufferedReplicationTasks(tx sqldb.Tx,
func getBufferedReplicationTasks(db sqldb.Interface,
shardID int,
domainID,
workflowID,
runID string) (map[int64]*persistence.InternalBufferedReplicationTask, error) {
rows, err := tx.SelectFromBufferedReplicationTasks(&sqldb.BufferedReplicationTaskMapsFilter{
rows, err := db.SelectFromBufferedReplicationTasks(&sqldb.BufferedReplicationTaskMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/workflowStateNonMaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func updateSignalsRequested(tx sqldb.Tx,
return nil
}

func getSignalsRequested(tx sqldb.Tx,
func getSignalsRequested(db sqldb.Interface,
shardID int,
domainID,
workflowID,
runID string) (map[string]struct{}, error) {
rows, err := tx.SelectFromSignalsRequestedSets(&sqldb.SignalsRequestedSetsFilter{
rows, err := db.SelectFromSignalsRequestedSets(&sqldb.SignalsRequestedSetsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
Expand Down
4 changes: 2 additions & 2 deletions schema/mysql/v56/cadence/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ CREATE TABLE events (
run_id VARCHAR(64) NOT NULL,
first_event_id BIGINT NOT NULL,
batch_version BIGINT,
range_id INT NOT NULL,
tx_id INT NOT NULL,
range_id BIGINT NOT NULL,
tx_id BIGINT NOT NULL,
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(64) NOT NULL,
PRIMARY KEY (domain_id, workflow_id, run_id, first_event_id)
Expand Down
6 changes: 3 additions & 3 deletions schema/mysql/v57/cadence/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ CREATE TABLE events (
run_id VARCHAR(64) NOT NULL,
first_event_id BIGINT NOT NULL,
batch_version BIGINT,
range_id INT NOT NULL,
tx_id INT NOT NULL,
data BLOB NOT NULL,
range_id BIGINT NOT NULL,
tx_id BIGINT NOT NULL,
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(64) NOT NULL,
PRIMARY KEY (domain_id, workflow_id, run_id, first_event_id)
);
Expand Down

0 comments on commit 06ce2bf

Please sign in to comment.