initial: alpaclaudia paper-trading bot + dashboard

Python bot (bot/alpaclaudia): alpaca-py client, wheel strategy (CSP + covered
calls) plus equity trailing stops, risk gates (cash buffer, cost-basis guard,
per-symbol concentration cap), SQLite state log, Typer CLI (tick/loop/status/
report/dump-state), Discord daily report, pytest suite.

Next.js 14 dashboard (dashboard/): read-only — reads the bot's SQLite directly
and pulls live account/positions/orders from Alpaca. KPIs, equity chart,
positions, bot-intents audit table, and orders table. Dark UI with Tailwind.

systemd/: user-unit templates for the polling loop and the post-close report
timer.

docs/STRATEGY.md: wheel mechanics, risk invariants, later candidates.

Defaults to BOT_MODE=dry — nothing is submitted to Alpaca until explicitly
enabled in .env. ALPACA_ENV=paper by default; flipping to live requires an
explicit second guard.
This commit is contained in:
2026-04-16 21:38:25 +02:00
commit 39875112a0
46 changed files with 5195 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
"""alpaclaudia — Alpaca paper-trading bot (Wheel + trailing stops)."""
__version__ = "0.1.0"
+121
View File
@@ -0,0 +1,121 @@
"""CLI entry point. Usage:
alpaclaudia tick # one-shot iteration
alpaclaudia loop # poll forever (intended for systemd)
alpaclaudia report # post daily summary to Discord
alpaclaudia status # print account snapshot + last tick
Honours BOT_MODE=dry (default) — nothing is submitted until BOT_MODE=live.
"""
from __future__ import annotations
import json
from datetime import datetime, time as dtime, timezone
import typer
from rich import print as rprint
from rich.table import Table
from .client import (
build_clients,
get_account_snapshot,
list_positions,
list_recent_orders,
)
from .config import load_config
from .logger import get_logger, setup_logging
from .reporter import build_daily_summary, post_to_discord
from .scheduler import loop as run_loop, tick as run_tick
from .state import Store
app = typer.Typer(help="alpaclaudia — Alpaca paper-trading bot.")
log = get_logger(__name__)
def _bootstrap():
cfg = load_config()
setup_logging(cfg.log_dir)
store = Store(cfg.data_dir / "alpaclaudia.db")
clients = build_clients(cfg.alpaca)
return cfg, store, clients
@app.command()
def tick():
"""Run one planning/execution cycle."""
cfg, store, clients = _bootstrap()
result = run_tick(cfg, clients, store)
rprint(result)
@app.command()
def loop():
"""Run indefinitely, polling every TICK_INTERVAL_SECONDS."""
cfg, store, clients = _bootstrap()
run_loop(cfg, clients, store)
@app.command()
def status():
"""Print account + positions snapshot."""
cfg, store, clients = _bootstrap()
acct = get_account_snapshot(clients)
positions = list_positions(clients)
rprint({"config_mode": cfg.mode, "env": cfg.alpaca.env, "account": acct})
t = Table(title="Positions")
for col in ("symbol", "qty", "avg_entry_price", "current_price", "market_value", "unrealized_pl"):
t.add_column(col)
for p in positions:
t.add_row(
str(p["symbol"]),
f"{p['qty']:g}",
f"{p['avg_entry_price']:.2f}",
f"{p['current_price']:.2f}",
f"{p['market_value']:.2f}",
f"{p['unrealized_pl']:+.2f}",
)
rprint(t)
@app.command()
def report():
"""Build today's summary and post to Discord webhook if configured."""
cfg, store, clients = _bootstrap()
acct = get_account_snapshot(clients)
positions = list_positions(clients)
today_start = datetime.now(tz=timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
).isoformat()
intents_today = store.intents_since(today_start)
summary = build_daily_summary(
account=acct,
positions=positions,
intents_today=intents_today,
mode=cfg.mode,
)
rprint(summary)
if cfg.discord_webhook:
ok = post_to_discord(cfg.discord_webhook, summary)
rprint({"discord_posted": ok})
else:
rprint("(DISCORD_WEBHOOK_URL not set — skipping post)")
@app.command()
def dump_state(limit: int = 50):
"""Dump recent ticks + intents as JSON (for dashboards / debugging)."""
cfg, store, _ = _bootstrap()
rprint(
json.dumps(
{
"ticks": store.recent_ticks(limit),
"intents": store.recent_intents(limit),
},
default=str,
indent=2,
)
)
if __name__ == "__main__":
app()
+139
View File
@@ -0,0 +1,139 @@
"""Thin Alpaca client wrappers + helpers for spot, chains, and account state."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Any
from alpaca.data.historical.option import OptionHistoricalDataClient
from alpaca.data.historical.stock import StockHistoricalDataClient
from alpaca.data.requests import OptionChainRequest, StockLatestQuoteRequest
from alpaca.trading.client import TradingClient
from alpaca.trading.enums import AssetClass
from alpaca.trading.requests import GetOptionContractsRequest
from .config import AlpacaConfig
@dataclass
class Clients:
trading: TradingClient
stock_data: StockHistoricalDataClient
option_data: OptionHistoricalDataClient
def build_clients(cfg: AlpacaConfig) -> Clients:
if not cfg.key or not cfg.secret:
raise RuntimeError(
"Alpaca credentials missing. Set ALPACA_API_KEY / ALPACA_API_SECRET."
)
trading = TradingClient(cfg.key, cfg.secret, paper=cfg.is_paper)
stock_data = StockHistoricalDataClient(cfg.key, cfg.secret)
option_data = OptionHistoricalDataClient(cfg.key, cfg.secret)
return Clients(trading=trading, stock_data=stock_data, option_data=option_data)
def get_latest_spot(clients: Clients, symbol: str) -> float | None:
req = StockLatestQuoteRequest(symbol_or_symbols=symbol)
resp = clients.stock_data.get_stock_latest_quote(req)
q = resp.get(symbol) if isinstance(resp, dict) else getattr(resp, symbol, None)
if q is None:
return None
# mid-price if both sides present, else fallback to either
bid = getattr(q, "bid_price", None) or 0
ask = getattr(q, "ask_price", None) or 0
if bid and ask:
return (bid + ask) / 2
return bid or ask or None
def list_option_contracts(
clients: Clients,
underlying: str,
*,
option_type: str, # "put" or "call"
dte_min: int,
dte_max: int,
) -> list[Any]:
today = date.today()
req = GetOptionContractsRequest(
underlying_symbols=[underlying],
type=option_type,
expiration_date_gte=today + timedelta(days=dte_min),
expiration_date_lte=today + timedelta(days=dte_max),
status="active",
limit=500,
)
resp = clients.trading.get_option_contracts(req)
return list(getattr(resp, "option_contracts", resp) or [])
def get_option_snapshots(clients: Clients, underlying: str) -> dict[str, Any]:
"""Returns {contract_symbol: snapshot} with greeks/quote when available."""
try:
req = OptionChainRequest(underlying_symbol=underlying)
chain = clients.option_data.get_option_chain(req)
return dict(chain) if chain else {}
except Exception:
return {}
def get_account_snapshot(clients: Clients) -> dict[str, Any]:
acct = clients.trading.get_account()
return {
"equity": float(getattr(acct, "equity", 0) or 0),
"cash": float(getattr(acct, "cash", 0) or 0),
"buying_power": float(getattr(acct, "buying_power", 0) or 0),
"portfolio_value": float(getattr(acct, "portfolio_value", 0) or 0),
"status": str(getattr(acct, "status", "")),
"pattern_day_trader": bool(getattr(acct, "pattern_day_trader", False)),
}
def list_positions(clients: Clients) -> list[dict[str, Any]]:
pos = clients.trading.get_all_positions() or []
out: list[dict[str, Any]] = []
for p in pos:
out.append(
{
"symbol": getattr(p, "symbol", ""),
"asset_class": str(getattr(p, "asset_class", "")),
"qty": float(getattr(p, "qty", 0) or 0),
"avg_entry_price": float(getattr(p, "avg_entry_price", 0) or 0),
"market_value": float(getattr(p, "market_value", 0) or 0),
"unrealized_pl": float(getattr(p, "unrealized_pl", 0) or 0),
"unrealized_plpc": float(getattr(p, "unrealized_plpc", 0) or 0),
"current_price": float(getattr(p, "current_price", 0) or 0),
"side": str(getattr(p, "side", "")),
"is_option": str(getattr(p, "asset_class", "")).lower().endswith("option"),
}
)
return out
def list_recent_orders(clients: Clients, *, limit: int = 100) -> list[dict[str, Any]]:
from alpaca.trading.requests import GetOrdersRequest
req = GetOrdersRequest(status="all", limit=limit, nested=True)
orders = clients.trading.get_orders(filter=req) or []
out: list[dict[str, Any]] = []
for o in orders:
out.append(
{
"id": str(getattr(o, "id", "")),
"symbol": getattr(o, "symbol", ""),
"side": str(getattr(o, "side", "")),
"qty": float(getattr(o, "qty", 0) or 0),
"filled_qty": float(getattr(o, "filled_qty", 0) or 0),
"order_type": str(getattr(o, "order_type", "")),
"status": str(getattr(o, "status", "")),
"submitted_at": str(getattr(o, "submitted_at", "")),
"filled_avg_price": (
float(getattr(o, "filled_avg_price", 0) or 0)
if getattr(o, "filled_avg_price", None) is not None
else None
),
"asset_class": str(getattr(o, "asset_class", AssetClass.US_EQUITY)),
}
)
return out
+134
View File
@@ -0,0 +1,134 @@
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from dotenv import load_dotenv
def _load_env() -> None:
root = Path(__file__).resolve().parents[2]
for candidate in (root / ".env", root.parent / ".env"):
if candidate.exists():
load_dotenv(candidate, override=False)
return
def _bool(val: str | None, default: bool = False) -> bool:
if val is None:
return default
return val.strip().lower() in {"1", "true", "yes", "y", "on"}
def _float(val: str | None, default: float) -> float:
try:
return float(val) if val is not None else default
except ValueError:
return default
def _int(val: str | None, default: int) -> int:
try:
return int(val) if val is not None else default
except ValueError:
return default
@dataclass(frozen=True)
class AlpacaConfig:
key: str
secret: str
base_url: str
env: str # "paper" or "live"
@property
def is_paper(self) -> bool:
return self.env.lower() != "live"
@dataclass(frozen=True)
class WheelConfig:
universe: tuple[str, ...]
put_dte_min: int
put_dte_max: int
put_target_delta: float
put_otm_pct: float
call_otm_pct: float
min_annual_yield: float
max_short_puts_per_symbol: int
@dataclass(frozen=True)
class RiskConfig:
max_position_pct: float
min_cash_buffer_pct: float
trailing_stop_enabled: bool
trailing_stop_pct: float
@dataclass(frozen=True)
class Config:
mode: str # "dry" or "live"
alpaca: AlpacaConfig
wheel: WheelConfig
risk: RiskConfig
tick_interval: int
data_dir: Path
log_dir: Path
discord_webhook: str | None = None
@property
def dry_run(self) -> bool:
return self.mode.lower() != "live"
def load_config() -> Config:
_load_env()
env = os.environ
alpaca = AlpacaConfig(
key=env.get("ALPACA_API_KEY", ""),
secret=env.get("ALPACA_API_SECRET", ""),
base_url=env.get("ALPACA_BASE_URL", "https://paper-api.alpaca.markets"),
env=env.get("ALPACA_ENV", "paper"),
)
universe = tuple(
s.strip().upper() for s in env.get("WHEEL_UNIVERSE", "TSLA").split(",") if s.strip()
)
wheel = WheelConfig(
universe=universe,
put_dte_min=_int(env.get("WHEEL_PUT_DTE_MIN"), 14),
put_dte_max=_int(env.get("WHEEL_PUT_DTE_MAX"), 28),
put_target_delta=_float(env.get("WHEEL_PUT_TARGET_DELTA"), 0.30),
put_otm_pct=_float(env.get("WHEEL_PUT_OTM_PCT"), 0.10),
call_otm_pct=_float(env.get("WHEEL_CALL_OTM_PCT"), 0.10),
min_annual_yield=_float(env.get("WHEEL_MIN_ANNUAL_YIELD"), 0.15),
max_short_puts_per_symbol=_int(env.get("WHEEL_MAX_SHORT_PUTS_PER_SYMBOL"), 1),
)
risk = RiskConfig(
max_position_pct=_float(env.get("MAX_POSITION_PCT"), 0.25),
min_cash_buffer_pct=_float(env.get("MIN_CASH_BUFFER_PCT"), 0.05),
trailing_stop_enabled=_bool(env.get("TRAILING_STOP_ENABLED"), True),
trailing_stop_pct=_float(env.get("TRAILING_STOP_PCT"), 0.08),
)
root = Path(__file__).resolve().parents[2]
data_dir = Path(env.get("DATA_DIR") or (root / "data")).expanduser().resolve()
log_dir = Path(env.get("LOG_DIR") or (root / "logs")).expanduser().resolve()
data_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)
return Config(
mode=env.get("BOT_MODE", "dry"),
alpaca=alpaca,
wheel=wheel,
risk=risk,
tick_interval=_int(env.get("TICK_INTERVAL_SECONDS"), 900),
data_dir=data_dir,
log_dir=log_dir,
discord_webhook=env.get("DISCORD_WEBHOOK_URL") or None,
)
+139
View File
@@ -0,0 +1,139 @@
"""Translates OrderIntent objects into Alpaca order requests.
All submissions go through submit_intent(). In dry-run mode it only records
the intent to SQLite and logs it; no network call to Alpaca is made.
"""
from __future__ import annotations
from typing import Any
from alpaca.trading.enums import OrderSide, OrderType, PositionIntent, TimeInForce
from alpaca.trading.requests import (
LimitOrderRequest,
MarketOrderRequest,
TrailingStopOrderRequest,
)
from .client import Clients
from .config import Config
from .logger import get_logger
from .state import Store
from .strategies.base import OrderIntent
log = get_logger(__name__)
_SIDE_MAP = {
"buy": OrderSide.BUY,
"sell": OrderSide.SELL,
"sell_to_open": OrderSide.SELL,
"buy_to_close": OrderSide.BUY,
}
_POS_INTENT_MAP = {
"sell_to_open": PositionIntent.SELL_TO_OPEN,
"buy_to_close": PositionIntent.BUY_TO_CLOSE,
"buy": PositionIntent.BUY_TO_OPEN,
"sell": PositionIntent.SELL_TO_CLOSE,
}
def _build_request(intent: OrderIntent) -> Any:
side = _SIDE_MAP[intent.side]
pos_intent = _POS_INTENT_MAP.get(intent.side)
common: dict[str, Any] = {
"symbol": intent.symbol,
"qty": intent.qty,
"side": side,
"time_in_force": TimeInForce.DAY,
}
if pos_intent and intent.side in {"sell_to_open", "buy_to_close"}:
common["position_intent"] = pos_intent
if intent.order_type == "trailing_stop":
return TrailingStopOrderRequest(
trail_percent=intent.trail_percent,
**common,
)
if intent.order_type == "limit":
return LimitOrderRequest(limit_price=intent.limit_price, **common)
return MarketOrderRequest(**common)
def submit_intent(
intent: OrderIntent,
*,
cfg: Config,
clients: Clients,
store: Store,
) -> dict[str, Any]:
"""Record the intent; submit iff BOT_MODE=live. Returns result dict."""
if cfg.dry_run:
intent_id = store.record_intent(
strategy=intent.strategy,
symbol=intent.symbol,
side=intent.side,
qty=intent.qty,
order_type=intent.order_type,
limit_price=intent.limit_price,
stop_price=intent.stop_price,
trail_percent=intent.trail_percent,
details={**intent.details, "rationale": intent.rationale},
submitted=False,
status="dry_run",
)
log.info(
"DRY %s %s %s qty=%s %s%s",
intent.strategy,
intent.side,
intent.symbol,
intent.qty,
intent.order_type,
intent.rationale,
)
return {"id": intent_id, "submitted": False, "status": "dry_run"}
req = _build_request(intent)
try:
order = clients.trading.submit_order(req)
alpaca_id = str(getattr(order, "id", ""))
status = str(getattr(order, "status", ""))
intent_id = store.record_intent(
strategy=intent.strategy,
symbol=intent.symbol,
side=intent.side,
qty=intent.qty,
order_type=intent.order_type,
limit_price=intent.limit_price,
stop_price=intent.stop_price,
trail_percent=intent.trail_percent,
details={**intent.details, "rationale": intent.rationale},
submitted=True,
alpaca_order_id=alpaca_id,
status=status,
)
log.info(
"SUBMIT %s %s %s qty=%s id=%s status=%s",
intent.strategy,
intent.side,
intent.symbol,
intent.qty,
alpaca_id,
status,
)
return {"id": intent_id, "submitted": True, "alpaca_order_id": alpaca_id, "status": status}
except Exception as exc: # noqa: BLE001 — we log everything, nothing is fatal at tick level
store.record_intent(
strategy=intent.strategy,
symbol=intent.symbol,
side=intent.side,
qty=intent.qty,
order_type=intent.order_type,
limit_price=intent.limit_price,
stop_price=intent.stop_price,
trail_percent=intent.trail_percent,
details={**intent.details, "rationale": intent.rationale, "error": str(exc)},
submitted=False,
status="error",
)
log.exception("submit failed for %s %s: %s", intent.symbol, intent.side, exc)
return {"submitted": False, "status": "error", "error": str(exc)}
+50
View File
@@ -0,0 +1,50 @@
from __future__ import annotations
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
_CONFIGURED = False
class _JsonlFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
payload = {
"ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
if record.exc_info:
payload["exc"] = self.formatException(record.exc_info)
extra = getattr(record, "extra_fields", None)
if isinstance(extra, dict):
payload.update(extra)
return json.dumps(payload, default=str)
def setup_logging(log_dir: Path, level: int = logging.INFO) -> None:
global _CONFIGURED
if _CONFIGURED:
return
log_dir.mkdir(parents=True, exist_ok=True)
root = logging.getLogger()
root.setLevel(level)
root.handlers.clear()
stderr = logging.StreamHandler(sys.stderr)
stderr.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s: %(message)s"))
root.addHandler(stderr)
file_handler = logging.FileHandler(log_dir / "bot.jsonl", encoding="utf-8")
file_handler.setFormatter(_JsonlFormatter())
root.addHandler(file_handler)
_CONFIGURED = True
def get_logger(name: str) -> logging.Logger:
return logging.getLogger(name)
+94
View File
@@ -0,0 +1,94 @@
"""Discord webhook reporting."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
import httpx
from .logger import get_logger
log = get_logger(__name__)
def _fmt_money(v: Any) -> str:
try:
return f"${float(v):,.2f}"
except (TypeError, ValueError):
return str(v)
def build_daily_summary(
*,
account: dict[str, Any],
positions: list[dict[str, Any]],
intents_today: list[dict[str, Any]],
mode: str,
) -> str:
today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
lines: list[str] = [f"**alpaclaudia — Daily Report {today}** (`mode={mode}`)", ""]
lines.append(
f"**Portfolio:** equity {_fmt_money(account.get('equity'))} · "
f"cash {_fmt_money(account.get('cash'))} · "
f"buying power {_fmt_money(account.get('buying_power'))}"
)
lines.append("")
if positions:
lines.append(f"**Positions ({len(positions)}):**")
for p in positions[:15]:
pl = p.get("unrealized_pl") or 0
plpc = (p.get("unrealized_plpc") or 0) * 100
lines.append(
f"{p['symbol']} qty={p['qty']:g} "
f"mv={_fmt_money(p.get('market_value'))} "
f"P&L={_fmt_money(pl)} ({plpc:+.2f}%)"
)
if len(positions) > 15:
lines.append(f"{len(positions)-15} more")
else:
lines.append("**Positions:** none")
lines.append("")
submitted = [i for i in intents_today if i.get("submitted")]
dry = [i for i in intents_today if not i.get("submitted")]
lines.append(
f"**Today's orders:** {len(submitted)} submitted · {len(dry)} dry/blocked"
)
for i in intents_today[:20]:
tag = "" if i.get("submitted") else "·"
details = i.get("details_json")
lines.append(
f" {tag} [{i['strategy']}] {i['side']} {i['symbol']} qty={i['qty']} "
f"type={i['order_type']} status={i.get('status')}"
)
if len(intents_today) > 20:
lines.append(f"{len(intents_today)-20} more")
return "\n".join(lines)
def post_to_discord(webhook_url: str, content: str) -> bool:
# Discord message limit is 2000 chars; split conservatively.
chunks = []
buf: list[str] = []
size = 0
for line in content.splitlines(keepends=True):
if size + len(line) > 1900 and buf:
chunks.append("".join(buf))
buf, size = [], 0
buf.append(line)
size += len(line)
if buf:
chunks.append("".join(buf))
try:
with httpx.Client(timeout=15.0) as c:
for chunk in chunks:
r = c.post(webhook_url, json={"content": chunk})
r.raise_for_status()
return True
except Exception as exc: # noqa: BLE001
log.exception("discord webhook failed: %s", exc)
return False
+109
View File
@@ -0,0 +1,109 @@
"""Risk gates — every order passes through check_intent() before submission.
The core invariants this enforces:
- Cash-secured puts must actually be *cash-secured*: strike*100*qty ≤ free cash
(minus the required buffer).
- Covered calls cannot be sold below cost basis (strike ≥ avg_entry_price).
- Per-symbol concentration capped at MAX_POSITION_PCT of equity.
- The bot will never *short* stock, only sell what it already owns.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from .config import RiskConfig
@dataclass
class CheckResult:
ok: bool
reason: str = "ok"
@dataclass
class AccountSnapshot:
equity: float
cash: float
buying_power: float
def _position_for(symbol: str, positions: list[dict[str, Any]]) -> dict[str, Any] | None:
for p in positions:
if p.get("symbol") == symbol:
return p
return None
def check_csp(
*,
underlying: str,
strike: float,
qty: int,
account: AccountSnapshot,
existing_short_puts: int,
max_short_puts_per_symbol: int,
risk_cfg: RiskConfig,
) -> CheckResult:
if qty <= 0:
return CheckResult(False, "qty must be positive")
if strike <= 0:
return CheckResult(False, "invalid strike")
if existing_short_puts + qty > max_short_puts_per_symbol:
return CheckResult(
False,
f"would exceed max_short_puts_per_symbol={max_short_puts_per_symbol}",
)
collateral = strike * 100 * qty
buffer_floor = account.equity * risk_cfg.min_cash_buffer_pct
if collateral > (account.cash - buffer_floor):
return CheckResult(
False,
f"insufficient cash: need {collateral:.2f}, have {account.cash:.2f} "
f"(buffer floor {buffer_floor:.2f})",
)
position_notional = collateral
if position_notional > account.equity * risk_cfg.max_position_pct:
return CheckResult(
False,
f"collateral {collateral:.0f} exceeds per-symbol cap "
f"({risk_cfg.max_position_pct*100:.0f}% of equity)",
)
return CheckResult(True)
def check_covered_call(
*,
underlying: str,
strike: float,
qty: int,
positions: list[dict[str, Any]],
) -> CheckResult:
if qty <= 0:
return CheckResult(False, "qty must be positive")
pos = _position_for(underlying, positions)
if not pos or pos.get("qty", 0) < qty * 100:
return CheckResult(
False,
f"no long stock to cover ({qty*100} shares required)",
)
cost_basis = pos.get("avg_entry_price") or 0
if strike < cost_basis:
return CheckResult(
False,
f"strike {strike:.2f} below cost basis {cost_basis:.2f} — would lock in loss",
)
return CheckResult(True)
def check_trailing_stop(
*,
symbol: str,
qty: float,
positions: list[dict[str, Any]],
) -> CheckResult:
pos = _position_for(symbol, positions)
if not pos or pos.get("qty", 0) < qty:
return CheckResult(False, "no long stock to trail")
return CheckResult(True)
+193
View File
@@ -0,0 +1,193 @@
"""Single tick + loop driver. Pure orchestration — no strategy logic here."""
from __future__ import annotations
import time
from datetime import datetime, time as dtime, timezone
from typing import Any
from .client import (
Clients,
build_clients,
get_account_snapshot,
list_positions,
list_recent_orders,
)
from .config import Config
from .logger import get_logger
from .executor import submit_intent
from .risk import (
AccountSnapshot,
check_covered_call,
check_csp,
check_trailing_stop,
)
from .state import Store
from .strategies.trailing_stop import plan_trailing_stops
from .strategies.wheel import plan_wheel
log = get_logger(__name__)
def _is_market_open(clients: Clients) -> bool:
try:
clock = clients.trading.get_clock()
return bool(getattr(clock, "is_open", False))
except Exception as exc: # noqa: BLE001
log.warning("clock check failed: %s", exc)
return False
def _enforce_risk(
intent,
*,
account_snap: AccountSnapshot,
positions: list[dict[str, Any]],
cfg: Config,
) -> tuple[bool, str]:
if intent.strategy == "wheel:cash_secured_put":
existing = sum(
1 for p in positions if p["is_option"] and p["qty"] < 0 and "P" in p["symbol"]
)
r = check_csp(
underlying=intent.details.get("underlying", ""),
strike=float(intent.details.get("strike") or 0),
qty=int(intent.qty),
account=account_snap,
existing_short_puts=existing,
max_short_puts_per_symbol=cfg.wheel.max_short_puts_per_symbol,
risk_cfg=cfg.risk,
)
return r.ok, r.reason
if intent.strategy == "wheel:covered_call":
r = check_covered_call(
underlying=intent.details.get("underlying", ""),
strike=float(intent.details.get("strike") or 0),
qty=int(intent.qty),
positions=positions,
)
return r.ok, r.reason
if intent.strategy == "trailing_stop":
r = check_trailing_stop(
symbol=intent.symbol, qty=float(intent.qty), positions=positions
)
return r.ok, r.reason
return True, "ok"
def tick(cfg: Config, clients: Clients, store: Store) -> dict[str, Any]:
t0 = time.time()
account = get_account_snapshot(clients)
positions = list_positions(clients)
orders = list_recent_orders(clients, limit=100)
account_snap = AccountSnapshot(
equity=account["equity"],
cash=account["cash"],
buying_power=account["buying_power"],
)
store.record_tick(
equity=account["equity"],
cash=account["cash"],
buying_power=account["buying_power"],
mode=cfg.mode,
)
intents = []
intents.extend(
plan_wheel(
cfg=cfg,
clients=clients,
account=account,
positions=positions,
orders=orders,
)
)
intents.extend(
plan_trailing_stops(
cfg=cfg,
clients=clients,
account=account,
positions=positions,
orders=orders,
)
)
submitted = 0
blocked = 0
for intent in intents:
ok, reason = _enforce_risk(
intent, account_snap=account_snap, positions=positions, cfg=cfg
)
if not ok:
blocked += 1
store.record_intent(
strategy=intent.strategy,
symbol=intent.symbol,
side=intent.side,
qty=intent.qty,
order_type=intent.order_type,
limit_price=intent.limit_price,
stop_price=intent.stop_price,
trail_percent=intent.trail_percent,
details={
**intent.details,
"rationale": intent.rationale,
"blocked_reason": reason,
},
submitted=False,
status="blocked",
)
log.warning("BLOCKED %s %s: %s", intent.strategy, intent.symbol, reason)
continue
submit_intent(intent, cfg=cfg, clients=clients, store=store)
submitted += 1
dt = time.time() - t0
store.record_event(
"tick",
{
"duration_s": round(dt, 3),
"intents": len(intents),
"submitted": submitted,
"blocked": blocked,
"positions": len(positions),
},
)
log.info(
"tick: %.2fs intents=%d submitted=%d blocked=%d mode=%s",
dt,
len(intents),
submitted,
blocked,
cfg.mode,
)
return {
"intents": len(intents),
"submitted": submitted,
"blocked": blocked,
"duration_s": dt,
}
def loop(cfg: Config, clients: Clients, store: Store) -> None:
"""Poll-loop. Skips ticks when market is closed (paper or not).
Market hours only — outside hours we still emit a heartbeat to the state DB
so the dashboard can see we're alive, but we don't plan orders.
"""
interval = max(60, cfg.tick_interval)
log.info("loop start: interval=%ss mode=%s", interval, cfg.mode)
while True:
try:
if _is_market_open(clients):
tick(cfg, clients, store)
else:
store.record_event("heartbeat", {"market_open": False})
log.info("market closed; sleeping")
except Exception as exc: # noqa: BLE001 — never die inside the loop
log.exception("tick error: %s", exc)
store.record_event("error", {"message": str(exc)})
time.sleep(interval)
__all__ = ["tick", "loop", "build_clients"]
+152
View File
@@ -0,0 +1,152 @@
from __future__ import annotations
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterator
SCHEMA = """
CREATE TABLE IF NOT EXISTS ticks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
equity REAL,
cash REAL,
buying_power REAL,
mode TEXT NOT NULL,
note TEXT
);
CREATE TABLE IF NOT EXISTS order_intents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
strategy TEXT NOT NULL,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
qty REAL NOT NULL,
order_type TEXT NOT NULL,
limit_price REAL,
stop_price REAL,
trail_percent REAL,
details_json TEXT,
submitted INTEGER NOT NULL DEFAULT 0,
alpaca_order_id TEXT,
status TEXT
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
kind TEXT NOT NULL,
payload_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_order_intents_ts ON order_intents (ts);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts);
"""
class Store:
def __init__(self, path: Path) -> None:
self.path = path
path.parent.mkdir(parents=True, exist_ok=True)
with self._conn() as c:
c.executescript(SCHEMA)
@contextmanager
def _conn(self) -> Iterator[sqlite3.Connection]:
conn = sqlite3.connect(self.path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
@staticmethod
def _now() -> str:
return datetime.now(tz=timezone.utc).isoformat()
def record_tick(
self,
*,
equity: float | None,
cash: float | None,
buying_power: float | None,
mode: str,
note: str | None = None,
) -> None:
with self._conn() as c:
c.execute(
"INSERT INTO ticks (ts, equity, cash, buying_power, mode, note) VALUES (?,?,?,?,?,?)",
(self._now(), equity, cash, buying_power, mode, note),
)
def record_intent(
self,
*,
strategy: str,
symbol: str,
side: str,
qty: float,
order_type: str,
limit_price: float | None = None,
stop_price: float | None = None,
trail_percent: float | None = None,
details: dict[str, Any] | None = None,
submitted: bool = False,
alpaca_order_id: str | None = None,
status: str | None = None,
) -> int:
with self._conn() as c:
cur = c.execute(
"""INSERT INTO order_intents
(ts, strategy, symbol, side, qty, order_type, limit_price, stop_price,
trail_percent, details_json, submitted, alpaca_order_id, status)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
self._now(),
strategy,
symbol,
side,
qty,
order_type,
limit_price,
stop_price,
trail_percent,
json.dumps(details or {}, default=str),
1 if submitted else 0,
alpaca_order_id,
status,
),
)
return int(cur.lastrowid)
def record_event(self, kind: str, payload: dict[str, Any]) -> None:
with self._conn() as c:
c.execute(
"INSERT INTO events (ts, kind, payload_json) VALUES (?,?,?)",
(self._now(), kind, json.dumps(payload, default=str)),
)
def recent_ticks(self, limit: int = 200) -> list[dict[str, Any]]:
with self._conn() as c:
rows = c.execute(
"SELECT * FROM ticks ORDER BY id DESC LIMIT ?", (limit,)
).fetchall()
return [dict(r) for r in rows]
def recent_intents(self, limit: int = 200) -> list[dict[str, Any]]:
with self._conn() as c:
rows = c.execute(
"SELECT * FROM order_intents ORDER BY id DESC LIMIT ?", (limit,)
).fetchall()
return [dict(r) for r in rows]
def intents_since(self, since_iso: str) -> list[dict[str, Any]]:
with self._conn() as c:
rows = c.execute(
"SELECT * FROM order_intents WHERE ts >= ? ORDER BY id ASC", (since_iso,)
).fetchall()
return [dict(r) for r in rows]
+4
View File
@@ -0,0 +1,4 @@
"""Strategy modules. Each emits zero-or-more OrderIntent objects per tick."""
from .base import OrderIntent, Strategy
__all__ = ["OrderIntent", "Strategy"]
+36
View File
@@ -0,0 +1,36 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Protocol
from ..client import Clients
from ..config import Config
@dataclass
class OrderIntent:
strategy: str
symbol: str # OCC symbol for options, ticker for equity
side: str # "buy" | "sell" | "sell_to_open" | "buy_to_close"
qty: float
order_type: str # "market" | "limit" | "trailing_stop"
limit_price: float | None = None
stop_price: float | None = None
trail_percent: float | None = None
details: dict[str, Any] = field(default_factory=dict)
rationale: str = ""
class Strategy(Protocol):
name: str
def plan(
self,
*,
cfg: Config,
clients: Clients,
account: dict[str, Any],
positions: list[dict[str, Any]],
orders: list[dict[str, Any]],
) -> list[OrderIntent]:
...
@@ -0,0 +1,70 @@
"""Trailing stops on long equity positions.
For each stock position we intend to submit a trailing_stop sell order with
TRAILING_STOP_PCT trail. On each tick we check open orders and only submit
if none exists for that symbol (Alpaca rejects duplicate open stops).
"""
from __future__ import annotations
from typing import Any
from ..config import Config
from ..logger import get_logger
from .base import OrderIntent
log = get_logger(__name__)
def _has_open_trailing_stop(symbol: str, orders: list[dict[str, Any]]) -> bool:
for o in orders:
if (
o.get("symbol") == symbol
and "trailing" in (o.get("order_type") or "").lower()
and (o.get("status") or "").lower() in {"new", "accepted", "open", "pending_new", "held"}
):
return True
return False
def plan_trailing_stops(
*,
cfg: Config,
clients: Any,
account: dict[str, Any],
positions: list[dict[str, Any]],
orders: list[dict[str, Any]],
) -> list[OrderIntent]:
if not cfg.risk.trailing_stop_enabled:
return []
intents: list[OrderIntent] = []
trail_pct = cfg.risk.trailing_stop_pct * 100 # Alpaca expects percent as "3" for 3%
for p in positions:
if p.get("is_option"):
continue
if p.get("qty", 0) <= 0:
continue
sym = p["symbol"]
if _has_open_trailing_stop(sym, orders):
continue
intents.append(
OrderIntent(
strategy="trailing_stop",
symbol=sym,
side="sell",
qty=float(p["qty"]),
order_type="trailing_stop",
trail_percent=trail_pct,
details={
"avg_entry_price": p.get("avg_entry_price"),
"current_price": p.get("current_price"),
"market_value": p.get("market_value"),
"unrealized_pl": p.get("unrealized_pl"),
},
rationale=(
f"trailing stop {trail_pct:.1f}% on {p['qty']} {sym} "
f"(entry {p.get('avg_entry_price')}, now {p.get('current_price')})"
),
)
)
return intents
+254
View File
@@ -0,0 +1,254 @@
"""Wheel strategy.
Phase 1 (no long stock): sell cash-secured puts ~target-delta, 2-4 weeks out,
meeting a minimum annualised yield.
Phase 2 (assigned, holding 100+ shares): sell covered calls above cost basis.
Contract selection is intentionally conservative:
- Only contracts within the DTE window.
- Prefer the contract whose absolute delta is closest to the target.
If greeks are missing, fall back to closest strike within ±OTM window.
- Annualised yield = (premium / collateral) * (365 / DTE).
Reject if below min_annual_yield.
"""
from __future__ import annotations
from datetime import date
from typing import Any
from ..client import (
Clients,
get_latest_spot,
get_option_snapshots,
list_option_contracts,
)
from ..config import Config
from ..logger import get_logger
from .base import OrderIntent
log = get_logger(__name__)
def _dte(exp: Any) -> int:
if isinstance(exp, str):
exp = date.fromisoformat(exp[:10])
return (exp - date.today()).days
def _mid_from_snapshot(snap: Any) -> float | None:
if snap is None:
return None
q = getattr(snap, "latest_quote", None) or getattr(snap, "quote", None)
if q is None:
return None
bid = getattr(q, "bid_price", None) or 0
ask = getattr(q, "ask_price", None) or 0
if bid and ask:
return (bid + ask) / 2
return bid or ask or None
def _delta_from_snapshot(snap: Any) -> float | None:
greeks = getattr(snap, "greeks", None)
if greeks is None:
return None
d = getattr(greeks, "delta", None)
return float(d) if d is not None else None
def _annualised_yield(premium: float, strike: float, dte: int) -> float:
if dte <= 0 or strike <= 0:
return 0.0
collateral = strike * 100
return (premium * 100 / collateral) * (365 / dte)
def _pick_contract(
contracts: list[Any],
snapshots: dict[str, Any],
*,
target_delta: float,
spot: float,
otm_pct: float,
option_type: str,
) -> tuple[Any, dict[str, Any]] | None:
best: tuple[Any, dict[str, Any]] | None = None
best_score = float("inf")
for c in contracts:
sym = getattr(c, "symbol", None)
if not sym:
continue
strike = float(getattr(c, "strike_price", 0) or 0)
if strike <= 0:
continue
snap = snapshots.get(sym)
delta = _delta_from_snapshot(snap)
mid = _mid_from_snapshot(snap)
dte = _dte(getattr(c, "expiration_date", None))
if dte <= 0:
continue
if delta is not None:
score = abs(abs(delta) - target_delta)
else:
# fallback: distance from target OTM strike
target_strike = (
spot * (1 - otm_pct) if option_type == "put" else spot * (1 + otm_pct)
)
score = abs(strike - target_strike) / max(spot, 1)
info = {
"symbol": sym,
"strike": strike,
"expiration": str(getattr(c, "expiration_date", "")),
"dte": dte,
"delta": delta,
"premium_mid": mid,
"open_interest": getattr(c, "open_interest", None),
}
if score < best_score:
best_score = score
best = (c, info)
return best
def plan_wheel(
*,
cfg: Config,
clients: Clients,
account: dict[str, Any],
positions: list[dict[str, Any]],
orders: list[dict[str, Any]],
) -> list[OrderIntent]:
intents: list[OrderIntent] = []
equity = float(account.get("equity") or 0)
cash = float(account.get("cash") or 0)
for underlying in cfg.wheel.universe:
stock_pos = next(
(p for p in positions if p["symbol"] == underlying and not p["is_option"]),
None,
)
short_put_count = sum(
1
for p in positions
if p["is_option"]
and p["symbol"].startswith(underlying)
and "P" in p["symbol"]
and p["qty"] < 0
)
spot = get_latest_spot(clients, underlying)
if spot is None:
log.warning("no spot for %s; skipping", underlying)
continue
snapshots = get_option_snapshots(clients, underlying)
shares = int(stock_pos["qty"]) if stock_pos else 0
lots_coverable = shares // 100
if lots_coverable >= 1:
# Phase 2: covered calls on every uncovered lot
short_call_count = sum(
1
for p in positions
if p["is_option"]
and p["symbol"].startswith(underlying)
and "C" in p["symbol"]
and p["qty"] < 0
)
uncovered = lots_coverable - short_call_count
if uncovered > 0:
contracts = list_option_contracts(
clients,
underlying,
option_type="call",
dte_min=cfg.wheel.put_dte_min,
dte_max=cfg.wheel.put_dte_max,
)
# target delta for calls: mirror (positive)
picked = _pick_contract(
contracts,
snapshots,
target_delta=cfg.wheel.put_target_delta,
spot=spot,
otm_pct=cfg.wheel.call_otm_pct,
option_type="call",
)
if picked is not None:
contract, info = picked
cost_basis = float(stock_pos["avg_entry_price"]) if stock_pos else 0
if info["strike"] < cost_basis:
log.info(
"%s covered-call candidate strike %.2f below cost basis %.2f — skip",
underlying,
info["strike"],
cost_basis,
)
else:
intents.append(
OrderIntent(
strategy="wheel:covered_call",
symbol=info["symbol"],
side="sell_to_open",
qty=min(uncovered, 1),
order_type="limit" if info["premium_mid"] else "market",
limit_price=info["premium_mid"],
details={**info, "underlying": underlying, "spot": spot},
rationale=(
f"cover {uncovered} lot(s) of {underlying} @ "
f"{info['strike']:.2f} ({info['dte']}dte)"
),
)
)
# Phase 1: sell CSPs if room remains
if short_put_count < cfg.wheel.max_short_puts_per_symbol:
contracts = list_option_contracts(
clients,
underlying,
option_type="put",
dte_min=cfg.wheel.put_dte_min,
dte_max=cfg.wheel.put_dte_max,
)
picked = _pick_contract(
contracts,
snapshots,
target_delta=cfg.wheel.put_target_delta,
spot=spot,
otm_pct=cfg.wheel.put_otm_pct,
option_type="put",
)
if picked is None:
continue
contract, info = picked
premium = info["premium_mid"] or 0
yield_ann = _annualised_yield(premium, info["strike"], max(info["dte"], 1))
info["annualised_yield"] = yield_ann
if yield_ann < cfg.wheel.min_annual_yield:
log.info(
"%s CSP candidate yield %.2f%% < %.2f%% — skip",
underlying,
yield_ann * 100,
cfg.wheel.min_annual_yield * 100,
)
continue
intents.append(
OrderIntent(
strategy="wheel:cash_secured_put",
symbol=info["symbol"],
side="sell_to_open",
qty=1,
order_type="limit" if premium else "market",
limit_price=premium or None,
details={**info, "underlying": underlying, "spot": spot, "cash": cash, "equity": equity},
rationale=(
f"CSP {underlying} @ {info['strike']:.2f} "
f"({info['dte']}dte, Δ={info['delta']}, ann.yield={yield_ann*100:.1f}%)"
),
)
)
return intents
+35
View File
@@ -0,0 +1,35 @@
[project]
name = "alpaclaudia"
version = "0.1.0"
description = "Alpaca Paper-Trading Bot — Wheel strategy + equity trailing stops."
requires-python = ">=3.11"
dependencies = [
"alpaca-py>=0.43.0",
"python-dotenv>=1.0.1",
"httpx>=0.27.0",
"pydantic>=2.7.0",
"typer>=0.12.0",
"rich>=13.7.0",
"pandas>=2.2.0",
"pytz>=2024.1",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"ruff>=0.5",
]
[project.scripts]
alpaclaudia = "alpaclaudia.__main__:app"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["alpaclaudia"]
[tool.ruff]
line-length = 100
target-version = "py311"
View File
+111
View File
@@ -0,0 +1,111 @@
from alpaclaudia.config import RiskConfig
from alpaclaudia.risk import (
AccountSnapshot,
check_covered_call,
check_csp,
check_trailing_stop,
)
RISK = RiskConfig(
max_position_pct=0.25,
min_cash_buffer_pct=0.05,
trailing_stop_enabled=True,
trailing_stop_pct=0.08,
)
ACCT = AccountSnapshot(equity=100_000, cash=50_000, buying_power=50_000)
def test_csp_ok():
r = check_csp(
underlying="TSLA",
strike=200.0,
qty=1,
account=ACCT,
existing_short_puts=0,
max_short_puts_per_symbol=1,
risk_cfg=RISK,
)
assert r.ok, r.reason
def test_csp_blocked_by_cash_buffer():
r = check_csp(
underlying="TSLA",
strike=600.0, # 60k collateral, only 50k cash
qty=1,
account=ACCT,
existing_short_puts=0,
max_short_puts_per_symbol=1,
risk_cfg=RISK,
)
assert not r.ok
def test_csp_blocked_by_position_cap():
# 25% of 100k equity = 25k; strike 300 * 100 = 30k collateral
r = check_csp(
underlying="TSLA",
strike=300.0,
qty=1,
account=ACCT,
existing_short_puts=0,
max_short_puts_per_symbol=1,
risk_cfg=RISK,
)
assert not r.ok
assert "cap" in r.reason or "exceeds" in r.reason
def test_csp_blocked_by_concurrency():
r = check_csp(
underlying="TSLA",
strike=200.0,
qty=1,
account=ACCT,
existing_short_puts=1,
max_short_puts_per_symbol=1,
risk_cfg=RISK,
)
assert not r.ok
def test_covered_call_blocks_below_cost_basis():
positions = [
{"symbol": "TSLA", "qty": 200, "avg_entry_price": 250.0, "is_option": False}
]
r = check_covered_call(
underlying="TSLA",
strike=240.0,
qty=1,
positions=positions,
)
assert not r.ok
def test_covered_call_requires_shares():
r = check_covered_call(
underlying="TSLA",
strike=300.0,
qty=1,
positions=[],
)
assert not r.ok
def test_covered_call_ok():
positions = [
{"symbol": "TSLA", "qty": 200, "avg_entry_price": 250.0, "is_option": False}
]
r = check_covered_call(
underlying="TSLA",
strike=275.0,
qty=2,
positions=positions,
)
assert r.ok
def test_trailing_stop_needs_long_position():
r = check_trailing_stop(symbol="TSLA", qty=100, positions=[])
assert not r.ok