Skip to content

Commit

Permalink
fix dataset list error (MarquezProject#266)
Browse files Browse the repository at this point in the history
* update dataset findAll query to use ns name

* add exception handling into /datasets endpoint

* all fixes required to successfully wire up a DatasetDao test

* the unit test

* add new Generator methods

* rename DataSource::dataSource attribute to DataSource::name and remove DataSource::type attribute

* fix uncaught merge errors
  • Loading branch information
sshah-wework authored Feb 15, 2019
1 parent 6d3e953 commit 9aeeafa
Show file tree
Hide file tree
Showing 16 changed files with 180 additions and 33 deletions.
13 changes: 9 additions & 4 deletions src/main/java/marquez/api/resources/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.List;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand Down Expand Up @@ -52,9 +53,13 @@ public Response list(
throw new WebApplicationException(
String.format("The namespace %s does not exist.", namespaceString), NOT_FOUND);
}
final List<Dataset> datasets =
datasetService.getAll(NamespaceName.fromString(namespaceString), limit, offset);
final List<DatasetResponse> datasetResponses = map(datasets);
return Response.ok(new DatasetsResponse(datasetResponses)).build();
try {
final List<Dataset> datasets =
datasetService.getAll(NamespaceName.fromString(namespaceString), limit, offset);
final List<DatasetResponse> datasetResponses = map(datasets);
return Response.ok(new DatasetsResponse(datasetResponses)).build();
} catch (Exception e) {
throw new InternalServerErrorException("unexpected error while fetching datasets", e);
}
}
}
8 changes: 4 additions & 4 deletions src/main/java/marquez/db/DataSourceDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
@RegisterRowMapper(DataSourceRowMapper.class)
public interface DataSourceDao {
@SqlUpdate(
"INSERT INTO data_sources (uuid, data_source, connection_url) "
+ "VALUES (:uuid, :dataSource, :connectionUrl)")
"INSERT INTO datasources (guid, name, connection_url) "
+ "VALUES (:uuid, :name, :connectionUrl)")
void insert(@BindBean DataSourceRow dataSourceRow);

@SqlQuery("SELECT * FROM data_sources WHERE uuid = :uuid")
@SqlQuery("SELECT * FROM datasources WHERE guid = :uuid")
Optional<DataSourceRow> findBy(@Bind("uuid") UUID uuid);

@SqlQuery("SELECT * FROM data_sources LIMIT :limit OFFSET :offset")
@SqlQuery("SELECT * FROM datasources LIMIT :limit OFFSET :offset")
List<DataSourceRow> findAll(@Bind("limit") Integer limit, @Bind("offset") Integer offset);
}
18 changes: 12 additions & 6 deletions src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public interface DatasetDao {
DbTableVersionDao createDbTableVersionDao();

@SqlUpdate(
"INSERT INTO datasets (uuid, namespace_uuid, datasource_uuid, urn, description) "
+ "VALUES (:uuid, :namespaceUuid, :datasourceUuid, :urn, :description)")
"INSERT INTO datasets (guid, namespace_guid, datasource_uuid, urn, description) "
+ "VALUES (:uuid, :namespaceUuid, :dataSourceUuid, :urn, :description)")
void insert(@BindBean DatasetRow datasetRow);

@Transaction
Expand All @@ -41,11 +41,12 @@ default void insertAll(
createDataSourceDao().insert(dataSourceRow);
insert(datasetRow);
createDbTableVersionDao().insertAll(dbTableInfoRow, dbTableVersionRow);
updateCurrentVersion(datasetRow.getUuid(), Instant.now(), dbTableVersionRow.getUuid());
}

