I am migrating SQL stored procedures from SQL Server to Spark SQL. One of the statements in the SP uses CROSS APPLY, but Spark does not have CROSS APPLY. From my research it looked like CROSS APPLY can be replaced with an INNER JOIN. Here is the SQL Server code:
SELECT
    H.BillingDocument
    ,H.BillingDocumentItem
    ,H.BillingDate
    ,H.Soldto
    ,H.SalesOrganization
    ,H.SalesDocument
    ,M.MaterialPricingGroup
    ,MaterialPricingGroupText as mpg_description
    --,COUNT(*) as item_count
    ,H.BillingQuantity
    ,X.AttachPrimaryCategory
    -- SELECT * FROM
INTO #TEMP_FO_ATTACH
FROM sales.dbo.bi__billing_documents H
INNER JOIN masterdata.dbo.bi_materials M
    ON M.material = H.material AND M.SalesOrganization = H.SalesOrganization
CROSS APPLY (
    SELECT TOP 1 X.AttachPrimaryCategory
    FROM -- SELECT * FROM
        marketing.dbo.attach_secondary_cat X
    INNER JOIN #TEMP_PrimaryForFO P
        ON P.MaterialPricingGroup <> M.MaterialPricingGroup
        AND P.PrimaryCategory = X.AttachPrimaryCategory
        AND P.BillingDate BETWEEN H.BillingDate AND H.BillingDate + @FollowOnDays
        AND P.Soldto = H.Soldto AND P.SalesOrganization = H.SalesOrganization
    WHERE X.ValidAttachmentMPG = M.MaterialPricingGroup
    ORDER BY P.BillingDate, P.PrimaryCategory
) X
WHERE
    (LEFT(RIGHT(H.Plant,2),1) NOT IN ('6','8') OR H.Plant BETWEEN '660' AND '689')
    AND H.billingtype IN ('ZF2','ZD2')
    AND H.BillingDate BETWEEN @RunFromDate AND @AttachEnd
    AND NOT EXISTS (
        SELECT TOP 1 1 FROM #TEMP_POS_ATTACH Z
        WHERE Z.BillingDocumentItem = H.BillingDocumentItem AND Z.BillingDocument = H.BillingDocument
    )
I tried to rewrite the same thing in Spark. My code is below, but the results do not match between SQL Server and Spark, even though all the tables and temp views hold the same data in both systems.
Spark code:
fo_attach_df = spark.sql("""
SELECT DISTINCT
    H.BillingDocument
    ,H.BillingDocumentItem
    ,H.BillingDate
    ,H.Soldto
    ,H.SalesOrganization
    ,H.SalesDocument
    ,M.MaterialPricingGroup
    ,M.MaterialPricingGroupText as mpg_description
    ,H.BillingQuantity
    ,X.AttachPrimaryCategory
FROM sales.bi__billing_documents H
INNER JOIN masterdata.bi__materials M
    ON M.material = H.material AND M.SalesOrganization = H.SalesOrganization
INNER JOIN (
    SELECT * FROM (
        SELECT DISTINCT
            X.AttachPrimaryCategory,
            X.ValidAttachmentMPG,
            P.SalesOrganization,
            P.Soldto,
            RANK() OVER (PARTITION BY P.SalesOrganization ORDER BY P.BillingDate, P.PrimaryCategory) AS RNK
        FROM marketing.attach_secondary_cats X
        INNER JOIN primary_for_fo P
            ON P.PrimaryCategory = X.AttachPrimaryCategory
        INNER JOIN sales.bi__billing_documents H
            ON P.BillingDate BETWEEN H.BillingDate AND H.BillingDate + int({})
            AND P.Soldto = H.Soldto AND P.SalesOrganization = H.SalesOrganization
        INNER JOIN masterdata.bi__materials M
            ON P.MaterialPricingGroup <> M.MaterialPricingGroup
            AND M.material = H.material AND M.SalesOrganization = H.SalesOrganization
    ) TMP
    WHERE RNK = 1
) X
    ON X.SalesOrganization = H.SalesOrganization
    AND X.Soldto = H.Soldto
    AND X.ValidAttachmentMPG = M.MaterialPricingGroup
LEFT JOIN POS_ATTACH Z
    ON Z.BillingDocumentItem = H.BillingDocumentItem
    AND Z.BillingDocument = H.BillingDocument
WHERE
    (LEFT(RIGHT(H.Plant,2),1) NOT IN ('6','8') OR H.Plant BETWEEN '660' AND '689')
    AND H.billingtype IN ('ZF2','ZD2')
    AND H.BillingDate BETWEEN CAST('{}' AS DATE) AND CAST('{}' AS DATE)
    AND Z.BillingDocument IS NULL AND Z.BillingDocumentItem IS NULL
""".format(FollowOnDays, RunFromDate, attach_end))
I have spent 3-4 hours trying to figure out how to fix my code. Please help me understand what is wrong with it, and if possible provide equivalent code. Any help is appreciated.
In Microsoft SQL, CROSS APPLY can access the current row of the left-hand table and run a subquery that is correlated with that row. In the extreme case (when the optimizer cannot do better), this results in the right-hand query being executed once for every row of the left-hand table.
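For illustration, here is a toy version of that pattern; the Orders and OrderEvents tables are hypothetical, not the ones from your question:

-- T-SQL: the applied subquery runs logically once per outer row and can
-- reference that row's columns (here it picks the latest event per order).
SELECT o.OrderId, e.EventDate
FROM Orders o
CROSS APPLY (
    SELECT TOP 1 e.EventDate
    FROM OrderEvents e
    WHERE e.OrderId = o.OrderId  -- correlated reference to the current outer row
    ORDER BY e.EventDate DESC
) e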
The Spark SQL equivalent is LATERAL, or a LATERAL join. I have not dug into all the details, and your specific code is complex enough that I cannot spend the time to work it out in practice, but LATERAL is what you need to investigate.
Try it as a useful starting point.
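As a minimal sketch only (assuming Spark 3.2 or later, where LATERAL subqueries were introduced; Orders and OrderEvents are the same hypothetical tables as above), the toy CROSS APPLY maps to a lateral join like this:

-- Spark 3.2+: LATERAL lets the subquery reference the outer row, like CROSS APPLY.
SELECT o.OrderId, e.EventDate
FROM Orders o,
LATERAL (
    SELECT e.EventDate
    FROM OrderEvents e
    WHERE e.OrderId = o.OrderId  -- correlated reference, as in CROSS APPLY
    ORDER BY e.EventDate DESC
    LIMIT 1                      -- Spark's counterpart of TOP 1
) e

Depending on your Spark version, ORDER BY ... LIMIT inside a correlated lateral subquery may not be accepted yet; in that case the usual fallback is a ROW_NUMBER() window partitioned by all of the correlation keys, not just one of them.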