我一直在尝试用 python 创建一个程序来分析我在 Raydium 上复制交易的按钱包细分的交易盈亏。到目前为止,我已经尝试了两个代码库,第一个返回 0 结果(我知道我已经复制交易了钱包),并且我通过 Solscan API 设置了这个。另一个我一直在尝试直接在 solana 网络上使用(还没有完全设置它,我只是试图让它为我拉取交易),并且无论通过终端安装 solana 都给我返回此错误: 从 solana.publickey 导入 PublicKey ModuleNotFoundError:没有名为“solana.publickey”的模块
第一个代码库:
`import requests
from datetime import datetime
# Raydium AMM Program IDs
RAYDIUM_AMM_PROGRAM_ID = [
"CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C",
"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8",
"5quBtoiQqxF9Jv6KYKctB59NT3gtJD2Y65kdnB1Uev3h",
"CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK",
"routeUGWgWzqBWFcrCfv8tritsqukccJPu3q5GPP3xS",
]
# Function to fetch transaction history for an account
def fetch_transaction_history(account, api_key, limit=100, before=None):
url = f"https://pro-api.solscan.io/v1.0/account/transactions?account={account}&limit={limit}"
if before:
url += f"&before={before}"
headers = {"accept": "application/json", "token": api_key}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to fetch transaction history for account {account}: {response.status_code}")
return []
# Function to extract token and amount information from instructions
def extract_token_info(instructions):
for instruction in instructions:
if instruction['programId'] in RAYDIUM_AMM_PROGRAM_ID and instruction['type'] == 'swap':
return instruction['data']
return None
# Function to filter and normalize transactions
def filter_and_normalize_transactions(transactions):
valid_transactions = []
for tx in transactions:
if tx['status'] == 'Success':
token_info = extract_token_info(tx['parsedInstruction'])
if token_info:
valid_transactions.append({
'blockTime': tx['blockTime'],
'amountIn': token_info.get('amountIn', 0),
'amountOut': token_info.get('amountOut', 0),
'tokenAccount': token_info.get('tokenAccount', 'Unknown')
})
return valid_transactions
# Function to match buy transactions between your wallet and a copied wallet
def match_buy_transactions(your_trades, copied_trades):
matched_trades = []
for copied_trade in copied_trades:
copied_instruction_data = extract_token_info(copied_trade['parsedInstruction'])
if copied_instruction_data:
copied_token_account = copied_instruction_data.get('tokenAccount', 'Unknown')
copied_amount_in = copied_instruction_data.get('amountIn', 0)
copied_amount_out = copied_instruction_data.get('amountOut', 0)
if copied_amount_in > copied_amount_out:
for your_trade in your_trades:
your_instruction_data = extract_token_info(your_trade['parsedInstruction'])
if your_instruction_data:
your_token_account = your_instruction_data.get('tokenAccount', 'Unknown')
your_amount_in = your_instruction_data.get('amountIn', 0)
your_amount_out = your_instruction_data.get('amountOut', 0)
if your_amount_in > your_amount_out and copied_token_account == your_token_account:
matched_trades.append(your_trade)
break
return matched_trades
# Function to calculate PnL from matched buys and your sells
def calculate_pnl(matched_buys, sells):
pnl = 0
matched_buys_by_token = {}
for buy in matched_buys:
token = buy['tokenAccount']
if token not in matched_buys_by_token:
matched_buys_by_token[token] = []
matched_buys_by_token[token].append(buy)
for sell in sells:
sell_time = datetime.fromtimestamp(sell['blockTime'])
token = sell['tokenAccount']
if token in matched_buys_by_token:
for buy in matched_buys_by_token[token]:
buy_time = datetime.fromtimestamp(buy['blockTime'])
if buy_time < sell_time:
pnl += sell['amountOut'] - buy['amountIn']
matched_buys_by_token[token].remove(buy)
break
return pnl
# Main script
api_key = "My API key here"
your_wallet = "my wallet"
copied_wallets = ["copy traded wallet"]
# Fetch and normalize your trades
your_trades_raw = fetch_transaction_history(your_wallet, api_key)
your_trades = filter_and_normalize_transactions(your_trades_raw)
your_buys = [trade for trade in your_trades if trade['amountIn'] > trade['amountOut']]
your_sells = [trade for trade in your_trades if trade['amountOut'] > trade['amountIn']]
print(f"Your Buys: {your_buys}")
print(f"Your Sells: {your_sells}")
# Dictionary to store PnL categorized by copied wallets
pnl_breakdown = {}
# Fetch trades for copied wallets and calculate PnL categorized by them
for wallet in copied_wallets:
copied_trades_raw = fetch_transaction_history(wallet, api_key)
copied_trades = filter_and_normalize_transactions(copied_trades_raw)
matched_trades = match_buy_transactions(your_buys, copied_trades)
pnl = calculate_pnl(matched_trades, your_sells)
pnl_breakdown[wallet] = pnl
# Output the detailed PnL breakdown
for wallet, pnl in pnl_breakdown.items():
print(f"PnL for trades triggered by copied wallet {wallet}: {pnl}")
# Overall PnL from all categorized trades
overall_pnl = sum(pnl_breakdown.values())
print(f"Overall PnL from copy traded wallets: {overall_pnl}")
Second code base:
import json
import time
import threading
import numpy as np
from urllib.request import Request, urlopen
from solana.rpc.api import Client
from solana.publickey import PublicKey
# Constants
EXPLORER_URL_TX = "https://explorer.solana.com/tx/"
ADDRESS_RAYDIUM_AMM = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"
ADDRESS_SOLSCAN = "https://api.solscan.io/account?address="
# Solana Client
SOLANA_MAINNET = "https://api.mainnet-beta.solana.com"
client = Client(SOLANA_MAINNET)
# Initialize variables
resultArr = []
maxTxCount = 10000
txCount = 0
lastSignature = None
rounds = 0
def get_tx_detail(tx_signature):
tx = client.get_confirmed_transaction(tx_signature)
if tx['result'] is not None:
post_token_balances = tx["result"]["meta"]["postTokenBalances"]
pre_token_balances = tx["result"]["meta"]["preTokenBalances"]
if post_token_balances != pre_token_balances:
print(EXPLORER_URL_TX + tx_signature)
resultArr.append(tx)
# Fetch Raydium transactions
if __name__ == "__main__":
RaydiumPubKey = PublicKey(ADDRESS_RAYDIUM_AMM)
while True:
print("Round-", rounds + 1)
print("Getting transactions")
txs = client.get_signatures_for_address(RaydiumPubKey, limit=200, before=lastSignature)['result']
print("Processing signatures")
signatures = np.array([tx["signature"] for tx in txs])
threads = []
for signature in signatures:
txCount += 1
thread = threading.Thread(target=get_tx_detail, args=(signature,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if txCount >= maxTxCount:
break
else:
rounds += 1
lastSignature = txs[-1]["signature"]
time.sleep(3)
# Process transaction details
def process_tx(tx):
tx_signature = tx["result"]["transaction"]["signatures"][0]
tx_sender = tx["result"]["transaction"]["message"]["accountKeys"][0]
block_time = tx["result"]["blockTime"]
slot = tx["result"]["slot"]
post_token_balances = tx["result"]["meta"]["postTokenBalances"]
pre_token_balances = tx["result"]["meta"]["preTokenBalances"]
token_balances = []
for pre, post in zip(pre_token_balances, post_token_balances):
change = 0
if pre["uiTokenAmount"]["uiAmount"] is not None and post["uiTokenAmount"]["uiAmount"] is not None:
change = post["uiTokenAmount"]["uiAmount"] - pre["uiTokenAmount"]["uiAmount"]
owner = pre["owner"]
token = pre["mint"]
if change != 0:
token_balances.append({"owner": owner, "token": token, "change": change})
return {
"txSignature": tx_signature,
"sender": tx_sender,
"blockTime": block_time,
"slot": slot,
"tokenBalances": token_balances
}
processed_tx_arr = [process_tx(tx) for tx in resultArr]
# Map token mint address to readable token names
mint_dic = {}
for tx in processed_tx_arr:
for token_balance in tx["tokenBalances"]:
token = token_balance["token"]
if token not in mint_dic:
req = Request(ADDRESS_SOLSCAN + token, headers={'User-Agent': 'Mozilla/5.0'})
data = json.loads(urlopen(req).read())
mint_dic[token] = data["data"]["tokenInfo"]["name"]
time.sleep(1)
for tx in processed_tx_arr:
for token_balance in tx["tokenBalances"]:
token_balance["token"] = mint_dic.get(token_balance["token"], token_balance["token"])
# Normalize token transactions
normalized_tx_arr = []
for tx in processed_tx_arr:
for token_change in tx["tokenBalances"]:
normalized_tx_arr.append({
'txSignature': tx["txSignature"],
'sender': tx["sender"],
'blockTime': tx["blockTime"],
'slot': tx["slot"],
'owner': token_change["owner"],
'token': token_change["token"],
'change': token_change["change"]
})
# Print normalized transactions
for tx in normalized_tx_arr:
print(tx)
`your text`
# Now, you can implement PnL calculations and subcategorization based on your specific needs.`
您使用的代码似乎适用于旧版本的 solana。从他们的最新代码来看,他们正在使用库
solders中的
solders.pubkey.Pubkey
来代替。解决此问题的另一种方法是使用命令将 solana 版本降级到支持
solana.publickey.PublicKey
的版本
pip install solana=="<older version number here>"