Man0s/event pipeline 2 (mrgnlabs#137)
* feat(obs): write to BQ

* feat(obs): setup flex template

* chore(obs): structure for multiple job definitions

* chore(obs): update gitignore

* feat(obs): working batch tx process job

* feat(obs): modularize for multiple jobs

* feat(obs): create tx parsing stream job & add missing events to event generator script

* fix(obs): parse JSON-stringified data from Pub/Sub into the same format as fed from BQ

* feat(obs): containerize indexer bot

* feat(obs): avoid reading file for each item for versioned IDL

* fix(obs): allow seamless SA auth in indexer bots in GKE

* feat(obs): use latest triton geyser protos

* fix(obs): silently discard transactions from non-supported programs during parsing

* feat(obs): WIP account ETL

* feat(obs): namespace IDLs by program

* feat(obs): working account parsing

* feat(obs): ORM for accounts

* feat(obs): stream job for accounts ETL

* fix(obs): wrong field parsing

* feat(obs): setup logging for GCP

* feat(obs): downgrade log severities

* feat(obs): update BQ schema

* feat(obs): add mainnet IDL

* fix(obs): add missing pubkey in parsed accounts

* feat(obs): ETL mainnet config + snapshot bot WIP

* feat(obs): more metrics + lint

* feat(obs): working snapshot bot

* fix(obs): snapshot bugs

* chore: bring indexer into the main workspace and fix version conflicts

* feat(obs): account snapshot improvements

* feat(obs): add bank and mint to parsed liquidity change event

* fix(obs): wrong IDL structs padding for ix/event parsing

* feat: switchboard oracle support

* fix: break down account metrics upload

* fix: pin solana deps and update base image rust version for indexer image

* chore: sync cli

* chore: sync cli

* chore: sync fuzz

* chore: remove obsolete arg

* fix: breaking solana dep update + wrapper struct

* chore: format and pin dep

* chore: format

* chore: remove unused file

---------

Co-authored-by: Jakob Povšič <[email protected]>
losman0s and jkbpvsc authored Nov 23, 2023
1 parent e0414f2 commit 6309c44
Showing 76 changed files with 10,309 additions and 6,151 deletions.
1,346 changes: 1,147 additions & 199 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
@@ -1,6 +1,5 @@
 [workspace]
-members = ["programs/*", "clients/rust/*", "tools/*"]
-exclude = ["observability/indexer"]
+members = ["programs/*", "clients/rust/*", "tools/*", "observability/indexer"]
 
 [workspace.dependencies]
 solana-client = "1.14.13"
@@ -9,6 +8,9 @@ solana-logger = "1.14.13"
 solana-program = "1.14.13"
 solana-program-test = "1.14.13"
 solana-account-decoder = "1.14.13"
+solana-measure = "1.14.13"
+solana-metrics = "1.14.13"
+solana-transaction-status = "1.14.13"
 anchor-lang = "0.26.0"
 anchor-spl = "0.26.0"
 anchor-client = "0.26.0"
2 changes: 1 addition & 1 deletion clients/rust/marginfi-cli/src/processor/mod.rs
@@ -1582,7 +1582,7 @@ pub fn marginfi_account_create(profile: &Profile, config: &Config) -> Result<()>
     let tx = Transaction::new_signed_with_payer(
         &[ix],
         Some(&signer.pubkey()),
-        &[&signer, &marginfi_account_key],
+        &[signer, &marginfi_account_key],
         recent_blockhash,
     );
6 changes: 6 additions & 0 deletions observability/etl/dataflow-etls/.dockerignore
@@ -0,0 +1,6 @@
+# Ignore everything except for Python files and the requirements file.
+*
+!setup.py
+!MANIFEST.in
+!dataflow_etls/
+!jobs/
7 changes: 7 additions & 0 deletions observability/etl/dataflow-etls/.gcloudignore
@@ -0,0 +1,7 @@
+# Ignore everything except for Python files and the requirements file.
+*
+!setup.py
+!MANIFEST.in
+!Dockerfile
+!dataflow_etls/
+!jobs/
3 changes: 2 additions & 1 deletion observability/etl/dataflow-etls/.gitignore
@@ -1,6 +1,7 @@
 __pycache__
 .mypy_cache
 .venv
-poetry.lock
 local_file*
 parsed_event_*
+.idea*
+beam-temp-*
2 changes: 1 addition & 1 deletion observability/etl/dataflow-etls/.mypy.ini
@@ -3,7 +3,7 @@ pretty = False
 show_absolute_path = True
 show_column_numbers = True
 show_error_codes = True
-files = src/apologies, tests
+files = .
 exclude = scripts/playground.py
 
 # This is mostly equivalent to strict=true as of v0.770
27 changes: 12 additions & 15 deletions observability/etl/dataflow-etls/Dockerfile
@@ -1,27 +1,24 @@
-FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
+FROM gcr.io/dataflow-templates-base/python39-template-launcher-base
 
+ARG JOB_DIRECTORY
 ARG WORKDIR=/dataflow/template
 RUN mkdir -p ${WORKDIR}
 WORKDIR ${WORKDIR}
 
-COPY requirements.txt .
-COPY job.py .
-COPY idls .
-COPY dataflow_etls .
+COPY setup.py .
+COPY MANIFEST.in .
 
-ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/${WORKDIR}/requirements.txt"
 ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/${WORKDIR}/job.py"
+ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="/${WORKDIR}/setup.py"
 
-# We could get rid of installing libffi-dev and git, or we could leave them.
-RUN apt-get update \
-    && apt-get install -y libffi-dev git \
-    && rm -rf /var/lib/apt/lists/* \
-    # Upgrade pip and install the requirements.
-    && pip install --no-cache-dir --upgrade pip \
-    && pip install -U apache-beam==2.44.0 \
-    && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
+# Upgrade pip and install the requirements.
+RUN pip install --no-cache-dir --upgrade pip \
+    && pip install --no-cache-dir . \
     # Download the requirements to speed up launching the Dataflow job.
-    && pip download --no-cache-dir --dest /tmp/dataflow-etls-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
+    && pip download --no-cache-dir --dest /tmp/dataflow-etls-requirements-cache .
+
+COPY dataflow_etls/ dataflow_etls/
+COPY ${JOB_DIRECTORY}/job.py .
 
 # Since we already downloaded all the dependencies, there's no need to rebuild everything.
 ENV PIP_NO_DEPS=True
1 change: 1 addition & 0 deletions observability/etl/dataflow-etls/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include dataflow_etls/idls/**/*.json
9 changes: 3 additions & 6 deletions observability/etl/dataflow-etls/README.md
@@ -15,15 +15,12 @@ python job.py \
 - Build image and upload to Artifact Registry:
 
 ```
-gcloud builds submit --tag us-east1-docker.pkg.dev/marginfi-dev/main/dataflow/event-parsing-batch:latest .
+./scripts/build_job_template <PATH TO JOB DIR> <VERSION>
+./scripts/upload_job_template <PATH TO JOB DIR> <VERSION>
 ```
 
 - Create/Update template and associate metadata file:
 
 ```
-gcloud dataflow flex-template build \
-  gs://dataflow_jobs_marginfi_v2/templates/event-parsing-batch.json \
-  --image "us-east1-docker.pkg.dev/marginfi-dev/main/dataflow/event-parsing-batch:latest" \
-  --sdk-language "PYTHON" \
-  --metadata-file "metadata.json"
+./scripts/sync_job_template <PATH TO JOB DIR> <VERSION>
 ```
76 changes: 76 additions & 0 deletions observability/etl/dataflow-etls/dataflow_etls/account_parsing.py
@@ -0,0 +1,76 @@
+import base64
+from dataclasses import asdict
+from datetime import datetime
+from typing import List, TypedDict, Dict, Any, Tuple, Generator
+from decimal import Decimal
+
+from anchorpy_core.idl import Idl
+from based58 import based58  # type: ignore
+from solders.pubkey import Pubkey
+import apache_beam as beam  # type: ignore
+from anchorpy.program.common import NamedInstruction as NamedAccountData
+
+from dataflow_etls.idl_versions import VersionedProgram, IdlPool, Cluster
+from dataflow_etls.orm.accounts import ACCOUNT_UPDATE_TO_RECORD_TYPE, AccountUpdateRecord
+
+AccountUpdateRaw = TypedDict('AccountUpdateRaw', {
+    'id': str,
+    'created_at': datetime,
+    'timestamp': datetime,
+    'owner': str,
+    'slot': Decimal,
+    'pubkey': str,
+    'txn_signature': str,
+    'lamports': Decimal,
+    'executable': bool,
+    'rent_epoch': Decimal,
+    'data': str,
+})
+
+
+class OwnerProgramNotSupported(Exception):
+    pass
+
+
+def parse_account(account_update: AccountUpdateRaw, min_idl_version: int, cluster: Cluster,
+                  idl_pool: IdlPool) -> List[AccountUpdateRecord]:
+    owner_program_id_str = account_update["owner"]
+    owner_program_id = Pubkey.from_string(owner_program_id_str)
+    account_update_slot = int(account_update["slot"])
+
+    try:
+        idl_raw, idl_version = idl_pool.get_idl_for_slot(owner_program_id_str, account_update_slot)
+    except KeyError:
+        raise OwnerProgramNotSupported(f"Unsupported program {owner_program_id_str}")
+
+    idl = Idl.from_json(idl_raw)
+    program = VersionedProgram(cluster, idl_version, idl, owner_program_id)
+
+    if idl_version < min_idl_version:
+        return []
+
+    account_data_bytes = base64.b64decode(account_update["data"])
+
+    try:
+        parsed_account_data: NamedAccountData = program.coder.accounts.parse(account_data_bytes)
+    except Exception as e:
+        print(f"failed to parse account data in update {account_update['id']}", e)
+        return []
+
+    if parsed_account_data.name not in ACCOUNT_UPDATE_TO_RECORD_TYPE:
+        print(f"discarding unsupported account type {parsed_account_data.name} in update {account_update['id']}")
+        return []
+    else:
+        # noinspection PyPep8Naming
+        AccountUpdateRecordType = ACCOUNT_UPDATE_TO_RECORD_TYPE[parsed_account_data.name]
+        return [AccountUpdateRecordType(parsed_account_data, account_update, idl_version)]
+
+
+class DispatchEventsDoFn(beam.DoFn):  # type: ignore
+    def process(self, record: AccountUpdateRecord, *args: Tuple[Any], **kwargs: Dict[str, Tuple[Any]]) -> Generator[
+            str, None, None]:
+        yield beam.pvalue.TaggedOutput(record.get_tag(), record)
+
+
+def dictionify_record(record: AccountUpdateRecord) -> Dict[str, Any]:
+    return asdict(record)
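For context on how these pieces fit together: `parse_account` turns a raw account-update row into typed ORM records, and `DispatchEventsDoFn` fans each record out to a per-type tagged output. The following is a minimal, hypothetical wiring sketch, not part of this commit; the `run` helper, the `input_table` parameter, and the `"mainnet"` cluster literal are illustrative assumptions.

```python
# Hypothetical wiring sketch (not from this commit): composing parse_account,
# DispatchEventsDoFn, and dictionify_record into a Beam job.
import apache_beam as beam  # type: ignore

from dataflow_etls.account_parsing import (
    DispatchEventsDoFn,
    OwnerProgramNotSupported,
    dictionify_record,
    parse_account,
)
from dataflow_etls.idl_versions import IdlPool


def run(input_table: str, min_idl_version: int = 0) -> None:
    idl_pool = IdlPool("mainnet")  # assumed cluster; built once and reused per element

    def parse_or_skip(update):  # type: ignore
        try:
            # parse_account returns [] for too-old IDL versions or unparseable data.
            return parse_account(update, min_idl_version, "mainnet", idl_pool)
        except OwnerProgramNotSupported:
            return []  # silently discard accounts owned by unsupported programs

    with beam.Pipeline() as pipeline:
        tagged = (
            pipeline
            | "ReadUpdates" >> beam.io.ReadFromBigQuery(table=input_table)
            | "ParseAccounts" >> beam.FlatMap(parse_or_skip)
            | "DispatchByType" >> beam.ParDo(DispatchEventsDoFn()).with_outputs()
        )
        # Each tag (record.get_tag()) would then map to its own sink, e.g.:
        # tagged[some_tag] | beam.Map(dictionify_record) | beam.io.WriteToBigQuery(...)
```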
86 changes: 64 additions & 22 deletions observability/etl/dataflow-etls/dataflow_etls/idl_versions.py
@@ -1,8 +1,9 @@
 import glob
 import os
 from pathlib import Path
-from typing import List, Literal, Tuple, Optional
-from anchorpy import Program, Provider
+from typing import List, Literal, Tuple, Optional, Dict
+from anchorpy import Program, Provider, Wallet
+from anchorpy.utils.rpc import AsyncClient
 from anchorpy_core.idl import Idl
 from solders.pubkey import Pubkey
 
@@ -20,33 +21,74 @@ def __init__(self, cluster: Cluster, version: int, idl: Idl, program_id: Pubkey,
                  provider: Optional[Provider] = None):
         self.version = version
         self.cluster = cluster
-        super(VersionedProgram, self).__init__(idl, program_id, provider)
+        super(VersionedProgram, self).__init__(idl, program_id,
+                                               provider or Provider(AsyncClient("http://localhost:8899"),
+                                                                    Wallet.dummy()))
 
 
-class VersionedIdl:
-    VERSIONS: ClusterIdlBoundaries = {"devnet": {
-        "A7vUDErNPCTt9qrB6SSM4F6GkxzUe9d8P3cXSmRg4eY4": [(196494976, 0), (196520454, 1), (197246719, 2), (197494521, 3)]
-    }}
+# /!\ Boundaries need to be ordered /!\
+IDL_VERSIONS: ClusterIdlBoundaries = {
+    "devnet": {
+        # "A7vUDErNPCTt9qrB6SSM4F6GkxzUe9d8P3cXSmRg4eY4": [(196494976, 0), (196520454, 1), (197246719, 2), (197494521, 3)],
+        "5Lt5xXZG7bteZferQk9bsiiAS75JqGVPYcTbB8J6vvJK": [],
+    },
+    "mainnet": {
+        "MFv2hWf31Z9kbCa1snEPYctwafyhdvnV7FZnsebVacA": [],
+    }
+}
 
-    @staticmethod
-    def get_idl_for_slot(cluster: Cluster, program_id: str, slot: int) -> Tuple[Idl, int]:
-        idl_boundaries = VersionedIdl.VERSIONS[cluster][program_id]
 
-        idl_version = None
-        for boundary_slot, version in idl_boundaries:
-            # todo: returns latest for upgrade slot, can throw if tx executed in same slot, before upgrade
-            if boundary_slot > slot:
-                idl_version = version
-                break
+class ClusterNotSupported(Exception):
+    pass
 
-        if idl_version is None:
+
+class IdlPool:
+    idls_per_program: Dict[str, Tuple[List[Tuple[int, Tuple[int, str]]], str, int]]
+
+    def __init__(self, cluster):
+        idl_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), f"idls/{cluster}")
+        try:
+            boundaries_per_program = IDL_VERSIONS[cluster]
+        except KeyError:
+            raise ClusterNotSupported(f"Cluster {cluster} is not supported")
+
+        self.idls_per_program = {}
+
+        for program_id in boundaries_per_program:
+            # Find latest IDL
             sorted_idls = [int(os.path.basename(path).removesuffix(".json").removeprefix("marginfi-v")) for path in
-                           glob.glob(f"idls/{cluster}/marginfi-v*.json")]
+                           glob.glob(f"{idl_dir}/{program_id}/marginfi-v*.json")]
             sorted_idls.sort()
-            idl_version = sorted_idls[-1]
+            latest_idl_version = sorted_idls[-1]
+
+            path = Path(f"{idl_dir}/{program_id}/marginfi-v{latest_idl_version}.json")
+            latest_idl_raw = path.read_text()
+
+            self.idls_per_program[program_id] = ([], latest_idl_raw, latest_idl_version)
+
+            # Load all IDLs
+            boundaries = boundaries_per_program[program_id]
+            for boundary in boundaries:
+                version_end_slot = boundary[0]
+                idl_version = boundary[1]
+                path = Path(f"{idl_dir}/{program_id}/marginfi-v{idl_version}.json")
+                idl_raw = path.read_text()
+                self.idls_per_program[program_id][0].append((version_end_slot, (idl_version, idl_raw)))
+
+    def get_idl_for_slot(self, program_id: str, slot: int) -> Tuple[str, int]:
+        idl_boundaries, latest_idl, latest_idl_version = self.idls_per_program[program_id]
+
+        idl = None
+        idl_version = None
+        for version_end_slot, (current_idl_version, current_idl) in idl_boundaries:
+            # todo: returns latest for upgrade slot, can throw if tx executed in same slot, before upgrade
+            if version_end_slot > slot:
+                idl = current_idl
+                idl_version = current_idl_version
+                break
 
-        path = Path(f"idls/{cluster}/marginfi-v{idl_version}.json")
-        raw = path.read_text()
-        idl = Idl.from_json(raw)
+        if idl is None:
+            idl = latest_idl
+            idl_version = latest_idl_version
 
         return idl, idl_version
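`get_idl_for_slot` scans the ordered boundaries and returns the first IDL whose `version_end_slot` is greater than the requested slot, falling back to the latest bundled IDL once the slot is past every boundary (or when a program has no boundaries at all). A minimal, hypothetical usage sketch follows, not part of this commit; the slot value and the offline-decoding framing are illustrative assumptions.

```python
# Hypothetical usage sketch (not from this commit): resolving and applying
# the slot-appropriate IDL for the mainnet program registered above.
from anchorpy_core.idl import Idl
from solders.pubkey import Pubkey

from dataflow_etls.idl_versions import IdlPool, VersionedProgram

PROGRAM_ID = "MFv2hWf31Z9kbCa1snEPYctwafyhdvnV7FZnsebVacA"

pool = IdlPool("mainnet")

# This program has no version boundaries in IDL_VERSIONS, so any slot
# resolves to the latest IDL bundled under idls/mainnet/<program id>/.
idl_raw, idl_version = pool.get_idl_for_slot(PROGRAM_ID, slot=217_000_000)

# VersionedProgram defaults to a localhost provider with a dummy wallet,
# which is enough for decoding accounts and instructions via its coder.
program = VersionedProgram(
    "mainnet", idl_version, Idl.from_json(idl_raw), Pubkey.from_string(PROGRAM_ID)
)
```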