Skip to content

Commit

Permalink
Validate source/destination definition ids before create connection -…
Browse files Browse the repository at this point in the history
… issue 144 (airbytehq#4115)

* ConnectionsHandler checks that sourceDefinitionId and destinationDefinitionId are valid before creating a connector.

* Whitespace fixes

Co-authored-by: Jenny Brown <[email protected]>
  • Loading branch information
airbyte-jenny and jennybrown8 authored Jun 18, 2021
1 parent 397aaa9 commit 39d4221
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.config.persistence;

import io.airbyte.config.ConfigSchema;
import java.util.UUID;

public class ConfigNotFoundException extends Exception {

Expand All @@ -37,6 +38,10 @@ public ConfigNotFoundException(ConfigSchema type, String configId) {
this.configId = configId;
}

public ConfigNotFoundException(ConfigSchema type, UUID uuid) {
this(type, uuid.toString());
}

public ConfigSchema getType() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public ConnectionRead createConnection(ConnectionCreate connectionCreate) throws
standardSync.withManual(true);
}

// Validate source and destination
configRepository.getSourceConnection(connectionCreate.getSourceId());
configRepository.getDestinationConnection(connectionCreate.getDestinationId());

configRepository.writeStandardSync(standardSync);

trackNewConnection(standardSync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@

package io.airbyte.server.handlers;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -45,12 +44,7 @@
import io.airbyte.api.model.SyncMode;
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.DataType;
import io.airbyte.config.Schedule;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.*;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -120,6 +114,59 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept
verify(configRepository).writeStandardSync(standardSync);
}

@Test
void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, ConfigNotFoundException, IOException {
when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId());
UUID sourceDefinitionIdBad = UUID.randomUUID();
UUID destinationDefinitionIdBad = UUID.randomUUID();

final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
.withName("source-test")
.withSourceDefinitionId(UUID.randomUUID());
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withName("destination-test")
.withDestinationDefinitionId(UUID.randomUUID());
when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync);
when(configRepository.getSourceDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(sourceDefinition);
when(configRepository.getDestinationDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(destinationDefinition);

when(configRepository.getSourceConnection(sourceDefinitionIdBad))
.thenThrow(new ConfigNotFoundException(ConfigSchema.SOURCE_CONNECTION, sourceDefinitionIdBad));
when(configRepository.getDestinationConnection(destinationDefinitionIdBad))
.thenThrow(new ConfigNotFoundException(ConfigSchema.DESTINATION_CONNECTION, destinationDefinitionIdBad));

final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog();

final ConnectionCreate connectionCreateBadSource = new ConnectionCreate()
.sourceId(sourceDefinitionIdBad)
.destinationId(standardSync.getDestinationId())
.operationIds(standardSync.getOperationIds())
.name("presto to hudi")
.namespaceDefinition(NamespaceDefinitionType.SOURCE)
.namespaceFormat(null)
.prefix("presto_to_hudi")
.status(ConnectionStatus.ACTIVE)
.schedule(ConnectionHelpers.generateBasicConnectionSchedule())
.syncCatalog(catalog);

assertThrows(ConfigNotFoundException.class, () -> connectionsHandler.createConnection(connectionCreateBadSource));

final ConnectionCreate connectionCreateBadDestination = new ConnectionCreate()
.sourceId(standardSync.getSourceId())
.destinationId(destinationDefinitionIdBad)
.operationIds(standardSync.getOperationIds())
.name("presto to hudi")
.namespaceDefinition(NamespaceDefinitionType.SOURCE)
.namespaceFormat(null)
.prefix("presto_to_hudi")
.status(ConnectionStatus.ACTIVE)
.schedule(ConnectionHelpers.generateBasicConnectionSchedule())
.syncCatalog(catalog);

assertThrows(ConfigNotFoundException.class, () -> connectionsHandler.createConnection(connectionCreateBadDestination));

}

@Test
void testUpdateConnection() throws JsonValidationException, ConfigNotFoundException, IOException {
final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog();
Expand Down

0 comments on commit 39d4221

Please sign in to comment.