如何使用 Python 解析/读取 Outlook PST 文件?

问题描述 投票:0回答:2

在 Internet 上搜索使用 Python 访问 Outlook PST 文件的方法,结果很少(而且大多数内容都已过时)。有谁知道如何在使用或不使用库的情况下读取 PST?不幸的是,如果没有库的帮助,我的编程能力还不足以自己构建 PST 读取器。

我的目标是获取有关内容的以下信息:

  • 每个文件夹的项目数
  • 项目类型(邮件、会议、联系人...)
  • 项目大小
  • 附件(包括大小)
  • 也许还有其他元数据,如日期、收件人等(可选)

我已经尝试过以下操作:

  1. libpff / pypff:崩溃并且似乎在执行某些操作之前读取了内存中的整个文件(没有好的解决方案,因为 PST 文件保存在缓慢的网络存储上)。

  2. Libratom:与基于 libpff 的问题相同。

  3. Libpst:不清楚如何使用它/作为二进制文件提供(没有解释如何安装)/参见这篇文章的答案/似乎没有维护或更新。

  4. win32(在 Outlook 中挂载 PST):有一个教程展示了如何将 PST 挂载到本地安装的 Outlook 中并通过 MAPI 访问获取内容,但这也非常非常慢,而且由于需要安装 Outlook,并不是一个好的解决方案。

  5. Aspose Email Python:一开始看起来很有希望,尽管文档不是很好(没有 Python 示例/命名不一致,例如 PersonalStorage 对象和许多其他对象/每个文件夹读取 50 个项目后就停止(可能是非付费版本的限制,但由于发布商网站上缺乏说明,这一点并不清楚))。

这是来自 Aspose 网站的示例:

# Open the PST file located at dataDir + "Outlook.pst".
personalStorage = PersonalStorage.from_file(dataDir + "Outlook.pst")

# Fetch the sub-folders of the store's root folder.
folderInfoCollection = personalStorage.root_folder.get_sub_folders()

# Print the display name, item count and unread-item count of each sub-folder.
for folderInfo in folderInfoCollection:

    print("Folder: " + folderInfo.display_name)
    print("Total Items: " + str(folderInfo.content_count))
    print("Total Unread Items: " + str(folderInfo.content_unread_count))
    print("----------------------")

我进行了大量的谷歌搜索,找到了合适的导入语句来进行这次运行。

有人有稳定清晰的方法来读取 Outlook PST 文件吗?即使是使用 Aspose 的解决方案也可以,只要它能突破 50 个项目的限制。

python outlook pst
2个回答
2
投票

Redemption(我是它的作者)可以是另一种选择 - 它是对扩展 MAPI 的封装,因此您仍然需要安装 Outlook(以使用其 MAPI 系统),但与 Outlook 对象模型不同,它可以在服务中使用,不需要启动 Outlook.exe 和/或将 PST 文件添加到用户的默认配置文件中。您可以使用 RDOSession.LogonPstStore(它会创建并随后删除一个配置为使用指定 PST 文件的临时配置文件)和/或 RDOSession.Stores.AddPstStore 将 PST 文件添加到现有会话(例如,由 Outlook 或由 LogonPstStore 创建的会话)。


0
投票

我也一直在研究这个问题,终于找到了一个可行的解决方案!这是我的代码,适用于 16GB pst 文件。

from libratom.lib.pff import PffArchive
import os
import json
import re
from tqdm import tqdm
from unidecode import unidecode
import logging
from collections import defaultdict

"""
    Needs a .json file email_list.json with the following format:
    blacklist - list of blacklisted sender emails
    whitelist - list of whitelisted sender emails
"""

