"""
Integration test suite — pre-Docker validation.

Run all tests (mocked IB):
    pytest tests/test_integration.py -v

Run with live IB Gateway on port 4002:
    pytest tests/test_integration.py -v
    (SKIP_LIVE_TESTS must NOT be set)

Skip live tests:
    SKIP_LIVE_TESTS=1 pytest tests/test_integration.py -v
"""

import os
import sqlite3
import sys
import threading
import types
import unittest.mock as mock
from pathlib import Path

import pytest

# Make the project root importable so `dependencies` and `routers` resolve.
sys.path.insert(0, str(Path(__file__).parent.parent))

SKIP_LIVE = os.getenv("SKIP_LIVE_TESTS", "0") == "1"


# ── Minimal ib_async stubs (used when real ib_async not available) ────────────

def _make_stub():
    """Build a stand-in ``ib_async`` module.

    Returns:
        A ``(module, fake_ib_class)`` tuple. The module exposes ``IB`` (the
        fake class) plus ``Stock``, ``MarketOrder``, ``LimitOrder`` and
        ``ScannerSubscription`` bound to ``MagicMock``.
    """
    mod = types.ModuleType("ib_async")

    class _FakeIB:
        """IB client double that returns empty or mocked results."""

        def positions(self):
            return []

        async def reqAccountSummaryAsync(self):
            return []

        def placeOrder(self, *a, **kw):
            # A trade-like mock that is never done and has no fills.
            return mock.MagicMock(isDone=lambda: False, fills=[])

        def cancelOrder(self, *a, **kw):
            pass

        def bracketOrder(self, *a, **kw):
            # Three mock orders, one per element of the returned bracket.
            return [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()]

    mod.IB = _FakeIB
    mod.Stock = mock.MagicMock
    mod.MarketOrder = mock.MagicMock
    mod.LimitOrder = mock.MagicMock
    mod.ScannerSubscription = mock.MagicMock
    return mod, _FakeIB


# `_FakeIBClass` doubles as the "stub is active" flag: it is None only when
# an ib_async module was already present in sys.modules.
if "ib_async" not in sys.modules:
    _ib_mod, _FakeIBClass = _make_stub()
    sys.modules["ib_async"] = _ib_mod
else:
    _FakeIBClass = None

if "dotenv" not in sys.modules:
    # Accept any arguments so callers such as load_dotenv(path) or
    # load_dotenv(override=True) do not raise TypeError against the stub.
    sys.modules["dotenv"] = types.SimpleNamespace(
        load_dotenv=lambda *args, **kwargs: None
    )

import dependencies  # noqa: E402

# In-memory SQLite database handed to the application under test.
_test_conn = sqlite3.connect(":memory:", check_same_thread=False)
_test_cursor = _test_conn.cursor()
_test_cursor.execute("""
    CREATE TABLE IF NOT EXISTS trades (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp INTEGER NOT NULL,
        symbol TEXT NOT NULL,
        action TEXT NOT NULL,
        quantity INTEGER NOT NULL,
        price REAL NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
""")
_test_conn.commit()
_test_lock = threading.Lock()

_ib_module = sys.modules["ib_async"]
_fake_ib = _ib_module.IB()
dependencies.setup_dependencies(_fake_ib, _test_conn, _test_cursor, _test_lock)

from fastapi import FastAPI  # noqa: E402
from fastapi.testclient import TestClient  # noqa: E402

from routers import charts, health, scanner, trades, portfolio  # noqa: E402

_app = FastAPI()
_app.include_router(health.router)
_app.include_router(charts.router)
_app.include_router(scanner.router)
_app.include_router(trades.router)
_app.include_router(portfolio.router)

# raise_server_exceptions=False: server-side errors surface as HTTP responses
# instead of propagating into the test.
client = TestClient(_app, raise_server_exceptions=False)

WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "your_super_secret_string_123")
BASE = Path(__file__).parent.parent


# ── IBKR Connection Tests ─────────────────────────────────────────────────────

