Skip to content

Commit

Permalink
Push Schema Struct out of Worker (airbytehq#682)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Oct 22, 2020
1 parent d159571 commit e68ee48
Show file tree
Hide file tree
Showing 28 changed files with 155 additions and 182 deletions.
3 changes: 0 additions & 3 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,6 @@ components:
type: string
enum:
- full_refresh
- append
syncSchema:
$ref: "#/components/schemas/SourceSchema"
schedule:
Expand Down Expand Up @@ -1381,7 +1380,6 @@ components:
type: string
enum:
- full_refresh
- append
syncSchema:
$ref: "#/components/schemas/SourceSchema"
schedule:
Expand Down Expand Up @@ -1612,7 +1610,6 @@ components:
type: string
enum:
- full_refresh
- append
syncSchema:
$ref: "#/components/schemas/SourceSchema"
schedule:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@ title: StandardDiscoverCatalogOutput
description: describes the standard output for any discovery run.
type: object
required:
- schema
# - catalog
- catalog
additionalProperties: false
properties:
schema:
description: describes the available schemas.
"$ref": StandardDataSchema.yaml#/definitions/schema
catalog:
description: describes the available schemas.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ properties:
name:
type: string
syncMode:
type: string
enum:
- full_refresh
- append
"$ref": SyncMode.yaml
schema:
"$ref": StandardDataSchema.yaml#/definitions/schema
status:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ required:
- sourceConnectionImplementation
- destinationConnectionImplementation
- standardSync
- state
- connectionId
properties:
sourceConnectionImplementation:
"$ref": SourceConnectionImplementation.yaml
destinationConnectionImplementation:
"$ref": DestinationConnectionImplementation.yaml
standardSync:
"$ref": StandardSync.yaml
syncMode:
"$ref": SyncMode.yaml
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
connectionId:
type: string
format: uuid
state:
"$ref": State.yaml
standardSyncSummary:
description:
optional state of the previous run. this object is standard for any
sync run.
"$ref": StandardSyncSummary.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@ description: StandardTapConfig
type: object
additionalProperties: false
required:
- sourceConnectionImplementation
- standardSync
- sourceConnectionConfiguration
- catalog
- syncMode
- connectionId
properties:
sourceConnectionImplementation:
"$ref": SourceConnectionImplementation.yaml
standardSync:
"$ref": StandardSync.yaml
sourceConnectionConfiguration:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
syncMode:
"$ref": SyncMode.yaml
connectionId:
type: string
format: uuid
state:
"$ref": State.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@ description: StandardTargetConfig
type: object
additionalProperties: false
required:
- sourceConnectionImplementation
- destinationConnectionImplementation
- standardSync
- destinationConnectionConfiguration
- syncMode
- catalog
- connectionId
properties:
destinationConnectionImplementation:
"$ref": DestinationConnectionImplementation.yaml
standardSync:
"$ref": StandardSync.yaml
destinationConnectionConfiguration:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
syncMode:
"$ref": SyncMode.yaml
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
connectionId:
type: string
format: uuid
state:
"$ref": State.yaml
8 changes: 8 additions & 0 deletions airbyte-config/models/src/main/resources/types/SyncMode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/SyncMode.yaml
title: SyncMode
description: sync modes.
type: string
enum:
- full_refresh
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.DestinationConnectionImplementation;
import io.airbyte.config.Schema;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.SyncMode;
import io.airbyte.config.StandardTargetConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
Expand All @@ -47,6 +46,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -177,7 +177,7 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
@ParameterizedTest
@ArgumentsSource(DataArgumentsProvider.class)
void testSync(String messagesFilename, String catalogFilename) throws Exception {
final Schema catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), Schema.class);
final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(messages, catalog);
Expand All @@ -190,7 +190,7 @@ void testSync(String messagesFilename, String catalogFilename) throws Exception
*/
@Test
void testSecondSync() throws Exception {
final Schema catalog = Jsons.deserialize(MoreResources.readResource("exchange_rate_catalog.json"), Schema.class);
final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource("exchange_rate_catalog.json"), AirbyteCatalog.class);
final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource("exchange_rate_messages.txt").lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(firstSyncMessages, catalog);
Expand All @@ -208,10 +208,12 @@ void testSecondSync() throws Exception {
}

