logboard/docs/websocket.md

31 KiB
Raw Permalink Blame History

WebSocket API LogBoard+

Автор: Сергей Антропов
Сайт: https://devops.org.ru

Содержание

  1. Обзор WebSocket API
  2. Аутентификация
  3. Endpoints
  4. Примеры использования
  5. Обработка ошибок
  6. Лучшие практики

Обзор WebSocket API

WebSocket API LogBoard+ предоставляет возможность получения логов Docker контейнеров в реальном времени. Это позволяет создавать интерактивные приложения для мониторинга и отладки микросервисов.

Преимущества WebSocket

  • Real-time данные - мгновенное получение новых логов
  • Эффективность - меньше накладных расходов по сравнению с polling
  • Масштабируемость - поддержка множественных соединений
  • Фильтрация - получение логов только нужных контейнеров

Поддерживаемые протоколы

  • ws:// - для HTTP соединений
  • wss:// - для HTTPS соединений (рекомендуется для продакшена)

Аутентификация

Все WebSocket endpoints требуют JWT токен для аутентификации. Токен передается как параметр запроса token.

Получение токена

# Получение JWT токена через REST API
curl -X POST "http://localhost:9001/api/auth/login" \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"your-password"}'

Использование токена

const token = "your-jwt-token";
const ws = new WebSocket(`ws://localhost:9001/api/websocket/logs/container-id?token=${token}`);

Endpoints

ws://host:port/api/websocket/logs/{container_id}

Получение логов отдельного контейнера в реальном времени.

Параметры

Параметр Тип Обязательный Описание
container_id string Да ID контейнера (первые 12 символов)
token string Да JWT токен аутентификации
tail number Нет Количество начальных строк (по умолчанию 500)
service string Нет Имя сервиса (для информации)
project string Нет Имя проекта (для информации)

Пример подключения

const containerId = "abc123def456";
const token = "your-jwt-token";

const ws = new WebSocket(`ws://localhost:9001/api/websocket/logs/${containerId}?token=${token}&tail=100`);

ws.onopen = function() {
    console.log('WebSocket соединение установлено');
};

ws.onmessage = function(event) {
    console.log('Получены логи:', event.data);
};

ws.onerror = function(error) {
    console.error('WebSocket ошибка:', error);
};

ws.onclose = function(event) {
    console.log('WebSocket соединение закрыто:', event.code, event.reason);
};

Формат сообщений

При подключении сервер отправляет приветственное сообщение:

Connected to container: myproject_web_1

Затем отправляются логи в формате:

2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker processes
2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker process 1234

ws://host:port/api/websocket/fan/{service_name}

Получение логов всех реплик сервиса Docker Compose (fan-in).

Параметры

Параметр Тип Обязательный Описание
service_name string Да Имя сервиса Docker Compose
token string Да JWT токен аутентификации
tail number Нет Количество начальных строк (по умолчанию 500)
project string Нет Имя проекта (для фильтрации)

Пример подключения

const serviceName = "web";
const token = "your-jwt-token";

const ws = new WebSocket(`ws://localhost:9001/api/websocket/fan/${serviceName}?token=${token}&tail=100`);

ws.onmessage = function(event) {
    // Логи приходят с префиксом ID контейнера
    console.log('Логи сервиса:', event.data);
};

Формат сообщений

Логи приходят с префиксом короткого ID контейнера:

[abc123de] 2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker processes
[def456gh] 2024-01-15T10:30:16.123456789Z 2024/01/15 10:30:16 [notice] 1#1: start worker processes

ws://host:port/api/websocket/fan_group

Получение логов группы сервисов одновременно.

Параметры

Параметр Тип Обязательный Описание
services string Да Имена сервисов через запятую
token string Да JWT токен аутентификации
tail number Нет Количество начальных строк (по умолчанию 500)
project string Нет Имя проекта (для фильтрации)

Пример подключения

const services = "web,db,redis";
const token = "your-jwt-token";

