-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction_explorer.py
105 lines (86 loc) · 3.95 KB
/
transaction_explorer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import requests
import time
from solathon import PublicKey
# URL для подключения к Solana RPC
SOLANA_RPC_URL = 'https://api.mainnet-beta.solana.com'
# Публичный ключ кошелька, который вы хотите отслеживать
PUBLIC_KEY = '46F4WsqP2rwNqFYJRk6ef1hXiw5JpRW8TtKzsHEPpK6r'
pubkey = PublicKey(PUBLIC_KEY)
# Хранилище для уже просмотренных подписей транзакций
seen_signatures = set()
def get_signatures_for_address(pubkey, before=None, limit=1):
headers = {'Content-Type': 'application/json'}
params = {
"jsonrpc": "2.0",
"id": 1,
"method": "getSignaturesForAddress",
"params": [str(pubkey), {"limit": limit, "before": before}]
}
try:
response = requests.post(SOLANA_RPC_URL, headers=headers, json=params)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as http_err:
print(f"HTTP error occurred: {http_err}")
return {}
except Exception as err:
print(f"Other error occurred: {err}")
return {}
def get_transaction_info(tx_signature):
headers = {'Content-Type': 'application/json'}
params = {
"jsonrpc": "2.0",
"id": 1,
"method": "getTransaction",
"params": [tx_signature, {"encoding": "jsonParsed"}]
}
try:
response = requests.post(SOLANA_RPC_URL, headers=headers, json=params)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as http_err:
print(f"HTTP error occurred: {http_err}")
return {}
except Exception as err:
print(f"Other error occurred: {err}")
return {}
def fetch_new_transaction(pubkey):
while True:
try:
# Запрос на получение одной новой транзакции
response = get_signatures_for_address(pubkey)
result = response.get('result', [])
if result:
signature_info = result[0]
signature = signature_info['signature']
if signature not in seen_signatures:
seen_signatures.add(signature)
tx_info = get_transaction_info(signature)
if 'error' in tx_info:
print(f"Error fetching transaction info: {tx_info['error']}")
else:
try:
# Получение данных о транзакции
transaction = tx_info['result']['transaction']
for instruction in transaction['message']['instructions']:
# Проверяем, есть ли поле 'parsed' и является ли тип 'transfer'
if 'parsed' in instruction and instruction['parsed']['type'] == 'transfer':
transfer_info = instruction['parsed']['info']
destination = transfer_info['destination']
lamports = transfer_info['lamports']
source = transfer_info['source']
print(f"Destination: {destination}")
print(f"Lamports: {lamports}")
print(f"Source: {source}")
break
except KeyError as e:
print(f"Key error while parsing transaction info: {e}")
else:
print("No new signatures. Waiting before retrying...")
# Ожидание перед следующим запросом
time.sleep(5)
except Exception as e:
print(f"Error fetching transactions: {e}")
time.sleep(5)
if __name__ == "__main__":
fetch_new_transaction(pubkey)