diff --git a/Makefile b/Makefile index 600c0b5..95ede8c 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ SHELL := /bin/sh PROJECT_NAME := agent .PHONY: build build-linux build-darwin build-windows run agent test lint docker-clean collectors collectors-darwin collectors-linux collectors-windows +.PHONY: kafka-topic kafka-once kafka-consume build: # Платформозависимая сборка агента (как у collectors) @@ -136,3 +137,27 @@ lint: docker-clean: docker rmi -f $(PROJECT_NAME):build || true +# Вспомогательное: создать Kafka-топик (через kafkacat/kcat в контейнере) +kafka-topic: + @docker run --rm edenhill/kcat:1.7.1 -b 10.99.0.90:9092 -L | cat >/dev/null 2>&1 || true + @echo "Создание топика sensus.metrics (если нет) через kcat"; \ + docker run --rm edenhill/kcat:1.7.1 -b 10.99.0.90:9092 -X allow.auto.create.topics=true -L | cat + +# Одноразовый запуск агента в режиме Kafka +kafka-once: + # Одноразовая отправка метрик в Kafka + @if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \ + mkdir -p .cache/go-build .cache/go-mod; \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/') -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/agent-darwin ./src"; \ + CONFIG_PATH=./bin/agent/config.yaml LOG_LEVEL=info ./bin/agent/agent-darwin --once --mode kafka || true; \ + else \ + mkdir -p .cache/go-build .cache/go-mod; \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o /tmp/agent ./src >/dev/null 2>&1 && CONFIG_PATH=./bin/agent/config.yaml LOG_LEVEL=info /tmp/agent --once --mode kafka"; \ + fi + +# Прочитать последние 5 сообщений из топика sensus.metrics +kafka-consume: + @docker run --rm edenhill/kcat:1.7.1 -b 10.99.0.90:9092 -t sensus.metrics -o -5 -C -q -J | cat + diff --git a/bin/agent/config.yaml b/bin/agent/config.yaml index 59ad16a..935c4c3 100644 --- a/bin/agent/config.yaml +++ b/bin/agent/config.yaml @@ -5,8 +5,8 @@ mode: auto # stdout | kafka | auto log_level: info kafka: - enabled: false - brokers: ["kafka:9092"] + enabled: true + brokers: ["10.99.0.90:9092"] topic: "sensus.metrics" client_id: "sensusagent" enable_tls: false @@ -17,15 +17,15 @@ collectors: enabled: true type: exec key: system - interval: "30s" - timeout: "20s" + interval: "3600s" + timeout: "30s" exec: "./collectors/system" platforms: [linux] uptime: enabled: false type: exec key: uptime - interval: "10s" + interval: "600s" timeout: "5s" exec: "./collectors/uptime" platforms: [darwin, linux, windows] @@ -49,40 +49,40 @@ collectors: enabled: true type: exec key: hba - interval: "60s" - timeout: "10s" + interval: "3600s" + timeout: "20s" exec: "./collectors/hba" platforms: [linux] sensors: enabled: true type: exec key: sensors - interval: "30s" - timeout: "8s" + interval: "3600s" + timeout: "20s" exec: "./collectors/sensors" platforms: [linux] docker: enabled: true type: exec key: docker - interval: "30s" - timeout: "20s" + interval: "36000s" + timeout: "60s" exec: "./collectors/docker" platforms: [darwin, linux] gpu: enabled: true type: exec key: gpu - interval: "60s" - timeout: "30s" + interval: "3600s" + timeout: "60s" exec: "./collectors/gpu" platforms: [linux] kubernetes: - enabled: false + enabled: true type: exec key: kubernetes - interval: "60s" - timeout: "12s" + interval: "3600s" + timeout: "60s" exec: "./collectors/kubernetes" platforms: [linux] diff --git a/runner/sensusagent.service b/runner/sensusagent.service index 79ba7eb..439026e 100644 --- a/runner/sensusagent.service +++ b/runner/sensusagent.service @@ -5,7 +5,7 @@ After=network.target [Service] Type=simple Environment=CONFIG_PATH=/opt/sensusagent/config.yaml -ExecStart=/opt/sensusagent/agent --mode stdout +ExecStart=/opt/sensusagent/agent --mode kafka Restart=on-failure RestartSec=3 User=nobody diff --git a/src/core/output/output.go b/src/core/output/output.go index 77dfa9e..df890ae 100644 --- a/src/core/output/output.go +++ b/src/core/output/output.go @@ -12,6 +12,8 @@ import ( "time" "github.com/segmentio/kafka-go" + "net" + "strconv" ) // Payload — общий JSON для вывода. @@ -56,6 +58,8 @@ func NewKafkaOutput(opts KafkaOptions) (*KafkaOutput, error) { if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" { return nil, errors.New("kafka brokers/topic not configured") } + // Пытаемся создать топик (идемпотентно). Ошибки игнорируем — брокер может создавать топики сам. + ensureTopic(opts) w := &kafka.Writer{ Addr: kafka.TCP(opts.Brokers...), Topic: opts.Topic, @@ -81,4 +85,28 @@ func (k *KafkaOutput) Close(ctx context.Context) error { return nil } +// ensureTopic — пробует создать топик через контроллер кластера Kafka. +// Безопасно к многократному вызову. В случае ошибок ничего не делает. +func ensureTopic(opts KafkaOptions) { + if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" { return } + timeout := opts.Timeout + if timeout <= 0 { timeout = 5 * time.Second } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + // Подключаемся к первому брокеру + conn, err := kafka.DialContext(ctx, "tcp", opts.Brokers[0]) + if err != nil { return } + defer func() { _ = conn.Close() }() + // Получаем контроллер + ctrl, err := conn.Controller() + if err != nil { return } + _ = conn.Close() + addr := net.JoinHostPort(ctrl.Host, strconv.Itoa(ctrl.Port)) + c2, err := kafka.DialContext(ctx, "tcp", addr) + if err != nil { return } + defer func() { _ = c2.Close() }() + // Пытаемся создать топик с 1 репликой и 3 партициями по умолчанию + _ = c2.CreateTopics(kafka.TopicConfig{Topic: opts.Topic, NumPartitions: 3, ReplicationFactor: 1}) +} +