我在 Kafka 中有一些主题正在将 AVRO 文件写入 S3 存储桶,我想使用 AWS Athena 对存储桶执行一些查询。
我正在尝试创建一个表,但 AWS Glue 爬网程序运行但不添加我的表(如果我将文件类型更改为 JSON,它会起作用)。我尝试从 Athena 控制台创建一个表,但它不显示对 AVRO 文件的支持。
关于如何让它发挥作用有什么想法吗?
我建议手动完成,而不是通过胶水。胶水仅适用于最基本的情况,不幸的是,这超出了这一范围。
您可以在此处找到有关如何创建 Avro 表的文档:https://docs.aws.amazon.com/athena/latest/ug/avro.html
Avro 表的注意事项是您需要指定表列和 Avro 架构。这可能看起来很奇怪和多余,但这就是 Athena/Presto 的工作原理。它需要一个模式来知道如何解释文件,然后需要知道您想要将文件中的哪些属性公开为列(及其类型,可能与 Avro 类型匹配,也可能不匹配)。
CREATE EXTERNAL TABLE avro_table (
foo STRING,
bar INT
)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal' = '
{
"type": "record",
"name": "example",
"namespace": "default",
"fields": [
{
"name": "foo",
"type": ["null", "string"],
"default": null
},
{
"name": "bar",
"type": ["null", "int"],
"default": null
}
]
}
')
STORED AS AVRO
LOCATION 's3://some-bucket/data/';
注意 Avro 架构如何显示为 serde 属性值(单引号)内的 JSON 文档 – 格式是可选的,但使此示例更易于阅读。
这是从 avro 文件生成 CREATE TABLE 语句的一种方法,然后可以在 AWS Athena 中运行该语句,最终由 Glue 选取。 该脚本旨在在 bash 中运行,并使用 jq 和 avro-tools 程序,这些程序可用于使用自制程序包管理器的 Mac。
# parses avro file and returns create table schema for aws/hive/spark on mac running bash with homebrew installed
function ct_avro() {
if [[ $# -lt 3 ]] || [[ $# -gt 4 ]] ; then echo -e "\n\nUsage: ct_avro <avro_file> <fully_qualified_table_name> <s3_location> [optional_partition_field and data type in double quotes]\n\nEx. : ct_avro \${fn} db.table_name s3://bucket_name_goes_here \"partition_date STRING\"\n\n"; return; fi
if [[ $(which avro-tools | grep -c 'avro-tools') -ne 1 ]]; then echo -e "\n\tPlease install avro-tools:\n\t\t\-> brew install avro-tools" >&2; return; fi # checks that avro-tools is available
jq --version > /dev/null 2>&1 ; if [ $? -ne 0 ]; then echo -e "\n\tPlease install jq:\n\t\t\-> brew install jq" >&2; return; fi # checks that jq is available
if (! [[ -f ${1} ]] ); then echo -e "\n\tPlease try again with a complete file location specified to use.\n"; return; fi; #checks that file exists at all
if [[ $(file ${1} | grep -c "Apache Avro") -ne 1 ]]; then echo -e "\n\tPlease try again with an Apache Avro to use.\nThe file specified: ${1}, is considered: $(file ${1})\n"; return; fi; #checks that the file in an Apache Avro binary file
if [[ $# -eq 4 ]] ; then local partition_clause="\nPARTITIONED BY (${4})"; else unset partition_clause; fi #deals with optional partition clause in 4th input variable
echo -e '\n\nNow preparing avro schema.'
# for listing the fields/data types ref https://stackoverflow.com/a/46131963; the put commas at end with perl, remove last one with sed, and line up columns with column -t
local field_list=$(avro-tools getschema ${1} 2>/dev/null | jq -r '.fields.[] | [.name, .type.[1]] | @tsv' | perl -pe "s/$/,/" | sed '$s/,$//' | column -t | perl -pe "s/^/ /")
# for generating the schema literal:
local schema_literal=$(avro-tools getschema ${1} 2>/dev/null | jq -c)
# variable subtitutions provide a create table statement
echo -e "\n\nCREATE EXTERNAL TABLE ${2} (\n${field_list}\n)${partition_clause}\nROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'\nWITH SERDEPROPERTIES ('avro.schema.literal'='\n${schema_literal}\n')\nSTORED AS AVRO\nLOCATION '${3}';\n"
}
Usage:
ct_avro <avro_file> <fully_qualified_table_name> <s3_location> [optional_partition_field and data type in double quotes]
Example:
ct_avro 8987.avro db.table_name s3://bucket_name_goes_here "partition_date STRING"
手动执行似乎是使其发挥作用的方法。 下面是一些直接从文字 avro 模式生成 Athena 模式的代码。它与
avro-python3
上的 python3.7
配合使用。它取自这里:https://github.com/dataqube-GmbH/avro2athena(我是回购协议的所有者)
from avro.schema import Parse, RecordSchema, PrimitiveSchema, ArraySchema, MapSchema, EnumSchema, UnionSchema, FixedSchema
def create_athena_schema_from_avro(avro_schema_literal: str) -> str:
avro_schema: RecordSchema = Parse(avro_schema_literal)
column_schemas = []
for field in avro_schema.fields:
column_name = field.name.lower()
column_type = create_athena_column_schema(field.type)
column_schemas.append(f"`{column_name}` {column_type}")
return ', '.join(column_schemas)
def create_athena_column_schema(avro_schema) -> str:
if type(avro_schema) == PrimitiveSchema:
return rename_type_names(avro_schema.type)
elif type(avro_schema) == ArraySchema:
items_type = create_athena_column_schema(avro_schema.items)
return f'array<{items_type}>'
elif type(avro_schema) == MapSchema:
values_type = avro_schema.values.type
return f'map<string,{values_type}>'
elif type(avro_schema) == RecordSchema:
field_schemas = []
for field in avro_schema.fields:
field_name = field.name.lower()
field_type = create_athena_column_schema(field.type)
field_schemas.append(f'{field_name}:{field_type}')
field_schema_concatenated = ','.join(field_schemas)
return f'struct<{field_schema_concatenated}>'
elif type(avro_schema) == UnionSchema:
# pick the first schema which is not null
union_schemas_not_null = [s for s in avro_schema.schemas if s.type != 'null']
if len(union_schemas_not_null) > 0:
return create_athena_column_schema(union_schemas_not_null[0])
else:
raise Exception('union schemas contains only null schema')
elif type(avro_schema) in [EnumSchema, FixedSchema]:
return 'string'
else:
raise Exception(f'unknown avro schema type {avro_schema.type}')
def rename_type_names(typ: str) -> str:
if typ in ['long']:
return 'bigint'
else:
return typ