Splitting a column in a Spark dataframe

Question · 0 votes · 2 answers

I have a torque column with 2500 rows in a Spark dataframe; the data looks like this:

torque
190Nm@ 2000rpm
250Nm@ 1500-2500rpm
12.7@ 2,700(kgm@ rpm)
22.4 kgm at 1750-2750rpm
11.5@ 4,500(kgm@ rpm) 

I want to split each row into two columns, Nm and rpm, like this:

Nm       | rpm
190Nm    | 2000rpm
250Nm    | 1500-2500rpm
12.7Nm   | 2,700(kgm@ rpm)
22.4 kgm | 1750-2750rpm
11.5Nm   | 4,500(kgm@ rpm)

How can I do this in Databricks?

I tried:

from pyspark.sql.functions import split, trim, regexp_extract, when

df = cars

# Assuming the name of your dataframe is "df" and the torque column is "torque"
df = df.withColumn("torque_split", split(df["torque"], "@"))

# Extract the torque values and units, assign to columns 'torque_value' and 'torque_units'
df = df.withColumn("torque_value", trim(regexp_extract(df["torque_split"].getItem(0), r'\d+\.?\d*', 0)))
df = df.withColumn("torque_units", trim(regexp_extract(df["torque_split"].getItem(0), r'[a-zA-Z]+', 0)))

# Extract the rpm values and assign to the 'rpm' column
df = df.withColumn("rpm", trim(regexp_extract(df["torque_split"].getItem(1), r'\d+-?\d*\s?rpm', 0)))

# Convert kgm values to Nm
df = df.withColumn("Nm", 
                   when(df["torque_units"] == "kgm", df["torque_value"] * 9.80665)
                   .otherwise(df["torque_value"]))

# Drop the original torque, torque_split and torque_units columns
df = df.drop("torque", "torque_split", "torque_units", "torque_value")

# Show the resulting dataframe
df.display()

But when the data looks like 2,700(kgm@ rpm), I get null values.
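A quick way to see why those rows come back empty is to replay the split and the regex from the attempt above in plain Python: splitting "12.7@ 2,700(kgm@ rpm)" on "@" puts the literal rpm text into the third element, so getItem(1) never contains "rpm" and regexp_extract returns an empty string.

```python
import re

sample = "12.7@ 2,700(kgm@ rpm)"
parts = sample.split("@")
print(parts)  # ['12.7', ' 2,700(kgm', ' rpm)']

# getItem(1) only ever sees ' 2,700(kgm': there is no 'rpm' in it,
# and the comma in '2,700' is not matched by \d+-?\d*\s?rpm either,
# so the extraction comes back empty for this row
print(re.search(r'\d+-?\d*\s?rpm', parts[1]))  # None
```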
python pyspark databricks
2 Answers
0 votes

Write a Python function that can parse each individual string value, then wrap it in a PySpark UDF and apply it to your dataframe. For example:

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType

# Define plain Python functions (wrapped as UDFs below) for extracting
# RPM and torque values from the torque string column
def extract_torque_udf(torque_string):
    # Define regular expressions for matching torque values and units
    nm_regex = r"\d+(\.\d+)?\s*(kgm)?Nm"
    kgm_regex = r"\d+(\.\d+)?\s*(kgm)?"

    # Search for matches in the input string
    nm_match = re.search(nm_regex, torque_string)
    kgm_match = re.search(kgm_regex, torque_string)

    # Extract the torque value and unit from the matching regular expression
    if nm_match:
        torque_value = float(nm_match.group().replace("kgm", "").replace("Nm", ""))
    elif kgm_match:
        # convert kgm to Nm
        torque_value = float(kgm_match.group().replace("kgm", "")) * 9.80665
    else:
        raise ValueError("Torque string does not match expected format")

    return torque_value

def extract_rpm_udf(torque_string):
    # Define regular expression for matching rpm values
    rpm_regex = r"(\d+(?:,\d+)*-\d+(?:,\d+)*)\s*rpm|\b(\d+(?:,\d+)*)\s*rpm|\d+(?:,\d+)*\s*(?=\(kgm@ rpm\))"

    # Search for matches in the input string
    rpm_match = re.search(rpm_regex, torque_string)

    # Extract the rpm value(s) from the matching regular expression
    if rpm_match:
        rpm_value = rpm_match.group(1) or rpm_match.group(2) or rpm_match.group(0)
    else:
        raise ValueError("Torque string does not contain rpm values")

    return rpm_value

# Define the PySpark UDFs for the two functions
extract_torque = udf(extract_torque_udf, DoubleType())
extract_rpm = udf(extract_rpm_udf, StringType())

# Define the PySpark DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Torque Extraction").getOrCreate()

data = [("190Nm@ 2000rpm",), 
        ("250Nm@ 1500-2500rpm",),
        ("12.7@ 2,700(kgm@ rpm)",),
        ("22.4 kgm at 1750-2750rpm",),
        ("11.5@ 4,500(kgm@ rpm)",)]

df = spark.createDataFrame(data, ["Torque"])

# Add the RPM and torque value columns using the PySpark UDFs
df = df.withColumn("RPM", extract_rpm("Torque"))
df = df.withColumn("Torque Value", extract_torque("Torque"))

# Display the resulting DataFrame
df.show()
+--------------------+---------+------------------+
|Torque              |RPM      |Torque Value      |
+--------------------+---------+------------------+
|190Nm@ 2000rpm      |2000     |190.0             |
|250Nm@ 1500-2500rpm |1500-2500|250.0             |
|12.7@ 2,700(kgm@ ...|2,700    |124.54445499999999|
|22.4 kgm at 1750-...|1750-2750|219.66895999999997|
|11.5@ 4,500(kgm@ ...|4,500    |112.77647499999999|
+--------------------+---------+------------------+

0 votes

An approach without UDFs:

  1. Use regexp_extract_all to extract the first two groups of numbers
  2. Determine a conversion factor to apply to the torque value when the original string contains kgm
  3. Split the array from step 1 into two columns, multiplying the torque value by the factor from step 2
  4. Drop the intermediate columns
from pyspark.sql import functions as F

df = ...

df.withColumn("numbers", F.expr("regexp_extract_all(Torque, '([0-9,.\-]+)')")) \
  .withColumn("factor", F.when(F.instr("Torque", "kgm") > 0 , 9.80665).otherwise(1.0)) \
  .withColumn("torque value", F.col("numbers")[0] * F.col("factor")) \
  .withColumn("rpm", F.col("numbers")[1]) \
  .drop("numbers", "factor") \
  .show(truncate=False)

Result:

+------------------------+------------------+---------+
|Torque                  |torque value      |rpm      |
+------------------------+------------------+---------+
|190Nm@ 2000rpm          |190.0             |2000     |
|250Nm@ 1500-2500rpm     |250.0             |1500-2500|
|12.7@ 2,700(kgm@ rpm)   |124.54445499999999|2,700    |
|22.4 kgm at 1750-2750rpm|219.66895999999997|1750-2750|
|11.5@ 4,500(kgm@ rpm)   |112.77647499999999|4,500    |
+------------------------+------------------+---------+
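Note that the rpm column is still a string: it can carry a thousands separator ("2,700") or a range ("1500-2500"). If numeric values are needed downstream, the same cleanup can be done in Spark with regexp_replace and split; the underlying string logic, sketched here in plain Python (the function name rpm_bounds is just for illustration), is:

```python
def rpm_bounds(rpm: str):
    """Normalize an rpm string like '2,700' or '1500-2500' to (low, high)."""
    cleaned = rpm.replace(",", "")        # drop thousands separators
    parts = cleaned.split("-")            # a range becomes two elements
    return int(parts[0]), int(parts[-1])  # single value: low == high

print(rpm_bounds("2,700"))      # (2700, 2700)
print(rpm_bounds("1500-2500"))  # (1500, 2500)
```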