StreamConn unsubscribe()和close()似乎不起作用(羊驼皮),正在征求意见

2024-04-19 06:11:56 发布

您现在位置:Python中文网/ 问答频道 /正文

刚开始使用Alpaca API(也使用Polygon IO服务),就遇到了以下问题,看起来应该很简单,但我被卡住了。代码初始化并正确打印所有预期信息。问题是,当我调用“unsubscribe”时,没有错误消息,消息总是不断地出现

有趣的是,我还尝试调用了deregister(),甚至只调用close()——都没有用。我也尝试过以各种方式调整模式——同样,没有抛出错误,很难理解。但是“关闭连接”这一行加上股票代码只是不断地出现

我怀疑调用在某个地方的StreamConn/asyncio事件循环中丢失,并且根本没有运行,但我不确定如何最好地解决这个问题。任何想法都欢迎。Python3.7,以防不明显

请注意,除了在输入流数据时将其打印出来,我对这个脚本没有做任何有趣的事情,这是理解为什么我无法关闭连接的一种方式。文档说明(和代码显示)我们应该能够在运行中期订阅其他频道,但到目前为止,这也不起作用。最终期望的最终状态是能够以持续的方式订阅其他频道和取消订阅现有频道。由于库使用asyncio,应该已经可以使用线程进行管理,人们会认为

羊驼的StreamConn源代码在这里:https://github.com/alpacahq/alpaca-trade-api-python/blob/master/alpaca_trade_api/stream2.py羊驼版本扩展了原始多边形类,在这里:https://github.com/alpacahq/alpaca-trade-api-python/blob/master/alpaca_trade_api/polygon/streamconn.py

from alpaca_trade_api import StreamConn
from alpaca_config import AlpacaAPIConfig
import datetime

script_start = datetime.datetime.now()
conn = StreamConn(AlpacaAPIConfig.get_public_api_key(), AlpacaAPIConfig.get_secret_key())
symbols = ['AAPL', 'MSFT']


async def handle_signal(channel, symbol, value):
    if (datetime.datetime.now() - script_start).seconds > 30:
        print('Closing connection for ' + str(symbol))
        await conn.unsubscribe([r'^T.' + str(symbol), r'^AM.' + str(symbol)])
    else:
        print(str(channel) + '.' + str(symbol))
        print(str(value))
        print(' ')


@conn.on(r'^AM.*$', symbols)  # AM. denotes minute bars, symbols is the list of tickers to listen for
async def on_bar(conn, channel, bar):
    symbol = bar.symbol
    close = float(bar.close)
    print('-- AM Hit --')
    await handle_signal(channel, symbol, close)


@conn.on(r'^T.*$', symbols)  # T. denotes a trade event, symbols is the list of tickers to listen for
async def on_trade(conn, channel, trade):
    symbol = str(trade.symbol)
    price = str(trade.price)
    print('-- T Hit --')
    await handle_signal(channel, symbol, price)


def kickoff():
    conn.run([
        'T.*',
        'AM.*'
    ])


if __name__ == '__main__':
    kickoff()


输出的副本,以防万一有用:

-- T Hit --
T.AAPL
382.31
 
-- T Hit --
T.AAPL
382.31
 
-- AM Hit --
AM.AAPL
382.31
 
-- AM Hit --
AM.MSFT
213.32
 
-- T Hit --
T.MSFT
213.45
 
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for MSFT
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for AAPL

更新 显然,.run.subscribe / .unsubscribe在一起打得不好。我将kickoff()函数更改为以下内容:

async def kickoff():
    await conn.subscribe(['T.AAPL', 'T.MSFT', 'AM.AAPL', 'AM.MSFT'])

以及__main__()函数:

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.create_task(kickoff())
    loop.run_forever()

现在,使用.subscribe()而不是.run,ticker行为与预期一样(在调用.unsubscribe()时停止)。但是,现在我必须找出事件循环部分


Tags: apiforchannelconnectionconnsymbolamprint