Получение данных о торговых парах с биржи OKX через WebSocket на Python


Биржа OKX является одной из крупнейших площадок для торговли криптовалютами, предоставляющей доступ к широкому набору инструментов через API. Одним из наиболее эффективных способов получения данных о торговых парах в реальном времени является использование WebSocket API. В данной статье мы рассмотрим, как реализовать такой инструмент на Python.
Биржа OKX является одной из крупнейших площадок для торговли криптовалютами, предоставляющей доступ к широкому набору инструментов через API. Одним из наиболее эффективных способов получения данных о торговых парах в реальном времени является использование WebSocket API. В данной статье мы рассмотрим, как реализовать такой инструмент на Python.
Как это работает
- Инициализация: Создается объект класса Okx, который сразу запускает процесс подключения;
- Получение списка пар: Сначала скрипт делает HTTP-запрос к REST API OKX, чтобы получить актуальный список всех спотовых пар;
- Установка WebSocket-соединения: После получения списка пар программа подключается к WebSocket-серверу OKX;
- Подписка на пары: Скрипт подписывается на обновления цен всех пар через канал tickers;
- Получение данных: По мере поступления данных скрипт обрабатывает их и выводит в консоль информацию о текущих ценах;
Код
# -*- coding: utf-8 -*-
import websocket, requests, json, time
class Okx():
def __init__(self):
try:
self.ticker_socket = {}
self.start()
except Exception as e:
print(e)
# Список всех торговых пар
def symbols_list(self):
try:
result = []
url = "https://www.okx.com/api/v5/market/tickers?instType=SPOT" # Только спотовые пары
response = requests.get(url)
if response.status_code == 200:
products = response.json()
if "data" in products:
for product in products['data']:
result.append(product['instId'])
except Exception as e:
result = []
return result
# Данные от сокета
def on_message(self, ws, message):
data = json.loads(message)
if "data" in data:
for ticker in data["data"]:
pair = ticker["instId"]
last_price = ticker["last"]
print(f"Пара: {pair}, Цена: {last_price}")
# Выключения сокета
def on_error(self, ws, error):
ws.close()
def on_open(self, ws):
print(u'Установлено соединение Web Socket')
pairs = self.symbols_list()
# Формируем сообщение для подписки
subscription_message = {
"op": "subscribe",
"args": [{"channel": "tickers", "instId": pair} for pair in pairs]
}
# Отправляем подписку
ws.send(json.dumps(subscription_message))
def start(self):
while True:
try:
wss = 'wss://ws.okx.com:8443/ws/v5/public'
wsa = websocket.WebSocketApp(wss,
on_open = self.on_open,
on_message = self.on_message,
on_error = self.on_error)
wsa.run_forever()
raise Exception(u'Разрыв соединения Web Socket')
except Exception as e:
print(str(e))
time.sleep(5)
if __name__ == "__main__":
Okx()
Пример вывода
Установлено соединение Web Socket
Пара: BTC-USDT, Цена: 58123.1
Пара: ETH-USDT, Цена: 4321.5
Пара: SOL-USDT, Цена: 189.4
Заключение
Этот скрипт демонстрирует, как с помощью WebSocket API и REST API автоматизировать получение данных о криптовалютных торговых парах. Такой подход позволяет оперативно получать актуальные данные о ценах, минимизируя задержки, что особенно полезно для трейдеров и аналитиков.
- 30.11.2024
- 105
- 0
Получение данных о торговых парах с биржи OKX через WebSocket на Python
Биржа OKX является одной из крупнейших площадок для торговли криптовалютами, предоставляющей доступ к широкому набору инструментов через API. Одним из наиболее эффективных способов получения данных о торговых парах в реальном времени является использование WebSocket API. В данной статье мы рассмотрим, как реализовать такой инструмент на Python.
Как это работает
- Инициализация: Создается объект класса Okx, который сразу запускает процесс подключения;
- Получение списка пар: Сначала скрипт делает HTTP-запрос к REST API OKX, чтобы получить актуальный список всех спотовых пар;
- Установка WebSocket-соединения: После получения списка пар программа подключается к WebSocket-серверу OKX;
- Подписка на пары: Скрипт подписывается на обновления цен всех пар через канал tickers;
- Получение данных: По мере поступления данных скрипт обрабатывает их и выводит в консоль информацию о текущих ценах;
Код
# -*- coding: utf-8 -*-
import websocket, requests, json, time
class Okx():
def __init__(self):
try:
self.ticker_socket = {}
self.start()
except Exception as e:
print(e)
# Список всех торговых пар
def symbols_list(self):
try:
result = []
url = "https://www.okx.com/api/v5/market/tickers?instType=SPOT" # Только спотовые пары
response = requests.get(url)
if response.status_code == 200:
products = response.json()
if "data" in products:
for product in products['data']:
result.append(product['instId'])
except Exception as e:
result = []
return result
# Данные от сокета
def on_message(self, ws, message):
data = json.loads(message)
if "data" in data:
for ticker in data["data"]:
pair = ticker["instId"]
last_price = ticker["last"]
print(f"Пара: {pair}, Цена: {last_price}")
# Выключения сокета
def on_error(self, ws, error):
ws.close()
def on_open(self, ws):
print(u'Установлено соединение Web Socket')
pairs = self.symbols_list()
# Формируем сообщение для подписки
subscription_message = {
"op": "subscribe",
"args": [{"channel": "tickers", "instId": pair} for pair in pairs]
}
# Отправляем подписку
ws.send(json.dumps(subscription_message))
def start(self):
while True:
try:
wss = 'wss://ws.okx.com:8443/ws/v5/public'
wsa = websocket.WebSocketApp(wss,
on_open = self.on_open,
on_message = self.on_message,
on_error = self.on_error)
wsa.run_forever()
raise Exception(u'Разрыв соединения Web Socket')
except Exception as e:
print(str(e))
time.sleep(5)
if __name__ == "__main__":
Okx()
Пример вывода
Установлено соединение Web Socket
Пара: BTC-USDT, Цена: 58123.1
Пара: ETH-USDT, Цена: 4321.5
Пара: SOL-USDT, Цена: 189.4
Заключение
Этот скрипт демонстрирует, как с помощью WebSocket API и REST API автоматизировать получение данных о криптовалютных торговых парах. Такой подход позволяет оперативно получать актуальные данные о ценах, минимизируя задержки, что особенно полезно для трейдеров и аналитиков.