feat(kafka-logs): логируем статус подключения при старте и успешные отправки сообщений
This commit is contained in:
parent
838dc4fc00
commit
51fdd67831
2
Makefile
2
Makefile
@ -43,7 +43,7 @@ collectors:
|
||||
ARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/'); \
|
||||
docker run --rm -v $$PWD:/workspace -w /workspace \
|
||||
-e GOOS=darwin -e GOARCH=$$ARCH -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \
|
||||
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/macos ./src/collectors/macos"; \
|
||||
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/macos ./src/collectors/macos && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/docker ./src/collectors/docker"; \
|
||||
else \
|
||||
docker run --rm -v $$PWD:/workspace -w /workspace \
|
||||
-e GOOS=linux -e GOARCH=amd64 -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \
|
||||
|
@ -1,4 +1,4 @@
|
||||
//go:build !linux
|
||||
//go:build !linux && !darwin
|
||||
|
||||
package main
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
@ -40,6 +41,7 @@ func (s *StdoutOutput) Close(_ context.Context) error { return nil }
|
||||
type KafkaOutput struct {
|
||||
writer *kafka.Writer
|
||||
topic string
|
||||
brokers []string
|
||||
}
|
||||
|
||||
// KafkaOptions описывает параметры подключения к Kafka.
|
||||
@ -66,7 +68,16 @@ func NewKafkaOutput(opts KafkaOptions) (*KafkaOutput, error) {
|
||||
Balancer: &kafka.LeastBytes{},
|
||||
RequiredAcks: kafka.RequireAll,
|
||||
}
|
||||
return &KafkaOutput{writer: w, topic: opts.Topic}, nil
|
||||
ko := &KafkaOutput{writer: w, topic: opts.Topic, brokers: opts.Brokers}
|
||||
// В самом начале — проверим подключение и залогируем статус
|
||||
pingCtx, cancel := context.WithTimeout(context.Background(), maxDuration(opts.Timeout, 5*time.Second))
|
||||
defer cancel()
|
||||
if err := ko.Ping(pingCtx); err != nil {
|
||||
slog.Error("kafka connect failed", "brokers", opts.Brokers, "topic", opts.Topic, "err", err)
|
||||
} else {
|
||||
slog.Info("kafka connected", "brokers", opts.Brokers, "topic", opts.Topic)
|
||||
}
|
||||
return ko, nil
|
||||
}
|
||||
|
||||
func (k *KafkaOutput) Write(ctx context.Context, data Payload) error {
|
||||
@ -75,7 +86,12 @@ func (k *KafkaOutput) Write(ctx context.Context, data Payload) error {
|
||||
return err
|
||||
}
|
||||
msg := kafka.Message{Value: b, Time: time.Now()}
|
||||
return k.writer.WriteMessages(ctx, msg)
|
||||
if err := k.writer.WriteMessages(ctx, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
// Логируем успешную отправку
|
||||
slog.Info("kafka message sent", "topic", k.topic, "bytes", len(b))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KafkaOutput) Close(ctx context.Context) error {
|
||||
@ -109,4 +125,17 @@ func ensureTopic(opts KafkaOptions) {
|
||||
_ = c2.CreateTopics(kafka.TopicConfig{Topic: opts.Topic, NumPartitions: 3, ReplicationFactor: 1})
|
||||
}
|
||||
|
||||
// Ping — проверка доступности кластера Kafka по первому брокеру
|
||||
func (k *KafkaOutput) Ping(ctx context.Context) error {
|
||||
if len(k.brokers) == 0 { return errors.New("no brokers configured") }
|
||||
conn, err := kafka.DialContext(ctx, "tcp", k.brokers[0])
|
||||
if err != nil { return err }
|
||||
defer func() { _ = conn.Close() }()
|
||||
// Попросим контроллера — если ответ есть, считаем, что связь установлена
|
||||
_, err = conn.Controller()
|
||||
return err
|
||||
}
|
||||
|
||||
func maxDuration(a, b time.Duration) time.Duration { if a > b { return a }; return b }
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user