Skip to content

Instantly share code, notes, and snippets.

@tej87681088
Created November 15, 2024 07:00
Show Gist options
  • Select an option

  • Save tej87681088/62497747f7cb410dfc04a4c07c63a251 to your computer and use it in GitHub Desktop.

Select an option

Save tej87681088/62497747f7cb410dfc04a4c07c63a251 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "f5149aa6",
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"from MasterTradePy.api import MasterTradeAPI\n",
"from MasterTradePy.model import *\n",
"from MasterTradePy.constant import PriceType, OrderType, TradingSession, Side, TradingUnit, RCode"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "ad02eb72",
"metadata": {},
"outputs": [],
"source": [
"username = 'your_username'\n",
"password = 'your_password'\n",
"stock_id_list = ['1216', '1582', '2108', '2373', '6277', '9911', '9943']"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "733b63c2",
"metadata": {},
"outputs": [],
"source": [
"event = threading.Event()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "ff344703",
"metadata": {},
"outputs": [],
"source": [
"class ConcreteMarketTrader(MarketTrader):\n",
" def OnNewOrderReply(self, data) -> None:\n",
" print(data)\n",
"\n",
" def OnChangeReply(self, data) -> None:\n",
" print(data)\n",
"\n",
" def OnCancelReply(self, data) -> None:\n",
" print(data)\n",
"\n",
" def OnReport(self, data) -> None:\n",
" if type(data) is ReportOrder:\n",
" if data.order.tableName == \"ORD:TwsOrd\":\n",
" print(f'回報資料: 委託書號={data.order.ordNo}, 股票代號={data.order.symbol}, 委託股數={data.orgOrder.qty}, 成交股數={data.order.cumQty}, 訊息={data.lastMessage}, 狀態={data.order.status}')\n",
" elif data.order.tableName == \"RPT:TwsDeal\":\n",
" print(f'回報資料: 委託書號={data.order.ordNo}, 股票代號={data.order.symbol}, 成交價格={data.order.dealPri}, 成交股數={data.order.cumQty}, 剩餘股數={data.order.leavesQty} 訊息={data.lastMessage}, 狀態={data.order.status}')\n",
" elif data.order.tableName == \"RPT:TwsNew\":\n",
" print(f'回報資料: 委託書號={data.order.ordNo}, 股票代號={data.order.symbol}, 委託價格={data.orgOrder.price}, 委託股數={data.orgOrder.qty}, 訊息={data.lastMessage}, 狀態={data.order.status}')\n",
" else:\n",
" print(f'回報資料: 委託書號={data.order.ordNo}, 股票代號={data.order.symbol}, 委託價格={data.orgOrder.price}, 委託股數={data.orgOrder.qty}, 成交股數={data.order.cumQty}, 訊息={data.lastMessage}, 狀態={data.order.status}')\n",
" \n",
" def OnReqResult(self, workID: str, data) -> None:\n",
" pass\n",
"\n",
" def OnSystemEvent(self, data: SystemEvent) -> None:\n",
" print(f'OnSystemEvent{data}')\n",
" \n",
" def OnAnnouncementEvent(self, data)->None:\n",
" print(f'OnAnnouncementEvent:{data}')\n",
" \n",
" def OnError(self, data):\n",
" print(data)\n",
"\n",
"def OnDigitalSSOEvent(aIsOK, aMsg):\n",
" print(f'OnDigitalSSOEvent: {aIsOK} {aMsg}')\n",
"\n",
"def OnTAConnStuEvent(aIsOK):\n",
" print(f'OnTAConnStuEvent: {aIsOK}')\n",
" if aIsOK:\n",
" event.set()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a3913ab6",
"metadata": {},
"outputs": [],
"source": [
"def execute_order(api, stock_id_list):\n",
" account = 'your_account'\n",
" price = '' # 空白表示市價下單\n",
" qtr = '1' # 1張請輸入1\n",
" orderType = OrderType.ROD\n",
"\n",
" if not price:\n",
" priceType = PriceType.MKT\n",
" else:\n",
" priceType = PriceType.LMT\n",
"\n",
" for stock_id in stock_id_list:\n",
" symbol = stock_id\n",
" order = Order(tradingSession=TradingSession.NORMAL,\n",
" side=Side.Buy,\n",
" symbol=symbol,\n",
" priceType=priceType,\n",
" price=price,\n",
" tradingUnit=TradingUnit.COMMON, \n",
" qty=qty,\n",
" orderType=orderType,\n",
" tradingAccount=account,\n",
" userDef='')\n",
" rcode = api.NewOrder(order)\n",
" if rcode == RCode.OK:\n",
" print(f'已送出委託: {symbol}')\n",
" else:\n",
" print(f'下單失敗! 股票代號: {symbol},請再次執行程式,依據回報資料修正輸入')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "f2f09fa4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"連線競賽ID:C121695520VA\n",
"OnAnnouncementEvent:證券交易驗證公告內文 : 證券登入公告\n",
"期貨交易驗證公告內文 : 期貨登入公告\n",
"連線競賽主機!!!\n",
"\n",
"交易主機連線成功,進行雙因子認證\n",
"驗證已通過,可執行 API 交易功能\n",
"回報資料: 委託書號=Y0001, 股票代號=3515, 委託價格=, 委託股數=5000, 訊息=新單 5000 股OK!, 狀態=101)委託已接受(交易所已接受)\n",
"已送出委託: 3515\n",
"已送出委託: 2610\n",
"回報資料: 委託書號=Y0002, 股票代號=2610, 委託價格=, 委託股數=5000, 訊息=新單 5000 股OK!, 狀態=101)委託已接受(交易所已接受)\n",
"已送出委託: 2344\n",
"回報資料: 委託書號=Y0003, 股票代號=2344, 委託價格=, 委託股數=5000, 訊息=新單 5000 股OK!, 狀態=101)委託已接受(交易所已接受)\n",
"已送出委託: 2317\n",
"回報資料: 委託書號=Y0004, 股票代號=2317, 委託價格=, 委託股數=5000, 訊息=新單 5000 股OK!, 狀態=101)委託已接受(交易所已接受)\n",
"已送出委託: 2460\n",
"回報資料: 委託書號=Y0005, 股票代號=2460, 委託價格=, 委託股數=5000, 訊息=新單 5000 股OK!, 狀態=101)委託已接受(交易所已接受)\n"
]
}
],
"source": [
"def main():\n",
" trader = ConcreteMarketTrader()\n",
" api = MasterTradeAPI(trader)\n",
" api.SetConnectionHost('solace140.masterlink.com.tw:55555')\n",
" \n",
" # 登入交易主機\n",
" rc = api.Login(username, password, True, True, True)\n",
" if rc == RCode.OK:\n",
" print('交易主機連線成功,進行雙因子認證')\n",
" \n",
" # 取得帳戶並進行驗證\n",
" accounts = [x[4:] for x in api.accounts]\n",
" rcc = api.CheckAccs(tradingAccounts=accounts)\n",
" if rcc == RCode.OK:\n",
" print('驗證已通過,可執行 API 交易功能')\n",
"\n",
" execute_order(api, stock_id_list)\n",
"\n",
" input(\"Press Enter to finish...\\n\")\n",
"\n",
"main()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a3234a19",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment