I have these services set up as separate Docker containers on the same confluent network:
spark-master:
  image: bitnami/spark:latest
  volumes:
    - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py
  command: bin/spark-class org.apache.spark.deploy.master.Master
  ports:
    - "9090:8080"
    - "7077:7077"
  networks:
    - confluent

spark-worker:
  image: bitnami/spark:latest
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
    - spark-master
  environment:
    SPARK_MODE: worker
    SPARK_WORKER_CORES: 2
    SPARK_WORKER_MEMORY: 1g
    SPARK_MASTER_URL: spark://spark-master:7077
  networks:
    - confluent
  volumes:
    - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py

cassandra_db:
  image: cassandra:latest
  container_name: cassandra
  hostname: cassandra
  ports:
    - "9042:9042"
  environment:
    - MAX_HEAP_SIZE=512M
    - HEAP_NEWSIZE=100M
    - CASSANDRA_USERNAME=cassandra
    - CASSANDRA_PASSWORD=cassandra
  networks:
    - confluent

# Define the networks
networks:
  confluent:
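Since cassandra_db publishes 9042 on the host, I can at least confirm the container is listening from outside Docker. A minimal standard-library sketch of the reachability check I use:

import socket

# Plain TCP check against the Cassandra port published on the host.
with socket.create_connection(('localhost', 9042), timeout=5) as sock:
    print('Cassandra is reachable at', sock.getpeername())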
I create a Spark session like this:
s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.cassandra.connection.host', 'cassandra') \
    .config('spark.cassandra.connection.port', '9042') \
    .config('spark.cassandra.auth.username', 'cassandra') \
    .config('spark.cassandra.auth.password', 'cassandra') \
    .getOrCreate()
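As an aside, the same connector packages can also be pinned on the builder instead of on the spark-submit command line shown further down; a sketch of that variant, using the same coordinates as my --packages flag:

from pyspark.sql import SparkSession

# Variant of the builder above: pin the connector jars via spark.jars.packages
# instead of passing --packages to spark-submit.
s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.jars.packages',
            'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,'
            'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3') \
    .config('spark.cassandra.connection.host', 'cassandra') \
    .config('spark.cassandra.connection.port', '9042') \
    .getOrCreate()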
The Cassandra driver connection looks like this:
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['localhost'], port=9042, auth_provider=auth_provider, connect_timeout=60)
cas_session = cluster.connect()
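For reference, the keyspace and table the stream writes to are created up front with this session, roughly like this (a sketch reconstructed from the TableDef that appears in the error log further down; all columns there are VarCharType):

# Keyspace/table setup matching the TableDef in the error log below.
cas_session.execute("""
    CREATE KEYSPACE IF NOT EXISTS spark_streams
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
""")

cas_session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.created_users (
        id TEXT PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT);
""")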
The Spark session reads a dataframe from the Kafka stream (also on the same network) like this:
spark_df = s_conn.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'broker:29092') \
    .option('subscribe', 'users_created') \
    .option('startingOffsets', 'earliest') \
    .load()
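Between this read and the write below, the Kafka value is parsed into the table's columns, roughly like this (a sketch; the schema mirrors the StructType that appears in the error log further down):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Schema mirroring the StructType in the error log below.
schema = StructType([
    StructField('id', StringType(), True),
    StructField('first_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('address', StringType(), True),
    StructField('post_code', StringType(), True),
    StructField('email', StringType(), True),
    StructField('username', StringType(), True),
    StructField('registered_date', StringType(), True),
    StructField('phone', StringType(), True),
    StructField('picture', StringType(), True),
])

# Kafka delivers the payload as bytes in the 'value' column;
# decode it and explode the JSON into named columns.
spark_df = spark_df.selectExpr('CAST(value AS STRING)') \
    .select(from_json(col('value'), schema).alias('data')) \
    .select('data.*')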
I write the dataframe to Cassandra with the following:
streaming_query = (spark_df.writeStream.format('org.apache.spark.sql.cassandra')
                   .option('checkpointLocation', '/tmp/checkpoint')
                   .option('keyspace', 'spark_streams')
                   .option('table', 'created_users')
                   .start())
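For completeness, the driver is kept alive afterwards with the usual blocking call:

# Block until the streaming query terminates (or fails).
streaming_query.awaitTermination()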
When I try to write the dataframe to Cassandra, I get this error:
ERROR CassandraConnectorConf: Unknown host 'cassandra'
java.net.UnknownHostException: cassandra: nodename nor servname provided, or not known
This is how I invoke my Spark job (with, I believe, the correct packages attached):
spark-submit --master spark://localhost:7077 --packages "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3" --conf spark.cassandra.connection.host=cassandra --conf spark.cassandra.connection.port=9042 spark_stream.py
I expected the Spark session to connect to Cassandra, but it seems it can't find the host or contact point?
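To narrow it down, I compared name resolution on the host (where spark-submit runs) with resolution inside a container, using a quick standard-library sketch like this:

import socket

# Expectation: 'cassandra' resolves via Docker's embedded DNS inside the
# confluent network, but not on the host, which only sees the published
# localhost:9042 mapping.
for host in ('cassandra', 'localhost'):
    try:
        print(f'{host} -> {socket.gethostbyname(host)}')
    except OSError as exc:
        print(f'{host} did not resolve: {exc}')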
I tried changing spark.cassandra.connection.host to localhost or 127.0.0.1. For some reason, localhost gives me a different error message:
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
24/11/14 01:59:40 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@105fe638,com.datastax.spark.connector.cql.CassandraConnector@4d392247,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,VarCharType)),ArrayBuffer(),Stream(ColumnDef(address,RegularColumn,VarCharType), ColumnDef(email,RegularColumn,VarCharType), ColumnDef(first_name,RegularColumn,VarCharType), ColumnDef(gender,RegularColumn,VarCharType), ColumnDef(last_name,RegularColumn,VarCharType), ColumnDef(phone,RegularColumn,VarCharType), ColumnDef(picture,RegularColumn,VarCharType), ColumnDef(post_code,RegularColumn,VarCharType), ColumnDef(registered_date,RegularColumn,VarCharType), ColumnDef(username,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(RowsInBatch(5),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(StructField(id,StringType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(gender,StringType,true),StructField(address,StringType,true),StructField(post_code,StringType,true),StructField(email,StringType,true),StructField(username,StringType,true),StructField(registered_date,StringType,true),StructField(phone,StringType,true),StructField(picture,StringType,true)),org.apache.spark.SparkConf@5ac2dede)] is aborting.
I don't think this is a column-mismatch error, but I could be wrong.
Any help would be much appreciated.
Have you tried integrating Kafka with Cassandra in a way that leverages the strengths of Kafka Streams and the DataStax Java Driver for Cassandra? That combination can stream data from Kafka and store it in Cassandra efficiently.
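Since the rest of this thread is in Python, here is a rough Python analogue of that idea, swapping in the confluent-kafka consumer and the DataStax Python driver for Kafka Streams and the Java driver (a sketch only; the topic, keyspace, and table names come from the question above, and the localhost:9092 broker address is an assumed host-published listener):

import json

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from confluent_kafka import Consumer

# Consume from the topic used in the question and write rows straight to Cassandra.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # assumed host-published listener
    'group.id': 'cassandra-sink',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['users_created'])

cluster = Cluster(['localhost'], port=9042,
                  auth_provider=PlainTextAuthProvider('cassandra', 'cassandra'))
session = cluster.connect('spark_streams')
insert = session.prepare(
    'INSERT INTO created_users (id, first_name, last_name, email) VALUES (?, ?, ?, ?)')

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        user = json.loads(msg.value())
        session.execute(insert, (user['id'], user['first_name'],
                                 user['last_name'], user['email']))
finally:
    consumer.close()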