python-期货CTP接口封装

python-期货CTP接口封装使用 cython 对 CTP 接口完成封装 低延时 基于 Cython 释放 GIL 支持多路行情源 无需主事件引擎 实现去中心化

大家好,欢迎来到IT知识分享网。


简介

CtpPlus是上期技术CTP API的Python封装,具有以下特点:

  • 易使用:Python语言,结构清晰,注释完整,文档详尽。
  • 低延时:基于Cython释放GIL;支持多路行情源;无需主事件引擎,实现去中心化。
  • 忠实于CTP官方特性:充分利用CTP的异步、多线程特性。

CtpPlus开源地址:https://gitee.com/syealfalfa/CtpPlus-master


一、安装

首先配置Anaconda环境,
以下为安装方法:
请添加图片描述
项目中已提供了python3.8和python3.10的window安装包,cd到对应的安装包路径即可

然后使用pip命令安装:

pip install CtpPlus-1.0-cp38-cp38-win_amd64.whl 

二、CtpPlus文件介绍

  • api:存放的是Linux和windows版本CTP链接库
  • c2cython:实现CTP回调函数功能
  • cython2c:使用cython调用CTP接口
  • MdApiBase.py:封装后的CTP行情接口
  • TraderApiBase.py:封装后的CTP交易接口
  • MdApi.py和TraderApi.py:接口案例文件
  • ApiConst.py:对应CTP的ThostFtdcUserApiDataType.h中的数据类型
  • ApiStruct.py:对应CTP的ThostFtdcUserApiStruct.h中的数据结构
  • FutureAccount.py:配置文件

三.应用范例

1.配置文件:FutureAccount.py

代码如下(示例):

# encoding:utf-8 import os BASE_LOCATION = "./log" MD_LOCATION = BASE_LOCATION TD_LOCATION = BASE_LOCATION SIMULATE_SERVER = { 
    '电信1': { 
   'BrokerID': 9999, 'TDServer': "180.168.146.187:10201", 'MDServer': '180.168.146.187:10211', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, '电信2': { 
   'BrokerID': 9999, 'TDServer': "180.168.146.187:10202", 'MDServer': '180.168.146.187:10212', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, '移动': { 
   'BrokerID': 9999, 'TDServer': "218.202.237.33:10203", 'MDServer': '218.202.237.33:10213', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, 'TEST': { 
   'BrokerID': 9999, 'TDServer': "180.168.146.187:10130", 'MDServer': '180.168.146.187:10131', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, 'N视界': { 
   'BrokerID': 10010, 'TDServer': "210.14.72.12:4600", 'MDServer': '210.14.72.12:4602', 'AppID': '', 'AuthCode': ''}, } class FutureAccount: def __init__(self, broker_id, server_dict, reserve_server_dict, investor_id, password, app_id, auth_code, subscribe_list, md_flow_path=MD_LOCATION, td_flow_path=TD_LOCATION): self.broker_id = broker_id # 期货公司BrokerID self.server_dict = server_dict # 登录的服务器地址 self.reserve_server_dict = reserve_server_dict # 备用服务器地址 self.investor_id = investor_id # 账户 self.password = password # 密码 self.app_id = app_id # 认证使用AppID self.auth_code = auth_code # 认证使用授权码 self.subscribe_list = subscribe_list # 订阅合约列表[] self.md_flow_path = md_flow_path # MdApi流文件存储地址,默认MD_LOCATION self.td_flow_path = td_flow_path # TraderApi流文件存储地址,默认TD_LOCATION def get_simulate_account(investor_id, password, subscribe_list=None, server_name='电信1', md_flow_path=MD_LOCATION, td_flow_path=TD_LOCATION): if server_name not in SIMULATE_SERVER.keys(): print(f'{ 
     server_name}不在可选列表[电信1, 电信2, 移动, TEST]中,默认使用电信1。') server_name = '电信1' if subscribe_list is None: subscribe_list = [] investor_id = investor_id if isinstance(investor_id, bytes) else investor_id.encode(encoding='utf-8') password = password if isinstance(password, bytes) else password.encode(encoding='utf-8') return FutureAccount( broker_id=SIMULATE_SERVER[server_name]['BrokerID'], # 期货公司BrokerID server_dict=SIMULATE_SERVER[server_name], # TDServer为交易服务器,MDServer为行情服务器。服务器地址格式为"ip:port。" reserve_server_dict={ 
   }, investor_id=investor_id, # 账户 password=password, # 密码 app_id=SIMULATE_SERVER[server_name]['AppID'], # 认证使用AppID auth_code=SIMULATE_SERVER[server_name]['AuthCode'], # 认证使用授权码 subscribe_list=subscribe_list, # 订阅合约列表 md_flow_path=md_flow_path, # MdApi流文件存储地址,默认MD_LOCATION td_flow_path=td_flow_path # TraderApi流文件存储地址,默认TD_LOCATION ) 

2.获取实时行情和交易接口测试

1.1获取实时行情

演示这个功能的例子是examples/test_MdApi.py,运行之后可以看到如下的输出结果:

# -*- codeing:utf-8 -*- ''' @author: syealfalfa @datetime: 2024/2/22 11:00 @Blog: 获取tick数据 ''' from CtpPlus.CTP.ApiStruct import ReqUserLoginField, QryMulticastInstrumentField from CtpPlus.CTP.FutureAccount import get_simulate_account, FutureAccount from CtpPlus.CTP.MdApi import run_api from CtpPlus.CTP.MdApiBase import MdApiBase class TickEngine(MdApiBase): def __init__(self, broker_id, md_server, investor_id, password, app_id, auth_code, instrument_id_list, md_queue_list=None, page_dir='', using_udp=False, multicast=False, *args, kwargs): super(TickEngine, self).__init__() def OnRtnDepthMarketData(self, pDepthMarketData): print(pDepthMarketData) def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast): self.write_log('OnRspUserLogin', pRspUserLogin) def OnRspQryMulticastInstrument(self, pMulticastInstrument, pRspInfo, nRequestID, bIsLast): print(f'OnRspQryMulticastInstrument: { 
     pMulticastInstrument}') if __name__ == '__main__': subscribe_list = [b'rb2410'] future_account = get_simulate_account( investor_id='', # SimNow账户 password='', # SimNow账户密码 subscribe_list=subscribe_list, # 合约列表 server_name='TEST' # 电信1、电信2、移动、TEST ) run_api(TickEngine, future_account) 