const ws = new WebSocket(`ws://localhost:9001/api/websocket/fan_group?services=${services}&token=${token}&tail=100`);

ws.onmessage = function(event) {
    // Логи приходят с префиксом ID контейнера и имени сервиса
    console.log('Логи группы сервисов:', event.data);
};

Формат сообщений

Логи приходят с префиксом ID контейнера и имени сервиса:

[abc123de web] 2024-01-15T10:30:15.123456789Z 2024/01/15 10:30:15 [notice] 1#1: start worker processes
[def456gh db] 2024-01-15T10:30:16.123456789Z 2024/01/15 10:30:16 [info] database system is ready to accept connections
[ghi789jk redis] 2024-01-15T10:30:17.123456789Z 2024/01/15 10:30:17 [notice] Ready to accept connections

Примеры использования

JavaScript (Browser)

class LogBoardWebSocket {
    constructor(baseUrl, token) {
        this.baseUrl = baseUrl;
        this.token = token;
        this.connections = new Map();
    }

    // Подключение к логам контейнера
    connectToContainer(containerId, options = {}) {
        const { tail = 100, onMessage, onError, onClose } = options;
        
        const url = `ws://${this.baseUrl.replace('http://', '')}/api/websocket/logs/${containerId}?token=${this.token}&tail=${tail}`;
        const ws = new WebSocket(url);

        ws.onopen = () => {
            console.log(`Подключение к контейнеру ${containerId} установлено`);
        };

        ws.onmessage = (event) => {
            if (onMessage) onMessage(event.data);
        };

        ws.onerror = (error) => {
            console.error(`Ошибка WebSocket для контейнера ${containerId}:`, error);
            if (onError) onError(error);
        };

        ws.onclose = (event) => {
            console.log(`Соединение с контейнером ${containerId} закрыто:`, event.code, event.reason);
            if (onClose) onClose(event);
        };

        this.connections.set(containerId, ws);
        return ws;
    }

    // Подключение к логам сервиса
    connectToService(serviceName, options = {}) {
        const { tail = 100, project, onMessage, onError, onClose } = options;
        
        let url = `ws://${this.baseUrl.replace('http://', '')}/api/websocket/fan/${serviceName}?token=${this.token}&tail=${tail}`;
        if (project) url += `&project=${project}`;
        
        const ws = new WebSocket(url);

        ws.onopen = () => {
            console.log(`Подключение к сервису ${serviceName} установлено`);
        };

        ws.onmessage = (event) => {
            if (onMessage) onMessage(event.data);
        };

        ws.onerror = (error) => {
            console.error(`Ошибка WebSocket для сервиса ${serviceName}:`, error);
            if (onError) onError(error);
        };

        ws.onclose = (event) => {
            console.log(`Соединение с сервисом ${serviceName} закрыто:`, event.code, event.reason);
            if (onClose) onClose(event);
        };

        this.connections.set(serviceName, ws);
        return ws;
    }

    // Подключение к группе сервисов
    connectToServiceGroup(services, options = {}) {
        const { tail = 100, project, onMessage, onError, onClose } = options;
        
        let url = `ws://${this.baseUrl.replace('http://', '')}/api/websocket/fan_group?services=${services}&token=${this.token}&tail=${tail}`;
        if (project) url += `&project=${project}`;
        
        const ws = new WebSocket(url);

        ws.onopen = () => {
            console.log(`Подключение к группе сервисов ${services} установлено`);
        };

        ws.onmessage = (event) => {
            if (onMessage) onMessage(event.data);
        };

        ws.onerror = (error) => {
            console.error(`Ошибка WebSocket для группы сервисов ${services}:`, error);
            if (onError) onError(error);
        };

        ws.onclose = (event) => {
            console.log(`Соединение с группой сервисов ${services} закрыто:`, event.code, event.reason);
            if (onClose) onClose(event);
        };

        this.connections.set(`group:${services}`, ws);
        return ws;
    }

