连接到cryptocomp的Python socketio示例

2024-06-09 10:03:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我正试图从Python客户机使用socketIO连接到Cryptocompare的websocket流。事实证明这是具有挑战性的。下面是一些Python代码示例:

from socketIO_client import SocketIO

print "connecting to server"
socketIO = SocketIO('https://streamer.cryptocompare.com/',443, transports=['websocket'])
print "Connected"

然而,不管我做什么,我都无法连接。事实上,这就是这种联系的结果

^{pr2}$

将上面的代码包装在try-catch中并打印异常不会产生额外的信息。任何帮助都将不胜感激。在


Tags: to代码fromimportclient示例客户机websocket
3条回答

我从几个角度研究了这个问题,并得出结论:socketio client for python不能与此API一起工作。如果您只想从CryptoCompare流式api将数据流式传输到python,那么我有一个有效的解决方法,它使用websockets向一个简单的nodejs应用程序提交请求,然后使用其socketio客户机将所需的数据流式返回。我对python还比较陌生,只是看了一下nodejs的这个解决方案,所以别着急。在

加密目前很热门,所以我相信这对某些人是有用的

Python部分:

import json
import pandas as pd
try:
   import thread
except ImportError:
import _thread as thread

import threading
import time
import websocket


class WebSocketClient(threading.Thread):

 def __init__(self):
    self.url = 'ws://localhost:9030/path'
    # self.daemon = True
    self.clist = list()
    threading.Thread.__init__(self)


 def run(self):

    # Running the run_forever() in a seperate thread.
    #websocket.enableTrace(True)
    self.ws = websocket.WebSocketApp(self.url,
                                     on_message = self.on_message,
                                     on_error = self.on_error,
                                     on_close = self.on_close)
    self.ws.on_open = self.on_open
    self.ws.run_forever()

 def send(self, data):

    data = self._encode_message(data)
    # Wait till websocket is connected.
    while not self.ws.sock.connected:
        time.sleep(0.25)

    print(f'Sending data... {data}')
    self.ws.send(data)

 def stop(self):
    print(f'Stopping the websocket...')
    self.ws.close()

 def on_message(self, ws, message):
    message = self._decode_message(message)
    print(f'Received data...{message}')
    if message['msg']=='crypto':

        self.clist.append(message['data'])

 def on_error(self, ws, error):
    print(f'Received error...')
    print(error)

 def on_close(self, ws):
    print('Closed the connection...')

 def on_open(self, ws):
    print('Opened the connection...')
    data = {"msg":"open" ,"from":"Rob", "data":"Hello from the client"}
    self.send(data)

 def _encode_message(self,message):

    message = json.dumps(message)
    return message

 def _decode_message(self, message):

    message = json.loads(message)
    return message

 def getclist(self):
    return self.clist


wsCli =  WebSocketClient()
wsCli.daemon = True
wsCli.start()
wsCli.send({"msg":"getcrypto" ,"from":"Client", "data":['0~Coinbase~BTC~USD'],"subs":['0~Coinbase~BTC~USD']})
wsCli.stop()

Nodejs部分:

和13; 第13章;

socketIO_客户端库似乎不支持cryptocompare使用的XHR轮询协议。我通过重写socketIO_client.transports.XHR_PollingTransport类中的方法recv_packet来实现它。在

import logging
import socketIO_client
from socketIO_client.transports import get_response
from socketIO_client.parsers import get_byte, _read_packet_text, parse_packet_text

from requests.exceptions import ConnectionError

# extra function to support XHR1 style protocol
def _new_read_packet_length(content, content_index):
    packet_length_string = ''
    while get_byte(content, content_index) != ord(':'):
        byte = get_byte(content, content_index)
        packet_length_string += chr(byte)
        content_index += 1
    content_index += 1
    return content_index, int(packet_length_string)

def new_decode_engineIO_content(content):
    content_index = 0
    content_length = len(content)
    while content_index < content_length:
        try:
            content_index, packet_length = _new_read_packet_length(
                content, content_index)
        except IndexError:
            break
        content_index, packet_text = _read_packet_text(
            content, content_index, packet_length)
        engineIO_packet_type, engineIO_packet_data = parse_packet_text(
            packet_text)
        yield engineIO_packet_type, engineIO_packet_data

def new_recv_packet(self):
    params = dict(self._params)
    params['t'] = self._get_timestamp()
    response = get_response(
        self.http_session.get,
        self._http_url,
        params=params,
        **self._kw_get)
    for engineIO_packet in new_decode_engineIO_content(response.content):
        engineIO_packet_type, engineIO_packet_data = engineIO_packet
        yield engineIO_packet_type, engineIO_packet_data

setattr(socketIO_client.transports.XHR_PollingTransport, 'recv_packet', new_recv_packet)

logging.basicConfig(level=logging.DEBUG)

try:
    socket = socketIO_client.SocketIO('https://streamer.cryptocompare.com')
    socket.emit('SubAdd', { 'subs': ['0~Kraken~BTC~USD'] });
    socket.wait()
except ConnectionError:
    print('The server is down. Try again later.')

解决方案主要基于以下github注释:https://github.com/invisibleroads/socketIO-client/issues/129#issuecomment-330058318

您需要在设置套接字后立即调用emit来设置要接收的订阅。在

socketIO.emit('SubAdd', { subs: ['0~Poloniex~BTC~USD'] });

相关问题 更多 >