Skip to content

Commit

Permalink
HIP-513 download sidecar files (hashgraph#4106)
Browse files Browse the repository at this point in the history
- Add SidecarFileReader and its implementation
- Add SidecarFile domain class
- Add SidecarProperties for sidecar downloading / parsing config
- Add SidecarFileRepository
- Add sidecar file downloading, verification, and parsing logic to RecordFileDownloader
- Extend StreamFilename to parse sidecar record filename
- Persist SidecarFile in RecordStreamFileListener.onEnd

Signed-off-by: Xin Li <[email protected]>
  • Loading branch information
xin-hedera authored Jul 15, 2022
1 parent 2d8c401 commit 44a4bd8
Show file tree
Hide file tree
Showing 83 changed files with 1,184 additions and 343 deletions.
3 changes: 3 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.parser.record.retry.maxBackoff` | 10s | The maximum amount of time to wait between retries |
| `hedera.mirror.importer.parser.record.retry.minBackoff` | 250ms | The minimum amount of time to wait between retries |
| `hedera.mirror.importer.parser.record.retry.multiplier` | 2 | Used to generate the next delay for backoff |
| `hedera.mirror.importer.parser.record.sidecar.enabled` | false | Whether to download and read sidecar record files |
| `hedera.mirror.importer.parser.record.sidecar.persistBytes` | false | Whether to persist the sidecar file bytes to the database |
| `hedera.mirror.importer.parser.record.sidecar.types` | [] | Which types of transaction sidecar records to process. By default it is empty to indicate all types |
| `hedera.mirror.importer.parser.record.transactionTimeout` | 30s | The timeout in seconds for a database transaction |
| `hedera.mirror.importer.parser.tempTableBufferSize` | 256 | The size of the buffer in MB to use for temporary tables |
| `hedera.mirror.importer.reconciliation.cron` | 0 0 0 * * * | When to run the balance reconciliation job. Defaults to once a day at midnight. See Spring [docs](https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling-cron-expression). |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.hederahashgraph.api.proto.java.ContractFunctionResult;
import com.hederahashgraph.api.proto.java.TransactionRecord;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import javax.persistence.Column;
import javax.persistence.Convert;
import javax.persistence.Entity;
Expand Down Expand Up @@ -120,6 +122,14 @@ public class RecordFile implements StreamFile<RecordItem> {
@ToString.Exclude
private String previousHash;

private int sidecarCount;

@Builder.Default
@EqualsAndHashCode.Exclude
@ToString.Exclude
@Transient
private List<SidecarFile> sidecars = Collections.emptyList();

private Integer size;

private int version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,25 @@
import com.hederahashgraph.api.proto.java.Transaction;
import com.hederahashgraph.api.proto.java.TransactionBody;
import com.hederahashgraph.api.proto.java.TransactionRecord;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.Value;
import lombok.experimental.NonFinal;
import lombok.extern.log4j.Log4j2;
import org.springframework.data.util.Version;

import com.hedera.mirror.common.domain.StreamItem;
import com.hedera.mirror.common.domain.entity.EntityId;
import com.hedera.mirror.common.exception.ProtobufException;
import com.hedera.mirror.common.util.DomainUtils;
import com.hedera.services.stream.proto.TransactionSidecarRecord;

@Builder(buildMethodName = "buildInternal")
@AllArgsConstructor(access = AccessLevel.PRIVATE)
Expand All @@ -62,14 +67,19 @@ public class RecordItem implements StreamItem {
private final byte[] transactionBytes;
private final byte[] recordBytes;

@Builder.Default
@NonFinal
@Setter
private List<TransactionSidecarRecord> sidecarRecords = Collections.emptyList();

@Getter(lazy = true)
private long consensusTimestamp = DomainUtils.timestampInNanosMax(record.getConsensusTimestamp());

// transactions in stream always have a valid payerAccountId
@Getter(lazy = true)
private EntityId payerAccountId = EntityId.of(getTransactionBody().getTransactionID().getAccountID());

private final Integer transactionIndex;
private final int transactionIndex;

private final RecordItem parent;

Expand All @@ -78,7 +88,7 @@ public class RecordItem implements StreamItem {
/**
* Constructs RecordItem from serialized transactionBytes and recordBytes.
*/
public RecordItem(Version hapiVersion, byte[] transactionBytes, byte[] recordBytes, Integer transactionIndex) {
public RecordItem(Version hapiVersion, byte[] transactionBytes, byte[] recordBytes, int transactionIndex) {
try {
transaction = Transaction.parseFrom(transactionBytes);
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -114,7 +124,7 @@ public RecordItem(Version hapiVersion, Transaction transaction, TransactionRecor
this.record = record;
transactionBytes = transaction.toByteArray();
recordBytes = record.toByteArray();
transactionIndex = null;
transactionIndex = 0;
parent = null;
previous = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.hedera.mirror.common.domain.transaction;

/*-
* ‌
* Hedera Mirror Node
* ​
* Copyright (C) 2019 - 2022 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/


import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.io.Serial;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Enumerated;
import javax.persistence.IdClass;
import javax.persistence.Transient;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.hibernate.annotations.Type;
import org.springframework.data.domain.Persistable;

import com.hedera.mirror.common.converter.LongListToStringSerializer;
import com.hedera.mirror.common.domain.DigestAlgorithm;
import com.hedera.services.stream.proto.TransactionSidecarRecord;

@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Builder(toBuilder = true)
@Data
@Entity
@IdClass(SidecarFile.Id.class)
@NoArgsConstructor
public class SidecarFile implements Persistable<SidecarFile.Id> {

@ToString.Exclude
@Transient
private byte[] actualHash;

@EqualsAndHashCode.Exclude
@ToString.Exclude
private byte[] bytes;

@javax.persistence.Id
private long consensusEnd;

private Integer count;

@Enumerated
private DigestAlgorithm hashAlgorithm;

@ToString.Exclude
private byte[] hash;

@Column(name = "id")
@javax.persistence.Id
private int index;

private String name;

@Builder.Default
@ToString.Exclude
@Transient
private List<TransactionSidecarRecord> records = Collections.emptyList();

private Integer size;

@Type(type = "com.vladmihalcea.hibernate.type.array.ListArrayType")
@JsonSerialize(using = LongListToStringSerializer.class)
private List<Integer> types = Collections.emptyList();

@JsonIgnore
@Override
public Id getId() {
var id = new Id();
id.setConsensusEnd(consensusEnd);
id.setIndex(index);
return id;
}

@JsonIgnore
@Override
public boolean isNew() {
return true; // Since we never update and use a natural ID, avoid Hibernate querying before insert
}

@Data
public static class Id implements Serializable {

@Serial
private static final long serialVersionUID = -5844173241500874821L;

private long consensusEnd;

private int index;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -102,6 +103,7 @@
import com.hedera.mirror.common.domain.transaction.NonFeeTransfer;
import com.hedera.mirror.common.domain.transaction.Prng;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.common.domain.transaction.SidecarFile;
import com.hedera.mirror.common.domain.transaction.StakingRewardTransfer;
import com.hedera.mirror.common.domain.transaction.Transaction;
import com.hedera.mirror.common.domain.transaction.TransactionSignature;
Expand Down Expand Up @@ -592,10 +594,12 @@ public DomainWrapper<RecordFile, RecordFile.RecordFileBuilder> recordFile() {
transactionIndex.set(0);

long timestamp = timestamp();
long consensusEnd = timestamp + 1;
var instantString = now.toString().replace(':', '_');
var builder = RecordFile.builder()
.bytes(bytes(128))
.consensusStart(timestamp)
.consensusEnd(timestamp + 1)
.consensusEnd(consensusEnd)
.count(1L)
.digestAlgorithm(DigestAlgorithm.SHA_384)
.fileHash(text(96))
Expand All @@ -608,9 +612,13 @@ public DomainWrapper<RecordFile, RecordFile.RecordFileBuilder> recordFile() {
.logsBloom(bloomFilter())
.loadEnd(now.plusSeconds(1).getEpochSecond())
.loadStart(now.getEpochSecond())
.name(now.toString().replace(':', '_') + ".rcd")
.name(instantString + ".rcd.gz")
.nodeAccountId(entityId(ACCOUNT))
.previousHash(text(96))
.sidecarCount(1)
.sidecars(List.of(sidecarFile()
.customize(s -> s.consensusEnd(consensusEnd).name(instantString + "_01.rcd.gz"))
.get()))
.size(256 * 1024)
.version(6);
return new DomainWrapperImpl<>(builder, builder::build);
Expand All @@ -628,6 +636,21 @@ public DomainWrapper<Schedule, Schedule.ScheduleBuilder> schedule() {
return new DomainWrapperImpl<>(builder, builder::build);
}

public DomainWrapper<SidecarFile, SidecarFile.SidecarFileBuilder> sidecarFile() {
var data = bytes(256);
var builder = SidecarFile.builder()
.bytes(data)
.consensusEnd(timestamp())
.hash(bytes(DigestAlgorithm.SHA_384.getSize()))
.hashAlgorithm(DigestAlgorithm.SHA_384)
.index(1)
.name(now.toString().replace(':', '_') + "_01.rcd.gz")
.records(Collections.emptyList())
.size(data.length)
.types(List.of(1, 2));
return new DomainWrapperImpl<>(builder, builder::build);
}

public DomainWrapper<StakingRewardTransfer, StakingRewardTransfer.StakingRewardTransferBuilder> stakingRewardTransfer() {
var builder = StakingRewardTransfer.builder()
.accountId(entityId(ACCOUNT).getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ void unknownTransactionType() throws Exception {
}

private void testException(byte[] transactionBytes, byte[] recordBytes, String expectedMessage) {
assertThatThrownBy(() -> new RecordItem(DEFAULT_HAPI_VERSION, transactionBytes, recordBytes, null))
assertThatThrownBy(() -> new RecordItem(DEFAULT_HAPI_VERSION, transactionBytes, recordBytes, 0))
.isInstanceOf(ProtobufException.class)
.hasMessage(expectedMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.hedera.mirror.common.domain.contract.Contract;
import com.hedera.mirror.common.domain.contract.ContractLog;
import com.hedera.mirror.common.domain.contract.ContractResult;
import com.hedera.mirror.common.domain.contract.ContractStateChange;
import com.hedera.mirror.common.domain.entity.EntityId;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.common.domain.transaction.RecordItem;
Expand Down Expand Up @@ -129,7 +128,6 @@ private void processContractResult(RecordItem recordItem, EntityId contractEntit
}

processContractLogs(functionResult, contractResult);
processContractStateChanges(functionResult, contractResult);
}

entityListener.onContractResult(contractResult);
Expand All @@ -155,30 +153,6 @@ private void processContractLogs(ContractFunctionResult functionResult, Contract
}
}

private void processContractStateChanges(ContractFunctionResult functionResult, ContractResult contractResult) {

for (var stateChange : functionResult.getStateChangesList()) {
var contractId = lookup(contractResult.getContractId(), stateChange.getContractID());
for (var storageChange : stateChange.getStorageChangesList()) {
ContractStateChange contractStateChange = new ContractStateChange();
contractStateChange.setConsensusTimestamp(contractResult.getConsensusTimestamp());
contractStateChange.setContractId(contractId);
contractStateChange.setPayerAccountId(contractResult.getPayerAccountId());
contractStateChange.setSlot(DomainUtils.toBytes(storageChange.getSlot()));
contractStateChange.setValueRead(DomainUtils.toBytes(storageChange.getValueRead()));

// If a value of zero is written the valueWritten will be present but the inner value will be
// absent. If a value was read and not written this value will not be present.
if (storageChange.hasValueWritten()) {
contractStateChange
.setValueWritten(DomainUtils.toBytes(storageChange.getValueWritten().getValue()));
}

entityListener.onContractStateChange(contractStateChange);
}
}
}

@SuppressWarnings("deprecation")
private List<Long> getCreatedContractIds(ContractFunctionResult functionResult, RecordItem recordItem,
EntityId parentEntityContractId) {
Expand Down Expand Up @@ -218,7 +192,7 @@ private void processCreatedContractEntity(RecordItem recordItem, EntityId contra
* contract creation is externalized into its own synthesized contract create transaction and should be processed by
* ContractCreateTransactionHandler.
*
* @param contract The contract entity
* @param contract The contract entity
* @param recordItem The recordItem in which the contract is created
*/
private void updateContractEntityOnCreate(Contract contract, RecordItem recordItem) {
Expand Down
Loading

0 comments on commit 44a4bd8

Please sign in to comment.