Response ended prematurely when scraping web pages inside a cronjob

Problem description

I set up a cronjob that runs this script every 24 hours. The error below appears while the code runs under cron; when I run the same code on my local machine, I have never seen the problem.

import os
import psycopg2
import re
import requests
import time
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
from loguru import logger
from psycopg2.extras import execute_values
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

PSQL_CONN_STRING = os.environ.get("DB_Postgresql","Some env keys")
conn = psycopg2.connect(PSQL_CONN_STRING)

session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))


def get_list_of_eans(conn) -> tuple[list, list, list]:
    # take data from the DB and process it (body omitted in the original post)
    ...


def scrape_page(conn, ean: str, sku: str, supplier: str, sid: str, aid: str):
    search_query = ean if isinstance(ean, int) else ean.replace('"', '')
    if not str(search_query).startswith('PO_'):

        time.sleep(1)
        url = f'https://www.ebay.de/sch/i.html?_from=R42&_nkw={search_query}&_sacat=0&_sop=2&LH_ItemCondition=3&LH_BIN=1&_stpos=10719&_fcid=77&_fspt=1&LH_PrefLoc=99&rt=nc&_sadis=1000'
        res = session.get(url)

        if res.status_code != 200:
            return res.status_code

        soup = BeautifulSoup(res.text, 'html.parser')
        data = []
        # further code for obtaining information from the site
        # (the loop over offers and the field extraction are omitted in the post)
        try:
            ...
            data.append(
                [offer_id, search_query, title, supplier, price, shipping, free_days])
        except Exception as ex:
            logger.error(f'search - {search_query}: {ex}')
        if data:

            try:
                with conn.cursor() as cur:
                    execute_values(cur, '''
                        INSERT ... ON CONFLICT ... DO UPDATE  -- actual statement omitted in the post
                    ''', data)
                    conn.commit()
            except psycopg2.IntegrityError as ie:
                conn.rollback()
                logger.error(f'IntegrityError on insert - {search_query}: {ie}')
            except psycopg2.DatabaseError as de:
                conn.rollback()
                logger.error(f'DatabaseError on insert - {search_query}: {de}')
            except Exception as e:
                conn.rollback()
                logger.error(f'Unexpected error on insert - {search_query}: {e}')

def main():
    list_of_eans, list_sid, list_aid = get_list_of_eans(conn)
    pool = ThreadPoolExecutor(10)
    future_threads = []

    d, dd = 0, 0

    # repeat the sid/aid lists so every EAN gets a pair (same effect as cycling them)
    expanded_list_sid = list_sid * (len(list_of_eans) // len(list_sid)) + list_sid[:len(list_of_eans) % len(list_sid)]
    expanded_list_aid = list_aid * (len(list_of_eans) // len(list_aid)) + list_aid[:len(list_of_eans) % len(list_aid)]

    for ean, sid, aid in zip(list_of_eans, expanded_list_sid, expanded_list_aid):
        future = pool.submit(scrape_page, conn, ean[0], ean[1], ean[2], sid, aid)
        future_threads.append(future)
        dd += 1

    for future in future_threads:
        result = future.result()
        if d % 100 == 0:
            logger.info(f'{d} / {dd}, status code: {result}')
        d += 1


if __name__ == '__main__':
    main()

The thing that has bothered me for a long time, and that I never found an answer to, is that the whole cronjob freezes and eats up more than 80 GB of the server's memory. How should I approach a problem like this?

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 820, in generate
    yield from self.raw.stream(chunk_size, decode_content=True)
  File "/usr/local/lib/python3.10/site-packages/urllib3/response.py", line 1057, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/usr/local/lib/python3.10/site-packages/urllib3/response.py", line 1206, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.10/site-packages/urllib3/response.py", line 1136, in _update_chunk_length
    raise ProtocolError("Response ended prematurely") from None
urllib3.exceptions.ProtocolError: Response ended prematurely
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/app/./main.py", line 192, in <module>
    main()
  File "/app/./main.py", line 185, in main
    result = future.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/app/./main.py", line 79, in scrape_page
    res = session.get(url)
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 602, in get
    return self.request("GET", url, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 746, in send
    r.content
  File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 902, in content
    self._content = b"".join(self.iter_content(CONTENT_CHUNK_SIZE)) or b""
  File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 822, in generate
    raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: Response ended prematurely

I tried bumping the library versions in various ways and hardening the code with try/except blocks, but I am having a hard time pinning down where the problem lies and what might be causing it.
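
(A side note on the memory part, not from the original question: submitting every task to the ThreadPoolExecutor up front keeps all pending work items and all finished results alive at once, which can inflate memory on a large EAN list. A minimal sketch of submitting in bounded batches instead; the run_in_batches helper and the batch_size value are assumptions of mine, not from the post:)

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_in_batches(tasks, batch_size=200):
    # tasks: list of (fn, args) pairs; only one batch of futures
    # (and their results) is alive at any moment
    with ThreadPoolExecutor(max_workers=10) as pool:
        for start in range(0, len(tasks), batch_size):
            futures = [pool.submit(fn, *args)
                       for fn, args in tasks[start:start + batch_size]]
            for future in as_completed(futures):
                future.result()  # surface worker exceptions batch by batch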

python multithreading beautifulsoup python-requests cron
1 Answer
0 votes

I managed to solve this problem a while ago, and I finally have time to write up the answer.

I changed the Retry settings (adding raise_on_status=False, so that once the status-based retries are used up, the last response is returned instead of an exception being raised) and removed the http:// mount:

retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504], raise_on_status=False)
session.mount('https://', HTTPAdapter(max_retries=retries))
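
(A quick illustration of what that flag changes, my own sketch rather than part of the original answer; it reuses the session and logger objects defined in the script, and the query URL is hypothetical:)

# with raise_on_status=False, a request whose status retries are exhausted
# comes back as a normal response instead of raising requests.exceptions.RetryError
res = session.get('https://www.ebay.de/sch/i.html?_nkw=test')
if res.status_code >= 500:
    logger.warning(f'giving up after retries, last status: {res.status_code}')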

I added two functions.

from requests.exceptions import ChunkedEncodingError  # needed for the except clauses below

def make_request(url, retries=3):
    for attempt in range(retries):
        try:
            # wait at most 5 s to connect and 20 s for the response
            res = session.get(url, timeout=(5, 20))
            res.raise_for_status()
            return res
        except ChunkedEncodingError:
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # back off 1 s, then 2 s, ...
                continue
            raise  # out of attempts: let the caller handle the error
        # any other exception propagates to the caller unchanged



def scrape_page_with_retry(conn, ean: str, sku: str, supplier: str, sid: str, aid: str, retries=3):
    for attempt in range(retries):
        try:
            return scrape_page(conn, ean, sku, supplier, sid, aid)
        except ChunkedEncodingError:
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff between attempts
                continue
            return 400  # all attempts failed: report a client-style status code
        except Exception:
            return None  # swallow anything else; the caller sees None

The make_request function

sends the GET request with session.get(url, timeout=(5, 20)), where timeout=(5, 20) means waiting at most 5 seconds for the connection and 20 seconds for the response.

If it fails 3 times, the error is raised to the caller.

The scrape_page_with_retry function

retries the scrape_page call in a for loop, up to retries attempts. If a ChunkedEncodingError occurs, it retries after an exponentially growing wait (2^attempt seconds, i.e. 1 s, then 2 s). If the error persists after all attempts, it returns 400.

I call them in these places:

url = f'https://www.ebay.de/sch/i.html?_from=R42&_nkw={search_query}&_sacat=0&_sop=2&LH_ItemCondition=3&LH_BIN=1&_stpos=10719&_fcid=77&_fspt=1&LH_PrefLoc=99&rt=nc&_sadis=1000'
res = make_request(url)

and here:

for ean, sid, aid in zip(list_of_eans, expanded_list_sid, expanded_list_aid):
    future = pool.submit(scrape_page_with_retry, conn, ean[0], ean[1], ean[2], sid, aid)
    future_threads.append(future)
    dd += 1

This solved my problem.
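
(One follow-up note of my own on this design: scrape_page_with_retry never lets an exception escape, so future.result() in main can no longer crash the whole job; failures now arrive as 400 or None. A small sketch of counting them separately in the progress loop; the ok/failed counters are my addition, not from the original code:)

ok, failed = 0, 0
for future in future_threads:
    result = future.result()  # never raises now; failures come back as 400 or None
    if result is None or result == 400:
        failed += 1
    else:
        ok += 1
    if (ok + failed) % 100 == 0:
        logger.info(f'{ok + failed} / {dd}, failures so far: {failed}')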
