Получение данных о торговых парах с биржи Coinbase через WebSocket на Python


Работа с данными криптовалютных бирж, таких как Coinbase, является ключевым аспектом разработки алгоритмов для трейдинга или анализа рынка. В этой статье рассмотрим пример использования Python для подключения к WebSocket Coinbase и получения в реальном времени данных о всех доступных торговых парах.
Работа с данными криптовалютных бирж, таких как Coinbase, является ключевым аспектом разработки алгоритмов для трейдинга или анализа рынка. В этой статье рассмотрим пример использования Python для подключения к WebSocket Coinbase и получения в реальном времени данных о всех доступных торговых парах.
Как это работает
- Получение списка торговых пар: API Coinbase предоставляет REST-эндпоинт для получения информации о всех доступных торговых парах. Мы используем запрос к https://api.exchange.coinbase.com/products, чтобы собрать список идентификаторов пар;
- Подключение к WebSocket: Биржа Coinbase предоставляет WebSocket API для получения данных о ценах и объеме торговли. Соединение устанавливается через wss://ws-feed.exchange.coinbase.com
- Подписка на данные тикеров: После установления WebSocket-соединения, отправляется сообщение для подписки на канал ticker. Это позволяет получать обновления о ценах для всех торговых пар.
Код
# -*- coding: utf-8 -*-
import websocket, requests, json, time
class Coinbase():
def __init__(self):
try:
self.mTickers = {}
self.ticker_socket = {}
self.start()
except Exception as e:
print(e)
# Список всех торговых пар
def symbols_list(self):
try:
result = []
url = "https://api.exchange.coinbase.com/products"
response = requests.get(url)
if response.status_code == 200:
products = response.json()
for product in products:
result.append(product['id'])
except Exception as e:
result = []
return result
# Данные от сокета
def on_message(self, ws, message):
data = json.loads(message)
print(data)
# Выключения сокета
def on_error(self, ws, error):
ws.close()
def on_open(self, ws):
print(u'Установлено соединение Web Socket')
# Список всех торговых пар
symbols = self.symbols_list()
# Подписка на тикеры
subscribe_message = {
"type": "subscribe",
"channels": [
{
"name": "ticker",
"product_ids": symbols
}
]
}
ws.send(json.dumps(subscribe_message))
def start(self):
while True:
try:
wss = 'wss://ws-feed.exchange.coinbase.com'
wsa = websocket.WebSocketApp(wss,
on_open = self.on_open,
on_message = self.on_message,
on_error = self.on_error)
wsa.run_forever()
raise Exception(u'Разрыв соединения Web Socket')
except Exception as e:
print(str(e))
time.sleep(5)
if __name__ == "__main__":
Coinbase()
Заключение
Этот подход позволяет автоматизировать получение рыночных данных с биржи Coinbase и использовать их для аналитики, алгоритмической торговли или других целей.
- 28.11.2024
- 23
- 0
Получение данных о торговых парах с биржи Coinbase через WebSocket на Python
Работа с данными криптовалютных бирж, таких как Coinbase, является ключевым аспектом разработки алгоритмов для трейдинга или анализа рынка. В этой статье рассмотрим пример использования Python для подключения к WebSocket Coinbase и получения в реальном времени данных о всех доступных торговых парах.
Как это работает
- Получение списка торговых пар: API Coinbase предоставляет REST-эндпоинт для получения информации о всех доступных торговых парах. Мы используем запрос к https://api.exchange.coinbase.com/products, чтобы собрать список идентификаторов пар;
- Подключение к WebSocket: Биржа Coinbase предоставляет WebSocket API для получения данных о ценах и объеме торговли. Соединение устанавливается через wss://ws-feed.exchange.coinbase.com
- Подписка на данные тикеров: После установления WebSocket-соединения, отправляется сообщение для подписки на канал ticker. Это позволяет получать обновления о ценах для всех торговых пар.
Код
# -*- coding: utf-8 -*-
import websocket, requests, json, time
class Coinbase():
def __init__(self):
try:
self.mTickers = {}
self.ticker_socket = {}
self.start()
except Exception as e:
print(e)
# Список всех торговых пар
def symbols_list(self):
try:
result = []
url = "https://api.exchange.coinbase.com/products"
response = requests.get(url)
if response.status_code == 200:
products = response.json()
for product in products:
result.append(product['id'])
except Exception as e:
result = []
return result
# Данные от сокета
def on_message(self, ws, message):
data = json.loads(message)
print(data)
# Выключения сокета
def on_error(self, ws, error):
ws.close()
def on_open(self, ws):
print(u'Установлено соединение Web Socket')
# Список всех торговых пар
symbols = self.symbols_list()
# Подписка на тикеры
subscribe_message = {
"type": "subscribe",
"channels": [
{
"name": "ticker",
"product_ids": symbols
}
]
}
ws.send(json.dumps(subscribe_message))
def start(self):
while True:
try:
wss = 'wss://ws-feed.exchange.coinbase.com'
wsa = websocket.WebSocketApp(wss,
on_open = self.on_open,
on_message = self.on_message,
on_error = self.on_error)
wsa.run_forever()
raise Exception(u'Разрыв соединения Web Socket')
except Exception as e:
print(str(e))
time.sleep(5)
if __name__ == "__main__":
Coinbase()
Заключение
Этот подход позволяет автоматизировать получение рыночных данных с биржи Coinbase и использовать их для аналитики, алгоритмической торговли или других целей.