My Spark application hits a read timeout when it reads from Cassandra, and I don't know how to resolve it. Whenever it reaches the code section below, it runs into the read timeout. I have tried restructuring my code, but that still has not solved the problem.
# coding=utf-8
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.streaming.kafka import KafkaUtils
from datetime import datetime, timedelta

def read_json(x):
    try:
        y = json.loads(x)
    except:
        y = 0
    return y

def TransformInData(x):
    try:
        body = json.loads(x['body'])
        return (body['articles'])
    except:
        return 0

def partition_key(source, id):
    return source + chr(ord('A') + int(id[-2:]) % 26)

def articleStoreToCassandra(rdd, rdd_axes, source, time_interval, update_list, schedules_rdd):
    rdd_article = rdd.map(lambda x: Row(id=x[1][0], source=x[1][5], thumbnail=x[1][1], title=x[1][2], url=x[1][3], created_at=x[1][4], last_crawled=datetime.now(), category=x[1][6], channel=x[1][7], genre=x[1][8]))
    rdd_article_by_created_at = rdd.map(lambda x: Row(source=x[1][5], created_at=x[1][4], article=x[1][0]))
    rdd_article_by_url = rdd.map(lambda x: Row(url=x[1][3], article=x[1][0]))
    if rdd_article.count() > 0:
        result_rdd_article = sqlContext.createDataFrame(rdd_article)
        result_rdd_article.write.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).save(mode="append")
    if rdd_article_by_created_at.count() > 0:
        result_rdd_article_by_created_at = sqlContext.createDataFrame(rdd_article_by_created_at)
        result_rdd_article_by_created_at.write.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).save(mode="append")
    if rdd_article_by_url.count() > 0:
        result_rdd_article_by_url = sqlContext.createDataFrame(rdd_article_by_url)
        result_rdd_article_by_url.write.format("org.apache.spark.sql.cassandra").options(table="article_by_url", keyspace=source).save(mode="append")
This is the part of my code that has the problem, and it is tied to the error message below:
    rdd_schedule = rdd.map(lambda x: (partition_key(x[1][5], x[1][0]), x[1][0])).subtract(schedules_rdd).map(lambda x: Row(source=x[0], type='article', scheduled_for=datetime.now().replace(second=0, microsecond=0) + timedelta(minutes=time_interval), id=x[1]))
I have attached the error message below; it appears to be related to the DataStax connector.
    if rdd_schedule.count() > 0:
        result_rdd_schedule = sqlContext.createDataFrame(rdd_schedule)
        result_rdd_schedule.write.format("org.apache.spark.sql.cassandra").options(table="schedules", keyspace=source).save(mode="append")
def zhihuArticleTransform(rdd):
    rdd_cassandra = rdd.map(lambda x: (x[0], (x[0], x[1]['thumbnail'], x[1]['title'], x[1]['url'], datetime.fromtimestamp(float(x[1]['created_at'])), 'zhihu', x[1]['category'] if x[1]['category'] else '', x[1]['channel'], ''))) \
                       .subtract(zhihu_articles)
    articleStoreToCassandra(rdd_cassandra, rdd_cassandra, 'zhihu', 5, [], zhihu_schedules)

conf = SparkConf().setAppName('allstreaming')
conf.set('spark.cassandra.input.consistency.level', 'QUORUM')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)
start = 0
partition = 0
kafkaParams = {"metadata.broker.list": "localhost"}

"""
zhihustreaming
"""
zhihu_articles = sc.cassandraTable('keyspace', 'articles').map(lambda x: (x.id, (x.id, x.thumbnail, x.title, x.url, x.created_at + timedelta(hours=8), x.source, x.category, x.channel)))
zhihu_schedules = sqlContext.read.format('org.apache.spark.sql.cassandra').options(keyspace="keyspace", table="schedules").load().map(lambda x: (x.source, x.id))
zhihu_topic = 'articles'
zhihu_article_stream = KafkaUtils.createDirectStream(ssc, [zhihu_topic], kafkaParams)
zhihu_article_join_stream = zhihu_article_stream.map(lambda x: read_json(x[1])).filter(lambda x: x != 0).map(lambda x: TransformInData(x)).filter(lambda x: x != 0).flatMap(lambda x: (a for a in x)).map(lambda x: (x['id'].encode("utf-8"), x))
zhihu_article_join_stream.transform(zhihuArticleTransform).pprint()

ssc.start()  # Start the computation
ssc.awaitTermination()
This is my error message:
[Stage 67:===================================================> (12 + 1) / 13]WARN 2016-05-04 09:18:36,943 org.apache.spark.scheduler.TaskSetManager: Lost task 7.0 in stage 67.0 (TID 231, 10.47.182.142): java.io.IOException: Exception during execution of SELECT "source", "type", "scheduled_for", "id" FROM "zhihu"."schedules" WHERE token("source", "type") > ? AND token("source", "type") <= ? ALLOW FILTERING: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:215)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:229)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:229)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:248)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1652)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
at com.datastax.driver.core.exceptions.ReadTimeoutException.copy(ReadTimeoutException.java:69)
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:269)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:183)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
at sun.reflect.GeneratedMethodAccessor199.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
at com.sun.proxy.$Proxy8.execute(Unknown Source)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:207)
... 14 more
Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
at com.datastax.driver.core.exceptions.ReadTimeoutException.copy(ReadTimeoutException.java:69)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:99)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:118)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:183)
at com.datastax.driver.core.RequestHandler.access$2300(RequestHandler.java:45)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:748)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:587)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:991)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:913)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:840)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:830)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:348)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:264)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:60)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:213)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:204)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
... 12 more
[Stage 67:===================================================> (12 + 1) / 13]
Thanks for your help!
You need to create a ReadConf object and then increase its read timeout; likewise, with a WriteConf you can increase the write timeout. By default the Cassandra driver only allows a few seconds for reads and writes, so change that.
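Since the question uses PySpark, the Scala-side ReadConf/WriteConf objects are not directly accessible; the usual equivalent is to raise the Spark Cassandra Connector's timeout properties on the SparkConf before the SparkContext is created. A minimal sketch, assuming the connector's documented property names (spark.cassandra.read.timeout_ms and spark.cassandra.connection.timeout_ms) and illustrative values:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('allstreaming')
conf.set('spark.cassandra.input.consistency.level', 'QUORUM')
# Maximum time the connector waits for a single read to return (milliseconds).
conf.set('spark.cassandra.read.timeout_ms', '300000')
# Maximum time to wait when establishing a connection to a Cassandra node (milliseconds).
conf.set('spark.cassandra.connection.timeout_ms', '30000')
sc = SparkContext(conf=conf)

These properties must be set before the context exists; changing the SparkConf afterwards has no effect on the running connector.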
To add to the discussion: I have exactly the same problem, and this is what I tried. In the spark-configuration properties file that I use, I added the following parameter: spark.cassandra.read.timeout_ms = 300000
This is how I submit the Spark job:
/opt/spark/bin/spark-submit \
--class com.pmi.spice.spark.ServiceHousekeepingMain \
--jars log4j-elasticsearch-appender.jar \
--driver-class-path log4j-elasticsearch-appender.jar \
--deploy-mode client \
--files spice6-truststore.jks,log4j.properties \
--properties-file spark-configuration.properties \
--driver-java-options "-Dlog4j.configuration=file:log4j.properties" \
--verbose \
spark-house-cassandra.jar --settings=job-configuration.properties --startTime=`date +'%Y-%m-%d-00:00:00UTC'` $S3_ARG
However, Spark still takes the default timeout. I would appreciate some help. Thanks.
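For illustration only (this is not from the original post, and the job above is a JVM job rather than PySpark): one quick way to check whether a properties file is actually being applied is to read the value back from the running context, for example in a PySpark shell started with the same --properties-file:

# Prints '300000' if spark-configuration.properties was picked up, otherwise 'not set'.
print(sc.getConf().get('spark.cassandra.read.timeout_ms', 'not set'))

If the value is missing at runtime, the same key can also be passed explicitly to spark-submit via --conf, which takes precedence over the defaults.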