Skip to content

Commit

Permalink
feat: artifact branches import/export
Browse files Browse the repository at this point in the history
  • Loading branch information
jsenko committed Jan 25, 2024
1 parent 31f58c1 commit fc32c96
Show file tree
Hide file tree
Showing 18 changed files with 228 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,9 @@ CommentDto createArtifactVersionCommentRaw(String groupId, String artifactId, St
void importArtifactRule(ArtifactRuleEntity entity);


void importArtifactBranch(ArtifactVersionBranchEntity entity);


boolean isContentExists(String contentHash) throws RegistryStorageException;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,13 @@ public void importArtifactRule(ArtifactRuleEntity entity) {
}


@Override
public void importArtifactBranch(ArtifactVersionBranchEntity entity) {
checkReadOnly();
delegate.importArtifactBranch(entity);
}


@Override
public void updateContentCanonicalHash(String newCanonicalHash, long contentId, String contentHash) {
checkReadOnly();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ public void importArtifactRule(ArtifactRuleEntity entity) {
}


@Override
public void importArtifactBranch(ArtifactVersionBranchEntity entity) {
delegate.importArtifactBranch(entity);
}


@Override
public void updateContentCanonicalHash(String newCanonicalHash, long contentId, String contentHash) {
delegate.updateContentCanonicalHash(newCanonicalHash, contentId, contentHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ public void importArtifactRule(ArtifactRuleEntity entity) {
}


@Override
public void importArtifactBranch(ArtifactVersionBranchEntity entity) {
readOnlyViolation();
}


@Override
public void updateContentCanonicalHash(String newCanonicalHash, long contentId, String contentHash) {
readOnlyViolation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,12 @@ public void importGroup(GroupEntity entity) {
}


@Override
public void importArtifactBranch(ArtifactVersionBranchEntity entity) {
submitter.submitBranchImport(entity);
}


@Override
public void resetContentId() {
UUID reqId = ConcurrentUtil.get(submitter.submitGlobalId(ActionType.RESET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.apicurio.registry.storage.impl.kafkasql.values.*;
import io.apicurio.registry.types.ArtifactState;
import io.apicurio.registry.types.RuleType;
import io.apicurio.registry.utils.impexp.ArtifactVersionBranchEntity;
import io.apicurio.registry.utils.kafka.ProducerActions;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
Expand Down Expand Up @@ -260,12 +261,17 @@ public CompletableFuture<UUID> submitConfigProperty(String propertyName, ActionT
* ****************************************************************************************** */
public CompletableFuture<UUID> submitBranch(ActionType action, GAV gav, BranchId branchId) {
var key = ArtifactBranchKey.create(gav.getRawGroupId(), gav.getRawArtifactId(), branchId.getRawBranchId());
var value = ArtifactBranchValue.create(action, gav.getRawVersionId());
var value = ArtifactBranchValue.create(action, gav.getRawVersionId(), null);
return send(key, value);
}
public CompletableFuture<UUID> submitBranch(ActionType action, GA ga, BranchId branchId) {
var key = ArtifactBranchKey.create(ga.getRawGroupId(), ga.getRawArtifactId(), branchId.getRawBranchId());
var value = ArtifactBranchValue.create(action, null);
var value = ArtifactBranchValue.create(action, null, null);
return send(key, value);
}
public CompletableFuture<UUID> submitBranchImport(ArtifactVersionBranchEntity entity) {
var key = ArtifactBranchKey.create(entity.groupId, entity.artifactId, entity.branch);
var value = ArtifactBranchValue.create(ActionType.IMPORT, entity.version, entity.branchOrder);
return send(key, value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,14 @@ private Object processBranch(ArtifactBranchKey key, ArtifactBranchValue value) {
sqlStore.deleteArtifactBranch(new GA(key.getGroupId(), key.getArtifactId()), new BranchId(key.getBranchId()));
return null;
case IMPORT:
// TODO
sqlStore.importArtifactBranch(ArtifactVersionBranchEntity.builder()
.groupId(key.getGroupId())
.artifactId(key.getArtifactId())
.branch(key.getBranchId())
.version(value.getVersion())
.branchOrder(value.getBranchOrder())
.build());
return null;
default:
return unsupported(key, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
public class ArtifactBranchValue extends AbstractMessageValue {


private String version;
private String version; // nullable - Not used when deleting
private Integer branchOrder; // nullable - Used for imports


public static ArtifactBranchValue create(ActionType action, String version) {
public static ArtifactBranchValue create(ActionType action, String version, Integer branchOrder) {
return ArtifactBranchValue.builder()
.action(action)
.version(version)
.branchOrder(branchOrder)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2242,6 +2242,20 @@ public void exportData(Function<Entity, Void> handler) throws RegistryStorageExc
return null;
});

// Export all artifact branches
/////////////////////////////////
handles.withHandle(handle -> {
Stream<ArtifactVersionBranchEntity> stream = handle.createQuery(sqlStatements.exportArtifactBranches())
.setFetchSize(50)
.map(ArtifactVersionBranchEntityMapper.instance)
.stream();
// Process and then close the stream.
try (stream) {
stream.forEach(handler::apply);
}
return null;
});

// Export all artifact rules
/////////////////////////////////
handles.withHandle(handle -> {
Expand Down Expand Up @@ -3441,6 +3455,33 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) {
}


@Override
@Transactional
public void importArtifactBranch(ArtifactVersionBranchEntity entity) {
var gav = entity.toGAV();
var branchId = entity.toBranchId();
if (doesArtifactBranchContainVersion(gav, entity.toBranchId())) {
throw new BranchVersionAlreadyExistsException(gav, branchId);
}
handles.withHandleNoException(handle -> {
try {
handle.createUpdate(sqlStatements.importArtifactBranch())
.bind(0, gav.getRawGroupId())
.bind(1, gav.getRawArtifactId())
.bind(2, branchId.getRawBranchId())
.bind(3, entity.branchOrder)
.bind(4, gav.getRawVersionId())
.execute();
} catch (Exception ex) {
if (sqlStatements.isForeignKeyViolation(ex)) {
throw new VersionNotFoundException(gav, ex);
}
throw ex;
}
});
}


private static boolean hasContentFilter(Set<SearchFilter> filters) {
for (SearchFilter searchFilter : filters) {
if (searchFilter.getType() == SearchFilterType.contentHash || searchFilter.getType() == SearchFilterType.canonicalHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,13 @@ public String exportGroups() {
return "SELECT * FROM groups g ";
}


@Override
public String exportArtifactBranches() {
return "SELECT * FROM artifact_version_branches avb";
}


/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#importArtifactRule()
*/
Expand Down Expand Up @@ -1105,4 +1112,11 @@ public String selectVersionsWithoutBranch() {
"LEFT JOIN artifact_version_branches avb ON v.groupId = avb.groupId AND v.artifactId = avb.artifactId AND v.version = avb.version " +
"WHERE v.groupId = ? AND v.artifactId = ? AND avb.branch IS NULL";
}


@Override
public String importArtifactBranch() {
return "INSERT INTO artifact_version_branches (groupId, artifactId, branch, branchOrder, version) " +
"VALUES(?, ?, ?, ?, ?)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ public interface SqlStatements {

public String exportArtifactVersions();

String exportArtifactBranches();

/*
* The next few statements support importing data into the DB.
*/
Expand All @@ -499,6 +501,8 @@ public interface SqlStatements {

public String importArtifactVersion();

String importArtifactBranch();

public String selectMaxContentId();

public String selectMaxGlobalId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.apicurio.registry.storage.impl.sql.mappers;

import io.apicurio.registry.storage.impl.sql.SqlUtil;
import io.apicurio.registry.storage.impl.sql.jdb.RowMapper;
import io.apicurio.registry.utils.impexp.ArtifactVersionBranchEntity;

import java.sql.ResultSet;
import java.sql.SQLException;

public class ArtifactVersionBranchEntityMapper implements RowMapper<ArtifactVersionBranchEntity> {

public static final ArtifactVersionBranchEntityMapper instance = new ArtifactVersionBranchEntityMapper();


private ArtifactVersionBranchEntityMapper() {
}


@Override
public ArtifactVersionBranchEntity map(ResultSet rs) throws SQLException {
return ArtifactVersionBranchEntity.builder()
.groupId(SqlUtil.denormalizeGroupId(rs.getString("groupId")))
.artifactId(rs.getString("artifactId"))
.branch(rs.getString("branch"))
.branchOrder(rs.getInt("branchOrder"))
.version(rs.getString("version"))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public void importEntity(Entity entity) {
case Comment:
importComment((CommentEntity) entity);
break;
case ArtifactVersionBranch:
importArtifactBranch((ArtifactVersionBranchEntity) entity);
break;
case Manifest:
ManifestEntity manifest = (ManifestEntity) entity;
log.info("---------- Import Info ----------");
Expand Down Expand Up @@ -63,4 +66,6 @@ public void importEntity(Entity entity) {
protected abstract void importGlobalRule(GlobalRuleEntity entity);

protected abstract void importGroup(GroupEntity entity);

protected abstract void importArtifactBranch(ArtifactVersionBranchEntity entity);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.apicurio.registry.storage.importing;

import io.apicurio.registry.content.ContentHandle;
import io.apicurio.registry.model.GAV;
import io.apicurio.registry.storage.RegistryStorage;
import io.apicurio.registry.storage.dto.ArtifactReferenceDto;
import io.apicurio.registry.storage.error.VersionAlreadyExistsException;
Expand All @@ -13,10 +14,7 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;


Expand All @@ -40,6 +38,9 @@ public class SqlDataImporter extends AbstractDataImporter {
protected final Map<Long, Long> globalIdMapping = new HashMap<>();
protected final Map<Long, Long> contentIdMapping = new HashMap<>();

// To keep track of which versions have been imported
private final Set<GAV> gavDone = new HashSet<>();
private final Map<GAV, List<ArtifactVersionBranchEntity>> branchesWaitingForVersion = new HashMap<>();

public SqlDataImporter(Logger logger, RegistryStorageContentUtils utils, RegistryStorage storage,
boolean preserveGlobalId, boolean preserveContentId) {
Expand Down Expand Up @@ -83,6 +84,8 @@ public void importArtifactVersion(ArtifactVersionEntity entity) {
storage.importArtifactVersion(entity);
log.debug("Artifact version imported successfully: {}", entity);
globalIdMapping.put(oldGlobalId, entity.globalId);
var gav = new GAV(entity.groupId, entity.artifactId, entity.version);
gavDone.add(gav);

// Import comments that were waiting for this version
var commentsToImport = waitingForVersion.stream()
Expand All @@ -93,6 +96,11 @@ public void importArtifactVersion(ArtifactVersionEntity entity) {
}
waitingForVersion.removeAll(commentsToImport);

// Import branches waiting for version
branchesWaitingForVersion.computeIfAbsent(gav, _ignored -> List.of())
.forEach(this::importEntity);
branchesWaitingForVersion.remove(gav);

} catch (VersionAlreadyExistsException ex) {
if (ex.getGlobalId() != null) {
log.warn("Duplicate globalId {} detected, skipping import of artifact version: {}", ex.getGlobalId(), entity);
Expand Down Expand Up @@ -186,6 +194,23 @@ public void importComment(CommentEntity entity) {
}


@Override
protected void importArtifactBranch(ArtifactVersionBranchEntity entity) {
try {
var gav = entity.toGAV();
if (!gavDone.contains(gav)) {
// The version hasn't been imported yet. Need to wait for it.
branchesWaitingForVersion.computeIfAbsent(gav, _ignored -> new ArrayList<>())
.add(entity);
} else {
storage.importArtifactBranch(entity);
log.debug("Artifact branch imported successfully: {}", entity);
}
} catch (Exception ex) {
log.warn("Failed to import artifact branch {}: {}", entity, ex.getMessage());
}
}

/**
* WARNING: Must be executed within a transaction!
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.apicurio.registry.utils.impexp;

import io.apicurio.registry.model.BranchId;
import io.apicurio.registry.model.GAV;
import io.quarkus.runtime.annotations.RegisterForReflection;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.ToString;

import static lombok.AccessLevel.PRIVATE;

@Builder
@NoArgsConstructor
@AllArgsConstructor(access = PRIVATE)
@ToString
@RegisterForReflection
public class ArtifactVersionBranchEntity extends Entity {

public String groupId;
public String artifactId;
public String version;
public String branch;
public int branchOrder;


public GAV toGAV() {
return new GAV(groupId, artifactId, version);
}


public BranchId toBranchId() {
return new BranchId(branch);
}


@Override
public EntityType getEntityType() {
return EntityType.ArtifactVersionBranch;
}
}
Loading

0 comments on commit fc32c96

Please sign in to comment.