Получение данных о торговых парах с биржи Bitget через WebSocket на Python

GetCoder.ru
Изображение статьи

Биржа Bitget предоставляет доступ к торговым данным через API, что позволяет пользователям получать актуальную информацию о ценах и объемах в реальном времени. В этой статье рассмотрим, как реализовать подключение к WebSocket API Bitget с использованием Python.

Биржа Bitget предоставляет доступ к торговым данным через API, что позволяет пользователям получать актуальную информацию о ценах и объемах в реальном времени. В этой статье рассмотрим, как реализовать подключение к WebSocket API Bitget с использованием Python.

Как это работает

  • Инициализация: Создается объект класса Bitget, который сразу запускает процесс подключения;
  • Получение списка пар: Отправляется запрос к REST API для получения списка всех доступных спотовых торговых пар;
  • Установка WebSocket-соединения: После получения списка пар программа подключается к WebSocket-серверу Bitget;
  • Подписка на пары: Формируется сообщение для подписки на обновления цен всех пар, которое отправляется на сервер;
  • Получение данных: WebSocket передает данные о ценах, которые выводятся в консоль;

Код

  • copy
# -*- coding: utf-8 -*- import websocket, requests, json, time class Bitget(): def __init__(self): try: self.start() except Exception as e: print(e) # Список всех торговых пар def symbols_list(self): try: result = [] url = "https://api.bitget.com/api/spot/v1/public/products" response = requests.get(url) if response.status_code == 200: products = response.json() for product in products['data']: result.append(product['symbolName']) except Exception as e: result = [] return result # Данные от сокета def on_message(self, ws, message): data = json.loads(message) print(data) # Выключения сокета def on_error(self, ws, error): ws.close() def on_open(self, ws): print(u'Установлено соединение Web Socket') # Список всех торговых пар symbols = self.symbols_list() # Подписка на тикеры subscribe_message = { "op": "subscribe", "args": [ {"instType":"SPOT","channel":"ticker","instId":pair} for pair in symbols ] } ws.send(json.dumps(subscribe_message)) def start(self): while True: try: wss = 'wss://ws.bitget.com/v2/ws/public' wsa = websocket.WebSocketApp(wss, on_open = self.on_open, on_message = self.on_message, on_error = self.on_error) wsa.run_forever() raise Exception(u'Разрыв соединения Web Socket') except Exception as e: print(str(e)) time.sleep(5) if __name__ == "__main__": Bitget()
Исходный код

Пример вывода

  • copy
Установлено соединение Web Socket {'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'BTCUSDT'}, 'data': [{'last': '56789.01', 'high24h': '58000', 'low24h': '55000', 'vol24h': '1200.34'}]} {'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'ETHUSDT'}, 'data': [{'last': '4321.56', 'high24h': '4500', 'low24h': '4200', 'vol24h': '800.12'}]}

Заключение

Этот скрипт демонстрирует, как с помощью WebSocket API и REST API автоматизировать получение данных о криптовалютных торговых парах. Такой подход позволяет оперативно получать актуальные данные о ценах, минимизируя задержки, что особенно полезно для трейдеров и аналитиков.

  • 01.12.2024
  • 82
  • 1

Получение данных о торговых парах с биржи Bitget через WebSocket на Python

Биржа Bitget предоставляет доступ к торговым данным через API, что позволяет пользователям получать актуальную информацию о ценах и объемах в реальном времени. В этой статье рассмотрим, как реализовать подключение к WebSocket API Bitget с использованием Python.

Как это работает

  • Инициализация: Создается объект класса Bitget, который сразу запускает процесс подключения;
  • Получение списка пар: Отправляется запрос к REST API для получения списка всех доступных спотовых торговых пар;
  • Установка WebSocket-соединения: После получения списка пар программа подключается к WebSocket-серверу Bitget;
  • Подписка на пары: Формируется сообщение для подписки на обновления цен всех пар, которое отправляется на сервер;
  • Получение данных: WebSocket передает данные о ценах, которые выводятся в консоль;

Код

  • copy
# -*- coding: utf-8 -*- import websocket, requests, json, time class Bitget(): def __init__(self): try: self.start() except Exception as e: print(e) # Список всех торговых пар def symbols_list(self): try: result = [] url = "https://api.bitget.com/api/spot/v1/public/products" response = requests.get(url) if response.status_code == 200: products = response.json() for product in products['data']: result.append(product['symbolName']) except Exception as e: result = [] return result # Данные от сокета def on_message(self, ws, message): data = json.loads(message) print(data) # Выключения сокета def on_error(self, ws, error): ws.close() def on_open(self, ws): print(u'Установлено соединение Web Socket') # Список всех торговых пар symbols = self.symbols_list() # Подписка на тикеры subscribe_message = { "op": "subscribe", "args": [ {"instType":"SPOT","channel":"ticker","instId":pair} for pair in symbols ] } ws.send(json.dumps(subscribe_message)) def start(self): while True: try: wss = 'wss://ws.bitget.com/v2/ws/public' wsa = websocket.WebSocketApp(wss, on_open = self.on_open, on_message = self.on_message, on_error = self.on_error) wsa.run_forever() raise Exception(u'Разрыв соединения Web Socket') except Exception as e: print(str(e)) time.sleep(5) if __name__ == "__main__": Bitget()
Исходный код

Пример вывода

  • copy
Установлено соединение Web Socket {'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'BTCUSDT'}, 'data': [{'last': '56789.01', 'high24h': '58000', 'low24h': '55000', 'vol24h': '1200.34'}]} {'arg': {'instType': 'SPOT', 'channel': 'ticker', 'instId': 'ETHUSDT'}, 'data': [{'last': '4321.56', 'high24h': '4500', 'low24h': '4200', 'vol24h': '800.12'}]}

Заключение

Этот скрипт демонстрирует, как с помощью WebSocket API и REST API автоматизировать получение данных о криптовалютных торговых парах. Такой подход позволяет оперативно получать актуальные данные о ценах, минимизируя задержки, что особенно полезно для трейдеров и аналитиков.