我正在尝试使用 Spark 读取 Databricks 中的大型 zstandard 文件(压缩后约 30GB)。这是以下 PGN 格式 (.pgn.zst) 的国际象棋游戏合集
[Event "Rated Bullet tournament https://lichess.org/tournament/yc1WW2Ox"]
[Site "https://lichess.org/PpwPOZMq"]
[Date "2017.04.01"]
[Round "-"]
[White "Abbot"]
[Black "Costello"]
[Result "0-1"]
[UTCDate "2017.04.01"]
[UTCTime "11:32:01"]
[WhiteElo "2100"]
[BlackElo "2000"]
[WhiteRatingDiff "-4"]
[BlackRatingDiff "+1"]
[WhiteTitle "FM"]
[ECO "B30"]
[Opening "Sicilian Defense: Old Sicilian"]
[TimeControl "300+0"]
[Termination "Time forfeit"]
1. e4 { [%eval 0.17] [%clk 0:00:30] } 1... c5 { [%eval 0.19] [%clk 0:00:30] }
2. Nf3 { [%eval 0.25] [%clk 0:00:29] } 2... Nc6 { [%eval 0.33] [%clk 0:00:30] }
3. Bc4 { [%eval -0.13] [%clk 0:00:28] } 3... e6 { [%eval -0.04] [%clk 0:00:30] }
4. c3 { [%eval -0.4] [%clk 0:00:27] } 4... b5? { [%eval 1.18] [%clk 0:00:30] }
5. Bb3?! { [%eval 0.21] [%clk 0:00:26] } 5... c4 { [%eval 0.32] [%clk 0:00:29] }
6. Bc2 { [%eval 0.2] [%clk 0:00:25] } 6... a5 { [%eval 0.6] [%clk 0:00:29] }
7. d4 { [%eval 0.29] [%clk 0:00:23] } 7... cxd3 { [%eval 0.6] [%clk 0:00:27] }
8. Qxd3 { [%eval 0.12] [%clk 0:00:22] } 8... Nf6 { [%eval 0.52] [%clk 0:00:26] }
9. e5 { [%eval 0.39] [%clk 0:00:21] } 9... Nd5 { [%eval 0.45] [%clk 0:00:25] }
10. Bg5?! { [%eval -0.44] [%clk 0:00:18] } 10... Qc7 { [%eval -0.12] [%clk 0:00:23] }
11. Nbd2?? { [%eval -3.15] [%clk 0:00:14] } 11... h6 { [%eval -2.99] [%clk 0:00:23] }
12. Bh4 { [%eval -3.0] [%clk 0:00:11] } 12... Ba6? { [%eval -0.12] [%clk 0:00:23] }
13. b3?? { [%eval -4.14] [%clk 0:00:02] } 13... Nf4? { [%eval -2.73] [%clk 0:00:21] } 0-1
要加载文件,我使用
spark.read.text
。根据我的理解,zst 文件不可分割,因此最终会将整个文件读入单个分区,从而导致大量溢出。我还想应用主元转换将每场比赛提取到单个记录中。我相信,由于我必须逐行将每一行分组到一个游戏中,因此此步骤在一个分区上运行效率也很低。
我想知道 Spark 中是否有更好的方法来做到这一点,如果没有,什么是更适合这样的工作的工具?
下面是我的代码,带有示例 df 以供澄清
df = spark.read.text(file_name).filter(
(col('value') != '') &
(~col('value').like('%UTCTime%')) &
(~col('value').like('%Result%'))
)
价值 |
---|
[活动“额定闪电战游戏”] |
[白色“btr18am”] |
[黑色“Kozionov_sergey”] |
... |
【开局“后弃兵:交换变化、位置变化”】 |
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0 |
[活动“UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6”] |
[白色“rickyrich”] |
[黑色“seanysean”] |
... |
[开启“阿廖欣防御”] |
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 |
# Rename the "value" column to "Line"
df = df.withColumnRenamed("value", "Line")
# Extract the "Key" and "Value" columns based on the structure of the "Line" column
df = df.withColumn('Key', when(col('Line').startswith('1.'), 'Moves')
.otherwise(regexp_extract(col('Line'), r'\[(.*?)\s', 1))) \
.withColumn('Value', when(col('Line').startswith('1.'), col('Line'))
.otherwise(regexp_extract(col('Line'), r'"(.*)"', 1)))
# Add a column to identify the start of a game
df = df.withColumn("StartOfGame", when(col("Line").startswith("[Event"), 1).otherwise(0))
# Define a window specification for calculating the cumulative sum
windowSpec = Window.orderBy(monotonically_increasing_id())
# Calculate the cumulative sum of "StartOfGame" to create "GameID"
df = df.withColumn("GameID", sum("StartOfGame").over(windowSpec))
线 | 钥匙 | 价值 | 游戏开始 | 游戏ID |
---|---|---|---|---|
[活动“额定闪电战游戏”] | 活动 | 评级闪电战游戏 | 1 | 1 |
[白色“btr18am”] | 白色 | 上午 18 点 | 0 | 1 |
[黑色“Kozionov_sergey”] | 黑色 | 科齐奥诺夫_谢尔盖 | 0 | 1 |
… | … | … | 0 | 1 |
【开局“后弃兵:交换变化、位置变化”】 | 开幕 | 后弃兵策略:交换变化、位置变化 | 0 | 1 |
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0 | 移动 | 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0 | 0 | 1 |
[活动“UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6”] | 活动 | UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6 | 1 | 2 |
[白色“rickyrich”] | 白色 | 瑞奇里奇 | 0 | 2 |
[黑色“seanysean”] | 黑色 | 西尼西恩 | 0 | 2 |
… | … | … | 0 | 2 |
[开启“阿廖欣防御”] | 开幕 | 阿廖欣防御 | 0 | 2 |
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 | 移动 | 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 | 0 | 2 |
# Define the list of columns for pivoting
col_list = [
'UTCDate', 'Event', 'TimeControl', 'Opening', 'ECO' , 'Site',
'Termination', 'Moves',
'White', 'WhiteTitle', 'WhiteElo', 'WhiteRatingDiff',
'Black', 'BlackTitle', 'BlackElo', 'BlackRatingDiff',
]
# Pivot the DataFrame based on "GameID" and the specified columns
df = df.groupBy("GameID").pivot("Key", col_list).agg(first("Value"))
df = df.filter(col('Moves').contains('%eval'))
df.write.partitionBy('UTCDate').mode("overwrite").parquet(silver_file_path)
游戏ID | 活动 | 白色 | 黑色 | … | 开幕 | 移动 |
---|---|---|---|---|---|---|
1 | 评级闪电战游戏 | 上午 18 点 | 科齐奥诺夫_谢尔盖 | … | 后弃兵策略:交换变化、位置变化 | 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } ... 27.Nxf7 { [%clk 0:01:28] } 1-0 |
2 | UltraBullet 锦标赛评级 https://lichess.org/tournament/5Emn5TK6 | 瑞奇里奇 | 西尼西恩 | … | 阿廖欣防御 | 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } ... 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 |
我尝试过使用更大的集群和更多的内存。虽然它确实减少了溢出,但我不确定这是否是一种更“有效”的方法,因为它还增加了集群中最终未被起诉的核心数量。
如果有足够大的磁盘可用,请提取 zst 文件并将每个游戏保存为单个文本文件,然后再使用 Spark 进行处理。
获取压缩文件并将每个国际象棋游戏写入名为
pgn
的目录中自己的文件中:
import zstandard as zstd
with zstd.open(file_name, 'r') as f:
current = ''
for i, line in enumerate(f):
if line.startswith('[Event'):
if current != '':
text_file = open(f"pgn/out_{i}.pgn", "w")
text_file.write(current)
text_file.close()
current = ''
current = current + line
if i % 100_000 == 0:
print(f'line {i:8d}: {line}')
text_file = open(f"pgn/out_last.pgn", "w")
text_file.write(current)
text_file.close()
现在用 Spark 阅读整个目录
pgn
。由于 Spark 现在可以看到不同的文本文件,因此它可以将读取多个文件的工作分配给整个集群。结果是一个数据框,每一行都是一场游戏。
spark = SparkSession.builder.appName("test").getOrCreate()
df = spark.read.text('pgn', wholetext=True)
df.show(vertical=True, truncate=False)
现在可以按照问题中所述继续解析每一行。