Files
2026-05-18 00:31:08 +03:00

110 lines
3.7 KiB
Python

import asyncio
import csv
import io
import logging
from datetime import datetime, timezone
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from ib_async import ScannerSubscription
import dependencies
# Router that groups every scanner endpoint; all routes declared on it are
# served under the "/scanner" URL prefix.
router = APIRouter(prefix="/scanner")
# Jinja2 template loader pointed at the "templates" directory.
templates = Jinja2Templates(directory="templates")
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
async def _fetch_long_name(ib, contract, symbol, timeout: float):
    """Best-effort lookup of the long company name for *contract*.

    Returns the ``longName`` of the first contract-details entry, or ``None``
    when the lookup fails, times out, or yields no entries.
    """
    try:
        details_list = await asyncio.wait_for(
            ib.reqContractDetailsAsync(contract), timeout=timeout
        )
        return details_list[0].longName if details_list else None
    except Exception:
        # The long name is cosmetic: a failed or slow lookup must not fail the
        # whole scan, so fall back to None and let the caller pick a default.
        logger.debug(
            "Contract details lookup failed for %s", symbol, exc_info=True
        )
        return None


async def execute_scan(
    data: dict,
    ib,
    *,
    max_results: int = 20,
    scan_timeout: float = 30.0,
    details_timeout: float = 10.0,
) -> list:
    """Run a market scanner request and return the top results as dicts.

    Args:
        data: Request payload. Reads the optional keys ``scan_type``
            (default ``"HOT_BY_VOLUME"``), ``min_price``, ``min_volume`` and
            ``min_market_cap``; missing, empty or zero filters are not applied.
        ib: Client object exposing ``reqScannerDataAsync`` and
            ``reqContractDetailsAsync``.
        max_results: Maximum number of scanner rows to return.
        scan_timeout: Seconds to wait for the scanner request itself.
        details_timeout: Seconds to wait for each per-symbol contract-details
            lookup; a lookup that exceeds it is treated as "no long name".

    Returns:
        A list of dicts with the keys ``symbol``, ``company``, ``exchange``,
        ``chart_image`` and ``rank``.

    Raises:
        asyncio.TimeoutError: If the scanner request exceeds ``scan_timeout``.
        ValueError, TypeError: If a numeric filter cannot be converted.
    """
    # Local import keeps this block self-contained; only needed for the URL.
    from urllib.parse import quote

    scan_type = data.get("scan_type", "HOT_BY_VOLUME")
    # ``or 0`` maps None / "" (e.g. blank form fields) to "filter disabled".
    min_price = float(data.get("min_price", 0) or 0)
    min_volume = int(data.get("min_volume", 0) or 0)
    min_market_cap = float(data.get("min_market_cap", 0) or 0)

    subscription = ScannerSubscription()
    subscription.instrument = "STK"
    subscription.locationCode = "STK.US.MAJOR"
    subscription.stockTypeFilter = "ALL"
    subscription.scanCode = scan_type
    if min_price > 0:
        subscription.abovePrice = min_price
    if min_volume > 0:
        subscription.aboveVolume = min_volume
    if min_market_cap > 0:
        subscription.marketCapAbove = min_market_cap

    scan_data = await asyncio.wait_for(
        ib.reqScannerDataAsync(subscription), timeout=scan_timeout
    )
    top_items = list(scan_data or [])[:max_results]

    # Resolve all long names concurrently, each with its own timeout, so one
    # stalled lookup can neither block the others nor hang the request.
    long_names = await asyncio.gather(
        *(
            _fetch_long_name(
                ib,
                item.contractDetails.contract,
                item.contractDetails.contract.symbol,
                details_timeout,
            )
            for item in top_items
        )
    )

    results = []
    for item, long_name in zip(top_items, long_names):
        contract = item.contractDetails.contract
        symbol = contract.symbol
        # Percent-encode the symbol so tickers containing spaces or reserved
        # characters still produce a well-formed query string.
        chart_symbol = quote(str(symbol), safe="")
        results.append({
            "symbol": symbol,
            "company": long_name or contract.localSymbol or f"{symbol} Inc.",
            "exchange": contract.primaryExchange or contract.exchange,
            "chart_image": f"https://finviz.com/chart.ashx?t={chart_symbol}&ty=c&ta=1&p=d&s=l",
            "rank": getattr(item, "rank", None),
        })
    return results
@router.get("", response_class=HTMLResponse)
async def scanner_page(request: Request):
    """Render the scanner page from the ``scanner.html`` template."""
    # The template receives no extra variables beyond the request itself.
    context = {}
    page = templates.TemplateResponse(request, "scanner.html", context)
    return page
@router.post("")
async def run_scanner(request: Request):
    """Run a scan described by the JSON request body and return the results.

    Returns ``{"status": "ok", "results": [...]}`` on success, or
    ``{"status": "error", "message": ...}`` when the body is not a JSON
    object, the scanner request times out, or any other error occurs.
    """
    try:
        ib = dependencies.get_ib()
        data = await request.json()
        if not isinstance(data, dict):
            # execute_scan reads the payload with dict methods; reject other
            # JSON shapes up front with a message the client can act on.
            return {"status": "error", "message": "Request body must be a JSON object"}
        results = await execute_scan(data, ib)
        return {"status": "ok", "results": results}
    except asyncio.TimeoutError:
        logger.warning("Scanner request timed out")
        return {"status": "error", "message": "Scanner request timed out"}
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Scanner error: %s", e)
        return {"status": "error", "message": str(e)}
@router.post("/export")
async def export_scanner_csv(request: Request):
    """Run a scan and return the results as a downloadable CSV attachment.

    The CSV has the columns Symbol, Company, Exchange, Scan Type and
    Timestamp. On failure a ``{"status": "error", "message": ...}`` dict is
    returned instead of a file.
    """
    try:
        ib = dependencies.get_ib()
        data = await request.json()
        if not isinstance(data, dict):
            # The payload is read with dict methods below; reject other JSON
            # shapes up front with a message the client can act on.
            return {"status": "error", "message": "Request body must be a JSON object"}
        scan_type = data.get("scan_type", "HOT_BY_VOLUME")
        results = await execute_scan(data, ib)

        # Capture the time once so the row timestamp and the filename agree.
        now = datetime.now(timezone.utc)
        ts = now.strftime("%Y-%m-%d %H:%M:%S UTC")
        filename = "scanner_" + now.strftime("%Y%m%d_%H%M%S") + ".csv"

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["Symbol", "Company", "Exchange", "Scan Type", "Timestamp"])
        # NOTE(review): scan_type is echoed from the request body into the
        # file; consider neutralising leading "=", "+", "-", "@" if these
        # exports are opened in spreadsheet software.
        writer.writerows(
            [r["symbol"], r["company"], r["exchange"], scan_type, ts]
            for r in results
        )

        # getvalue() returns the whole buffer regardless of the stream
        # position, so no seek(0) is needed first.
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={"Content-Disposition": f"attachment; filename={filename}"},
        )
    except asyncio.TimeoutError:
        logger.warning("Scanner export request timed out")
        return {"status": "error", "message": "Scanner request timed out"}
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Scanner export error: %s", e)
        return {"status": "error", "message": str(e)}