我在胶水台上压实时遇到困难。
我创建了一个 Glue 数据库和一个 Glue 表。
我有一个 S3 存储桶设置。
我已使用按照此处所述设置的优化角色在 Glue 仪表板优化部分中启用了表优化。
因此,为了测试压缩是否有效,我的理解是我需要将数据添加到表中,该表将生成超过 50 个小于 128MB 的 parquet 文件,并且所有文件都需要位于同一个 S3 文件夹中。
请注意,我对 Athena、Glue 和 SQL 完全陌生。
有人可以告诉我需要做什么才能生成镶木地板文件以便触发压缩吗?
这是我到目前为止所做的尝试 -
我使用在 AWS Athena 查询编辑器中运行的查询创建了一个表 -
CREATE TABLE msk_ingestion__qa_apps_us.test_compaction ( customer_id INT, name STRING, age INT, dob DATE, email STRING, address STRING, testcol STRING
)
LOCATION 's3://msk-ingestion--qa-apps-us/test_compaction/'
TBLPROPERTIES ( 'table_type' = 'ICEBERG', 'format' = 'parquet'
)
我创建了此作业并成功运行它,它在以“part-00000-xxxx-xxx-xxx-snappy.parquet”的方式命名的表存储桶中创建了 100 多个 parquet 文件,大小均约为 2.7K。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from faker import Faker
import random
# Initialize Glue job context
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3_target_bucket', 'database_name', 'table_name'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Parameters
s3_target_bucket = args['s3_target_bucket']
database_name = args['database_name']
table_name = args['table_name']
# Define schema
schema = StructType([
StructField("customer_id", IntegerType(), False),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("dob", DateType(), True),
StructField("email", StringType(), True),
StructField("address", StringType(), True),
StructField("testcol", StringType(), True)
])
# Initialize Faker
fake = Faker()
def generate_data(num_records):
"""Generates sample data for testing."""
return [(random.randint(1, 10000), fake.name(), random.randint(18, 70), fake.date_of_birth(minimum_age=18),
fake.email(), fake.address(), "test") for _ in range(num_records)]
# Create and write multiple small files to the same S3 directory
num_files = 10 # Number of files you want to create
records_per_file = 100 # Records per file
for _ in range(num_files):
data = generate_data(records_per_file)
df = spark.createDataFrame(data, schema=schema)
df.write.format("parquet") \
.mode("append") \
.option("path", f"s3://{s3_target_bucket}/{table_name}/") \
.save()
job.commit()
Compaction 表明它已启用,但从未运行。仪表板的压缩部分没有显示错误。
生成的文件是否有问题或者文件格式错误?有其他方法来生成文件吗?
我只想看到压缩运行,所以如果有任何其他解决方案,我将非常感激。
对于任何类似尝试测试压缩的人,本教程为我提供了所需的所有步骤 -
https://devopstar.com/2023/11/18/lets-try-aws-glue-automatic-compaction-for-apache-iceberg/