我正在尝试使用spark scala代码来传输Twitter数据。我能够获取数据并创建数据帧并查看它。但是当尝试提取status.getPlace.getCountry()时,我得到了一个java.lang.NullPointerException。
Spark版本:2.0.0,Scala版本:2.11.8
尝试条件,检查价值等,但徒劳无功。
码:
val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(5))
val filters:Seq[String] = Seq("hadoop")
val cb = new ConfigurationBuilder()
.setOAuthConsumerKey("******")
.setOAuthConsumerSecret("******")
.setOAuthAccessToken("********")
.setOAuthAccessTokenSecret("******").build()
val twitter_auth = new TwitterFactory(cb)
val a = new OAuthAuthorization(cb)
val atwitter:Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val data = tweetsdstream.map {status =>
val places = status.getPlace
val id = status.getUser.getId
val date = status.getUser.getCreatedAt.toString()
val user = status.getUser.getName()
val place = places.getCountry()
(id,date,user,place)
}
data.foreachRDD{rdd =>
import spark.implicits._
rdd.toDF("id","date","user","place").show()
}
ssc.start()
ssc.awaitTermination()
从twitter访问位置信息有什么限制吗?任何的意见都将会有帮助。
谢谢
你可以使用Option
来处理null
s:
val data = tweetsdstream.map {
status =>
val place = Option(status.getPlace).map(_.getCountry).orNull
val id = status.getUser.getId
val user = status.getUser.getName
val date = status.getUser.getCreatedAt.toString
(id, date, user, place)
}
通过这种方式,您将能够可视化所有推文,无论它们是否具有某个国家/地区(并且在未定义国家/地区时它将为空)。
Option
对于处理可能缺少的数据非常有用,可以随意将其用于其他可能空的字段。
我想请换行*val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)*
改变成这样,然后简单地锻炼它
val stream = TwitterUtils.createStream(scc, None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
如果您想了解更多信息,请访问:http://commandstech.com/spark-streaming-twitter-example/