I have a Spark Connect server running. Things are fine as long as I don't use UDFs (`df.show()` always works). But as soon as I apply a UDF, it fails with `RuntimeError: SparkContext or SparkSession should be created first`. Obviously a SparkSession exists, because it created the DataFrame and printed it; it's only when I apply a UDF to it that it fails.
Is this (UDFs, or anything else that needs a SparkContext) simply not yet supported in Spark Connect?
Here is a reproducible example. Run `pytest -k test_spark_connect` for the simple UDF test against a Spark Connect remote session, and `pytest -k test_spark` for the same test on a new local Spark session. (PS: a plain `pytest` runs both tests in the same process, which causes conflicts, so they must be run separately.)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf


@udf
def to_upper(s):
    if s is not None:
        return s.upper()


def test_spark_connect():
    simple_spark_test(SparkSession.builder.remote("sc://localhost:15002").getOrCreate())


def test_spark():
    simple_spark_test(SparkSession.builder.getOrCreate())


def simple_spark_test(session_to_test: SparkSession):
    print(f'\nss: {session_to_test}')
    print(f'ss.conf.get("spark.app.name"): {session_to_test.conf.get("spark.app.name")}')
    df = session_to_test.createDataFrame([(1, "John Doe")], ("id", "name"))
    df.show()
    df.select(col("name"), to_upper("name")).show()
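My working theory (an assumption on my part, not verified against pyspark internals) is that the module-level `@udf` decorator fixes its behavior at decoration time: it runs at import, before `builder.remote(...).getOrCreate()` has executed, so it binds the classic code path that later demands a live `SparkContext`. A stdlib-only toy of that mechanism, where `eager_udf` and `ACTIVE_SESSION` are hypothetical stand-ins:

```python
# Toy model of the suspected mechanism (assumption, not pyspark's
# actual internals): the decorator snapshots the environment when it
# runs, the way a classic UserDefinedFunction is built at decoration
# time, so a session created afterwards is never seen.

ACTIVE_SESSION = None  # stands in for "does a (remote) session exist yet?"


def eager_udf(f):
    # Snapshot at decoration time, not at call time.
    session_at_decoration = ACTIVE_SESSION

    def wrapper(*args):
        if session_at_decoration is None:
            raise RuntimeError(
                "SparkContext or SparkSession should be created first.")
        return f(*args)

    return wrapper


@eager_udf  # runs at import time, while ACTIVE_SESSION is still None
def to_upper(s):
    return s.upper()


# The session is created *after* the decorator already ran...
ACTIVE_SESSION = "sc://localhost:15002"

try:
    to_upper("John Doe")
except RuntimeError as e:
    print(e)  # same error text, even though a session now exists
```

If that theory holds, the decoration-time snapshot would explain why `df.show()` succeeds (it only needs the session) while the UDF call dies in `get_active_spark_context()`.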
$ pytest -k test_spark_connect
... snip ...
conftest.py::test_spark_connect FAILED [100%]
ss: <pyspark.sql.connect.session.SparkSession object at 0x0000022C1D8C37F0>
ss.conf.get("spark.app.name"): Spark Connect server
+---+--------+
| id| name|
+---+--------+
| 1|John Doe|
+---+--------+
conftest.py:9 (test_spark_connect)
def test_spark_connect():
> simple_spark_test(SparkSession.builder.remote("sc://localhost:15002").getOrCreate())
conftest.py:11:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
conftest.py:22: in simple_spark_test
df.select(col("name"), to_upper("name")).show()
.venv\lib\site-packages\pyspark\sql\udf.py:425: in wrapper
return self(*args)
.venv\lib\site-packages\pyspark\sql\udf.py:340: in __call__
sc = get_active_spark_context()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def get_active_spark_context() -> SparkContext:
"""Raise RuntimeError if SparkContext is not initialized,
otherwise, returns the active SparkContext."""
sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
> raise RuntimeError("SparkContext or SparkSession should be created first.")
E RuntimeError: SparkContext or SparkSession should be created first.
.venv\lib\site-packages\pyspark\sql\utils.py:248: RuntimeError
$
$ pytest -k test_spark
conftest.py::test_spark
... snip ...
25/03/03 12:25:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
PASSED [100%]
ss: <pyspark.sql.session.SparkSession object at 0x000001EA41099540>
ss.conf.get("spark.app.name"): pyspark-shell
+---+--------+
| id| name|
+---+--------+
| 1|John Doe|
+---+--------+
+--------+--------------+
| name|to_upper(name)|
+--------+--------------+
|John Doe| JOHN DOE|
+--------+--------------+
$
I hit the same error on Spark 3.5.5. Spark SQL queries run fine, but UDFs fail:
>>> spark.sql("select 1").show()
+---+
| 1|
+---+
| 1|
+---+
>>> df.select(to_upper("name"), add_one("age")).show()
Traceback (most recent call last):
File "<python-input-13>", line 1, in <module>
df.select(to_upper("name"), add_one("age")).show()
~~~~~~~~^^^^^^^^
File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/udf.py", line 423, in wrapper
return self(*args)
File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/udf.py", line 339, in __call__
sc = get_active_spark_context()
File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/utils.py", line 248, in get_active_spark_context
raise RuntimeError("SparkContext or SparkSession should be created first.")
RuntimeError: SparkContext or SparkSession should be created first.
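Since the failure in both tracebacks comes from `get_active_spark_context()` inside the classic UDF wrapper, one thing worth trying (an assumption, not a confirmed fix) is making sure the session exists before the UDF is defined. In stdlib-only toy form, where `eager_udf` is a hypothetical decorator that snapshots state at decoration time:

```python
# Toy sketch (assumption, not pyspark's actual internals): the same
# decoration-time snapshot, but with the session created FIRST, so the
# snapshot sees it and the call succeeds.

ACTIVE_SESSION = "sc://localhost:15002"  # session exists before decoration


def eager_udf(f):
    session_at_decoration = ACTIVE_SESSION  # snapshot at decoration time

    def wrapper(*args):
        if session_at_decoration is None:
            raise RuntimeError(
                "SparkContext or SparkSession should be created first.")
        return f(*args)

    return wrapper


@eager_udf  # decoration now happens with a session in place
def to_upper(s):
    return s.upper()


print(to_upper("John Doe"))  # JOHN DOE
```

In the real repro above, that would mean moving the `@udf` definition inside `simple_spark_test`, after `getOrCreate()` has returned the remote session.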