As you can read in the title, I have a strange problem with a program written in PySpark.
I have two programs that communicate through a local socket. Their purpose is to stream Twitter data. When I send only the text field of each tweet, the program works fine. But if I send any other field instead, no data is received. It does not work even when I send the text field together with another field (for example id).
Below is my code:
Client.py:
import os
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json

# Override tweepy.StreamListener to add logic to on_data
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, data):
        try:
            tweet = str(json.loads(data)['text']).encode('utf-8')  # it works
            #tweet = str(json.loads(data)['id']).encode('utf-8')  # it doesn't work
            #mykeys = ['text', 'id']
            #tweet = str([json.loads(data)[x] for x in mykeys]).encode('utf-8')  # it doesn't work
            print(tweet)
            self.client_socket.sendall(tweet)
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
            return True

def sendData(c_socket):
    # consumer_key, consumer_secret, access_token and access_secret are my
    # Twitter API credentials (not shown here)
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    twitter_stream = Stream(auth, MyStreamListener(c_socket))
    twitter_stream.filter(track=['#fakenews'])

if __name__ == "__main__":
    s = socket.socket()   # Create a socket object
    host = "localhost"    # Host to listen on
    port = 5555           # Reserve a port for the service
    s.bind((host, port))  # Bind to the port
    print("Listening on port: %s" % str(port))
    s.listen(1)           # Now wait for a client connection
    c, addr = s.accept()  # Establish the connection with the client
    print("Received request from: " + str(addr))
    sendData(c)
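One detail that may matter here: Spark's socket source reads the incoming bytes as UTF-8 text and splits records on newlines, while sendall(tweet) sends raw bytes with no delimiter. As an illustration only (not my original code), a variant of on_data that packs both fields into a single newline-terminated JSON line could look like this; the field names text and id are the same ones used above:

    def on_data(self, data):
        try:
            parsed = json.loads(data)
            # Keep only the fields of interest and terminate the record
            # with '\n' so the receiver can tell where one message ends.
            message = json.dumps({'id': parsed['id'], 'text': parsed['text']}) + '\n'
            self.client_socket.sendall(message.encode('utf-8'))
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
            return True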
Server.py:
#Need to launch Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, desc, decode
from pyspark.sql.types import *
import json
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
IP = "localhost"
Port = 5555
lines = spark.readStream.format("socket").option("host", IP).option("port", Port).load()
query = lines.writeStream.outputMode("append").format("memory").queryName("tmpTable")
query.start()
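A side note on Server.py, assuming it runs as a standalone script rather than in a notebook: start() returns a StreamingQuery, and without blocking on it the process would exit before any data arrives. A minimal sketch:

streaming_query = lines.writeStream.outputMode("append").format("memory").queryName("tmpTable").start()
# Block the main thread so the stream keeps running (not needed in a notebook):
# streaming_query.awaitTermination()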
The code to view the temporary table:
spark.sql("select * from tmpTable").show(n=50)
How can I fix this?
I have the same problem. Have you found a solution?