我正在努力使用 Python 和请求访问流 API。
API 的内容:“我们启用了一个流端点,以便利用持久的 HTTP 套接字连接来请求报价和交易数据。来自 API 的流数据包括发出经过身份验证的 HTTP 请求并保持 HTTP 套接字打开以持续接收数据。”
我如何尝试访问数据:
s = requests.Session()
def streaming(symbols):
url = 'https://stream.tradeking.com/v1/market/quotes.json'
payload = {'symbols': ','.join(symbols)}
return s.get(url, params=payload, stream=True)
r = streaming(['AAPL', 'GOOG'])
Requests 文档here展示了两件有趣的事情:使用生成器/迭代器来处理在数据字段中传递的分块数据。对于流数据,建议使用如下代码:
for line in r.iter_lines():
print(line)
似乎都不起作用,尽管我不知道在生成器函数中放入什么,因为示例不清楚。使用 r.iter_lines(),我得到输出: "b'{"status":"connected"}{"status":disconnected"}'"
我可以访问标头,响应是 HTTP 200,但无法获取有效数据,或者找到有关如何在 python 中访问流式 HTTP 数据的清晰示例。任何帮助,将不胜感激。 API 建议使用 Jetty for Java 来保持流打开,但我不确定如何在 Python 中执行此操作。
标头:{'connection':'keep-alive','content-type':'application/json','x-powered-by':'Express','transfer-encoding':'chunked'}
正如 verbsintransit 所说,您需要解决身份验证问题,但是您的流媒体问题可以通过使用以下示例来解决:
s = requests.Session()
def streaming(symbols):
payload = {'symbols': ','.join(symbols)}
headers = {'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}
req = requests.Request("GET",'https://stream.tradeking.com/v1/market/quotes.json',
headers=headers,
params=payload).prepare()
resp = s.send(req, stream=True)
for line in resp.iter_lines():
if line:
yield line
def read_stream():
for line in streaming(['AAPL', 'GOOG']):
print line
read_stream()
if line:
条件正在检查line
是否是一条实际消息或只是一个连接保持活动状态。
不确定您是否明白这一点,但 TradeKing 不会在 JSON blob 之间添加换行符。因此,您必须使用 iter_content 逐字节获取它,将该字节附加到缓冲区,尝试解码缓冲区,成功后清除缓冲区并生成结果对象。 :(
import requests
from requests_oauthlib import OAuth1
def streaming(symbols):
consumer_key = '***'
consumer_secret = '***'
access_token = '***'
access_secret = '***'
auth = OAuth1(consumer_key,
client_secret = consumer_secret,
resource_owner_key = access_token,
resource_owner_secret = access_secret)
payload = {'symbols': ','.join(symbols)}
resp = requests.Session().request("GET",'https://stream.tradeking.com/v1/market/quotes.json',stream=True,auth=auth,params=payload)
# resp.raise_for_status()
for chunk in resp.iter_content(chunk_size=1):
if chunk:
yield chunk.decode('utf8')
#try this
for line in streaming(['AAPL', 'GOOG']):
print(line)
这对我有用:
import requests
headers = {'Content-Type': 'application/json'}
response = requests.post(URL, data=body, headers=headers, stream=True)
if response.status_code == 200:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
print("DO something")