How to use the AVRO format with AWS Glue/Athena

Question · Votes: 0 · Answers: 3

I have some Kafka topics writing AVRO files into an S3 bucket, and I would like to run queries against that bucket with AWS Athena.

I am trying to create a table, but the AWS Glue crawler runs without adding my table (it works if I change the file type to JSON). I also tried to create a table from the Athena console, but it does not list AVRO among the supported formats.

Any ideas on how to get this working?

amazon-web-services avro amazon-athena aws-glue
3 Answers

2 votes

I would suggest doing it manually rather than through Glue. Glue only works for the most basic cases, and this, unfortunately, falls outside of them.

You can find documentation on how to create an Avro table here: https://docs.aws.amazon.com/athena/latest/ug/avro.html

The caveat with Avro tables is that you need to specify both the table columns and the Avro schema. This may look strange and redundant, but it is how Athena/Presto works: it needs a schema to know how to interpret the files, and then it needs to know which of the files' properties you want to expose as columns (and their types, which may or may not match the Avro types).

CREATE EXTERNAL TABLE avro_table (
   foo STRING,
   bar INT
)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal' = '
{
  "type": "record",
  "name": "example",
  "namespace": "default",
  "fields": [
    {
      "name": "foo",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "bar",
      "type": ["null", "int"],
      "default": null
    }
  ]
}
')
STORED AS AVRO
LOCATION 's3://some-bucket/data/';

Note how the Avro schema appears as a JSON document inside the serde property value (between the single quotes). The formatting is optional, but it makes the example easier to read.
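If you build the DDL programmatically, serializing the schema with a JSON library guarantees that the `avro.schema.literal` value is valid JSON. A minimal Python sketch of that assembly (the `build_avro_ddl` helper, table name, and location are illustrative, not from the original answer):

```python
import json

def build_avro_ddl(table: str, columns: dict, avro_schema: dict, location: str) -> str:
    """Assemble a CREATE EXTERNAL TABLE statement like the one above.

    columns maps column name -> Athena type; avro_schema is the Avro
    schema as a Python dict, embedded as JSON in the serde properties.
    """
    column_list = ",\n".join(f"  {name} {typ}" for name, typ in columns.items())
    schema_json = json.dumps(avro_schema)  # valid JSON by construction
    return (
        f"CREATE EXTERNAL TABLE {table} (\n{column_list}\n)\n"
        "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'\n"
        f"WITH SERDEPROPERTIES ('avro.schema.literal' = '{schema_json}')\n"
        "STORED AS AVRO\n"
        f"LOCATION '{location}';"
    )

ddl = build_avro_ddl(
    "avro_table",
    {"foo": "STRING", "bar": "INT"},
    {
        "type": "record",
        "name": "example",
        "namespace": "default",
        "fields": [
            {"name": "foo", "type": ["null", "string"], "default": None},
            {"name": "bar", "type": ["null", "int"], "default": None},
        ],
    },
    "s3://some-bucket/data/",
)
print(ddl)
```

The resulting statement can be pasted into the Athena console as-is.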


0 votes

Here is one way to generate a CREATE TABLE statement from an avro file; you can then run the statement in AWS Athena, and Glue will eventually pick the table up. The script is meant to be run in bash and uses the jq and avro-tools programs, both available on a Mac via the Homebrew package manager.

# parses avro file and returns create table schema for aws/hive/spark on mac running bash with homebrew installed
function ct_avro() {
  if [[ $# -lt 3 ]] || [[ $# -gt 4 ]] ; then echo -e "\n\nUsage: ct_avro <avro_file> <fully_qualified_table_name> <s3_location> [optional_partition_field and data type in double quotes]\n\nEx.  : ct_avro \${fn} db.table_name s3://bucket_name_goes_here \"partition_date STRING\"\n\n"; return; fi
  if [[ $(which avro-tools | grep -c 'avro-tools') -ne 1 ]];  then  echo -e "\n\tPlease install avro-tools:\n\t\t\->   brew install avro-tools" >&2; return; fi  # checks that avro-tools is available
  if ! jq --version > /dev/null 2>&1; then  echo -e "\n\tPlease install jq:\n\t\t\->   brew install jq" >&2; return; fi  # checks that jq is available
  if (! [[ -f ${1} ]] ); then echo -e "\n\tPlease try again with a complete file location specified to use.\n"; return; fi;  #checks that file exists at all
  if [[ $(file ${1} | grep -c "Apache Avro") -ne 1 ]]; then echo -e "\n\tPlease try again with an Apache Avro file.\nThe file specified: ${1}, is considered:  $(file ${1})\n"; return; fi;  # checks that the file is an Apache Avro binary file
  if [[ $# -eq 4 ]] ; then local partition_clause="\nPARTITIONED BY (${4})"; else unset partition_clause; fi  #deals with optional partition clause in 4th input variable

  echo -e '\n\nNow preparing avro schema.'

  # list the fields/data types (ref https://stackoverflow.com/a/46131963); then append commas with perl, remove the last one with sed, and line up the columns with column -t
  local field_list=$(avro-tools getschema ${1} 2>/dev/null | jq -r '.fields[] | [.name, .type[1]] | @tsv' | perl -pe "s/$/,/" | sed '$s/,$//' | column -t | perl -pe "s/^/  /")

  # for generating the schema literal:
  local schema_literal=$(avro-tools getschema ${1}  2>/dev/null | jq -c)

  # variable substitutions produce a create table statement
  echo -e "\n\nCREATE EXTERNAL TABLE ${2} (\n${field_list}\n)${partition_clause}\nROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'\nWITH SERDEPROPERTIES ('avro.schema.literal'='\n${schema_literal}\n')\nSTORED AS AVRO\nLOCATION '${3}';\n"
}


Usage: 
ct_avro <avro_file> <fully_qualified_table_name> <s3_location> [optional_partition_field and data type in double quotes]

Example: 
ct_avro 8987.avro db.table_name s3://bucket_name_goes_here "partition_date STRING"
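For reference, the field extraction the jq filter performs can be mirrored in plain Python on the JSON that `avro-tools getschema` emits. A sketch, assuming (as the script does) that each field type is a `["null", T]` union; the `field_list` helper and the sample schema are illustrative:

```python
import json

def field_list(schema_json: str) -> str:
    """Return the '  name type' column lines for the CREATE TABLE block."""
    schema = json.loads(schema_json)
    rows = []
    for field in schema["fields"]:
        typ = field["type"]
        # for a ["null", T] union, take the non-null branch, like .type[1] in jq
        if isinstance(typ, list):
            typ = typ[1]
        rows.append(f"  {field['name']} {typ}")
    return ",\n".join(rows)

schema = '''{"type": "record", "name": "example", "fields": [
  {"name": "foo", "type": ["null", "string"], "default": null},
  {"name": "bar", "type": ["null", "int"], "default": null}]}'''
print(field_list(schema))
```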


-1 votes

Doing it manually does seem to be the way to get this working. Below is some code that generates the Athena schema directly from a literal avro schema. It works with avro-python3 on python 3.7. It is taken from here: https://github.com/dataqube-GmbH/avro2athena (I am the owner of the repo)

from avro.schema import Parse, RecordSchema, PrimitiveSchema, ArraySchema, MapSchema, EnumSchema, UnionSchema, FixedSchema


def create_athena_schema_from_avro(avro_schema_literal: str) -> str:
    avro_schema: RecordSchema = Parse(avro_schema_literal)

    column_schemas = []
    for field in avro_schema.fields:
        column_name = field.name.lower()
        column_type = create_athena_column_schema(field.type)
        column_schemas.append(f"`{column_name}` {column_type}")

    return ', '.join(column_schemas)


def create_athena_column_schema(avro_schema) -> str:
    if type(avro_schema) == PrimitiveSchema:
        return rename_type_names(avro_schema.type)

    elif type(avro_schema) == ArraySchema:
        items_type = create_athena_column_schema(avro_schema.items)
        return f'array<{items_type}>'

    elif type(avro_schema) == MapSchema:
        # recurse so that nested value types (e.g. records) are handled too
        values_type = create_athena_column_schema(avro_schema.values)
        return f'map<string,{values_type}>'

    elif type(avro_schema) == RecordSchema:
        field_schemas = []
        for field in avro_schema.fields:
            field_name = field.name.lower()
            field_type = create_athena_column_schema(field.type)
            field_schemas.append(f'{field_name}:{field_type}')

        field_schema_concatenated = ','.join(field_schemas)
        return f'struct<{field_schema_concatenated}>'

    elif type(avro_schema) == UnionSchema:
        # pick the first schema which is not null
        union_schemas_not_null = [s for s in avro_schema.schemas if s.type != 'null']
        if len(union_schemas_not_null) > 0:
            return create_athena_column_schema(union_schemas_not_null[0])
        else:
            raise Exception('union schemas contains only null schema')

    elif type(avro_schema) in [EnumSchema, FixedSchema]:
        return 'string'

    else:
        raise Exception(f'unknown avro schema type {avro_schema.type}')


def rename_type_names(typ: str) -> str:
    # Athena/Hive has no 'long' type; Avro long maps to bigint
    if typ == 'long':
        return 'bigint'
    else:
        return typ