How can I process only genuinely new Gmail messages using the Gmail API and Pub/Sub notifications?


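I am building an application that uses the Gmail API and Google Pub/Sub to process incoming email. My goal is to process only genuinely new messages and to avoid reprocessing messages that are moved between labels (for example, a message moved back to the inbox).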

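Problem: The current implementation still processes messages that are moved back to the inbox, which leads to duplicate processing. How can I make sure that only genuinely new messages are processed and that messages moved back to the inbox are not processed again?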

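Notes:

  • The Gmail API history type `['messageAdded']` appears to include messages that have been moved between labels.
  • I am storing the last processed history ID and timestamp to filter messages.
  • Should I track and store message IDs to avoid reprocessing, or is there a more efficient way to handle this with the Gmail API?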
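
On the last note, one possible middle ground between storing every message ID and storing none is to remember only a bounded window of recently seen IDs. The following is a minimal in-memory sketch, not a Gmail API feature: `RecentIds` and its `capacity` parameter are hypothetical names, and a real deployment would need a persistent equivalent (for example, a MongoDB collection with a TTL index) to survive restarts.

```python
from collections import OrderedDict


class RecentIds:
    """Remember at most `capacity` message IDs, evicting the oldest first."""

    def __init__(self, capacity=1000):
        self.capacity = capacity
        self._ids = OrderedDict()

    def add_if_new(self, message_id):
        """Return True the first time an ID is seen, False on repeats."""
        if message_id in self._ids:
            self._ids.move_to_end(message_id)  # refresh recency
            return False
        self._ids[message_id] = None
        if len(self._ids) > self.capacity:
            self._ids.popitem(last=False)  # drop the oldest entry
        return True
```

A handler would call `add_if_new(message_id)` before doing any work and skip the message when it returns False. The window size is a trade-off: too small and an old message moved back to the inbox may no longer be remembered.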

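Current setup:

  • Pub/Sub notifications: I have a Pub/Sub topic that triggers my `process_notification` function whenever a new message is added to the inbox.
  • History API: I am using the Gmail history API to fetch the mailbox history based on the history ID.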
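
For context, the notification that reaches the handler is a Pub/Sub push envelope whose `data` field is base64url-encoded JSON carrying `emailAddress` and `historyId`. The sketch below isolates that decoding step; `decode_gmail_notification` is a hypothetical helper name, and the padding fix is a defensive assumption that does nothing when the data is already padded.

```python
import base64
import json


def decode_gmail_notification(envelope):
    """Return (email_address, history_id) from a Pub/Sub push envelope."""
    raw = envelope["message"]["data"]
    # Restore any missing base64 padding; a no-op when the length is already a multiple of 4.
    padded = raw + "=" * (-len(raw) % 4)
    payload = json.loads(base64.urlsafe_b64decode(padded))
    # historyId may arrive as a number or a numeric string; normalise it to int
    # so that later comparisons are numeric.
    return payload["emailAddress"], int(payload["historyId"])
```

Note that the notification only says that the mailbox changed up to `historyId`; it does not identify which message changed, which is why the history API call is still needed.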

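Main question:

What is the best way to process only genuinely new incoming messages and to avoid reprocessing a message when it is moved back to the inbox (or to any other label)?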

Environment:

  • Language: Python
  • Framework: Flask
  • Database: MongoDB
  • Gmail API

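Attempted solutions: I tried filtering messages by their internal timestamp, processing only those with a timestamp later than that of the last processed message. I also tried storing message IDs, but that bloats the application. Here is a simplified version of my code: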

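import base64
import json
import time

import google.auth.exceptions
import google.oauth2.credentials
import googleapiclient.errors
from flask import redirect, session, url_for
from google.auth.transport.requests import Request
from googleapiclient.discovery import build

# App-specific names (mongo, create_app, debounce_interval, apply_label,
# remove_label and the other helpers) are defined elsewhere in the project.
last_processed_history_id = {}  # email address -> (history ID, time of last processing)

def start_watch():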
    credentials = google.oauth2.credentials.Credentials(**session['credentials'])
    service = build('gmail', 'v1', credentials=credentials)
    
    request = {
        'labelIds': ['INBOX'],
        'topicName': f"projects/{create_app().config['GOOGLE_CLOUD_PROJECT_ID']}/topics/{create_app().config['PUBSUB_TOPIC']}",
        'labelFilterBehavior': 'INCLUDE',
        'historyTypes': ['messageAdded']
    }
    response = service.users().watch(userId='me', body=request).execute()

    mongo.db.users.update_one(
        {'google_id': session['google_id']},
        {'$set': {'historyId': response['historyId'], 'watch_state': True}}
    )

def process_notification(data):
    global last_processed_history_id

    payload = base64.urlsafe_b64decode(data['message']['data'])
    message_data = json.loads(payload)
    
    history_id = message_data['historyId']
    email_address = message_data['emailAddress']
    
    current_time = time.time()

    # Debounce logic
    if email_address in last_processed_history_id:
        last_id, last_time = last_processed_history_id[email_address]
        if history_id == last_id and (current_time - last_time) < debounce_interval:
            print(f"Skipping duplicate notification for history ID {history_id}")
            return

    user = mongo.db.users.find_one({'email': email_address})
    if user:
        credentials = google.oauth2.credentials.Credentials(**user['credentials'])

        try:
            # Refresh credentials if necessary
            if credentials.expired and credentials.refresh_token:
                credentials.refresh(Request())
                mongo.db.users.update_one(
                    {'email': email_address},
                    {'$set': {'credentials': credentials_to_dict(credentials)}}
                )
            elif credentials.expired and not credentials.refresh_token:
                print("No refresh token available. User needs to re-authenticate.")
                return redirect(url_for('auth.reauthenticate'))

            service = build('gmail', 'v1', credentials=credentials)

            # Check if the sender is on the block list for the user
            blocked_senders = mongo.db.block_list.find({'receiver_id': user['_id']})
            blocked_emails = [block['email'] for block in blocked_senders]
            if email_address in blocked_emails:
                results = service.users().history().list(userId='me', startHistoryId=history_id, historyTypes=['messageAdded']).execute()
                if 'history' in results:
                    for history in results['history']:
                        if 'messagesAdded' in history:
                            for message in history['messagesAdded']:
                                apply_label(service, message['message']['id'], 'Blocked - PitchSlap')
                                remove_label(service, message['message']['id'], 'INBOX')
                return

            last_history_id = user.get('historyId', '1')
            last_processed_message_id = user.get('lastProcessedMessageId', '0')
            last_processed_timestamp = user.get('lastProcessedTimestamp', 0)

            if int(history_id) <= int(last_history_id):
                print(f"History ID {history_id} has already been processed.")
                return

            results = service.users().history().list(userId='me', startHistoryId=last_history_id, historyTypes=['messageAdded']).execute()

            if 'history' in results:
                for history in results['history']:
                    if 'messagesAdded' in history:
                        for message_added in history['messagesAdded']:
                            message_id = message_added['message']['id']
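                            try:
                                msg = service.users().messages().get(userId='me', id=message_id).execute()
                            except googleapiclient.errors.HttpError as error:
                                # The message may no longer exist (for example, it was
                                # deleted after the history record was written).
                                if error.resp.status == 404:
                                    print(f"Message {message_id} not found.")
                                    continue
                                raise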
                            
                            if 'INBOX' not in msg.get('labelIds', []):
                                print(f"Message {message_id} not in INBOX.")
                                continue
                                
                            msg_internal_date = int(msg.get('internalDate', 0)) / 1000
                            
                            if msg_internal_date <= last_processed_timestamp:
                                print(f"Message {message_id} with timestamp {msg_internal_date} has already been processed.")
                                continue

                            headers = msg.get('payload', {}).get('headers', [])
                            sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
                            receiver = next((h['value'] for h in headers if h['name'] == 'To'), 'Unknown')
                            subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
                            snippet = msg.get('snippet', '')

                            sender_email = extract_email_address(sender)

                            email_content = f"Subject: {subject}\n\n{snippet}"
                            ai_response = check_sales_email_with_openai(email_content)
                            is_sales_email = ai_response["Sales Email"]
                            confidence = ai_response["Confidence"]

                            if is_sales_email and confidence >= 70:
                                apply_label(service, message_id, 'PitchSlap')
                                remove_label(service, message_id, 'INBOX')
                                pitch = save_sales_pitch(
                                    service, message_id, sender_email, receiver, subject, snippet, email_address, user['_id'],
                                    ai_response
                                )
                                send_template_email(service, sender_email, 'form_request', pitch)
                                if not pitch.get('submitted_at'):
                                    apply_label(service, message_id, 'Quarantined - PitchSlap')

                            print(f"History ID: {history_id}")
                            print(f"Email Address: {email_address}")
                            print(f"Sender: {sender_email}")
                            print(f"Receiver: {receiver}")
                            print(f"Subject: {subject}")
                            print(f"Email Content: {snippet}")
                            print(f"Sales email detected: {is_sales_email}")

                            mongo.db.users.update_one(
                                {'email': email_address},
                                {'$set': {'lastProcessedMessageId': message_id, 'lastProcessedTimestamp': msg_internal_date}}
                            )

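            # Update the history ID in the database after processing all messages
            mongo.db.users.update_one(
                {'email': email_address},
                {'$set': {'historyId': history_id}}
            )

            # Update the last processed history ID and timestamp
            last_processed_history_id[email_address] = (history_id, current_time)

        except googleapiclient.errors.HttpError as error:
            # `continue` is only valid inside a loop, so at this level we log and return.
            # A 404 can reach this point from history.list (startHistoryId outside the
            # available range) or from a message lookup.
            if error.resp.status == 404:
                print(f"History or message not found while handling history ID {history_id}.")
                return
            raise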

        except google.auth.exceptions.RefreshError:
            print("Failed to refresh token, please reauthenticate.")
            return redirect(url_for('auth.reauthenticate'))
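
The console output later in this question shows several runs starting from the same history ID (for example, three runs starting at 139567), which suggests that concurrent notifications read the same stored `historyId` before any of them updates it. One possible guard is to claim a range under a lock before processing it. The sketch below is in-memory only and every name in it (`claim_range`, `_checkpoints`) is hypothetical; with MongoDB the same idea could probably be expressed as a conditional update on the user's `historyId` field.

```python
import threading

_lock = threading.Lock()
_checkpoints = {}  # email address -> highest history ID already claimed (stand-in for the database field)


def claim_range(email_address, notified_history_id):
    """Return (start, end) history IDs to process, or None if already covered."""
    with _lock:
        start = _checkpoints.get(email_address, 0)
        if notified_history_id <= start:
            return None  # stale or duplicate notification
        # Advance the checkpoint before processing so that a concurrent
        # notification cannot claim the same range.
        _checkpoints[email_address] = notified_history_id
        return start, notified_history_id
```

The trade-off of claiming first is that a crash during processing skips that range unless the checkpoint is rolled back, so this is a sketch of the idea rather than a complete recovery strategy.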

**User database entry:**

{
  "_id": {"$oid": "6xxxxxa"},
  "google_id": "1xxxxx7",
  "email": "[email protected]",
  "credentials": {
    "token": "yxxxxxx69",
    "refresh_token": "1xxxxxxxM",
    "token_uri": "htxxxxen",
    "client_id": "22xxxxxcom",
    "client_secret": "GOxxxxxc",
    "scopes": [
      "https://www.googleapis.com/auth/userinfo.profile",
      "https://www.googleapis.com/auth/userinfo.email",
      "openid",
      "https://www.googleapis.com/auth/gmail.readonly",
      "https://www.googleapis.com/auth/gmail.modify"
    ]
  },
  "historyId": {"$numberInt": "140436"},
  "watch_state": true,
  "last_processed_timestamp": {"$numberDouble": "1721765846.9436474"},
  "lastProcessedMessageId": "190e1507ff1729a4"
}

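Console output:

I restarted the program. It then checked and reprocessed everything. The first email was processed correctly. Then, as a test, I moved it back into the inbox, and it started processing it in a loop.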

Starting to process history for user [email protected] from history ID 139567 to 139707
Starting to process history for user [email protected] from history ID 139567 to 139671
Message 190e13de8a820465 not found.
Message 190e13de810426fb not found.
Message 190e13de8a820465 not found.
Starting to process history for user [email protected] from history ID 139567 to 139726
Message 190e13df993fbc8b not found.
Message 190e13de810426fb not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13df993fbc8b not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
Message 190e13de8a820465 not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13de810426fb not found.
Starting to process history for user [email protected] from history ID 139707 to 139808
Starting to process history for user [email protected] from history ID 139707 to 139755
Message 190e13df993fbc8b not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 139726 to 140355
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfe60bbb9b not found.
History ID 139587 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
History ID 139638 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139605 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139650 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140355 to 140426
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140355 to 140456
History ID: 140426
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 16:37:32 -0400
Email Content: you want to buy this lead list or something
Sales email detected: True
Message 190e1507ff1729a4 not in INBOX.
Starting to process history for user [email protected] from history ID 140355 to 140436
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Message 190e1507ff1729a4 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140436 to 140507
Starting to process history for user [email protected] from history ID 140436 to 140511
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140436 to 140520
Message 190e150993252c08 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:36] "POST /pubsub HTTP/1.1" 200 -
History ID: 140507
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:37:53 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140511 to 140540
127.0.0.1 - - [23/Jul/2024 16:38:37] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140507 to 140563
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140507 to 140575
Message 190e15145fe96b7c not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:41] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
History ID: 140520
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
History ID: 140540
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140520 to 140595
Starting to process history for user [email protected] from history ID 140520 to 140629
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c has already been processed.
Message 190e15145fe96b7c has already been processed.
Starting to process history for user [email protected] from history ID 140540 to 140646
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140540 to 140658
Message 190e151573f8d86e not in INBOX.
Message 190e151573f8d86e not in INBOX.
History ID: 140575
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140540 to 140677
127.0.0.1 - - [23/Jul/2024 16:38:46] "POST /pubsub HTTP/1.1" 200 -
Message 190e151573f8d86e not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user [email protected] from history ID 140575 to 140733
Starting to process history for user [email protected] from history ID 140575 to 140715
History ID: 140595
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
History ID: 140629
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
python flask gmail gmail-api google-cloud-pubsub
1 answer
0 votes

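OK, after a long process, I found the solution.

  1. Include timestamp handling.
  2. I needed to make sure the sender is not the same as the user. Otherwise, if the user moves an email from a label back to the inbox, this causes an infinite loop.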
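
The two points above can be sketched as a single predicate over a Gmail message resource. This is an illustration under assumptions rather than the exact code behind the answer: `should_process` is a hypothetical name, `last_processed_ms` is assumed to be stored in epoch milliseconds (the unit of `internalDate`), and the extra check on the `SENT` and `DRAFT` labels is an addition that the answer does not mention.

```python
from email.utils import parseaddr


def should_process(msg, user_email, last_processed_ms):
    """Decide whether a Gmail message resource looks like new incoming mail."""
    labels = set(msg.get("labelIds", []))
    # Skip anything outside the inbox, and (assumption) anything the account
    # itself sent or drafted.
    if "INBOX" not in labels or labels & {"SENT", "DRAFT"}:
        return False
    # Point 1: internalDate is epoch milliseconds, returned as a string.
    if int(msg.get("internalDate", 0)) <= last_processed_ms:
        return False
    headers = msg.get("payload", {}).get("headers", [])
    sender = next((h["value"] for h in headers if h["name"].lower() == "from"), "")
    # Point 2: ignore messages whose sender is the user.
    return parseaddr(sender)[1].lower() != user_email.lower()
```

Because `internalDate` does not change when a message is relabelled, a message moved back to the inbox keeps its old timestamp and is filtered out by the first check, while the sender check keeps the application's own outgoing template emails from being classified again.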