CTP客户端v6.3.15

ctpwrapper的Python项目详细描述


CTP期

Build StatusBuild statusCodacy BadgepypistatuspyversionimplementationDownloads

CTP Website

版本:V6.3.15

平台:Linux 64位,Windows 64位

python需求:x86-64

特别支持pypy3-v5.10.1-3.5.3 Linux 64位

由Lovelylain激发

安装

在安装ctpwrapper包之前,需要确保 已安装cython软件包。

>>>pip install cython --upgrade
>>>pip install ctpwrapper --upgrade

捐献[捐助]

用法

mdapi类包装器

importsysfromctpwrapperimportApiStructurefromctpwrapperimportMdApiPyclassMd(MdApiPy):"""    """def__init__(self,broker_id,investor_id,password,request_id=1):self.broker_id=broker_idself.investor_id=investor_idself.password=passwordself.request_id=request_iddefOnRspError(self,pRspInfo,nRequestID,bIsLast):self.ErrorRspInfo(pRspInfo,nRequestID)defErrorRspInfo(self,info,request_id):"""        :param info:        :return:        """ifinfo.ErrorID!=0:print('request_id=%s ErrorID=%d, ErrorMsg=%s',request_id,info.ErrorID,info.ErrorMsg.decode('gbk'))returninfo.ErrorID!=0defOnFrontConnected(self):"""        :return:        """user_login=ApiStructure.ReqUserLoginField(BrokerID=self.broker_id,UserID=self.investor_id,Password=self.password)self.ReqUserLogin(user_login,self.request_id)defOnFrontDisconnected(self,nReason):print("Md OnFrontDisconnected %s",nReason)sys.exit()defOnHeartBeatWarning(self,nTimeLapse):"""心跳超时警告。当长时间未收到报文时,该方法被调用。        @param nTimeLapse 距离上次接收报文的时间        """print('Md OnHeartBeatWarning, time = %s',nTimeLapse)defOnRspUserLogin(self,pRspUserLogin,pRspInfo,nRequestID,bIsLast):"""        用户登录应答        :param pRspUserLogin:        :param pRspInfo:        :param nRequestID:        :param bIsLast:        :return:        """ifpRspInfo.ErrorID!=0:print("Md OnRspUserLogin failed error_id=%s msg:%s",pRspInfo.ErrorID,pRspInfo.ErrorMsg.decode('gbk'))else:print("Md user login successfully")print(pRspUserLogin)print(pRspInfo)BORDKER_ID=""INVESTOR_ID=""PASSWORD=""SERVER=""md=Md(BORDKER_ID,INVESTOR_ID,PASSWORD)md.Create()md.RegisterFront(SERVER)md.Init()day=md.GetTradingDay()print(day)print("api worker!")

交易员类包装器

fromctpwrapperimportApiStructurefromctpwrapperimportTraderApiPyclassTrader(TraderApiPy):def__init__(self,broker_id,investor_id,password,request_id=1):self.request_id=request_idself.broker_id=broker_id.encode()self.investor_id=investor_id.encode()self.password=password.encode()defOnRspError(self,pRspInfo,nRequestID,bIsLast):self.ErrorRspInfo(pRspInfo,nRequestID)defErrorRspInfo(self,info,request_id):"""        :param info:        :return:        """ifinfo.ErrorID!=0:print('request_id=%s ErrorID=%d, ErrorMsg=%s',request_id,info.ErrorID,info.ErrorMsg.decode('gbk'))returninfo.ErrorID!=0defOnHeartBeatWarning(self,nTimeLapse):"""心跳超时警告。当长时间未收到报文时,该方法被调用。        @param nTimeLapse 距离上次接收报文的时间        """print("on OnHeartBeatWarning time: ",nTimeLapse)defOnFrontDisconnected(self,nReason):print("on FrontDisConnected disconnected",nReason)defOnFrontConnected(self):req=ApiStructure.ReqUserLoginField(BrokerID=self.broker_id,UserID=self.investor_id,Password=self.password)self.ReqUserLogin(req,self.request_id)print("trader on front connection")defOnRspUserLogin(self,pRspUserLogin,pRspInfo,nRequestID,bIsLast):ifpRspInfo.ErrorID!=0:print("Md OnRspUserLogin failed error_id=%s msg:%s",pRspInfo.ErrorID,pRspInfo.ErrorMsg.decode('gbk'))else:print("Md user login successfully")inv=ApiStructure.QryInvestorField(BrokerID=self.broker_id,InvestorID=self.investor_id)self.ReqQryInvestor(inv,self.inc_request_id())req=ApiStructure.SettlementInfoConfirmField.from_dict({"BrokerID":self.broker_id,"InvestorID":self.investor_id})self.ReqSettlementInfoConfirm(req,self.inc_request_id())defOnRspSettlementInfoConfirm(self,pSettlementInfoConfirm,pRspInfo,nRequestID,bIsLast):print(pSettlementInfoConfirm,pRspInfo)print(pRspInfo.ErrorMsg.decode("GBK"))definc_request_id(self):self.request_id+=1returnself.request_iddefOnRspQryInvestor(self,pInvestor,pRspInfo,nRequestID,bIsLast):print(pInvestor,pRspInfo)if__name__=="__main__":investor_id=""broker_id=""password=""server=""user_trader=Trader(broker_id=broker_id,investor_id=investor_id,password=password)user_trader.Create()user_trader.RegisterFront(server)user_trader.SubscribePrivateTopic(2)# 只传送登录后的流内容user_trader.SubscribePrivateTopic(2)# 只传送登录后的流内容user_trader.Init()print("trader started")print(user_trader.GetTradingDay())user_trader.Join()

文档

ctp的文档可以在docs

联系人

如果您对ctpwrapper api有任何疑问,请联系365504029@qq.com

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
AmazonS3查找从S3forJava下载的压缩文件的MIME类型   java如何使用Selenium在<span>中查找具有特定文本的元素   python如何使用OpenIEDemo生成自定义三元组。由stanfordnlp提供的java   java遇到“方法不适用”编译错误   java如何使用Mockito在Looper中运行的验证代码。getMainLooper?   类Java目录错误   java在已知其他泛型信息时使用原始类型   java connect()和disconnect()在哪里实现?   java使用PDF Box库拆分PDF,生成的PDF几乎与源PDF文件大小相同   java PowerMockito返回错误的对象   java如何找到TIBCO集合消息的字节编码?   java Basic音乐播放器下一步和上一步按钮   添加模块描述符时,java没有名为“entityManagerFactory”的bean可用   java为什么我的代码不是线程安全的?   eclipse java。引用项目中的类的lang.NoClassDefFoundError