    // Закрытие соединения
    disconnect(identifier) {
        const ws = this.connections.get(identifier);
        if (ws) {
            ws.close();
            this.connections.delete(identifier);
        }
    }

    // Закрытие всех соединений
    disconnectAll() {
        for (const [identifier, ws] of this.connections) {
            ws.close();
        }
        this.connections.clear();
    }
}

// Использование
async function main() {
    // Получение токена
    const loginResponse = await fetch('http://localhost:9001/api/auth/login', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({
            username: 'admin',
            password: 'your-password'
        })
    });
    const {access_token} = await loginResponse.json();

    // Создание WebSocket клиента
    const wsClient = new LogBoardWebSocket('http://localhost:9001', access_token);

    // Подключение к контейнеру
    wsClient.connectToContainer('abc123def456', {
        tail: 100,
        onMessage: (data) => {
            console.log('Логи контейнера:', data);
            // Добавление в UI
            appendToLogView(data);
        },
        onError: (error) => {
            console.error('Ошибка:', error);
        }
    });

    // Подключение к сервису
    wsClient.connectToService('web', {
        tail: 100,
        onMessage: (data) => {
            console.log('Логи сервиса:', data);
            // Парсинг префикса для группировки
            const match = data.match(/^\[([a-f0-9]+)\]\s+(.+)$/);
            if (match) {
                const [, containerId, logMessage] = match;
                appendToServiceLogView(containerId, logMessage);
            }
        }
    });

    // Подключение к группе сервисов
    wsClient.connectToServiceGroup('web,db,redis', {
        tail: 100,
        onMessage: (data) => {
            console.log('Логи группы:', data);
            // Парсинг префикса для группировки
            const match = data.match(/^\[([a-f0-9]+)\s+(\w+)\]\s+(.+)$/);
            if (match) {
                const [, containerId, serviceName, logMessage] = match;
                appendToGroupLogView(serviceName, containerId, logMessage);
            }
        }
    });
}

// Функции для обновления UI
function appendToLogView(message) {
    const logContainer = document.getElementById('log-container');
    const logLine = document.createElement('div');
    logLine.className = 'log-line';
    logLine.textContent = message;
    logContainer.appendChild(logLine);
    logContainer.scrollTop = logContainer.scrollHeight;
}

function appendToServiceLogView(containerId, message) {
    // Реализация для отображения логов сервиса
}

function appendToGroupLogView(serviceName, containerId, message) {
    // Реализация для отображения логов группы сервисов
}

main().catch(console.error);

Python

import asyncio
import websockets
import json
import requests

class LogBoardWebSocket:
    def __init__(self, base_url, token):
        self.base_url = base_url.replace('http://', 'ws://')
        self.token = token
        self.connections = {}

    async def connect_to_container(self, container_id, tail=100, callback=None):
        """Подключение к логам контейнера"""
        uri = f"{self.base_url}/api/websocket/logs/{container_id}?token={self.token}&tail={tail}"
        
        try:
            async with websockets.connect(uri) as websocket:
                print(f"Подключение к контейнеру {container_id} установлено")
                
                async for message in websocket:
                    if callback:
                        await callback(container_id, message)
                    else:
                        print(f"[{container_id}] {message}")
                        
        except websockets.exceptions.ConnectionClosed:
            print(f"Соединение с контейнером {container_id} закрыто")
        except Exception as e:
            print(f"Ошибка подключения к контейнеру {container_id}: {e}")

    async def connect_to_service(self, service_name, tail=100, project=None, callback=None):
        """Подключение к логам сервиса"""
        uri = f"{self.base_url}/api/websocket/fan/{service_name}?token={self.token}&tail={tail}"
        if project:
            uri += f"&project={project}"
        
        try:
            async with websockets.connect(uri) as websocket:
                print(f"Подключение к сервису {service_name} установлено")
                
                async for message in websocket:
                    if callback:
                        await callback(service_name, message)
                    else:
                        print(f"[{service_name}] {message}")
                        
        except websockets.exceptions.ConnectionClosed:
            print(f"Соединение с сервисом {service_name} закрыто")
        except Exception as e:
            print(f"Ошибка подключения к сервису {service_name}: {e}")

    async def connect_to_service_group(self, services, tail=100, project=None, callback=None):
        """Подключение к логам группы сервисов"""
        uri = f"{self.base_url}/api/websocket/fan_group?services={services}&token={self.token}&tail={tail}"
        if project:
            uri += f"&project={project}"
        
        try:
            async with websockets.connect(uri) as websocket:
                print(f"Подключение к группе сервисов {services} установлено")
                
                async for message in websocket:
                    if callback:
                        await callback(services, message)
                    else:
                        print(f"[{services}] {message}")
                        
        except websockets.exceptions.ConnectionClosed:
            print(f"Соединение с группой сервисов {services} закрыто")
        except Exception as e:
            print(f"Ошибка подключения к группе сервисов {services}: {e}")

