176 lines
5.5 KiB
Python
176 lines
5.5 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from math import floor
|
|
|
|
from fastapi import APIRouter, Request
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from ib_async import Stock
|
|
from sse_starlette.sse import EventSourceResponse
|
|
|
|
import dependencies
|
|
|
|
# Router that the endpoints defined in this module are registered on.
router = APIRouter()
|
|
# Jinja2 template loader pointed at the "templates" directory; used by index().
templates = Jinja2Templates(directory="templates")
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# In-process cache of get_history() results, keyed by upper-cased symbol.
# Each value pairs the time.monotonic() reading taken when the entry was
# stored with the list of candle dicts that was returned.
_history_cache: dict[str, tuple[float, list]] = {} # {symbol: (monotonic_ts, data)}
|
|
# Maximum age of a _history_cache entry before get_history() refetches it.
_HISTORY_CACHE_TTL = 60 # seconds
|
|
|
|
|
|
class ChartState:
    """Mutable holder for the state of the single live-chart subscription.

    Every attribute starts out as ``None`` and is filled in once a
    real-time subscription has been set up and bars begin to arrive.
    """

    def __init__(self):
        # Handle returned by ``ib.reqRealTimeBars`` for the active
        # subscription; it is what gets passed to ``cancelRealTimeBars``.
        self.current_ticker = None

        # Epoch second (a multiple of 60) of the minute being aggregated,
        # and the OHLC dict being built for that minute.
        self.current_minute: int | None = None
        self.current_ohlc: dict | None = None

        # Copy of the most recent ``current_ohlc``; read by the SSE stream.
        self.latest_tick: dict | None = None
|
|
|
|
|
|
# Single module-wide state object: written by subscribe() and its bar
# callback, read by stream().
chart_state = ChartState()
|
|
|
|
|
|
@router.get("/", response_class=HTMLResponse)
async def index(request: Request, symbol: str = ""):
    """Render ``index.html`` with the requested symbol in its context.

    Args:
        request: Incoming request, handed through to the template response.
        symbol: Symbol to expose to the template; defaults to an empty string.

    Returns:
        The template response produced for ``index.html``.
    """
    context = {"symbol": symbol}
    return templates.TemplateResponse(request, "index.html", context)