请添加图片描述从输出日志可以看到,AlgoPlus第一步连接服务器,第二步登陆账户,第三步订阅行情,最后就是接收行情数据。

1.2 交易接口测试

# -*- codeing:utf-8 -*- ''' @author: syealfalfa @datetime: 2024/3/4 9:56 @Blog: ''' import time from CtpPlus.CTP.ApiStruct import QryRULEIntraParameterField, QryProductGroupField, QryExchangeField, QryInstrumentField from CtpPlus.CTP.FutureAccount import get_simulate_account, FutureAccount from CtpPlus.CTP.TraderApiBase import TraderApiBase from CtpPlus.utils.base_field import to_str class TraderEngine(TraderApiBase): def __init__(self, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, flow_path='', private_resume_type=2, public_resume_type=2): super(TraderEngine, self).__init__() def OnRspSettlementInfoConfirm(self, pSettlementInfoConfirm, pRspInfo, nRequestID, bIsLast): """投资者结算结果确认响应""" self.write_log('pSettlementInfoConfirm', pSettlementInfoConfirm) def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast): """登录请求响应""" self.write_log('OnRspUserLogin', pRspUserLogin) # 请求RULE品种内对锁仓折扣参数查询 # pQryRULEIntraParameter = QryRULEIntraParameterField(ExchangeID=b'9080', ProdFamilyCode=b'MA') # ret = self.ReqQryRULEIntraParameter(pQryRULEIntraParameter) # print(f'ReqQryRULEIntraParameter: ret = {ret}') # 请求查询产品组 # pQryProductGroup = QryProductGroupField(ProductID=b'ag', ExchangeID=b'SHFE') # ret = self.ReqQryProductGroup(pQryProductGroup) # print(f'ReqQryProductGroup: ret = {ret}') # 买开仓 ret = self.buy_open(b'SHFE', b'rb2405', 3720, 1) if not ret: self.write_log("ReqOrderInsert", "买入开仓成功") """查询持仓明细""" # self.query_position_detail() """请求查询交易所""" # pQryExchange = QryExchangeField(ExchangeID=b'SHFE') # self.ReqQryExchange(pQryExchange) """请求查询合约,填空可以查询到所有合约""" # pQryInstrument = QryInstrumentField() # self.ReqQryInstrument(pQryInstrument) # self.query_instrument() def OnRspQryInstrument(self, pInstrument, pRspInfo, nRequestID, bIsLast): """请求查询合约响应""" pInstrument['InstrumentName'] = to_str(pInstrument['InstrumentName']) self.write_log("OnRspQryInstrument", pInstrument, pRspInfo) def OnRspQryRULEIntraParameter(self, pRULEIntraParameter, pRspInfo, nRequestID, bIsLast): self.write_log('OnRspQryRULEIntraParameter', pRULEIntraParameter) def OnRspQryProductGroup(self, pProductGroup, pRspInfo, nRequestID, bIsLast): self.write_log('OnRspQryProductGroup', pProductGroup, pRspInfo, nRequestID, bIsLast) def OnRspQryExchange(self, pExchange, pRspInfo, nRequestID, bIsLast): pExchange['ExchangeName'] = to_str(pExchange['ExchangeName']) self.write_log('OnRspQryExchange', pExchange) def run_api(api_cls, account): if isinstance(account, FutureAccount): trader_engine = api_cls( account.broker_id, account.server_dict['TDServer'], account.investor_id, account.password, account.app_id, account.auth_code, None, account.td_flow_path ) trader_engine.Join() if __name__ == '__main__': subscribe_list = [b'rb2410'] future_account = get_simulate_account( investor_id='', # SimNow账户 password='', # SimNow账户密码 subscribe_list=subscribe_list, # 合约列表 server_name='TEST' # 电信1、电信2、移动、TEST、实盘 ) run_api(TraderEngine, future_account) 

测试结果如下:请添加图片描述

总结

请自行下载源码研究,文档中微信号,欢迎各位量化交易者添加,相互探讨

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/155904.html

(0)
上一篇 2025-02-16 15:15
下一篇 2025-02-16 15:20

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信