settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(environment_settings=settings)
catalog_name = "glue_catalog"
staging_database_name = "source"
dest_database_name = "dest"
staging_table = "source_iceberg_partitioned_hourly"
dest_table = "new_iceberg_table"
warehouse_path = "s3:path....."
t_env.execute_sql(
f"""
CREATE CATALOG {catalog_name} WITH (
'type'='iceberg',
'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
'warehouse'='{warehouse_path}',
'aws.region'='eu-central-1',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
)
"""
)
t_env.execute_sql(f"USE CATALOG {catalog_name};")
t_env.execute_sql(f"CREATE DATABASE IF NOT EXISTS {staging_database_name}")
t_env.execute_sql(f"CREATE DATABASE IF NOT EXISTS {dest_database_name}")
t_env.execute_sql(f"USE {dest_database_name};")
staging_table_full = f"{catalog_name}.{staging_database_name}.{staging_table}"
dest_table_full = f"{catalog_name}.{dest_database_name}.{dest_table}"`
create_table_statement = f"""
CREATE TABLE IF NOT EXISTS {dest_table} (
"""
Columns....
)
PARTITIONED BY (collector_minute)
WITH (
'type'='iceberg',
'format-version'='2',
'write.format.default'='parquet',
'write.metadata.delete-after-commit.enabled'='true',
'write.distribution-mode'='hash',
'path'='{warehouse_path}'
)
"""
t_env.execute_sql(create_table_statement)
latest_snapshot_id = "12345678901234567890"
t_env.execute_sql(
f"""
INSERT INTO {query_table_full}
SELECT *,
MINUTE(collector_tstamp) AS collector_minute
FROM {staging_table_full} /*+ OPTIONS('streaming' = 'true', 'monitor-interval' = '10s', 'start-snapshot-id' = '{latest_snapshot_id}') */
"""
)
我使用图像flink:1.16.1-scala_2.12 用这些罐子: 捆绑包-2.20.18.jar flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar 番石榴-30.1-jre.jar hadoop-common-2.8.3.jar Iceberg-flink-runtime-1.16-1.3.1.jar
我基本上的目标是从一个冰山表中读取数据,并在新数据出现后立即写入第二个冰山表。
通常我会使用 Spark,但我想尝试一下 Flink,如果数据写入速度更快
所以我实际上在做的是
jobmanager.sh
开始,taskmanager.sh start
在开头
等待几秒钟,然后执行我的 pyflink.py
它能够实际创建它想要写入的 Iceberg 表。 在日志中我看到它还能够扫描源冰山表并打印出源表的最新manifest.json的名称。
遗憾的是,此时什么也没有发生,它没有将数据写入表中,所以我想知道这个插入是否可能是错误的。
CPU 和 Mem 实际上始终显示一些活动(CPU 50%)
我只能找到将插入与示例行数据一起使用的示例。所以我不太确定插入是否可以与选择一起发生。
不要认为这是由于资源的原因,因为我在我的 m1 上运行它并给了他我所拥有的一切:D
我遇到了类似的问题,因此我启用了检查点并将最大检查点超时设置为 60 秒。数据开始插入到我的 Iceberg 表中。