class MyStreamListener(tweepy.StreamListener):
def on_status(self, status):
print(status.text) # prints every tweet received
def on_error(self, status_code):
if status_code == 420: # end of monthly limit rate (500k)
return False
我使用Python 3.9并通过pip安装Tweepy。我在班级线上收到了
AttributeError
。
我的进口只是import tweepy
。身份验证得到正确处理。
在 streaming.py
文件中,我有类 Stream
。但使用这个类就到此为止了。例如,即使有 status.text
功能,也没有 on_status
。我有点困惑。
正如 @Harmon758 所提到的,他们在版本 4 之后将 StreamListener 合并到 Stream 中。此外,您不需要单独创建 api auth 对象。这是代码:
from tweepy import Stream
class MyStreamListener(Stream):
def on_status(self, status):
print(status.text) # prints every tweet received
def on_error(self, status_code):
if status_code == 420: # end of monthly limit rate (500k)
return False
stream = MyStreamListener('consumer_key',
'consumer_secret',
'access_token',
'access_token_secret')
stream.filter(track=["Python"], languages=["en"])
如果您查看模块,引用
StreamListener
的正确方法是 tweepy.streaming.StreamListener
,而不是 tweepy.StreamListener
。
对于将来的其他人来说,这不再起作用,现在你需要使用 twitter api v2
班级是:
tweepy.StreamingClient(bearer_token, *, return_type=Response, wait_on_rate_limit=False, chunk_size=512, daemon=False, max_retries=inf, proxy=None, verify=True)
所以你可以这样使用它:
导入tweepy 从 tweepy 导入 StreamingClient,StreamRule
bearer_token = 'WXXXXXXXX'
类 TweetPrinterV2(tweepy.StreamingClient):
def on_tweet(self, tweet):
print(f"{tweet.id} {tweet.created_at} ({tweet.author_id}): {tweet.text}")
流 = TweetPrinterV2(bearer_token)
rule = StreamRule(value="CryptoApeGod") Stream.add_rules(规则)
打印机.filter()
如果您需要,还有一个使用 asyncio 的异步版本: https://docs.tweepy.org/en/stable/asyncstreamingclient.html