socket.io客户端库

socketIO-client的Python项目详细描述


https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=master

Socketio客户端

这是一个用于python的socket.io客户端库。您可以使用它为socket.io服务器编写测试代码。

请注意,此版本实现socket.io protocol 1.x,这是不向后兼容的。如果要使用socket.io protocol 0.9(与gevent-socketio兼容)进行通信,请使用socketIO-client 0.5.7.2

安装

在隔离环境中安装软件包。

VIRTUAL_ENV=$HOME/.virtualenv

# Prepare isolated environment
virtualenv $VIRTUAL_ENV

# Activate isolated environment
source $VIRTUAL_ENV/bin/activate

# Install package
pip install -U socketIO-client

使用量

激活隔离环境。

VIRTUAL_ENV=$HOME/.virtualenv
source $VIRTUAL_ENV/bin/activate

启动socket.io服务器。

cd $(python -c "import os, socketIO_client;\
    print(os.path.dirname(socketIO_client.__file__))")

DEBUG=* node tests/serve.js  # Start socket.io server in terminal one
DEBUG=* node tests/proxy.js  # Start proxy server in terminal two
nosetests                    # Run tests in terminal three

有关调试信息,请先运行这些命令。

import logging
logging.getLogger('socketIO-client').setLevel(logging.DEBUG)
logging.basicConfig()

发射。

from socketIO_client import SocketIO, LoggingNamespace

with SocketIO('localhost', 8000, LoggingNamespace) as socketIO:
    socketIO.emit('aaa')
    socketIO.wait(seconds=1)

使用回调发出。

from socketIO_client import SocketIO, LoggingNamespace

def on_bbb_response(*args):
    print('on_bbb_response', args)

with SocketIO('localhost', 8000, LoggingNamespace) as socketIO:
    socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response)
    socketIO.wait_for_callbacks(seconds=1)

定义事件。

from socketIO_client import SocketIO, LoggingNamespace

def on_connect():
    print('connect')

def on_disconnect():
    print('disconnect')

def on_reconnect():
    print('reconnect')

def on_aaa_response(*args):
    print('on_aaa_response', args)

socketIO = SocketIO('localhost', 8000, LoggingNamespace)
socketIO.on('connect', on_connect)
socketIO.on('disconnect', on_disconnect)
socketIO.on('reconnect', on_reconnect)

# Listen
socketIO.on('aaa_response', on_aaa_response)
socketIO.emit('aaa')
socketIO.emit('aaa')
socketIO.wait(seconds=1)

# Stop listening
socketIO.off('aaa_response')
socketIO.emit('aaa')
socketIO.wait(seconds=1)

# Listen only once
socketIO.once('aaa_response', on_aaa_response)
socketIO.emit('aaa')  # Activate aaa_response
socketIO.emit('aaa')  # Ignore
socketIO.wait(seconds=1)

在命名空间中定义事件。

from socketIO_client import SocketIO, BaseNamespace

class Namespace(BaseNamespace):

    def on_aaa_response(self, *args):
        print('on_aaa_response', args)
        self.emit('bbb')

socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.emit('aaa')
socketIO.wait(seconds=1)

定义标准事件。

from socketIO_client import SocketIO, BaseNamespace

class Namespace(BaseNamespace):

    def on_connect(self):
        print('[Connected]')

    def on_reconnect(self):
        print('[Reconnected]')

    def on_disconnect(self):
        print('[Disconnected]')

socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.wait(seconds=1)

在单个套接字上定义不同的命名空间。

from socketIO_client import SocketIO, BaseNamespace

class ChatNamespace(BaseNamespace):

    def on_aaa_response(self, *args):
        print('on_aaa_response', args)

class NewsNamespace(BaseNamespace):

    def on_aaa_response(self, *args):
        print('on_aaa_response', args)

socketIO = SocketIO('localhost', 8000)
chat_namespace = socketIO.define(ChatNamespace, '/chat')
news_namespace = socketIO.define(NewsNamespace, '/news')

chat_namespace.emit('aaa')
news_namespace.emit('aaa')
socketIO.wait(seconds=1)

通过ssl连接(https://github.com/invisibleroads/socketIO-client/issues/54)。

from socketIO_client import SocketIO

# Skip server certificate verification
SocketIO('https://localhost', verify=False)
# Verify the server certificate
SocketIO('https://localhost', verify='server.crt')
# Verify the server certificate and encrypt using client certificate
socketIO = SocketIO('https://localhost', verify='server.crt', cert=(
    'client.crt', 'client.key'))

通过{{8}库)指定参数、标头、cookies、代理。

from socketIO_client import SocketIO
from base64 import b64encode

SocketIO(
    localhost', 8000,
    params={'q': 'qqq'},
    headers={'Authorization': 'Basic ' + b64encode('username:password')},
    cookies={'a': 'aaa'},
    proxies={'https': 'https://proxy.example.com:8080'})

永远等待。

from socketIO_client import SocketIO

socketIO = SocketIO('localhost', 8000)
socketIO.wait()

许可证

这个软件是在麻省理工学院的许可下提供的。

0.7

  • 固定线程清理
  • 修复了由于Andreas Strikos直接定义的断开连接检测

0.6

  • 由于Sean Arietta和Joe Palmer,已升级到Socket.io Protocol 1.x
  • 修复了对Python3的支持
  • 固定SSL支持
  • 添加锁以解决轮询传输的并发问题
  • 添加了socketio.off()和socketio.once()

0.5

  • 增加了对Python3的支持
  • 由于bernard pratz,增加了对jsonp轮询的支持
  • 由于francis bull,增加了对xhr轮询的支持
  • 添加了对查询参数和cookies的支持
  • 修复了由于travis odom而在自定义命名空间中发送确认的问题
  • 重写库以使用协程而不是线程来节省内存

0.4

    添加了对自定义标头和代理的支持,这要归功于芮和Sajal
  • 由于zac lee,增加了对服务器端回调的支持
  • 由于Alexandre Bourget,已将频道功能合并到BaseNamespace中

0.3

  • 增加了对安全连接的支持
  • 添加了socketio.wait()
  • 改进了rhythmicthread和listenerthread中的异常处理

0.2

  • 由于Paul Kienzle,增加了对回调和频道的支持
  • 采纳了Josh Vanderlinden和Ian Fitzpatrick的建议

0.1

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Hibernate、Spring和SLF4J绑定   java如何准确地导航导航地图。天花板入口()工作?   从excel文件导入数据时发生java异常   java如何将地图转换为url查询字符串?   java HSQLDB在数据库中插入值   java将元素从JTextField添加到ArrayList   java如何使用SetDataSource从名称中有空格的MP3文件中获取唱片集艺术   java排序ArrayList<ArrayList<String>>作为行字段   java在拼写检查器中更新正确的拼写   哪里可以找到JavaAPI类图?   spring boot如何在自动生成的swagger java类中屏蔽任何参数   java使用哪种设计模式(我需要类似中介模式的东西)?   java为什么JTextfield没有出现在这个实例中?   如何在Java中向当前日期添加一个月?   安卓如何使java类可序列化,从而扩展不可序列化的