package output // Автор: Сергей Антропов, сайт: https://devops.org.ru // Назначение: Вывод результатов — stdout и Kafka. import ( "context" "crypto/tls" "encoding/json" "errors" "log/slog" "os" "strings" "time" "github.com/segmentio/kafka-go" "net" "strconv" ) // Payload — общий JSON для вывода. type Payload map[string]any // Output — интерфейс механизма доставки результатов. type Output interface { Write(ctx context.Context, data Payload) error Close(ctx context.Context) error } // StdoutOutput — вывод в stdout. type StdoutOutput struct{} func (s *StdoutOutput) Write(_ context.Context, data Payload) error { // Извлекаем имена коллекторов из payload для логирования collectorNames := make([]string, 0, len(data)) for key := range data { collectorNames = append(collectorNames, key) } // Логируем вывод в stdout с информацией о коллекторах slog.Info("stdout output", "collectors", collectorNames) enc := json.NewEncoder(os.Stdout) enc.SetEscapeHTML(false) return enc.Encode(data) } func (s *StdoutOutput) Close(_ context.Context) error { return nil } // KafkaOutput — вывод в Kafka. type KafkaOutput struct { writer *kafka.Writer topic string brokers []string } // KafkaOptions описывает параметры подключения к Kafka. type KafkaOptions struct { Brokers []string Topic string ClientID string Timeout time.Duration SASLUser string SASLPass string EnableTLS bool // SSL настройки SSLEnabled bool SSLKeystoreLocation string SSLKeystorePassword string SSLKeyPassword string SSLTruststoreLocation string SSLTruststorePassword string SSLClientAuth string SSLEndpointIdentificationAlgorithm string } // NewKafkaOutput создаёт Kafka writer. func NewKafkaOutput(opts KafkaOptions) (*KafkaOutput, error) { if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" { return nil, errors.New("kafka brokers/topic not configured") } // Пытаемся создать топик (идемпотентно). Ошибки игнорируем — брокер может создавать топики сам. ensureTopic(opts) // Настройка транспорта для Kafka var transport kafka.RoundTripper if opts.SSLEnabled { // Создаём TLS конфигурацию для SSL подключения tlsConfig := &tls.Config{ InsecureSkipVerify: opts.SSLEndpointIdentificationAlgorithm == "none", } // Если указан truststore, загружаем его if opts.SSLTruststoreLocation != "" { // В production среде здесь должна быть загрузка truststore // Для простоты используем системные сертификаты slog.Info("kafka ssl: using system certificates for truststore", "truststore_location", opts.SSLTruststoreLocation) } // Если указан keystore, загружаем его if opts.SSLKeystoreLocation != "" { // В production среде здесь должна быть загрузка keystore // Для простоты используем системные сертификаты slog.Info("kafka ssl: using system certificates for keystore", "keystore_location", opts.SSLKeystoreLocation) } transport = &kafka.Transport{ TLS: tlsConfig, } slog.Info("kafka ssl enabled", "endpoint_identification", opts.SSLEndpointIdentificationAlgorithm, "client_auth", opts.SSLClientAuth) } w := &kafka.Writer{ Addr: kafka.TCP(opts.Brokers...), Topic: opts.Topic, Balancer: &kafka.LeastBytes{}, RequiredAcks: kafka.RequireAll, Transport: transport, } ko := &KafkaOutput{writer: w, topic: opts.Topic, brokers: opts.Brokers} // В самом начале — проверим подключение и залогируем статус pingCtx, cancel := context.WithTimeout(context.Background(), maxDuration(opts.Timeout, 5*time.Second)) defer cancel() if err := ko.Ping(pingCtx); err != nil { slog.Error("kafka connect failed", "brokers", opts.Brokers, "topic", opts.Topic, "ssl_enabled", opts.SSLEnabled, "err", err) } else { slog.Info("kafka connected", "brokers", opts.Brokers, "topic", opts.Topic, "ssl_enabled", opts.SSLEnabled) } return ko, nil } func (k *KafkaOutput) Write(ctx context.Context, data Payload) error { b, err := json.Marshal(data) if err != nil { return err } msg := kafka.Message{Value: b, Time: time.Now()} if err := k.writer.WriteMessages(ctx, msg); err != nil { return err } // Извлекаем имена коллекторов из payload для логирования collectorNames := make([]string, 0, len(data)) for key := range data { collectorNames = append(collectorNames, key) } // Логируем успешную отправку с информацией о коллекторах slog.Info("kafka message sent", "topic", k.topic, "bytes", len(b), "collectors", collectorNames) return nil } func (k *KafkaOutput) Close(ctx context.Context) error { if k.writer != nil { return k.writer.Close() } return nil } // ensureTopic — пробует создать топик через контроллер кластера Kafka. // Безопасно к многократному вызову. В случае ошибок ничего не делает. func ensureTopic(opts KafkaOptions) { if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" { return } timeout := opts.Timeout if timeout <= 0 { timeout = 5 * time.Second } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() // Настройка транспорта для SSL если включен var dialer *kafka.Dialer if opts.SSLEnabled { tlsConfig := &tls.Config{ InsecureSkipVerify: opts.SSLEndpointIdentificationAlgorithm == "none", } dialer = &kafka.Dialer{ TLS: tlsConfig, } } // Подключаемся к первому брокеру var conn *kafka.Conn var err error if dialer != nil { conn, err = dialer.DialContext(ctx, "tcp", opts.Brokers[0]) } else { conn, err = kafka.DialContext(ctx, "tcp", opts.Brokers[0]) } if err != nil { return } defer func() { _ = conn.Close() }() // Получаем контроллер ctrl, err := conn.Controller() if err != nil { return } _ = conn.Close() addr := net.JoinHostPort(ctrl.Host, strconv.Itoa(ctrl.Port)) var c2 *kafka.Conn if dialer != nil { c2, err = dialer.DialContext(ctx, "tcp", addr) } else { c2, err = kafka.DialContext(ctx, "tcp", addr) } if err != nil { return } defer func() { _ = c2.Close() }() // Пытаемся создать топик с 1 репликой и 3 партициями по умолчанию _ = c2.CreateTopics(kafka.TopicConfig{Topic: opts.Topic, NumPartitions: 3, ReplicationFactor: 1}) } // Ping — проверка доступности кластера Kafka по первому брокеру func (k *KafkaOutput) Ping(ctx context.Context) error { if len(k.brokers) == 0 { return errors.New("no brokers configured") } // Для ping используем простой TCP подключение без SSL // так как это только проверка доступности conn, err := kafka.DialContext(ctx, "tcp", k.brokers[0]) if err != nil { return err } defer func() { _ = conn.Close() }() // Попросим контроллера — если ответ есть, считаем, что связь установлена _, err = conn.Controller() return err } func maxDuration(a, b time.Duration) time.Duration { if a > b { return a }; return b }