为什么我只能通过套接字从Tweepy侦听器通过套接字发送变量“文本”,并从Spark获得结构化流?

问题描述 投票:1回答:1

您可以在标题中读到,我用pyspark编写的程序有一个奇怪的问题。

我有2个程序通过本地套接字进行通信。该程序的目的是发送Twitter数据。当我仅发送text变量数据时,程序运行良好。但是,如果要发送另一个变量,则不会收到任何数据。即使当我从变量text和另一个变量(例如id)发送数据时,它也不起作用。

下面是我的代码:

Client.py:

import os
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json

#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

def __init__(self, csocket):
    self.client_socket = csocket

def on_data(self, data):
    try:
        tweet = str(json.loads(data)['text']).encode('utf-8')   #it works
        #tweet = str(json.loads(data)['id']).encode('utf-8')   #it doesn't work
        #mykeys = ['text','id']
        #tweet = str([json.loads(data)[x] for x in mykeys]).encode('utf-8')  #it doesn't work
        print(tweet)
        self.client_socket.sendall(tweet)
        return True
    except BaseException as e:
        print("Error on_data: %s" % str(e))
    return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, MyStreamListener(c_socket))
    twitter_stream.filter(track=['#fakenews'])

if __name__ == "__main__":
    s = socket.socket()     # Create a socket object
    host = "localhost"      # Get local machine name
    port = 5555             # Reserve a port for your service.
    s.bind((host, port))    # Bind to the port

    print("Listening on port: %s" % str(port))

    s.listen(1)                 # Now wait for client connection.
    c, addr = s.accept()        # Establish connection with client.

    print( "Received request from: " + str( addr ) )

    sendData(c)

Server.py:

#Need to launch Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, desc, decode
from pyspark.sql.types import *
import json

spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

IP = "localhost"
Port = 5555

lines = spark.readStream.format("socket").option("host", IP).option("port", Port).load()

query = lines.writeStream.outputMode("append").format("memory").queryName("tmpTable")

query.start()

查看临时表的代码:

spark.sql("select * from tmpTable").show(n=50)

我该如何解决?

python twitter pyspark streaming tweepy
1个回答
0
投票

我有同样的问题。您找到解决问题的方法了吗?

© www.soinside.com 2019 - 2024. All rights reserved.