我正在运行这段代码来开发一个网络抓取工具(我决定在 ChatGPT 的帮助下自己编写代码)。
我已将 index.html 放在 templates 文件夹中。从 PyCharm 运行时一切正常,但当我用 cx_Freeze 生成的 exe 文件在另一台电脑上运行时,程序找不到 index.html,并收到以下错误:
jinja2.exceptions.TemplateNotFound: index.html(未找到模板)
我尝试了 ChatGPT 给出的多种解决方案;我猜测问题可能在于整个项目位于 OneDrive 上。欢迎任何帮助。
That's the code I use for my app:
from flask import Flask, render_template, jsonify, request, Response
import os
import re
import logging
import json
import requests
import threading
import pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import time
import atexit
import sys
from openpyxl import load_workbook, Workbook
from openpyxl.styles import PatternFill
# Flask resolves its default "templates/" and "static/" folders relative to
# __file__. Under a cx_Freeze (or PyInstaller) build there is no source file
# next to the exe, so that lookup fails with jinja2 TemplateNotFound.
# Fix: resolve the app root explicitly — next to the executable when frozen,
# next to this script otherwise — and hand the folders to Flask.
if getattr(sys, 'frozen', False):
    _app_root = os.path.dirname(sys.executable)
else:
    _app_root = os.path.dirname(os.path.abspath(__file__))
app = Flask(
    __name__,
    template_folder=os.path.join(_app_root, 'templates'),
    static_folder=os.path.join(_app_root, 'static'),
)
# Set up logging
logging.basicConfig(filename='app.log', level=logging.DEBUG, format='%(asctime)s %(message)s')
# Tuning constants for the extractor worker (worker code not shown in this chunk).
MAX_ADDITIONAL_PAGES = 3
MAX_WORKERS = 5
MAX_TIMEOUT = 50 # Maximum time for each search operation in seconds
# Determine base directory
# Frozen builds (cx_Freeze/PyInstaller) live next to the executable;
# source runs live next to this script file.
base_dir = (
    os.path.dirname(sys.executable)
    if getattr(sys, 'frozen', False)
    else os.path.dirname(os.path.abspath(__file__))
)
# Update paths for bundled application
def resource_path(relative_path):
    """Get the absolute path to a bundled resource.

    Handles three situations:
      * cx_Freeze (and similar) frozen builds: ``sys.frozen`` is set but
        ``sys._MEIPASS`` is not — resources live next to the executable.
        The original version only checked ``_MEIPASS`` (PyInstaller), so
        cx_Freeze builds fell through to the CWD and lookups failed.
      * PyInstaller one-file builds: resources are unpacked to ``sys._MEIPASS``.
      * Plain source runs: fall back to the current working directory,
        matching the original behavior.
    """
    if getattr(sys, 'frozen', False) and not hasattr(sys, '_MEIPASS'):
        base_path = os.path.dirname(sys.executable)
    else:
        base_path = getattr(sys, '_MEIPASS', os.path.abspath("."))
    return os.path.join(base_path, relative_path)
# Use OneDrive directory for JSON files
# Root of the shared OneDrive folder that holds the app's JSON state files.
# NOTE(review): this raises at import time when the folder is missing, which
# aborts the whole app — even though get_onedrive_directory() further down
# implements a fallback search. Confirm this hard failure is intended.
onedrive_base_dir = os.path.expanduser(r"~\OneDrive - The People Group\General - Web Scrapper APP\Final_excrator")
if not os.path.isdir(onedrive_base_dir):
    raise FileNotFoundError(f"OneDrive directory not found: {onedrive_base_dir}")
def onedrive_path(filename):
    """Return the full path of *filename* inside the OneDrive base folder."""
    full_path = os.path.join(onedrive_base_dir, filename)
    return full_path
