Spark - 如何高效读取大型 zstandard 文件

问题描述 投票:0回答:1

我正在尝试使用 Spark 读取 Databricks 中的大型 zstandard 文件(压缩后约 30GB)。这是以下 PGN 格式 (.pgn.zst) 的国际象棋游戏合集

[Event "Rated Bullet tournament https://lichess.org/tournament/yc1WW2Ox"]
[Site "https://lichess.org/PpwPOZMq"]
[Date "2017.04.01"]
[Round "-"]
[White "Abbot"]
[Black "Costello"]
[Result "0-1"]
[UTCDate "2017.04.01"]
[UTCTime "11:32:01"]
[WhiteElo "2100"]
[BlackElo "2000"]
[WhiteRatingDiff "-4"]
[BlackRatingDiff "+1"]
[WhiteTitle "FM"]
[ECO "B30"]
[Opening "Sicilian Defense: Old Sicilian"]
[TimeControl "300+0"]
[Termination "Time forfeit"]

1. e4 { [%eval 0.17] [%clk 0:00:30] } 1... c5 { [%eval 0.19] [%clk 0:00:30] }
2. Nf3 { [%eval 0.25] [%clk 0:00:29] } 2... Nc6 { [%eval 0.33] [%clk 0:00:30] }
3. Bc4 { [%eval -0.13] [%clk 0:00:28] } 3... e6 { [%eval -0.04] [%clk 0:00:30] }
4. c3 { [%eval -0.4] [%clk 0:00:27] } 4... b5? { [%eval 1.18] [%clk 0:00:30] }
5. Bb3?! { [%eval 0.21] [%clk 0:00:26] } 5... c4 { [%eval 0.32] [%clk 0:00:29] }
6. Bc2 { [%eval 0.2] [%clk 0:00:25] } 6... a5 { [%eval 0.6] [%clk 0:00:29] }
7. d4 { [%eval 0.29] [%clk 0:00:23] } 7... cxd3 { [%eval 0.6] [%clk 0:00:27] }
8. Qxd3 { [%eval 0.12] [%clk 0:00:22] } 8... Nf6 { [%eval 0.52] [%clk 0:00:26] }
9. e5 { [%eval 0.39] [%clk 0:00:21] } 9... Nd5 { [%eval 0.45] [%clk 0:00:25] }
10. Bg5?! { [%eval -0.44] [%clk 0:00:18] } 10... Qc7 { [%eval -0.12] [%clk 0:00:23] }
11. Nbd2?? { [%eval -3.15] [%clk 0:00:14] } 11... h6 { [%eval -2.99] [%clk 0:00:23] }
12. Bh4 { [%eval -3.0] [%clk 0:00:11] } 12... Ba6? { [%eval -0.12] [%clk 0:00:23] }
13. b3?? { [%eval -4.14] [%clk 0:00:02] } 13... Nf4? { [%eval -2.73] [%clk 0:00:21] } 0-1

要加载文件,我使用

spark.read.text
。根据我的理解,zst 文件不可分割,因此最终会将整个文件读入单个分区,从而导致大量溢出。我还想应用主元转换将每场比赛提取到单个记录中。我相信,由于我必须逐行将每一行分组到一个游戏中,因此此步骤在一个分区上运行效率也很低。 我想知道 Spark 中是否有更好的方法来做到这一点,如果没有,什么是更适合这样的工作的工具?

下面是我的代码,带有示例 df 以供澄清

df = spark.read.text(file_name).filter(
    (col('value') != '') &
    (~col('value').like('%UTCTime%')) &
    (~col('value').like('%Result%'))
)
价值
[活动“额定闪电战游戏”]
[白色“btr18am”]
[黑色“Kozionov_sergey”]
...
【开局“后弃兵:交换变化、位置变化”】
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0
[活动“UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6”]
[白色“rickyrich”]
[黑色“seanysean”]
...
[开启“阿廖欣防御”]
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1
# Rename the "value" column to "Line"
df = df.withColumnRenamed("value", "Line")

