如何使用 PyFlink/Flink 使用 Table API 写入 amazon s3 上的 Apache Iceberg?

问题描述 投票:0回答:1
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(environment_settings=settings)


catalog_name = "glue_catalog"
staging_database_name = "source"
dest_database_name = "dest"
staging_table = "source_iceberg_partitioned_hourly"
dest_table = "new_iceberg_table"
warehouse_path = "s3:path....."

t_env.execute_sql(
    f"""
    CREATE CATALOG {catalog_name} WITH (
        'type'='iceberg',
        'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
        'warehouse'='{warehouse_path}',
        'aws.region'='eu-central-1',
        'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
    )
    """
)

t_env.execute_sql(f"USE CATALOG {catalog_name};")
t_env.execute_sql(f"CREATE DATABASE IF NOT EXISTS {staging_database_name}")
t_env.execute_sql(f"CREATE DATABASE IF NOT EXISTS {dest_database_name}")
t_env.execute_sql(f"USE {dest_database_name};")

staging_table_full = f"{catalog_name}.{staging_database_name}.{staging_table}"
dest_table_full = f"{catalog_name}.{dest_database_name}.{dest_table}"`


create_table_statement = f"""
CREATE TABLE IF NOT EXISTS {dest_table} (
"""
Columns....

)
PARTITIONED BY (collector_minute)
WITH (
    'type'='iceberg',
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.distribution-mode'='hash',
    'path'='{warehouse_path}'
)
"""
t_env.execute_sql(create_table_statement)

latest_snapshot_id = "12345678901234567890"

t_env.execute_sql(
    f"""
    INSERT INTO {query_table_full}
    SELECT *, 
        MINUTE(collector_tstamp) AS collector_minute
    FROM {staging_table_full} /*+ OPTIONS('streaming' = 'true', 'monitor-interval' = '10s', 'start-snapshot-id' = '{latest_snapshot_id}') */
    """
)

我使用图像flink:1.16.1-scala_2.12 用这些罐子: 捆绑包-2.20.18.jar flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar 番石榴-30.1-jre.jar hadoop-common-2.8.3.jar Iceberg-flink-runtime-1.16-1.3.1.jar

我基本上的目标是从一个冰山表中读取数据,并在新数据出现后立即写入第二个冰山表。

通常我会使用 Spark,但我想尝试一下 Flink,如果数据写入速度更快

所以我实际上在做的是

jobmanager.sh
开始,
taskmanager.sh start
在开头 等待几秒钟,然后执行我的 pyflink.py

它能够实际创建它想要写入的 Iceberg 表。 在日志中我看到它还能够扫描源冰山表并打印出源表的最新manifest.json的名称。

遗憾的是,此时什么也没有发生,它没有将数据写入表中,所以我想知道这个插入是否可能是错误的。

CPU 和 Mem 实际上始终显示一些活动(CPU 50%)

我只能找到将插入与示例行数据一起使用的示例。所以我不太确定插入是否可以与选择一起发生。

不要认为这是由于资源的原因,因为我在我的 m1 上运行它并给了他我所拥有的一切:D

apache-flink flink-streaming flink-sql pyflink apache-iceberg
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.