Checking whether an Athena database exists with boto3

Question  Votes: 0  Answers: 3

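I am writing a script that creates a database in AWS Athena and then creates tables in that database. Today the database creation is taking a long time, so the table statements end up referencing a database that does not exist yet. Is there a way to check with boto3 whether a database has already been created in Athena?

This is the part that creates the database: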

import boto3

client = boto3.client('athena')
client.start_query_execution(
    QueryString='create database {}'.format('db_name'),
    ResultConfiguration=config  # config is defined elsewhere in the script
)
python amazon-web-services boto3 amazon-athena
3 Answers
2 votes
# -*- coding: utf-8 -*-
import logging
import os
from time import sleep

import boto3

logger = logging.getLogger(__name__)


class AthenaQueryFailed(Exception):
    pass


class Athena(object):
    S3_TEMP_BUCKET = "please-replace-with-your-bucket"

    def __init__(self, bucket=S3_TEMP_BUCKET):
        self.bucket = bucket
        self.client = boto3.Session().client("athena")


    def execute_query_in_athena(self, query, output_s3_directory, database="csv_dumps"):
        """Run a query in Athena, wait for it to finish, and return the S3 path of its result file.
        :param query: Query to be executed in Athena
        :param output_s3_directory: S3 path where the results should be stored
        :param database: Athena database used as the query execution context
        :return: S3 path of the result file
        """
        response = self.client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={"Database": database},
            ResultConfiguration={"OutputLocation": output_s3_directory},
        )
        query_execution_id = response["QueryExecutionId"]
        filename = "{filename}.csv".format(filename=response["QueryExecutionId"])
        # Build the S3 key with "/" explicitly; os.path.join would use "\" on Windows.
        s3_result_path = "{}/{}".format(output_s3_directory.rstrip("/"), filename)
        logger.info(
            "Query query_execution_id <<{query_execution_id}>>, result_s3path <<{s3path}>>".format(
                query_execution_id=query_execution_id, s3path=s3_result_path
            )
        )
        self.wait_for_query_to_complete(query_execution_id)
        return s3_result_path

    def wait_for_query_to_complete(self, query_execution_id):
        is_query_running = True
        backoff_time = 10
        while is_query_running:
            response = self.__get_query_status_response(query_execution_id)
            status = response["QueryExecution"]["Status"][
                "State"
            ]  # possible responses: QUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLED
            if status == "SUCCEEDED":
                is_query_running = False
            elif status in ["CANCELLED", "FAILED"]:
                raise AthenaQueryFailed(status)
            elif status in ["QUEUED", "RUNNING"]:
                logger.info("Backing off for {} seconds.".format(backoff_time))
                sleep(backoff_time)
            else:
                raise AthenaQueryFailed(status)

    def __get_query_status_response(self, query_execution_id):
        response = self.client.get_query_execution(QueryExecutionId=query_execution_id)
        return response

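As another answer points out, waiters are still not implemented for Athena.

I use this lightweight Athena client to run queries. It blocks until the query finishes and then returns the S3 path of the result.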


1 vote

Waiters are not yet implemented for Athena: Athena Waiter

See: Support AWS Athena waiter feature for a possible workaround until it is implemented in Boto3. This is how it is implemented in the AWS CLI.

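import time

# Excerpt from a method: self.athena is a boto3 Athena client and
# execution_id is the QueryExecutionId returned by start_query_execution.
while True:
    # boto3 client methods only accept keyword arguments.
    stats = self.athena.get_query_execution(QueryExecutionId=execution_id)
    status = stats['QueryExecution']['Status']['State']
    # Stop polling once the query reaches a terminal state.
    if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
        break
    time.sleep(0.2)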
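
The loop above never gives up if the query hangs. A minimal sketch of the same polling idea as a standalone function with a timeout might look like the following. The function name `wait_for_query`, its parameters, and the default timeout are illustrative assumptions, not part of boto3; `client` is assumed to behave like `boto3.client("athena")`.

```python
import time

# Terminal states as reported by Athena's GetQueryExecution.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(client, execution_id, poll_seconds=0.2, timeout_seconds=300.0):
    """Poll until the query reaches a terminal state and return that state.

    Hypothetical helper: `client` is expected to expose get_query_execution
    the way boto3.client("athena") does. Raises TimeoutError if the query
    is still queued or running after `timeout_seconds`.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Query {} still in state {} after {} seconds".format(
                    execution_id, state, timeout_seconds
                )
            )
        time.sleep(poll_seconds)


# Usage sketch (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("athena")
#   response = client.start_query_execution(QueryString=..., ResultConfiguration=...)
#   if wait_for_query(client, response["QueryExecutionId"]) == "SUCCEEDED":
#       ...  # safe to create the tables now
```

Returning the final state instead of raising on FAILED or CANCELLED leaves it to the caller to decide whether a failed CREATE DATABASE should abort the script.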

0 votes

Use the AWS SDK for pandas (awswrangler).

Install:

pip install awswrangler

Usage:

import awswrangler as wr

assert "some_db" in wr.catalog.databases().values

More information here: https://aws-sdk-pandas.readthedocs.io/en/3.5.0/tutorials/006%20-%20Amazon%20Athena.html#Checking/Creating-Glue-Catalog-Databases
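
If adding awswrangler is not an option, a similar check can probably be done with boto3 alone by listing the databases in the data catalog. The sketch below assumes the Athena client's `list_databases` paginator and the default catalog name `AwsDataCatalog`; the helper name `database_exists` is hypothetical, and the name comparison is an exact string match.

```python
def database_exists(client, database_name, catalog_name="AwsDataCatalog"):
    """Return True if `database_name` is listed in the given Athena data catalog.

    Hypothetical helper: `client` is expected to behave like
    boto3.client("athena"). The catalog name is an assumption; pass your
    own if you do not use the default Glue catalog.
    """
    paginator = client.get_paginator("list_databases")
    # Walk every page, since a catalog can hold more databases than one
    # response returns.
    for page in paginator.paginate(CatalogName=catalog_name):
        for database in page.get("DatabaseList", []):
            if database["Name"] == database_name:
                return True
    return False


# Usage sketch (requires boto3 and AWS credentials):
#   import boto3
#   if database_exists(boto3.client("athena"), "db_name"):
#       ...  # create the tables
```

Because this reads catalog metadata instead of running a query, it needs no S3 output location.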
