Add db retention support (MarquezProject#2486)
* Add db migration to add cascade deletion on `fk`s

Signed-off-by: wslulciuc <[email protected]>

* Add `DbDataRetention` and `dataRetentionInDays` config

Signed-off-by: wslulciuc <[email protected]>

* Add `DbRetentionJob`

Signed-off-by: wslulciuc <[email protected]>

* Add `DbRetentionCommand`

Signed-off-by: wslulciuc <[email protected]>

* Add `frequencyMins` config for runs and rename `dbRetentionInDays`

Signed-off-by: wslulciuc <[email protected]>

* Add docs to `DbRetentionJob` and minor renaming

Signed-off-by: wslulciuc <[email protected]>

* Wrap `DbRetention.retentionOnDbOrError()` in `try/catch`

Signed-off-by: wslulciuc <[email protected]>

* Add docs to `DbRetention`

Signed-off-by: wslulciuc <[email protected]>

* continued: Add docs to `DbRetention`

Signed-off-by: wslulciuc <[email protected]>

* Add handling of `errorOnDbRetention`

Signed-off-by: wslulciuc <[email protected]>

* Add docs to `DbException` and `DbRetentionException`

Signed-off-by: wslulciuc <[email protected]>

* `info` -> `debug` when inserting column lineage

Signed-off-by: wslulciuc <[email protected]>

* Remove `dbRetention.enabled`

Signed-off-by: wslulciuc <[email protected]>

* Update handling of `StatementException`

Signed-off-by: wslulciuc <[email protected]>

* Minor changes

Signed-off-by: wslulciuc <[email protected]>

* Add `docs/faq.md`

Signed-off-by: wslulciuc <[email protected]>

* continued: Add `docs/faq.md`

Signed-off-by: wslulciuc <[email protected]>

* continued: Add `docs/faq.md`

Signed-off-by: wslulciuc <[email protected]>

* continued: Add `docs/faq.md`

Signed-off-by: wslulciuc <[email protected]>

* Define `DEFAULT_RETENTION_DAYS` constant in `DbRetention`

Signed-off-by: wslulciuc <[email protected]>

* Make chunk size in retention query configurable

Signed-off-by: wslulciuc <[email protected]>

* Remove `DATA_RETENTION_IN_DAYS` from `MarquezConfig`

Signed-off-by: wslulciuc <[email protected]>

* Update docs for chunk size config

Signed-off-by: wslulciuc <[email protected]>

* Remove error log from `DbRetention.retentionOnDbOrError()`

Signed-off-by: wslulciuc <[email protected]>

* Use `LOOP` for retention

Signed-off-by: wslulciuc <[email protected]>

* continued: Use `LOOP` for retention

Signed-off-by: wslulciuc <[email protected]>

* Use `numberOfRowsPerBatch`

Signed-off-by: wslulciuc <[email protected]>

* Use `--number-of-rows-per-batch`

Signed-off-by: wslulciuc <[email protected]>

* Add pause to prevent lock timeouts

Signed-off-by: wslulciuc <[email protected]>

* Add `FOR UPDATE SKIP LOCKED`

Signed-off-by: wslulciuc <[email protected]>

* Add `sql()`

Signed-off-by: wslulciuc <[email protected]>

* Add `--dry-run`

Signed-off-by: wslulciuc <[email protected]>

* Add `jdbi3-testcontainers`

Signed-off-by: wslulciuc <[email protected]>

* Remove shortened flag args

Signed-off-by: wslulciuc <[email protected]>

* Use `marquez.db.DbRetention.DEFAULT_DRY_RUN`

Signed-off-by: wslulciuc <[email protected]>

* Add `DbRetention.retentionOnRuns()`

Signed-off-by: wslulciuc <[email protected]>

* Add `DbMigration.migrateDbOrError(DataSource)`

Signed-off-by: wslulciuc <[email protected]>

* Add `TestingDb`

Signed-off-by: wslulciuc <[email protected]>

* Add `DbTest`

Signed-off-by: wslulciuc <[email protected]>

* Add `testRetentionOnDbOrError_withDatasetsOlderThanXDays()`

Signed-off-by: wslulciuc <[email protected]>

* Remove `jobs.DbRetentionConfig.dryRun`

Signed-off-by: wslulciuc <[email protected]>

* Add `--dry-run` option to `faq.md`

Signed-off-by: wslulciuc <[email protected]>

* continued: Add `--dry-run` option to `faq.md`

Signed-off-by: wslulciuc <[email protected]>

* continued: Add `testRetentionOnDbOrError_withDatasetsOlderThanXDays()`

Signed-off-by: wslulciuc <[email protected]>

* Fix retention query for datasets and dataset versions

Signed-off-by: wslulciuc <[email protected]>

* Add test for retention on dataset versions

Signed-off-by: wslulciuc <[email protected]>

* Add comments to tests

Signed-off-by: wslulciuc <[email protected]>

* Add `testRetentionOnDbOrErrorWithDatasetVersionsOlderThanXDays_skipIfVersionAsInputForRun()`

Signed-off-by: wslulciuc <[email protected]>

* Add `testRetentionOnDbOrErrorWithJobsOlderThanXDays()`

Signed-off-by: wslulciuc <[email protected]>

* Add `testRetentionOnDbOrErrorWithJobVersionsOlderThanXDays()`

Signed-off-by: wslulciuc <[email protected]>

* Add tests for dry run

Signed-off-by: wslulciuc <[email protected]>

* Add `testRetentionOnDbOrErrorWithRunsOlderThanXDays()`

Signed-off-by: wslulciuc <[email protected]>

* Add `testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()`

Signed-off-by: wslulciuc <[email protected]>

* continued: Add `testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()`

Signed-off-by: wslulciuc <[email protected]>

* Add `javadocs` to `DbRetention`

Signed-off-by: wslulciuc <[email protected]>

* Run tests in order of retention

Signed-off-by: wslulciuc <[email protected]>

---------

Signed-off-by: wslulciuc <[email protected]>
Co-authored-by: Harel Shein <[email protected]>
wslulciuc and Harel Shein authored Jul 18, 2023
1 parent 3d023e4 commit a515c91
Showing 37 changed files with 3,173 additions and 45 deletions.
1 change: 1 addition & 0 deletions api/build.gradle
@@ -53,6 +53,7 @@ dependencies {

   testImplementation "io.dropwizard:dropwizard-testing:${dropwizardVersion}"
   testImplementation "org.jdbi:jdbi3-testing:${jdbi3Version}"
+  testImplementation "org.jdbi:jdbi3-testcontainers:${jdbi3Version}"
   testImplementation "org.junit.vintage:junit-vintage-engine:${junit5Version}"
   testImplementation "org.testcontainers:postgresql:${testcontainersVersion}"
   testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}"
63 changes: 39 additions & 24 deletions api/src/main/java/marquez/MarquezApp.java
@@ -23,15 +23,16 @@
 import io.sentry.Sentry;
 import java.util.EnumSet;
 import javax.servlet.DispatcherType;
-import javax.sql.DataSource;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import marquez.api.filter.JobRedirectFilter;
 import marquez.cli.DbMigrationCommand;
+import marquez.cli.DbRetentionCommand;
 import marquez.cli.MetadataCommand;
 import marquez.cli.SeedCommand;
 import marquez.common.Utils;
 import marquez.db.DbMigration;
+import marquez.jobs.DbRetentionJob;
 import marquez.logging.LoggingMdcFilter;
 import marquez.tracing.SentryConfig;
 import marquez.tracing.TracingContainerResponseFilter;
@@ -79,6 +80,7 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
             new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED)));

     // Add CLI commands
+    bootstrap.addCommand(new DbRetentionCommand());
     bootstrap.addCommand(new MetadataCommand());
     bootstrap.addCommand(new SeedCommand());

@@ -97,7 +99,7 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
   @Override
   public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
     final DataSourceFactory sourceFactory = config.getDataSourceFactory();
-    final DataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME);
+    final ManagedDataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME);

     log.info("Running startup actions...");

@@ -124,17 +126,51 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
       env.jersey().register(new TracingContainerResponseFilter());
     }

-    MarquezContext marquezContext = buildMarquezContext(config, env, (ManagedDataSource) source);
+    final Jdbi jdbi = newJdbi(config, env, source);
+    final MarquezContext marquezContext =
+        MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
+
     registerResources(config, env, marquezContext);
     registerServlets(env);
     registerFilters(env, marquezContext);
+
+    // Add scheduled jobs to lifecycle.
+    if (config.hasDbRetentionPolicy()) {
+      // Add job to apply retention policy to database.
+      env.lifecycle()
+          .manage(
+              new DbRetentionJob(
+                  jdbi,
+                  config.getDbRetention().getFrequencyMins(),
+                  config.getDbRetention().getNumberOfRowsPerBatch(),
+                  config.getDbRetention().getRetentionDays()));
+    }
   }

   private boolean isSentryEnabled(MarquezConfig config) {
     return config.getSentry() != null
         && !config.getSentry().getDsn().equals(SentryConfig.DEFAULT_DSN);
   }

+  /** Returns a new {@link Jdbi} object. */
+  private Jdbi newJdbi(
+      @NonNull MarquezConfig config, @NonNull Environment env, @NonNull ManagedDataSource source) {
+    final JdbiFactory factory = new JdbiFactory();
+    final Jdbi jdbi =
+        factory
+            .build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
+            .installPlugin(new SqlObjectPlugin())
+            .installPlugin(new PostgresPlugin())
+            .installPlugin(new Jackson2Plugin());
+    SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
+    if (isSentryEnabled(config)) {
+      sqlLogger = new TracingSQLLogger(sqlLogger);
+    }
+    jdbi.setSqlLogger(sqlLogger);
+    jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());
+    return jdbi;
+  }
+
   public void registerResources(
       @NonNull MarquezConfig config, @NonNull Environment env, MarquezContext context) {

@@ -156,27 +192,6 @@ protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
     super.addDefaultCommands(bootstrap);
   }

-  private MarquezContext buildMarquezContext(
-      MarquezConfig config, Environment env, ManagedDataSource source) {
-    final JdbiFactory factory = new JdbiFactory();
-    final Jdbi jdbi =
-        factory
-            .build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
-            .installPlugin(new SqlObjectPlugin())
-            .installPlugin(new PostgresPlugin())
-            .installPlugin(new Jackson2Plugin());
-    SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
-    if (isSentryEnabled(config)) {
-      sqlLogger = new TracingSQLLogger(sqlLogger);
-    }
-    jdbi.setSqlLogger(sqlLogger);
-    jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());
-
-    final MarquezContext context =
-        MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
-    return context;
-  }
-
   private void registerServlets(@NonNull Environment env) {
     log.debug("Registering servlets...");

12 changes: 12 additions & 0 deletions api/src/main/java/marquez/MarquezConfig.java
@@ -11,8 +11,10 @@
 import io.dropwizard.db.DataSourceFactory;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
+import lombok.Setter;
 import marquez.db.FlywayFactory;
 import marquez.graphql.GraphqlConfig;
+import marquez.jobs.DbRetentionConfig;
 import marquez.service.models.Tag;
 import marquez.tracing.SentryConfig;

@@ -40,4 +42,14 @@ public class MarquezConfig extends Configuration {
   @Getter
   @JsonProperty("sentry")
   private final SentryConfig sentry = new SentryConfig();
+
+  @Getter
+  @Setter
+  @JsonProperty("dbRetention")
+  private DbRetentionConfig dbRetention; // OPTIONAL
+
+  /** Returns {@code true} if a data retention policy has been configured. */
+  public boolean hasDbRetentionPolicy() {
+    return (dbRetention != null);
+  }
 }
114 changes: 114 additions & 0 deletions api/src/main/java/marquez/cli/DbRetentionCommand.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2018-2023 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez.cli;
+
+import static marquez.db.DbRetention.DEFAULT_DRY_RUN;
+import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
+import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS;
+
+import io.dropwizard.cli.ConfiguredCommand;
+import io.dropwizard.db.DataSourceFactory;
+import io.dropwizard.db.ManagedDataSource;
+import io.dropwizard.setup.Bootstrap;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import marquez.MarquezConfig;
+import marquez.db.DbRetention;
+import marquez.db.exceptions.DbRetentionException;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.postgres.PostgresPlugin;
+
+/**
+ * A command to apply a one-off ad-hoc retention policy directly to source, dataset, and job
+ * metadata collected by Marquez.
+ *
+ * <h2>Usage</h2>
+ *
+ * For example, to override the {@code retention-days}:
+ *
+ * <pre>{@code
+ * java -jar marquez-api.jar db-retention --retention-days 30 marquez.yml
+ * }</pre>
+ */
+@Slf4j
+public class DbRetentionCommand extends ConfiguredCommand<MarquezConfig> {
+  private static final String DB_SOURCE_NAME = "ad-hoc-db-retention-source";
+
+  /* Args for 'db-retention' command. */
+  private static final String CMD_ARG_NUMBER_OF_ROWS_PER_BATCH = "numberOfRowsPerBatch";
+  private static final String CMD_ARG_RETENTION_DAYS = "retentionDays";
+  private static final String CMD_ARG_DRY_RUN = "dryRun";
+
+  /* Define 'db-retention' command. */
+  public DbRetentionCommand() {
+    super("db-retention", "apply one-off ad-hoc retention policy directly to database");
+  }
+
+  @Override
+  public void configure(@NonNull net.sourceforge.argparse4j.inf.Subparser subparser) {
+    super.configure(subparser);
+    // Arg '--number-of-rows-per-batch'
+    subparser
+        .addArgument("--number-of-rows-per-batch")
+        .dest(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH)
+        .type(Integer.class)
+        .required(false)
+        .setDefault(DEFAULT_NUMBER_OF_ROWS_PER_BATCH)
+        .help("the number of rows deleted per batch");
+    // Arg '--retention-days'
+    subparser
+        .addArgument("--retention-days")
+        .dest(CMD_ARG_RETENTION_DAYS)
+        .type(Integer.class)
+        .required(false)
+        .setDefault(DEFAULT_RETENTION_DAYS)
+        .help("the number of days to retain metadata");
+    // Arg '--dry-run'
+    subparser
+        .addArgument("--dry-run")
+        .dest(CMD_ARG_DRY_RUN)
+        .type(Boolean.class)
+        .required(false)
+        .setDefault(DEFAULT_DRY_RUN)
+        .action(Arguments.storeTrue())
+        .help(
+            "only output an estimate of metadata deleted by the retention policy, "
+                + "without applying the policy on database");
+  }
+
+  @Override
+  protected void run(
+      @NonNull Bootstrap<MarquezConfig> bootstrap,
+      @NonNull Namespace namespace,
+      @NonNull MarquezConfig config)
+      throws Exception {
+    final int numberOfRowsPerBatch = namespace.getInt(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH);
+    final int retentionDays = namespace.getInt(CMD_ARG_RETENTION_DAYS);
+    final boolean dryRun = namespace.getBoolean(CMD_ARG_DRY_RUN);
+
+    // Configure connection.
+    final DataSourceFactory sourceFactory = config.getDataSourceFactory();
+    final ManagedDataSource source =
+        sourceFactory.build(bootstrap.getMetricRegistry(), DB_SOURCE_NAME);
+
+    // Open connection.
+    final Jdbi jdbi = Jdbi.create(source);
+    jdbi.installPlugin(new PostgresPlugin()); // Add postgres support.
+
+    try {
+      // Attempt to apply a database retention policy. An exception is thrown on failed retention
+      // policy attempts requiring we handle the throwable and log the error.
+      DbRetention.retentionOnDbOrError(jdbi, numberOfRowsPerBatch, retentionDays, dryRun);
+    } catch (DbRetentionException errorOnDbRetention) {
+      log.error(
+          "Failed to apply retention policy of '{}' days to database!",
+          retentionDays,
+          errorOnDbRetention);
+    }
+  }
+}
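The `db-retention` command passes `numberOfRowsPerBatch`, `retentionDays`, and `dryRun` to `DbRetention.retentionOnDbOrError(...)`, whose body is not part of the diff shown here. As a rough, non-authoritative sketch of how a batched retention pass with a dry-run mode might behave, the following uses an in-memory list in place of a database table. The class `RetentionBatchSketch`, its `Row` type, and the method `applyRetention` are hypothetical names for illustration only; the real implementation runs SQL against PostgreSQL and may differ in every detail.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical illustration only; none of these names are Marquez APIs. */
final class RetentionBatchSketch {
  /** Stand-in for a table row that records when it was last updated. */
  static final class Row {
    final String name;
    final Instant updatedAt;

    Row(String name, Instant updatedAt) {
      this.name = name;
      this.updatedAt = updatedAt;
    }
  }

  /**
   * Removes rows older than {@code retentionDays} in batches of at most {@code rowsPerBatch}.
   * On a dry run, rows are only counted. Returns the number of rows deleted (or that would be).
   */
  static int applyRetention(
      List<Row> table, int rowsPerBatch, int retentionDays, boolean dryRun, Instant now) {
    if (rowsPerBatch <= 0) {
      throw new IllegalArgumentException("rowsPerBatch must be > 0");
    }
    final Instant cutoff = now.minus(Duration.ofDays(retentionDays));
    if (dryRun) {
      // Estimate only: count matching rows without modifying the table.
      return (int) table.stream().filter(row -> row.updatedAt.isBefore(cutoff)).count();
    }
    int total = 0;
    int deletedInBatch;
    do {
      deletedInBatch = 0;
      final Iterator<Row> rows = table.iterator();
      while (rows.hasNext() && deletedInBatch < rowsPerBatch) {
        if (rows.next().updatedAt.isBefore(cutoff)) {
          rows.remove();
          deletedInBatch++;
        }
      }
      total += deletedInBatch;
      // A real job might pause here between batches to reduce lock contention.
    } while (deletedInBatch == rowsPerBatch); // A full batch means more rows may remain.
    return total;
  }

  public static void main(String[] args) {
    final Instant now = Instant.now();
    final List<Row> table = new ArrayList<>();
    for (int ageInDays = 1; ageInDays <= 10; ageInDays++) {
      table.add(new Row("row-" + ageInDays, now.minus(Duration.ofDays(ageInDays))));
    }
    System.out.println("would delete: " + applyRetention(table, 2, 7, true, now));
    System.out.println("deleted: " + applyRetention(table, 2, 7, false, now));
    System.out.println("remaining: " + table.size());
  }
}
```

Bounding each pass by a row count keeps individual deletes short, which is presumably the motivation for the `--number-of-rows-per-batch` flag; the dry-run branch never mutates the table, mirroring the intent of `--dry-run`.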
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
@@ -140,6 +140,7 @@ private Columns() {}

   /* LINEAGE EVENT ROW COLUMNS */
   public static final String EVENT = "event";
+  public static final String EVENT_TIME = "event_time";

   public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
     if (results.getObject(column) == null) {
10 changes: 8 additions & 2 deletions api/src/main/java/marquez/db/DbMigration.java
@@ -16,18 +16,24 @@
 public final class DbMigration {
   private DbMigration() {}

+  private static final boolean DEFAULT_MIGRATE_DB_ON_STARTUP = false;
+
+  public static void migrateDbOrError(@NonNull final DataSource source) {
+    migrateDbOrError(new FlywayFactory(), source, DEFAULT_MIGRATE_DB_ON_STARTUP);
+  }
+
   public static void migrateDbOrError(
       @NonNull final FlywayFactory flywayFactory,
       @NonNull final DataSource source,
-      final boolean migrateOnStartup) {
+      final boolean migrateDbOnStartup) {
     final Flyway flyway = flywayFactory.build(source);
     // Only attempt a database migration if there are pending changes to be applied,
     // or on the initialization of a new database. Otherwise, error on pending changes
     // when the flag 'migrateOnStartup' is set to 'false'.
     if (!hasPendingDbMigrations(flyway)) {
       log.info("No pending migrations found, skipping...");
       return;
-    } else if (!migrateOnStartup && hasDbMigrationsApplied(flyway)) {
+    } else if (!migrateDbOnStartup && hasDbMigrationsApplied(flyway)) {
       errorOnPendingDbMigrations(flyway);
     }
     // Attempt to perform a database migration. An exception is thrown on failed migration attempts