31 KiB
31 KiB
WebSocket API LogBoard+
Автор: Сергей Антропов
Сайт: https://devops.org.ru
Содержание
Обзор WebSocket API
WebSocket API LogBoard+ предоставляет возможность получения логов Docker контейнеров в реальном времени. Это позволяет создавать интерактивные приложения для мониторинга и отладки микросервисов.
Преимущества WebSocket
- Real-time данные - мгновенное получение новых логов
- Эффективность - меньше накладных расходов по сравнению с polling
- Масштабируемость - поддержка множественных соединений
- Фильтрация - получение логов только нужных контейнеров
Поддерживаемые протоколы
- ws:// - для HTTP соединений
- wss:// - для HTTPS соединений (рекомендуется для продакшена)
Аутентификация
Все WebSocket endpoints требуют JWT токен для аутентификации. Токен передается как параметр запроса token
.
Получение токена
# Получение JWT токена через REST API
curl -X POST "http://localhost:9001/api/auth/login" \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"your-password"}'
Использование токена
const token = "your-jwt-token";
const ws = new WebSocket(`ws://localhost:9001/api/websocket/logs/container-id?token=${token}`);
Endpoints
ws://host:port/api/websocket/logs/{container_id}
Получение логов отдельного контейнера в реальном времени.
Параметры
Параметр | Тип | Обязательный | Описание |
---|---|---|---|
container_id |
string | Да | ID контейнера (первые 12 символов) |
token |
string | Да | JWT токен аутентификации |
tail |
number | Нет | Количество начальных строк (по умолчанию 500) |
service |
string | Нет | Имя сервиса (для информации) |
project |
string | Нет | Имя проекта (для информации) |
Пример подключения
const containerId = "abc123def456";
const token = "your-jwt-token";
const ws = new WebSocket(`ws://localhost:9001/api/websocket/logs/${containerId}?token=${token}&tail=100`);
ws.onopen = function() {
console.log('WebSocket соединение установлено');
};
ws.onmessage = function(event) {
console.log('Получены логи:', event.data);
};
ws.onerror = function(error) {
console.error('WebSocket ошибка:', error);
};
ws.onclose = function(event) {
console.log('WebSocket соединение закрыто:', event.code, event.reason);
};
Формат сообщений
При подключении сервер отправляет приветственное сообщение:
Connected to container: myproject_web_1
Затем отправляются логи в формате:
2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker processes
2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker process 1234
ws://host:port/api/websocket/fan/{service_name}
Получение логов всех реплик сервиса Docker Compose (fan-in).
Параметры
Параметр | Тип | Обязательный | Описание |
---|---|---|---|
service_name |
string | Да | Имя сервиса Docker Compose |
token |
string | Да | JWT токен аутентификации |
tail |
number | Нет | Количество начальных строк (по умолчанию 500) |
project |
string | Нет | Имя проекта (для фильтрации) |
Пример подключения
const serviceName = "web";
const token = "your-jwt-token";
const ws = new WebSocket(`ws://localhost:9001/api/websocket/fan/${serviceName}?token=${token}&tail=100`);
ws.onmessage = function(event) {
// Логи приходят с префиксом ID контейнера
console.log('Логи сервиса:', event.data);
};
Формат сообщений
Логи приходят с префиксом короткого ID контейнера:
[abc123de] 2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker processes
[def456gh] 2024-01-15T10:30:16.123456789Z 2024/01/15 10:30:16 [notice] 1#1: start worker processes
ws://host:port/api/websocket/fan_group
Получение логов группы сервисов одновременно.
Параметры
Параметр | Тип | Обязательный | Описание |
---|---|---|---|
services |
string | Да | Имена сервисов через запятую |
token |
string | Да | JWT токен аутентификации |
tail |
number | Нет | Количество начальных строк (по умолчанию 500) |
project |
string | Нет | Имя проекта (для фильтрации) |
Пример подключения
const services = "web,db,redis";
const token = "your-jwt-token";
const ws = new WebSocket(`ws://localhost:9001/api/websocket/fan_group?services=${services}&token=${token}&tail=100`);
ws.onmessage = function(event) {
// Логи приходят с префиксом ID контейнера и имени сервиса
console.log('Логи группы сервисов:', event.data);
};
Формат сообщений
Логи приходят с префиксом ID контейнера и имени сервиса:
[abc123de web] 2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker processes
[def456gh db] 2024-01-15T10:30:16.123456789Z 2024/01/15 10:30:16 [info] database system is ready to accept connections
[ghi789jk redis] 2024-01-15T10:30:17.123456789Z 2024/01/15 10:30:17 [notice] Ready to accept connections
Примеры использования
JavaScript (Browser)
class LogBoardWebSocket {
constructor(baseUrl, token) {
this.baseUrl = baseUrl;
this.token = token;
this.connections = new Map();
}
// Подключение к логам контейнера
connectToContainer(containerId, options = {}) {
const { tail = 100, onMessage, onError, onClose } = options;
const url = `ws://${this.baseUrl.replace('http://', '')}/api/websocket/logs/${containerId}?token=${this.token}&tail=${tail}`;
const ws = new WebSocket(url);
ws.onopen = () => {
console.log(`Подключение к контейнеру ${containerId} установлено`);
};
ws.onmessage = (event) => {
if (onMessage) onMessage(event.data);
};
ws.onerror = (error) => {
console.error(`Ошибка WebSocket для контейнера ${containerId}:`, error);
if (onError) onError(error);
};
ws.onclose = (event) => {
console.log(`Соединение с контейнером ${containerId} закрыто:`, event.code, event.reason);
if (onClose) onClose(event);
};
this.connections.set(containerId, ws);
return ws;
}
// Подключение к логам сервиса
connectToService(serviceName, options = {}) {
const { tail = 100, project, onMessage, onError, onClose } = options;
let url = `ws://${this.baseUrl.replace('http://', '')}/api/websocket/fan/${serviceName}?token=${this.token}&tail=${tail}`;
if (project) url += `&project=${project}`;
const ws = new WebSocket(url);
ws.onopen = () => {
console.log(`Подключение к сервису ${serviceName} установлено`);
};
ws.onmessage = (event) => {
if (onMessage) onMessage(event.data);
};
ws.onerror = (error) => {
console.error(`Ошибка WebSocket для сервиса ${serviceName}:`, error);
if (onError) onError(error);
};
ws.onclose = (event) => {
console.log(`Соединение с сервисом ${serviceName} закрыто:`, event.code, event.reason);
if (onClose) onClose(event);
};
this.connections.set(serviceName, ws);
return ws;
}
// Подключение к группе сервисов
connectToServiceGroup(services, options = {}) {
const { tail = 100, project, onMessage, onError, onClose } = options;
let url = `ws://${this.baseUrl.replace('http://', '')}/api/websocket/fan_group?services=${services}&token=${this.token}&tail=${tail}`;
if (project) url += `&project=${project}`;
const ws = new WebSocket(url);
ws.onopen = () => {
console.log(`Подключение к группе сервисов ${services} установлено`);
};
ws.onmessage = (event) => {
if (onMessage) onMessage(event.data);
};
ws.onerror = (error) => {
console.error(`Ошибка WebSocket для группы сервисов ${services}:`, error);
if (onError) onError(error);
};
ws.onclose = (event) => {
console.log(`Соединение с группой сервисов ${services} закрыто:`, event.code, event.reason);
if (onClose) onClose(event);
};
this.connections.set(`group:${services}`, ws);
return ws;
}
// Закрытие соединения
disconnect(identifier) {
const ws = this.connections.get(identifier);
if (ws) {
ws.close();
this.connections.delete(identifier);
}
}
// Закрытие всех соединений
disconnectAll() {
for (const [identifier, ws] of this.connections) {
ws.close();
}
this.connections.clear();
}
}
// Использование
async function main() {
// Получение токена
const loginResponse = await fetch('http://localhost:9001/api/auth/login', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
username: 'admin',
password: 'your-password'
})
});
const {access_token} = await loginResponse.json();
// Создание WebSocket клиента
const wsClient = new LogBoardWebSocket('http://localhost:9001', access_token);
// Подключение к контейнеру
wsClient.connectToContainer('abc123def456', {
tail: 100,
onMessage: (data) => {
console.log('Логи контейнера:', data);
// Добавление в UI
appendToLogView(data);
},
onError: (error) => {
console.error('Ошибка:', error);
}
});
// Подключение к сервису
wsClient.connectToService('web', {
tail: 100,
onMessage: (data) => {
console.log('Логи сервиса:', data);
// Парсинг префикса для группировки
const match = data.match(/^\[([a-f0-9]+)\]\s+(.+)$/);
if (match) {
const [, containerId, logMessage] = match;
appendToServiceLogView(containerId, logMessage);
}
}
});
// Подключение к группе сервисов
wsClient.connectToServiceGroup('web,db,redis', {
tail: 100,
onMessage: (data) => {
console.log('Логи группы:', data);
// Парсинг префикса для группировки
const match = data.match(/^\[([a-f0-9]+)\s+(\w+)\]\s+(.+)$/);
if (match) {
const [, containerId, serviceName, logMessage] = match;
appendToGroupLogView(serviceName, containerId, logMessage);
}
}
});
}
// Функции для обновления UI
function appendToLogView(message) {
const logContainer = document.getElementById('log-container');
const logLine = document.createElement('div');
logLine.className = 'log-line';
logLine.textContent = message;
logContainer.appendChild(logLine);
logContainer.scrollTop = logContainer.scrollHeight;
}
function appendToServiceLogView(containerId, message) {
// Реализация для отображения логов сервиса
}
function appendToGroupLogView(serviceName, containerId, message) {
// Реализация для отображения логов группы сервисов
}
main().catch(console.error);
Python
import asyncio
import websockets
import json
import requests
class LogBoardWebSocket:
def __init__(self, base_url, token):
self.base_url = base_url.replace('http://', 'ws://')
self.token = token
self.connections = {}
async def connect_to_container(self, container_id, tail=100, callback=None):
"""Подключение к логам контейнера"""
uri = f"{self.base_url}/api/websocket/logs/{container_id}?token={self.token}&tail={tail}"
try:
async with websockets.connect(uri) as websocket:
print(f"Подключение к контейнеру {container_id} установлено")
async for message in websocket:
if callback:
await callback(container_id, message)
else:
print(f"[{container_id}] {message}")
except websockets.exceptions.ConnectionClosed:
print(f"Соединение с контейнером {container_id} закрыто")
except Exception as e:
print(f"Ошибка подключения к контейнеру {container_id}: {e}")
async def connect_to_service(self, service_name, tail=100, project=None, callback=None):
"""Подключение к логам сервиса"""
uri = f"{self.base_url}/api/websocket/fan/{service_name}?token={self.token}&tail={tail}"
if project:
uri += f"&project={project}"
try:
async with websockets.connect(uri) as websocket:
print(f"Подключение к сервису {service_name} установлено")
async for message in websocket:
if callback:
await callback(service_name, message)
else:
print(f"[{service_name}] {message}")
except websockets.exceptions.ConnectionClosed:
print(f"Соединение с сервисом {service_name} закрыто")
except Exception as e:
print(f"Ошибка подключения к сервису {service_name}: {e}")
async def connect_to_service_group(self, services, tail=100, project=None, callback=None):
"""Подключение к логам группы сервисов"""
uri = f"{self.base_url}/api/websocket/fan_group?services={services}&token={self.token}&tail={tail}"
if project:
uri += f"&project={project}"
try:
async with websockets.connect(uri) as websocket:
print(f"Подключение к группе сервисов {services} установлено")
async for message in websocket:
if callback:
await callback(services, message)
else:
print(f"[{services}] {message}")
except websockets.exceptions.ConnectionClosed:
print(f"Соединение с группой сервисов {services} закрыто")
except Exception as e:
print(f"Ошибка подключения к группе сервисов {services}: {e}")
# Функции обратного вызова
async def container_log_callback(container_id, message):
"""Обработка логов контейнера"""
print(f"[{container_id}] {message}")
async def service_log_callback(service_name, message):
"""Обработка логов сервиса"""
print(f"[{service_name}] {message}")
async def group_log_callback(services, message):
"""Обработка логов группы сервисов"""
print(f"[{services}] {message}")
async def main():
# Получение токена
response = requests.post('http://localhost:9001/api/auth/login', json={
'username': 'admin',
'password': 'your-password'
})
token = response.json()['access_token']
# Создание WebSocket клиента
ws_client = LogBoardWebSocket('http://localhost:9001', token)
# Создание задач для одновременного подключения
tasks = [
ws_client.connect_to_container('abc123def456', tail=100, callback=container_log_callback),
ws_client.connect_to_service('web', tail=100, callback=service_log_callback),
ws_client.connect_to_service_group('web,db,redis', tail=100, callback=group_log_callback)
]
# Запуск всех подключений одновременно
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
Node.js
const WebSocket = require('ws');
const axios = require('axios');
class LogBoardWebSocket {
constructor(baseUrl, token) {
this.baseUrl = baseUrl.replace('http://', 'ws://');
this.token = token;
this.connections = new Map();
}
// Подключение к логам контейнера
connectToContainer(containerId, options = {}) {
const { tail = 100, onMessage, onError, onClose } = options;
const url = `${this.baseUrl}/api/websocket/logs/${containerId}?token=${this.token}&tail=${tail}`;
const ws = new WebSocket(url);
ws.on('open', () => {
console.log(`Подключение к контейнеру ${containerId} установлено`);
});
ws.on('message', (data) => {
const message = data.toString();
if (onMessage) onMessage(message);
else console.log(`[${containerId}] ${message}`);
});
ws.on('error', (error) => {
console.error(`Ошибка WebSocket для контейнера ${containerId}:`, error);
if (onError) onError(error);
});
ws.on('close', (code, reason) => {
console.log(`Соединение с контейнером ${containerId} закрыто:`, code, reason);
if (onClose) onClose(code, reason);
});
this.connections.set(containerId, ws);
return ws;
}
// Подключение к логам сервиса
connectToService(serviceName, options = {}) {
const { tail = 100, project, onMessage, onError, onClose } = options;
let url = `${this.baseUrl}/api/websocket/fan/${serviceName}?token=${this.token}&tail=${tail}`;
if (project) url += `&project=${project}`;
const ws = new WebSocket(url);
ws.on('open', () => {
console.log(`Подключение к сервису ${serviceName} установлено`);
});
ws.on('message', (data) => {
const message = data.toString();
if (onMessage) onMessage(message);
else console.log(`[${serviceName}] ${message}`);
});
ws.on('error', (error) => {
console.error(`Ошибка WebSocket для сервиса ${serviceName}:`, error);
if (onError) onError(error);
});
ws.on('close', (code, reason) => {
console.log(`Соединение с сервисом ${serviceName} закрыто:`, code, reason);
if (onClose) onClose(code, reason);
});
this.connections.set(serviceName, ws);
return ws;
}
// Закрытие соединения
disconnect(identifier) {
const ws = this.connections.get(identifier);
if (ws) {
ws.close();
this.connections.delete(identifier);
}
}
// Закрытие всех соединений
disconnectAll() {
for (const [identifier, ws] of this.connections) {
ws.close();
}
this.connections.clear();
}
}
async function main() {
try {
// Получение токена
const loginResponse = await axios.post('http://localhost:9001/api/auth/login', {
username: 'admin',
password: 'your-password'
});
const token = loginResponse.data.access_token;
// Создание WebSocket клиента
const wsClient = new LogBoardWebSocket('http://localhost:9001', token);
// Подключение к контейнеру
wsClient.connectToContainer('abc123def456', {
tail: 100,
onMessage: (message) => {
console.log('Логи контейнера:', message);
}
});
// Подключение к сервису
wsClient.connectToService('web', {
tail: 100,
onMessage: (message) => {
console.log('Логи сервиса:', message);
}
});
// Обработка сигналов завершения
process.on('SIGINT', () => {
console.log('Закрытие соединений...');
wsClient.disconnectAll();
process.exit(0);
});
} catch (error) {
console.error('Ошибка:', error.message);
}
}
main();
Обработка ошибок
Коды ошибок WebSocket
Код | Описание |
---|---|
1000 | Нормальное закрытие |
1001 | Удаленная сторона закрыла соединение |
1002 | Ошибка протокола |
1003 | Неподдерживаемый тип данных |
1006 | Аномальное закрытие |
1011 | Внутренняя ошибка сервера |
Обработка ошибок аутентификации
ws.onmessage = function(event) {
if (event.data.startsWith('ERROR:')) {
const error = event.data.substring(6);
console.error('Ошибка аутентификации:', error);
if (error.includes('token required') || error.includes('invalid token')) {
// Переподключение с новым токеном
reconnectWithNewToken();
}
} else {
// Обработка обычных логов
console.log('Логи:', event.data);
}
};
Автоматическое переподключение
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
this.reconnectInterval = options.reconnectInterval || 1000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket соединение установлено');
this.reconnectAttempts = 0;
};
this.ws.onclose = (event) => {
console.log('WebSocket соединение закрыто:', event.code, event.reason);
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`Попытка переподключения ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
setTimeout(() => {
this.connect();
}, this.reconnectInterval * this.reconnectAttempts);
} else {
console.error('Превышено максимальное количество попыток переподключения');
}
};
this.ws.onerror = (error) => {
console.error('WebSocket ошибка:', error);
};
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
}
}
close() {
this.ws.close();
}
}
Лучшие практики
1. Управление соединениями
// Создание пула соединений
class WebSocketPool {
constructor() {
this.connections = new Map();
this.maxConnections = 10;
}
connect(identifier, url, options = {}) {
if (this.connections.size >= this.maxConnections) {
console.warn('Достигнут лимит соединений');
return null;
}
const ws = new WebSocket(url);
this.connections.set(identifier, ws);
ws.onclose = () => {
this.connections.delete(identifier);
};
return ws;
}
disconnect(identifier) {
const ws = this.connections.get(identifier);
if (ws) {
ws.close();
this.connections.delete(identifier);
}
}
}
2. Обработка больших объемов данных
// Буферизация сообщений
class LogBuffer {
constructor(maxSize = 1000) {
this.buffer = [];
this.maxSize = maxSize;
}
add(message) {
this.buffer.push(message);
if (this.buffer.length > this.maxSize) {
this.buffer.shift(); // Удаляем старые сообщения
}
}
getMessages() {
return [...this.buffer];
}
clear() {
this.buffer = [];
}
}
// Использование
const logBuffer = new LogBuffer(1000);
ws.onmessage = function(event) {
logBuffer.add(event.data);
// Обновление UI каждые 100ms
if (!updateScheduled) {
updateScheduled = setTimeout(() => {
updateUI(logBuffer.getMessages());
updateScheduled = null;
}, 100);
}
};
3. Фильтрация логов
// Фильтр по уровню логирования
class LogFilter {
constructor() {
this.filters = {
level: 'all', // debug, info, warn, error, all
service: null,
container: null,
text: null
};
}
setFilter(type, value) {
this.filters[type] = value;
}
shouldDisplay(message) {
// Фильтр по уровню
if (this.filters.level !== 'all') {
const levelMatch = message.toLowerCase().includes(`level=${this.filters.level}`);
if (!levelMatch) return false;
}
// Фильтр по сервису
if (this.filters.service) {
const serviceMatch = message.includes(`[${this.filters.service}]`);
if (!serviceMatch) return false;
}
// Фильтр по контейнеру
if (this.filters.container) {
const containerMatch = message.includes(`[${this.filters.container}]`);
if (!containerMatch) return false;
}
// Фильтр по тексту
if (this.filters.text) {
const textMatch = message.toLowerCase().includes(this.filters.text.toLowerCase());
if (!textMatch) return false;
}
return true;
}
}
// Использование
const logFilter = new LogFilter();
logFilter.setFilter('level', 'error');
ws.onmessage = function(event) {
if (logFilter.shouldDisplay(event.data)) {
console.log('Отфильтрованные логи:', event.data);
}
};
4. Мониторинг состояния соединений
class ConnectionMonitor {
constructor() {
this.stats = {
totalMessages: 0,
totalBytes: 0,
connectionTime: null,
lastMessageTime: null,
errors: 0
};
}
onConnect() {
this.stats.connectionTime = new Date();
this.stats.totalMessages = 0;
this.stats.totalBytes = 0;
this.stats.errors = 0;
}
onMessage(message) {
this.stats.totalMessages++;
this.stats.totalBytes += message.length;
this.stats.lastMessageTime = new Date();
}
onError() {
this.stats.errors++;
}
getStats() {
return {
...this.stats,
uptime: this.stats.connectionTime ?
Date.now() - this.stats.connectionTime.getTime() : 0,
messagesPerSecond: this.stats.totalMessages /
(this.stats.uptime / 1000) || 0
};
}
}
Ограничения и рекомендации
Ограничения
- Максимальное количество соединений: 100 одновременных WebSocket соединений
- Таймаут соединения: 60 секунд бездействия
- Размер сообщения: до 1 MB на сообщение
- Частота сообщений: до 1000 сообщений в секунду
Рекомендации
- Используйте переподключение при разрыве соединений
- Ограничивайте количество соединений для одного клиента
- Фильтруйте логи на стороне клиента для снижения нагрузки
- Мониторьте состояние соединений для диагностики проблем
- Используйте буферизацию для больших объемов данных