Parquet 文件存储在 AWS S3 中,前缀如
/fruit=.../year=.../month=.../day=.../
。
他们的数据通过 AWS Athena 查询,其中
fruit
被键入为 enum
:
'projection.fruit.type'='enum',
'projection.fruit.values'='apple,banana,cherry',
稍后我们可以在前缀
/fruit=date/year=...
下使用 Parquet 文件。apple,banana,cherry,date
更新投影之前,他们的数据在 Athena 中是不可见的。
如何使 Athena 分区与当前 S3 前缀保持同步?
理想情况下是实时的,但目前每天刷新一两次可能是可以接受的折衷方案。
基数不要太高,最多几百个值。
我看到有一种投影类型
injected
可能很合适,但我不确定,我想知道是否有任何缺点。
编辑:
injected
不是解决方案,因为它需要查询提供值来替换位置模板中的占位符,它不会自动从 S3 前缀获取它们。ALTER TABLE goods SET TBLPROPERTIES ('projection.fruit.values', '...')
手动更新列表,因此也许让 Lambda 函数收集值并定期运行此查询是一种可接受的解决方法。
目前我已经在 Lambda 函数中实现了手动更新投影:
public async Task<Output> FunctionHandler(ILambdaContext context)
{
var s3 = new AmazonS3Client();
var athena = new AmazonAthenaClient();
async Task<IEnumerable<string>> GetProjectionValues(string prefix)
{
var listPrefixesResponse = await s3.ListObjectsV2Async(new ListObjectsV2Request
{
BucketName = "...",
Prefix = prefix,
Delimiter = "/"
});
var prefixes = listPrefixesResponse.CommonPrefixes;
var values = new List<string>();
foreach (var value in prefixes.Select(p => p.Split("/", StringSplitOptions.RemoveEmptyEntries).Last().Split("=")[1]))
{
values.Add(value);
}
return values;
}
async Task UpdateProjectionValues(string table, string projection, string values)
{
const string DB = "...";
var alterQueryResponse = await athena.StartQueryExecutionAsync(new StartQueryExecutionRequest
{
QueryString = $"ALTER TABLE {DB}.{table} SET TBLPROPERTIES ('projection.{projection}'='{values}')",
ResultConfiguration = new ResultConfiguration
{
OutputLocation = "s3://..."
}
});
var getStatusResponse = await athena.GetQueryExecutionAsync(new GetQueryExecutionRequest
{
QueryExecutionId = alterQueryResponse.QueryExecutionId
});
var ongoingStatuses = new[] { QueryExecutionState.QUEUED, QueryExecutionState.RUNNING };
while (ongoingStatuses.Contains(getStatusResponse.QueryExecution.Status.State))
{
await Task.Delay(1000);
getStatusResponse = await athena.GetQueryExecutionAsync(new GetQueryExecutionRequest
{
QueryExecutionId = alterQueryResponse.QueryExecutionId
});
}
if (getStatusResponse.QueryExecution.Status.State != QueryExecutionState.SUCCEEDED)
{
throw new Exception($"Update of projection {projection} failed: State={getStatusResponse.QueryExecution.Status.State} Reason=\"{getStatusResponse.QueryExecution.Status.StateChangeReason}\".");
}
}
var fruits = await GetProjectionValues($"goods/fruit=");
await UpdateProjectionValues("goods", "fruit.values", string.Join(",", fruits));
var currentYear = DateTime.UtcNow.Year;
await UpdateProjectionValues($"goods", "year.range", $"2024,{currentYear}");
}
return new(fruits.ToArray(), Enumerable.Range(2024, currentYear - 2024 + 1).ToArray());
}