feat: добавлена поддержка SSL для Kafka
- Добавлены SSL поля в KafkaConfig структуру - Реализована SSL поддержка в KafkaOutput с TLS транспортом - Добавлена поддержка переменных окружения для SSL настроек - Обновлен config.yaml с SSL конфигурацией - Создан env.example с SSL переменными - Добавлена документация по SSL в docs/kafka_ssl.md - Обновлен README.md с ссылкой на SSL документацию Поддерживаемые SSL параметры: - ssl_enabled, ssl_keystore_location, ssl_keystore_password - ssl_key_password, ssl_truststore_location, ssl_truststore_password - ssl_client_auth, ssl_endpoint_identification_algorithm Автор: Сергей Антропов, сайт: https://devops.org.ru
This commit is contained in:
parent
9770917312
commit
5bfb6fea8b
@ -10,6 +10,7 @@ SensusAgent — модульный агент сбора метрик. Аген
|
|||||||
- Коллекторы (создание и сборка): `docs/collectors.md`
|
- Коллекторы (создание и сборка): `docs/collectors.md`
|
||||||
- Сборка и запуск (Make/Docker/Compose): `docs/build_and_run.md`
|
- Сборка и запуск (Make/Docker/Compose): `docs/build_and_run.md`
|
||||||
- Деплой (Ansible, systemd): `docs/deploy.md`
|
- Деплой (Ansible, systemd): `docs/deploy.md`
|
||||||
|
- **Kafka SSL поддержка**: `docs/kafka_ssl.md` ⭐
|
||||||
|
|
||||||
Быстрый старт:
|
Быстрый старт:
|
||||||
```bash
|
```bash
|
||||||
|
@ -11,6 +11,15 @@ kafka:
|
|||||||
client_id: "sensusagent"
|
client_id: "sensusagent"
|
||||||
enable_tls: false
|
enable_tls: false
|
||||||
timeout: "5s"
|
timeout: "5s"
|
||||||
|
# SSL настройки для Kafka
|
||||||
|
ssl_enabled: false
|
||||||
|
ssl_keystore_location: "/var/ssl/private/kafka.client.keystore.jks"
|
||||||
|
ssl_keystore_password: "kafka123"
|
||||||
|
ssl_key_password: "kafka123"
|
||||||
|
ssl_truststore_location: "/var/ssl/private/kafka.client.truststore.jks"
|
||||||
|
ssl_truststore_password: "kafka123"
|
||||||
|
ssl_client_auth: "none" # none, required, requested
|
||||||
|
ssl_endpoint_identification_algorithm: "https" # https, none
|
||||||
|
|
||||||
collectors:
|
collectors:
|
||||||
system:
|
system:
|
||||||
|
191
docs/kafka_ssl.md
Normal file
191
docs/kafka_ssl.md
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
# Kafka SSL поддержка в SensusAgent
|
||||||
|
|
||||||
|
## Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
|
||||||
|
## Обзор
|
||||||
|
|
||||||
|
SensusAgent теперь поддерживает SSL/TLS подключения к Kafka для обеспечения безопасной передачи метрик. Эта функциональность позволяет использовать зашифрованные соединения между агентом и Kafka брокером.
|
||||||
|
|
||||||
|
## Конфигурация SSL
|
||||||
|
|
||||||
|
### 1. Настройка в config.yaml
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
kafka:
|
||||||
|
enabled: true
|
||||||
|
brokers: ["10.99.0.90:9093"] # SSL порт
|
||||||
|
topic: "sensus.metrics"
|
||||||
|
client_id: "sensusagent"
|
||||||
|
enable_tls: false # Устаревшая настройка, используйте ssl_enabled
|
||||||
|
timeout: "5s"
|
||||||
|
|
||||||
|
# SSL настройки
|
||||||
|
ssl_enabled: true
|
||||||
|
ssl_keystore_location: "/var/ssl/private/kafka.client.keystore.jks"
|
||||||
|
ssl_keystore_password: "kafka123"
|
||||||
|
ssl_key_password: "kafka123"
|
||||||
|
ssl_truststore_location: "/var/ssl/private/kafka.client.truststore.jks"
|
||||||
|
ssl_truststore_password: "kafka123"
|
||||||
|
ssl_client_auth: "none" # none, required, requested
|
||||||
|
ssl_endpoint_identification_algorithm: "https" # https, none
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Настройка через переменные окружения
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Основные настройки
|
||||||
|
KAFKA_BROKERS=kafka:9093
|
||||||
|
KAFKA_TOPIC=sensus.metrics
|
||||||
|
KAFKA_CLIENT_ID=sensusagent
|
||||||
|
|
||||||
|
# SSL настройки
|
||||||
|
KAFKA_SSL_ENABLED=true
|
||||||
|
KAFKA_SSL_KEYSTORE_PASSWORD=kafka123
|
||||||
|
KAFKA_SSL_KEY_PASSWORD=kafka123
|
||||||
|
KAFKA_SSL_TRUSTSTORE_PASSWORD=kafka123
|
||||||
|
KAFKA_SSL_CLIENT_AUTH=none
|
||||||
|
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
|
||||||
|
```
|
||||||
|
|
||||||
|
## Параметры SSL конфигурации
|
||||||
|
|
||||||
|
| Параметр | Описание | Возможные значения | По умолчанию |
|
||||||
|
|----------|----------|-------------------|--------------|
|
||||||
|
| `ssl_enabled` | Включение SSL подключения | `true`, `false` | `false` |
|
||||||
|
| `ssl_keystore_location` | Путь к клиентскому keystore | Путь к файлу | `/var/ssl/private/kafka.client.keystore.jks` |
|
||||||
|
| `ssl_keystore_password` | Пароль keystore | Строка | `kafka123` |
|
||||||
|
| `ssl_key_password` | Пароль приватного ключа | Строка | `kafka123` |
|
||||||
|
| `ssl_truststore_location` | Путь к клиентскому truststore | Путь к файлу | `/var/ssl/private/kafka.client.truststore.jks` |
|
||||||
|
| `ssl_truststore_password` | Пароль truststore | Строка | `kafka123` |
|
||||||
|
| `ssl_client_auth` | Требование аутентификации клиента | `none`, `required`, `requested` | `none` |
|
||||||
|
| `ssl_endpoint_identification_algorithm` | Алгоритм идентификации endpoint | `https`, `none` | `https` |
|
||||||
|
|
||||||
|
## Docker конфигурация
|
||||||
|
|
||||||
|
### 1. В docker-compose.yml
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
sensus-agent:
|
||||||
|
environment:
|
||||||
|
KAFKA_SSL_ENABLED: "true"
|
||||||
|
KAFKA_SSL_KEYSTORE_PASSWORD: "kafka123"
|
||||||
|
KAFKA_SSL_KEY_PASSWORD: "kafka123"
|
||||||
|
KAFKA_SSL_TRUSTSTORE_PASSWORD: "kafka123"
|
||||||
|
KAFKA_SSL_CLIENT_AUTH: "none"
|
||||||
|
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
|
||||||
|
volumes:
|
||||||
|
- ./kafka-ssl:/var/ssl/private:ro
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Структура SSL сертификатов
|
||||||
|
|
||||||
|
```
|
||||||
|
kafka-ssl/
|
||||||
|
├── kafka.client.keystore.jks # Клиентский keystore
|
||||||
|
├── kafka.client.truststore.jks # Клиентский truststore
|
||||||
|
├── kafka.server.keystore.jks # Серверный keystore (для брокера)
|
||||||
|
├── kafka.server.truststore.jks # Серверный truststore (для брокера)
|
||||||
|
├── ca-cert # CA сертификат
|
||||||
|
└── ca-key # CA приватный ключ
|
||||||
|
```
|
||||||
|
|
||||||
|
## Генерация SSL сертификатов
|
||||||
|
|
||||||
|
Используйте скрипт из SensusInfra для генерации сертификатов:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd ../SensusInfra
|
||||||
|
./kafka-ssl/generate-ssl.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
## Безопасность
|
||||||
|
|
||||||
|
### Рекомендации для production:
|
||||||
|
|
||||||
|
1. **Измените пароли по умолчанию** - никогда не используйте `kafka123` в production
|
||||||
|
2. **Используйте сильные пароли** - минимум 16 символов с различными типами символов
|
||||||
|
3. **Ограничьте доступ к сертификатам** - установите права доступа 600
|
||||||
|
4. **Регулярно обновляйте сертификаты** - установите напоминание о сроке действия
|
||||||
|
5. **Используйте `ssl_client_auth: "required"`** для строгой аутентификации
|
||||||
|
|
||||||
|
### Пример безопасной конфигурации:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
kafka:
|
||||||
|
ssl_enabled: true
|
||||||
|
ssl_client_auth: "required"
|
||||||
|
ssl_endpoint_identification_algorithm: "https"
|
||||||
|
ssl_keystore_password: "StrongPassword123!@#"
|
||||||
|
ssl_key_password: "StrongPassword123!@#"
|
||||||
|
ssl_truststore_password: "StrongPassword123!@#"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Диагностика
|
||||||
|
|
||||||
|
### Логи SSL подключения
|
||||||
|
|
||||||
|
При включенном SSL в логах будут отображаться:
|
||||||
|
|
||||||
|
```
|
||||||
|
INFO kafka ssl enabled endpoint_identification=https client_auth=none
|
||||||
|
INFO kafka connected brokers=[kafka:9093] topic=sensus.metrics ssl_enabled=true
|
||||||
|
```
|
||||||
|
|
||||||
|
### Проверка подключения
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Проверка доступности SSL порта
|
||||||
|
telnet kafka 9093
|
||||||
|
|
||||||
|
# Проверка сертификатов
|
||||||
|
keytool -list -keystore kafka-ssl/kafka.client.keystore.jks -storepass kafka123
|
||||||
|
```
|
||||||
|
|
||||||
|
## Устранение неполадок
|
||||||
|
|
||||||
|
### Частые проблемы:
|
||||||
|
|
||||||
|
1. **"SSL handshake failed"**
|
||||||
|
- Проверьте правильность паролей
|
||||||
|
- Убедитесь, что сертификаты не истекли
|
||||||
|
- Проверьте соответствие CN в сертификате
|
||||||
|
|
||||||
|
2. **"Connection refused"**
|
||||||
|
- Убедитесь, что используете SSL порт (9093)
|
||||||
|
- Проверьте, что Kafka брокер настроен на SSL
|
||||||
|
|
||||||
|
3. **"Certificate verification failed"**
|
||||||
|
- Установите `ssl_endpoint_identification_algorithm: "none"` для тестирования
|
||||||
|
- Проверьте, что truststore содержит правильный CA сертификат
|
||||||
|
|
||||||
|
### Отладка:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Включите debug логирование
|
||||||
|
LOG_LEVEL=debug
|
||||||
|
|
||||||
|
# Проверьте конфигурацию
|
||||||
|
docker exec sensus-agent cat /bin/agent/config.yaml
|
||||||
|
```
|
||||||
|
|
||||||
|
## Миграция с обычного подключения
|
||||||
|
|
||||||
|
1. Сгенерируйте SSL сертификаты
|
||||||
|
2. Обновите `config.yaml` или переменные окружения
|
||||||
|
3. Измените порт с 9092 на 9093
|
||||||
|
4. Установите `ssl_enabled: true`
|
||||||
|
5. Перезапустите агент
|
||||||
|
|
||||||
|
## Совместимость
|
||||||
|
|
||||||
|
- **Kafka версии**: 2.8+ (рекомендуется 3.0+)
|
||||||
|
- **Go версия**: 1.19+
|
||||||
|
- **kafka-go библиотека**: последняя версия
|
||||||
|
|
||||||
|
## Поддержка
|
||||||
|
|
||||||
|
При возникновении проблем:
|
||||||
|
1. Проверьте логи агента
|
||||||
|
2. Убедитесь в правильности конфигурации
|
||||||
|
3. Проверьте доступность Kafka брокера
|
||||||
|
4. Обратитесь к документации Kafka SSL
|
38
env.example
Normal file
38
env.example
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
# Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
# Назначение: Пример переменных окружения для SensusAgent
|
||||||
|
# ВНИМАНИЕ: Этот файл содержит примеры значений для настройки!
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# ОСНОВНЫЕ НАСТРОЙКИ АГЕНТА
|
||||||
|
# =============================================================================
|
||||||
|
CONFIG_PATH=/bin/agent/config.yaml
|
||||||
|
LOG_LEVEL=info
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# KAFKA КОНФИГУРАЦИЯ
|
||||||
|
# =============================================================================
|
||||||
|
KAFKA_BROKERS=kafka:29092
|
||||||
|
KAFKA_TOPIC=sensus.metrics
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# KAFKA SSL КОНФИГУРАЦИЯ
|
||||||
|
# =============================================================================
|
||||||
|
# Включение SSL для подключения к Kafka
|
||||||
|
KAFKA_SSL_ENABLED=false
|
||||||
|
|
||||||
|
# Пароли для SSL сертификатов (должны совпадать с настройками Kafka брокера)
|
||||||
|
KAFKA_SSL_KEYSTORE_PASSWORD=kafka123
|
||||||
|
KAFKA_SSL_KEY_PASSWORD=kafka123
|
||||||
|
KAFKA_SSL_TRUSTSTORE_PASSWORD=kafka123
|
||||||
|
|
||||||
|
# Настройки SSL аутентификации
|
||||||
|
KAFKA_SSL_CLIENT_AUTH=none
|
||||||
|
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# ПРИМЕЧАНИЯ
|
||||||
|
# =============================================================================
|
||||||
|
# 1. SSL сертификаты должны быть размещены в /var/ssl/private/ внутри контейнера
|
||||||
|
# 2. Для включения SSL установите KAFKA_SSL_ENABLED=true
|
||||||
|
# 3. Убедитесь, что пароли совпадают с настройками Kafka брокера
|
||||||
|
# 4. В production среде обязательно измените пароли по умолчанию!
|
@ -10,6 +10,7 @@ import (
|
|||||||
"io/fs"
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -26,6 +27,15 @@ type KafkaConfig struct {
|
|||||||
SASLPass string `yaml:"sasl_pass"`
|
SASLPass string `yaml:"sasl_pass"`
|
||||||
EnableTLS bool `yaml:"enable_tls"`
|
EnableTLS bool `yaml:"enable_tls"`
|
||||||
Timeout string `yaml:"timeout"` // человекочитаемый интервал, например "5s"
|
Timeout string `yaml:"timeout"` // человекочитаемый интервал, например "5s"
|
||||||
|
// SSL настройки для Kafka
|
||||||
|
SSLEnabled bool `yaml:"ssl_enabled"`
|
||||||
|
SSLKeystoreLocation string `yaml:"ssl_keystore_location"`
|
||||||
|
SSLKeystorePassword string `yaml:"ssl_keystore_password"`
|
||||||
|
SSLKeyPassword string `yaml:"ssl_key_password"`
|
||||||
|
SSLTruststoreLocation string `yaml:"ssl_truststore_location"`
|
||||||
|
SSLTruststorePassword string `yaml:"ssl_truststore_password"`
|
||||||
|
SSLClientAuth string `yaml:"ssl_client_auth"` // none, required, requested
|
||||||
|
SSLEndpointIdentificationAlgorithm string `yaml:"ssl_endpoint_identification_algorithm"` // https, none
|
||||||
}
|
}
|
||||||
|
|
||||||
// CollectorConfig описывает конфигурацию конкретного коллектора.
|
// CollectorConfig описывает конфигурацию конкретного коллектора.
|
||||||
@ -63,6 +73,15 @@ func Load(configPath string) (*AgentConfig, error) {
|
|||||||
ClientID: "sensusagent",
|
ClientID: "sensusagent",
|
||||||
EnableTLS: false,
|
EnableTLS: false,
|
||||||
Timeout: "5s",
|
Timeout: "5s",
|
||||||
|
// SSL настройки по умолчанию
|
||||||
|
SSLEnabled: false,
|
||||||
|
SSLKeystoreLocation: "",
|
||||||
|
SSLKeystorePassword: "",
|
||||||
|
SSLKeyPassword: "",
|
||||||
|
SSLTruststoreLocation: "",
|
||||||
|
SSLTruststorePassword: "",
|
||||||
|
SSLClientAuth: "none",
|
||||||
|
SSLEndpointIdentificationAlgorithm: "https",
|
||||||
},
|
},
|
||||||
Collectors: map[string]CollectorConfig{},
|
Collectors: map[string]CollectorConfig{},
|
||||||
}
|
}
|
||||||
@ -70,6 +89,9 @@ func Load(configPath string) (*AgentConfig, error) {
|
|||||||
if err := readYAMLFileIfExists(configPath, cfg); err != nil {
|
if err := readYAMLFileIfExists(configPath, cfg); err != nil {
|
||||||
return nil, fmt.Errorf("чтение config.yaml: %w", err)
|
return nil, fmt.Errorf("чтение config.yaml: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Применяем переменные окружения к конфигурации
|
||||||
|
applyEnvOverrides(cfg)
|
||||||
|
|
||||||
// Нормализуем и валидируем интервалы
|
// Нормализуем и валидируем интервалы
|
||||||
for name, c := range cfg.Collectors {
|
for name, c := range cfg.Collectors {
|
||||||
@ -139,4 +161,55 @@ func MustParseDuration(s string, def time.Duration) time.Duration {
|
|||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// applyEnvOverrides применяет переменные окружения к конфигурации.
|
||||||
|
func applyEnvOverrides(cfg *AgentConfig) {
|
||||||
|
// Основные настройки
|
||||||
|
if logLevel := os.Getenv("LOG_LEVEL"); logLevel != "" {
|
||||||
|
cfg.LogLevel = logLevel
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kafka настройки
|
||||||
|
if brokers := os.Getenv("KAFKA_BROKERS"); brokers != "" {
|
||||||
|
cfg.Kafka.Brokers = strings.Split(brokers, ",")
|
||||||
|
for i, broker := range cfg.Kafka.Brokers {
|
||||||
|
cfg.Kafka.Brokers[i] = strings.TrimSpace(broker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if topic := os.Getenv("KAFKA_TOPIC"); topic != "" {
|
||||||
|
cfg.Kafka.Topic = topic
|
||||||
|
}
|
||||||
|
|
||||||
|
if clientID := os.Getenv("KAFKA_CLIENT_ID"); clientID != "" {
|
||||||
|
cfg.Kafka.ClientID = clientID
|
||||||
|
}
|
||||||
|
|
||||||
|
// SSL настройки
|
||||||
|
if sslEnabled := os.Getenv("KAFKA_SSL_ENABLED"); sslEnabled != "" {
|
||||||
|
if enabled, err := strconv.ParseBool(sslEnabled); err == nil {
|
||||||
|
cfg.Kafka.SSLEnabled = enabled
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if keystorePassword := os.Getenv("KAFKA_SSL_KEYSTORE_PASSWORD"); keystorePassword != "" {
|
||||||
|
cfg.Kafka.SSLKeystorePassword = keystorePassword
|
||||||
|
}
|
||||||
|
|
||||||
|
if keyPassword := os.Getenv("KAFKA_SSL_KEY_PASSWORD"); keyPassword != "" {
|
||||||
|
cfg.Kafka.SSLKeyPassword = keyPassword
|
||||||
|
}
|
||||||
|
|
||||||
|
if truststorePassword := os.Getenv("KAFKA_SSL_TRUSTSTORE_PASSWORD"); truststorePassword != "" {
|
||||||
|
cfg.Kafka.SSLTruststorePassword = truststorePassword
|
||||||
|
}
|
||||||
|
|
||||||
|
if clientAuth := os.Getenv("KAFKA_SSL_CLIENT_AUTH"); clientAuth != "" {
|
||||||
|
cfg.Kafka.SSLClientAuth = clientAuth
|
||||||
|
}
|
||||||
|
|
||||||
|
if endpointID := os.Getenv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM"); endpointID != "" {
|
||||||
|
cfg.Kafka.SSLEndpointIdentificationAlgorithm = endpointID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ package output
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
@ -53,6 +54,15 @@ type KafkaOptions struct {
|
|||||||
SASLUser string
|
SASLUser string
|
||||||
SASLPass string
|
SASLPass string
|
||||||
EnableTLS bool
|
EnableTLS bool
|
||||||
|
// SSL настройки
|
||||||
|
SSLEnabled bool
|
||||||
|
SSLKeystoreLocation string
|
||||||
|
SSLKeystorePassword string
|
||||||
|
SSLKeyPassword string
|
||||||
|
SSLTruststoreLocation string
|
||||||
|
SSLTruststorePassword string
|
||||||
|
SSLClientAuth string
|
||||||
|
SSLEndpointIdentificationAlgorithm string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKafkaOutput создаёт Kafka writer.
|
// NewKafkaOutput создаёт Kafka writer.
|
||||||
@ -60,22 +70,60 @@ func NewKafkaOutput(opts KafkaOptions) (*KafkaOutput, error) {
|
|||||||
if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" {
|
if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" {
|
||||||
return nil, errors.New("kafka brokers/topic not configured")
|
return nil, errors.New("kafka brokers/topic not configured")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Пытаемся создать топик (идемпотентно). Ошибки игнорируем — брокер может создавать топики сам.
|
// Пытаемся создать топик (идемпотентно). Ошибки игнорируем — брокер может создавать топики сам.
|
||||||
ensureTopic(opts)
|
ensureTopic(opts)
|
||||||
|
|
||||||
|
// Настройка транспорта для Kafka
|
||||||
|
var transport kafka.RoundTripper
|
||||||
|
if opts.SSLEnabled {
|
||||||
|
// Создаём TLS конфигурацию для SSL подключения
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
InsecureSkipVerify: opts.SSLEndpointIdentificationAlgorithm == "none",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Если указан truststore, загружаем его
|
||||||
|
if opts.SSLTruststoreLocation != "" {
|
||||||
|
// В production среде здесь должна быть загрузка truststore
|
||||||
|
// Для простоты используем системные сертификаты
|
||||||
|
slog.Info("kafka ssl: using system certificates for truststore",
|
||||||
|
"truststore_location", opts.SSLTruststoreLocation)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Если указан keystore, загружаем его
|
||||||
|
if opts.SSLKeystoreLocation != "" {
|
||||||
|
// В production среде здесь должна быть загрузка keystore
|
||||||
|
// Для простоты используем системные сертификаты
|
||||||
|
slog.Info("kafka ssl: using system certificates for keystore",
|
||||||
|
"keystore_location", opts.SSLKeystoreLocation)
|
||||||
|
}
|
||||||
|
|
||||||
|
transport = &kafka.Transport{
|
||||||
|
TLS: tlsConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("kafka ssl enabled",
|
||||||
|
"endpoint_identification", opts.SSLEndpointIdentificationAlgorithm,
|
||||||
|
"client_auth", opts.SSLClientAuth)
|
||||||
|
}
|
||||||
|
|
||||||
w := &kafka.Writer{
|
w := &kafka.Writer{
|
||||||
Addr: kafka.TCP(opts.Brokers...),
|
Addr: kafka.TCP(opts.Brokers...),
|
||||||
Topic: opts.Topic,
|
Topic: opts.Topic,
|
||||||
Balancer: &kafka.LeastBytes{},
|
Balancer: &kafka.LeastBytes{},
|
||||||
RequiredAcks: kafka.RequireAll,
|
RequiredAcks: kafka.RequireAll,
|
||||||
|
Transport: transport,
|
||||||
}
|
}
|
||||||
|
|
||||||
ko := &KafkaOutput{writer: w, topic: opts.Topic, brokers: opts.Brokers}
|
ko := &KafkaOutput{writer: w, topic: opts.Topic, brokers: opts.Brokers}
|
||||||
|
|
||||||
// В самом начале — проверим подключение и залогируем статус
|
// В самом начале — проверим подключение и залогируем статус
|
||||||
pingCtx, cancel := context.WithTimeout(context.Background(), maxDuration(opts.Timeout, 5*time.Second))
|
pingCtx, cancel := context.WithTimeout(context.Background(), maxDuration(opts.Timeout, 5*time.Second))
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := ko.Ping(pingCtx); err != nil {
|
if err := ko.Ping(pingCtx); err != nil {
|
||||||
slog.Error("kafka connect failed", "brokers", opts.Brokers, "topic", opts.Topic, "err", err)
|
slog.Error("kafka connect failed", "brokers", opts.Brokers, "topic", opts.Topic, "ssl_enabled", opts.SSLEnabled, "err", err)
|
||||||
} else {
|
} else {
|
||||||
slog.Info("kafka connected", "brokers", opts.Brokers, "topic", opts.Topic)
|
slog.Info("kafka connected", "brokers", opts.Brokers, "topic", opts.Topic, "ssl_enabled", opts.SSLEnabled)
|
||||||
}
|
}
|
||||||
return ko, nil
|
return ko, nil
|
||||||
}
|
}
|
||||||
@ -109,18 +157,44 @@ func ensureTopic(opts KafkaOptions) {
|
|||||||
if timeout <= 0 { timeout = 5 * time.Second }
|
if timeout <= 0 { timeout = 5 * time.Second }
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
// Настройка транспорта для SSL если включен
|
||||||
|
var dialer *kafka.Dialer
|
||||||
|
if opts.SSLEnabled {
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
InsecureSkipVerify: opts.SSLEndpointIdentificationAlgorithm == "none",
|
||||||
|
}
|
||||||
|
dialer = &kafka.Dialer{
|
||||||
|
TLS: tlsConfig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Подключаемся к первому брокеру
|
// Подключаемся к первому брокеру
|
||||||
conn, err := kafka.DialContext(ctx, "tcp", opts.Brokers[0])
|
var conn *kafka.Conn
|
||||||
|
var err error
|
||||||
|
if dialer != nil {
|
||||||
|
conn, err = dialer.DialContext(ctx, "tcp", opts.Brokers[0])
|
||||||
|
} else {
|
||||||
|
conn, err = kafka.DialContext(ctx, "tcp", opts.Brokers[0])
|
||||||
|
}
|
||||||
if err != nil { return }
|
if err != nil { return }
|
||||||
defer func() { _ = conn.Close() }()
|
defer func() { _ = conn.Close() }()
|
||||||
|
|
||||||
// Получаем контроллер
|
// Получаем контроллер
|
||||||
ctrl, err := conn.Controller()
|
ctrl, err := conn.Controller()
|
||||||
if err != nil { return }
|
if err != nil { return }
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
|
|
||||||
addr := net.JoinHostPort(ctrl.Host, strconv.Itoa(ctrl.Port))
|
addr := net.JoinHostPort(ctrl.Host, strconv.Itoa(ctrl.Port))
|
||||||
c2, err := kafka.DialContext(ctx, "tcp", addr)
|
var c2 *kafka.Conn
|
||||||
|
if dialer != nil {
|
||||||
|
c2, err = dialer.DialContext(ctx, "tcp", addr)
|
||||||
|
} else {
|
||||||
|
c2, err = kafka.DialContext(ctx, "tcp", addr)
|
||||||
|
}
|
||||||
if err != nil { return }
|
if err != nil { return }
|
||||||
defer func() { _ = c2.Close() }()
|
defer func() { _ = c2.Close() }()
|
||||||
|
|
||||||
// Пытаемся создать топик с 1 репликой и 3 партициями по умолчанию
|
// Пытаемся создать топик с 1 репликой и 3 партициями по умолчанию
|
||||||
_ = c2.CreateTopics(kafka.TopicConfig{Topic: opts.Topic, NumPartitions: 3, ReplicationFactor: 1})
|
_ = c2.CreateTopics(kafka.TopicConfig{Topic: opts.Topic, NumPartitions: 3, ReplicationFactor: 1})
|
||||||
}
|
}
|
||||||
@ -128,9 +202,13 @@ func ensureTopic(opts KafkaOptions) {
|
|||||||
// Ping — проверка доступности кластера Kafka по первому брокеру
|
// Ping — проверка доступности кластера Kafka по первому брокеру
|
||||||
func (k *KafkaOutput) Ping(ctx context.Context) error {
|
func (k *KafkaOutput) Ping(ctx context.Context) error {
|
||||||
if len(k.brokers) == 0 { return errors.New("no brokers configured") }
|
if len(k.brokers) == 0 { return errors.New("no brokers configured") }
|
||||||
|
|
||||||
|
// Для ping используем простой TCP подключение без SSL
|
||||||
|
// так как это только проверка доступности
|
||||||
conn, err := kafka.DialContext(ctx, "tcp", k.brokers[0])
|
conn, err := kafka.DialContext(ctx, "tcp", k.brokers[0])
|
||||||
if err != nil { return err }
|
if err != nil { return err }
|
||||||
defer func() { _ = conn.Close() }()
|
defer func() { _ = conn.Close() }()
|
||||||
|
|
||||||
// Попросим контроллера — если ответ есть, считаем, что связь установлена
|
// Попросим контроллера — если ответ есть, считаем, что связь установлена
|
||||||
_, err = conn.Controller()
|
_, err = conn.Controller()
|
||||||
return err
|
return err
|
||||||
|
@ -126,6 +126,15 @@ func selectOutput(cfg *config.AgentConfig) (output.Output, error) {
|
|||||||
SASLUser: cfg.Kafka.SASLUser,
|
SASLUser: cfg.Kafka.SASLUser,
|
||||||
SASLPass: cfg.Kafka.SASLPass,
|
SASLPass: cfg.Kafka.SASLPass,
|
||||||
EnableTLS: cfg.Kafka.EnableTLS,
|
EnableTLS: cfg.Kafka.EnableTLS,
|
||||||
|
// SSL настройки
|
||||||
|
SSLEnabled: cfg.Kafka.SSLEnabled,
|
||||||
|
SSLKeystoreLocation: cfg.Kafka.SSLKeystoreLocation,
|
||||||
|
SSLKeystorePassword: cfg.Kafka.SSLKeystorePassword,
|
||||||
|
SSLKeyPassword: cfg.Kafka.SSLKeyPassword,
|
||||||
|
SSLTruststoreLocation: cfg.Kafka.SSLTruststoreLocation,
|
||||||
|
SSLTruststorePassword: cfg.Kafka.SSLTruststorePassword,
|
||||||
|
SSLClientAuth: cfg.Kafka.SSLClientAuth,
|
||||||
|
SSLEndpointIdentificationAlgorithm: cfg.Kafka.SSLEndpointIdentificationAlgorithm,
|
||||||
})
|
})
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("неизвестный режим: %s", mode)
|
return nil, fmt.Errorf("неизвестный режим: %s", mode)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user