# Paths of the JSON state files kept in the shared OneDrive folder.
timer_file = onedrive_path('timer.json')
keywords_file = onedrive_path('keywords.json')
config_file = onedrive_path('config.json')
invalid_keywords_file = onedrive_path('invalid_keywords.json')
search_within_website_file = onedrive_path('search_within_website.json')
# Cross-thread state shared between the Flask routes and the extractor worker.
stop_thread_event = threading.Event()  # set by /stop to ask the worker to exit
output_lines = []  # log lines queued for the /log_stream SSE endpoint
extractor_thread = None  # background worker thread, or None when not running
# Function to get OneDrive directory dynamically
def get_onedrive_directory():
    """Locate the shared OneDrive data directory.

    Tries the ONEDRIVE_DIRECTORY environment variable first, then the known
    OneDrive sync-folder layouts observed on users' machines.

    Returns:
        The first candidate that exists as a directory, or None when no
        candidate is found.
    """
    possible_directories = [
        os.getenv('ONEDRIVE_DIRECTORY'),
        # NOTE: the original list contained the next entry twice; deduplicated.
        os.path.expanduser(r"~\OneDrive - The People Group\Web Scrapper APP - General\Final_excrator\Data"),
        os.path.expanduser(r"~\The People Group\Web Scrapper APP - Final_excrator\Data"),
        os.path.expanduser(r"~\OneDrive - The People Group\General - Web Scrapper APP\Final_excrator\Data"),
    ]
    for directory in possible_directories:
        if directory and os.path.isdir(directory):
            logging.info(f"Found OneDrive directory: {directory}")
            return directory
    logging.error("OneDrive directory not found in any of the specified locations.")
    return None
onedrive_directory = get_onedrive_directory()
# Load configuration from file
def load_config(path=None):
    """Load the JSON application configuration.

    Args:
        path: Optional explicit file path; defaults to the module-level
            ``config_file`` (backward compatible with the zero-arg call).

    Returns:
        The parsed configuration dict, or {} when the file is missing,
        unreadable, or contains invalid JSON (logged instead of crashing
        the app at import time).
    """
    if path is None:
        path = config_file
    if os.path.exists(path):
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logging.error(f"Failed to load config from {path}: {e}")
    return {}
# Configuration loaded once at import time.
config = load_config()
# Candidate locations searched by the /directory and /source_file routes.
directories = [onedrive_base_dir]
# Fallback to the current directory if OneDrive directory is not found
if not onedrive_directory:
    logging.warning("OneDrive directory not found, falling back to current directory.")
    onedrive_directory = base_dir
@app.route('/')
def index():
    """Serve the single-page UI from the app's templates/index.html."""
    return render_template('index.html')
@app.route('/start', methods=['POST'])
def start_script():
    """Launch the extractor worker in a background thread (no-op if running)."""
    global extractor_thread, stop_thread_event
    already_running = extractor_thread is not None and extractor_thread.is_alive()
    if already_running:
        return jsonify({'status': 'already running'})
    try:
        stop_thread_event.clear()
        worker = threading.Thread(target=run_extractor)
        extractor_thread = worker
        worker.start()
    except Exception as e:
        logging.error(f'Failed to start extractor: {e}')
        return jsonify({'status': 'error', 'message': str(e)})
    return jsonify({'status': 'started'})
@app.route('/stop', methods=['POST'])
def stop_script():
    """Signal the extractor worker to stop and wait for it to finish."""
    global stop_thread_event
    # Guard clause: nothing to do when no worker is alive.
    if not (extractor_thread and extractor_thread.is_alive()):
        return jsonify({'status': 'not running'})
    stop_thread_event.set()
    extractor_thread.join()  # Wait for the thread to finish
    logging.info('Extractor stopped')
    output_lines.append('Extractor stopped')
    return jsonify({'status': 'stopped'})
@app.route('/log_stream')
def log_stream():
    """Stream queued extractor log lines to the browser via Server-Sent Events."""
    def generate():
        global output_lines
        while True:
            # Drain everything currently queued: the original popped at most
            # one line per second, so a burst of log output lagged badly.
            while output_lines:
                yield f"data: {output_lines.pop(0)}\n\n"
            time.sleep(1)
    return Response(generate(), mimetype='text/event-stream')
@app.route('/get_timer', methods=['GET'])
def get_timer_route():
    """Report the configured run interval to the client as JSON."""
    interval = get_timer()
    return jsonify({'RUN_INTERVAL': interval})