class TestIBKRConnection:
    """Environment sanity checks plus optional live IB Gateway tests."""

    def test_ibkr_host_env_set(self):
        """IBKR_HOST (default 127.0.0.1) must not be blank."""
        host = os.getenv("IBKR_HOST", "127.0.0.1")
        assert host and host.strip(), "IBKR_HOST env var must not be empty"

    def test_ibkr_port_env_set(self):
        """IBKR_PORT (default 4002) must be one of the two accepted ports."""
        port = int(os.getenv("IBKR_PORT", "4002"))
        assert port in (4001, 4002), (
            f"IBKR_PORT must be 4001 (live) or 4002 (paper), got {port}"
        )

    @pytest.mark.skipif(SKIP_LIVE, reason="SKIP_LIVE_TESTS=1")
    def test_ibkr_connection_live(self):
        """Connect to the configured IB Gateway within 10 seconds.

        Skips when only the local ib_async stub is available, because the
        stub has no ``connectAsync`` and the test would fail spuriously.
        """
        import asyncio

        if _FakeIBClass is not None:
            pytest.skip("ib_async stub is active — no real IB client available")

        try:
            from ib_async import IB  # type: ignore
        except ImportError:
            pytest.skip("ib_async not installed")

        ib = IB()
        host = os.getenv("IBKR_HOST", "127.0.0.1")
        port = int(os.getenv("IBKR_PORT", "4002"))
        client_id = int(os.getenv("IBKR_CLIENT_ID", "99"))

        async def _connect():
            await asyncio.wait_for(
                ib.connectAsync(host, port, clientId=client_id),
                timeout=10.0,
            )

        try:
            try:
                asyncio.run(_connect())
            except asyncio.TimeoutError:
                pytest.fail(f"Timed out connecting to IB Gateway at {host}:{port}")
            except Exception as exc:
                pytest.fail(f"Connection failed: {exc}")

            assert ib.isConnected(), "IB Gateway connection not established"
        finally:
            # Release the gateway session even when the assertion above fails.
            if ib.isConnected():
                ib.disconnect()

    @pytest.mark.skipif(SKIP_LIVE, reason="SKIP_LIVE_TESTS=1")
    def test_history_returns_data(self):
        """GET /history for AAPL returns a non-empty JSON list."""
        r = client.get("/history?symbol=AAPL")
        assert r.status_code == 200
        data = r.json()
        assert isinstance(data, list) and len(data) > 0, (
            "Expected non-empty history list for AAPL"
        )
# ── All Endpoint Tests ────────────────────────────────────────────────────────

class TestAllEndpoints:
    """Smoke tests for the HTTP routes mounted on the test app."""

    def test_home_200(self):
        r = client.get("/")
        assert r.status_code == 200

    def test_scanner_page_200(self):
        r = client.get("/scanner")
        assert r.status_code == 200

    def test_tradelog_200(self):
        r = client.get("/tradelog")
        assert r.status_code == 200

    def test_portfolio_200(self):
        r = client.get("/portfolio")
        assert r.status_code == 200

    def test_risk_status_200(self):
        r = client.get("/risk/status")
        assert r.status_code == 200

    def test_subscribe_returns_ok(self):
        """POST /subscribe answers 200 with a 'status' key in the body."""
        r = client.post("/subscribe", json={"symbol": "AAPL"})
        assert r.status_code == 200
        body = r.json()
        assert "status" in body, f"Expected 'status' key in response: {body}"

    def test_webhook_wrong_secret_403(self):
        """A webhook call with a wrong secret is rejected with 403."""
        r = client.post(
            "/webhook",
            json={
                "secret": "totally_wrong_secret_xyz",
                "symbol": "AAPL",
                "strategy": {"order_action": "BUY", "order_contracts": 1},
            },
        )
        assert r.status_code == 403

    def test_webhook_limit_missing_price_400(self):
        """A LIMIT order sent without a price is rejected with 400."""
        r = client.post(
            "/webhook",
            json={
                "secret": WEBHOOK_SECRET,
                "symbol": "AAPL",
                "order_type": "LIMIT",
                "strategy": {"order_action": "BUY", "order_contracts": 1},
            },
        )
        assert r.status_code == 400

    def test_scanner_export_csv(self):
        """POST /scanner/export yields CSV, an error JSON, or a 500."""
        r = client.post("/scanner/export", json={"scan_type": "HOT_BY_VOLUME"})
        assert r.status_code in (200, 500), (
            f"Expected 200 (CSV or error JSON) or 500, got {r.status_code}"
        )
        if r.status_code == 200:
            ct = r.headers.get("content-type", "")
            # A 200 that is not CSV must be the JSON error envelope.
            if "csv" not in ct.lower():
                body = r.json()
                assert body.get("status") == "error", (
                    f"Expected CSV or error JSON, got content-type={ct}: {r.text[:120]}"
                )


