PythonAnywhere 上的 Paramiko 和 mitmproxy:启动代理服务器时出现“OSError: [Errno 98] 地址已在使用中”

问题描述 投票:0回答:1

我正在 PythonAnywhere 上运行一个 Python 脚本,该脚本使用 Paramiko 通过 SSH 连接到我的 iPhone 并执行一些命令。此外,我使用 subprocess.Popen 运行另一个脚本 proxy.py,该脚本使用 mitmproxy 来拦截和解密一些请求。这些脚本在我的本地 Windows 计算机上完美运行,但是当我在 PythonAnywhere 上运行它们时,我遇到以下错误:

Starting the process...
Traceback (most recent call last):
File "/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/proxy/server.py", line 46, in __init__
super().__init__(
File "/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/net/tcp.py", line 593, in __init__
self.socket.bind(self.address)
OSError: [Errno 98] Address already in use
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/gaetanoeu/tokengen/proxy.py", line 46, in <module>
m.server = ProxyServer(config)
File "/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/proxy/server.py", line 54, in __init__
raise exceptions.ServerException(
mitmproxy.exceptions.ServerException: Error starting proxy server: OSError(98, 'Address already in use')  

proxy.py

from mitmproxy import http
from mitmproxy.options import Options
from mitmproxy.proxy import ProxyConfig, ProxyServer
from mitmproxy.tools.dump import DumpMaster
import json
import re

class Addon:
    """mitmproxy addon that captures device tokens from sign-in requests.

    Every request whose URL contains ``/auth/sign_in`` is inspected for a
    ``device_token`` field in its JSON body. The token is appended to a JSON
    file and the request is killed so that it is never forwarded.
    """

    # Path of the JSON file (a flat list) that collects the captured tokens.
    TOKENS_FILE = 'tokens.json'

    def request(self, flow: "http.HTTPFlow") -> None:
        """Record the device token of a sign-in request, then drop the request."""
        if re.search(r'.*\/auth\/sign_in', flow.request.pretty_url):
            try:
                self.process_packet(flow, 'Request', flow.request.text, flow.request.headers.get("content-type"))
            finally:
                # Kill the request even if recording the token failed, so it
                # is never sent upstream.
                flow.kill()

    def response(self, flow: "http.HTTPFlow") -> None:
        """Do nothing: responses are not processed."""

    def process_packet(self, flow: "http.HTTPFlow", packet_type: str, data: str, content_type: str):
        """Extract ``device_token`` from a JSON body and append it to the token file.

        Bodies that are empty, not declared as JSON, not valid JSON, or that
        do not decode to an object containing ``device_token`` are ignored.

        :param flow: The intercepted flow (not used by this method).
        :param packet_type: Label of the packet kind, e.g. ``'Request'``
            (not used by this method).
        :param data: Raw body text; may be ``None`` or empty.
        :param content_type: Value of the ``Content-Type`` header, or ``None``.
        """
        # Compare only the media type, so that a header such as
        # "application/json; charset=UTF-8" is still recognised as JSON.
        media_type = (content_type or '').split(';', 1)[0].strip().lower()
        if media_type != 'application/json' or not data:
            return

        try:
            payload = json.loads(data)
        except json.JSONDecodeError:
            return

        # Only a JSON object can carry the field; a bare string or list that
        # merely contains the text "device_token" must not be indexed.
        if not isinstance(payload, dict) or 'device_token' not in payload:
            return

        # Load the tokens collected so far; a missing, empty or corrupt file
        # simply starts a new list.
        try:
            with open(self.TOKENS_FILE, 'r') as file:
                tokens = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            tokens = []
        if not isinstance(tokens, list):
            tokens = []

        tokens.append(payload['device_token'])

        # Write the whole list back to the file.
        with open(self.TOKENS_FILE, 'w') as file:
            json.dump(tokens, file, indent=4)

if __name__ == "__main__":
    # Proxy settings: listen on port 8888 with the http2 option switched on.
    proxy_options = Options(listen_port=8888, http2=True)

    # Create the master with with_termlog and with_dumper both turned off,
    # then attach the proxy server and the token-capturing addon.
    master = DumpMaster(proxy_options, with_termlog=False, with_dumper=False)
    master.server = ProxyServer(ProxyConfig(proxy_options))
    master.addons.add(Addon())

    try:
        print('Starting Dokkan mitmproxy')
        master.run()
    except KeyboardInterrupt:
        # Ctrl+C: shut the master down instead of printing a traceback.
        master.shutdown()

bot.py

import json
import os
import subprocess
import sys
import time

import paramiko
import requests

# SSH connection settings for the target device. Each value can be overridden
# through an environment variable of the same name, so credentials do not have
# to be edited into the source; the literals below are the fallbacks.
SSH_IP = os.environ.get('SSH_IP', '192.168.1.89')
SSH_PORT = int(os.environ.get('SSH_PORT', '22'))
SSH_USER = os.environ.get('SSH_USER', 'root')
SSH_PSW = os.environ.get('SSH_PSW', 'alpine')

def send_tokens(file_path, url):
    """
    Read the tokens stored in a JSON file and POST them to the given URL.

    The file is reset to an empty JSON list only after the server answers
    with HTTP 201, so the tokens are kept for a later retry when the upload
    fails.

    :param file_path: Path of the JSON file that holds a list of tokens.
    :param url: URL that receives the tokens as ``{"tokens": [...]}``.
    :return: The server response to the POST request, or ``None`` when
        nothing was sent (missing or invalid file, no tokens) or the request
        raised an error.
    """
    try:
        # Open and parse the JSON file; it is expected to hold a plain list.
        with open(file_path, 'r') as file:
            tokens = json.load(file)

        if not isinstance(tokens, list):
            print("Il file JSON non contiene una lista di token.")
            return None

        if not tokens:
            print("Il file non contiene token.")
            return None

        # requests never times out on its own; bound the wait so a stalled
        # server cannot block the caller forever.
        response = requests.post(url, json={"tokens": tokens}, timeout=30)

        if response.status_code == 201:
            print("Token inviati con successo!")
            # Reset the file to a valid empty list only once the server has
            # accepted the tokens, so a failed upload does not lose them.
            with open(file_path, 'w') as file:
                json.dump([], file)
        else:
            print(f"Errore nell'invio dei token: {response.status_code} - {response.text}")

        return response
    except FileNotFoundError:
        print(f"Il file {file_path} non è stato trovato.")
    except json.JSONDecodeError:
        print("Errore nella decodifica del file JSON.")
    except Exception as e:
        print(f"Si è verificato un errore: {e}")
    return None


def get_device_token():
    """Return the first token stored in ``tokens.json``.

    :return: The first element of the stored token list, or ``None`` when the
        file is missing, is not valid JSON, or holds no tokens. A short
        message explaining the reason is printed in each of those cases.
    """
    try:
        with open('tokens.json', 'r') as handle:
            stored = json.load(handle)
    except FileNotFoundError:
        print("Device token file not found.")
        return None
    except json.JSONDecodeError:
        print("Error decoding JSON from the device token file.")
        return None

    # An empty container means the proxy has not captured anything yet.
    if not stored:
        print("No tokens found in the file.")
        return None

    return stored[0]


def execute_ssh_command(command):
    """Run a command on the device over SSH and collect its output.

    A new connection is opened with the module-level ``SSH_IP``, ``SSH_PORT``,
    ``SSH_USER`` and ``SSH_PSW`` settings for every call and is always closed
    again, including when connecting or executing fails.

    :param command: Command line to execute on the remote host.
    :return: ``(stdout_text, stderr_text)`` on success, or ``(None, message)``
        when authentication, the SSH session or anything else fails.
    """
    client = None
    try:
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts host keys that are not already
        # known; confirm this is acceptable for the network in use.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(SSH_IP, SSH_PORT, SSH_USER, SSH_PSW)
        _stdin, stdout, stderr = client.exec_command(command)
        output = stdout.read().decode()
        error = stderr.read().decode()
        return output, error
    except paramiko.AuthenticationException:
        return None, "Autenticazione fallita, controlla username e password."
    except paramiko.SSHException as e:
        return None, f"Errore SSH: {e}"
    except Exception as e:
        return None, f"Errore imprevisto: {e}"
    finally:
        # Release the connection on every path; the original only closed it
        # when the command succeeded.
        if client is not None:
            client.close()


def check_token_count(url):
    """
    Fetch the number of available tokens with a GET request to the given URL,
    sending a custom User-Agent header.

    :param url: URL that returns the token count as JSON with a ``count`` key.
    :return: The value of ``count`` when the request succeeds with HTTP 200
        (``None`` if the key is absent), otherwise ``None``.
    """
    headers = {
        "User-Agent": "MyCustomUserAgent/1.0"  # Set your custom User-Agent here
    }

    try:
        # requests never times out on its own; without a limit a stalled
        # server would freeze the polling loop that calls this function.
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 200:
            data = response.json()
            return data.get("count", None)
        else:
            print(f"Errore nella richiesta GET: {response.status_code} - {response.text}")
            return None
    except Exception as e:
        print(f"Si è verificato un errore durante il controllo dei token: {e}")
        return None



def main():
    """Run one token-harvesting cycle.

    Starts ``proxy.py`` as a child process, triggers the device over SSH,
    waits for the proxy to capture the token, uploads the collected tokens
    and finally stops the proxy again.
    """
    print("Starting the process...")

    # Start the proxy with the interpreter that is running this script, so it
    # sees the same installed packages.
    proxy_process = subprocess.Popen([sys.executable, "proxy.py"])
    try:
        # Execute SSH commands
        commands = [
            'nimbus powerbutton',
            'nimbus home',
            'curl http://192.168.1.89:8080/control/start_playing?path=/j1jtk65q5pwh2494mhawzg25j/DumpDokkanToken_jpn.at'
        ]
        for command in commands:
            output, error = execute_ssh_command(command)
            if output:
                print(f"Output comando '{command}': {output}")
            if error:
                print(f"Errore comando '{command}': {error}")

        # Wait so the proxy has time to capture the token before the token
        # file is read and uploaded.
        time.sleep(30)

        # Retrieve the device token only to report whether one was captured.
        device_token = get_device_token()
        if not device_token:
            print("Unable to retrieve device token.")

        send_tokens('tokens.json', 'https://gaetanoeu.eu.pythonanywhere.com/tokens')
    finally:
        # Stop the proxy so the next cycle can bind the same port again; a
        # proxy left running makes the next start fail with
        # "Address already in use".
        proxy_process.terminate()
        try:
            proxy_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proxy_process.kill()
            proxy_process.wait()

    print("Process completed.")


if __name__ == "__main__":
    # Poll the token counter forever and start a harvesting cycle whenever
    # the reported count is known and at most 3.
    while True:
        remaining = check_token_count('https://gaetanoeu.eu.pythonanywhere.com/tokens/count')
        refill_needed = remaining is not None and remaining <= 3

        if not refill_needed:
            print(f"Conteggio attuale dei token: {remaining}. Non è necessario avviare il processo.")
        else:
            main()

        # Wait one minute before checking again.
        time.sleep(60)

我尝试更改 proxy.py 脚本中的端口。这解决了“地址已在使用中”错误,但现在脚本在输出“Starting the process...”和“Starting Dokkan mitmproxy”之后就挂起了,没有按预期发送 SSH 命令。

python subprocess paramiko pythonanywhere mitmproxy
1个回答
0
投票

您无法在 PythonAnywhere 上运行侦听任意端口的代码。

© www.soinside.com 2019 - 2024. All rights reserved.