# Extract the "Key" and "Value" columns based on the structure of the "Line" column
df = df.withColumn('Key', when(col('Line').startswith('1.'), 'Moves')
                   .otherwise(regexp_extract(col('Line'), r'\[(.*?)\s', 1))) \
       .withColumn('Value', when(col('Line').startswith('1.'), col('Line'))
                   .otherwise(regexp_extract(col('Line'), r'"(.*)"', 1)))

# Add a column to identify the start of a game
df = df.withColumn("StartOfGame", when(col("Line").startswith("[Event"), 1).otherwise(0))

# Define a window specification for calculating the cumulative sum
windowSpec = Window.orderBy(monotonically_increasing_id())

# Calculate the cumulative sum of "StartOfGame" to create "GameID"
df = df.withColumn("GameID", sum("StartOfGame").over(windowSpec))
线 钥匙 价值 游戏开始 游戏ID
[活动“额定闪电战游戏”] 活动 评级闪电战游戏 1 1
[白色“btr18am”] 白色 上午 18 点 0 1
[黑色“Kozionov_sergey”] 黑色 科齐奥诺夫_谢尔盖 0 1
0 1
【开局“后弃兵:交换变化、位置变化”】 开幕 后弃兵策略:交换变化、位置变化 0 1
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0 移动 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0 0 1
[活动“UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6”] 活动 UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6 1 2
[白色“rickyrich”] 白色 瑞奇里奇 0 2
[黑色“seanysean”] 黑色 西尼西恩 0 2
0 2
[开启“阿廖欣防御”] 开幕 阿廖欣防御 0 2
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 移动 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 0 2
# Define the list of columns for pivoting
col_list = [ 
    'UTCDate', 'Event', 'TimeControl', 'Opening', 'ECO' , 'Site',
     'Termination', 'Moves', 
    'White', 'WhiteTitle', 'WhiteElo', 'WhiteRatingDiff', 
    'Black', 'BlackTitle', 'BlackElo', 'BlackRatingDiff', 
    ]


# Pivot the DataFrame based on "GameID" and the specified columns
df = df.groupBy("GameID").pivot("Key", col_list).agg(first("Value"))

df = df.filter(col('Moves').contains('%eval'))

df.write.partitionBy('UTCDate').mode("overwrite").parquet(silver_file_path)
游戏ID 活动 白色 黑色 开幕 移动
1 评级闪电战游戏 上午 18 点 科齐奥诺夫_谢尔盖 后弃兵策略:交换变化、位置变化 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0
2 UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6 瑞奇里奇 西尼西恩 阿廖欣防御 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1

我尝试过使用更大的集群和更多的内存。虽然它确实减少了溢出,但我不确定这是否是一种更“有效”的方法,因为它还增加了集群中最终未被起诉的核心数量。

apache-spark databricks
1个回答
0
投票

如果有足够大的磁盘可用,请提取 zst 文件并将每个游戏保存为单个文本文件,然后再使用 Spark 进行处理。

第1步:分割文件

获取压缩文件并将每个国际象棋游戏写入名为

pgn
的目录中自己的文件中:

import zstandard as zstd

with zstd.open(file_name, 'r') as f:
  current = ''
  for i, line in enumerate(f):
    if line.startswith('[Event'):
        if current != '':
          text_file = open(f"pgn/out_{i}.pgn", "w")
          text_file.write(current)
          text_file.close()
          current = ''    
    current = current + line
    if i % 100_000 == 0:
      print(f'line {i:8d}: {line}')
  text_file = open(f"pgn/out_last.pgn", "w")
  text_file.write(current)
  text_file.close()

第 2 步:创建 Spark 数据框

现在用 Spark 阅读整个目录

pgn
。由于 Spark 现在可以看到不同的文本文件,因此它可以将读取多个文件的工作分配给整个集群。结果是一个数据框,每一行都是一场游戏。

spark = SparkSession.builder.appName("test").getOrCreate()
df = spark.read.text('pgn', wholetext=True)
df.show(vertical=True, truncate=False)

现在可以按照问题中所述继续解析每一行。

© www.soinside.com 2019 - 2024. All rights reserved.