API CSBoard спроектирован для автоматизации. Каждый эндпоинт детерминирован, использует курсорную пагинацию и учитывает лимиты запросов, что делает простым создание мониторов цен, ботов для снайпинга, инструментов арбитража и полноценных конвейеров рыночных данных. Это руководство покрывает ключевые паттерны, нужные для построения надёжных автоматизированных систем.
Интеграция с AI-агентом
Если вы создаёте AI-агента или инструмент на базе LLM, самый быстрый способ дать ему доступ ко всей поверхности API — машиночитаемая спецификация по адресу /llms.txt. В этом файле содержится полный API в едином компактном документе — без необходимости парсинга.
Для агентов, поддерживающих Model Context Protocol (MCP), добавьте сервер CSBoard в конфигурацию MCP:
{
"mcpServers": {
"csboard": {
"url": "https://csboard.com/mcp",
"headers": {
"Authorization": "Bearer csb_pub_..."
}
}
}
}
После подключения агент может вызывать любой эндпоинт CSBoard как инструмент — просматривать листинги, проверять цены и размещать ордера — без необходимости писать клеящий код.
Опрос новых листингов
Чтобы обнаруживать новые листинги по мере их появления без пересканирования всего каталога, используйте sort=newest в сочетании с курсорной пагинацией. На каждом цикле опроса пролистывайте страницы, пока не дойдёте до уже виденного ID листинга, и останавливайтесь.
# First poll — fetch the newest 50 listings and record next_cursor
curl "https://csboard.com/v1/listings?sort=newest&limit=50" \
-H "Authorization: Bearer csb_pub_..."
import time
import httpx
API_KEY = "csb_pub_..."
BASE_URL = "https://csboard.com/v1"
SEEN_IDS: set[str] = set()
def poll_new_listings(category: str = "Knife") -> list[dict]:
"""Return all listings added since the last poll."""
new_items = []
cursor = None
while True:
params = {"sort": "newest", "limit": 50, "category": category}
if cursor:
params["cursor"] = cursor
resp = httpx.get(
f"{BASE_URL}/listings",
params=params,
headers={"Authorization": f"Bearer {API_KEY}"},
)
resp.raise_for_status()
data = resp.json()
for item in data["items"]:
if item["id"] in SEEN_IDS:
# Reached a listing we already processed — stop paging
return new_items
SEEN_IDS.add(item["id"])
new_items.append(item)
if data["next_cursor"] is None:
break
cursor = data["next_cursor"]
return new_items
while True:
fresh = poll_new_listings(category="Knife")
if fresh:
print(f"Found {len(fresh)} new listing(s)")
time.sleep(30) # Respect rate limits — poll every 30 seconds
Инициализируйте SEEN_IDS при старте, выполнив один полный опрос без действий по его результатам. Так вы будете запускать действия только для листингов, появившихся после старта вашего бота, а не для всего, что уже было в каталоге.
Бот для мониторинга цен
Опрашивайте GET /v1/prices по расписанию, сравнивайте min_price_usd каждого предмета с вашим порогом и запускайте уведомление или автоматическую покупку, когда цена опускается ниже него.
import httpx
import time
API_KEY = "csb_pub_..."
BASE_URL = "https://csboard.com/v1"
# Items to watch: market_hash_name → max price willing to pay
WATCHLIST = {
"AK-47 | Redline (Field-Tested)": 12.00,
"AWP | Asiimov (Field-Tested)": 55.00,
"Karambit | Fade (Factory New)": 800.00,
}
def check_prices() -> list[dict]:
"""Return watchlist items whose price has dropped to or below threshold."""
alerts = []
for name, threshold in WATCHLIST.items():
resp = httpx.get(
f"{BASE_URL}/prices",
params={"search": name, "max_price": threshold},
headers={"Authorization": f"Bearer {API_KEY}"},
)
resp.raise_for_status()
data = resp.json()
for row in data["items"]:
if row["market_hash_name"] == name and row["min_price_usd"] <= threshold:
alerts.append({
"name": name,
"price": row["min_price_usd"],
"qty": row["qty"],
"threshold": threshold,
})
return alerts
while True:
try:
hits = check_prices()
for alert in hits:
print(
f"ALERT: {alert['name']} @ ${alert['price']:.2f} "
f"(threshold ${alert['threshold']:.2f}, qty {alert['qty']})"
)
# → trigger_buy(alert["name"]) or send_notification(alert)
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("Retry-After", 60))
print(f"Rate limited — waiting {retry_after}s")
time.sleep(retry_after)
continue
raise
time.sleep(60) # Poll once per minute
min_price_usd из /v1/prices — это индикативный сгруппированный снапшот, он может отставать от живой цены по конкретному предмету. Всегда получайте конкретный листинг из /v1/listings и используйте его price_usd как авторитетную цену перед размещением ордера.
Безопасная автоматизация покупок
Автоматическая покупка требует более защищённого кода, чем ручная. Соблюдайте эти четыре правила безусловно.
1. Всегда отправляйте max_price_usd
Ваш потолок применяется атомарно внутри списания с баланса. Без него скачок цены между получением листинга и исполнением ордера может привести к неожиданному списанию. Установите max_price_usd равным наблюдавшемуся price_usd плюс небольшой буфер, если вы готовы принять небольшое проскальзывание:
listing_price = 14.37
max_price = round(listing_price * 1.02, 2) # Allow up to 2% slippage
2. Всегда используйте Idempotency-Key
Генерируйте один UUID v4 на каждую попытку ордера. Если ваш запрос завершился по таймауту или с сетевой ошибкой, повторите его с тем же ключом — сервер воспроизведёт исходный результат вместо повторного исполнения покупки.
import uuid
import httpx
def place_order(item_ids: list[str], max_price: float) -> dict:
idempotency_key = str(uuid.uuid4()) # One key per attempt
resp = httpx.post(
"https://csboard.com/v1/orders",
headers={
"Authorization": f"Bearer {API_KEY}",
"Idempotency-Key": idempotency_key,
},
json={"item_ids": item_ids, "max_price_usd": max_price},
)
resp.raise_for_status()
return resp.json()
3. Обрабатывайте 429 с учётом Retry-After
Ответы с лимитом запросов содержат заголовок Retry-After (в секундах). Всегда читайте это значение и спите ровно это время — не используйте фиксированный backoff, так как он может оказаться короче или длиннее необходимого.
4. Корректно обрабатывайте 400 price_moved
400 price_moved означает, что цена превысила ваш потолок — списание не было выполнено. Решайте: повторно получить листинг, обновить max_price_usd и повторить, или отказаться от сделки:
def buy_with_retry(item_id: str, max_price: float, retries: int = 3) -> dict | None:
for attempt in range(retries):
try:
return place_order([item_id], max_price)
except httpx.HTTPStatusError as e:
if e.response.status_code == 400 and e.response.json().get("code") == "price_moved":
data = e.response.json()
current = data.get("current_total_usd", max_price)
print(f"Price moved to ${current:.2f}, was ${max_price:.2f}")
# Decide: abort, or raise ceiling and retry
if current <= max_price * 1.05: # Accept up to 5% over
max_price = current
continue
print("Price spike too large — aborting")
return None
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("Retry-After", 60))
time.sleep(retry_after)
continue
raise
return None
Массовый конвейер данных
Для сайтов сравнения, аналитических дашбордов и любых систем, которым нужен полный каталог, используйте эндпоинт снапшота вместо пагинации по /v1/prices.
# Download the full snapshot
curl -s "https://csboard.com/v1/prices/snapshot.ndjson.gz" \
-H "Authorization: Bearer csb_pub_..." \
-o snapshot.ndjson.gz
gunzip -c snapshot.ndjson.gz | wc -l # Count total rows
Используйте ETag, чтобы избежать повторной загрузки неизменённого снапшота. Сохраняйте значение заголовка ETag из каждого ответа 200 и отправляйте его обратно как If-None-Match в следующем запросе. Ответ 304 Not Modified означает, что ваша локальная копия по-прежнему актуальна.
import gzip
import json
import httpx
SNAPSHOT_URL = "https://csboard.com/v1/prices/snapshot.ndjson.gz"
stored_etag: str | None = None
def refresh_snapshot() -> list[dict] | None:
"""Download snapshot only if it changed. Returns rows, or None if unchanged."""
global stored_etag
headers = {"Authorization": f"Bearer {API_KEY}"}
if stored_etag:
headers["If-None-Match"] = stored_etag
resp = httpx.get(SNAPSHOT_URL, headers=headers)
if resp.status_code == 304:
print("Snapshot unchanged — skipping")
return None
resp.raise_for_status()
stored_etag = resp.headers.get("ETag")
rows = []
for line in gzip.decompress(resp.content).splitlines():
if line.strip():
rows.append(json.loads(line))
print(f"Loaded {len(rows)} price rows")
return rows
Эндпоинт снапшота ограничен 1 запросом в минуту. Постройте конвейер так, чтобы использовать его как базовый слой, обновляемый каждые несколько минут, и накладывать сверху обновления в реальном времени из /v1/prices для активно отслеживаемых предметов.
Чек-лист лучших практик
Перед выходом в продакшен убедитесь, что выполнено всё перечисленное ниже:
- ✅ Вы отправляете
max_price_usd в каждом вызове POST /v1/orders
- ✅ Вы генерируете свежий UUID v4
Idempotency-Key для каждой попытки ордера
- ✅ Ваша логика повторов использует тот же
Idempotency-Key при повторе после сетевой ошибки
- ✅ Ваш обработчик
429 читает Retry-After и спит именно столько, а не жёстко заданное значение
- ✅ Ваш обработчик
400 price_moved явно решает: повторить или прервать
- ✅ Вы читаете
price_usd из /v1/listings (а не /v1/prices) перед размещением ордера
- ✅ Вы используете ETag с эндпоинтом снапшота, чтобы избежать лишних загрузок
- ✅ У вашего API-ключа торговля включена только если ваш бот действительно должен покупать
Связанные страницы