Convert table data into a JSON variant column

Asked · Votes: 0 · Answers: 1

We have a generic transactions table into which historical data is loaded daily through a standard pipeline and then processed in batches into a final table using streams, tasks, and stored procedures. The company has now changed the way data is loaded: Kafka Connect creates a raw table that reads the Avro messages published to a topic.

The raw table created by the Kafka connector automatically has a variant column, specifically in one of the following forms:

[
   {
      "transactionDetailId": 112233445566,
      "brandCode": "31137209",
      "transactionOrigin": "Origin 1",
      "installmentCreditTransactionId": 1,
      ...
   },
   {
      "transactionDetailId": 112233445566,
      "brandCode": "31137209",
      "transactionOrigin": "Origin 1",
      "installmentCreditTransactionId": 2,
   },
   ...
 ]

The raw table created by Kafka Connect only contains new data since October 1, 2024 (2024-10-01), and the historical data has already been loaded into the final (processed) table mentioned above.
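For context, the Snowflake Kafka connector by default lands each message into two VARIANT columns, RECORD_METADATA and RECORD_CONTENT; the column names below assume that default sink configuration, and the raw table name is hypothetical. Reading the array back out of such a raw table looks roughly like:

```sql
-- Sketch only: assumes the connector's default RECORD_METADATA / RECORD_CONTENT
-- columns and a hypothetical raw table name.
SELECT
    r.RECORD_METADATA:offset::NUMBER AS kafka_offset,  -- message offset in the topic
    r.RECORD_CONTENT                 AS txn_array      -- VARIANT holding the JSON array
FROM db.schema.raw_transactions r
LIMIT 10;
```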

The challenge I'm facing is converting the historical data in this transactions table into that variant column. For other tables I was able to use Snowflake's OBJECT_CONSTRUCT function to convert the data into a variant column, but for this table, and this column in particular, the problem is that

{
          "transactionDetailId": 112233445566,
          "brandCode": "31137209",
          "transactionOrigin": "Origin 1",
          "installmentCreditTransactionId": 1,
          ...
       }

is one row in the final table, and

{
          "transactionDetailId": 112233445566,
          "brandCode": "31137209",
          "transactionOrigin": "Origin 1",
          "installmentCreditTransactionId": 2,
       },

is another row.

The attribute that splits this variant column is "installmentCreditTransactionId", so in this case the two objects become two rows. For example, if I look at the transactions table, I see the same pattern:

transactionDetailId   brandCode   transactionOrigin   installmentCreditTransactionId
998877665544          55555555    Origin 2            1
998877665544          55555555    Origin 2            2
998877665544          55555555    Origin 2            3

should be converted into:

   [
       {
          "transactionDetailId": 998877665544,
          "brandCode": "55555555",
          "transactionOrigin": "Origin 2",
          "installmentCreditTransactionId": 1,
          ...
       },
       {
          "transactionDetailId": 998877665544,
          "brandCode": "55555555",
          "transactionOrigin": "Origin 2",
          "installmentCreditTransactionId": 2,
       },
       {
          "transactionDetailId": 998877665544,
          "brandCode": "55555555",
          "transactionOrigin": "Origin 2",
          "installmentCreditTransactionId": 3,
       },
       ...
    ]

I don't see how OBJECT_CONSTRUCT alone would help here, because the installmentCreditTransactionId of each message is arbitrary.

Any ideas on how to solve this? Using OBJECT_AGG?

Edit 1: this is what I have so far:

SELECT 
    OBJECT_AGG(tab1.id_1::VARCHAR,
        OBJECT_CONSTRUCT(
            'col1', null,
            'col2', tab1.col2::VARCHAR,
            'id_1', tab1.id_1::VARCHAR, -- one of the id's
            'id_2', tab1.id_2::VARCHAR, -- one of the id's
            'col3', COALESCE(tab1.col3::VARCHAR, '0'),
            'id_3', tab1.id_3::VARCHAR, -- one of the id's
            'col4', tab1.col4::VARCHAR,
            'id_4', tab1.id_4::VARCHAR, -- one of the id's
            'col5', CURRENT_TIMESTAMP()::VARCHAR,
            'col6', null) OVER (PARTITION BY tab1.id_1::VARCHAR, tab1.id_2::VARCHAR, tab1.id_3::VARCHAR, tab1.id_4::VARCHAR) AS result
FROM 
    db.schema.table1 tab1 
LEFT JOIN
    db.schema.table2 tab2
ON 
    ...
LEFT JOIN
    db.schema.table3 tab3
ON
    ...;

But it isn't returning what I need. It is returning:

First row:

{
  "1": {
    "col1": "28031866",
    "col2": "524",
    "id_1": "1",
    "id_2": "46554737145",
    "col3": 0,
    "id_3": "39866",
    "col_4": "100",
    "id_4": "969424002",
    "col_5": "2025-03-13",
    "col_6": "2024-10-11"
  }
}

Second row:

{
  "2": {
    "col1": "28031866",
    "col2": "522",
    "id_1": "2",
    "id_2": "46554737145",
    "col3": 0,
    "id_3": "39866",
    "col_4": "99",
    "id_4": "969424002",
    "col_5": "2024-11-13",
    "col_6": "2024-10-11"
  }
}

Third row:

{
  "3": {
    "col1": "28031866",
    "col2": "522",
    "id_1": "3",
    "id_2": "46554737145",
    "col3": 0,
    "id_3": "39866",
    "col_4": "99",
    "id_4": "969424002",
    "col_5": "2025-01-15",
    "col_6": "2024-10-11"
  }
}

What I need is something like this:

[
   {
      "col1": "28031866",
      "col2": "522",
      "id_1": "1",
      "id_2": "46554737145",
      "col3": 0,
      "id_3": "39866",
      "col_4": "99",
      "id_4": "969424002",
      "col_5": "2025-01-15",
      "col_6": "2024-10-11"
   },
   {
      "col1": "28031866",
      "col2": "522",
      "id_1": "2",
      "id_2": "46554737145",
      "col3": 0,
      "id_3": "39866",
      "col_4": "99",
      "id_4": "969424002",
      "col_5": "2025-01-15",
      "col_6": "2024-10-11"
   },
   {
      "col1": "28031866",
      "col2": "522",
      "id_1": "3",
      "id_2": "46554737145",
      "col3": 0,
      "id_3": "39866",
      "col_4": "99",
      "id_4": "969424002",
      "col_5": "2025-01-15",
      "col_6": "2024-10-11"
   }
 ]

all in a single row. Note how id_1 is the only attribute that changes, and it is the one by which everything should be aggregated into one row. But I can't figure out how to achieve this.

snowflake-cloud-data-platform
1 Answer

Score: 0

So, with some fake data, first build the objects you want inside the array, for example:

with fake_data(transactionDetailId, brandCode, transactionOrigin, installmentCreditTransactionId) as (
    select * from values
    (998877665544, 55555555, 'Origin 2', 1),
    (998877665544, 55555555, 'Origin 2', 2),
    (998877665544, 55555555, 'Origin 2', 3)
)
select 
    *,
    OBJECT_CONSTRUCT( 
        'transactionDetailId', transactionDetailId,
        'brandCode', brandCode,
        'transactionOrigin', transactionOrigin,
        'installmentCreditTransactionId', installmentCreditTransactionId) as obj
from fake_data
where true /*filters you care about here */
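With the three fake rows above, each output row carries its own object alongside the original columns. My reading of how OBJECT_CONSTRUCT serializes these values (Snowflake renders object keys alphabetically) is that the obj column for the first row would look roughly like:

```
{
  "brandCode": 55555555,
  "installmentCreditTransactionId": 1,
  "transactionDetailId": 998877665544,
  "transactionOrigin": "Origin 2"
}
```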

Then aggregate those obj values, and it turns out ARRAY_AGG is what you want:

with fake_data(transactionDetailId, brandCode, transactionOrigin, installmentCreditTransactionId) as (
    select * from values
    (998877665544, 55555555, 'Origin 2', 1),
    (998877665544, 55555555, 'Origin 2', 2),
    (998877665544, 55555555, 'Origin 2', 3)
)
select 
    transactionDetailId,
    ARRAY_AGG(
        OBJECT_CONSTRUCT( 
            'transactionDetailId', transactionDetailId,
            'brandCode', brandCode,
            'transactionOrigin', transactionOrigin,
            'installmentCreditTransactionId', installmentCreditTransactionId)
        ) as result
from fake_data
where true /*filters you care about here */
group by 1 
order by 1;
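As a sanity check (a sketch only, with the ARRAY_AGG query above inlined as a CTE), you can flatten the aggregated array back into one row per object with LATERAL FLATTEN and compare against the original rows:

```sql
with fake_data(transactionDetailId, brandCode, transactionOrigin, installmentCreditTransactionId) as (
    select * from values
    (998877665544, 55555555, 'Origin 2', 1),
    (998877665544, 55555555, 'Origin 2', 2),
    (998877665544, 55555555, 'Origin 2', 3)
), agg as (
    select
        transactionDetailId,
        ARRAY_AGG(OBJECT_CONSTRUCT(
            'transactionDetailId', transactionDetailId,
            'installmentCreditTransactionId', installmentCreditTransactionId)) as result
    from fake_data
    group by 1
)
select
    a.transactionDetailId,
    f.value:installmentCreditTransactionId::NUMBER as installment_id  -- back to one row per object
from agg a,
    lateral flatten(input => a.result) f;
```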

