Skip to content

Commit

Permalink
Merge pull request autonomys#58 from subspace/relayer-check-existing-…
Browse files Browse the repository at this point in the history
…feeds-on-start

relayer: implemented checking existing feeds before creating new ones
  • Loading branch information
isSerge authored Oct 7, 2021
2 parents b8912f8 + df45ac7 commit abdd5df
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/.idea
/target
relayer/feeds.json
2 changes: 1 addition & 1 deletion relayer/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
node_modules
dist
coverage
coverage
1 change: 1 addition & 0 deletions relayer/feeds.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
5 changes: 5 additions & 0 deletions relayer/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ export const sourceChains = [
paraId: 2088,
chain: "Altair" as ChainName,
},
{
url: "https://parallel-heiko.api.onfinality.io/public",
paraId: 2085,
chain: "Parallel Heiko" as ChainName,
},
]
},
];
Expand Down
66 changes: 35 additions & 31 deletions relayer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ApiPromise, WsProvider } from "@polkadot/api";
import { merge } from "rxjs";

import { getAccount } from "./account";
import Config, { sourceChains } from "./config";
import Source from "./source";
Expand All @@ -25,42 +26,45 @@ const createApi = async (url: string) => {

// TODO: remove IIFE when Eslint is updated to v8.0.0 (will support top-level await)
(async () => {
const targetApi = await createApi(config.targetChainUrl);
// use getAccount func because we cannot create keyring instance before API is instanciated
const signer = getAccount(config.accountSeed);
try {
const targetApi = await createApi(config.targetChainUrl);

const target = new Target({ api: targetApi, signer, logger });
const target = new Target({ api: targetApi, logger });

const sources = await Promise.all(
config.sourceChains.map(async ({ url, parachains }) => {
const api = await createApi(url);
const chain = await api.rpc.system.chain();
const master = getAccount(config.accountSeed);
const sourceSigner = getAccount(`${config.accountSeed}/${chain}`);
const paraSigners = parachains.map(({ paraId }) => getAccount(`${config.accountSeed}/${paraId}`));
const sources = await Promise.all(
config.sourceChains.map(async ({ url, parachains }) => {
const api = await createApi(url);
const chain = await api.rpc.system.chain();
const master = getAccount(config.accountSeed);
const sourceSigner = getAccount(`${config.accountSeed}/${chain}`);
const paraSigners = parachains.map(({ paraId }) => getAccount(`${config.accountSeed}/${paraId}`));

// TODO: can be optimized by sending batch of txs
// TODO: master has to delegate spending to sourceSigner and paraSigners
for (const delegate of [sourceSigner, ...paraSigners]) {
// send 1.5 units
await target.sendBalanceTx(master, delegate, 1.5);
}
// TODO: can be optimized by sending batch of txs
// TODO: master has to delegate spending to sourceSigner and paraSigners
for (const delegate of [sourceSigner, ...paraSigners]) {
// send 1.5 units
await target.sendBalanceTx(master, delegate, 1.5);
}

const feedId = await target.sendCreateFeedTx(sourceSigner);
const parachainsMap = await createParachainsMap(target, parachains, paraSigners);
// check if feed already exists
const feedId = await target.getFeedId(sourceSigner);
const parachainsMap = await createParachainsMap(target, parachains, paraSigners);

return new Source({
api,
chain: chain.toString() as ChainName,
parachainsMap,
logger,
feedId,
signer: sourceSigner,
});
})
);
return new Source({
api,
chain: chain.toString() as ChainName,
parachainsMap,
logger,
feedId,
signer: sourceSigner,
});
})
);

const blockSubscriptions = merge(...sources.map((source) => source.subscribeBlocks()));
const blockSubscriptions = merge(...sources.map((source) => source.subscribeBlocks()));

target.processSubscriptions(blockSubscriptions).subscribe();
target.processSubscriptions(blockSubscriptions).subscribe();
} catch (error) {
logger.error((error as Error).message);
}
})();
39 changes: 29 additions & 10 deletions relayer/src/target.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import * as fs from 'fs';
import { ApiPromise } from "@polkadot/api";
import { Subscription, EMPTY, catchError } from "rxjs";
import { concatMap, takeWhile } from "rxjs/operators";
import { Logger } from "pino";
import { AddressOrPair } from "@polkadot/api/submittable/types";
import { KeyringPair } from "@polkadot/keyring/types";
import { ISubmittableResult, Observable } from "@polkadot/types/types";
import { EventRecord } from "@polkadot/types/interfaces";
import { U64 } from "@polkadot/types/primitive";
import { Subscription, EMPTY, catchError } from "rxjs";
import { concatMap, takeWhile } from "rxjs/operators";

import { TxData } from "./types";

Expand All @@ -16,18 +17,15 @@ const polkadotAppsUrl =

interface TargetConstructorParams {
api: ApiPromise;
signer: AddressOrPair;
logger: Logger;
}

class Target {
private readonly api: ApiPromise;
private readonly signer: AddressOrPair;
private readonly logger: Logger;

constructor({ api, signer, logger }: TargetConstructorParams) {
constructor({ api, logger }: TargetConstructorParams) {
this.api = api;
this.signer = signer;
this.logger = logger;
this.sendBlockTx = this.sendBlockTx.bind(this);
this.logTxResult = this.logTxResult.bind(this);
Expand Down Expand Up @@ -63,7 +61,7 @@ class Target {
// it is required to specify nonce, otherwise transaction within same block will be rejected
// if nonce is -1 API will do the lookup for the right value
// https://polkadot.js.org/docs/api/cookbook/tx/#how-do-i-take-the-pending-tx-pool-into-account-in-my-nonce
.signAndSend(this.signer, { nonce: -1 }, Promise.resolve)
.signAndSend(signer, { nonce: -1 }, Promise.resolve)
.pipe(
takeWhile(({ status }) => !status.isInBlock, true),
catchError((error) => {
Expand All @@ -74,14 +72,13 @@ class Target {
);
}

// TODO: think about re-using existing feedIds instead of creating
async sendCreateFeedTx(signer: AddressOrPair): Promise<U64> {
private async sendCreateFeedTx(signer: AddressOrPair): Promise<U64> {
this.logger.info(`Creating feed for signer ${(signer as KeyringPair).address}`);
this.logger.debug(`Signer: ${(signer as KeyringPair).address}`);
return new Promise((resolve) => {
this.api.rx.tx.feeds
.create()
.signAndSend(this.signer, { nonce: -1 }, Promise.resolve)
.signAndSend(signer, { nonce: -1 }, Promise.resolve)
.pipe(
takeWhile(({ status }) => !status.isInBlock, true),
catchError((error) => {
Expand Down Expand Up @@ -132,6 +129,28 @@ class Target {
});
}

async getFeedId(signer: AddressOrPair): Promise<U64> {
const { address } = (signer as KeyringPair);
this.logger.info(`Checking feed for ${address}`);

const file = await fs.promises.readFile('./feeds.json', 'utf8');
const feeds = JSON.parse(file);

if (feeds[address]) {
const feedId = this.api.createType("U64", feeds[address]);
this.logger.info(`Feed already exists: ${feedId}`);
return feedId;
}

const feedId = await this.sendCreateFeedTx(signer);

feeds[address] = feedId.toBn();

await fs.promises.writeFile('./feeds.json', JSON.stringify(feeds, null, 4));

return feedId;
}

processSubscriptions(subscription: Observable<TxData>): Observable<Subscription> {
return subscription.pipe(concatMap(this.sendBlockTx));
}
Expand Down
3 changes: 2 additions & 1 deletion relayer/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventRecord, Event } from "@polkadot/types/interfaces/system";
import { AddressOrPair } from "@polkadot/api/submittable/types";
import { SignedBlock } from "@polkadot/types/interfaces";

import { ParaHeadAndId, ParachainConfigType, ChainName } from "./types";
import Parachain from "./parachain";
import Target from "./target";
Expand Down Expand Up @@ -43,7 +44,7 @@ export const createParachainsMap = async (

for (const [index, { url, chain, paraId }] of configParachains.entries()) {
const signer = signers[index];
const feedId = await target.sendCreateFeedTx(signer);
const feedId = await target.getFeedId(signer);
const parachain = new Parachain({ feedId, url, chain: chain as ChainName, logger, signer });
map.set(paraId, parachain);
}
Expand Down

0 comments on commit abdd5df

Please sign in to comment.