我正在 PythonAnywhere 上运行一个 Python 脚本,该脚本使用 Paramiko 通过 SSH 连接到我的 iPhone 并执行一些命令。此外,我使用 subprocess.Popen 运行另一个脚本 proxy.py,该脚本使用 mitmproxy 来拦截和解密一些请求。这些脚本在我的本地 Windows 计算机上完美运行,但是当我在 PythonAnywhere 上运行它们时,我遇到以下错误:
Starting the process...
Traceback (most recent call last):
File "/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/proxy/server.py", line 46, in __init__
super().__init__(
File "/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/net/tcp.py", line 593, in __init__
self.socket.bind(self.address)
OSError: \[Errno 98\] Address already in use
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/gaetanoeu/tokengen/proxy.py", line 46, in \<module\>
m.server = ProxyServer(config)
File "/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/proxy/server.py", line 54, in __init__
raise exceptions.ServerException(
mitmproxy.exceptions.ServerException: Error starting proxy server: OSError(98, 'Address already in use')
代理.py
from mitmproxy import http
from mitmproxy.options import Options
from mitmproxy.proxy import ProxyConfig, ProxyServer
from mitmproxy.tools.dump import DumpMaster
import json
import re
class Addon:
def request(self, flow: http.HTTPFlow) -> None:
if re.search(r'.*\/auth\/sign_in', flow.request.pretty_url):
self.process_packet(flow, 'Request', flow.request.text, flow.request.headers.get("content-type"))
flow.kill() # Kill the request to prevent it from being sent
def response(self, flow: http.HTTPFlow) -> None:
pass # No need to process responses
def process_packet(self, flow: http.HTTPFlow, packet_type: str, data: str, content_type: str):
if content_type == "application/json":
data = json.loads(data)
if 'device_token' in data:
device_token = data['device_token']
#print(f'Device Token: {device_token}')
# Salvare il token in un file JSON
tokens_file = 'tokens.json'
# Controlla se il file esiste e carica i dati esistenti
try:
with open(tokens_file, 'r') as file:
tokens = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
tokens = []
# Aggiungi il nuovo token
tokens.append(device_token)
# Scrivi di nuovo il file JSON
with open(tokens_file, 'w') as file:
json.dump(tokens, file, indent=4)
if __name__ == "__main__":
options = Options(listen_port=8888, http2=True)
m = DumpMaster(options, with_termlog=False, with_dumper=False)
config = ProxyConfig(options)
m.server = ProxyServer(config)
m.addons.add(Addon())
try:
print('Starting Dokkan mitmproxy')
m.run()
except KeyboardInterrupt:
m.shutdown()
bot.py
import time
import paramiko
import requests
import json
import subprocess
# Configurazione della connessione
SSH_IP = '192.168.1.89'
SSH_PORT = 22
SSH_USER = 'root'
SSH_PSW = 'alpine'
def send_tokens(file_path, url):
"""
Legge i token da un file JSON e li invia tramite una richiesta POST all'URL specificato.
:param file_path: Il percorso del file JSON contenente i token.
:param url: L'URL a cui inviare i token.
:return: La risposta del server alla richiesta POST.
"""
try:
# Apri e leggi il file JSON
with open(file_path, 'r') as file:
tokens = json.load(file) # Carica direttamente la lista di token
if not isinstance(tokens, list):
print("Il file JSON non contiene una lista di token.")
return
if not tokens:
print("Il file non contiene token.")
return
# Invia i token tramite una richiesta POST
response = requests.post(url, json={"tokens": tokens})
# Controlla lo stato della richiesta
if response.status_code == 201:
print("Token inviati con successo!")
else:
print(f"Errore nell'invio dei token: {response.status_code} - {response.text}")
# Svuota il contenuto del file JSON
open(file_path, 'w').close()
return response
except FileNotFoundError:
print(f"Il file {file_path} non è stato trovato.")
except json.JSONDecodeError:
print("Errore nella decodifica del file JSON.")
except Exception as e:
print(f"Si è verificato un errore: {e}")
def get_device_token():
try:
# Leggi i dati dal file JSON
with open('tokens.json', 'r') as f:
tokens = json.load(f)
# Verifica se la lista dei token non è vuota e restituisce il primo token
if tokens:
return tokens[0]
else:
print("No tokens found in the file.")
return None
except FileNotFoundError:
print("Device token file not found.")
return None
except json.JSONDecodeError:
print("Error decoding JSON from the device token file.")
return None
def execute_ssh_command(command):
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(SSH_IP, SSH_PORT, SSH_USER, SSH_PSW)
stdin, stdout, stderr = client.exec_command(command)
output = stdout.read().decode()
error = stderr.read().decode()
client.close()
return output, error
except paramiko.AuthenticationException:
return None, "Autenticazione fallita, controlla username e password."
except paramiko.SSHException as e:
return None, f"Errore SSH: {e}"
except Exception as e:
return None, f"Errore imprevisto: {e}"
def check_token_count(url):
"""
Controlla il numero di token disponibili tramite una richiesta GET all'URL specificato,
utilizzando un custom User-Agent.
:param url: L'URL da cui recuperare il conteggio dei token.
:return: Il valore del conteggio dei token se la richiesta ha successo, altrimenti None.
"""
headers = {
"User-Agent": "MyCustomUserAgent/1.0" # Imposta qui il tuo custom User-Agent
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
return data.get("count", None)
else:
print(f"Errore nella richiesta GET: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"Si è verificato un errore durante il controllo dei token: {e}")
return None
def main():
print("Starting the process...")
# Start the proxy
subprocess.Popen(["python", "proxy.py"])
# Execute SSH commands
commands = [
'nimbus powerbutton',
'nimbus home',
'curl http://192.168.1.89:8080/control/start_playing?path=/j1jtk65q5pwh2494mhawzg25j/DumpDokkanToken_jpn.at'
]
for command in commands:
output, error = execute_ssh_command(command)
if output:
print(f"Output comando '{command}': {output}")
if error:
print(f"Errore comando '{command}': {error}")
# Wait a few seconds to ensure the proxy has captured the token
# Recupera il device_token
device_token = get_device_token() # Chiamata sincrona
if device_token:
pass
else:
print("Unable to retrieve device token.")
print("Process completed.")
time.sleep(30)
send_tokens('tokens.json', 'https://gaetanoeu.eu.pythonanywhere.com/tokens')
if __name__ == "__main__":
while True:
# Controlla il conteggio dei token ogni minuto
token_count = check_token_count('https://gaetanoeu.eu.pythonanywhere.com/tokens/count')
if token_count is not None and token_count <= 3:
main()
else:
print(f"Conteggio attuale dei token: {token_count}. Non è necessario avviare il processo.")
# Attende 1 minuto prima di eseguire nuovamente il controllo
time.sleep(60)
我尝试更改
proxy.py
脚本中的端口。这解决了“地址已在使用中”错误,但现在脚本挂在“正在启动进程...”和“正在启动 mitmproxy”上,而没有按预期发送 SSH 命令。
您无法在 PythonAnywhere 上运行侦听任意端口的代码。