Skip to content

Commit

Permalink
Add @FlywayTarget annotation to migration tests to control flyway upg… (
Browse files Browse the repository at this point in the history
MarquezProject#2035)

* Add @FlywayTarget annotation to migration tests to control flyway upgrades
Updated backfill tests to fix flyway migration version and update to stop relying on sql in DAOs since they change with future migrations

Signed-off-by: Michael Collado <[email protected]>

* Add comments to unit tests with fixed flyway migrations

Signed-off-by: Michael Collado <[email protected]>
  • Loading branch information
collado-mike authored Jul 18, 2022
1 parent 610f18a commit ccbdb96
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ LEFT JOIN LATERAL (
FROM lineage_events le
WHERE le.run_uuid=r.uuid
AND event->'run'->'facets'->'parent'->'run'->>'runId' IS NOT NULL
AND event->'run'->'facets'->'airflow_version' IS NOT NULL
AND event->'run'->'facets'->>'airflow_version' IS NOT NULL
) e ON e.run_uuid=r.uuid
WHERE e.parent_run_id IS NOT NULL
""";
Expand All @@ -76,8 +76,8 @@ INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, desc
current_location
FROM jobs
WHERE namespace_name=:namespace AND name=:jobName
ON CONFLICT(name, namespace_uuid) WHERE parent_job_uuid IS NULL
DO UPDATE SET updated_at=now()
ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL
DO UPDATE SET updated_at=EXCLUDED.updated_at
RETURNING uuid
""";
public static final String INSERT_PARENT_RUN_QUERY =
Expand Down
80 changes: 54 additions & 26 deletions api/src/test/java/marquez/db/BackfillTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import marquez.common.Utils;
import marquez.common.models.JobType;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.RunArgsRow;
import marquez.service.models.LineageEvent;
Expand All @@ -36,7 +35,7 @@
public class BackfillTestUtils {
public static final String COMPLETE = "COMPLETE";

public static void writeNewEvent(
public static ExtendedRunRow writeNewEvent(
Jdbi jdbi,
String jobName,
Instant now,
Expand All @@ -45,25 +44,9 @@ public static void writeNewEvent(
String parentJobName)
throws SQLException, JsonProcessingException {
OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class);
JobDao jobDao = jdbi.onDemand(JobDao.class);
RunArgsDao runArgsDao = jdbi.onDemand(RunArgsDao.class);
RunDao runDao = jdbi.onDemand(RunDao.class);
PGobject pgInputs = new PGobject();
pgInputs.setType("json");
pgInputs.setValue("[]");
JobRow jobRow =
jobDao.upsertJob(
UUID.randomUUID(),
JobType.BATCH,
now,
namespace.getUuid(),
NAMESPACE,
jobName,
"description",
null,
null,
null,
pgInputs);
UUID jobUuid = writeJob(jdbi, jobName, now, namespace);

RunArgsRow runArgsRow =
runArgsDao.upsertRunArgs(
Expand All @@ -75,7 +58,7 @@ public static void writeNewEvent(
null,
runUuid.toString(),
now,
jobRow.getUuid(),
jobUuid,
null,
runArgsRow.getUuid(),
now,
Expand All @@ -91,6 +74,15 @@ public static void writeNewEvent(
Instant.now().atZone(LOCAL_ZONE).truncatedTo(ChronoUnit.HOURS));
nominalTimeRunFacet.setNominalEndTime(
nominalTimeRunFacet.getNominalStartTime().plus(1, ChronoUnit.HOURS));
Optional<ParentRunFacet> parentRun =
Optional.ofNullable(parentRunId)
.map(
runId ->
new ParentRunFacet(
PRODUCER_URL,
LineageTestUtils.SCHEMA_URL,
new RunLink(runId),
new JobLink(NAMESPACE, parentJobName)));
LineageEvent event =
new LineageEvent(
COMPLETE,
Expand All @@ -99,11 +91,7 @@ public static void writeNewEvent(
runUuid.toString(),
new RunFacet(
nominalTimeRunFacet,
new ParentRunFacet(
LineageTestUtils.PRODUCER_URL,
LineageTestUtils.SCHEMA_URL,
new RunLink(parentRunId),
new JobLink(NAMESPACE, parentJobName)),
parentRun.orElse(null),
ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc")))),
new LineageEvent.Job(
NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)),
Expand All @@ -121,5 +109,45 @@ NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)),
namespace.getName(),
eventJson,
PRODUCER_URL.toString());
return runRow;
}

public static UUID writeJob(Jdbi jdbi, String jobName, Instant now, NamespaceRow namespace)
throws SQLException {
PGobject pgInputs = new PGobject();
pgInputs.setType("json");
pgInputs.setValue("[]");
return jdbi.withHandle(
h -> {
UUID jobContextUuid =
h.createQuery(
"""
INSERT INTO job_contexts (uuid, created_at, context, checksum) VALUES (:uuid, :now, :context, :checksum)
ON CONFLICT (checksum) DO UPDATE SET created_at=EXCLUDED.created_at
RETURNING uuid
""")
.bind("uuid", UUID.randomUUID())
.bind("now", now)
.bind("context", "")
.bind("checksum", "")
.mapTo(UUID.class)
.first();
return h.createQuery(
"""
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, namespace_name, current_job_context_uuid, current_inputs)
VALUES (:uuid, :type, :now, :now, :namespaceUuid, :name, :namespaceName, :currentJobContextUuid, :currentInputs)
RETURNING uuid
""")
.bind("uuid", UUID.randomUUID())
.bind("type", marquez.client.models.JobType.BATCH)
.bind("now", now)
.bind("namespaceUuid", namespace.getUuid())
.bind("name", jobName)
.bind("namespaceName", namespace.getName())
.bind("currentJobContextUuid", jobContextUuid)
.bind("currentInputs", pgInputs)
.mapTo(UUID.class)
.first();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,23 @@
package marquez.db.migrations;

import static marquez.db.LineageTestUtils.NAMESPACE;
import static marquez.db.LineageTestUtils.createLineageRow;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import marquez.db.BackfillTestUtils;
import marquez.db.JobDao;
import marquez.db.LineageTestUtils;
import marquez.db.NamespaceDao;
import marquez.db.OpenLineageDao;
import marquez.db.RunArgsDao;
import marquez.db.RunDao;
import marquez.db.models.NamespaceRow;
import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.Job;
import marquez.service.models.LineageEvent.JobFacet;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.api.migration.Context;
import org.jdbi.v3.core.Jdbi;
Expand All @@ -35,6 +31,9 @@
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
// fix the flyway migration up to v44 since we depend on the database structure as it exists at this
// point in time. The migration will only ever be applied on a database at this version.
@FlywayTarget("44")
class V44_2__BackfillAirflowParentRunsTest {

static Jdbi jdbi;
Expand Down Expand Up @@ -66,13 +65,7 @@ public void testMigrateAirflowTasks() throws SQLException, JsonProcessingExcepti
BackfillTestUtils.writeNewEvent(
jdbi, "airflowDag.task2", now, namespace, "schedule:00:00:00", task1Name);

createLineageRow(
openLineageDao,
"a_non_airflow_task",
BackfillTestUtils.COMPLETE,
new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
Collections.emptyList(),
Collections.emptyList());
BackfillTestUtils.writeNewEvent(jdbi, "a_non_airflow_task", now, namespace, null, null);

jdbi.useHandle(
handle -> {
Expand All @@ -94,7 +87,18 @@ public Connection getConnection() {
throw new AssertionError("Unable to execute migration", e);
}
});
Optional<Job> jobByName = jobDao.findJobByName(NAMESPACE, dagName);
assertThat(jobByName).isPresent();
Optional<String> jobNameResult =
jdbi.withHandle(
h ->
h.createQuery(
"""
SELECT name FROM jobs_view
WHERE namespace_name=:namespace AND simple_name=:jobName
""")
.bind("namespace", NAMESPACE)
.bind("jobName", dagName)
.mapTo(String.class)
.findFirst());
assertThat(jobNameResult).isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,20 @@

import static marquez.db.BackfillTestUtils.writeNewEvent;
import static marquez.db.LineageTestUtils.NAMESPACE;
import static marquez.db.LineageTestUtils.createLineageRow;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import marquez.common.models.JobName;
import marquez.db.JobDao;
import marquez.db.LineageTestUtils;
import marquez.db.NamespaceDao;
import marquez.db.OpenLineageDao;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.UpdateLineageRow;
import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.Job;
import marquez.service.models.LineageEvent.JobFacet;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.api.migration.Context;
import org.jdbi.v3.core.Jdbi;
Expand All @@ -35,6 +29,9 @@
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
// fix the flyway migration up to v44 since we depend on the database structure as it exists at this
// point in time. The migration will only ever be applied on a database at this version.
@FlywayTarget("44")
class V44_3_BackfillJobsWithParentsTest {

static Jdbi jdbi;
Expand All @@ -48,26 +45,16 @@ public static void setUpOnce(Jdbi jdbi) {

@Test
public void testBackfill() throws SQLException, JsonProcessingException {
String parentName = "parentJob";
UpdateLineageRow parentJob =
createLineageRow(
openLineageDao,
parentName,
"COMPLETE",
new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
Collections.emptyList(),
Collections.emptyList());

NamespaceDao namespaceDao = jdbi.onDemand(NamespaceDao.class);
Instant now = Instant.now();
NamespaceRow namespace =
namespaceDao.upsertNamespaceRow(UUID.randomUUID(), now, NAMESPACE, "me");
String parentName = "parentJob";
ExtendedRunRow parentRun = writeNewEvent(jdbi, parentName, now, namespace, null, null);

String task1Name = "task1";
writeNewEvent(
jdbi, task1Name, now, namespace, parentJob.getRun().getUuid().toString(), parentName);
writeNewEvent(
jdbi, "task2", now, namespace, parentJob.getRun().getUuid().toString(), parentName);
writeNewEvent(jdbi, task1Name, now, namespace, parentRun.getUuid().toString(), parentName);
writeNewEvent(jdbi, "task2", now, namespace, parentRun.getUuid().toString(), parentName);

jdbi.useHandle(
handle -> {
Expand All @@ -92,11 +79,13 @@ public Connection getConnection() {
}
});

JobDao jobDao = jdbi.onDemand(JobDao.class);
Optional<Job> jobByName = jobDao.findJobByName(NAMESPACE, task1Name);
assertThat(jobByName)
.isPresent()
.get()
.hasFieldOrPropertyWithValue("name", new JobName(parentName + "." + task1Name));
Optional<String> jobName =
jdbi.withHandle(
h ->
h.createQuery("SELECT name FROM jobs_view WHERE simple_name=:jobName")
.bind("jobName", task1Name)
.mapTo(String.class)
.findFirst());
assertThat(jobName).isPresent().get().isEqualTo(parentName + "." + task1Name);
}
}
20 changes: 17 additions & 3 deletions api/src/test/java/marquez/jdbi/JdbiExternalPostgresExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

package marquez.jdbi;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import javax.sql.DataSource;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.spi.JdbiPlugin;
Expand All @@ -24,6 +27,11 @@
public abstract class JdbiExternalPostgresExtension
implements BeforeAllCallback, AfterAllCallback, ParameterResolver {

@Retention(RetentionPolicy.RUNTIME)
public @interface FlywayTarget {
String value();
}

protected final List<JdbiPlugin> plugins = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock();
private volatile DataSource dataSource;
Expand Down Expand Up @@ -82,12 +90,18 @@ public Handle getHandle() {
@Override
public void beforeAll(ExtensionContext context) throws Exception {
if (migration != null) {
flyway =
FluentConfiguration flywayConfig =
Flyway.configure()
.dataSource(getDataSource())
.locations(migration.paths.toArray(new String[0]))
.schemas(migration.schemas.toArray(new String[0]))
.load();
.schemas(migration.schemas.toArray(new String[0]));

FlywayTarget target = context.getRequiredTestClass().getAnnotation(FlywayTarget.class);
if (target != null) {
flywayConfig.target(target.value());
}

flyway = flywayConfig.load();
flyway.migrate();
}

Expand Down

0 comments on commit ccbdb96

Please sign in to comment.