Получение данных о торговых парах с биржи Coinbase через WebSocket на Python

GetCoder.ru
Изображение статьи

Работа с данными криптовалютных бирж, таких как Coinbase, является ключевым аспектом разработки алгоритмов для трейдинга или анализа рынка. В этой статье рассмотрим пример использования Python для подключения к WebSocket Coinbase и получения в реальном времени данных о всех доступных торговых парах.

Работа с данными криптовалютных бирж, таких как Coinbase, является ключевым аспектом разработки алгоритмов для трейдинга или анализа рынка. В этой статье рассмотрим пример использования Python для подключения к WebSocket Coinbase и получения в реальном времени данных о всех доступных торговых парах.

Как это работает

  • Получение списка торговых пар: API Coinbase предоставляет REST-эндпоинт для получения информации о всех доступных торговых парах. Мы используем запрос к https://api.exchange.coinbase.com/products, чтобы собрать список идентификаторов пар;
  • Подключение к WebSocket: Биржа Coinbase предоставляет WebSocket API для получения данных о ценах и объеме торговли. Соединение устанавливается через wss://ws-feed.exchange.coinbase.com
  • Подписка на данные тикеров: После установления WebSocket-соединения, отправляется сообщение для подписки на канал ticker. Это позволяет получать обновления о ценах для всех торговых пар.

Код

  • copy
# -*- coding: utf-8 -*- import websocket, requests, json, time class Coinbase(): def __init__(self): try: self.mTickers = {} self.ticker_socket = {} self.start() except Exception as e: print(e) # Список всех торговых пар def symbols_list(self): try: result = [] url = "https://api.exchange.coinbase.com/products" response = requests.get(url) if response.status_code == 200: products = response.json() for product in products: result.append(product['id']) except Exception as e: result = [] return result # Данные от сокета def on_message(self, ws, message): data = json.loads(message) print(data) # Выключения сокета def on_error(self, ws, error): ws.close() def on_open(self, ws): print(u'Установлено соединение Web Socket') # Список всех торговых пар symbols = self.symbols_list() # Подписка на тикеры subscribe_message = { "type": "subscribe", "channels": [ { "name": "ticker", "product_ids": symbols } ] } ws.send(json.dumps(subscribe_message)) def start(self): while True: try: wss = 'wss://ws-feed.exchange.coinbase.com' wsa = websocket.WebSocketApp(wss, on_open = self.on_open, on_message = self.on_message, on_error = self.on_error) wsa.run_forever() raise Exception(u'Разрыв соединения Web Socket') except Exception as e: print(str(e)) time.sleep(5) if __name__ == "__main__": Coinbase()
Исходный код

Заключение

Этот подход позволяет автоматизировать получение рыночных данных с биржи Coinbase и использовать их для аналитики, алгоритмической торговли или других целей.

  • 28.11.2024
  • 23
  • 0

Получение данных о торговых парах с биржи Coinbase через WebSocket на Python

Работа с данными криптовалютных бирж, таких как Coinbase, является ключевым аспектом разработки алгоритмов для трейдинга или анализа рынка. В этой статье рассмотрим пример использования Python для подключения к WebSocket Coinbase и получения в реальном времени данных о всех доступных торговых парах.

Как это работает

  • Получение списка торговых пар: API Coinbase предоставляет REST-эндпоинт для получения информации о всех доступных торговых парах. Мы используем запрос к https://api.exchange.coinbase.com/products, чтобы собрать список идентификаторов пар;
  • Подключение к WebSocket: Биржа Coinbase предоставляет WebSocket API для получения данных о ценах и объеме торговли. Соединение устанавливается через wss://ws-feed.exchange.coinbase.com
  • Подписка на данные тикеров: После установления WebSocket-соединения, отправляется сообщение для подписки на канал ticker. Это позволяет получать обновления о ценах для всех торговых пар.

Код

  • copy
# -*- coding: utf-8 -*- import websocket, requests, json, time class Coinbase(): def __init__(self): try: self.mTickers = {} self.ticker_socket = {} self.start() except Exception as e: print(e) # Список всех торговых пар def symbols_list(self): try: result = [] url = "https://api.exchange.coinbase.com/products" response = requests.get(url) if response.status_code == 200: products = response.json() for product in products: result.append(product['id']) except Exception as e: result = [] return result # Данные от сокета def on_message(self, ws, message): data = json.loads(message) print(data) # Выключения сокета def on_error(self, ws, error): ws.close() def on_open(self, ws): print(u'Установлено соединение Web Socket') # Список всех торговых пар symbols = self.symbols_list() # Подписка на тикеры subscribe_message = { "type": "subscribe", "channels": [ { "name": "ticker", "product_ids": symbols } ] } ws.send(json.dumps(subscribe_message)) def start(self): while True: try: wss = 'wss://ws-feed.exchange.coinbase.com' wsa = websocket.WebSocketApp(wss, on_open = self.on_open, on_message = self.on_message, on_error = self.on_error) wsa.run_forever() raise Exception(u'Разрыв соединения Web Socket') except Exception as e: print(str(e)) time.sleep(5) if __name__ == "__main__": Coinbase()
Исходный код

Заключение

Этот подход позволяет автоматизировать получение рыночных данных с биржи Coinbase и использовать их для аналитики, алгоритмической торговли или других целей.