# ── Docker Readiness Tests ────────────────────────────────────────────────────

class TestDockerReadiness:
    """Static checks on the deployment files under the project root."""

    @staticmethod
    def _read(name: str) -> str:
        """Return the text of project file *name*, decoded as UTF-8.

        The explicit encoding keeps the checks independent of the platform's
        default locale encoding.
        """
        return (BASE / name).read_text(encoding="utf-8")

    def test_env_example_exists(self):
        assert (BASE / ".env.example").exists(), (
            ".env.example not found — add a template listing the required "
            "environment variables"
        )

    def test_dockerfile_exists(self):
        assert (BASE / "Dockerfile").exists(), "Dockerfile not found"

    def test_docker_compose_exists(self):
        assert (BASE / "docker-compose.yml").exists(), "docker-compose.yml not found"

    def test_verify_script_exists(self):
        assert (BASE / "verify_docker.sh").exists(), (
            "verify_docker.sh not found"
        )

    def test_nginx_conf_removed(self):
        assert not (BASE / "nginx.conf").exists(), (
            "nginx.conf should be removed — Traefik is now the reverse proxy"
        )

    def test_env_example_has_tws_userid(self):
        env_example = self._read(".env.example")
        assert "TWS_USERID" in env_example, ".env.example must define TWS_USERID"

    def test_env_example_has_tws_password(self):
        env_example = self._read(".env.example")
        assert "TWS_PASSWORD" in env_example, ".env.example must define TWS_PASSWORD"

    def test_env_example_has_traefik_vars(self):
        env_example = self._read(".env.example")
        assert "TRAEFIK_NETWORK" in env_example, ".env.example must define TRAEFIK_NETWORK"
        assert "TRAEFIK_HOST" in env_example, ".env.example must define TRAEFIK_HOST"

    def test_docker_compose_uses_gnzsnz_image(self):
        compose = self._read("docker-compose.yml")
        assert "gnzsnz/ib-gateway" in compose, (
            "docker-compose.yml must use ghcr.io/gnzsnz/ib-gateway image"
        )

    def test_docker_compose_has_healthcheck(self):
        compose = self._read("docker-compose.yml")
        assert "healthcheck" in compose, (
            "docker-compose.yml must define healthchecks"
        )

    def test_docker_compose_has_traefik_labels(self):
        compose = self._read("docker-compose.yml")
        assert "traefik.enable=true" in compose, (
            "docker-compose.yml must have Traefik labels"
        )
        assert "flushinterval=-1" in compose, (
            "docker-compose.yml must set SSE flush interval to -1 for Traefik"
        )

    def test_dockerfile_has_curl(self):
        dockerfile = self._read("Dockerfile")
        assert "curl" in dockerfile, (
            "Dockerfile must install curl (required for healthcheck)"
        )

    def test_requirements_has_httpx(self):
        reqs = self._read("requirements.txt")
        assert "httpx" in reqs, (
            "requirements.txt must include httpx (FastAPI TestClient dependency)"
        )
# ── Health Endpoint Tests ─────────────────────────────────────────────────────

class TestHealthEndpoint:
    """Checks on the status code and JSON fields returned by GET /health."""

    def test_health_200(self):
        """The endpoint answers with HTTP 200."""
        response = client.get("/health")
        assert response.status_code == 200

    def test_health_returns_status_field(self):
        """The JSON payload carries a 'status' key."""
        payload = client.get("/health").json()
        assert "status" in payload, (
            f"Expected 'status' key in /health response: {payload}"
        )

    def test_health_returns_ib_connected(self):
        """The JSON payload carries an 'ib_connected' key."""
        payload = client.get("/health").json()
        assert "ib_connected" in payload

    def test_health_returns_db_ok(self):
        """The JSON payload carries a 'db_ok' key."""
        payload = client.get("/health").json()
        assert "db_ok" in payload

    def test_health_returns_version(self):
        """The reported version string is exactly '3.0'."""
        payload = client.get("/health").json()
        reported_version = payload.get("version")
        assert reported_version == "3.0"