chore: remove crash recover from warehouse (rudderlabs#5290)
cisse21 authored Nov 16, 2024
1 parent 1f575ec commit 9abff79
Showing 18 changed files with 0 additions and 489 deletions.
4 changes: 0 additions & 4 deletions warehouse/integrations/azure-synapse/azure-synapse.go
@@ -829,10 +829,6 @@ func (as *AzureSynapse) Setup(_ context.Context, warehouse model.Warehouse, uplo
 	return err
 }
 
-func (as *AzureSynapse) CrashRecover(ctx context.Context) error {
-	return as.dropDanglingStagingTables(ctx)
-}
-
 func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) error {
 	sqlStatement := fmt.Sprintf(`
 		select
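
Not part of the diff: CrashRecover was a thin wrapper over dropDanglingStagingTables, whose body is truncated above. Below is a minimal, self-contained Go sketch of that cleanup pattern, assuming staging tables are located by a name prefix in information_schema and that the driver supports named parameters (as the removed integration test further down suggests); the helper is illustrative, not the actual rudder-server implementation.

package example

import (
	"context"
	"database/sql"
	"fmt"
)

// dropDanglingStagingTables is an assumed reconstruction: find tables in the
// given schema whose names carry the Rudder staging prefix, then drop them.
func dropDanglingStagingTables(ctx context.Context, db *sql.DB, schema, prefix string) error {
	rows, err := db.QueryContext(ctx, `
		SELECT table_name
		FROM information_schema.tables
		WHERE table_schema = @schema AND table_name LIKE @prefix`,
		sql.Named("schema", schema),
		sql.Named("prefix", prefix+"%"),
	)
	if err != nil {
		return fmt.Errorf("querying dangling staging tables: %w", err)
	}
	defer func() { _ = rows.Close() }()

	var tables []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return fmt.Errorf("scanning table name: %w", err)
		}
		tables = append(tables, name)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("iterating table names: %w", err)
	}

	for _, name := range tables {
		// Identifier quoting via %q mirrors the removed test's CREATE TABLE %q.%q.
		if _, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE %q.%q", schema, name)); err != nil {
			return fmt.Errorf("dropping %s: %w", name, err)
		}
	}
	return nil
}
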
106 changes: 0 additions & 106 deletions warehouse/integrations/azure-synapse/azure_synapse_test.go
@@ -20,14 +20,12 @@ import (
 
 	"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
 	azuresynapse "github.com/rudderlabs/rudder-server/warehouse/integrations/azure-synapse"
-	"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
 	mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
 	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
 	"github.com/rudderlabs/rudder-server/warehouse/validations"
 
 	"github.com/rudderlabs/compose-test/compose"
 
-	"github.com/DATA-DOG/go-sqlmock"
 	"github.com/stretchr/testify/require"
 
 	"github.com/rudderlabs/compose-test/testcompose"
@@ -631,110 +629,6 @@ func TestIntegration(t *testing.T) {
 			require.Equal(t, records, whth.DiscardTestRecords())
 		})
 	})
-
-	t.Run("CrashRecovery", func(t *testing.T) {
-		c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.yml", "../testdata/docker-compose.minio.yml"}))
-		c.Start(context.Background())
-
-		azureSynapsePort := c.Port("azure_synapse", 1433)
-		minioEndpoint := fmt.Sprintf("localhost:%d", c.Port("minio", 9000))
-
-		ctx := context.Background()
-		namespace := whth.RandSchema(destType)
-
-		warehouse := model.Warehouse{
-			Source: backendconfig.SourceT{
-				ID: "test_source_id",
-			},
-			Destination: backendconfig.DestinationT{
-				ID: "test_destination_id",
-				DestinationDefinition: backendconfig.DestinationDefinitionT{
-					Name: destType,
-				},
-				Config: map[string]any{
-					"host":             host,
-					"database":         database,
-					"user":             user,
-					"password":         password,
-					"port":             strconv.Itoa(azureSynapsePort),
-					"sslMode":          "disable",
-					"namespace":        namespace,
-					"bucketProvider":   whutils.MINIO,
-					"bucketName":       bucketName,
-					"accessKeyID":      accessKeyID,
-					"secretAccessKey":  secretAccessKey,
-					"useSSL":           false,
-					"endPoint":         minioEndpoint,
-					"syncFrequency":    "30",
-					"useRudderStorage": false,
-				},
-			},
-			WorkspaceID: "test_workspace_id",
-			Namespace:   namespace,
-		}
-
-		tableName := "crash_recovery_test_table"
-
-		mockUploader := newMockUploader(t, []whutils.LoadFile{}, tableName, whutils.DiscardsSchema, whutils.DiscardsSchema)
-
-		t.Run("successful cleanup", func(t *testing.T) {
-			az := azuresynapse.New(config.New(), logger.NOP, stats.NOP)
-			err := az.Setup(ctx, warehouse, mockUploader)
-			require.NoError(t, err)
-
-			stagingTable := whutils.StagingTablePrefix(destType) + tableName
-
-			_, err = az.DB.ExecContext(ctx, fmt.Sprintf("IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '%[1]s') BEGIN EXEC('CREATE SCHEMA %[1]s') END;", namespace))
-			require.NoError(t, err)
-
-			_, err = az.DB.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %q.%q (id int)", namespace, stagingTable))
-			require.NoError(t, err)
-
-			var count int
-			err = az.DB.QueryRowContext(ctx, `
-				SELECT count(*)
-				FROM information_schema.tables
-				WHERE table_schema = @schema AND table_name = @table`,
-				sql.Named("schema", namespace),
-				sql.Named("table", stagingTable),
-			).Scan(&count)
-			require.NoError(t, err)
-
-			require.Equal(t, 1, count, "staging table should be created")
-
-			err = az.CrashRecover(ctx)
-			require.NoError(t, err)
-
-			err = az.DB.QueryRowContext(ctx, `
-				SELECT count(*)
-				FROM information_schema.tables
-				WHERE table_schema = @schema AND table_name = @table`,
-				sql.Named("schema", namespace),
-				sql.Named("table", stagingTable),
-			).Scan(&count)
-			require.NoError(t, err)
-			require.Equal(t, 0, count, "staging table should be dropped")
-		})
-
-		t.Run("query error", func(t *testing.T) {
-			az := azuresynapse.New(config.New(), logger.NOP, stats.NOP)
-			err := az.Setup(ctx, warehouse, mockUploader)
-			require.NoError(t, err)
-
-			db, dbMock, err := sqlmock.New()
-			require.NoError(t, err)
-			defer func() {
-				_ = db.Close()
-			}()
-
-			dbMock.ExpectQuery("select table_name").WillReturnError(fmt.Errorf("query error"))
-
-			// TODO: Add more test cases
-			az.DB = sqlquerywrapper.New(db)
-			err = az.CrashRecover(ctx)
-			require.ErrorContains(t, err, "query error")
-		})
-	})
 }
 
 func TestAzureSynapse_ProcessColumnValue(t *testing.T) {
4 changes: 0 additions & 4 deletions warehouse/integrations/bigquery/bigquery.go
@@ -767,10 +767,6 @@ func (bq *BigQuery) connect(ctx context.Context) (*middleware.Client, error) {
 	return middlewareClient, nil
 }
 
-func (bq *BigQuery) CrashRecover(ctx context.Context) error {
-	return bq.dropDanglingStagingTables(ctx)
-}
-
 func (bq *BigQuery) dropDanglingStagingTables(ctx context.Context) error {
 	sqlStatement := fmt.Sprintf(`
 		SELECT
5 changes: 0 additions & 5 deletions warehouse/integrations/clickhouse/clickhouse.go
@@ -957,11 +957,6 @@ func (ch *Clickhouse) Setup(_ context.Context, warehouse model.Warehouse, upload
 	return err
 }
 
-func (*Clickhouse) CrashRecover(context.Context) error {
-	// no-op: clickhouse does not need crash recovery
-	return nil
-}
-
 // FetchSchema queries clickhouse and returns the schema associated with provided namespace
 func (ch *Clickhouse) FetchSchema(ctx context.Context) (model.Schema, model.Schema, error) {
 	schema := make(model.Schema)
5 changes: 0 additions & 5 deletions warehouse/integrations/datalake/datalake.go
@@ -58,11 +58,6 @@ func (d *Datalake) Setup(_ context.Context, warehouse model.Warehouse, uploader
 	return err
 }
 
-func (*Datalake) CrashRecover(context.Context) error {
-	// no-op: datalake does not need crash recovery
-	return nil
-}
-
 func (d *Datalake) FetchSchema(ctx context.Context) (model.Schema, model.Schema, error) {
 	return d.SchemaRepository.FetchSchema(ctx, d.Warehouse)
 }
5 changes: 0 additions & 5 deletions warehouse/integrations/deltalake/deltalake.go
@@ -247,11 +247,6 @@ func (d *Deltalake) connect() (*sqlmiddleware.DB, error) {
 	return middleware, nil
 }
 
-// CrashRecover crash recover scenarios
-func (d *Deltalake) CrashRecover(ctx context.Context) error {
-	return d.dropDanglingStagingTables(ctx)
-}
-
 // dropDanglingStagingTables drops dangling staging tables
 func (d *Deltalake) dropDanglingStagingTables(ctx context.Context) error {
 	tableNames, err := d.fetchTables(ctx, rudderStagingTableRegex)
1 change: 0 additions & 1 deletion warehouse/integrations/manager/manager.go
@@ -28,7 +28,6 @@ import (
 
 type Manager interface {
 	Setup(ctx context.Context, warehouse model.Warehouse, uploader warehouseutils.Uploader) error
-	CrashRecover(ctx context.Context) error
	FetchSchema(ctx context.Context) (model.Schema, model.Schema, error)
 	CreateSchema(ctx context.Context) (err error)
 	CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) (err error)
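
Not part of the diff: removing CrashRecover from the Manager interface means any call site that reached crash recovery through this abstraction no longer compiles. A hypothetical before-state caller, for illustration only (import paths assumed from the repository layout):

package example

import (
	"context"
	"fmt"

	"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// setupWithRecovery is hypothetical and shows the shape of code this commit
// invalidates: after the change, manager.Manager has no CrashRecover method.
func setupWithRecovery(ctx context.Context, m manager.Manager, wh model.Warehouse, up warehouseutils.Uploader) error {
	if err := m.Setup(ctx, wh, up); err != nil {
		return fmt.Errorf("setup: %w", err)
	}
	return m.CrashRecover(ctx) // removed from the interface by this commit
}
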
4 changes: 0 additions & 4 deletions warehouse/integrations/mssql/mssql.go
@@ -840,10 +840,6 @@ func (ms *MSSQL) Setup(_ context.Context, warehouse model.Warehouse, uploader wa
 	return err
 }
 
-func (ms *MSSQL) CrashRecover(ctx context.Context) error {
-	return ms.dropDanglingStagingTables(ctx)
-}
-
 func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) error {
 	sqlStatement := fmt.Sprintf(`
 		select
4 changes: 0 additions & 4 deletions warehouse/integrations/postgres/postgres.go
@@ -408,10 +408,6 @@ func (pg *Postgres) Setup(_ context.Context, warehouse model.Warehouse, uploader
 	return err
 }
 
-func (*Postgres) CrashRecover(context.Context) error {
-	return nil
-}
-
 // FetchSchema queries postgres and returns the schema associated with provided namespace
 func (pg *Postgres) FetchSchema(ctx context.Context) (model.Schema, model.Schema, error) {
 	schema := make(model.Schema)
4 changes: 0 additions & 4 deletions warehouse/integrations/redshift/redshift.go
@@ -1304,10 +1304,6 @@ func (rs *Redshift) Cleanup(ctx context.Context) {
 	}
 }
 
-func (rs *Redshift) CrashRecover(ctx context.Context) error {
-	return rs.dropDanglingStagingTables(ctx)
-}
-
 func (*Redshift) IsEmpty(context.Context, model.Warehouse) (empty bool, err error) {
 	return
 }
43 changes: 0 additions & 43 deletions warehouse/integrations/redshift/redshift_test.go
@@ -1403,49 +1403,6 @@ func TestIntegration(t *testing.T) {
 				)
 				require.Equal(t, records, whth.SampleTestRecords())
 			})
-			t.Run("crashRecover", func(t *testing.T) {
-				ctx := context.Background()
-				tableName := "crash_recovery_test_table"
-				stgTableName := whutils.StagingTableName(destType, tableName, 64)
-				mockUploader := newMockUploader(t, nil, tableName, schemaInUpload, schemaInUpload, whutils.LoadFileTypeParquet)
-
-				rs := redshift.New(config.New(), logger.NOP, stats.NOP)
-				err := rs.Setup(ctx, warehouse, mockUploader)
-				require.NoError(t, err)
-
-				err = rs.CreateSchema(ctx)
-				require.NoError(t, err)
-
-				err = rs.CreateTable(ctx, stgTableName, schemaInWarehouse)
-				require.NoError(t, err)
-
-				tableExists := func(t *testing.T) bool {
-					t.Helper()
-
-					var count int
-					err = rs.DB.DB.QueryRow(`
-						SELECT
-							count(*)
-						FROM
-							information_schema.tables
-						WHERE
-							table_schema = $1
-							AND table_name = $2;
-					`,
-						tc.warehouse.Namespace,
-						stgTableName,
-					).Scan(&count)
-					require.NoError(t, err)
-
-					return count == 1
-				}
-				require.True(t, tableExists(t))
-
-				err = rs.CrashRecover(ctx)
-				require.NoError(t, err)
-
-				require.False(t, tableExists(t))
-			})
 		})
 	}
 })
5 changes: 0 additions & 5 deletions warehouse/integrations/snowflake/snowflake.go
@@ -1303,11 +1303,6 @@ func (sf *Snowflake) DownloadIdentityRules(ctx context.Context, gzWriter *misc.G
 	return nil
 }
 
-func (*Snowflake) CrashRecover(context.Context) error {
-	// no-op: snowflake does not need crash recovery
-	return nil
-}
-
 func (sf *Snowflake) IsEmpty(ctx context.Context, warehouse model.Warehouse) (empty bool, err error) {
 	empty = true
 
44 changes: 0 additions & 44 deletions warehouse/internal/repo/upload.go
@@ -543,50 +543,6 @@ func scanUpload(scan scanFn, upload *model.Upload) error {
 	return nil
 }
 
-// InterruptedDestinations returns a list of destination IDs, which have uploads was interrupted.
-//
-// Interrupted upload might require cleanup of intermediate upload tables.
-func (u *Uploads) InterruptedDestinations(ctx context.Context, destinationType string) ([]string, error) {
-	destinationIDs := make([]string, 0)
-	rows, err := u.db.QueryContext(ctx, `
-		SELECT
-			destination_id
-		FROM
-			`+uploadsTableName+`
-		WHERE
-			in_progress = TRUE
-			AND destination_type = $1
-			AND (
-				status = $2
-				OR status = $3
-			);
-	`,
-		destinationType,
-		model.ExportingData,
-		model.ExportingDataFailed,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("query for interrupted destinations: %w", err)
-	}
-
-	defer func() { _ = rows.Close() }()
-
-	for rows.Next() {
-		var destID string
-		err := rows.Scan(&destID)
-		if err != nil {
-			return nil, fmt.Errorf("scanning: %w", err)
-		}
-		destinationIDs = append(destinationIDs, destID)
-	}
-
-	if rows.Err() != nil {
-		return nil, fmt.Errorf("iterating rows: %w", rows.Err())
-	}
-
-	return destinationIDs, nil
-}
-
 // PendingTableUploads returns a list of pending table uploads for a given upload.
 func (u *Uploads) PendingTableUploads(ctx context.Context, namespace string, uploadID int64, destID string) ([]model.PendingTableUpload, error) {
 	pendingTableUploads := make([]model.PendingTableUpload, 0)
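
Not part of the diff: InterruptedDestinations and Manager.CrashRecover read like the two halves of a startup-recovery flow, which would explain why they leave in the same commit. A hedged sketch of how such a flow could be wired up; the orchestration below is an assumption, and only the two removed methods come from this commit.

package example

import (
	"context"
	"fmt"

	"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
	"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
)

// recoverInterrupted is assumed orchestration, not rudder-server code: list
// destinations whose uploads were interrupted mid-export, then let each
// integration drop its dangling staging tables via CrashRecover.
func recoverInterrupted(
	ctx context.Context,
	uploads *repo.Uploads,
	destType string,
	managerFor func(destID string) (manager.Manager, error), // hypothetical lookup
) error {
	destIDs, err := uploads.InterruptedDestinations(ctx, destType)
	if err != nil {
		return fmt.Errorf("listing interrupted destinations: %w", err)
	}
	for _, destID := range destIDs {
		m, err := managerFor(destID)
		if err != nil {
			return fmt.Errorf("manager for destination %s: %w", destID, err)
		}
		if err := m.CrashRecover(ctx); err != nil { // both methods deleted by this commit
			return fmt.Errorf("crash recovery for destination %s: %w", destID, err)
		}
	}
	return nil
}
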
24 changes: 0 additions & 24 deletions warehouse/internal/repo/upload_test.go
@@ -854,30 +854,6 @@ func TestUploads_Delete(t *testing.T) {
 	require.Len(t, files, 1)
 }
 
-func TestUploads_InterruptedDestinations(t *testing.T) {
-	t.Parallel()
-	db := setupDB(t)
-
-	_, err := db.Exec(`INSERT INTO wh_uploads (destination_id, source_id, in_progress, destination_type, status, namespace, schema, created_at, updated_at)
-	VALUES
-		(1, 1, true, 'RS', 'exporting_data', '', '{}', NOW(), NOW()),
-		(2, 1, true, 'RS', 'exporting_data_failed', '', '{}', NOW(), NOW()),
-		(3, 1, true, 'RS', 'exporting_data_failed', '', '{}', NOW(), NOW()),
-		(4, 1, true, 'RS', 'exported_data', '', '{}', NOW(), NOW()),
-		(5, 1, true, 'RS', 'aborted', '', '{}', NOW(), NOW()),
-		(6, 1, true, 'RS', 'failed', '', '{}', NOW(), NOW()),
-		(7, 1, true, 'SNOWFLAKE', 'exporting_data', '', '{}', NOW(), NOW())
-	`)
-	require.NoError(t, err)
-
-	repoUpload := repo.NewUploads(db)
-	ids, err := repoUpload.InterruptedDestinations(context.Background(), "RS")
-	require.NoError(t, err)
-
-	require.Equal(t, []string{"1", "2", "3"}, ids)
-}
-
 func TestUploads_PendingTableUploads(t *testing.T) {
 	t.Parallel()
 