Azure Synapse 可在现有 csv 文件上将数据从 cosmos 更新、插入或删除到 Data Lake,并通过附加维护事务数据

问题描述 投票:0回答:1

下面是我通过 azure synapse 数据流管道从 cosmos 推送到数据湖的 Json 对象。现在,如果以下数据发生任何更改,则必须覆盖数据湖中现有 csv 文件中的数据,如果没有事务文件,则应创建新的事务文件并继续附加更改。 我想创建管道来解决上述查询,因为我是这个蔚蓝云平台的新手

示例 Json-

    {
    "timeFrames": [
        {
            "yearGroup": "Reception",
            "term": "Summer",
            "scoreIndeces": {
                "66": 0.2,
                "67": 0.3
            }
        },
        {
            "yearGroup": "Year 1",
            "term": "Spring",
            "scoreIndeces": {
                "64": 0.6,
                "65": 0.6
            }
        }
    ],
    "tableType": 1,
    "id": "f82",
    "productId": "4b",
    "productName": "New PUMA"
}

交易

主要

azure azure-pipelines azure-synapse
1个回答
0
投票

为了实现您的要求,您需要首先将数据从主文件复制到横断面文件,并向其中添加附加列并将自定义值设置为插入,以便我们获得初始横断面。

enter image description here

然后,每当新文件中发生更改时,使用数据流比较这些文件,如下所示:

  • 获取数据流活动并向其添加新数据流。首先使用横断面文件创建两个源,第二个使用主文件创建两个源。
  • enter image description here
  • 第二个源进行存在转换后,添加左流作为源 2,右流作为源 1,存在类型作为不存在,存在条件因为源中的所有列都相等。

enter image description here

  • 这将返回一个更新的行,采用 dericed 列转换并创建与横断面文件同名的列
    Transaction Type
    并为其分配 Update 值。

enter image description here

  • 然后在第一个源之后进行联合转换并添加派生列转换和源 1 作为传入流,如下所示:

enter image description here

  • 这将返回您所需的输出并将其存储在横断面文件中。

enter image description here

注意:- 当 Synapse 管道将数据写入 ADLS gen 2 文件时,它只会覆盖数据,不会追加数据。所以处理你的情况更好的方法是像 SQL 这样的 RDMS 数据库

© www.soinside.com 2019 - 2024. All rights reserved.