diff --git a/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java index 6a3a001617..9b99b8dd42 100644 --- a/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java @@ -870,6 +870,9 @@ CommentDto createArtifactVersionCommentRaw(String groupId, String artifactId, St void importArtifactRule(ArtifactRuleEntity entity); + void importArtifactBranch(ArtifactVersionBranchEntity entity); + + boolean isContentExists(String contentHash) throws RegistryStorageException; diff --git a/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java b/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java index e50f951371..d38432d8fe 100644 --- a/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java +++ b/app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java @@ -422,6 +422,13 @@ public void importArtifactRule(ArtifactRuleEntity entity) { } + @Override + public void importArtifactBranch(ArtifactVersionBranchEntity entity) { + checkReadOnly(); + delegate.importArtifactBranch(entity); + } + + @Override public void updateContentCanonicalHash(String newCanonicalHash, long contentId, String contentHash) { checkReadOnly(); diff --git a/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java b/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java index ae137d31d2..2d6ba638bd 100644 --- a/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java +++ b/app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java @@ -338,6 +338,12 @@ public void importArtifactRule(ArtifactRuleEntity entity) { } + @Override + public void importArtifactBranch(ArtifactVersionBranchEntity entity) { + delegate.importArtifactBranch(entity); + } + + @Override public void updateContentCanonicalHash(String newCanonicalHash, long contentId, String contentHash) { delegate.updateContentCanonicalHash(newCanonicalHash, contentId, contentHash); diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/gitops/AbstractReadOnlyRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/gitops/AbstractReadOnlyRegistryStorage.java index 5a5521cce3..6886330774 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/gitops/AbstractReadOnlyRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/gitops/AbstractReadOnlyRegistryStorage.java @@ -340,6 +340,12 @@ public void importArtifactRule(ArtifactRuleEntity entity) { } + @Override + public void importArtifactBranch(ArtifactVersionBranchEntity entity) { + readOnlyViolation(); + } + + @Override public void updateContentCanonicalHash(String newCanonicalHash, long contentId, String contentHash) { readOnlyViolation(); diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java index 226ba1cfcf..10638036cc 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java @@ -812,6 +812,12 @@ public void importGroup(GroupEntity entity) { } + @Override + public void importArtifactBranch(ArtifactVersionBranchEntity entity) { + submitter.submitBranchImport(entity); + } + + @Override public void resetContentId() { UUID reqId = ConcurrentUtil.get(submitter.submitGlobalId(ActionType.RESET)); diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlSubmitter.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlSubmitter.java index 0af2600142..26a7b2caca 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlSubmitter.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlSubmitter.java @@ -13,6 +13,7 @@ import io.apicurio.registry.storage.impl.kafkasql.values.*; import io.apicurio.registry.types.ArtifactState; import io.apicurio.registry.types.RuleType; +import io.apicurio.registry.utils.impexp.ArtifactVersionBranchEntity; import io.apicurio.registry.utils.kafka.ProducerActions; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; @@ -260,12 +261,17 @@ public CompletableFuture submitConfigProperty(String propertyName, ActionT * ****************************************************************************************** */ public CompletableFuture submitBranch(ActionType action, GAV gav, BranchId branchId) { var key = ArtifactBranchKey.create(gav.getRawGroupId(), gav.getRawArtifactId(), branchId.getRawBranchId()); - var value = ArtifactBranchValue.create(action, gav.getRawVersionId()); + var value = ArtifactBranchValue.create(action, gav.getRawVersionId(), null); return send(key, value); } public CompletableFuture submitBranch(ActionType action, GA ga, BranchId branchId) { var key = ArtifactBranchKey.create(ga.getRawGroupId(), ga.getRawArtifactId(), branchId.getRawBranchId()); - var value = ArtifactBranchValue.create(action, null); + var value = ArtifactBranchValue.create(action, null, null); + return send(key, value); + } + public CompletableFuture submitBranchImport(ArtifactVersionBranchEntity entity) { + var key = ArtifactBranchKey.create(entity.groupId, entity.artifactId, entity.branch); + var value = ArtifactBranchValue.create(ActionType.IMPORT, entity.version, entity.branchOrder); return send(key, value); } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/sql/KafkaSqlSink.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/sql/KafkaSqlSink.java index b02243ef15..03ee6da723 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/sql/KafkaSqlSink.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/sql/KafkaSqlSink.java @@ -581,7 +581,14 @@ private Object processBranch(ArtifactBranchKey key, ArtifactBranchValue value) { sqlStore.deleteArtifactBranch(new GA(key.getGroupId(), key.getArtifactId()), new BranchId(key.getBranchId())); return null; case IMPORT: - // TODO + sqlStore.importArtifactBranch(ArtifactVersionBranchEntity.builder() + .groupId(key.getGroupId()) + .artifactId(key.getArtifactId()) + .branch(key.getBranchId()) + .version(value.getVersion()) + .branchOrder(value.getBranchOrder()) + .build()); + return null; default: return unsupported(key, value); } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/values/ArtifactBranchValue.java b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/values/ArtifactBranchValue.java index dad8299123..72e69e2e13 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/values/ArtifactBranchValue.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/values/ArtifactBranchValue.java @@ -15,13 +15,14 @@ public class ArtifactBranchValue extends AbstractMessageValue { - private String version; + private String version; // nullable - Not used when deleting + private Integer branchOrder; // nullable - Used for imports - - public static ArtifactBranchValue create(ActionType action, String version) { + public static ArtifactBranchValue create(ActionType action, String version, Integer branchOrder) { return ArtifactBranchValue.builder() .action(action) .version(version) + .branchOrder(branchOrder) .build(); } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java index b3c57cd6f8..730782be8e 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java @@ -2242,6 +2242,20 @@ public void exportData(Function handler) throws RegistryStorageExc return null; }); + // Export all artifact branches + ///////////////////////////////// + handles.withHandle(handle -> { + Stream stream = handle.createQuery(sqlStatements.exportArtifactBranches()) + .setFetchSize(50) + .map(ArtifactVersionBranchEntityMapper.instance) + .stream(); + // Process and then close the stream. + try (stream) { + stream.forEach(handler::apply); + } + return null; + }); + // Export all artifact rules ///////////////////////////////// handles.withHandle(handle -> { @@ -3441,6 +3455,33 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) { } + @Override + @Transactional + public void importArtifactBranch(ArtifactVersionBranchEntity entity) { + var gav = entity.toGAV(); + var branchId = entity.toBranchId(); + if (doesArtifactBranchContainVersion(gav, entity.toBranchId())) { + throw new BranchVersionAlreadyExistsException(gav, branchId); + } + handles.withHandleNoException(handle -> { + try { + handle.createUpdate(sqlStatements.importArtifactBranch()) + .bind(0, gav.getRawGroupId()) + .bind(1, gav.getRawArtifactId()) + .bind(2, branchId.getRawBranchId()) + .bind(3, entity.branchOrder) + .bind(4, gav.getRawVersionId()) + .execute(); + } catch (Exception ex) { + if (sqlStatements.isForeignKeyViolation(ex)) { + throw new VersionNotFoundException(gav, ex); + } + throw ex; + } + }); + } + + private static boolean hasContentFilter(Set filters) { for (SearchFilter searchFilter : filters) { if (searchFilter.getType() == SearchFilterType.contentHash || searchFilter.getType() == SearchFilterType.canonicalHash) { diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java index a26edd7f2e..8b1f9eca1d 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/CommonSqlStatements.java @@ -717,6 +717,13 @@ public String exportGroups() { return "SELECT * FROM groups g "; } + + @Override + public String exportArtifactBranches() { + return "SELECT * FROM artifact_version_branches avb"; + } + + /** * @see io.apicurio.registry.storage.impl.sql.SqlStatements#importArtifactRule() */ @@ -1105,4 +1112,11 @@ public String selectVersionsWithoutBranch() { "LEFT JOIN artifact_version_branches avb ON v.groupId = avb.groupId AND v.artifactId = avb.artifactId AND v.version = avb.version " + "WHERE v.groupId = ? AND v.artifactId = ? AND avb.branch IS NULL"; } + + + @Override + public String importArtifactBranch() { + return "INSERT INTO artifact_version_branches (groupId, artifactId, branch, branchOrder, version) " + + "VALUES(?, ?, ?, ?, ?)"; + } } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java index 7b880f1bb2..fa0150f7b1 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlStatements.java @@ -485,6 +485,8 @@ public interface SqlStatements { public String exportArtifactVersions(); + String exportArtifactBranches(); + /* * The next few statements support importing data into the DB. */ @@ -499,6 +501,8 @@ public interface SqlStatements { public String importArtifactVersion(); + String importArtifactBranch(); + public String selectMaxContentId(); public String selectMaxGlobalId(); diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/mappers/ArtifactVersionBranchEntityMapper.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/mappers/ArtifactVersionBranchEntityMapper.java new file mode 100644 index 0000000000..3108bda87e --- /dev/null +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/mappers/ArtifactVersionBranchEntityMapper.java @@ -0,0 +1,29 @@ +package io.apicurio.registry.storage.impl.sql.mappers; + +import io.apicurio.registry.storage.impl.sql.SqlUtil; +import io.apicurio.registry.storage.impl.sql.jdb.RowMapper; +import io.apicurio.registry.utils.impexp.ArtifactVersionBranchEntity; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class ArtifactVersionBranchEntityMapper implements RowMapper { + + public static final ArtifactVersionBranchEntityMapper instance = new ArtifactVersionBranchEntityMapper(); + + + private ArtifactVersionBranchEntityMapper() { + } + + + @Override + public ArtifactVersionBranchEntity map(ResultSet rs) throws SQLException { + return ArtifactVersionBranchEntity.builder() + .groupId(SqlUtil.denormalizeGroupId(rs.getString("groupId"))) + .artifactId(rs.getString("artifactId")) + .branch(rs.getString("branch")) + .branchOrder(rs.getInt("branchOrder")) + .version(rs.getString("version")) + .build(); + } +} diff --git a/app/src/main/java/io/apicurio/registry/storage/importing/AbstractDataImporter.java b/app/src/main/java/io/apicurio/registry/storage/importing/AbstractDataImporter.java index fee982f48f..1178163e5b 100644 --- a/app/src/main/java/io/apicurio/registry/storage/importing/AbstractDataImporter.java +++ b/app/src/main/java/io/apicurio/registry/storage/importing/AbstractDataImporter.java @@ -36,6 +36,9 @@ public void importEntity(Entity entity) { case Comment: importComment((CommentEntity) entity); break; + case ArtifactVersionBranch: + importArtifactBranch((ArtifactVersionBranchEntity) entity); + break; case Manifest: ManifestEntity manifest = (ManifestEntity) entity; log.info("---------- Import Info ----------"); @@ -63,4 +66,6 @@ public void importEntity(Entity entity) { protected abstract void importGlobalRule(GlobalRuleEntity entity); protected abstract void importGroup(GroupEntity entity); + + protected abstract void importArtifactBranch(ArtifactVersionBranchEntity entity); } diff --git a/app/src/main/java/io/apicurio/registry/storage/importing/SqlDataImporter.java b/app/src/main/java/io/apicurio/registry/storage/importing/SqlDataImporter.java index 7aabfb7405..f276cb7fc8 100644 --- a/app/src/main/java/io/apicurio/registry/storage/importing/SqlDataImporter.java +++ b/app/src/main/java/io/apicurio/registry/storage/importing/SqlDataImporter.java @@ -1,6 +1,7 @@ package io.apicurio.registry.storage.importing; import io.apicurio.registry.content.ContentHandle; +import io.apicurio.registry.model.GAV; import io.apicurio.registry.storage.RegistryStorage; import io.apicurio.registry.storage.dto.ArtifactReferenceDto; import io.apicurio.registry.storage.error.VersionAlreadyExistsException; @@ -13,10 +14,7 @@ import org.slf4j.Logger; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; @@ -40,6 +38,9 @@ public class SqlDataImporter extends AbstractDataImporter { protected final Map globalIdMapping = new HashMap<>(); protected final Map contentIdMapping = new HashMap<>(); + // To keep track of which versions have been imported + private final Set gavDone = new HashSet<>(); + private final Map> branchesWaitingForVersion = new HashMap<>(); public SqlDataImporter(Logger logger, RegistryStorageContentUtils utils, RegistryStorage storage, boolean preserveGlobalId, boolean preserveContentId) { @@ -83,6 +84,8 @@ public void importArtifactVersion(ArtifactVersionEntity entity) { storage.importArtifactVersion(entity); log.debug("Artifact version imported successfully: {}", entity); globalIdMapping.put(oldGlobalId, entity.globalId); + var gav = new GAV(entity.groupId, entity.artifactId, entity.version); + gavDone.add(gav); // Import comments that were waiting for this version var commentsToImport = waitingForVersion.stream() @@ -93,6 +96,11 @@ public void importArtifactVersion(ArtifactVersionEntity entity) { } waitingForVersion.removeAll(commentsToImport); + // Import branches waiting for version + branchesWaitingForVersion.computeIfAbsent(gav, _ignored -> List.of()) + .forEach(this::importEntity); + branchesWaitingForVersion.remove(gav); + } catch (VersionAlreadyExistsException ex) { if (ex.getGlobalId() != null) { log.warn("Duplicate globalId {} detected, skipping import of artifact version: {}", ex.getGlobalId(), entity); @@ -186,6 +194,23 @@ public void importComment(CommentEntity entity) { } + @Override + protected void importArtifactBranch(ArtifactVersionBranchEntity entity) { + try { + var gav = entity.toGAV(); + if (!gavDone.contains(gav)) { + // The version hasn't been imported yet. Need to wait for it. + branchesWaitingForVersion.computeIfAbsent(gav, _ignored -> new ArrayList<>()) + .add(entity); + } else { + storage.importArtifactBranch(entity); + log.debug("Artifact branch imported successfully: {}", entity); + } + } catch (Exception ex) { + log.warn("Failed to import artifact branch {}: {}", entity, ex.getMessage()); + } + } + /** * WARNING: Must be executed within a transaction! */ diff --git a/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/ArtifactVersionBranchEntity.java b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/ArtifactVersionBranchEntity.java new file mode 100644 index 0000000000..e795c53a22 --- /dev/null +++ b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/ArtifactVersionBranchEntity.java @@ -0,0 +1,41 @@ +package io.apicurio.registry.utils.impexp; + +import io.apicurio.registry.model.BranchId; +import io.apicurio.registry.model.GAV; +import io.quarkus.runtime.annotations.RegisterForReflection; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import static lombok.AccessLevel.PRIVATE; + +@Builder +@NoArgsConstructor +@AllArgsConstructor(access = PRIVATE) +@ToString +@RegisterForReflection +public class ArtifactVersionBranchEntity extends Entity { + + public String groupId; + public String artifactId; + public String version; + public String branch; + public int branchOrder; + + + public GAV toGAV() { + return new GAV(groupId, artifactId, version); + } + + + public BranchId toBranchId() { + return new BranchId(branch); + } + + + @Override + public EntityType getEntityType() { + return EntityType.ArtifactVersionBranch; + } +} diff --git a/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityReader.java b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityReader.java index 303980179a..f21b66098c 100644 --- a/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityReader.java +++ b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityReader.java @@ -48,6 +48,8 @@ public Entity readEntity() throws IOException { return readGroup(entry); case Comment: return readComment(entry); + case ArtifactVersionBranch: + return readArtifactVersionBranch(entry); case Manifest: return readManifest(entry); } @@ -94,6 +96,10 @@ private CommentEntity readComment(ZipEntry entry) throws IOException { return this.readEntry(entry, CommentEntity.class); } + private ArtifactVersionBranchEntity readArtifactVersionBranch(ZipEntry entry) throws IOException { + return this.readEntry(entry, ArtifactVersionBranchEntity.class); + } + private GlobalRuleEntity readGlobalRule(ZipEntry entry) throws IOException { return this.readEntry(entry, GlobalRuleEntity.class); } diff --git a/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityType.java b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityType.java index 3f27802e0e..96ed6a68f6 100644 --- a/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityType.java +++ b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityType.java @@ -5,6 +5,6 @@ @RegisterForReflection public enum EntityType { - Manifest, GlobalRule, Content, Group, ArtifactVersion, ArtifactRule, Comment + Manifest, GlobalRule, Content, Group, ArtifactVersion, ArtifactRule, Comment, ArtifactVersionBranch } diff --git a/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityWriter.java b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityWriter.java index 65bc5cf09f..4def458b23 100644 --- a/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityWriter.java +++ b/utils/importexport/src/main/java/io/apicurio/registry/utils/impexp/EntityWriter.java @@ -51,6 +51,10 @@ public void writeEntity(Entity entity) throws IOException { break; case Comment: writeEntity((CommentEntity) entity); + break; + case ArtifactVersionBranch: + writeEntity((ArtifactVersionBranchEntity) entity); + break; case Manifest: writeEntity((ManifestEntity) entity); break; @@ -102,6 +106,11 @@ private void writeEntity(CommentEntity entity) throws IOException { write(mdEntry, entity, CommentEntity.class); } + private void writeEntity(ArtifactVersionBranchEntity entity) throws IOException { + ZipEntry mdEntry = createZipEntry(EntityType.ArtifactVersionBranch, entity.groupId, entity.artifactId, entity.branch + '-' + entity.branchOrder, "json"); + write(mdEntry, entity, ArtifactVersionBranchEntity.class); + } + private ZipEntry createZipEntry(EntityType type, String fileName, String fileExt) { return createZipEntry(type, null, null, fileName, fileExt); } @@ -113,6 +122,7 @@ private ZipEntry createZipEntry(EntityType type, String groupId, String artifact path = String.format("groups/%s/artifacts/%s/rules/%s.%s.%s", groupOrDefault(groupId), artifactId, fileName, type.name(), fileExt); break; case ArtifactVersion: + case ArtifactVersionBranch: path = String.format("groups/%s/artifacts/%s/versions/%s.%s.%s", groupOrDefault(groupId), artifactId, fileName, type.name(), fileExt); break; case Content: