我正在开发一个使用 Gmail API 和 Google Pub/Sub 的应用程序来处理传入的电子邮件。我的目标是只处理真正的新消息,并避免重新处理在标签之间移动的消息(例如,将消息移回收件箱)。
问题: 当前的实现仍然处理移回收件箱的邮件,从而导致重复处理。如何确保只处理真正的新邮件并避免重新处理移回收件箱的邮件?
注意事项:
['messageAdded']
似乎包含已在标签之间移动的邮件。当前设置:
process_notification
功能。主要问题:
如何才能最好地仅处理真正的新传入消息,并避免在消息移回收件箱(或任何其他标签)时重新处理消息?
环境:
尝试的解决方案: 我尝试按内部时间戳过滤消息,仅处理时间戳大于上次处理的消息的消息。我尝试过存储消息 ID,但这会导致应用程序臃肿。这是我的代码的简化版本:
def start_watch():
credentials = google.oauth2.credentials.Credentials(**session['credentials'])
service = build('gmail', 'v1', credentials=credentials)
request = {
'labelIds': ['INBOX'],
'topicName': f"projects/{create_app().config['GOOGLE_CLOUD_PROJECT_ID']}/topics/{create_app().config['PUBSUB_TOPIC']}",
'labelFilterBehavior': 'INCLUDE',
'historyTypes': ['messageAdded']
}
response = service.users().watch(userId='me', body=request).execute()
mongo.db.users.update_one(
{'google_id': session['google_id']},
{'$set': {'historyId': response['historyId'], 'watch_state': True}}
)
def process_notification(data):
global last_processed_history_id
payload = base64.urlsafe_b64decode(data['message']['data'])
message_data = json.loads(payload)
history_id = message_data['historyId']
email_address = message_data['emailAddress']
current_time = time.time()
# Debounce logic
if email_address in last_processed_history_id:
last_id, last_time = last_processed_history_id[email_address]
if history_id == last_id and (current_time - last_time) < debounce_interval:
print(f"Skipping duplicate notification for history ID {history_id}")
return
user = mongo.db.users.find_one({'email': email_address})
if user:
credentials = google.oauth2.credentials.Credentials(**user['credentials'])
try:
# Refresh credentials if necessary
if credentials.expired and credentials.refresh_token:
credentials.refresh(Request())
mongo.db.users.update_one(
{'email': email_address},
{'$set': {'credentials': credentials_to_dict(credentials)}}
)
elif credentials.expired and not credentials.refresh_token:
print("No refresh token available. User needs to re-authenticate.")
return redirect(url_for('auth.reauthenticate'))
service = build('gmail', 'v1', credentials=credentials)
# Check if the sender is on the block list for the user
blocked_senders = mongo.db.block_list.find({'receiver_id': user['_id']})
blocked_emails = [block['email'] for block in blocked_senders]
if email_address in blocked_emails:
results = service.users().history().list(userId='me', startHistoryId=history_id, historyTypes=['messageAdded']).execute()
if 'history' in results:
for history in results['history']:
if 'messagesAdded' in history:
for message in history['messagesAdded']:
apply_label(service, message['message']['id'], 'Blocked - PitchSlap')
remove_label(service, message['message']['id'], 'INBOX')
return
last_history_id = user.get('historyId', '1')
last_processed_message_id = user.get('lastProcessedMessageId', '0')
last_processed_timestamp = user.get('lastProcessedTimestamp', 0)
if int(history_id) <= int(last_history_id):
print(f"History ID {history_id} has already been processed.")
return
results = service.users().history().list(userId='me', startHistoryId=last_history_id, historyTypes=['messageAdded']).execute()
if 'history' in results:
for history in results['history']:
if 'messagesAdded' in history:
for message_added in history['messagesAdded']:
message_id = message_added['message']['id']
msg = service.users().messages().get(userId='me', id=message_id).execute()
if 'INBOX' not in msg.get('labelIds', []):
print(f"Message {message_id} not in INBOX.")
continue
msg_internal_date = int(msg.get('internalDate', 0)) / 1000
if msg_internal_date <= last_processed_timestamp:
print(f"Message {message_id} with timestamp {msg_internal_date} has already been processed.")
continue
headers = msg.get('payload', {}).get('headers', [])
sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
receiver = next((h['value'] for h in headers if h['name'] == 'To'), 'Unknown')
subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
snippet = msg.get('snippet', '')
sender_email = extract_email_address(sender)
email_content = f"Subject: {subject}\n\n{snippet}"
ai_response = check_sales_email_with_openai(email_content)
is_sales_email = ai_response["Sales Email"]
confidence = ai_response["Confidence"]
if is_sales_email and confidence >= 70:
apply_label(service, message_id, 'PitchSlap')
remove_label(service, message_id, 'INBOX')
pitch = save_sales_pitch(
service, message_id, sender_email, receiver, subject, snippet, email_address, user['_id'],
ai_response
)
send_template_email(service, sender_email, 'form_request', pitch)
if not pitch.get('submitted_at'):
apply_label(service, message_id, 'Quarantined - PitchSlap')
print(f"History ID: {history_id}")
print(f"Email Address: {email_address}")
print(f"Sender: {sender_email}")
print(f"Receiver: {receiver}")
print(f"Subject: {subject}")
print(f"Email Content: {snippet}")
print(f"Sales email detected: {is_sales_email}")
mongo.db.users.update_one(
{'email': email_address},
{'$set': {'lastProcessedMessageId': message_id, 'lastProcessedTimestamp': msg_internal_date}}
)
except googleapiclient.errors.HttpError as error:
if error.resp.status == 404:
print(f"Message {message_id} not found.")
continue
else:
raise
# Update the history ID in the database after processing all messages
mongo.db.users.update_one(
{'email': email_address},
{'$set': {'historyId': history_id}}
)
# Update the last processed history ID and timestamp
last_processed_history_id[email_address] = (history_id, current_time)
except google.auth.exceptions.RefreshError:
print("Failed to refresh token, please reauthenticate.")
return redirect(url_for('auth.reauthenticate'))
**用户数据库条目:**
{
"_id": {"$oid": "6xxxxxa"},
"google_id": "1xxxxx7",
"email": "[email protected]",
"credentials": {
"token": "yxxxxxx69",
"refresh_token": "1xxxxxxxM",
"token_uri": "htxxxxen",
"client_id": "22xxxxxcom",
"client_secret": "GOxxxxxc",
"scopes": [
"https://www.googleapis.com/auth/userinfo.profile",
"https://www.googleapis.com/auth/userinfo.email",
"openid",
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.modify"
]
},
"historyId": {"$numberInt": "140436"},
"watch_state": true,
"last_processed_timestamp": {"$numberDouble": "1721765846.9436474"},
"lastProcessedMessageId": "190e1507ff1729a4"
}
控制台。
我重新启动了程序。然后它会检查并重新处理所有内容。正确处理第一封电子邮件。然后我测试了它并将其移动到收件箱中,然后它开始循环处理它。
Starting to process history for user [email protected] from history ID 139567 to 139707
Starting to process history for user [email protected] from history ID 139567 to 139671
Message 190e13de8a820465 not found.
Message 190e13de810426fb not found.
Message 190e13de8a820465 not found.
Starting to process history for user [email protected] from history ID 139567 to 139726
Message 190e13df993fbc8b not found.
Message 190e13de810426fb not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13df993fbc8b not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
Message 190e13de8a820465 not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13de810426fb not found.
Starting to process history for user [email protected] from history ID 139707 to 139808
Starting to process history for user [email protected] from history ID 139707 to 139755
Message 190e13df993fbc8b not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 139726 to 140355
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfe60bbb9b not found.
History ID 139587 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
History ID 139638 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139605 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139650 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140355 to 140426
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140355 to 140456
History ID: 140426
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 16:37:32 -0400
Email Content: you want to buy this lead list or something
Sales email detected: True
Message 190e1507ff1729a4 not in INBOX.
Starting to process history for user [email protected] from history ID 140355 to 140436
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Message 190e1507ff1729a4 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140436 to 140507
Starting to process history for user [email protected] from history ID 140436 to 140511
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140436 to 140520
Message 190e150993252c08 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:36] "POST /pubsub HTTP/1.1" 200 -
History ID: 140507
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:37:53 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140511 to 140540
127.0.0.1 - - [23/Jul/2024 16:38:37] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140507 to 140563
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140507 to 140575
Message 190e15145fe96b7c not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:41] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
History ID: 140520
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
History ID: 140540
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140520 to 140595
Starting to process history for user [email protected] from history ID 140520 to 140629
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c has already been processed.
Message 190e15145fe96b7c has already been processed.
Starting to process history for user [email protected] from history ID 140540 to 140646
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140540 to 140658
Message 190e151573f8d86e not in INBOX.
Message 190e151573f8d86e not in INBOX.
History ID: 140575
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140540 to 140677
127.0.0.1 - - [23/Jul/2024 16:38:46] "POST /pubsub HTTP/1.1" 200 -
Message 190e151573f8d86e not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140575 to 140733
Starting to process history for user [email protected] from history ID 140575 to 140715
History ID: 140595
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
History ID: 140629
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
好吧,经过漫长的过程。我找到了解决方案。