feat(kafka): auto-create the topic in the writer; Makefile targets kafka-once/kafka-consume; config points to 10.99.0.90

Sergey Antropoff 2025-09-08 18:25:08 +03:00
parent 927a30fd06
commit 838dc4fc00
4 changed files with 70 additions and 17 deletions

View File

@@ -2,6 +2,7 @@ SHELL := /bin/sh
PROJECT_NAME := agent
.PHONY: build build-linux build-darwin build-windows run agent test lint docker-clean collectors collectors-darwin collectors-linux collectors-windows
.PHONY: kafka-topic kafka-once kafka-consume
build:
# Platform-specific build of the agent (same approach as the collectors)
@@ -136,3 +137,27 @@ lint:
docker-clean:
docker rmi -f $(PROJECT_NAME):build || true
# Helper: create the Kafka topic (via kafkacat/kcat in a container)
kafka-topic:
@docker run --rm edenhill/kcat:1.7.1 -b 10.99.0.90:9092 -L | cat >/dev/null 2>&1 || true
@echo "Создание топика sensus.metrics (если нет) через kcat"; \
docker run --rm edenhill/kcat:1.7.1 -b 10.99.0.90:9092 -X allow.auto.create.topics=true -L | cat
# One-shot run of the agent in Kafka mode
kafka-once:
# One-shot push of metrics to Kafka
@if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \
mkdir -p .cache/go-build .cache/go-mod; \
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/') -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/agent-darwin ./src"; \
CONFIG_PATH=./bin/agent/config.yaml LOG_LEVEL=info ./bin/agent/agent-darwin --once --mode kafka || true; \
else \
mkdir -p .cache/go-build .cache/go-mod; \
docker run --rm -v $$PWD:/workspace -w /workspace -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o /tmp/agent ./src >/dev/null 2>&1 && CONFIG_PATH=./bin/agent/config.yaml LOG_LEVEL=info /tmp/agent --once --mode kafka"; \
fi
# Read the last 5 messages from the sensus.metrics topic
kafka-consume:
@docker run --rm edenhill/kcat:1.7.1 -b 10.99.0.90:9092 -t sensus.metrics -o -5 -C -q -J | cat
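The same "last N messages" spot-check can be done without kcat, using segmentio/kafka-go (the library the agent already depends on). A minimal sketch, assuming a read from partition 0 only and the broker/topic values from the targets above; with the 3-partition topic created by ensureTopic below, the other partitions would need the same treatment:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Look up the current end offset of partition 0 so we can step back five messages.
	conn, err := kafka.DialLeader(ctx, "tcp", "10.99.0.90:9092", "sensus.metrics", 0)
	if err != nil {
		panic(err)
	}
	last, err := conn.ReadLastOffset()
	_ = conn.Close()
	if err != nil {
		panic(err)
	}
	start := last - 5
	if start < 0 {
		start = 0
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"10.99.0.90:9092"},
		Topic:     "sensus.metrics",
		Partition: 0,
		MinBytes:  1,
		MaxBytes:  10e6,
	})
	defer r.Close()
	_ = r.SetOffset(start)

	// Read up to the end offset observed above; ReadMessage blocks if the
	// partition holds fewer messages than requested.
	for off := start; off < last; off++ {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			break
		}
		fmt.Printf("offset=%d value=%s\n", m.Offset, m.Value)
	}
}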

View File

@@ -5,8 +5,8 @@ mode: auto # stdout | kafka | auto
log_level: info
kafka:
enabled: false
brokers: ["kafka:9092"]
enabled: true
brokers: ["10.99.0.90:9092"]
topic: "sensus.metrics"
client_id: "sensusagent"
enable_tls: false
@@ -17,15 +17,15 @@ collectors:
enabled: true
type: exec
key: system
interval: "30s"
timeout: "20s"
interval: "3600s"
timeout: "30s"
exec: "./collectors/system"
platforms: [linux]
uptime:
enabled: false
type: exec
key: uptime
interval: "10s"
interval: "600s"
timeout: "5s"
exec: "./collectors/uptime"
platforms: [darwin, linux, windows]
@@ -49,40 +49,40 @@ collectors:
enabled: true
type: exec
key: hba
interval: "60s"
timeout: "10s"
interval: "3600s"
timeout: "20s"
exec: "./collectors/hba"
platforms: [linux]
sensors:
enabled: true
type: exec
key: sensors
interval: "30s"
timeout: "8s"
interval: "3600s"
timeout: "20s"
exec: "./collectors/sensors"
platforms: [linux]
docker:
enabled: true
type: exec
key: docker
interval: "30s"
timeout: "20s"
interval: "36000s"
timeout: "60s"
exec: "./collectors/docker"
platforms: [darwin, linux]
gpu:
enabled: true
type: exec
key: gpu
interval: "60s"
timeout: "30s"
interval: "3600s"
timeout: "60s"
exec: "./collectors/gpu"
platforms: [linux]
kubernetes:
enabled: false
enabled: true
type: exec
key: kubernetes
interval: "60s"
timeout: "12s"
interval: "3600s"
timeout: "60s"
exec: "./collectors/kubernetes"
platforms: [linux]
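For reference, the config blocks above map onto a small set of Go types. A minimal sketch with hypothetical field names (the agent's real config structs are not part of this diff); intervals and timeouts are kept as strings here, since plain YAML unmarshaling does not parse values like "3600s" into time.Duration without a custom unmarshaler:

package config

// KafkaConfig mirrors the kafka: block of config.yaml (field names are assumptions).
type KafkaConfig struct {
	Enabled   bool     `yaml:"enabled"`
	Brokers   []string `yaml:"brokers"`
	Topic     string   `yaml:"topic"`
	ClientID  string   `yaml:"client_id"`
	EnableTLS bool     `yaml:"enable_tls"`
}

// CollectorConfig mirrors a single entry under collectors:.
type CollectorConfig struct {
	Enabled   bool     `yaml:"enabled"`
	Type      string   `yaml:"type"`      // "exec"
	Key       string   `yaml:"key"`       // metric namespace, e.g. "docker"
	Interval  string   `yaml:"interval"`  // e.g. "3600s", parsed with time.ParseDuration
	Timeout   string   `yaml:"timeout"`   // e.g. "30s"
	Exec      string   `yaml:"exec"`      // path to the collector binary
	Platforms []string `yaml:"platforms"` // linux / darwin / windows
}

// Config is the top level of config.yaml.
type Config struct {
	Mode       string                     `yaml:"mode"` // stdout | kafka | auto
	LogLevel   string                     `yaml:"log_level"`
	Kafka      KafkaConfig                `yaml:"kafka"`
	Collectors map[string]CollectorConfig `yaml:"collectors"`
}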

View File

@@ -5,7 +5,7 @@ After=network.target
[Service]
Type=simple
Environment=CONFIG_PATH=/opt/sensusagent/config.yaml
ExecStart=/opt/sensusagent/agent --mode stdout
ExecStart=/opt/sensusagent/agent --mode kafka
Restart=on-failure
RestartSec=3
User=nobody

View File

@@ -12,6 +12,8 @@ import (
"time"
"github.com/segmentio/kafka-go"
"net"
"strconv"
)
// Payload is the common JSON structure for output.
@@ -56,6 +58,8 @@ func NewKafkaOutput(opts KafkaOptions) (*KafkaOutput, error) {
if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" {
return nil, errors.New("kafka brokers/topic not configured")
}
// Try to create the topic (idempotent). Errors are ignored: the broker may auto-create topics on its own.
ensureTopic(opts)
w := &kafka.Writer{
Addr: kafka.TCP(opts.Brokers...),
Topic: opts.Topic,
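The hunk above stops inside the writer literal, so the remaining fields and the Write path are not shown. For context, a minimal standalone sketch of producing one JSON record with segmentio/kafka-go, reusing only the Addr and Topic wiring visible here; the balancer, acks, and payload shape are assumptions, not taken from this commit:

package main

import (
	"context"
	"encoding/json"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:         kafka.TCP("10.99.0.90:9092"),
		Topic:        "sensus.metrics",
		Balancer:     &kafka.Hash{},    // assumption: key-based partitioning
		RequiredAcks: kafka.RequireOne, // assumption: not visible in this diff
	}
	defer w.Close()

	// Hypothetical payload; the agent's actual Payload type is only referenced above.
	value, err := json.Marshal(map[string]any{
		"host": "example-host",
		"key":  "uptime",
		"ts":   time.Now().Unix(),
	})
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := w.WriteMessages(ctx, kafka.Message{
		Key:   []byte("example-host"),
		Value: value,
	}); err != nil {
		panic(err)
	}
}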
@@ -81,4 +85,28 @@ func (k *KafkaOutput) Close(ctx context.Context) error {
return nil
}
// ensureTopic tries to create the topic via the Kafka cluster controller.
// Safe to call multiple times; on any error it simply does nothing.
func ensureTopic(opts KafkaOptions) {
	if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" {
		return
	}
	timeout := opts.Timeout
	if timeout <= 0 {
		timeout = 5 * time.Second
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	// Connect to the first configured broker.
	conn, err := kafka.DialContext(ctx, "tcp", opts.Brokers[0])
	if err != nil {
		return
	}
	defer func() { _ = conn.Close() }()
	// Ask that broker for the cluster controller.
	ctrl, err := conn.Controller()
	if err != nil {
		return
	}
	addr := net.JoinHostPort(ctrl.Host, strconv.Itoa(ctrl.Port))
	c2, err := kafka.DialContext(ctx, "tcp", addr)
	if err != nil {
		return
	}
	defer func() { _ = c2.Close() }()
	// Create the topic with 3 partitions and a replication factor of 1 by default.
	_ = c2.CreateTopics(kafka.TopicConfig{Topic: opts.Topic, NumPartitions: 3, ReplicationFactor: 1})
}