# Функции обратного вызова
async def container_log_callback(container_id, message):
    """Обработка логов контейнера"""
    print(f"[{container_id}] {message}")

async def service_log_callback(service_name, message):
    """Обработка логов сервиса"""
    print(f"[{service_name}] {message}")

async def group_log_callback(services, message):
    """Обработка логов группы сервисов"""
    print(f"[{services}] {message}")

async def main():
    # Получение токена
    response = requests.post('http://localhost:9001/api/auth/login', json={
        'username': 'admin',
        'password': 'your-password'
    })
    token = response.json()['access_token']

    # Создание WebSocket клиента
    ws_client = LogBoardWebSocket('http://localhost:9001', token)

    # Создание задач для одновременного подключения
    tasks = [
        ws_client.connect_to_container('abc123def456', tail=100, callback=container_log_callback),
        ws_client.connect_to_service('web', tail=100, callback=service_log_callback),
        ws_client.connect_to_service_group('web,db,redis', tail=100, callback=group_log_callback)
    ]

    # Запуск всех подключений одновременно
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

Node.js

const WebSocket = require('ws');
const axios = require('axios');

class LogBoardWebSocket {
    constructor(baseUrl, token) {
        this.baseUrl = baseUrl.replace('http://', 'ws://');
        this.token = token;
        this.connections = new Map();
    }

    // Подключение к логам контейнера
    connectToContainer(containerId, options = {}) {
        const { tail = 100, onMessage, onError, onClose } = options;
        
        const url = `${this.baseUrl}/api/websocket/logs/${containerId}?token=${this.token}&tail=${tail}`;
        const ws = new WebSocket(url);

        ws.on('open', () => {
            console.log(`Подключение к контейнеру ${containerId} установлено`);
        });

        ws.on('message', (data) => {
            const message = data.toString();
            if (onMessage) onMessage(message);
            else console.log(`[${containerId}] ${message}`);
        });

        ws.on('error', (error) => {
            console.error(`Ошибка WebSocket для контейнера ${containerId}:`, error);
            if (onError) onError(error);
        });

        ws.on('close', (code, reason) => {
            console.log(`Соединение с контейнером ${containerId} закрыто:`, code, reason);
            if (onClose) onClose(code, reason);
        });

        this.connections.set(containerId, ws);
        return ws;
    }

    // Подключение к логам сервиса
    connectToService(serviceName, options = {}) {
        const { tail = 100, project, onMessage, onError, onClose } = options;
        
        let url = `${this.baseUrl}/api/websocket/fan/${serviceName}?token=${this.token}&tail=${tail}`;
        if (project) url += `&project=${project}`;
        
        const ws = new WebSocket(url);

        ws.on('open', () => {
            console.log(`Подключение к сервису ${serviceName} установлено`);
        });

        ws.on('message', (data) => {
            const message = data.toString();
            if (onMessage) onMessage(message);
            else console.log(`[${serviceName}] ${message}`);
        });

        ws.on('error', (error) => {
            console.error(`Ошибка WebSocket для сервиса ${serviceName}:`, error);
            if (onError) onError(error);
        });

        ws.on('close', (code, reason) => {
            console.log(`Соединение с сервисом ${serviceName} закрыто:`, code, reason);
            if (onClose) onClose(code, reason);
        });

        this.connections.set(serviceName, ws);
        return ws;
    }

