这是代码:
class TelegramWorker(QThread):
    """Background thread that performs one Telegram authentication step.

    Each instance creates its own client, connects, runs exactly one
    ``action`` on a private asyncio event loop, and reports the outcome
    through Qt signals so the GUI thread is never blocked.

    Supported ``action`` values:
        * ``"send_code"`` - request a login code for ``phone_number``;
          the result is delivered through ``otp_sent``.
        * ``"auth_with_otp"`` - sign in with ``phone_number``,
          ``phone_code_hash`` and ``otp``.
        * ``"auth_with_password"`` - finish the login with ``password``.

    Any other value only connects and disconnects again.

    Every failure, whatever the action, is reported as
    ``auth_complete(False, message)``.

    NOTE(review): every worker opens its own connection, so the
    ``phone_code_hash`` obtained by a ``"send_code"`` worker is redeemed by a
    different client instance in the ``"auth_with_otp"`` worker. Whether that
    contributes to spurious "code expired" errors must be confirmed against
    the client library's documentation (presumably Pyrogram).
    """

    # Emitted with the object returned by ``Client.send_code``.
    otp_sent = pyqtSignal(object)
    # Emitted as (success, human-readable message).
    auth_complete = pyqtSignal(bool, str)

    def __init__(self, action, phone_number=None, otp=None, phone_code_hash=None, password=None):
        """Store the action to run and the credentials that action needs."""
        super().__init__()
        self.action = action
        self.phone_number = phone_number
        self.otp = otp
        self.phone_code_hash = phone_code_hash
        self.password = password
        self.client = None

    def run(self) -> None:
        """Thread entry point: drive ``perform_action`` on a fresh event loop."""
        asyncio.run(self.perform_action())

    async def perform_action(self) -> None:
        """Connect, execute the configured action, and always disconnect.

        Exceptions never leave this coroutine; they are converted into an
        ``auth_complete(False, message)`` emission.
        """
        try:
            self.client = Client(TELEGRAM_SESSION, api_id=API_ID, api_hash=API_HASH)
            await self.client.connect()
            try:
                await self._dispatch_action()
            finally:
                # Disconnect even when the action raised; previously an
                # exception skipped this call and left the client connected.
                await self.client.disconnect()
        except errors.SessionPasswordNeeded:
            self.auth_complete.emit(False, "Password is required for authentication.")
        except Exception as e:
            # Thread boundary: report instead of letting the thread die silently.
            self.auth_complete.emit(False, str(e))

    async def _dispatch_action(self) -> None:
        """Run the single step selected by ``self.action`` on the connected client."""
        if self.action == "send_code":
            sent_code_info = await self.client.send_code(self.phone_number)
            self.otp_sent.emit(sent_code_info)
        elif self.action == "auth_with_otp":
            try:
                await self.client.sign_in(
                    self.phone_number, self.phone_code_hash, self.otp
                )
                self.auth_complete.emit(True, "Authentication successful!")
            except errors.PhoneCodeInvalid:
                self.auth_complete.emit(False, "Invalid confirmation code. Please try again.")
            except errors.PhoneCodeExpired:
                self.auth_complete.emit(False, "The confirmation code has expired. Request a new one.")
        elif self.action == "auth_with_password":
            await self.client.check_password(self.password)
            self.auth_complete.emit(True, "Authentication successful!")
