Python 实时脚本无法检查 gmail 中的新电子邮件

问题描述 投票:0回答:1

我正在尝试构建一个应用程序脚本来解析新传入的电子邮件,我已经检查了来自特定发件人的现有电子邮件的脚本,它运行良好,但由于我已将其修改为实时运行脚本,所以它向我抛出一个错误:

An error occurred: 'utf-8' codec can't decode byte 0xa9 in position 802: invalid start byte 

以下是它的代码:

import imaplib
import email
import re
import yaml
from datetime import datetime
import time

# Function to extract the promo code from email body
def extract_promo_code(body):
    promo_code_pattern = r'(?s)Enter (?:code|promo).*?\b([A-Z\d]{10,})'
    match = re.search(promo_code_pattern, body, re.MULTILINE)
    if match:
        return match.group(1)
    else:
        return None

# Function to extract the expiry date from email body
def extract_expiry_date(body):
    expiry_date_pattern = r'Offer valid until ([A-Za-z]+ \d{1,2}, \d{4})'
    match = re.search(expiry_date_pattern, body)
    if match:
        original_date = match.group(1)
        parsed_date = datetime.strptime(original_date, '%B %d, %Y')
        formatted_date = parsed_date.strftime('%d/%m/%Y')
        return formatted_date
    else:
        return None

# Read credentials from a YAML file
with open('credentials.yaml') as f:
    content = f.read()

my_credentials = yaml.load(content, Loader=yaml.FullLoader)

user, password = my_credentials['user'], my_credentials['password']

imap_url = 'imap.gmail.com'

while True:
    try:
        my_mail = imaplib.IMAP4_SSL(imap_url)
        my_mail.login(user, password)
        my_mail.select('Inbox')

        _, data = my_mail.search(None, 'ALL')

        mail_id_list = data[0].split()
        
        for num in mail_id_list:
            typ, data = my_mail.fetch(num, '(RFC822)')
            msgs = []

            for msg in data:
                if isinstance(msg, tuple):
                    my_msg = email.message_from_bytes(msg[1])

                    # Initialize data fields
                    msg_to = my_msg['to']
                    date = my_msg['date']
                    expiry_date = None
                    subject = my_msg['subject']
                    promo_code = None
                    exclusions = None
                    supplier = "Supplier ID"
                    message_id = my_msg['Message-ID']

                    # Extract promo code from email body
                    for part in my_msg.walk():
                        if part.get_content_type() == 'text/plain':
                            body = part.get_payload(decode=True).decode('utf-8')
                            promo_code = extract_promo_code(body)

                            # Extract exclusions if present (you can modify this part)
                            exclusions_match = re.search(r'\*\s*EXCLUSIONS AND DISCLAIMERS\s*(.*?)Some exclusions apply\.', body, re.IGNORECASE | re.MULTILINE | re.DOTALL)
                            if exclusions_match:
                                exclusions = exclusions_match.group(1).strip()

                            # Extract expiry date from email body
                            expiry_date = extract_expiry_date(body)

                    # Check if a promo code was found before printing or saving the extracted data
                    if promo_code:
                        # Print or save the extracted data
                        print('______________________________')
                        print("msg_to:", msg_to)
                        print("date:", date)
                        print("expiry_date:", expiry_date)
                        print("subject:", subject)
                        print("promo_code:", promo_code)
                        print("exclusions:", exclusions)
                        print("supplier:", supplier)
                        print("message_id:", message_id)
                        print('______________________________')

        # Close the mailbox
        my_mail.logout()

        # Sleep for a while before checking for new emails again
        time.sleep(60)  # Sleep for 60 seconds before checking again
    except Exception as e:
        print(f"An error occurred: {str(e)}")

即使没有收到电子邮件,代码也会每 60 秒运行一次异常。我只是希望它检查是否有未读的新传入电子邮件,并从中提取所需的数据(如果存在)。

python email web-scraping imaplib email-parsing
1个回答
0
投票

我不太确定,但我猜您的收件箱中某处有一封包含非 UTF-8 字符的电子邮件。由于您的脚本在连续循环中运行,迭代之间有 60 秒的睡眠间隔,因此它每分钟都会尝试读取此电子邮件,然后导致异常。

我建议将解码语句包装在 try-catch 块中以优雅地处理错误:

try:
    body = part.get_payload(decode=True).decode('utf-8')
except UnicodeDecodeError:
    print(f"Error decoding email {message_id}. Skipping.")
    continue

此外,仅处理未读电子邮件而不是所有电子邮件可能是有意义的:

_, data = my_mail.search(None, 'UNSEEN')
© www.soinside.com 2019 - 2024. All rights reserved.