|
|
|
|
|
|
async def _fetch_intraday_bars(ib, contract, sym, max_attempts=3, retry_delay=10):
    """Request one day of 1-minute TRADES bars, retrying on pacing violations.

    Args:
        ib: Connected IB client exposing ``reqHistoricalDataAsync``.
        contract: Qualified contract to request bars for.
        sym: Symbol, used only in log messages.
        max_attempts: Total number of request attempts before giving up.
        retry_delay: Seconds to wait between attempts.

    Returns:
        Whatever ``reqHistoricalDataAsync`` returned, or ``None`` if no
        attempt was made (``max_attempts`` < 1).

    Raises:
        Exception: Any non-pacing error, or a pacing error on the last attempt.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await ib.reqHistoricalDataAsync(
                contract,
                endDateTime="",
                durationStr="1 D",
                barSizeSetting="1 min",
                whatToShow="TRADES",
                useRTH=True,
                formatDate=1,
            )
        except Exception as exc:
            msg = str(exc)
            # Pacing violations are recognised by IB error code 162 or the
            # word "pacing" in the message; anything else is not retried.
            is_pacing = "162" in msg or "pacing" in msg.lower()
            if not is_pacing or attempt >= max_attempts:
                raise
            logger.warning(
                "IBKR pacing violation for %s, retry %s/%s in %ss",
                sym, attempt, max_attempts, retry_delay,
            )
            await asyncio.sleep(retry_delay)
    return None


@router.get("/history")
async def get_history(symbol: str):
    """Return today's 1-minute candles for ``symbol`` as a list of dicts.

    Results are served from ``_history_cache`` while they are younger than
    ``_HISTORY_CACHE_TTL`` seconds. Each candle has the keys ``time`` (epoch
    seconds), ``open``, ``high``, ``low`` and ``close``.

    Args:
        symbol: Ticker symbol; surrounding whitespace is ignored and the
            value is upper-cased before use.

    Returns:
        The list of candles, or ``[]`` when the symbol is blank, cannot be
        qualified, yields no bars, or any error occurs (errors are logged).
    """
    sym = symbol.strip().upper()
    if not sym:
        # Nothing to look up; avoid a pointless round-trip to IB.
        return []

    cached = _history_cache.get(sym)
    if cached is not None and (time.monotonic() - cached[0]) < _HISTORY_CACHE_TTL:
        return cached[1]

    try:
        ib = dependencies.get_ib()
        qualified = await ib.qualifyContractsAsync(Stock(sym, "SMART", "USD"))
        # NOTE(review): newer ib_async releases appear to report an
        # unresolvable contract as a None entry rather than omitting it;
        # guard against both shapes - confirm against the installed version.
        if not qualified or qualified[0] is None:
            return []

        bars = await _fetch_intraday_bars(ib, qualified[0], sym)
        if not bars:
            # Do not cache an empty result: it would pin a transient failure
            # (or a not-yet-open session) for the whole TTL.
            # NOTE(review): ib_async is understood to hand back an empty list
            # on request errors unless RaiseRequestErrors is set - confirm.
            return []

        result = [
            {
                "time": int(bar.date.timestamp()),
                "open": bar.open,
                "high": bar.high,
                "low": bar.low,
                "close": bar.close,
            }
            for bar in bars
        ]
        _history_cache[sym] = (time.monotonic(), result)
        return result
    except Exception as exc:
        # Top-level boundary for this endpoint: log with traceback and
        # degrade to an empty chart instead of a 500.
        logger.exception("Error fetching history for %s: %s", sym, exc)
        return []
|
|
|
|
|
|
def _candle_from_bar(minute_bucket, bar):
    """Start a new 1-minute candle seeded from a single real-time bar."""
    return {
        "time": minute_bucket,
        "open": bar.open_,
        "high": bar.high,
        "low": bar.low,
        "close": bar.close,
    }


@router.post("/subscribe")
async def subscribe(request: Request):
    """Switch the live chart to the symbol given in the JSON request body.

    Expects a JSON object with a ``symbol`` key. Any existing real-time bar
    subscription is cancelled, the shared ``chart_state`` is reset, and a new
    5-second TRADES bar subscription is opened whose bars are folded into
    1-minute candles stored on ``chart_state``.

    Args:
        request: Incoming request carrying the JSON body.

    Returns:
        ``{"status": "ok", "symbol": ...}`` on success, otherwise
        ``{"status": "error", "message": ...}``. Errors are logged, not raised.
    """
    # Bound before the try so the error handler below can always format it.
    symbol = ""
    try:
        data = await request.json()
        raw_symbol = data.get("symbol", "") if isinstance(data, dict) else ""
        symbol = raw_symbol.strip().upper() if isinstance(raw_symbol, str) else ""
        if not symbol:
            return {"status": "error", "message": "Symbol is required"}

        ib = dependencies.get_ib()
        qualified = await ib.qualifyContractsAsync(Stock(symbol, "SMART", "USD"))
        # NOTE(review): newer ib_async releases appear to report an
        # unresolvable contract as a None entry - confirm against the
        # installed version; both shapes are rejected here.
        if not qualified or qualified[0] is None:
            return {"status": "error", "message": f"Could not qualify contract for {symbol}"}

        previous = chart_state.current_ticker
        if previous is not None:
            try:
                ib.cancelRealTimeBars(previous)
            except Exception as e:
                # Best effort: a failed cancel must not block the new subscription.
                logger.warning("Error cancelling real-time bars: %s", e)

        # Reset all shared state, including the last emitted candle, so the
        # SSE stream stops sending the previous symbol's data.
        chart_state.current_ticker = None
        chart_state.current_minute = None
        chart_state.current_ohlc = None
        chart_state.latest_tick = None

        ticker = ib.reqRealTimeBars(qualified[0], barSize=5, whatToShow="TRADES", useRTH=True)
        chart_state.current_ticker = ticker

        def on_bar(bars, hasNewBar):
            """Fold the newest 5-second bar into the current 1-minute candle."""
            if not hasNewBar or not bars:
                return
            if chart_state.current_ticker is not ticker:
                # This subscription has been superseded (its cancel may have
                # failed); do not let it overwrite the active symbol's state.
                return

            bar = bars[-1]
            timestamp = int(bar.time.timestamp())
            minute_bucket = timestamp - timestamp % 60

            candle = chart_state.current_ohlc
            if candle is not None and minute_bucket == chart_state.current_minute:
                candle["high"] = max(candle["high"], bar.high)
                candle["low"] = min(candle["low"], bar.low)
                candle["close"] = bar.close
            else:
                if chart_state.current_minute is not None:
                    logger.info("Finalized candle: %s", candle)
                chart_state.current_minute = minute_bucket
                chart_state.current_ohlc = _candle_from_bar(minute_bucket, bar)

            # Publish a copy so later in-place updates do not mutate what the
            # stream is serialising.
            chart_state.latest_tick = chart_state.current_ohlc.copy()

        ticker.updateEvent += on_bar

        return {"status": "ok", "symbol": symbol}
    except Exception as e:
        logger.exception("Error subscribing to %s: %s", symbol, e)
        return {"status": "error", "message": str(e)}
|
|
|
|
|
|
@router.get("/stream")
async def stream(request: Request):
    """Serve the latest candle as a server-sent event stream.

    Roughly once per second, and until the client disconnects, the current
    ``chart_state.latest_tick`` (when set) is sent as a ``candle`` event
    whose data is the JSON-encoded candle.

    Args:
        request: Incoming request; polled to detect client disconnects.

    Returns:
        An ``EventSourceResponse`` wrapping the event generator.
    """

    async def event_generator():
        # Loop ends as soon as the client is seen to have gone away.
        while not await request.is_disconnected():
            tick = chart_state.latest_tick
            if tick:
                yield {"event": "candle", "data": json.dumps(tick)}
            await asyncio.sleep(1)

    return EventSourceResponse(event_generator())
|