@app.route('/set_timer', methods=['POST'])
def set_timer_route():
    """Persist the run interval posted by the client (defaults to 300)."""
    new_interval = request.json.get('RUN_INTERVAL', 300)
    set_timer(new_interval)
    return jsonify({'status': 'timer updated'})
@app.route('/get_keywords', methods=['GET'])
def get_keywords_route():
    """Return the stored search keywords as JSON."""
    keywords = get_keywords()
    # Logged so keyword fetches are traceable in app.log.
    logging.info(f"Keywords fetched: {keywords}")
    return jsonify(keywords)
@app.route('/update_keywords', methods=['POST'])
def update_keywords_route():
    """Replace the stored keyword list with the one posted by the client."""
    new_keywords = request.json.get('keywords', [])
    set_keywords(new_keywords)
    # Logged so keyword changes are traceable in app.log.
    logging.info(f"Keywords updated: {new_keywords}")
    return jsonify({'status': 'keywords updated'})
@app.route('/get_invalid_keywords', methods=['GET'])
def get_invalid_keywords_route():
    """Return the stored exclusion (invalid) keywords as JSON."""
    invalid_keywords = get_invalid_keywords()
    # Logged so fetches are traceable in app.log.
    logging.info(f"Invalid Keywords fetched: {invalid_keywords}")
    return jsonify(invalid_keywords)
@app.route('/update_invalid_keywords', methods=['POST'])
def update_invalid_keywords_route():
    """Replace the stored exclusion-keyword list with the posted one."""
    new_invalid_keywords = request.json.get('invalid_keywords', [])
    set_invalid_keywords(new_invalid_keywords)
    # Logged so changes are traceable in app.log.
    logging.info(f"Invalid Keywords updated: {new_invalid_keywords}")
    return jsonify({'status': 'invalid keywords updated'})
@app.route('/get_search_terms', methods=['GET'])
def get_search_terms_route():
    """Return the stored within-website search terms as JSON."""
    search_terms = get_search_within_terms()
    # Logged so fetches are traceable in app.log.
    logging.info(f"Search Terms fetched: {search_terms}")
    return jsonify(search_terms)
@app.route('/update_search_terms', methods=['POST'])
def update_search_terms_route():
    """Replace the stored within-website search terms with the posted ones."""
    new_search_terms = request.json.get('search_terms', [])
    set_search_terms(new_search_terms)
    # Logged so changes are traceable in app.log.
    logging.info(f"Search Terms updated: {new_search_terms}")
    return jsonify({'status': 'search terms updated'})
@app.route('/directory', methods=['POST'])
def open_directory():
    """Open the first existing data directory in the OS file explorer."""
    # next() over a generator replaces the manual search loop; the original
    # loop variable `dir` also shadowed the builtin of the same name.
    directory_path = next((d for d in directories if os.path.isdir(d)), None)
    if directory_path:
        try:
            os.startfile(directory_path)  # Windows-only API
            return jsonify({'status': 'directory opened'})
        except Exception as e:
            logging.error(f'Failed to open directory: {e}')
            return jsonify({'status': 'error', 'message': str(e)})
    else:
        message = 'Directory does not exist in any of the specified locations.'
        logging.error(message)
        return jsonify({'status': 'error', 'message': message})
@app.route('/source_file', methods=['POST'])
def open_source_file():
    """Open the source Excel report from the first directory that contains it."""
    input_filename = "report1718353090095_full.xlsx"
    file_path = None
    for folder in directories:
        candidate = os.path.join(folder, input_filename)
        logging.debug(f"Checking path for source file: {candidate}")
        if os.path.isfile(candidate):
            file_path = candidate
            break
    # Guard clause: report failure when no directory held the file.
    if not file_path:
        message = 'Source file does not exist in any of the specified directories.'
        logging.error(message)
        return jsonify({'status': 'error', 'message': message})
    try:
        os.startfile(file_path)
        return jsonify({'status': 'source file opened'})
    except Exception as e:
        logging.error(f'Failed to open source file: {e}')
        return jsonify({'status': 'error', 'message': str(e)})