如何更新 Athena 中枚举分区的值集

问题描述 投票:0回答:1

Parquet 文件存储在 AWS S3 中,前缀如

/fruit=.../year=.../month=.../day=.../

他们的数据通过 AWS Athena 查询,其中

fruit
被键入为
enum
:

'projection.fruit.type'='enum',
'projection.fruit.values'='apple,banana,cherry',

稍后我们可以在前缀

/fruit=date/year=...
下使用 Parquet 文件。
但在使用
apple,banana,cherry,date
更新投影之前,他们的数据在 Athena 中是不可见的。

如何使 Athena 分区与当前 S3 前缀保持同步?
理想情况下是实时的,但目前每天刷新一两次可能是可以接受的折衷方案。
基数不要太高,最多几百个值。

我看到有一种投影类型

injected
可能很合适,但我不确定,我想知道是否有任何缺点。

编辑:

injected
不是解决方案,因为它需要查询提供值来替换位置模板中的占位符,它不会自动从 S3 前缀获取它们。
有效的方法是通过
ALTER TABLE goods SET TBLPROPERTIES ('projection.fruit.values', '...')
手动更新列表,因此也许让 Lambda 函数收集值并定期运行此查询是一种可接受的解决方法。

amazon-web-services amazon-s3 parquet amazon-athena
1个回答
0
投票

目前我已经在 Lambda 函数中实现了手动更新投影:

public async Task<Output> FunctionHandler(ILambdaContext context)
{
    var s3 = new AmazonS3Client();
    var athena = new AmazonAthenaClient();

    async Task<IEnumerable<string>> GetProjectionValues(string prefix)
    {
        var listPrefixesResponse = await s3.ListObjectsV2Async(new ListObjectsV2Request
        {
            BucketName = "...",
            Prefix = prefix,
            Delimiter = "/"
        });

        var prefixes = listPrefixesResponse.CommonPrefixes;

        var values = new List<string>();
        foreach (var value in prefixes.Select(p => p.Split("/", StringSplitOptions.RemoveEmptyEntries).Last().Split("=")[1]))
        {
            values.Add(value);
        }

        return values;
    }

    async Task UpdateProjectionValues(string table, string projection, string values)
    {
        const string DB = "...";
        var alterQueryResponse = await athena.StartQueryExecutionAsync(new StartQueryExecutionRequest
        {
            QueryString = $"ALTER TABLE {DB}.{table} SET TBLPROPERTIES ('projection.{projection}'='{values}')",
            ResultConfiguration = new ResultConfiguration
            {
                OutputLocation = "s3://..."
            }
        });

        var getStatusResponse = await athena.GetQueryExecutionAsync(new GetQueryExecutionRequest
        {
            QueryExecutionId = alterQueryResponse.QueryExecutionId
        });

        var ongoingStatuses = new[] { QueryExecutionState.QUEUED, QueryExecutionState.RUNNING };
        while (ongoingStatuses.Contains(getStatusResponse.QueryExecution.Status.State))
        {
            await Task.Delay(1000);
            getStatusResponse = await athena.GetQueryExecutionAsync(new GetQueryExecutionRequest
            {
                QueryExecutionId = alterQueryResponse.QueryExecutionId
            });
        }

        if (getStatusResponse.QueryExecution.Status.State != QueryExecutionState.SUCCEEDED)
        {
            throw new Exception($"Update of projection {projection} failed: State={getStatusResponse.QueryExecution.Status.State} Reason=\"{getStatusResponse.QueryExecution.Status.StateChangeReason}\".");
        }
    }

    var fruits = await GetProjectionValues($"goods/fruit=");
    await UpdateProjectionValues("goods", "fruit.values", string.Join(",", fruits));

    var currentYear = DateTime.UtcNow.Year;
    await UpdateProjectionValues($"goods", "year.range", $"2024,{currentYear}");
    }

    return new(fruits.ToArray(), Enumerable.Range(2024, currentYear - 2024 + 1).ToArray());
}
© www.soinside.com 2019 - 2024. All rights reserved.