    // Закрытие соединения
    disconnect(identifier) {
        const ws = this.connections.get(identifier);
        if (ws) {
            ws.close();
            this.connections.delete(identifier);
        }
    }

    // Закрытие всех соединений
    disconnectAll() {
        for (const [identifier, ws] of this.connections) {
            ws.close();
        }
        this.connections.clear();
    }
}

async function main() {
    try {
        // Получение токена
        const loginResponse = await axios.post('http://localhost:9001/api/auth/login', {
            username: 'admin',
            password: 'your-password'
        });
        const token = loginResponse.data.access_token;

        // Создание WebSocket клиента
        const wsClient = new LogBoardWebSocket('http://localhost:9001', token);

        // Подключение к контейнеру
        wsClient.connectToContainer('abc123def456', {
            tail: 100,
            onMessage: (message) => {
                console.log('Логи контейнера:', message);
            }
        });

        // Подключение к сервису
        wsClient.connectToService('web', {
            tail: 100,
            onMessage: (message) => {
                console.log('Логи сервиса:', message);
            }
        });

        // Обработка сигналов завершения
        process.on('SIGINT', () => {
            console.log('Закрытие соединений...');
            wsClient.disconnectAll();
            process.exit(0);
        });

    } catch (error) {
        console.error('Ошибка:', error.message);
    }
}

main();

Обработка ошибок

Коды ошибок WebSocket

Код Описание
1000 Нормальное закрытие
1001 Удаленная сторона закрыла соединение
1002 Ошибка протокола
1003 Неподдерживаемый тип данных
1006 Аномальное закрытие
1011 Внутренняя ошибка сервера

Обработка ошибок аутентификации

ws.onmessage = function(event) {
    if (event.data.startsWith('ERROR:')) {
        const error = event.data.substring(6);
        console.error('Ошибка аутентификации:', error);
        
        if (error.includes('token required') || error.includes('invalid token')) {
            // Переподключение с новым токеном
            reconnectWithNewToken();
        }
    } else {
        // Обработка обычных логов
        console.log('Логи:', event.data);
    }
};

Автоматическое переподключение

class ReconnectingWebSocket {
    constructor(url, options = {}) {
        this.url = url;
        this.options = options;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
        this.reconnectInterval = options.reconnectInterval || 1000;
        this.connect();
    }

    connect() {
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
            console.log('WebSocket соединение установлено');
            this.reconnectAttempts = 0;
        };

