Streaming tweets with Tweepy and PySpark


I am trying to stream tweets using PySpark and the Tweepy library, in order to get the top ten tweets based on the number of retweets and likes.

The first step is streaming the tweets with Tweepy. This works perfectly in PyCharm; here is the code:

import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json

consumer_key = 'consumer_key'
consumer_secret = 'secret_key'
access_token = 'token_key'
access_secret = 'access_secret_key'

class TweetsListener(StreamListener):

    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, data):
        try:
            msg = json.loads(data)
            print(msg['text'].encode('utf-8'))
            self.client_socket.send(msg['text'].encode('utf-8'))
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        print(status)
        return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, TweetsListener(c_socket))
    twitter_stream.filter(track=['iphone'], languages=["en"])

if __name__ == "__main__":
    s = socket.socket()
    host = "192.168.0.12"
    port = 5555
    s.bind((host, port))

    print("Listening on port: %s" % str(port))

    s.listen(5)
    c, addr = s.accept()

    print("Received request from: " + str(addr))

    sendData(c)
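The socket handoff above can be sketched without Tweepy at all: `socketTextStream` on the Spark side is just a line-oriented TCP client, so any process that binds, listens, accepts, and writes newline-terminated UTF-8 text will feed the stream. A minimal, self-contained sketch of that handshake (the loopback host and port 5556 are stand-ins for the 192.168.0.12:5555 pair used above):

```python
import socket
import threading

HOST, PORT = "127.0.0.1", 5556  # stand-ins for the real host/port above

def serve_one_message(sock):
    # Accept a single client and push one line of text, mirroring
    # what sendData() does with each tweet's text.
    conn, _addr = sock.accept()
    conn.send("hello from the stream\n".encode("utf-8"))
    conn.close()

# Bind/listen exactly as the driver script does.
server = socket.socket()
server.bind((HOST, PORT))
server.listen(5)

t = threading.Thread(target=serve_one_message, args=(server,))
t.start()

# The Spark side (socketTextStream) is effectively a TCP client reading lines:
client = socket.create_connection((HOST, PORT))
line = client.makefile().readline().strip()
print(line)  # hello from the stream

client.close()
t.join()
server.close()
```

Note that the text must be newline-terminated, since `socketTextStream` splits the incoming byte stream into records on `\n`.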

Second, here is the PySpark code I run in a Jupyter Notebook. First, the streaming code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from pyspark.sql import HiveContext 
sc
ssc = StreamingContext(sc, 10 )

sqlContext = SQLContext(sc)
socket_stream = ssc.socketTextStream("192.168.0.12", 5555)
lines = socket_stream.window( 20 )
from collections import namedtuple 

fields = ("tag", "count" )

Tweet = namedtuple( 'Tweet', fields)

(lines.flatMap(lambda text: text.split(" ")).filter(lambda word: word.lower().startswith("#")) 
.map( lambda word: ( word.lower(), 1)) 
.reduceByKey( lambda a, b: a + b) 
.map(lambda rec: Tweet(rec[0], rec[1])) 
.foreachRDD(lambda rdd: rdd.toDF().sort(desc("count")).limit(10).registerTempTable("tweets") ))

ssc.start()
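Outside of Spark, the windowed pipeline above is just a hashtag word count. A plain-Python sketch of what each window computes, using eager equivalents of flatMap/filter/map/reduceByKey (the sample tweet texts are made up for illustration):

```python
from collections import Counter, namedtuple

Tweet = namedtuple("Tweet", ("tag", "count"))

# Stand-in for one window of tweet texts (hypothetical sample data).
window = [
    "new #iPhone leak looks wild",
    "#iphone vs #android again",
    "just switched to #iPhone",
]

# flatMap: split each line into words; filter: keep hashtags;
# map + reduceByKey: lowercase and count, done here with a Counter.
words = (word.lower() for line in window for word in line.split(" "))
counts = Counter(w for w in words if w.startswith("#"))

# map to Tweet records, sorted descending by count and limited to 10
# (the .sort(desc("count")).limit(10) step).
top = [Tweet(tag, n) for tag, n in counts.most_common(10)]
print(top)  # [Tweet(tag='#iphone', count=3), Tweet(tag='#android', count=1)]
```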

Then some testing and plotting code:

import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

count = 0
while count < 10:

    time.sleep( 3 )

    top_10_tweets = sqlContext.sql( 'Select tag, count from tweets' )
    top_10_df = top_10_tweets.toPandas()
    display.clear_output(wait=True)  # Clears the output, if a plot exists.
    plt.figure( figsize = ( 10, 8 ) )  # use plt directly; sns.plt was removed in seaborn 0.9
    sns.barplot( x="count", y="tag", data=top_10_df)
    plt.show()
    count = count + 1

But when I run that last cell, the one that starts with count = 0, I get this error:


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/local/Cellar/apache-spark/2.4.0/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/local/Cellar/apache-spark/2.4.0/libexec/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

AnalysisException: 'Table or view not found: tweets; line 1 pos 23'

Any idea how to fix this?

python twitter pyspark
1 Answer

This is not a problem with your code. The issue arises because "registerTempTable" has not created the temporary view named "tweets". This does not happen on the local machine; if you run it on a virtual machine instead (Ubuntu works fine), the temporary view will be created.
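Another common cause of this AnalysisException is a race: the query cell runs before the first micro-batch has triggered foreachRDD, so registerTempTable has not executed yet. One hedged workaround is to poll until the view exists before querying. The helper below is generic, testable Python; the commented PySpark usage assumes Spark 2.x, where `SQLContext.tableNames()` is available:

```python
import time

def wait_for(predicate, timeout=30.0, interval=1.0):
    """Poll `predicate` until it returns True or `timeout` seconds pass."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

# With the streaming job above, you would wait for the view before querying:
#   wait_for(lambda: "tweets" in sqlContext.tableNames())
#   top_10_df = sqlContext.sql("select tag, count from tweets").toPandas()

# Self-contained demonstration with a stand-in predicate:
state = {"ready": False}
def becomes_ready():
    state["ready"] = True  # flips to True on the first poll
    return state["ready"]

print(wait_for(becomes_ready, timeout=5.0, interval=0.1))  # True
```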