# Configure logging: everything from DEBUG upwards goes to email_extraction.log.
logging.basicConfig(
    filename='email_extraction.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

logging.info("Logging is configured.")

# Upper bound on the number of emails exported by download_emails in one run.
LIMIT = 100000

# Load the sender whitelist and blacklist from email_list.json.
# The file is decoded as UTF-8 explicitly: without it open() falls back to the
# locale encoding, which mis-decodes non-ASCII addresses on some platforms.
with open("email_list.json", "r", encoding="utf-8") as file:
    lists = json.load(file)
    # A missing key simply means "no entries" for that list.
    WHITELIST = lists.get("whitelist", [])
    BLACKLIST = lists.get("blacklist", [])

def is_good_email(message, sender_email):
    """Tell whether a message from ``sender_email`` should be kept.

    A whitelisted sender is always kept, even when it is also blacklisted.
    A sender on neither list is kept as well; only a sender that is
    blacklisted and not whitelisted is rejected.

    Args:
        message: The message being considered (accepted but not inspected).
        sender_email: Address to look up in WHITELIST and BLACKLIST.

    Returns:
        True to keep the message, False to skip it.
    """
    # Short-circuits exactly like the original: the blacklist is only
    # consulted when the sender is not whitelisted.
    return sender_email in WHITELIST or sender_email not in BLACKLIST

def extract_header_info(headers):
    """Extract sender name, sender address and date from raw message headers.

    Only header lines that *start* with ``From:`` / ``Date:`` (in any letter
    case) are considered, so look-alike fields such as ``Resent-From:`` or
    ``Delivery-Date:`` are ignored. Values are stripped of surrounding
    whitespace, which also removes the ``\\r`` left over from CRLF line
    endings. Folded (multi-line) header values are not unfolded; only the
    first physical line of the field is read.

    Args:
        headers: The header block as a single string.

    Returns:
        A ``(sender_name, sender_email, timestamp)`` tuple of strings. Parts
        that cannot be found are reported as ``"Unknown Sender"``,
        ``"Unknown Email"`` and ``"Unknown Timestamp"`` respectively.
    """
    sender_name = "Unknown Sender"
    sender_email = "Unknown Email"
    timestamp = "Unknown Timestamp"

    # MULTILINE anchors ^/$ at every header line; header names are
    # case-insensitive, hence IGNORECASE.
    flags = re.MULTILINE | re.IGNORECASE

    from_match = re.search(r"^From:[ \t]*(.*)$", headers, flags)
    if from_match:
        from_value = from_match.group(1).strip()
        address = re.search(r"<([^<>\s]+)>", from_value)
        if address:
            # 'Display Name <user@host>' or just '<user@host>'.
            sender_email = address.group(1)
            display_name = from_value[:address.start()].strip().strip('"').strip()
            if display_name:
                sender_name = display_name
        elif "@" in from_value and " " not in from_value:
            # Bare address without angle brackets: 'user@host'.
            sender_email = from_value

    date_match = re.search(r"^Date:[ \t]*(.*)$", headers, flags)
    if date_match:
        # An empty Date field keeps the "Unknown Timestamp" placeholder.
        timestamp = date_match.group(1).strip() or timestamp

    return sender_name, sender_email, timestamp

def clean_subject(subject):
    """Turn an email subject into a string usable as a directory name.

    The subject is transliterated to ASCII with ``unidecode``, characters that
    are not allowed in Windows file names (``\\ / * ? : " < > |`` and control
    characters) are replaced with ``_``, and surrounding whitespace as well as
    trailing dots/spaces are removed.

    Args:
        subject: The raw subject text.

    Returns:
        The cleaned name. If nothing usable is left (for example the subject
        was only dots or spaces) ``"(No Subject)"`` is returned, the same
        placeholder download_emails uses for a missing subject, so the
        caller never ends up with an empty path component.
    """
    # Strip first so that leading/trailing whitespace is dropped rather than
    # turned into underscores by the control-character replacement below.
    subject_ascii = unidecode(subject).strip()
    cleaned = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", subject_ascii).strip().rstrip(". ")
    return cleaned or "(No Subject)"

def _write_body_file(path, content):
    """Write ``content`` to ``path``: bytes verbatim, text encoded as UTF-8."""
    if isinstance(content, (bytes, bytearray, memoryview)):
        with open(path, "wb") as f:
            f.write(content)
    else:
        # errors="replace" keeps a stray unencodable character (e.g. a lone
        # surrogate) from aborting the export of the whole message.
        with open(path, "w", encoding="utf-8", errors="replace") as f:
            f.write(content)


def save_body(message, message_folder):
    """Save the body of ``message`` into ``message_folder``.

    The first non-empty body is used, in the order plain text
    (``body.txt``), HTML (``body.html``), RTF (``body.rtf``). When the
    message has no body at all, a placeholder text is written to
    ``body.txt`` and a warning is logged.

    Bodies are accepted both as ``str`` and as bytes-like objects.
    NOTE(review): the PST backend appears to return bytes for some body
    kinds; confirm against the installed libratom/pypff version.

    Args:
        message: Object exposing ``plain_text_body``, ``html_body`` and
            ``rtf_body`` attributes.
        message_folder: Existing directory the body file is written to.

    Returns:
        The path of the file that was written.
    """
    candidates = (
        ("plain_text_body", "body.txt"),
        ("html_body", "body.html"),
        ("rtf_body", "body.rtf"),
    )
    for attribute, file_name in candidates:
        body = getattr(message, attribute)
        if body:
            body_file = os.path.join(message_folder, file_name)
            _write_body_file(body_file, body)
            return body_file

    logging.warning("No body content found")
    body_file = os.path.join(message_folder, "body.txt")
    _write_body_file(body_file, "No body content available")
    return body_file

def _safe_attachment_name(raw_name):
    """Reduce an attachment name to a single path component."""
    # Treat both separators alike so that 'dir\\file' and '../file' cannot
    # place the attachment outside the message folder.
    name = os.path.basename(str(raw_name or "").replace("\\", "/")).strip()
    if name in ("", ".", ".."):
        return "Unnamed_attachment"
    return name


def save_attachments(message, message_folder):
    """Write every attachment of ``message`` into ``message_folder``.

    Saving is best-effort: an ``OSError`` while reading or writing one
    attachment is logged and that attachment is skipped, while the remaining
    attachments are still processed. An ``OSError`` while enumerating the
    attachments is logged and ends the loop.

    Attachments that share a name overwrite each other, as the file name is
    derived from the attachment name alone.

    Args:
        message: Object exposing ``subject`` and ``attachments``; each
            attachment exposes ``name``, ``get_size()`` and
            ``read_buffer(size)``.
        message_folder: Existing directory the files are written to.

    Returns:
        The list of paths that were written successfully.
    """
    attachment_paths = []

    try:
        for attachment in message.attachments or ():
            # Bound before the try so the error handler can always name it.
            attachment_name = "Unnamed_attachment"
            try:
                attachment_name = _safe_attachment_name(attachment.name)
                # Read before opening the target so a failed read does not
                # leave an empty file behind.
                data = attachment.read_buffer(attachment.get_size())
                attachment_path = os.path.join(message_folder, attachment_name)
                with open(attachment_path, "wb") as f:
                    f.write(data)
            except OSError as e:
                logging.error("Error saving attachment %s %s: %s", message.subject, attachment_name, e)
                continue
            attachment_paths.append(attachment_path)
    except OSError as e:
        logging.error("Error enumerating attachments of %s: %s", message.subject, e)

    return attachment_paths

def download_emails(pst_file_path, output_folder, folder_name="Inbox"):
    """Export the messages of one folder of a .pst file and index them.

    For every accepted message a directory
    ``<output_folder>/<folder name>/<cleaned subject>`` is created, holding
    the body and the attachments. At most ``LIMIT`` messages are exported.
    A message is skipped when its subject is "Your daily briefing", when it
    has no transport headers, when ``is_good_email`` rejects its sender, or
    when its directory cannot be created.

    After the archive has been processed, the number of distinct senders and
    of exported emails is printed, and the index of exported emails is
    written to ``emails.json`` in the current working directory.

    Args:
        pst_file_path: Path of the .pst file to read.
        output_folder: Directory under which the message folders are created.
        folder_name: Name of the PST folder(s) to export; defaults to
            ``"Inbox"``.
    """
    email_count = 0
    name_counts = defaultdict(int)
    senders = set()
    email_list = []

    # Open the .pst file using PffArchive from libratom
    with PffArchive(pst_file_path) as archive:
        for folder in archive.folders():
            # Stop walking the archive once the limit is reached.
            if email_count >= LIMIT:
                break
            if folder.name != folder_name:
                continue

            for index in tqdm(range(folder.get_number_of_sub_messages())):
                # Check the limit before fetching, so no message is loaded
                # only to be thrown away.
                if email_count >= LIMIT:
                    break

                message = folder.get_sub_message(index)
                raw_subject = message.subject

                # Hard-coded skip for messages with exactly this subject.
                if raw_subject and raw_subject == "Your daily briefing":
                    continue

                if not message.transport_headers:
                    logging.warning("No headers found for message %s", raw_subject)
                    continue

                header_str = message.transport_headers.strip()
                sender_name, sender_email, timestamp = extract_header_info(header_str)

                # skip bad emails
                if not is_good_email(message, sender_email):
                    continue

                subject = raw_subject or "(No Subject)"
                clean_subject_name = clean_subject(subject)

                # The first message with a given name keeps it; later ones
                # get "_2", "_3", ... appended.
                name_counts[clean_subject_name] += 1
                occurrence = name_counts[clean_subject_name]
                if occurrence > 1:
                    clean_subject_name = f"{clean_subject_name}_{occurrence}"

                message_folder = os.path.join(output_folder, folder.name, clean_subject_name)
                try:
                    os.makedirs(message_folder, exist_ok=True)
                except OSError as e:
                    logging.error(
                        "Error creating folder %s: subject %s clean %s: %s",
                        message_folder, subject, clean_subject_name, e,
                    )
                    continue

                body_file = save_body(message, message_folder)
                attachment_paths = save_attachments(message, message_folder)

                senders.add(sender_email)
                email_list.append({
                    "subject": subject,
                    "sender_name": sender_name,
                    "sender_email": sender_email,
                    "body": body_file,
                    "timestamp": timestamp,
                    "attachments": attachment_paths
                })

                email_count += 1

    print("SENDERS", len(senders))
    print("POST FILTER EMAIL COUNT", len(email_list))

    with open("emails.json", "w", encoding="utf-8") as json_file:
        json.dump(email_list, json_file, indent=4)

def clean_workspace(output_folder):
    """Make ``output_folder`` an empty, existing workspace.

    If the path does not exist it is created. If it exists, every file and
    sub-directory below it is removed while the folder itself is kept.

    Args:
        output_folder: Path of the workspace directory.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder, exist_ok=True)
        return

    # Walk bottom-up so each directory is already empty when it is removed.
    for current_dir, subdir_names, file_names in os.walk(output_folder, topdown=False):
        for file_name in file_names:
            os.remove(os.path.join(current_dir, file_name))
        for subdir_name in subdir_names:
            os.rmdir(os.path.join(current_dir, subdir_name))

def main():
    """Reset the output directory, then export the emails of the PST file."""
    # Replace with your .pst file path
    source_pst = 'backup.pst'
    destination = "./email_data"

    # Start from an empty workspace so leftovers of earlier runs do not mix
    # with the new export.
    clean_workspace(destination)
    download_emails(source_pst, destination)


if __name__ == "__main__":
    main()

© www.soinside.com 2019 - 2024. All rights reserved.