        this.ws.onclose = (event) => {
            console.log('WebSocket соединение закрыто:', event.code, event.reason);
            
            if (this.reconnectAttempts < this.maxReconnectAttempts) {
                this.reconnectAttempts++;
                console.log(`Попытка переподключения ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
                
                setTimeout(() => {
                    this.connect();
                }, this.reconnectInterval * this.reconnectAttempts);
            } else {
                console.error('Превышено максимальное количество попыток переподключения');
            }
        };

        this.ws.onerror = (error) => {
            console.error('WebSocket ошибка:', error);
        };
    }

    send(data) {
        if (this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(data);
        }
    }

    close() {
        this.ws.close();
    }
}

Лучшие практики

1. Управление соединениями

// Создание пула соединений
class WebSocketPool {
    constructor() {
        this.connections = new Map();
        this.maxConnections = 10;
    }

    connect(identifier, url, options = {}) {
        if (this.connections.size >= this.maxConnections) {
            console.warn('Достигнут лимит соединений');
            return null;
        }

        const ws = new WebSocket(url);
        this.connections.set(identifier, ws);
        
        ws.onclose = () => {
            this.connections.delete(identifier);
        };

        return ws;
    }

    disconnect(identifier) {
        const ws = this.connections.get(identifier);
        if (ws) {
            ws.close();
            this.connections.delete(identifier);
        }
    }
}

2. Обработка больших объемов данных

// Буферизация сообщений
class LogBuffer {
    constructor(maxSize = 1000) {
        this.buffer = [];
        this.maxSize = maxSize;
    }

    add(message) {
        this.buffer.push(message);
        
        if (this.buffer.length > this.maxSize) {
            this.buffer.shift(); // Удаляем старые сообщения
        }
    }

    getMessages() {
        return [...this.buffer];
    }

    clear() {
        this.buffer = [];
    }
}

// Использование
const logBuffer = new LogBuffer(1000);

ws.onmessage = function(event) {
    logBuffer.add(event.data);
    
    // Обновление UI каждые 100ms
    if (!updateScheduled) {
        updateScheduled = setTimeout(() => {
            updateUI(logBuffer.getMessages());
            updateScheduled = null;
        }, 100);
    }
};

3. Фильтрация логов

// Фильтр по уровню логирования
class LogFilter {
    constructor() {
        this.filters = {
            level: 'all', // debug, info, warn, error, all
            service: null,
            container: null,
            text: null
        };
    }

    setFilter(type, value) {
        this.filters[type] = value;
    }

    shouldDisplay(message) {
        // Фильтр по уровню
        if (this.filters.level !== 'all') {
            const levelMatch = message.toLowerCase().includes(`level=${this.filters.level}`);
            if (!levelMatch) return false;
        }

        // Фильтр по сервису
        if (this.filters.service) {
            const serviceMatch = message.includes(`[${this.filters.service}]`);
            if (!serviceMatch) return false;
        }

        // Фильтр по контейнеру
        if (this.filters.container) {
            const containerMatch = message.includes(`[${this.filters.container}]`);
            if (!containerMatch) return false;
        }

        // Фильтр по тексту
        if (this.filters.text) {
            const textMatch = message.toLowerCase().includes(this.filters.text.toLowerCase());
            if (!textMatch) return false;
        }

        return true;
    }
}

// Использование
const logFilter = new LogFilter();
logFilter.setFilter('level', 'error');

ws.onmessage = function(event) {
    if (logFilter.shouldDisplay(event.data)) {
        console.log('Отфильтрованные логи:', event.data);
    }
};

4. Мониторинг состояния соединений

class ConnectionMonitor {
    constructor() {
        this.stats = {
            totalMessages: 0,
            totalBytes: 0,
            connectionTime: null,
            lastMessageTime: null,
            errors: 0
        };
    }

    onConnect() {
        this.stats.connectionTime = new Date();
        this.stats.totalMessages = 0;
        this.stats.totalBytes = 0;
        this.stats.errors = 0;
    }

    onMessage(message) {
        this.stats.totalMessages++;
        this.stats.totalBytes += message.length;
        this.stats.lastMessageTime = new Date();
    }

    onError() {
        this.stats.errors++;
    }

    getStats() {
        return {
            ...this.stats,
            uptime: this.stats.connectionTime ? 
                Date.now() - this.stats.connectionTime.getTime() : 0,
            messagesPerSecond: this.stats.totalMessages / 
                (this.stats.uptime / 1000) || 0
        };
    }
}

Ограничения и рекомендации

Ограничения

  • Максимальное количество соединений: 100 одновременных WebSocket соединений
  • Таймаут соединения: 60 секунд бездействия
  • Размер сообщения: до 1 MB на сообщение
  • Частота сообщений: до 1000 сообщений в секунду

Рекомендации

  1. Используйте переподключение при разрыве соединений
  2. Ограничивайте количество соединений для одного клиента
  3. Фильтруйте логи на стороне клиента для снижения нагрузки
  4. Мониторьте состояние соединений для диагностики проблем
  5. Используйте буферизацию для больших объемов данных