class TelegramAuthenticationDialog(QDialog):
    """Dialog that drives the Telegram login flow (OTP, then optional password).

    Network work is delegated to ``TelegramWorker`` threads; their
    ``auth_complete`` signal is routed to ``handle_auth_result``. A one-second
    ``QTimer`` drives the "resend OTP" countdown.
    """

    def __init__(self, parent=None):
        """Initialise state, wire the resend timer and build the UI."""
        super().__init__(parent)
        self.setWindowTitle("Telegram Authentication")
        self.setFixedSize(400, 300)
        self.phone_number = None
        self.sent_code = None
        self.worker = None
        # Ticks once per second while a resend countdown is active.
        self.resend_timer = QTimer(self)
        self.resend_timer.timeout.connect(self.update_resend_timer)
        # start_resend_timer() overwrites this with 180 before counting down.
        self.resend_countdown = 60
        self.init_ui()

    def init_ui(self):
        """Build the dialog's widgets."""
        # --- relevant UI code ---
        # Elided in the original listing. Presumably creates otp_input,
        # password_input, action_button and resend_label, which the methods
        # below rely on; send_otp() is likewise not shown. Restore it here.
        pass

    def _start_worker(self, worker):
        """Start ``worker`` unless a previous worker is still running.

        Returns True when the worker was started. Refusing to start avoids
        dropping the only reference to a running QThread and avoids sending
        the same credentials twice on a double click.
        """
        if self.worker is not None and self.worker.isRunning():
            return False
        self.worker = worker
        worker.auth_complete.connect(self.handle_auth_result)
        worker.start()
        return True

    def _rebind_action_button(self, text, slot):
        """Relabel the action button and make ``slot`` its only click handler."""
        self.action_button.setText(text)
        try:
            self.action_button.clicked.disconnect()
        except TypeError:
            # PyQt raises TypeError when the signal has no connections;
            # there is nothing to remove in that case.
            pass
        self.action_button.clicked.connect(slot)

    def authenticate(self):
        """Validate the OTP field and start an ``auth_with_otp`` worker."""
        otp = self.otp_input.text().strip()
        if not otp:
            QMessageBox.information(self, "Error", "Enter the OTP.")
            return
        if self.sent_code is None:
            # No code was requested yet, so there is no phone_code_hash to use.
            QMessageBox.warning(self, "Error", "Request an OTP before authenticating.")
            return
        self._start_worker(
            TelegramWorker(
                action="auth_with_otp",
                phone_number=self.phone_number,
                otp=otp,
                phone_code_hash=self.sent_code.phone_code_hash,
            )
        )

    def handle_auth_result(self, success, message):
        """React to a worker's ``auth_complete(success, message)`` signal.

        On success the dialog is accepted. On failure the message is shown
        and, based on keywords in it, the dialog either switches to password
        entry or resets to the "send OTP" state. Other failures leave the
        current state (and any running resend countdown) untouched.
        """
        if success:
            self.resend_timer.stop()
            QMessageBox.information(self, "Success", message)
            self.accept()
            return

        lowered = message.lower()
        needs_password = "password" in lowered
        code_rejected = not needs_password and (
            "expired" in lowered or "invalid" in lowered
        )
        if needs_password or code_rejected:
            # Stop the countdown before the modal box opens; otherwise it
            # would later rebind the action button to resend_otp and undo
            # the state chosen below.
            self.resend_timer.stop()

        QMessageBox.warning(self, "Error", message)

        if needs_password:
            self.password_input.setVisible(True)
            self.otp_input.setVisible(False)
            # The countdown no longer applies once the OTP step is over.
            self.resend_label.setVisible(False)
            self._rebind_action_button(
                "Authenticate with Password", self.authenticate_with_password
            )
        elif code_rejected:
            self.otp_input.clear()
            self._rebind_action_button("Send OTP on Telegram", self.send_otp)
            self.otp_input.setVisible(False)
            self.resend_label.setVisible(False)

    def authenticate_with_password(self):
        """Validate the password field and start an ``auth_with_password`` worker."""
        password = self.password_input.text().strip()
        if not password:
            QMessageBox.information(self, "Error", "Enter your Telegram password.")
            return
        self._start_worker(
            TelegramWorker(action="auth_with_password", password=password)
        )

    def start_resend_timer(self):
        """Begin a 180-second countdown until the OTP may be requested again."""
        self.resend_countdown = 180
        self.resend_label.setText(f"Didn't receive OTP? Resend in {self.resend_countdown} seconds")
        self.resend_timer.start(1000)

    def update_resend_timer(self):
        """Timer tick: update the countdown label or enable resending at zero."""
        self.resend_countdown -= 1
        if self.resend_countdown <= 0:
            self.resend_timer.stop()
            self.resend_label.setText("Didn't receive OTP? Resend")
            self._rebind_action_button("Resend OTP", self.resend_otp)
        else:
            self.resend_label.setText(f"Didn't receive OTP? Resend in {self.resend_countdown} seconds")

    def resend_otp(self):
        """Reset the OTP widgets and request a new code via ``send_otp``."""
        self.resend_label.setVisible(False)
        self.otp_input.clear()
        self.otp_input.setVisible(False)
        self.send_otp()
现在,即使在收到 OTP 后立即(几秒钟内)输入,程序仍然会显示错误:“The confirmation code has expired. Request a new one.”(确认码已过期,请重新申请)。我认为这是 Pyrogram 库的 bug 或某种问题造成的。我尝试了各种不同的方法来验证 OTP,但都没有成功。
请帮忙解决这个问题,先谢谢了。
你可以试试下面的解决方案:
async def perform_action(self):
....
elif self.action == "auth_with_otp":
try:
# Ensure the client remains connected during OTP verification
await self.client.sign_in(
self.phone_number, self.phone_code_hash, self.otp
)
self.auth_complete.emit(True, "Authentication successful!")
.....
# add this
except Exception as e:
self.auth_complete.emit(False, str(e))
......
def handle_auth_result(self, success, message):
.....
# Trigger resend logic
self.start_resend_timer()
self.resend_countdown = 180 # check the suitable value here