Перейти к основному содержанию
API CSBoard спроектирован для автоматизации. Каждый эндпоинт детерминирован, использует курсорную пагинацию и учитывает лимиты запросов, что делает простым создание мониторов цен, ботов для снайпинга, инструментов арбитража и полноценных конвейеров рыночных данных. Это руководство покрывает ключевые паттерны, нужные для построения надёжных автоматизированных систем.

Интеграция с AI-агентом

Если вы создаёте AI-агента или инструмент на базе LLM, самый быстрый способ дать ему доступ ко всей поверхности API — машиночитаемая спецификация по адресу /llms.txt. В этом файле содержится полный API в едином компактном документе — без необходимости парсинга. Для агентов, поддерживающих Model Context Protocol (MCP), добавьте сервер CSBoard в конфигурацию MCP:
{
  "mcpServers": {
    "csboard": {
      "url": "https://csboard.com/mcp",
      "headers": {
        "Authorization": "Bearer csb_pub_..."
      }
    }
  }
}
После подключения агент может вызывать любой эндпоинт CSBoard как инструмент — просматривать листинги, проверять цены и размещать ордера — без необходимости писать клеящий код.

Опрос новых листингов

Чтобы обнаруживать новые листинги по мере их появления без пересканирования всего каталога, используйте sort=newest в сочетании с курсорной пагинацией. На каждом цикле опроса пролистывайте страницы, пока не дойдёте до уже виденного ID листинга, и останавливайтесь.
# First poll — fetch the newest 50 listings and record next_cursor
curl "https://csboard.com/v1/listings?sort=newest&limit=50" \
  -H "Authorization: Bearer csb_pub_..."
import time
import httpx

API_KEY = "csb_pub_..."
BASE_URL = "https://csboard.com/v1"
SEEN_IDS: set[str] = set()

