socket.io客户端库

socketIO-client-MV的Python项目详细描述


https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=master

Socketio客户端

这是一个用于python的socket.io客户端库。您可以使用它为socket.io服务器编写测试代码。

这是Socketio_客户端的分叉版本。你可以找到原始的here

安装

在隔离环境中安装软件包。

VIRTUAL_ENV=$HOME/.virtualenv

# Prepare isolated environment
virtualenv $VIRTUAL_ENV

# Activate isolated environment
source $VIRTUAL_ENV/bin/activate

# Install package
pip install -U socketIO-client

使用量

激活隔离环境。

VIRTUAL_ENV=$HOME/.virtualenv
source $VIRTUAL_ENV/bin/activate

启动socket.io服务器。

cd $(python -c "import os, socketIO_client;\
    print(os.path.dirname(socketIO_client.__file__))")

DEBUG=* node tests/serve.js  # Start socket.io server in terminal one
DEBUG=* node tests/proxy.js  # Start proxy server in terminal two
nosetests                    # Run tests in terminal three

有关调试信息,请先运行这些命令。

import logging
logging.getLogger('socketIO-client').setLevel(logging.DEBUG)
logging.basicConfig()

发射。

from socketIO_client import SocketIO, LoggingNamespace

with SocketIO('127.0.0.1', 8000, LoggingNamespace) as socketIO:
    socketIO.emit('aaa')
    socketIO.wait(seconds=1)

使用回调发出。

from socketIO_client import SocketIO, LoggingNamespace

def on_bbb_response(*args):
    print('on_bbb_response', args)

with SocketIO('127.0.0.1', 8000, LoggingNamespace) as socketIO:
    socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response)
    socketIO.wait_for_callbacks(seconds=1)

定义事件。

from socketIO_client import SocketIO, LoggingNamespace

def on_connect():
    print('connect')

def on_disconnect():
    print('disconnect')

def on_reconnect():
    print('reconnect')

def on_aaa_response(*args):
    print('on_aaa_response', args)

socketIO = SocketIO('127.0.0.1', 8000, LoggingNamespace)
socketIO.on('connect', on_connect)
socketIO.on('disconnect', on_disconnect)
socketIO.on('reconnect', on_reconnect)

# Listen
socketIO.on('aaa_response', on_aaa_response)
socketIO.emit('aaa')
socketIO.emit('aaa')
socketIO.wait(seconds=1)

# Stop listening
socketIO.off('aaa_response')
socketIO.emit('aaa')
socketIO.wait(seconds=1)

# Listen only once
socketIO.once('aaa_response', on_aaa_response)
socketIO.emit('aaa')  # Activate aaa_response
socketIO.emit('aaa')  # Ignore
socketIO.wait(seconds=1)

在命名空间中定义事件。

from socketIO_client import SocketIO, BaseNamespace

class Namespace(BaseNamespace):

    def on_aaa_response(self, *args):
        print('on_aaa_response', args)
        self.emit('bbb')

socketIO = SocketIO('127.0.0.1', 8000, Namespace)
socketIO.emit('aaa')
socketIO.wait(seconds=1)

定义标准事件。

from socketIO_client import SocketIO, BaseNamespace

class Namespace(BaseNamespace):

    def on_connect(self):
        print('[Connected]')

    def on_reconnect(self):
        print('[Reconnected]')

    def on_disconnect(self):
        print('[Disconnected]')

socketIO = SocketIO('127.0.0.1', 8000, Namespace)
socketIO.wait(seconds=1)

在单个套接字上定义不同的命名空间。

from socketIO_client import SocketIO, BaseNamespace

class ChatNamespace(BaseNamespace):

    def on_aaa_response(self, *args):
        print('on_aaa_response', args)

class NewsNamespace(BaseNamespace):

    def on_aaa_response(self, *args):
        print('on_aaa_response', args)

socketIO = SocketIO('127.0.0.1', 8000)
chat_namespace = socketIO.define(ChatNamespace, '/chat')
news_namespace = socketIO.define(NewsNamespace, '/news')

chat_namespace.emit('aaa')
news_namespace.emit('aaa')
socketIO.wait(seconds=1)

通过ssl连接(https://github.com/invisibleroads/socketIO-client/issues/54)。

from socketIO_client import SocketIO

# Skip server certificate verification
SocketIO('https://127.0.0.1', verify=False)
# Verify the server certificate
SocketIO('https://127.0.0.1', verify='server.crt')
# Verify the server certificate and encrypt using client certificate
socketIO = SocketIO('https://127.0.0.1', verify='server.crt', cert=(
    'client.crt', 'client.key'))

指定了由于{{A5}库而导致的参数、标头、cookies、代理。

from socketIO_client import SocketIO
from base64 import b64encode

SocketIO(
    '127.0.0.1', 8000,
    params={'q': 'qqq'},
    headers={'Authorization': 'Basic ' + b64encode('username:password')},
    cookies={'a': 'aaa'},
    proxies={'https': 'https://proxy.example.com:8080'})

永远等待。

from socketIO_client import SocketIO

socketIO = SocketIO('127.0.0.1', 8000)
socketIO.wait()

不要永远等待。

from requests.exceptions import ConnectionError
from socketIO_client import SocketIO

try:
    socket = SocketIO('127.0.0.1', 8000, wait_for_connection=False)
    socket.wait()
except ConnectionError:
    print('The server is down. Try again later.')

许可证

这个软件是在麻省理工学院的许可下提供的。

0.7

  • 固定线程清理
  • 修复了由于Andreas Strikos直接定义的断开连接检测
  • 固定支持Unicode有效载荷

0.6

  • 由于Sean Arietta和Joe Palmer,已升级到Socket.io Protocol 1.x
  • 修复了对Python3的支持
  • 固定SSL支持
  • 添加锁以解决轮询传输的并发问题
  • 添加了socketio.off()和socketio.once()

0.5

  • 增加了对Python3的支持
  • 由于bernard pratz,增加了对jsonp轮询的支持
  • 由于francis bull,增加了对xhr轮询的支持
  • 添加了对查询参数和cookies的支持
  • 修复了由于travis odom而在自定义命名空间中发送确认的问题
  • 重写库以使用协程而不是线程来节省内存

0.4

  • 由于芮和Sajal
  • 增加了对自定义标题和代理的支持
  • 由于zac lee,增加了对服务器端回调的支持
  • 由于Alexandre Bourget,已将频道功能合并到BaseNamespace中

0.3

  • 增加了对安全连接的支持
  • 添加了socketio.wait()
  • 改进了rhythmicthread和listenerthread中的异常处理

0.2

  • 由于Paul Kienzle,增加了对回调和频道的支持
  • 采纳了Josh Vanderlinden和Ian Fitzpatrick的建议

0.1

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
空字符串检查在java中未按预期工作   JavaSpringWebClient:自动计算主体的HMAC签名并将其作为头传递   foreach是否有一个Java等效的foreach循环和一个引用变量?   java如何在Eclipse中导入jar   使用特定第三方或java时lombok触发错误。*方法或构造函数   安卓 java将对象数组转换为int数组   java使一定百分比的JUnit测试通过   java Android:将Seekbar的一个值与另一个值进行比较   java将int数组(图像数据)写入文件的最佳方式是什么   java取代了系统。yml的构造函数内的getProperty   sqlite Java将公钥和私钥转换为字符串,然后再转换回字符串   安卓获取白色像素并将其保存到java opencv中的数组中   java为什么是ServerSocket。setSocketFactory静态?   Java数组似乎在不直接修改的情况下更改值