> ## Documentation Index
> Fetch the complete documentation index at: https://api.csboard.com/llms.txt
> Use this file to discover all available pages before exploring further.

# 使用 CSBoard API 自动化 CS2 皮肤交易机器人

> 在 CSBoard API 之上构建机器人、价格监控器和数据管道——从轮询新挂单到安全的自动化购买和完整目录摄入。

CSBoard API 为自动化而设计。每个端点都是确定性的、支持游标分页且具备速率限制感知能力，这使得构建价格监控器、抢购机器人、套利工具和完整市场数据管道变得简单。本指南介绍构建可靠自动化系统所需的关键模式。

***

## AI 智能体集成

如果您正在构建一个 AI 智能体或由 LLM 驱动的工具，让它访问完整 API 表面的最快方式是通过 [`/llms.txt`](https://csboard.com/llms.txt) 处的机器可读规范。该文件以单个、紧凑的文档形式包含完整的 API——无需解析。

对于支持模型上下文协议（MCP）的智能体，请将 CSBoard 服务器添加到您的 MCP 配置中：

```json theme={null}
{
  "mcpServers": {
    "csboard": {
      "url": "https://csboard.com/mcp",
      "headers": {
        "Authorization": "Bearer csb_pub_..."
      }
    }
  }
}
```

连接后，智能体可以将任何 CSBoard 端点作为工具进行调用——浏览挂单、查询价格和下单——无需您编写任何粘合代码。

***

## 轮询新挂单

要在新挂单出现时检测到它们而无需重新扫描整个目录，请使用 `sort=newest` 结合游标分页。在每个轮询周期中，翻页直到您遇到已经见过的挂单 ID，然后停止。

```bash theme={null}
# First poll — fetch the newest 50 listings and record next_cursor
curl "https://csboard.com/v1/listings?sort=newest&limit=50" \
  -H "Authorization: Bearer csb_pub_..."
```

```python theme={null}
import time
import httpx

API_KEY = "csb_pub_..."
BASE_URL = "https://csboard.com/v1"
SEEN_IDS: set[str] = set()

def poll_new_listings(category: str = "Knife") -> list[dict]:
    """Return all listings added since the last poll."""
    new_items = []
    cursor = None

    while True:
        params = {"sort": "newest", "limit": 50, "category": category}
        if cursor:
            params["cursor"] = cursor

        resp = httpx.get(
            f"{BASE_URL}/listings",
            params=params,
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
        resp.raise_for_status()
        data = resp.json()

        for item in data["items"]:
            if item["id"] in SEEN_IDS:
                # Reached a listing we already processed — stop paging
                return new_items
            SEEN_IDS.add(item["id"])
            new_items.append(item)

        if data["next_cursor"] is None:
            break
        cursor = data["next_cursor"]

    return new_items

while True:
    fresh = poll_new_listings(category="Knife")
    if fresh:
        print(f"Found {len(fresh)} new listing(s)")
    time.sleep(30)  # Respect rate limits — poll every 30 seconds
```

<Tip>
  在启动时通过运行一次完整轮询（但不对结果采取任何行动）来填充 `SEEN_IDS`。这样，您只会对机器人启动 **之后** 出现的挂单触发动作——而不是对所有已经在线的挂单。
</Tip>

***

## 价格监控机器人

定期轮询 `GET /v1/prices`，将每个物品的 `min_price_usd` 与您的目标阈值进行比较，并在价格降至阈值以下时触发警报或自动购买。

```python theme={null}
import httpx
import time

API_KEY = "csb_pub_..."
BASE_URL = "https://csboard.com/v1"

# Items to watch: market_hash_name → max price willing to pay
WATCHLIST = {
    "AK-47 | Redline (Field-Tested)": 12.00,
    "AWP | Asiimov (Field-Tested)": 55.00,
    "Karambit | Fade (Factory New)": 800.00,
}

def check_prices() -> list[dict]:
    """Return watchlist items whose price has dropped to or below threshold."""
    alerts = []

    for name, threshold in WATCHLIST.items():
        resp = httpx.get(
            f"{BASE_URL}/prices",
            params={"search": name, "max_price": threshold},
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
        resp.raise_for_status()
        data = resp.json()

        for row in data["items"]:
            if row["market_hash_name"] == name and row["min_price_usd"] <= threshold:
                alerts.append({
                    "name": name,
                    "price": row["min_price_usd"],
                    "qty": row["qty"],
                    "threshold": threshold,
                })

    return alerts

while True:
    try:
        hits = check_prices()
        for alert in hits:
            print(
                f"ALERT: {alert['name']} @ ${alert['price']:.2f} "
                f"(threshold ${alert['threshold']:.2f}, qty {alert['qty']})"
            )
            # → trigger_buy(alert["name"]) or send_notification(alert)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 429:
            retry_after = int(e.response.headers.get("Retry-After", 60))
            print(f"Rate limited — waiting {retry_after}s")
            time.sleep(retry_after)
            continue
        raise

    time.sleep(60)  # Poll once per minute
```

<Note>
  来自 `/v1/prices` 的 `min_price_usd` 是一个分组的指示性快照——它可能滞后于实时的单品价格。在下单之前，请始终从 `/v1/listings` 获取具体挂单，并使用其 `price_usd` 作为权威价格。
</Note>

***

## 安全的购买自动化

自动化购买需要比手动购买更具防御性的代码。请无条件遵循以下四条规则。

**1. 始终发送 `max_price_usd`**

您的价格上限会在余额扣款过程中原子性地强制执行。如果不设置，从您获取挂单到订单执行之间的价格飙升可能会导致意外扣款。将 `max_price_usd` 设置为您观察到的 `price_usd`，如果您愿意承受小幅滑点，可以加上一个小缓冲：

```python theme={null}
listing_price = 14.37
max_price = round(listing_price * 1.02, 2)  # Allow up to 2% slippage
```

**2. 始终使用 Idempotency-Key**

每次订单尝试生成一个 UUID v4。如果您的请求超时或返回网络错误，请使用 **相同的密钥** 重试——服务器会重放原始结果，而不是执行第二次购买。

```python theme={null}
import uuid
import httpx

def place_order(item_ids: list[str], max_price: float) -> dict:
    idempotency_key = str(uuid.uuid4())  # One key per attempt

    resp = httpx.post(
        "https://csboard.com/v1/orders",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Idempotency-Key": idempotency_key,
        },
        json={"item_ids": item_ids, "max_price_usd": max_price},
    )
    resp.raise_for_status()
    return resp.json()
```

**3. 处理 `429` 与 Retry-After**

被限流的响应包含一个 `Retry-After` 头部（秒数）。始终读取该值并恰好休眠相应时长——不要使用固定的退避时间，因为它可能比所需时间更短或更长。

**4. 优雅地处理 `400 price_moved`**

`400 price_moved` 意味着价格已超过您的上限——没有任何扣款发生。决定是重新获取挂单、更新 `max_price_usd` 后重试，还是放弃这次机会：

```python theme={null}
def buy_with_retry(item_id: str, max_price: float, retries: int = 3) -> dict | None:
    for attempt in range(retries):
        try:
            return place_order([item_id], max_price)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 400 and e.response.json().get("code") == "price_moved":
                data = e.response.json()
                current = data.get("current_total_usd", max_price)
                print(f"Price moved to ${current:.2f}, was ${max_price:.2f}")
                # Decide: abort, or raise ceiling and retry
                if current <= max_price * 1.05:  # Accept up to 5% over
                    max_price = current
                    continue
                print("Price spike too large — aborting")
                return None
            if e.response.status_code == 429:
                retry_after = int(e.response.headers.get("Retry-After", 60))
                time.sleep(retry_after)
                continue
            raise
    return None
```

***

## 批量数据管道

对于比价网站、分析仪表板，或任何需要完整目录的系统，请使用快照端点而不是通过 `/v1/prices` 进行分页。

```bash theme={null}
# Download the full snapshot
curl -s "https://csboard.com/v1/prices/snapshot.ndjson.gz" \
  -H "Authorization: Bearer csb_pub_..." \
  -o snapshot.ndjson.gz

gunzip -c snapshot.ndjson.gz | wc -l   # Count total rows
```

使用 ETag 避免重新下载未更改的快照。存储每个 `200` 响应中的 `ETag` 头部值，并在下次请求时作为 `If-None-Match` 发回。`304 Not Modified` 响应意味着您的本地副本仍然是最新的。

```python theme={null}
import gzip
import json
import httpx

SNAPSHOT_URL = "https://csboard.com/v1/prices/snapshot.ndjson.gz"
stored_etag: str | None = None

def refresh_snapshot() -> list[dict] | None:
    """Download snapshot only if it changed. Returns rows, or None if unchanged."""
    global stored_etag

    headers = {"Authorization": f"Bearer {API_KEY}"}
    if stored_etag:
        headers["If-None-Match"] = stored_etag

    resp = httpx.get(SNAPSHOT_URL, headers=headers)

    if resp.status_code == 304:
        print("Snapshot unchanged — skipping")
        return None

    resp.raise_for_status()
    stored_etag = resp.headers.get("ETag")

    rows = []
    for line in gzip.decompress(resp.content).splitlines():
        if line.strip():
            rows.append(json.loads(line))

    print(f"Loaded {len(rows)} price rows")
    return rows
```

<Note>
  快照端点限速为 **每分钟 1 个请求**。请将您的管道设计为以它作为每隔几分钟刷新一次的基础层，并通过 `/v1/prices` 为您正在主动监控的物品叠加实时更新。
</Note>

***

## 最佳实践清单

<Tip>
  **在投入生产之前，请验证以下所有事项：**

  * ✅ 您在每次 `POST /v1/orders` 调用上都发送 `max_price_usd`
  * ✅ 每次订单尝试都生成新的 UUID v4 `Idempotency-Key`
  * ✅ 您的重试逻辑在网络错误重试时复用相同的 `Idempotency-Key`
  * ✅ 您的 `429` 处理器读取并休眠 `Retry-After`，而不是硬编码值
  * ✅ 您的 `400 price_moved` 处理器明确决定是重试还是放弃
  * ✅ 您在下单之前从 `/v1/listings`（而不是 `/v1/prices`）读取 `price_usd`
  * ✅ 您使用 ETag 与快照端点配合以避免冗余下载
  * ✅ 仅当您的机器人确实需要购买时，您的 API 密钥才启用交易
</Tip>

***

## 相关页面

* [市场数据 — 挂单、价格、快照、汇率](/guides/market-data)
* [购买 — 设置、超额收费保护、幂等性](/guides/buying)
* [API 参考 — GET /v1/listings](/api-reference/get-listings)
* [API 参考 — GET /v1/prices](/api-reference/get-prices)
* [API 参考 — POST /v1/orders](/api-reference/post-orders)
* [身份验证](/authentication)
