Alternative to CROSS APPLY in Spark SQL


I am migrating a stored procedure from SQL Server to Spark SQL. One of the statements in the SP uses CROSS APPLY, but Spark has no CROSS APPLY. I did some research and found that it can be replaced with an INNER JOIN. Below is the SQL Server code:

SELECT
     H.BillingDocument
    ,H.BillingDocumentItem
    ,H.BillingDate
    ,H.Soldto
    ,H.SalesOrganization
    ,H.SalesDocument
    ,M.MaterialPricingGroup
    ,MaterialPricingGroupText as mpg_description
    --,COUNT(*) as item_count
    ,H.BillingQuantity
    ,X.AttachPrimaryCategory
    -- SELECT * FROM
INTO
    #TEMP_FO_ATTACH
FROM sales.dbo.bi__billing_documents H
INNER JOIN masterdata.dbo.bi_materials M
    ON M.material = H.material and M.SalesOrganization = H.SalesOrganization
CROSS APPLY (
    SELECT TOP 1 X.AttachPrimaryCategory
    FROM -- SELECT * FROM
        marketing.dbo.attach_secondary_cat X
    INNER JOIN #TEMP_PrimaryForFO P
        ON  P.MaterialPricingGroup <> M.MaterialPricingGroup
        AND P.PrimaryCategory = X.AttachPrimaryCategory
        AND P.BillingDate between H.BillingDate and H.BillingDate + @FollowOnDays
        and P.Soldto = H.Soldto and P.SalesOrganization = H.SalesOrganization
    WHERE X.ValidAttachmentMPG = M.MaterialPricingGroup
    ORDER BY P.BillingDate, P.PrimaryCategory
) X
WHERE
    (LEFT(RIGHT(H.Plant,2),1) not in ('6','8') or H.Plant between '660' and '689')
    and H.billingtype in ('ZF2','ZD2')
    AND H.BillingDate between @RunFromDate and @AttachEnd
    AND NOT EXISTS (
        SELECT TOP 1 1
        FROM #TEMP_POS_ATTACH Z
        WHERE Z.BillingDocumentItem = H.BillingDocumentItem and Z.BillingDocument = H.BillingDocument
    )

I tried to rewrite the same logic in Spark; my code is below, but the results from SQL Server and Spark do not match, even though all the tables and temp views contain the same data in both systems.

Spark code:

fo_attach_df = spark.sql("""

SELECT DISTINCT
     H.BillingDocument
    ,H.BillingDocumentItem
    ,H.BillingDate
    ,H.Soldto
    ,H.SalesOrganization
    ,H.SalesDocument
    ,M.MaterialPricingGroup
    ,M.MaterialPricingGroupText as mpg_description
    ,H.BillingQuantity
    ,X.AttachPrimaryCategory
FROM sales.bi__billing_documents H
INNER JOIN masterdata.bi__materials M
    ON M.material = H.material and M.SalesOrganization = H.SalesOrganization
INNER JOIN (
    SELECT * FROM (
        SELECT DISTINCT
            X.AttachPrimaryCategory,
            X.ValidAttachmentMPG,
            P.SalesOrganization,
            P.SOLDTO,
            RANK() OVER (PARTITION BY P.SALESORGANIZATION ORDER BY P.BillingDate, P.PrimaryCategory) AS RNK
        FROM marketing.attach_secondary_cats X
        INNER JOIN primary_for_fo P
            ON P.PrimaryCategory = X.AttachPrimaryCategory
        INNER JOIN sales.bi__billing_documents H
            ON P.BillingDate between H.BillingDate and H.BillingDate + int({})
            and P.Soldto = H.Soldto and P.SalesOrganization = H.SalesOrganization
        INNER JOIN masterdata.bi__materials M
            on P.MaterialPricingGroup <> M.MaterialPricingGroup
            AND M.material = H.material and M.SalesOrganization = H.SalesOrganization
    ) TMP
    WHERE RNK = 1
) X
    ON  X.SalesOrganization = H.SalesOrganization
    AND X.SOLDTO = H.SOLDTO
    AND X.ValidAttachmentMPG = M.MaterialPricingGroup
LEFT JOIN POS_ATTACH Z
    ON Z.BillingDocumentItem = H.BillingDocumentItem
    and Z.BillingDocument = H.BillingDocument
WHERE
    (LEFT(RIGHT(H.Plant,2),1) not in ('6','8') or H.Plant between '660' and '689')
    and H.billingtype in ('ZF2','ZD2')
    AND H.BillingDate between CAST('{}' AS DATE) and CAST('{}' AS DATE)
    AND Z.BILLINGDOCUMENT IS NULL AND Z.BillingDocumentItem IS NULL

""".format(FollowOnDays, RunFromDate, attach_end))

I have spent 3-4 hours trying to figure out how to correct my code. Please help me understand what is wrong with it, and if possible, provide equivalent code. Any help is appreciated.

sql sql-server apache-spark apache-spark-sql bigdata
1 Answer

In Microsoft SQL Server, CROSS APPLY can reference the current row of the left-hand table and run a subquery correlated with that row. In the extreme case (when the optimizer cannot do better), this results in the right-hand query being executed once per row of the left-hand table.
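As a toy illustration of that per-row behaviour (hypothetical Customers and Orders tables, not taken from your query), this T-SQL CROSS APPLY fetches the single most recent order for each customer row:

SELECT C.CustomerId, O.OrderDate
FROM Customers C
CROSS APPLY (
    -- Correlated on C.CustomerId: logically runs once per customer row
    SELECT TOP 1 O.OrderDate
    FROM Orders O
    WHERE O.CustomerId = C.CustomerId
    ORDER BY O.OrderDate DESC
) O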

The Spark SQL equivalent is a LATERAL join. I haven't dug into all the details, and your specific code is complex enough that I can't take the time to work the conversion through for you, but LATERAL is what you need to investigate.

Try the Spark SQL documentation on lateral subqueries as a useful starting point.
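As a rough, untested sketch only (assuming Spark 3.2+, which introduced lateral subqueries, and reusing the table and column names from your Spark attempt), the CROSS APPLY block could become a correlated lateral subquery along these lines; the literal 30 stands in for @FollowOnDays:

SELECT
     H.BillingDocument
    ,H.BillingDocumentItem
    ,M.MaterialPricingGroup
    ,X.AttachPrimaryCategory
FROM sales.bi__billing_documents H
INNER JOIN masterdata.bi__materials M
    ON M.material = H.material
   AND M.SalesOrganization = H.SalesOrganization,
LATERAL (
    -- Correlated on H and M from the preceding FROM items, so it behaves
    -- like CROSS APPLY: top 1 per (H, M) row; rows with no match are dropped.
    SELECT S.AttachPrimaryCategory
    FROM marketing.attach_secondary_cats S
    INNER JOIN primary_for_fo P
        ON  P.MaterialPricingGroup <> M.MaterialPricingGroup
        AND P.PrimaryCategory = S.AttachPrimaryCategory
        AND P.BillingDate BETWEEN H.BillingDate AND date_add(H.BillingDate, 30) -- 30 stands in for @FollowOnDays
        AND P.Soldto = H.Soldto
        AND P.SalesOrganization = H.SalesOrganization
    WHERE S.ValidAttachmentMPG = M.MaterialPricingGroup
    ORDER BY P.BillingDate, P.PrimaryCategory
    LIMIT 1
) X

Note that the lateral subquery picks its top row per outer (H, M) row, which is exactly what the original CROSS APPLY does. A window-function rewrite like yours can only reproduce that if the RANK()/ROW_NUMBER() is partitioned by the columns that identify the outer row, not by SalesOrganization alone; if your Spark version rejects LIMIT inside a correlated subquery, that window route is the fallback.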
