我有一个场景,需要将多个 SQL 数据库(每个数据库都有自己的端点和连接)中的大型数据集加载到 Spark 集群中。考虑到这些数据集的大小(数十亿行),我希望通过并行而不是顺序加载数据来最大限度地提高效率,以避免单个数据库过载并减少总体加载时间。
将这种并行数据加载到 Spark 中是否可行?如果是这样,实现这一目标的最佳实践或方法是什么?这个过程应该手动管理,还是有可以提供帮助的自动化解决方案或框架?
任何有关如何解决此问题的指导或建议将不胜感激!
既然您提到每个数据库都有自己的连接,并且您希望并行触发所有数据库的获取,则可以使用线程池来完成此操作。 我使用以下代码优化了读取大约 300 多个包含 100 万行的表。您需要使用不同的获取大小来测试性能。
此外,如果您知道 Spark 在源级别下推过滤器(如果可以)以便可以推送最小化的数据,请尝试使用它。当您使用强制转换时,下推将不起作用,过滤器列上的一些计算或过滤器列是结构体类型。
如果数据库有分区逻辑,请尝试使用它,在我的例子中,SQL DB 有 3 个分区,我只想从 1 个分区获取数据,因此我在 SQL 查询中使用了分区提示。
如果您想对表进行一些转换,请不要在此级别执行它们,因为这会增加您的时间,并且首先将这些表保存在 hive db 或 delta Lake 上的某个位置,然后对它们进行转换,您将获得更快的速度。
from multiprocessing.pool import ThreadPool
from itertools import product
def query_table(params):
connection, table_name = params
username = connection['username']
password = connection['password']
host = connection['host']
url = f"jdbc:oracle:thin:{username}/{password}@{host}"
query_str = f"SELECT * FROM {table_name}"
driver = "oracle.jdbc.driver.OracleDriver"
fetch_size = 30000
df = spark.read.format("jdbc") \
.option("url", url) \
.option("query", query_str) \
.option("user", username) \
.option("password", password) \
.option("fetchsize", str(fetch_size)) \
.option("driver", driver) \
.load()
return df
connection_lists = [
{'username': 'u1', 'password': 'p1', 'host': 'hostname1:port1/servicename1'},
{'username': 'u2', 'password': 'p2', 'host': 'hostname2:port2/servicename2'},
{'username': 'u3', 'password': 'p3', 'host': 'hostname2:port2/servicename2'}
]
table_names = ['table1', 'table2', 'table3']
with ThreadPool(150) as pool:
dataframes = pool.map(query_table, product(connection_lists, table_names))