Searching the Internet for ways to access Outlook PST files with Python turns up very little (and most of what does show up is outdated). Does anyone know how to read a PST, with or without a library? Unfortunately, my programming skills are not good enough to build a PST reader without the help of a library.
My goal is to get the following information about the contents:
Here is what I have tried so far:
libpff / pypff: crashes, and seems to read the entire file into memory before doing anything (not a good fit, because the PST files live on slow network storage).
Libratom: same problem, since it is based on libpff.
Libpst: unclear how to use it / provided as a binary (no explanation of how to install it) / see the answers to this post / does not seem to be maintained or updated.
win32 (mounting the PST in Outlook): a tutorial shows how to mount the PST into a locally installed Outlook and fetch the contents via MAPI, but this is also very, very slow and not a good solution, since it requires Outlook.
Aspose.Email for Python: promising at first, although the documentation is not great (no Python examples / different naming, e.g. the PersonalStorage object and many others / it stops after 50 items per folder (probably a limitation of the unlicensed version, but unclear because the publisher's website does not explain it)).
Here is the example from the Aspose website:
personalStorage = PersonalStorage.from_file(dataDir + "Outlook.pst")
folderInfoCollection = personalStorage.root_folder.get_sub_folders()
for folderInfo in folderInfoCollection:
print("Folder: " + folderInfo.display_name)
print("Total Items: " + str(folderInfo.content_count))
print("Total Unread Items: " + str(folderInfo.content_unread_count))
print("----------------------")
It took a lot of googling to find the right import statement to get this to run.
Does anyone have a stable, clear way of reading Outlook PST files? Even a solution with Aspose would be fine, as long as it gets past the 50-item limit.
Redemption (I am its author) can be another option - it is a wrapper around Extended MAPI, so you still need Outlook installed (for its MAPI system), but unlike the Outlook Object Model it can be used from a service and does not require starting Outlook.exe and/or adding the PST file to the user's default profile. You can use RDOSession.LogonPstStore (it creates, and later deletes, a temporary profile configured to use the specified PST file) and/or RDOSession.Stores.AddPstStore to add a PST file to an existing session (e.g. one created by Outlook or by LogonPstStore).
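A rough sketch of what that could look like from Python via COM (pywin32), built around the RDOSession calls named above; the PST path is made up and the folder-walking properties should be checked against Redemption's documentation:

import win32com.client

# Requires Outlook (for its MAPI system) and the Redemption COM library to be installed/registered
session = win32com.client.Dispatch("Redemption.RDOSession")
# LogonPstStore creates a temporary profile around the given PST file and returns its store
store = session.LogonPstStore(r"C:\data\backup.pst")

def walk(folder, depth=0):
    # Recursively print folder names and message subjects
    print("  " * depth + folder.Name)
    for item in folder.Items:
        print("  " * (depth + 1) + (item.Subject or "(no subject)"))
    for sub in folder.Folders:
        walk(sub, depth + 1)

walk(store.IPMRootFolder)  # top of the user-visible folder tree
session.Logoff()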
I have also been working on this problem and finally found a working solution! Here is my code, which works on a 16 GB PST file.
from libratom.lib.pff import PffArchive
import os
import json
import re
from tqdm import tqdm
from unidecode import unidecode
import logging
from collections import defaultdict
"""
Needs a .json file email_list.json with the following format:
blacklist - list of blacklisted sender emails
whitelist - list of whitelisted sender emails
"""
# Configure logging
logging.basicConfig(
    filename='email_extraction.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logging.info("Logging is configured.")
LIMIT = 100000
# Load whitelist and blacklist from email_list.json
with open("email_list.json", "r") as file:
    lists = json.load(file)
WHITELIST = lists.get("whitelist", [])
BLACKLIST = lists.get("blacklist", [])
def is_good_email(message, sender_email):
    if sender_email in WHITELIST:
        return True
    if sender_email in BLACKLIST:
        return False
    return True
def extract_header_info(headers):
    # Extract the sender name and email address from the headers
    sender_re = re.search(r"From: (.+?) <(.+?)>", headers)
    if sender_re:
        sender_name = sender_re.group(1).strip('"')
        sender_email = sender_re.group(2)
    else:
        sender_name = "Unknown Sender"
        sender_email = "Unknown Email"
    # Extract the timestamp from the headers
    timestamp_re = re.search(r"Date: (.+)", headers)
    if timestamp_re:
        timestamp = timestamp_re.group(1)
    else:
        timestamp = "Unknown Timestamp"
    return sender_name, sender_email, timestamp
def clean_subject(subject):
    subject_ascii = unidecode(subject)
    clean_subject = re.sub(r'[\\/*?:"<>|]', "_", subject_ascii).strip().rstrip(". ")
    return clean_subject
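# For reference, an illustrative example of what clean_subject produces (input is made up):
#   clean_subject('Re: Q3/Q4 report?')  ->  'Re_ Q3_Q4 report_'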
def save_body(message, message_folder):
    # Extract the email body (plain text, HTML, or RTF)
    if message.plain_text_body:
        body = message.plain_text_body
        body_file = os.path.join(message_folder, "body.txt")
        with open(body_file, "w", encoding="utf-8") as f:
            f.write(body)
    elif message.html_body:
        body = message.html_body
        body_file = os.path.join(message_folder, "body.html")
        with open(body_file, "w", encoding="utf-8") as f:
            f.write(body)
    elif message.rtf_body:
        try:
            body = message.rtf_body
            body_file = os.path.join(message_folder, "body.rtf")
            # The RTF body is bytes, so write it out in binary mode
            with open(body_file, "wb") as msg_file:
                msg_file.write(body)
        except UnicodeEncodeError:
            # Handle encoding error by using a different encoding
            logging.error("Encoding error encountered while processing RTF body.")
            body = "Encoding error: Unable to extract body content."
            body_file = os.path.join(message_folder, "body.txt")
            with open(body_file, "w", encoding="utf-8") as f:
                f.write(body)
    else:
        logging.warning("No body content found")
        body = "No body content available"
        body_file = os.path.join(message_folder, "body.txt")
        with open(body_file, "w", encoding="utf-8") as f:
            f.write(body)
    return body_file
def save_attachments(message, message_folder):
    # Initialize a list to store attachment paths
    attachment_paths = []
    try:
        # Check if the message has attachments
        if message.attachments:
            for attachment in message.attachments:
                attachment_name = attachment.name or "Unnamed_attachment"
                attachment_path = os.path.join(message_folder, attachment_name)
                with open(attachment_path, "wb") as f:
                    f.write(attachment.read_buffer(attachment.get_size()))
                attachment_paths.append(attachment_path)
    except OSError as e:
        logging.error("Error saving attachment %s %s: %s", message.subject, attachment_name, e)
    return attachment_paths
def download_emails(pst_file_path, output_folder):
    """Extract and save email bodies and attachments from the Inbox of the given .pst file (up to LIMIT messages)."""
    # Open the .pst file using PffArchive from libratom
    with PffArchive(pst_file_path) as archive:
        # Initialize a counter to keep track of the number of processed emails
        email_count = 0
        name_counts = defaultdict(int)
        senders = set()
        email_list = []
        # Iterate through all folders in the .pst file
        for folder in archive.folders():
            if folder.name != "Inbox":
                continue
            # Loop through each message in the folder
            for index in tqdm(range(folder.get_number_of_sub_messages())):
                # Get the message using the index
                message = folder.get_sub_message(index)
                if email_count >= LIMIT:
                    break
                if message.subject and message.subject == "Your daily briefing":
                    continue  # spooky stuff
                if not message.transport_headers:
                    logging.warning("No headers found for message %s", message.subject)
                    continue
                header_str = message.transport_headers.strip()
                sender_name, sender_email, timestamp = extract_header_info(header_str)
                # skip bad emails
                if not is_good_email(message, sender_email):
                    continue
                subject = message.subject or "(No Subject)"
                clean_subject_name = clean_subject(subject)
                # Check for duplicate subject names and append a number to the name
                if clean_subject_name in name_counts:
                    name_counts[clean_subject_name] += 1
                    clean_subject_name = f"{clean_subject_name}_{name_counts[clean_subject_name]}"
                else:
                    name_counts[clean_subject_name] = 1
                message_folder = os.path.join(output_folder, folder.name, clean_subject_name)
                try:
                    os.makedirs(message_folder, exist_ok=True)
                except OSError as e:
                    logging.error("Error creating folder %s: subject %s clean %s", message_folder, subject, clean_subject_name)
                    continue
                body_file = save_body(message, message_folder)
                attachment_paths = save_attachments(message, message_folder)
                # Add attachment paths to the email dictionary
                senders.add(sender_email)
                email_list.append({
                    "subject": subject,
                    "sender_name": sender_name,
                    "sender_email": sender_email,
                    "body": body_file,
                    "timestamp": timestamp,
                    "attachments": attachment_paths
                })
                email_count += 1
        print("SENDERS", len(senders))
        print("POST FILTER EMAIL COUNT", len(email_list))
        with open("emails.json", "w", encoding="utf-8") as json_file:
            json.dump(email_list, json_file, indent=4)
def clean_workspace(output_folder):
    if os.path.exists(output_folder):
        for root, dirs, files in os.walk(output_folder, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))
    else:
        os.makedirs(output_folder, exist_ok=True)
def main():
    # Replace with your .pst file path
    pst_file_path = 'backup.pst'
    output_folder = "./email_data"
    clean_workspace(output_folder)
    download_emails(pst_file_path, output_folder)

if __name__ == "__main__":
    main()
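If it helps, here is a small sketch of reading back the emails.json index that the script writes (field names taken from the email_list entries above):

import json

# Load the index written by download_emails()
with open("emails.json", encoding="utf-8") as f:
    emails = json.load(f)

# Print a quick summary of the first few extracted messages
for mail in emails[:5]:
    print(mail["timestamp"], mail["sender_email"], mail["subject"], len(mail["attachments"]))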