My Spark application gets a read timeout when reading from Cassandra and I don't know how to solve it


My Spark application gets a read timeout when reading from Cassandra, and I don't know how to solve it. Whenever it reaches the section of code below, it hits a read timeout. I have tried restructuring my code, but that did not solve the problem.

# coding: utf-8
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.streaming.kafka import KafkaUtils
from datetime import datetime, timedelta

def read_json(x):
    # Parse a JSON string; return 0 so downstream filters can drop bad records.
    try:
        y = json.loads(x)
    except Exception:
        y = 0
    return y

def TransformInData(x):
    # Extract the article list from the message body; 0 marks bad records.
    try:
        body = json.loads(x['body'])
        return body['articles']
    except Exception:
        return 0

def partition_key(source, id):
    # Spread rows over 26 buckets per source, keyed on the id's last two digits.
    return source + chr(ord('A') + int(id[-2:]) % 26)

def articleStoreToCassandra(rdd, rdd_axes, source, time_interval, update_list, schedules_rdd):
    rdd_article = rdd.map(lambda x: Row(id=x[1][0], source=x[1][5], thumbnail=x[1][1],
                                        title=x[1][2], url=x[1][3], created_at=x[1][4],
                                        last_crawled=datetime.now(), category=x[1][6],
                                        channel=x[1][7], genre=x[1][8]))

    rdd_article_by_created_at = rdd.map(lambda x: Row(source=x[1][5], created_at=x[1][4], article=x[1][0]))
    rdd_article_by_url = rdd.map(lambda x: Row(url=x[1][3], article=x[1][0]))

    if rdd_article.count() > 0:
        result_rdd_article = sqlContext.createDataFrame(rdd_article)
        result_rdd_article.write.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).save(mode="append")

    if rdd_article_by_created_at.count() > 0:
        result_rdd_article_by_created_at = sqlContext.createDataFrame(rdd_article_by_created_at)
        result_rdd_article_by_created_at.write.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).save(mode="append")

    if rdd_article_by_url.count() > 0:
        result_rdd_article_by_url = sqlContext.createDataFrame(rdd_article_by_url)
        result_rdd_article_by_url.write.format("org.apache.spark.sql.cassandra").options(table="article_by_url", keyspace=source).save(mode="append")

This is the part of my code that has the problem, and it is connected to the error message below:

    rdd_schedule = rdd.map(lambda x: (partition_key(x[1][5], x[1][0]), x[1][0])) \
                      .subtract(schedules_rdd) \
                      .map(lambda x: Row(source=x[0], type='article',
                                         scheduled_for=datetime.now().replace(second=0, microsecond=0) + timedelta(minutes=time_interval),
                                         id=x[1]))

I have attached the error message below; it is probably related to DataStax.

    if rdd_schedule.count() > 0:
        result_rdd_schedule = sqlContext.createDataFrame(rdd_schedule)
        result_rdd_schedule.write.format("org.apache.spark.sql.cassandra").options(table="schedules", keyspace=source).save(mode="append")

def zhihuArticleTransform(rdd):
    rdd_cassandra = rdd.map(lambda x: (x[0], (x[0], x[1]['thumbnail'], x[1]['title'], x[1]['url'],
                                              datetime.fromtimestamp(float(x[1]['created_at'])), 'zhihu',
                                              x[1]['category'] if x[1]['category'] else '', x[1]['channel'], ''))) \
                       .subtract(zhihu_articles)
    articleStoreToCassandra(rdd_cassandra, rdd_cassandra, 'zhihu', 5, [], zhihu_schedules)

conf = SparkConf().setAppName('allstreaming')
conf.set('spark.cassandra.input.consistency.level', 'QUORUM')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)
start = 0
partition = 0

kafkaParams = {"metadata.broker.list": "localhost"}


"""
zhihustreaming
"""
zhihu_articles = sc.cassandraTable('keyspace', 'articles').map(lambda x: (x.id, (x.id, x.thumbnail, x.title, x.url, x.created_at + timedelta(hours=8), x.source, x.category, x.channel)))
zhihu_schedules = sqlContext.read.format('org.apache.spark.sql.cassandra').options(keyspace="keyspace", table="schedules").load().map(lambda x: (x.source, x.id))

zhihu_topic = 'articles'
zhihu_article_stream = KafkaUtils.createDirectStream(ssc, [zhihu_topic], kafkaParams)
zhihu_article_join_stream = zhihu_article_stream.map(lambda x: read_json(x[1])) \
                                                .filter(lambda x: x != 0) \
                                                .map(lambda x: TransformInData(x)) \
                                                .filter(lambda x: x != 0) \
                                                .flatMap(lambda x: (a for a in x)) \
                                                .map(lambda x: (x['id'].encode("utf-8"), x))
zhihu_article_join_stream.transform(zhihuArticleTransform).pprint()

ssc.start()  # Start the computation
ssc.awaitTermination()

This is my error message:

[Stage 67:===================================================>    (12 + 1) / 13]WARN  2016-05-04 09:18:36,943 org.apache.spark.scheduler.TaskSetManager: Lost task 7.0 in stage 67.0 (TID 231, 10.47.182.142): java.io.IOException: Exception during execution of SELECT "source", "type", "scheduled_for", "id" FROM "zhihu"."schedules" WHERE token("source", "type") > ? AND token("source", "type") <= ?   ALLOW FILTERING: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
 at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:215)
 at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:229)
 at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:229)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
 at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
 at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:248)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1652)
 at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
 at com.datastax.driver.core.exceptions.ReadTimeoutException.copy(ReadTimeoutException.java:69)
 at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:269)
 at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:183)
 at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
 at sun.reflect.GeneratedMethodAccessor199.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
 at com.sun.proxy.$Proxy8.execute(Unknown Source)
 at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:207)
 ... 14 more
Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
 at com.datastax.driver.core.exceptions.ReadTimeoutException.copy(ReadTimeoutException.java:69)
 at com.datastax.driver.core.Responses$Error.asException(Responses.java:99)
 at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:118)
 at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:183)
 at com.datastax.driver.core.RequestHandler.access$2300(RequestHandler.java:45)
 at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:748)
 at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:587)
 at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:991)
 at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:913)
 at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
 at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
 at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:840)
 at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:830)
 at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:348)
 at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:264)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
 at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency QUORUM (3 responses were required but only 0 replica responded)
 at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:60)
 at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
 at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:213)
 at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:204)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
 ... 12 more

[Stage 67:===================================================>    (12 + 1) / 13]

Thanks for your help!

apache-spark cassandra pyspark datastax datastax-enterprise
2 Answers

1 vote

You have to create a ReadConf object and then increase the read timeout on it. Likewise, with a WriteConf you can increase the write timeout. By default the Cassandra driver only allows a few seconds for reads and writes, so change that.
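ReadConf and WriteConf belong to the Scala/Java API of the Spark Cassandra Connector; from PySpark, as in the question's code, the same knobs are exposed as configuration properties on the SparkConf. A minimal sketch, assuming the connector's spark.cassandra.read.timeout_ms and spark.cassandra.connection.timeout_ms properties (the first also appears in the answer below; exact names can vary between connector versions):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('allstreaming')
# Per-request read timeout of the connector, in milliseconds.
conf.set('spark.cassandra.read.timeout_ms', '300000')
# Assumed property: timeout for establishing connections, in milliseconds.
conf.set('spark.cassandra.connection.timeout_ms', '30000')
# Reading at ONE instead of QUORUM waits for a single replica, which also
# sidesteps the "3 responses were required but only 0 replica responded"
# failure above, at the cost of weaker consistency.
conf.set('spark.cassandra.input.consistency.level', 'ONE')

sc = SparkContext(conf=conf)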


0 votes

To add to the discussion: I had exactly the same problem, and this is what I tried. In the spark-configuration properties file I use, I added the following parameter: spark.cassandra.read.timeout_ms = 300000
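For reference, a sketch of how that line would look in the file passed via --properties-file, assuming standard Java-properties key=value syntax:

# spark-configuration.properties
spark.cassandra.read.timeout_ms=300000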

This is how I submit the Spark job:

/opt/spark/bin/spark-submit \
--class com.pmi.spice.spark.ServiceHousekeepingMain \
--jars log4j-elasticsearch-appender.jar \
--driver-class-path log4j-elasticsearch-appender.jar \
--deploy-mode client \
--files spice6-truststore.jks,log4j.properties \
--properties-file spark-configuration.properties \
--driver-java-options "-Dlog4j.configuration=file:log4j.properties" \
--verbose \
spark-house-cassandra.jar --settings=job-configuration.properties   --startTime=`date +'%Y-%m-%d-00:00:00UTC'` $S3_ARG

However, Spark still takes the default timeout. I would appreciate some help. Thanks.
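A variation that might sidestep the properties-file question entirely is to pass the setting directly with spark-submit's standard --conf flag; a sketch, reusing the property name from above:

/opt/spark/bin/spark-submit \
--conf spark.cassandra.read.timeout_ms=300000 \
--properties-file spark-configuration.properties \
... (remaining arguments as above)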