def poll_new_listings(category: str = "Knife") -> list[dict]:
    """Return all listings added since the last poll."""
    new_items = []
    cursor = None

    while True:
        params = {"sort": "newest", "limit": 50, "category": category}
        if cursor:
            params["cursor"] = cursor

        resp = httpx.get(
            f"{BASE_URL}/listings",
            params=params,
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
        resp.raise_for_status()
        data = resp.json()

        for item in data["items"]:
            if item["id"] in SEEN_IDS:
                # Reached a listing we already processed — stop paging
                return new_items
            SEEN_IDS.add(item["id"])
            new_items.append(item)

        if data["next_cursor"] is None:
            break
        cursor = data["next_cursor"]

    return new_items

while True:
    fresh = poll_new_listings(category="Knife")
    if fresh:
        print(f"Found {len(fresh)} new listing(s)")
    time.sleep(30)  # Respect rate limits — poll every 30 seconds
Инициализируйте SEEN_IDS при старте, выполнив один полный опрос без действий по его результатам. Так вы будете запускать действия только для листингов, появившихся после старта вашего бота, а не для всего, что уже было в каталоге.

Бот для мониторинга цен

Опрашивайте GET /v1/prices по расписанию, сравнивайте min_price_usd каждого предмета с вашим порогом и запускайте уведомление или автоматическую покупку, когда цена опускается ниже него.
import httpx
import time

API_KEY = "csb_pub_..."
BASE_URL = "https://csboard.com/v1"

# Items to watch: market_hash_name → max price willing to pay
WATCHLIST = {
    "AK-47 | Redline (Field-Tested)": 12.00,
    "AWP | Asiimov (Field-Tested)": 55.00,
    "Karambit | Fade (Factory New)": 800.00,
}

def check_prices() -> list[dict]:
    """Return watchlist items whose price has dropped to or below threshold."""
    alerts = []

    for name, threshold in WATCHLIST.items():
        resp = httpx.get(
            f"{BASE_URL}/prices",
            params={"search": name, "max_price": threshold},
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
        resp.raise_for_status()
        data = resp.json()

        for row in data["items"]:
            if row["market_hash_name"] == name and row["min_price_usd"] <= threshold:
                alerts.append({
                    "name": name,
                    "price": row["min_price_usd"],
                    "qty": row["qty"],
                    "threshold": threshold,
                })

    return alerts

while True:
    try:
        hits = check_prices()
        for alert in hits:
            print(
                f"ALERT: {alert['name']} @ ${alert['price']:.2f} "
                f"(threshold ${alert['threshold']:.2f}, qty {alert['qty']})"
            )
            # → trigger_buy(alert["name"]) or send_notification(alert)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 429:
            retry_after = int(e.response.headers.get("Retry-After", 60))
            print(f"Rate limited — waiting {retry_after}s")
            time.sleep(retry_after)
            continue
        raise

    time.sleep(60)  # Poll once per minute
min_price_usd из /v1/prices — это индикативный сгруппированный снапшот, он может отставать от живой цены по конкретному предмету. Всегда получайте конкретный листинг из /v1/listings и используйте его price_usd как авторитетную цену перед размещением ордера.

Безопасная автоматизация покупок

Автоматическая покупка требует более защищённого кода, чем ручная. Соблюдайте эти четыре правила безусловно. 1. Всегда отправляйте max_price_usd Ваш потолок применяется атомарно внутри списания с баланса. Без него скачок цены между получением листинга и исполнением ордера может привести к неожиданному списанию. Установите max_price_usd равным наблюдавшемуся price_usd плюс небольшой буфер, если вы готовы принять небольшое проскальзывание:
listing_price = 14.37
max_price = round(listing_price * 1.02, 2)  # Allow up to 2% slippage
2. Всегда используйте Idempotency-Key Генерируйте один UUID v4 на каждую попытку ордера. Если ваш запрос завершился по таймауту или с сетевой ошибкой, повторите его с тем же ключом — сервер воспроизведёт исходный результат вместо повторного исполнения покупки.
import uuid
import httpx

def place_order(item_ids: list[str], max_price: float) -> dict:
    idempotency_key = str(uuid.uuid4())  # One key per attempt

    resp = httpx.post(
        "https://csboard.com/v1/orders",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Idempotency-Key": idempotency_key,
        },
        json={"item_ids": item_ids, "max_price_usd": max_price},
    )
    resp.raise_for_status()
    return resp.json()
3. Обрабатывайте 429 с учётом Retry-After Ответы с лимитом запросов содержат заголовок Retry-After (в секундах). Всегда читайте это значение и спите ровно это время — не используйте фиксированный backoff, так как он может оказаться короче или длиннее необходимого. 4. Корректно обрабатывайте 400 price_moved 400 price_moved означает, что цена превысила ваш потолок — списание не было выполнено. Решайте: повторно получить листинг, обновить max_price_usd и повторить, или отказаться от сделки:
def buy_with_retry(item_id: str, max_price: float, retries: int = 3) -> dict | None:
    for attempt in range(retries):
        try:
            return place_order([item_id], max_price)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 400 and e.response.json().get("code") == "price_moved":
                data = e.response.json()
                current = data.get("current_total_usd", max_price)
                print(f"Price moved to ${current:.2f}, was ${max_price:.2f}")
                # Decide: abort, or raise ceiling and retry
                if current <= max_price * 1.05:  # Accept up to 5% over
                    max_price = current
                    continue
                print("Price spike too large — aborting")
                return None
            if e.response.status_code == 429:
                retry_after = int(e.response.headers.get("Retry-After", 60))
                time.sleep(retry_after)
                continue
            raise
    return None

Массовый конвейер данных

Для сайтов сравнения, аналитических дашбордов и любых систем, которым нужен полный каталог, используйте эндпоинт снапшота вместо пагинации по /v1/prices.
# Download the full snapshot
curl -s "https://csboard.com/v1/prices/snapshot.ndjson.gz" \
  -H "Authorization: Bearer csb_pub_..." \
  -o snapshot.ndjson.gz

gunzip -c snapshot.ndjson.gz | wc -l   # Count total rows
Используйте ETag, чтобы избежать повторной загрузки неизменённого снапшота. Сохраняйте значение заголовка ETag из каждого ответа 200 и отправляйте его обратно как If-None-Match в следующем запросе. Ответ 304 Not Modified означает, что ваша локальная копия по-прежнему актуальна.
import gzip
import json
import httpx

SNAPSHOT_URL = "https://csboard.com/v1/prices/snapshot.ndjson.gz"
stored_etag: str | None = None

def refresh_snapshot() -> list[dict] | None:
    """Download snapshot only if it changed. Returns rows, or None if unchanged."""
    global stored_etag

    headers = {"Authorization": f"Bearer {API_KEY}"}
    if stored_etag:
        headers["If-None-Match"] = stored_etag

    resp = httpx.get(SNAPSHOT_URL, headers=headers)

    if resp.status_code == 304:
        print("Snapshot unchanged — skipping")
        return None

    resp.raise_for_status()
    stored_etag = resp.headers.get("ETag")

    rows = []
    for line in gzip.decompress(resp.content).splitlines():
        if line.strip():
            rows.append(json.loads(line))

    print(f"Loaded {len(rows)} price rows")
    return rows
Эндпоинт снапшота ограничен 1 запросом в минуту. Постройте конвейер так, чтобы использовать его как базовый слой, обновляемый каждые несколько минут, и накладывать сверху обновления в реальном времени из /v1/prices для активно отслеживаемых предметов.

Чек-лист лучших практик

Перед выходом в продакшен убедитесь, что выполнено всё перечисленное ниже:
  • ✅ Вы отправляете max_price_usd в каждом вызове POST /v1/orders
  • ✅ Вы генерируете свежий UUID v4 Idempotency-Key для каждой попытки ордера
  • ✅ Ваша логика повторов использует тот же Idempotency-Key при повторе после сетевой ошибки
  • ✅ Ваш обработчик 429 читает Retry-After и спит именно столько, а не жёстко заданное значение
  • ✅ Ваш обработчик 400 price_moved явно решает: повторить или прервать
  • ✅ Вы читаете price_usd из /v1/listings (а не /v1/prices) перед размещением ордера
  • ✅ Вы используете ETag с эндпоинтом снапшота, чтобы избежать лишних загрузок
  • ✅ У вашего API-ключа торговля включена только если ваш бот действительно должен покупать

Связанные страницы