我正在尝试构建一个应用程序脚本来解析新传入的电子邮件,我已经检查了来自特定发件人的现有电子邮件的脚本,它运行良好,但由于我已将其修改为实时运行脚本,所以它向我抛出一个错误:
An error occurred: 'utf-8' codec can't decode byte 0xa9 in position 802: invalid start byte
以下是它的代码:
import imaplib
import email
import re
import yaml
from datetime import datetime
import time
# Function to extract the promo code from email body
def extract_promo_code(body):
promo_code_pattern = r'(?s)Enter (?:code|promo).*?\b([A-Z\d]{10,})'
match = re.search(promo_code_pattern, body, re.MULTILINE)
if match:
return match.group(1)
else:
return None
# Function to extract the expiry date from email body
def extract_expiry_date(body):
expiry_date_pattern = r'Offer valid until ([A-Za-z]+ \d{1,2}, \d{4})'
match = re.search(expiry_date_pattern, body)
if match:
original_date = match.group(1)
parsed_date = datetime.strptime(original_date, '%B %d, %Y')
formatted_date = parsed_date.strftime('%d/%m/%Y')
return formatted_date
else:
return None
# Read credentials from a YAML file
with open('credentials.yaml') as f:
content = f.read()
my_credentials = yaml.load(content, Loader=yaml.FullLoader)
user, password = my_credentials['user'], my_credentials['password']
imap_url = 'imap.gmail.com'
while True:
try:
my_mail = imaplib.IMAP4_SSL(imap_url)
my_mail.login(user, password)
my_mail.select('Inbox')
_, data = my_mail.search(None, 'ALL')
mail_id_list = data[0].split()
for num in mail_id_list:
typ, data = my_mail.fetch(num, '(RFC822)')
msgs = []
for msg in data:
if isinstance(msg, tuple):
my_msg = email.message_from_bytes(msg[1])
# Initialize data fields
msg_to = my_msg['to']
date = my_msg['date']
expiry_date = None
subject = my_msg['subject']
promo_code = None
exclusions = None
supplier = "Supplier ID"
message_id = my_msg['Message-ID']
# Extract promo code from email body
for part in my_msg.walk():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8')
promo_code = extract_promo_code(body)
# Extract exclusions if present (you can modify this part)
exclusions_match = re.search(r'\*\s*EXCLUSIONS AND DISCLAIMERS\s*(.*?)Some exclusions apply\.', body, re.IGNORECASE | re.MULTILINE | re.DOTALL)
if exclusions_match:
exclusions = exclusions_match.group(1).strip()
# Extract expiry date from email body
expiry_date = extract_expiry_date(body)
# Check if a promo code was found before printing or saving the extracted data
if promo_code:
# Print or save the extracted data
print('______________________________')
print("msg_to:", msg_to)
print("date:", date)
print("expiry_date:", expiry_date)
print("subject:", subject)
print("promo_code:", promo_code)
print("exclusions:", exclusions)
print("supplier:", supplier)
print("message_id:", message_id)
print('______________________________')
# Close the mailbox
my_mail.logout()
# Sleep for a while before checking for new emails again
time.sleep(60) # Sleep for 60 seconds before checking again
except Exception as e:
print(f"An error occurred: {str(e)}")
即使没有收到电子邮件,代码也会每 60 秒运行一次异常。我只是希望它检查是否有未读的新传入电子邮件,并从中提取所需的数据(如果存在)。
我不太确定,但我猜您的收件箱中某处有一封包含非 UTF-8 字符的电子邮件。由于您的脚本在连续循环中运行,迭代之间有 60 秒的睡眠间隔,因此它每分钟都会尝试读取此电子邮件,然后导致异常。
我建议将解码语句包装在 try-catch 块中以优雅地处理错误:
try:
body = part.get_payload(decode=True).decode('utf-8')
except UnicodeDecodeError:
print(f"Error decoding email {message_id}. Skipping.")
continue
此外,仅处理未读电子邮件而不是所有电子邮件可能是有意义的:
_, data = my_mail.search(None, 'UNSEEN')