@SqlUpdate(
"UPDATE datasets SET updated_at = :updatedAt, current_version = :currentVersion"
+ "WHERE uuid = :uuid")
"UPDATE datasets SET updated_at = :updatedAt, current_version_uuid = :currentVersion "
+ "WHERE guid = :uuid")
void updateCurrentVersion(
@Bind("uuid") UUID uuid,
@Bind("updatedAt") Instant updatedAt,
Expand All @@ -57,9 +58,14 @@ void updateCurrentVersion(
@SqlQuery("SELECT * FROM datasets WHERE urn = :urn.value")
Optional<DatasetRow> findBy(@BindBean("urn") DatasetUrn urn);

@SqlQuery("SELECT * FROM datasets LIMIT :limit OFFSET :offset")
@SqlQuery(
"SELECT * "
+ "FROM datasets d "
+ "INNER JOIN namespaces n "
+ " ON (n.guid = d.namespace_guid AND n.name=:namespace.value)"
+ "LIMIT :limit OFFSET :offset")
List<DatasetRow> findAll(
@Bind("namespace") NamespaceName namespaceName,
@BindBean("namespace") NamespaceName namespaceName,
@Bind("limit") Integer limit,
@Bind("offset") Integer offset);
}
3 changes: 2 additions & 1 deletion src/main/java/marquez/db/DbTableInfoDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

@RegisterRowMapper(DbTableInfoRowMapper.class)
public interface DbTableInfoDao {
@SqlUpdate("INSERT INTO db_table_infos (uuid, db, db_schema) VALUES (:uuid, :db, :dbSchema)")
@SqlUpdate(
"INSERT INTO db_table_infos (uuid, db_name, db_schema_name) VALUES (:uuid, :db, :dbSchema)")
void insert(@BindBean DbTableInfoRow dbTableInfoRow);

@SqlQuery("SELECT * FROM db_table_info WHERE uuid = :uuid")
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/marquez/db/DbTableVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface DbTableVersionDao {
DbTableInfoDao createDbTableInfoDao();

@SqlUpdate(
"INSERT INTO db_table_versions (uuid, dataset_uuid, db_table_info_uuid, db_table) "
"INSERT INTO db_table_versions (guid, dataset_guid, db_table_info_uuid, db_table_name) "
+ "VALUES (:uuid, :datasetUuid, :dbTableInfoUuid, :dbTable)")
void insert(@BindBean DbTableVersionRow dbTableVersionRow);

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/marquez/db/mappers/DataSourceRowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public DataSourceRow map(@NonNull ResultSet results, @NonNull StatementContext c
return DataSourceRow.builder()
.uuid(UUID.fromString(results.getString("uuid")))
.createdAt(results.getDate("created_at").toInstant())
.dataSource(results.getString("data_source"))
.name(results.getString("name"))
.connectionUrl(results.getString("connection_url"))
.build();
}
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/marquez/db/mappers/DatasetRowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@ public final class DatasetRowMapper implements RowMapper<DatasetRow> {
public DatasetRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return DatasetRow.builder()
.uuid(UUID.fromString(results.getString("uuid")))
.createdAt(results.getDate("created_at").toInstant())
.updatedAt(results.getDate("updated_at").toInstant())
.namespaceUuid(UUID.fromString(results.getString("namespace_uuid")))
.dataSourceUuid(UUID.fromString(results.getString("data_source_uuid")))
.uuid(UUID.fromString(results.getString("guid")))
.createdAt(results.getTimestamp("created_at").toInstant())
.updatedAt(
results.getTimestamp("updated_at") == null
? null
: results.getTimestamp("updated_at").toInstant())
.namespaceUuid(UUID.fromString(results.getString("namespace_guid")))
.dataSourceUuid(UUID.fromString(results.getString("datasource_uuid")))
.urn(results.getString("urn"))
.description(results.getString("description"))
.currentVersion(UUID.fromString(results.getString("current_version")))
.currentVersion(
results.getString("current_version_uuid") == null
? null
: UUID.fromString(results.getString("current_version_uuid")))
.build();
}
}
2 changes: 1 addition & 1 deletion src/main/java/marquez/db/models/DataSourceRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@Builder
public final class DataSourceRow {
@Getter @NonNull private final UUID uuid;
@Getter @NonNull private final String dataSource;
@Getter @NonNull private final String name;
@Getter @NonNull private final String connectionUrl;
@Getter private final Instant createdAt;
}
8 changes: 4 additions & 4 deletions src/main/java/marquez/db/models/DatasetRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
@ToString
@Builder
public final class DatasetRow {
@Getter @NonNull private final UUID uuid;
@Getter @NonNull private final Instant createdAt;
@Getter @NonNull private final Instant updatedAt;
@Getter private final UUID uuid;
@Getter private final Instant createdAt;
@Getter private final Instant updatedAt;
@Getter @NonNull private final UUID namespaceUuid;
@Getter @NonNull private final UUID dataSourceUuid;
@Getter @NonNull private final String urn;
@Getter @NonNull private final UUID currentVersion;
@Getter private final UUID currentVersion;
private final String description;

public Optional<String> getDescription() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/marquez/db/models/DbTableInfoRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
@Builder
public final class DbTableInfoRow {
@NonNull private final UUID uuid;
@NonNull private final Instant createdAt;
private final Instant createdAt;
@NonNull private final String db;
@NonNull private final String dbSchema;
}
2 changes: 1 addition & 1 deletion src/main/java/marquez/db/models/DbTableVersionRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
@Builder
public final class DbTableVersionRow {
@NonNull private final UUID uuid;
@NonNull private final Instant createdAt;
private final Instant createdAt;
@NonNull private final UUID datasetUuid;
@NonNull private final UUID dbTableInfoUuid;
@NonNull private final String dbTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ private DataSourceRowMapper() {}
public static DataSourceRow map(@NonNull DbTableVersion dbTableVersion) {
return DataSourceRow.builder()
.uuid(UUID.randomUUID())
.dataSource(dbTableVersion.getConnectionUrl().getDataSource().getValue())
.name(dbTableVersion.getConnectionUrl().getDataSource().getValue())
.connectionUrl(dbTableVersion.getConnectionUrl().getRawValue())
.build();
}
Expand Down
22 changes: 22 additions & 0 deletions src/main/resources/db/migration/V9__fix_datasources.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
ALTER TABLE datasources ADD COLUMN name VARCHAR(64) NOT NULL;
ALTER TABLE datasources ADD COLUMN connection_url VARCHAR(128) NOT NULL;
ALTER TABLE datasources DROP COLUMN type;
ALTER TABLE datasources DROP COLUMN name;
ALTER TABLE datasources ADD COLUMN name VARCHAR(64) NOT NULL;
ALTER TABLE datasets DROP COLUMN name;

CREATE TABLE db_table_infos(
uuid UUID PRIMARY KEY,
db_name VARCHAR(64) NOT NULL,
db_schema_name VARCHAR(64) NOT NULL
);

ALTER TABLE db_table_versions ADD COLUMN db_table_info_uuid UUID
REFERENCES db_table_infos(uuid) NOT NULL;

ALTER TABLE db_table_versions ADD COLUMN db_table_name VARCHAR(64) NOT NULL;

ALTER TABLE db_table_versions ALTER COLUMN description DROP NOT NULL;

ALTER TABLE datasets DROP COLUMN current_version;
ALTER TABLE datasets ADD COLUMN current_version_uuid UUID;
56 changes: 56 additions & 0 deletions src/test/java/marquez/db/DatasetDaoTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package marquez.db;

import static org.junit.Assert.assertEquals;

import marquez.common.models.NamespaceName;
import marquez.db.models.DataSourceRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DbTableInfoRow;
import marquez.db.models.DbTableVersionRow;
import marquez.service.models.Generator;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
import org.jdbi.v3.testing.JdbiRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatasetDaoTest {
static final Logger logger = LoggerFactory.getLogger(DatasetDaoTest.class);
private DatasetDao datasetDAO;
private marquez.service.models.Namespace namespace = Generator.genNamespace();
private NamespaceDao namespaceDAO;

@Rule
public final JdbiRule dbRule =
JdbiRule.embeddedPostgres().withPlugin(new SqlObjectPlugin()).migrateWithFlyway();

@Before
public void setup() {
Jdbi jdbi = dbRule.getJdbi();
datasetDAO = jdbi.onDemand(DatasetDao.class);
namespaceDAO = jdbi.onDemand(NamespaceDao.class);
namespaceDAO.insert(namespace);
}

private void insertRandomDataset() {
DataSourceRow dataSourceRow = Generator.genDataSourceRow();
DatasetRow datasetRow = Generator.genDatasetRow(namespace.getGuid(), dataSourceRow.getUuid());
DbTableInfoRow dbTableInfoRow = Generator.genDbTableInfowRow();
DbTableVersionRow dbTableVersionRow =
Generator.genDbTableVersionRow(datasetRow.getUuid(), dbTableInfoRow.getUuid());
datasetDAO.insertAll(dataSourceRow, datasetRow, dbTableInfoRow, dbTableVersionRow);
}

@Test
public void testFindAll() throws Exception {
assertEquals(
0, datasetDAO.findAll(NamespaceName.fromString(namespace.getName()), 10, 0).size());
insertRandomDataset();
insertRandomDataset();
assertEquals(
2, datasetDAO.findAll(NamespaceName.fromString(namespace.getName()), 10, 0).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testMapDbTableVersion() {
final DataSourceRow dataSourceRow = DataSourceRowMapper.map(dbTableVersion);
assertNotNull(dataSourceRow);
assertNotNull(dataSourceRow.getUuid());
assertEquals(DATA_SOURCE.getValue(), dataSourceRow.getDataSource());
assertEquals(DATA_SOURCE.getValue(), dataSourceRow.getName());
assertEquals(CONNECTION_URL.getRawValue(), dataSourceRow.getConnectionUrl());
assertEquals(nonEmptyDescription, dbTableVersion.getDescription());
}
Expand All @@ -45,7 +45,7 @@ public void testMapDbTableVersionNoDescription() {
final DataSourceRow dataSourceRow = DataSourceRowMapper.map(dbTableVersion);
assertNotNull(dataSourceRow);
assertNotNull(dataSourceRow.getUuid());
assertEquals(DATA_SOURCE.getValue(), dataSourceRow.getDataSource());
assertEquals(DATA_SOURCE.getValue(), dataSourceRow.getName());
assertEquals(CONNECTION_URL.getRawValue(), dataSourceRow.getConnectionUrl());
assertEquals(noDescription, dbTableVersion.getDescription());
}
Expand Down
51 changes: 51 additions & 0 deletions src/test/java/marquez/service/models/Generator.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
import java.util.Date;
import java.util.Random;
import java.util.UUID;
import marquez.common.models.DatasetUrn;
import marquez.db.models.DataSourceRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DbTableInfoRow;
import marquez.db.models.DbTableVersionRow;

public class Generator {
private static Random rand = new Random();
Expand All @@ -17,6 +22,7 @@ private static String randUrn() {
return String.format("urn:rand:%d.%d.%d", randNum(), randNum(), randNum());
}

// Jobs
public static Job genJob() {
return genJob(UUID.randomUUID());
}
Expand Down Expand Up @@ -45,6 +51,7 @@ public static Job cloneJob(Job job) {
job.getCreatedAt());
}

// Job Runs
public static JobRun genJobRun() {
return new JobRun(
UUID.randomUUID(),
Expand All @@ -69,6 +76,7 @@ public static JobRun cloneJobRun(JobRun j) {
null);
}

// Job Run States
public static JobRunState genJobRunState() {
return new JobRunState(
UUID.randomUUID(),
Expand All @@ -82,6 +90,7 @@ public static JobRunState cloneJobRunState(JobRunState jrs) {
jrs.getGuid(), jrs.getTransitionedAt(), jrs.getJobRunGuid(), jrs.getState());
}

// Job Versions
public static JobVersion genJobVersion() {
return new JobVersion(
UUID.randomUUID(),
Expand Down Expand Up @@ -114,6 +123,7 @@ public static JobVersion cloneJobVersion(JobVersion jv) {
jv.getUpdatedAt());
}

// Namespaces
public static Namespace genNamespace() {
int nsNum = randNum();
return new Namespace(UUID.randomUUID(), "ns" + nsNum, "ns owner" + nsNum, "ns desc" + nsNum);
Expand All @@ -123,11 +133,52 @@ public static Namespace cloneNamespace(Namespace n) {
return new Namespace(n.getGuid(), n.getName(), n.getOwnerName(), n.getDescription());
}

// Run Args
public static RunArgs genRunArgs() {
return new RunArgs("abc123", "{'foo': 1}", null);
}

public static RunArgs cloneRunArgs(RunArgs ra) {
return new RunArgs(ra.getHexDigest(), ra.getJson(), ra.getCreatedAt());
}

// Data Source Rows
public static DataSourceRow genDataSourceRow() {
int dataSourceNum = randNum();
return DataSourceRow.builder()
.uuid(UUID.randomUUID())
.name("Data Source" + dataSourceNum)
.connectionUrl("conn://" + dataSourceNum)
.build();
}

// Dataset Rows
public static DatasetRow genDatasetRow(UUID namespaceID, UUID dataSourceID) {
return DatasetRow.builder()
.uuid(UUID.randomUUID())
.namespaceUuid(namespaceID)
.dataSourceUuid(dataSourceID)
.description("dataset " + randNum())
.urn(DatasetUrn.fromString(randUrn()).toString())
.build();
}

// DbTableInfo Rows
public static DbTableInfoRow genDbTableInfowRow() {
return DbTableInfoRow.builder()
.uuid(UUID.randomUUID())
.db("db" + randNum())
.dbSchema("schema" + randNum())
.build();
}

// DbTableVersion Rows
public static DbTableVersionRow genDbTableVersionRow(UUID datasetUuid, UUID dbTableInfoUuid) {
return DbTableVersionRow.builder()
.uuid(UUID.randomUUID())
.datasetUuid(datasetUuid)
.dbTableInfoUuid(dbTableInfoUuid)
.dbTable("table " + randNum())
.build();
}
}

0 comments on commit 9aeeafa

Please sign in to comment.