AWS Glue 在顺序运行时输出空文件

问题描述 投票:0回答:3

我正在尝试自动化 ETL 管道,将数据从 AWS RDS MYSQL 输出到 AWS S3。我目前正在使用 AWS Glue 来完成这项工作。当我从 RDS 到 S3 进行初始加载时。它捕获文件中的所有数据,这正是我想要的。但是,当我将新数据添加到 MYSQL 数据库并再次运行 Glue 作业时。我得到一个空文件而不是添加的行。任何帮助将不胜感激。

amazon-web-services etl aws-glue
3个回答
2
投票

JDBC 源的书签规则位于此处。对于 JDBC 源,需要记住的重要一点是值必须按递增或递减顺序排列,并且 Glue 仅处理来自上一个检查点的新数据。

通常,自动生成的序列号或数据时间用作书签的密钥


1
投票

对于任何仍在为此苦苦挣扎的人(这让我发疯,因为我认为我的 Spark 代码是错误的),请禁用工作详细信息中的书签。


0
投票

书签在这里肯定有问题,但奇怪的是,即使需要显式启用它们,问题中也根本没有提及它们。 如果书签到位,Glue 将尽力找出关系源中发生的更改,但如果不提供帮助,它将受到限制

For JDBC sources, the following rules apply:

1) For each table, AWS Glue uses one or more columns as bookmark keys to determine new and processed data. The bookmark keys combine to form a single compound key.

2) AWS Glue by default uses the primary key as the bookmark key, provided that it is sequentially increasing or decreasing (with no gaps).

3) You can specify the columns to use as bookmark keys in your AWS Glue script. For more information about using Job bookmarks in AWS Glue scripts, see Using job bookmarks.

4) AWS Glue doesn't support using columns with case-sensitive names as job bookmark keys.

您可以按照这篇文章中的建议指定您自己的查询键。请注意,对于直接 JDBC 连接(不通过数据目录),您必须在连接选项中指定书签参数,如下所示:

RelationalDB_node1710411919513 = glueContext.create_dynamic_frame.from_options(
connection_type = "postgresql",
connection_options = {
    "useConnectionProperties": "true",
    "dbtable": "items",
    "connectionName": "Aurora connection",
    "jobBookmarkKeys": ["item_id","modified"],
    "jobBookmarkKeysSortOrder": "asc"
},
transformation_ctx = "RelationalDB_node1710411919513"
)

请注意,我想通过组合键 -

item_id
modified
时间戳进行查询。如果我不添加
modified
,Glue 只会查询新项目。 但是,由于我还添加了
modified
,只要我每次更新数据库中的记录时更改
modified
,它就能够获取更新。

请注意,如果数据库没有更改并且使用了书签,Glue 在每次运行时仍会生成一个空文件。 请注意,如果使用查询/书签参数不起作用,您可以随时检查 CloudWatch 的日志。只要启用了 CloudWatch 日志,就会记录查询。这是我的例子:

24/03/17 14:35:54 INFO JDBCJobBookmarkUtil$: JDBC Bookmark: querying the last row in db:  (select item_id, modified from items order by item_id DESC, modified DESC LIMIT 1) as items

24/03/17 14:47:53 INFO JDBCRDD: Querying jdbc source with sql: SELECT * FROM (select item_id, modified from items order by item_id DESC, modified DESC LIMIT 1) as items

24/03/17 14:47:58 INFO JDBCRDD: Querying jdbc source with sql: SELECT * FROM (select * from items WHERE ((item_id > '152000022') or (item_id = '152000022' and modified > '2023-06-26 16:42:03.13')) and ((item_id < '152000022') or (item_id = '152000022' and modified <= '2023-06-26 16:42:03.13'))) as items 
  1. 第一个日志表示Glue使用什么信息进行书签(表中最新记录);
  2. 第二个由 Spark/Glue 用于推断模式;
  3. 最后一个查询实际上是在获取数据。

请注意,我已经运行了该作业两次,数据库中没有任何更改,但在现实场景中,数据库中一直发生更改,获取查询的语义将是:

((item_id > 'PREV_BOOKMARK') or (item_id = 'PREV_BOOKMARK' and modified > 'PREV_BOOKMARK'))
and ((item_id < 'NEW_BOOKMARK') or (item_id = 'NEW_BOOKMARK' and modified <= 'NEW_BOOKMARK'))
© www.soinside.com 2019 - 2024. All rights reserved.