Получение данных о торговых парах с биржи Bitget через WebSocket на Python


Биржа Bitget предоставляет доступ к торговым данным через API, что позволяет пользователям получать актуальную информацию о ценах и объемах в реальном времени. В этой статье рассмотрим, как реализовать подключение к WebSocket API Bitget с использованием Python.
Биржа Bitget предоставляет доступ к торговым данным через API, что позволяет пользователям получать актуальную информацию о ценах и объемах в реальном времени. В этой статье рассмотрим, как реализовать подключение к WebSocket API Bitget с использованием Python.
Как это работает
- Инициализация: Создается объект класса Bitget, который сразу запускает процесс подключения;
- Получение списка пар: Отправляется запрос к REST API для получения списка всех доступных спотовых торговых пар;
- Установка WebSocket-соединения: После получения списка пар программа подключается к WebSocket-серверу Bitget;
- Подписка на пары: Формируется сообщение для подписки на обновления цен всех пар, которое отправляется на сервер;
- Получение данных: WebSocket передает данные о ценах, которые выводятся в консоль;
Код
# -*- coding: utf-8 -*-
import websocket, requests, json, time
class Bitget():
def __init__(self):
try:
self.start()
except Exception as e:
print(e)
# Список всех торговых пар
def symbols_list(self):
try:
result = []
url = "https://api.bitget.com/api/spot/v1/public/products"
response = requests.get(url)
if response.status_code == 200:
products = response.json()
for product in products['data']:
result.append(product['symbolName'])
except Exception as e:
result = []
return result
# Данные от сокета
def on_message(self, ws, message):
data = json.loads(message)
print(data)
# Выключения сокета
def on_error(self, ws, error):
ws.close()
def on_open(self, ws):
print(u'Установлено соединение Web Socket')
# Список всех торговых пар
symbols = self.symbols_list()
# Подписка на тикеры
subscribe_message = {
"op": "subscribe",
"args": [
{"instType":"SPOT","channel":"ticker","instId":pair} for pair in symbols
]
}
ws.send(json.dumps(subscribe_message))
def start(self):
while True:
try:
wss = 'wss://ws.bitget.com/v2/ws/public'
wsa = websocket.WebSocketApp(wss,
on_open = self.on_open,
on_message = self.on_message,
on_error = self.on_error)
wsa.run_forever()
raise Exception(u'Разрыв соединения Web Socket')
except Exception as e:
print(str(e))
time.sleep(5)
if __name__ == "__main__":
Bitget()
Пример вывода
Установлено соединение Web Socket
{'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'BTCUSDT'}, 'data': [{'last': '56789.01', 'high24h': '58000', 'low24h': '55000', 'vol24h': '1200.34'}]}
{'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'ETHUSDT'}, 'data': [{'last': '4321.56', 'high24h': '4500', 'low24h': '4200', 'vol24h': '800.12'}]}
Заключение
Этот скрипт демонстрирует, как с помощью WebSocket API и REST API автоматизировать получение данных о криптовалютных торговых парах. Такой подход позволяет оперативно получать актуальные данные о ценах, минимизируя задержки, что особенно полезно для трейдеров и аналитиков.
- 01.12.2024
- 82
- 1
Получение данных о торговых парах с биржи Bitget через WebSocket на Python
Биржа Bitget предоставляет доступ к торговым данным через API, что позволяет пользователям получать актуальную информацию о ценах и объемах в реальном времени. В этой статье рассмотрим, как реализовать подключение к WebSocket API Bitget с использованием Python.
Как это работает
- Инициализация: Создается объект класса Bitget, который сразу запускает процесс подключения;
- Получение списка пар: Отправляется запрос к REST API для получения списка всех доступных спотовых торговых пар;
- Установка WebSocket-соединения: После получения списка пар программа подключается к WebSocket-серверу Bitget;
- Подписка на пары: Формируется сообщение для подписки на обновления цен всех пар, которое отправляется на сервер;
- Получение данных: WebSocket передает данные о ценах, которые выводятся в консоль;
Код
# -*- coding: utf-8 -*-
import websocket, requests, json, time
class Bitget():
def __init__(self):
try:
self.start()
except Exception as e:
print(e)
# Список всех торговых пар
def symbols_list(self):
try:
result = []
url = "https://api.bitget.com/api/spot/v1/public/products"
response = requests.get(url)
if response.status_code == 200:
products = response.json()
for product in products['data']:
result.append(product['symbolName'])
except Exception as e:
result = []
return result
# Данные от сокета
def on_message(self, ws, message):
data = json.loads(message)
print(data)
# Выключения сокета
def on_error(self, ws, error):
ws.close()
def on_open(self, ws):
print(u'Установлено соединение Web Socket')
# Список всех торговых пар
symbols = self.symbols_list()
# Подписка на тикеры
subscribe_message = {
"op": "subscribe",
"args": [
{"instType":"SPOT","channel":"ticker","instId":pair} for pair in symbols
]
}
ws.send(json.dumps(subscribe_message))
def start(self):
while True:
try:
wss = 'wss://ws.bitget.com/v2/ws/public'
wsa = websocket.WebSocketApp(wss,
on_open = self.on_open,
on_message = self.on_message,
on_error = self.on_error)
wsa.run_forever()
raise Exception(u'Разрыв соединения Web Socket')
except Exception as e:
print(str(e))
time.sleep(5)
if __name__ == "__main__":
Bitget()
Пример вывода
Установлено соединение Web Socket
{'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'BTCUSDT'}, 'data': [{'last': '56789.01', 'high24h': '58000', 'low24h': '55000', 'vol24h': '1200.34'}]}
{'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'ETHUSDT'}, 'data': [{'last': '4321.56', 'high24h': '4500', 'low24h': '4200', 'vol24h': '800.12'}]}
Заключение
Этот скрипт демонстрирует, как с помощью WebSocket API и REST API автоматизировать получение данных о криптовалютных торговых парах. Такой подход позволяет оперативно получать актуальные данные о ценах, минимизируя задержки, что особенно полезно для трейдеров и аналитиков.