// todo (cgardens) - still uses the old schema.
private void runSync(List<AirbyteMessage> messages, Schema catalog) throws Exception {
private void runSync(List<AirbyteMessage> messages, AirbyteCatalog catalog) throws Exception {
final StandardTargetConfig targetConfig = new StandardTargetConfig()
.withDestinationConnectionImplementation(new DestinationConnectionImplementation().withConfiguration(getConfig()))
.withStandardSync(new StandardSync().withSchema(catalog));
.withConnectionId(UUID.randomUUID())
.withSyncMode(SyncMode.FULL_REFRESH)
.withCatalog(catalog)
.withDestinationConnectionConfiguration(getConfig());

final AirbyteDestination target = new DefaultAirbyteDestination(new AirbyteIntegrationLauncher(getImageName(), pbf));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@
"streams": [
{
"name": "exchange_rate",
"selected": true,
"fields": [
{
"name": "date",
"dataType": "string",
"selected": true
},
{
"name": "HKD",
"dataType": "number",
"selected": true
},
{
"name": "NZD",
"dataType": "number",
"selected": true
"json_schema": {
"properties": {
"date": {
"dataType": "string"
},
"HKD": {
"dataType": "number"
},
"NZD": {
"dataType": "number"
}
}
]
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand All @@ -33,16 +34,18 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.Schema;
import io.airbyte.config.SourceConnectionImplementation;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.SyncMode;
import io.airbyte.config.StandardTapConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
Expand All @@ -69,18 +72,20 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;

@SuppressWarnings("rawtypes")
public class SingerPostgresSourceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(SingerPostgresSourceTest.class);
private static final String IMAGE_NAME = "airbyte/postgres-singer-source-abprotocol:dev";
private static final Path TESTS_PATH = Path.of("/tmp/airbyte_integration_tests");

private static final AirbyteCatalog CATALOG = CatalogHelpers.createAirbyteCatalog(
"id_and_name",
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING));

private PostgreSQLContainer psqlDb;
private Path jobRoot;
private IntegrationLauncher integrationLauncher;
Expand Down Expand Up @@ -108,20 +113,10 @@ public void cleanUp() {

@Test
public void testFullRefreshStatelessRead() throws Exception {

Schema schema = Jsons.deserialize(MoreResources.readResource("schema.json"), Schema.class);

// select all streams and all fields
schema.getStreams().forEach(s -> s.setSelected(true));
schema.getStreams().forEach(t -> t.getFields().forEach(c -> c.setSelected(true)));

StandardSync syncConfig = new StandardSync().withSyncMode(StandardSync.SyncMode.FULL_REFRESH).withSchema(schema);
SourceConnectionImplementation sourceImpl =
new SourceConnectionImplementation().withConfiguration(Jsons.jsonNode(getDbConfig(psqlDb)));

StandardTapConfig tapConfig = new StandardTapConfig()
.withStandardSync(syncConfig)
.withSourceConnectionImplementation(sourceImpl);
.withCatalog(CATALOG)
.withSyncMode(SyncMode.FULL_REFRESH)
.withSourceConnectionConfiguration(Jsons.jsonNode(getDbConfig(psqlDb)));

DefaultAirbyteSource source = new DefaultAirbyteSource(integrationLauncher);
source.start(tapConfig, jobRoot);
Expand Down Expand Up @@ -161,7 +156,7 @@ public void testCanReadUtf8() throws IOException, InterruptedException, WorkerEx

String configFileName = "config.json";
String catalogFileName = "selected_catalog.json";
writeFileToJobRoot(catalogFileName, MoreResources.readResource(catalogFileName));
writeFileToJobRoot(catalogFileName, Jsons.serialize(CATALOG));
writeFileToJobRoot(configFileName, Jsons.serialize(getDbConfig(db)));

Process tapProcess = integrationLauncher.read(jobRoot, configFileName, catalogFileName).start();
Expand Down Expand Up @@ -205,10 +200,18 @@ public void testDiscover() throws IOException {
StandardDiscoverCatalogInput inputConfig = new StandardDiscoverCatalogInput().withConnectionConfiguration(Jsons.jsonNode(getDbConfig(psqlDb)));
OutputAndStatus<StandardDiscoverCatalogOutput> run = new DefaultDiscoverCatalogWorker(integrationLauncher).run(inputConfig, jobRoot);

Schema expected = Jsons.deserialize(MoreResources.readResource("schema.json"), Schema.class);
assertEquals(JobStatus.SUCCEEDED, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(expected, run.getOutput().get().getSchema());
final AirbyteCatalog actualCatalog = run.getOutput().get().getCatalog();
assertEquals(CATALOG.getStreams().get(0).getName(), actualCatalog.getStreams().get(0).getName());
assertNotNull(CATALOG.getStreams().get(0).getJsonSchema().get("properties").get("id"));
assertNotNull(CATALOG.getStreams().get(0).getJsonSchema().get("properties").get("name"));
assertEquals(
Jsons.jsonNode(Lists.newArrayList("null", "integer")),
actualCatalog.getStreams().get(0).getJsonSchema().get("properties").get("id").get("type"));
assertEquals(
Jsons.jsonNode(Lists.newArrayList("null", "string")),
actualCatalog.getStreams().get(0).getJsonSchema().get("properties").get("name").get("type"));
}

@Test
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit e68ee48

Please sign in to comment.