import asyncio import json import logging import time from math import floor from fastapi import APIRouter, Request from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from ib_async import Stock from sse_starlette.sse import EventSourceResponse import dependencies router = APIRouter() templates = Jinja2Templates(directory="templates") logger = logging.getLogger(__name__) _history_cache: dict = {} # {symbol: (monotonic_ts, data)} _HISTORY_CACHE_TTL = 60 # seconds class ChartState: def __init__(self): self.latest_tick = None self.current_ticker = None self.current_minute = None self.current_ohlc = None chart_state = ChartState() @router.get("/", response_class=HTMLResponse) async def index(request: Request, symbol: str = ""): return templates.TemplateResponse(request, "index.html", {"symbol": symbol}) @router.get("/history") async def get_history(symbol: str): sym = symbol.upper() now = time.monotonic() cached = _history_cache.get(sym) if cached and (now - cached[0]) < _HISTORY_CACHE_TTL: return cached[1] try: ib = dependencies.get_ib() contract = Stock(sym, "SMART", "USD") qualified = await ib.qualifyContractsAsync(contract) if not qualified: return [] bars = None for attempt in range(3): try: bars = await ib.reqHistoricalDataAsync( qualified[0], endDateTime="", durationStr="1 D", barSizeSetting="1 min", whatToShow="TRADES", useRTH=True, formatDate=1, ) break except Exception as exc: msg = str(exc) if ("162" in msg or "pacing" in msg.lower()) and attempt < 2: logger.warning( f"IBKR pacing violation for {sym}, retry {attempt + 1}/3 in 10s" ) await asyncio.sleep(10) continue raise if bars is None: return [] result = [ { "time": int(bar.date.timestamp()), "open": bar.open, "high": bar.high, "low": bar.low, "close": bar.close, } for bar in bars ] _history_cache[sym] = (time.monotonic(), result) return result except Exception as exc: logger.error(f"Error fetching history for {sym}: {exc}") return [] @router.post("/subscribe") async def subscribe(request: Request): try: ib = dependencies.get_ib() data = await request.json() symbol = data.get("symbol", "").upper() if not symbol: return {"status": "error", "message": "Symbol is required"} contract = Stock(symbol, "SMART", "USD") qualified = await ib.qualifyContractsAsync(contract) if not qualified: return {"status": "error", "message": f"Could not qualify contract for {symbol}"} if chart_state.current_ticker is not None: try: ib.cancelRealTimeBars(chart_state.current_ticker) except Exception as e: print(f"Error cancelling real-time bars: {e}") chart_state.current_minute = None chart_state.current_ohlc = None ticker = ib.reqRealTimeBars(qualified[0], barSize=5, whatToShow="TRADES", useRTH=True) chart_state.current_ticker = ticker def on_bar(bars, hasNewBar): if not hasNewBar or not bars: return bar = bars[-1] timestamp = int(bar.time.timestamp()) minute_bucket = floor(timestamp / 60) * 60 if chart_state.current_minute is None: chart_state.current_minute = minute_bucket chart_state.current_ohlc = { "time": minute_bucket, "open": bar.open_, "high": bar.high, "low": bar.low, "close": bar.close, } elif minute_bucket == chart_state.current_minute: chart_state.current_ohlc["high"] = max(chart_state.current_ohlc["high"], bar.high) chart_state.current_ohlc["low"] = min(chart_state.current_ohlc["low"], bar.low) chart_state.current_ohlc["close"] = bar.close else: print(f"Finalized candle: {chart_state.current_ohlc}") chart_state.current_minute = minute_bucket chart_state.current_ohlc = { "time": minute_bucket, "open": bar.open_, "high": bar.high, "low": bar.low, "close": bar.close, } chart_state.latest_tick = chart_state.current_ohlc.copy() ticker.updateEvent += on_bar return {"status": "ok", "symbol": symbol} except Exception as e: print(f"Error subscribing to {symbol}: {e}") return {"status": "error", "message": str(e)} @router.get("/stream") async def stream(request: Request): async def event_generator(): while True: if await request.is_disconnected(): break if chart_state.latest_tick: yield {"event": "candle", "data": json.dumps(chart_state.latest_tick)} await asyncio.sleep(1) return EventSourceResponse(event_generator())