Spark 从多个 SQL 数据库并行读取

问题描述 投票:0回答:1

我有一个场景,需要将多个 SQL 数据库(每个数据库都有自己的端点和连接)中的大型数据集加载到 Spark 集群中。考虑到这些数据集的大小(数十亿行),我希望通过并行而不是顺序加载数据来最大限度地提高效率,以避免单个数据库过载并减少总体加载时间。

将这种并行数据加载到 Spark 中是否可行?如果是这样,实现这一目标的最佳实践或方法是什么?这个过程应该手动管理,还是有可以提供帮助的自动化解决方案或框架?

任何有关如何解决此问题的指导或建议将不胜感激!

apache-spark pyspark apache-spark-sql
1个回答
0
投票

既然您提到每个数据库都有自己的连接,并且您希望并行触发所有数据库的获取,则可以使用线程池来完成此操作。 我使用以下代码优化了读取大约 300 多个包含 100 万行的表。您需要使用不同的获取大小来测试性能。

此外,如果您知道 Spark 在源级别下推过滤器(如果可以)以便可以推送最小化的数据,请尝试使用它。当您使用强制转换时,下推将不起作用,过滤器列上的一些计算或过滤器列是结构体类型。

如果数据库有分区逻辑,请尝试使用它,在我的例子中,SQL DB 有 3 个分区,我只想从 1 个分区获取数据,因此我在 SQL 查询中使用了分区提示。

如果您想对表进行一些转换,请不要在此级别执行它们,因为这会增加您的时间,并且首先将这些表保存在 hive db 或 delta Lake 上的某个位置,然后对它们进行转换,您将获得更快的速度。

from multiprocessing.pool import ThreadPool
from itertools import product

def query_table(params):
   connection, table_name = params
   username = connection['username']
   password = connection['password']
   host = connection['host']
   url = f"jdbc:oracle:thin:{username}/{password}@{host}"
   query_str = f"SELECT * FROM {table_name}"
   driver = "oracle.jdbc.driver.OracleDriver"
   fetch_size = 30000

   df = spark.read.format("jdbc") \
       .option("url", url) \
       .option("query", query_str) \
       .option("user", username) \
       .option("password", password) \
       .option("fetchsize", str(fetch_size)) \
       .option("driver", driver) \
       .load()
   return df

connection_lists = [
 {'username': 'u1', 'password': 'p1', 'host': 'hostname1:port1/servicename1'},
 {'username': 'u2', 'password': 'p2', 'host': 'hostname2:port2/servicename2'},
 {'username': 'u3', 'password': 'p3', 'host': 'hostname2:port2/servicename2'}
]
table_names = ['table1', 'table2', 'table3']

with ThreadPool(150) as pool:
    dataframes = pool.map(query_table, product(connection_lists, table_names))
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.