Vacuuming and analyzing Redshift tables

Problem description
import logging
import boto3
import sys
import argparse
from urllib.parse import urlparse
import os
import signal
import time

try:
    sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
except Exception:
    # __file__ may be unavailable in some execution contexts; fall back to the default path
    pass

import analyze_vacuum
import config_constants

# Cluster name should match the connection name created in Glue console
JDBC_PREFIX = "jdbc:"
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

__version__ = ".10"

# Timeout function to limit total runtime
def timeout_handler(signum, frame):
    raise TimeoutError("The process has exceeded the 8-hour time limit.")

# Set the timeout signal (8 hours = 28,800 seconds)
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(28800)

def main(argv):
    """
        Triggers the vacuum analyze using a glue connection

        :param argv the arguments to parse
    """
    try:
        # setup cli args
        parser = argparse.ArgumentParser()
        parser.add_argument("--glue-connection", dest="glue_connection", required=True)
        parser.add_argument("--max-unsorted-pct", dest="max_unsorted_pct",
                            help="Maximum unsorted percentage( to consider a table for vacuum : Default = 50")
        parser.add_argument("--min-unsorted-pct", dest="min_unsorted_pct",
                            help="Minimum unsorted percentage( to consider a table for vacuum : Default = 5")
        parser.add_argument("--stats-off-pct", dest="stats_off_pct",
                            help="Minimum stats off percentage( to consider a table for analyze : Default = 10")
        parser.add_argument("--vacuum-parameter", dest="vacuum_parameter",
                            help="Vacuum parameters [ FULL | SORT ONLY | DELETE ONLY | REINDEX ] Default = FULL")
        parser.add_argument("--blacklisted-tables", dest="blacklisted_tables", help="The tables we do not want to Vacuum")
        parser.add_argument("--debug", dest="debug", default=False, help="Generate Debug Output including SQL Statements being run")
        parser.add_argument("--ignore-errors", dest="ignore_errors", default=True,
                            help="Ignore errors raised when running and continue processing")
        parser.add_argument("--max-table-size-mb", dest="max_table_size_mb", type=int, help="Maximum table size in MB : Default = 700*1024 MB")
        parser.add_argument("--query-group", dest="query_group", help="Set the query_group for all queries")
        parser.add_argument("--schema-name", dest="schema_name", help="The Schema to be Analyzed or Vacuumed (REGEX: Default = public")
        parser.add_argument("--slot-count", dest="slot_count", help="Modify the wlm_query_slot_count : Default = 1")
        parser.add_argument("--suppress-cloudwatch", dest="suppress_cw",
                            help="Don't emit CloudWatch metrics for analyze or vacuum when set to True")
        parser.add_argument("--skip-materialized-views", dest="skip_materialized_views", default=True,
                            help="Skip materialized views (added to blacklisted tables)")

        full_args, extra = parser.parse_known_args(argv)
        LOGGER.info(f"Parsed args : {full_args}, extra args : {extra}")

        args = {}
        # remove args that end up as None
        for k, v in vars(full_args).items():
            if v is not None:
                args[k] = v

        # getting boto3 custom session object to access metadata
        my_session = boto3.session.Session()
        region_id = my_session.region_name
        glue_client = my_session.client('glue', region_name=region_id)

        # get the connection through glue client
        catalog_config = glue_client.get_connection(Name=args['glue_connection'])
        connection_props = catalog_config['Connection']['ConnectionProperties']
        # Parse the jdbc url
        connection_dsn = urlparse(connection_props['JDBC_CONNECTION_URL'].removeprefix(JDBC_PREFIX))

        LOGGER.debug("Creating redshift connection : {}, User : {}".format(connection_props['JDBC_CONNECTION_URL'], connection_dsn.username))
        args[config_constants.DB_NAME] = connection_dsn.path[1:]
        args[config_constants.DB_USER] = connection_props['USERNAME']
        args[config_constants.DB_PASSWORD] = connection_props['PASSWORD']
        args[config_constants.DB_HOST] = connection_dsn.hostname
        args[config_constants.DB_PORT] = int(connection_dsn.port)
        
        # Flags for analyze and vacuum
        args['analyze_flag'] = 'True'
        args['vacuum_flag'] = 'True'

        # Step 1: Vacuum first
        LOGGER.info("Starting table vacuuming...")
        args['analyze_flag'] = 'False'  # Turn off analyze temporarily
        result = analyze_vacuum.run_analyze_vacuum(**args)
        if result is not None:
            LOGGER.error("Vacuum failed with result: %s", result)
            sys.exit(result)

        # Step 2: Analyze second
        LOGGER.info("Starting table analysis...")
        args['vacuum_flag'] = 'False'  # Turn off vacuum
        args['analyze_flag'] = 'True'  # Turn on analyze
        result = analyze_vacuum.run_analyze_vacuum(**args)
        if result is not None:
            LOGGER.error("Analysis failed with result: %s", result)
            sys.exit(result)

        LOGGER.info("Process complete.")
        sys.exit(0)
    except TimeoutError as te:
        LOGGER.error("Process timed out: %s", str(te))
        sys.exit(1)
    except Exception as e:
        LOGGER.error("Process failed: %s", str(e))
        sys.exit(1)

if __name__ == "__main__":
    main(sys.argv)
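
For a quick local test, a hypothetical invocation could call main() directly; the connection name, schema, and other values below are placeholders, not taken from the question:

# Hypothetical local test invocation; replace the placeholders with your own Glue connection and schema.
main([
    "--glue-connection", "my-redshift-connection",
    "--schema-name", "public",
    "--vacuum-parameter", "FULL",
    "--slot-count", "2",
])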

The current code picks up one table at a time and runs vacuum and then analyze on that table. I need it to first run vacuum for all tables and only then run analyze, rather than vacuuming and analyzing table by table.

The program above runs the vacuum process for every table overnight, and it sometimes takes more than 8 hours. If it exceeds 8 hours, we want to terminate the process.
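
One caveat: signal.alarm is Unix-only and will not always interrupt a long-running database call promptly. A minimal alternative sketch, assuming the same analyze_vacuum.run_analyze_vacuum(**args) entry point as above, runs each pass in a child process and kills it once the 8-hour budget is spent:

import multiprocessing

def run_with_timeout(target, kwargs, timeout_seconds=28800):
    # Run the vacuum/analyze pass in a separate process so it can be terminated cleanly.
    proc = multiprocessing.Process(target=target, kwargs=kwargs)
    proc.start()
    proc.join(timeout=timeout_seconds)
    if proc.is_alive():
        # Still running after the deadline: kill the worker and signal a timeout.
        proc.terminate()
        proc.join()
        raise TimeoutError("The process has exceeded the 8-hour time limit.")
    return proc.exitcode

# Hypothetical usage: run_with_timeout(analyze_vacuum.run_analyze_vacuum, args)

Terminating the client process does not guarantee that an in-flight VACUUM is cancelled on the cluster side; if needed, cancel it there as well (for example with PG_TERMINATE_BACKEND or from the console).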

Tags: python, amazon-web-services, datatables, amazon-redshift, vacuum
1 Answer

Note that only one VACUUM command can run on a cluster at any given time. If you try to run multiple vacuum operations concurrently, Amazon Redshift returns an error.

See the usage notes for VACUUM in the Amazon Redshift documentation.
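
If you want to confirm that no vacuum is already in progress before starting the next pass, a minimal sketch (assuming a psycopg2 connection; the host, database, and credentials below are placeholders) could query the SVV_VACUUM_PROGRESS system view:

import psycopg2

# Placeholder connection details; in the Glue job you could reuse the DSN parsed from the connection properties.
conn = psycopg2.connect(host="my-cluster.example.redshift.amazonaws.com", port=5439,
                        dbname="mydb", user="admin", password="<password>")
with conn.cursor() as cur:
    # SVV_VACUUM_PROGRESS reports the table currently (or most recently) vacuumed and its status.
    cur.execute("SELECT table_name, status, time_remaining_estimate FROM svv_vacuum_progress;")
    row = cur.fetchone()
    if row and row[1] and "Complete" not in row[1]:
        print(f"A vacuum is still running on {row[0]}: {row[1]} ({row[2]} remaining)")
    else:
        print("No vacuum in progress; safe to start the next one.")
conn.close()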
