Skip to content

Instantly share code, notes, and snippets.

@idkravitz
Created May 21, 2021 08:11
Show Gist options
  • Select an option

  • Save idkravitz/b5c91f1bc41c7be7adbb0d96fd462439 to your computer and use it in GitHub Desktop.

Select an option

Save idkravitz/b5c91f1bc41c7be7adbb0d96fd462439 to your computer and use it in GitHub Desktop.
import asyncio
import threading
import time
from contextlib import contextmanager
from typing import List, Dict
from binance import AsyncClient, BinanceSocketManager, DepthCacheManager
from binance.depthcache import DepthCache
class _depth:
    """Mutable holder for one symbol's order book (bid and ask levels).

    Each level is stored as a two-element list built from the first two
    items of the level passed to the setter. The ``bids`` and ``asks`` list
    objects are created once and mutated in place afterwards, so references
    obtained through the getters keep observing later updates.
    """

    def __init__(self):
        self.bids = []
        self.asks = []

    def get_bids(self):
        """Return the live list of bid levels (not a copy)."""
        return self.bids

    def get_asks(self):
        """Return the live list of ask levels (not a copy)."""
        return self.asks

    def set_bids(self, bids: list):
        """Replace the stored bids in place with the levels from *bids*."""
        stored = self.bids
        stored.clear()
        # Keep only the first two items of every level; extras are dropped.
        stored.extend([level[0], level[1]] for level in bids)

    def set_asks(self, asks: list):
        """Replace the stored asks in place with the levels from *asks*."""
        stored = self.asks
        stored.clear()
        # Keep only the first two items of every level; extras are dropped.
        stored.extend([level[0], level[1]] for level in asks)
class Order_books_cache:
    """Lock-protected mapping from symbol name to its order book.

    The mapping and its lock are created per instance in ``__init__``.
    Previously both were class attributes, so every ``Order_books_cache``
    object silently shared one dictionary and one lock.
    """

    def __init__(self):
        # symbol -> order book holder; only touch it via open_orderbook().
        self._obs: Dict[str, "_depth"] = {}
        self._obs_mutex: threading.Lock = threading.Lock()

    @contextmanager
    def open_orderbook(self):
        """Yield the symbol -> order book dict while holding the lock.

        The lock is released when the ``with`` block exits, including when
        the block raises. The lock is not reentrant, so do not nest calls
        on the same instance from one thread.
        """
        with self._obs_mutex:
            yield self._obs
class Order_book_stream:
    """Stream Binance depth updates for several symbols into a shared cache.

    One ``DepthCacheManager`` is opened per entry of ``SYMBOLS`` (each
    suffixed with ``BRIDGE_SYMBOL`` to form the trading pair), and every
    received depth snapshot is copied into ``cache`` under its pair name.
    """

    # Number of price levels requested from Binance and kept per side.
    _DEPTH_LIMIT = 10

    def __init__(self, SYMBOLS: List[str], BRIDGE_SYMBOL: str, cache: Order_books_cache):
        """Remember the symbols to follow and the cache to write into.

        Args:
            SYMBOLS: Base asset names, e.g. ``["XMR", "XRP"]``.
            BRIDGE_SYMBOL: Quote asset appended to each base asset.
            cache: Shared cache that receives the order book snapshots.
        """
        self.SYMBOLS = SYMBOLS
        self.BRIDGE_SYMBOL = BRIDGE_SYMBOL
        self.cache: Order_books_cache = cache

    def _store_depth(self, depth_cache) -> None:
        """Copy one received depth snapshot into the shared cache.

        ``None`` is treated as "the stream lost its state": the whole cache
        is cleared so readers do not act on stale books.
        """
        with self.cache.open_orderbook() as cache_orderbook:
            if depth_cache is None:
                # The context manager yields the dict itself, so clear it
                # directly (the dict has no ``_obs`` attribute).
                cache_orderbook.clear()
                return
            # Create the entry on first sight of a symbol. The old
            # ``if cache_orderbook:`` gate skipped every update while the
            # dict was empty, so the cache could never be populated.
            book = cache_orderbook.get(depth_cache.symbol)
            if book is None:
                book = cache_orderbook[depth_cache.symbol] = _depth()
            book.set_asks(depth_cache.get_asks()[:self._DEPTH_LIMIT])
            book.set_bids(depth_cache.get_bids()[:self._DEPTH_LIMIT])

    async def run_orderbook(self):
        """Open all depth streams and feed the cache until they stop.

        Runs one receive loop per symbol concurrently. The loops have no
        exit condition, so this returns only if one of them raises (or the
        task is cancelled); the Binance client is closed in either case.
        """
        client = await AsyncClient.create()
        try:
            bm = BinanceSocketManager(client)
            dcms: List[DepthCacheManager] = [
                DepthCacheManager(
                    client,
                    symbol + self.BRIDGE_SYMBOL,
                    bm=bm,
                    limit=self._DEPTH_LIMIT,
                )
                for symbol in self.SYMBOLS
            ]

            async def handlerLoop(dcm: DepthCacheManager):
                print("Handlers started")
                async with dcm as dcm_socket:
                    print("Socket context entered")
                    while True:
                        depth_cache: DepthCache = await dcm_socket.recv()
                        print("recv happened")
                        self._store_depth(depth_cache)

            futures = map(handlerLoop, dcms)
            print("Futures prepared")
            await asyncio.gather(*futures)
        finally:
            # Release the HTTP session held by the client; it was
            # previously leaked whenever a handler failed.
            await client.close_connection()
def order_book_worker(SYMBOLS: List[str], BRIDGE_SYMBOL: str, cache: Order_books_cache):
    """Build an ``Order_book_stream`` and run it on a fresh event loop.

    Blocks the calling thread until ``run_orderbook()`` finishes, so it is
    meant to be used as a thread target.

    Args:
        SYMBOLS: Base asset names to follow.
        BRIDGE_SYMBOL: Quote asset appended to each base asset.
        cache: Shared cache the stream writes snapshots into.
    """
    stream = Order_book_stream(SYMBOLS, BRIDGE_SYMBOL, cache)
    print("Order book stream created")
    # asyncio.run creates and closes its own event loop for this call.
    asyncio.run(stream.run_orderbook())
def start_order_book_thread(SYMBOLS: List[str], BRIDGE_SYMBOL: str, cache: Order_books_cache):
    """Start ``order_book_worker`` in a background thread and return it.

    Args:
        SYMBOLS: Base asset names to follow.
        BRIDGE_SYMBOL: Quote asset appended to each base asset.
        cache: Shared cache the worker writes snapshots into.

    Returns:
        The already started ``threading.Thread``.
    """
    # Pass the callable and its arguments separately. Writing
    # ``target=order_book_worker(...)`` ran the worker right here in the
    # caller's thread (blocking it) and handed its result to Thread.
    order_book_thread = threading.Thread(
        target=order_book_worker,
        args=(SYMBOLS, BRIDGE_SYMBOL, cache),
    )
    order_book_thread.start()
    return order_book_thread
# Module-level cache instance, created on import.
cache = Order_books_cache()

if __name__ == '__main__':
    # Demo run: follow three coins quoted in USDT, then wait two minutes.
    child_thread = start_order_book_thread(
        SYMBOLS=["XMR", "XRP", "SUSHI"],
        BRIDGE_SYMBOL="USDT",
        cache=cache,
    )
    time.sleep(120)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment