commit ad409950b8f342415a4eccb94b7fc98215ccf79f Author: Sergey Antropoff Date: Mon Sep 8 10:47:41 2025 +0300 Initial commit: SensusAgent core, collectors, build, docs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a369e91 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +bin/agent/agent +bin/agent/agent-darwin +bin/agent/agent.exe +bin/agent/sensusagent +bin/agent/sensusagent-darwin +bin/agent/sensusagent.exe +bin/agent/collectors/* +!bin/agent/collectors/sample.sh + +go.sum +.idea/ +.vscode/ +.DS_Store diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..c1c209c --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,16 @@ +run: + timeout: 3m + tests: true +linters: + enable: + - govet + - gofmt + - gocyclo + - revive + - errcheck + - staticcheck +issues: + exclude-use-default: false + max-issues-per-linter: 0 + max-same-issues: 0 + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..95abd4d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +# syntax=docker/dockerfile:1 + +FROM golang:1.22 AS build +WORKDIR /src +COPY go.mod ./ +RUN go mod download +COPY . . +RUN go mod tidy && \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /bin/agent/sensusagent ./src && \ + mkdir -p /bin/agent/collectors && \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /bin/agent/collectors/uptime ./src/collectors/uptime && \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /bin/agent/collectors/macos ./src/collectors/macos + +FROM alpine:3.20 +WORKDIR / +COPY --from=build /bin/agent /bin/agent +RUN adduser -D -H -s /sbin/nologin agent && \ + chmod +x /bin/agent/agent && \ + chmod -R +x /bin/collectors || true +USER agent +ENTRYPOINT ["/bin/agent/sensusagent"] + diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a8243f0 --- /dev/null +++ b/Makefile @@ -0,0 +1,100 @@ +SHELL := /bin/sh +PROJECT_NAME := agent + +.PHONY: build build-linux build-darwin build-windows run agent test lint docker-clean collectors collectors-darwin collectors-linux collectors-windows + +build: + # Платформозависимая сборка агента (как у collectors) + @mkdir -p ./bin/agent; \ + if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \ + ARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/'); \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$ARCH golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent-darwin ./src"; \ + else \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent ./src"; \ + fi + +build-darwin: + # Кросс-сборка darwin (arm64) бинаря агента + @mkdir -p ./bin/agent; \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=arm64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent-darwin ./src" + +build-linux: + # Кросс-сборка linux (amd64) бинаря агента + @mkdir -p ./bin/agent; \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent ./src" + +build-windows: + # Кросс-сборка windows (amd64) бинаря агента + @mkdir -p ./bin/agent; \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=windows -e GOARCH=amd64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent.exe ./src" + +collectors: + # Сборка коллекторов для текущей платформы (через Docker) + @mkdir -p ./bin/agent/collectors + @if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \ + ARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/'); \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$ARCH golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos ./src/collectors/macos"; \ + else \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos ./src/collectors/macos"; \ + fi + +collectors-darwin: + # Кросс-сборка коллекторов для macOS + @mkdir -p ./bin/agent/collectors + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=arm64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime-darwin ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos-darwin ./src/collectors/macos" + +collectors-linux: + # Кросс-сборка коллекторов для Linux + @mkdir -p ./bin/agent/collectors + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime-linux ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos-linux ./src/collectors/macos" + +collectors-windows: + # Кросс-сборка коллекторов для Windows + @mkdir -p ./bin/agent/collectors + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=windows -e GOARCH=amd64 golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime-windows.exe ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos-windows.exe ./src/collectors/macos" + +run: collectors + # На macOS собираем darwin-бинарь в Docker и запускаем на хосте (для доступа к macOS API); + # на других ОС — полностью в контейнере Linux. + @if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \ + mkdir -p ./bin/agent; \ + ARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/'); \ + docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$ARCH golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && go build -o /workspace/bin/agent/agent-darwin ./src >/dev/null 2>&1"; \ + LOG_LEVEL=error CONFIG_PATH=./bin/agent/config.yaml ./bin/agent/agent-darwin --once --mode stdout | docker run --rm -i ghcr.io/jqlang/jq:latest; \ + else \ + docker run --rm -v $$PWD:/workspace -w /workspace golang:1.22 \ + sh -c "go mod tidy >/dev/null 2>&1 && go build -o /tmp/agent ./src >/dev/null 2>&1 && LOG_LEVEL=error CONFIG_PATH=./bin/agent/config.yaml /tmp/agent --once --mode stdout" | docker run --rm -i ghcr.io/jqlang/jq:latest; \ + fi + +agent: collectors build + # Запуск агента с ВСЕМИ собранными коллекторами (не one-shot) + @if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \ + LOG_LEVEL=info CONFIG_PATH=./bin/agent/config.yaml ./bin/agent/agent-darwin --mode stdout; \ + else \ + LOG_LEVEL=info CONFIG_PATH=./bin/agent/config.yaml ./bin/agent/agent --mode stdout; \ + fi + +test: + # Юнит-тесты в Docker без использования локальной машины + docker run --rm -v $$PWD:/workspace -w /workspace golang:1.22 \ + sh -c "go test ./..." + +lint: + # Линт кода через golangci-lint в Docker + docker run --rm -v $$PWD:/app -w /app golangci/golangci-lint:latest \ + golangci-lint run --timeout 3m + +docker-clean: + docker rmi -f $(PROJECT_NAME):build || true + diff --git a/README.md b/README.md new file mode 100644 index 0000000..2dae1f9 --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +## SensusAgent + +Автор: Сергей Антропов, сайт: https://devops.org.ru + +Модульный агент мониторинга на Go 1.22+. Поддерживает встроенные и внешние (exec) коллекторы, вывод в stdout и Kafka, systemd unit. + +### Структура +- src/core — ядро (конфиг, логирование, реестр коллекторов, раннер, вывод) +- src/collectors — исходники Go-коллекторов (сборка в bin/agent/collectors) +- bin/agent — бинарник и конфиги + +### Сборка и запуск +Подробно см. `app/docs/README.md`. + +### Конфигурация +Все настройки, включая коллекторы, находятся в `bin/agent/config.yaml`. + + diff --git a/app/docs/README.md b/app/docs/README.md new file mode 100644 index 0000000..f8094e5 --- /dev/null +++ b/app/docs/README.md @@ -0,0 +1,14 @@ +## SensusAgent — документация + +Автор: Сергей Антропов, сайт: https://devops.org.ru + +SensusAgent — лёгкий модульный агент мониторинга (Go 1.22+), собирающий метрики через внешние коллектора (исполняемые файлы) и агрегирующий результат в JSON (stdout или Kafka). + +Содержание: +- Обзор архитектуры: см. `overview.md` +- Конфигурация: см. `config.md` +- Коллекторы: см. `collectors.md` +- Сборка и запуск: см. `build_and_run.md` +- Деплой (Docker, systemd, Kafka): см. `deploy.md` + + diff --git a/app/docs/build_and_run.md b/app/docs/build_and_run.md new file mode 100644 index 0000000..b862e2e --- /dev/null +++ b/app/docs/build_and_run.md @@ -0,0 +1,31 @@ +### Сборка и запуск + +Требования: Docker. + +Сборка коллекторов и запуск one-shot: +```bash +make run +``` + +Длительный запуск (stdout): +```bash +make agent +``` + +Сборка агента под платформы: +```bash +make build # под текущую (darwin/linux) +make build-linux # linux/amd64 +make build-darwin # darwin/arm64 +make build-windows # windows/amd64 +``` + +Сборка коллекторов под платформы: +```bash +make collectors # текущая платформа +make collectors-linux # linux/amd64 +make collectors-darwin # darwin/arm64 +make collectors-windows # windows/amd64 +``` + + diff --git a/app/docs/collectors.md b/app/docs/collectors.md new file mode 100644 index 0000000..fb243d5 --- /dev/null +++ b/app/docs/collectors.md @@ -0,0 +1,14 @@ +### Коллекторы + +Расположение бинарников: `bin/agent/collectors/` + +Добавление собственного коллектора на Go: +1. Создайте каталог `src/collectors//` с `main.go`, печатающим JSON в stdout. +2. Сборка: + - `make collectors` — соберёт под текущую платформу в `bin/agent/collectors/` + - `make collectors-linux|collectors-darwin|collectors-windows` — кросс-сборка +3. Добавьте в `bin/agent/config.yaml` блок с `type: exec` и `exec: ./bin/agent/collectors/`. + +Требования к выводу: корректный JSON на stdout. В случае ошибки — пустой JSON `{}`. + + diff --git a/app/docs/config.md b/app/docs/config.md new file mode 100644 index 0000000..b030bfa --- /dev/null +++ b/app/docs/config.md @@ -0,0 +1,41 @@ +### Конфигурация + +Файл: `bin/agent/config.yaml` + +Поля: +- `mode`: `auto|stdout|kafka` +- `log_level`: `debug|info|warn|error` +- `kafka`: блок настроек Kafka (для режима kafka) +- `collectors`: словарь коллекторов + +Описание коллектора: +```yaml +: + enabled: true + type: exec # запуск внешнего файла + key: # ключ в итоговом JSON + interval: "10s" # период опроса + timeout: "5s" # таймаут запуска коллектора + exec: "./bin/agent/collectors/" +``` + +Пример: +```yaml +collectors: + uptime: + enabled: true + type: exec + key: uptime + interval: "10s" + timeout: "5s" + exec: "./bin/agent/collectors/uptime" + macos: + enabled: true + type: exec + key: macos + interval: "30s" + timeout: "10s" + exec: "./bin/agent/collectors/macos" +``` + + diff --git a/app/docs/deploy.md b/app/docs/deploy.md new file mode 100644 index 0000000..f2f56e6 --- /dev/null +++ b/app/docs/deploy.md @@ -0,0 +1,30 @@ +### Деплой + +Docker Compose (stdout): см. `docker-compose.yml`. + +Systemd: +1. Скопируйте бинарь `bin/agent/agent` и `bin/agent/config.yaml` на сервер. +2. Установите юнит `sensusagent.service` (обновите путь при необходимости): +```ini +[Unit] +Description=SensusAgent metrics collector +After=network.target + +[Service] +Type=simple +Environment=CONFIG_PATH=/bin/agent/config.yaml +ExecStart=/bin/agent/agent --mode kafka +Restart=on-failure +RestartSec=3 +User=nobody +Group=nogroup + +[Install] +WantedBy=multi-user.target +``` +3. `systemctl daemon-reload && systemctl enable --now sensusagent`. + +Kafka: +- Настройте блок `kafka` в `config.yaml` (brokers/topic/timeout…). + + diff --git a/app/docs/overview.md b/app/docs/overview.md new file mode 100644 index 0000000..322f2da --- /dev/null +++ b/app/docs/overview.md @@ -0,0 +1,17 @@ +### Обзор + +SensusAgent — агент, выполняющий внешние коллектора из каталога `bin/agent/collectors` согласно `config.yaml` и объединяющий их JSON-вывод в один документ. + +- Плагинная архитектура: добавьте исполняемый файл — опишите его в конфиге — агент начнёт его опрашивать. +- Два вывода: stdout (standalone) и Kafka (systemd/production). +- Высокая устойчивость: при ошибках коллектора агент возвращает пустой блок и продолжает работу, не падая. + +Основные компоненты: +- `src/core/config` — конфиг. +- `src/core/collector` — интерфейсы, реестр. +- `src/core/runner` — планировщик/агрегация. +- `src/core/output` — stdout/Kafka. +- `src/core/execcollectors` — поддержка exec/execdir в ядре. +- `src/collectors/*` — исходники Go-коллекторов (собираются в `bin/agent/collectors`). + + diff --git a/bin/agent/collectors/sample.sh b/bin/agent/collectors/sample.sh new file mode 100644 index 0000000..e99efeb --- /dev/null +++ b/bin/agent/collectors/sample.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env sh +# Автор: Сергей Антропов, сайт: https://devops.org.ru +# Пример внешнего коллектора: выводит JSON с фиктивными данными. + +set -eu +echo '{"sample": {"ok": true, "ts": '"$(date +%s)"'}}' + + diff --git a/bin/agent/config.yaml b/bin/agent/config.yaml new file mode 100644 index 0000000..59ee4cb --- /dev/null +++ b/bin/agent/config.yaml @@ -0,0 +1,38 @@ +# Автор: Сергей Антропов, сайт: https://devops.org.ru +# Общая конфигурация агента SensusAgent + +mode: auto # stdout | kafka | auto +log_level: info + +kafka: + enabled: false + brokers: ["kafka:9092"] + topic: "sensus.metrics" + client_id: "sensusagent" + enable_tls: false + timeout: "5s" + +collectors: + uptime: + enabled: true + type: exec + key: uptime + interval: "10s" + timeout: "5s" + exec: "./bin/agent/collectors/uptime" + macos: + enabled: true + type: exec + key: macos + interval: "30s" + timeout: "10s" + exec: "./bin/agent/collectors/macos" + sample: + enabled: true + type: exec + key: sample + interval: "30s" + timeout: "5s" + exec: "./bin/agent/collectors/sample.sh" + + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..e4c052d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,38 @@ +version: "3.9" +services: + agent: + build: . + image: sensusagent:dev + container_name: sensusagent + environment: + - CONFIG_PATH=/bin/agent/config.yaml + - LOG_LEVEL=debug + volumes: + - ./bin/agent:/bin/agent:ro + entrypoint: ["/bin/agent/sensusagent", "--mode", "stdout"] + deploy: + resources: + limits: + cpus: '0.50' + memory: 256M + + kafka: + image: bitnami/kafka:3.7 + ports: + - "9092:9092" + environment: + - KAFKA_ENABLE_KRAFT=yes + - KAFKA_CFG_PROCESS_ROLES=broker,controller + - KAFKA_CFG_NODE_ID=1 + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true + - KAFKA_CFG_MESSAGE_MAX_BYTES=20000000 + healthcheck: + test: ["CMD", "bash", "-c", "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | cat"] + interval: 10s + timeout: 5s + retries: 10 + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f6b8c7b --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module sensusagent + +go 1.22 + +require ( + github.com/joho/godotenv v1.5.1 + github.com/segmentio/kafka-go v0.4.46 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/klauspost/compress v1.15.9 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect +) diff --git a/sensusagent.service b/sensusagent.service new file mode 100644 index 0000000..f6d7ab6 --- /dev/null +++ b/sensusagent.service @@ -0,0 +1,17 @@ +[Unit] +Description=SensusAgent metrics collector +After=network.target + +[Service] +Type=simple +Environment=CONFIG_PATH=/bin/agent/config.yaml +ExecStart=/bin/agent/agent --mode kafka +Restart=on-failure +RestartSec=3 +User=nobody +Group=nogroup + +[Install] +WantedBy=multi-user.target + + diff --git a/src/collectors/macos/macos_darwin.go b/src/collectors/macos/macos_darwin.go new file mode 100644 index 0000000..6105ca2 --- /dev/null +++ b/src/collectors/macos/macos_darwin.go @@ -0,0 +1,298 @@ +//go:build darwin + +package main + +import ( + "context" + "encoding/json" + "fmt" + execpkg "os/exec" + "strconv" + "strings" +) + +func collectInfo(ctx context.Context) (map[string]any, error) { + // CPU + cores := toIntSafe(sysctlTrim(ctx, "hw.ncpu")) + cpuUsage := cpuUsagePercent(ctx) + + // RAM + totalMem := toInt64Safe(sysctlTrim(ctx, "hw.memsize")) + usedMem := memUsedBytes(ctx, totalMem) + ram := map[string]any{ + "total_bytes": totalMem, + "used_bytes": usedMem, + "used_percent": percent(usedMem, totalMem), + } + + // LOAD AVG + l1, l5, l15 := loadAvg(ctx) + + // DISKS usage per mount + disks, totalDisk, usedDisk := disksByMount(ctx) + + // NVMe/SSD/HDD counts via system_profiler + nvmeCount := countNVMe(ctx) + ssdCount, hddCount := countSataTypes(ctx) + + // GPU info with VRAM totals and attempt to get usage via ioreg + gpus, gpuCount := gpuInfo(ctx) + + result := map[string]any{ + "cpu": map[string]any{ + "cores": cores, + "usage_percent": cpuUsage, + }, + "ram": ram, + "disks": map[string]any{ + "by_mount": disks, + "total_bytes": totalDisk, + "used_bytes": usedDisk, + "nvme": nvmeCount, + "ssd": ssdCount, + "hdd": hddCount, + }, + "gpu": map[string]any{ + "count": gpuCount, + "devices": gpus, + }, + "load": map[string]any{ + "1m": l1, + "5m": l5, + "15m": l15, + }, + } + return result, nil +} + +func runJSONSafe(ctx context.Context, name string, args ...string) (string, error) { + out, err := execpkg.CommandContext(ctx, name, args...).Output() + if err != nil { return "", err } + b := out + for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r' || b[len(b)-1] == ' ') { b = b[:len(b)-1] } + return string(b), nil +} + +func sysctlTrim(ctx context.Context, key string) string { + s, _ := runJSONSafe(ctx, "/usr/sbin/sysctl", "-n", key) + return s +} + +func cpuUsagePercent(ctx context.Context) float64 { + // top -l 1 -n 0 -> line: "CPU usage: 6.75% user, 5.03% sys, 88.20% idle" + out, err := execpkg.CommandContext(ctx, "/usr/bin/top", "-l", "1", "-n", "0").Output() + if err != nil { return 0 } + lines := strings.Split(string(out), "\n") + for _, ln := range lines { + if strings.Contains(ln, "CPU usage:") { + // extract numbers before "% user" and "% sys" + parts := strings.Split(ln, ",") + var user, sys float64 + for _, p := range parts { + p = strings.TrimSpace(p) + if strings.Contains(p, "user") { + fmt.Sscanf(p, "CPU usage: %f%% user", &user) + } else if strings.Contains(p, "sys") { + fmt.Sscanf(p, "%f%% sys", &sys) + } + } + return user + sys + } + } + return 0 +} + +func memUsedBytes(ctx context.Context, total int64) int64 { + // vm_stat + pagesize + out, err := execpkg.CommandContext(ctx, "/usr/bin/vm_stat").Output() + if err != nil { return 0 } + psStr := sysctlTrim(ctx, "hw.pagesize") + pageSize := toInt64Safe(psStr) + if pageSize <= 0 { pageSize = 4096 } + var freePages int64 + lines := strings.Split(string(out), "\n") + for _, ln := range lines { + if strings.HasPrefix(strings.TrimSpace(ln), "Pages free:") { + // format: Pages free: 12345. + ln = strings.TrimSuffix(ln, ".") + fields := strings.Fields(ln) + if len(fields) > 2 { + freePages = toInt64Safe(fields[len(fields)-1]) + break + } + } + } + free := freePages * pageSize + if total > 0 && total > free { return total - free } + return 0 +} + +func loadAvg(ctx context.Context) (float64, float64, float64) { + out, err := execpkg.CommandContext(ctx, "/usr/sbin/sysctl", "-n", "vm.loadavg").Output() + if err != nil { return 0, 0, 0 } + s := string(out) + // format: { 1.21 1.34 1.56 } + s = strings.TrimSpace(strings.Trim(s, "{}")) + fields := strings.Fields(s) + if len(fields) >= 3 { + return toFloat(fields[0]), toFloat(fields[1]), toFloat(fields[2]) + } + return 0, 0, 0 +} + +func disksByMount(ctx context.Context) ([]map[string]any, int64, int64) { + out, err := execpkg.CommandContext(ctx, "/bin/df", "-kP").Output() + if err != nil { return nil, 0, 0 } + lines := strings.Split(string(out), "\n") + if len(lines) <= 1 { return nil, 0, 0 } + var res []map[string]any + var total, used int64 + for i := 1; i < len(lines); i++ { + ln := strings.TrimSpace(lines[i]) + if ln == "" { continue } + fields := strings.Fields(ln) + if len(fields) < 6 { continue } + fs := fields[0] + // size used avail capacity mountpoint + sz := toInt64Safe(fields[1]) * 1024 + us := toInt64Safe(fields[2]) * 1024 + mp := fields[5] + // Исключаем docker-файловые системы/маунты + lfs, lmp := strings.ToLower(fs), strings.ToLower(mp) + if strings.Contains(lfs, "docker") || strings.Contains(lmp, "docker") { + continue + } + if strings.HasPrefix(fs, "/dev/") { + res = append(res, map[string]any{ + "filesystem": fs, + "mountpoint": mp, + "size_bytes": sz, + "used_bytes": us, + "free_bytes": (toInt64Safe(fields[3]) * 1024), + "used_percent": percent(us, sz), + }) + total += sz + used += us + } + } + return res, total, used +} + +func countNVMe(ctx context.Context) int { + out, err := execpkg.CommandContext(ctx, "/usr/sbin/system_profiler", "-json", "SPNVMeDataType").Output() + if err != nil { return 0 } + var mp map[string]any + if json.Unmarshal(out, &mp) != nil { return 0 } + if arr, ok := mp["SPNVMeDataType"].([]any); ok { return len(arr) } + return 0 +} + +func countSataTypes(ctx context.Context) (int, int) { + out, err := execpkg.CommandContext(ctx, "/usr/sbin/system_profiler", "-json", "SPSerialATADataType").Output() + if err != nil { return 0, 0 } + var mp map[string]any + if json.Unmarshal(out, &mp) != nil { return 0, 0 } + ssd, hdd := 0, 0 + // Heuristic scan for "Solid State" vs others in nested objects + b, _ := json.Marshal(mp) + s := string(b) + if strings.Contains(strings.ToLower(s), "solid state") { ssd++ } + if ssd == 0 && strings.Contains(strings.ToLower(s), "rotational") { hdd++ } + return ssd, hdd +} + +func gpuInfo(ctx context.Context) ([]map[string]any, int) { + out, err := execpkg.CommandContext(ctx, "/usr/sbin/system_profiler", "-json", "SPDisplaysDataType").Output() + if err != nil { return nil, 0 } + var mp map[string]any + if json.Unmarshal(out, &mp) != nil { return nil, 0 } + var devices []map[string]any + if arr, ok := mp["SPDisplaysDataType"].([]any); ok { + for _, it := range arr { + if m, ok := it.(map[string]any); ok { + name := "" + if v, ok := m["_name"].(string); ok { name = v } + // VRAM может приходить как VRAM (Total): "8 GB" + var vramBytes int64 + for k, vv := range m { + if strings.HasPrefix(strings.ToLower(k), "vram") { + if s, ok := vv.(string); ok { vramBytes = parseSizeToBytes(s) } + } + } + used := gpuMemUsedBytes(ctx) + devices = append(devices, map[string]any{ + "model": name, + "vram_bytes": vramBytes, + "used_bytes": used, + "used_percent": percent(used, vramBytes), + }) + } + } + } + return devices, len(devices) +} + +func parseSizeToBytes(s string) int64 { + // Пример: "8 GB", "1536 MB" + s = strings.TrimSpace(strings.ToUpper(s)) + parts := strings.Fields(s) + if len(parts) < 2 { return 0 } + val := toFloat(parts[0]) + unit := parts[1] + switch unit { + case "TB": + return int64(val * 1024 * 1024 * 1024 * 1024) + case "GB": + return int64(val * 1024 * 1024 * 1024) + case "MB": + return int64(val * 1024 * 1024) + case "KB": + return int64(val * 1024) + default: + return int64(val) + } +} + +func gpuMemUsedBytes(ctx context.Context) int64 { + // На macOS общедоступных CLI для точного VRAM usage нет. + // Попытаемся через ioreg получить IOFBMemoryUsage (может быть недоступно). Если не получится — 0. + out, err := execpkg.CommandContext(ctx, "/usr/sbin/ioreg", "-l").Output() + if err != nil { return 0 } + // Ищем строки вида: "IOFBMemoryUsage" = 12345678 + var used int64 + lines := strings.Split(string(out), "\n") + for _, ln := range lines { + if strings.Contains(ln, "IOFBMemoryUsage") { + var v int64 + if _, err := fmt.Sscanf(ln, "%*s = %d", &v); err == nil { + if v > used { used = v } + } + } + } + return used +} + +func toIntSafe(s string) int { + i, _ := strconv.Atoi(strings.TrimSpace(s)) + return i +} +func toInt64Safe(s string) int64 { + s = strings.TrimSpace(s) + if s == "" { return 0 } + // try int + if i, err := strconv.ParseInt(s, 10, 64); err == nil { return i } + // sometimes with trailing chars + var i64 int64 + fmt.Sscanf(s, "%d", &i64) + return i64 +} +func toFloat(s string) float64 { + f, _ := strconv.ParseFloat(strings.TrimSpace(s), 64) + return f +} +func percent(part int64, total int64) float64 { + if total <= 0 { return 0 } + return (float64(part) / float64(total)) * 100.0 +} + + diff --git a/src/collectors/macos/macos_unsupported.go b/src/collectors/macos/macos_unsupported.go new file mode 100644 index 0000000..6a4bb45 --- /dev/null +++ b/src/collectors/macos/macos_unsupported.go @@ -0,0 +1,12 @@ +//go:build !darwin + +package main + +import ( + "context" + "fmt" +) + +func collectInfo(ctx context.Context) (map[string]any, error) { return nil, fmt.Errorf("unsupported OS") } + + diff --git a/src/collectors/macos/main.go b/src/collectors/macos/main.go new file mode 100644 index 0000000..86e2f61 --- /dev/null +++ b/src/collectors/macos/main.go @@ -0,0 +1,40 @@ +package main + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Коллектор macOS hardware info на Go: sysctl + system_profiler в JSON. + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" +) + +// collectInfo реализуется в файлах с билд-тегами. + +func main() { + timeout := parseDurationOr("COLLECTOR_TIMEOUT", 8*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + data, err := collectInfo(ctx) + if err != nil || data == nil { + fmt.Println("{}") + return + } + enc := json.NewEncoder(os.Stdout) + enc.SetEscapeHTML(false) + _ = enc.Encode(data) +} + +func parseDurationOr(env string, def time.Duration) time.Duration { + v := strings.TrimSpace(os.Getenv(env)) + if v == "" { return def } + d, err := time.ParseDuration(v) + if err != nil { return def } + return d +} + + diff --git a/src/collectors/uptime/main.go b/src/collectors/uptime/main.go new file mode 100644 index 0000000..bef90fe --- /dev/null +++ b/src/collectors/uptime/main.go @@ -0,0 +1,63 @@ +package main + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Коллектор uptime на Go. Печатает JSON c полями seconds и human. + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" +) + +// collectUptime реализуется в файлах с билд-тегами под конкретные ОС. + +func main() { + timeout := parseDurationOr("COLLECTOR_TIMEOUT", 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + secs, err := collectUptime(ctx) + if err != nil || secs < 0 { + fmt.Println("{}") + return + } + out := map[string]any{ + "seconds": secs, + "human": humanize(time.Duration(secs) * time.Second), + } + enc := json.NewEncoder(os.Stdout) + enc.SetEscapeHTML(false) + _ = enc.Encode(out) +} + +func humanize(d time.Duration) string { + total := int64(d.Seconds()) + if total < 0 { total = 0 } + days := total / 86400 + hours := (total % 86400) / 3600 + mins := (total % 3600) / 60 + secs := total % 60 + parts := []string{} + if days > 0 { parts = append(parts, fmt.Sprintf("%dd", days)) } + if hours > 0 { parts = append(parts, fmt.Sprintf("%dh", hours)) } + if mins > 0 { parts = append(parts, fmt.Sprintf("%dm", mins)) } + parts = append(parts, fmt.Sprintf("%ds", secs)) + return strings.Join(parts, " ") +} + +func parseDurationOr(env string, def time.Duration) time.Duration { + v := strings.TrimSpace(os.Getenv(env)) + if v == "" { + return def + } + d, err := time.ParseDuration(v) + if err != nil { + return def + } + return d +} + + diff --git a/src/collectors/uptime/uptime_darwin.go b/src/collectors/uptime/uptime_darwin.go new file mode 100644 index 0000000..d22c5a5 --- /dev/null +++ b/src/collectors/uptime/uptime_darwin.go @@ -0,0 +1,22 @@ +//go:build darwin + +package main + +import ( + "context" + "fmt" + execpkg "os/exec" + "time" +) + +func collectUptime(ctx context.Context) (int64, error) { + b, err := execpkg.CommandContext(ctx, "/usr/sbin/sysctl", "-n", "kern.boottime").Output() + if err != nil { return 0, err } + var sec int64 + _, err = fmt.Sscanf(string(b), "{ sec = %d", &sec) + if err != nil { return 0, err } + now := time.Now().Unix() + return now - sec, nil +} + + diff --git a/src/collectors/uptime/uptime_linux.go b/src/collectors/uptime/uptime_linux.go new file mode 100644 index 0000000..ce67204 --- /dev/null +++ b/src/collectors/uptime/uptime_linux.go @@ -0,0 +1,23 @@ +//go:build linux + +package main + +import ( + "context" + "fmt" + "os" + "strings" +) + +func collectUptime(ctx context.Context) (int64, error) { + b, err := os.ReadFile("/proc/uptime") + if err != nil { return 0, err } + fields := strings.Fields(strings.TrimSpace(string(b))) + if len(fields) == 0 { return 0, fmt.Errorf("no fields") } + var f float64 + _, err = fmt.Sscanf(fields[0], "%f", &f) + if err != nil { return 0, err } + return int64(f), nil +} + + diff --git a/src/collectors/uptime/uptime_unsupported.go b/src/collectors/uptime/uptime_unsupported.go new file mode 100644 index 0000000..c78e519 --- /dev/null +++ b/src/collectors/uptime/uptime_unsupported.go @@ -0,0 +1,12 @@ +//go:build !linux && !darwin + +package main + +import ( + "context" + "fmt" +) + +func collectUptime(ctx context.Context) (int64, error) { return 0, fmt.Errorf("unsupported OS") } + + diff --git a/src/core/collector/registry.go b/src/core/collector/registry.go new file mode 100644 index 0000000..fb3355e --- /dev/null +++ b/src/core/collector/registry.go @@ -0,0 +1,67 @@ +package collector + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Назначение: Реестр коллекторов и базовые интерфейсы. + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "sensusagent/src/core/config" +) + +// Result — универсальное представление данных от коллектора. +type Result map[string]any + +// Collector — интерфейс любого коллектора метрик. +// Регистрация производится через init() с помощью RegisterFactory. +type Collector interface { + Name() string + Key() string + Interval() time.Duration + Enabled() bool + Collect(ctx context.Context) (Result, error) +} + +// Factory создает коллектор из его конфигурации. +type Factory func(name string, cfg config.CollectorConfig) (Collector, error) + +var ( + registryMu sync.RWMutex + registry = map[string]Factory{} +) + +// RegisterFactory регистрирует фабрику коллектора по его типу (cfg.Type). +func RegisterFactory(collectorType string, f Factory) { + registryMu.Lock() + defer registryMu.Unlock() + registry[collectorType] = f +} + +// BuildCollectors создает экземпляры коллекторов на основании конфигурации. +func BuildCollectors(cfgs map[string]config.CollectorConfig) ([]Collector, error) { + registryMu.RLock() + defer registryMu.RUnlock() + + collectors := make([]Collector, 0, len(cfgs)) + for name, c := range cfgs { + f, ok := registry[c.Type] + if !ok { + return nil, fmt.Errorf("неизвестный тип коллектора '%s' для '%s'", c.Type, name) + } + inst, err := f(name, c) + if err != nil { + return nil, err + } + collectors = append(collectors, inst) + } + return collectors, nil +} + +// ErrNoData возвращается, если коллектор не смог получить данные (для пустого блока). +var ErrNoData = errors.New("no data") + + diff --git a/src/core/config/config.go b/src/core/config/config.go new file mode 100644 index 0000000..ca0f816 --- /dev/null +++ b/src/core/config/config.go @@ -0,0 +1,127 @@ +package config + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Назначение: Загрузка и валидация конфигурации агента SensusAgent. +// Комментарии на русском языке. Подробное логирование предусмотрено на уровне вызывающего кода. + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + yaml "gopkg.in/yaml.v3" +) + +// KafkaConfig описывает настройки подключения к Kafka. +type KafkaConfig struct { + Enabled bool `yaml:"enabled"` + Brokers []string `yaml:"brokers"` + Topic string `yaml:"topic"` + ClientID string `yaml:"client_id"` + SASLUser string `yaml:"sasl_user"` + SASLPass string `yaml:"sasl_pass"` + EnableTLS bool `yaml:"enable_tls"` + Timeout string `yaml:"timeout"` // человекочитаемый интервал, например "5s" +} + +// CollectorConfig описывает конфигурацию конкретного коллектора. +type CollectorConfig struct { + Enabled bool `yaml:"enabled"` + Type string `yaml:"type"` // builtin | exec + Key string `yaml:"key"` // ключ JSON, под которым будет возвращаться блок метрик + Interval string `yaml:"interval"` // человекочитаемый интервал, например "10s" + Exec string `yaml:"exec"` // путь/команда для внешнего коллектора + Timeout string `yaml:"timeout"` // таймаут выполнения + Extra map[string]any `yaml:"extra"` // произвольные параметры коллектора +} + +// AgentConfig — корневая конфигурация агента. +type AgentConfig struct { + Mode string `yaml:"mode"` // stdout | kafka | auto + LogLevel string `yaml:"log_level"` // debug | info | warn | error + Kafka KafkaConfig `yaml:"kafka"` + Collectors map[string]CollectorConfig `yaml:"collectors"` +} + +// Load загружает основную конфигурацию из одного файла config.yaml. +func Load(configPath string) (*AgentConfig, error) { + cfg := &AgentConfig{ + Mode: "auto", + LogLevel: "info", + Kafka: KafkaConfig{ + Enabled: false, + Brokers: nil, + Topic: "sensus.metrics", + ClientID: "sensusagent", + EnableTLS: false, + Timeout: "5s", + }, + Collectors: map[string]CollectorConfig{}, + } + + if err := readYAMLFileIfExists(configPath, cfg); err != nil { + return nil, fmt.Errorf("чтение config.yaml: %w", err) + } + + // Нормализуем и валидируем интервалы + for name, c := range cfg.Collectors { + if strings.TrimSpace(c.Key) == "" { + c.Key = name + } + if strings.TrimSpace(c.Type) == "" { + c.Type = "builtin" + } + if strings.TrimSpace(c.Interval) == "" { + c.Interval = "15s" + } else if _, err := time.ParseDuration(c.Interval); err != nil { + return nil, fmt.Errorf("collector %s: некорректный interval '%s': %w", name, c.Interval, err) + } + if c.Timeout != "" { + if _, err := time.ParseDuration(c.Timeout); err != nil { + return nil, fmt.Errorf("collector %s: некорректный timeout '%s': %w", name, c.Timeout, err) + } + } + cfg.Collectors[name] = c + } + + return cfg, nil +} + +// readYAMLFileIfExists читает YAML в переданную структуру, если файл существует. +func readYAMLFileIfExists(path string, out any) error { + if strings.TrimSpace(path) == "" { + return nil + } + b, err := os.ReadFile(filepath.Clean(path)) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil + } + return err + } + if len(b) == 0 { + return nil + } + if err := yaml.Unmarshal(b, out); err != nil { + return fmt.Errorf("yaml unmarshal: %w", err) + } + return nil +} + +// MustParseDuration безопасно парсит duration с дефолтом. +func MustParseDuration(s string, def time.Duration) time.Duration { + if strings.TrimSpace(s) == "" { + return def + } + d, err := time.ParseDuration(s) + if err != nil { + return def + } + return d +} + + diff --git a/src/core/execcollectors/exec.go b/src/core/execcollectors/exec.go new file mode 100644 index 0000000..1b26563 --- /dev/null +++ b/src/core/execcollectors/exec.go @@ -0,0 +1,71 @@ +package execcollectors + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Назначение: Коллектор, запускающий внешнюю команду/скрипт и парсящий stdout как JSON. + +import ( + "context" + "encoding/json" + "errors" + "fmt" + osExec "os/exec" + "strings" + "time" + + "sensusagent/src/core/collector" + "sensusagent/src/core/config" +) + +const collectorType = "exec" + +type execCollector struct { + name string + key string + interval time.Duration + enabled bool + command string + timeout time.Duration +} + +func init() { collector.RegisterFactory(collectorType, newExecCollector) } + +func newExecCollector(name string, cfg config.CollectorConfig) (collector.Collector, error) { + interval := config.MustParseDuration(cfg.Interval, 15*time.Second) + timeout := config.MustParseDuration(cfg.Timeout, 10*time.Second) + if strings.TrimSpace(cfg.Exec) == "" { + return nil, errors.New("exec collector: не задана команда exec") + } + key := cfg.Key + if strings.TrimSpace(key) == "" { key = name } + return &execCollector{ + name: name, + key: key, + interval: interval, + enabled: cfg.Enabled, + command: cfg.Exec, + timeout: timeout, + }, nil +} + +func (e *execCollector) Name() string { return e.name } +func (e *execCollector) Key() string { return e.key } +func (e *execCollector) Interval() time.Duration { return e.interval } +func (e *execCollector) Enabled() bool { return e.enabled } + +func (e *execCollector) Collect(ctx context.Context) (collector.Result, error) { + ctx, cancel := context.WithTimeout(ctx, e.timeout) + defer cancel() + cmd := osExec.CommandContext(ctx, "sh", "-c", e.command) + out, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("exec failed: %w", err) + } + var data map[string]any + if err := json.Unmarshal(out, &data); err != nil { + return nil, collector.ErrNoData + } + if data == nil { data = map[string]any{} } + return data, nil +} + + diff --git a/src/core/execcollectors/execdir.go b/src/core/execcollectors/execdir.go new file mode 100644 index 0000000..3d20056 --- /dev/null +++ b/src/core/execcollectors/execdir.go @@ -0,0 +1,92 @@ +package execcollectors + +import ( + "context" + "encoding/json" + "log/slog" + "os" + "path/filepath" + osExec "os/exec" + "sort" + "strings" + "time" + + "sensusagent/src/core/collector" + "sensusagent/src/core/config" +) + +const dirType = "execdir" + +type execDirCollector struct { + name string + key string + interval time.Duration + enabled bool + dir string + pattern string + timeout time.Duration + wrap bool +} + +func init() { collector.RegisterFactory(dirType, newExecDirCollector) } + +func newExecDirCollector(name string, cfg config.CollectorConfig) (collector.Collector, error) { + key := cfg.Key + if strings.TrimSpace(key) == "" { key = name } + dir := "./bin/collectors" + pattern := "*" + wrap := true + if v, ok := cfg.Extra["dir"].(string); ok && strings.TrimSpace(v) != "" { dir = v } + if v, ok := cfg.Extra["pattern"].(string); ok && strings.TrimSpace(v) != "" { pattern = v } + if v, ok := cfg.Extra["wrap"].(bool); ok { wrap = v } + return &execDirCollector{ + name: name, key: key, interval: config.MustParseDuration(cfg.Interval, 15*time.Second), + enabled: cfg.Enabled, dir: dir, pattern: pattern, timeout: config.MustParseDuration(cfg.Timeout, 10*time.Second), wrap: wrap, + }, nil +} + +func (e *execDirCollector) Name() string { return e.name } +func (e *execDirCollector) Key() string { return e.key } +func (e *execDirCollector) Interval() time.Duration { return e.interval } +func (e *execDirCollector) Enabled() bool { return e.enabled } + +func (e *execDirCollector) Collect(ctx context.Context) (collector.Result, error) { + entries, err := os.ReadDir(e.dir) + if err != nil { slog.Warn("execdir: read dir failed", "dir", e.dir, "err", err); return map[string]any{}, nil } + candidates := make([]string, 0, len(entries)) + for _, entry := range entries { + if entry.IsDir() { continue } + name := entry.Name() + match, _ := filepath.Match(e.pattern, name) + if !match { continue } + path := filepath.Join(e.dir, name) + if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 { candidates = append(candidates, path) } + } + sort.Strings(candidates) + result := map[string]any{} + for _, path := range candidates { + sub := e.runOne(ctx, path) + if e.wrap { + key := filepath.Base(path) + result[key] = sub + } else { + if m, ok := sub.(map[string]any); ok { + for k, v := range m { result[k] = v } + } + } + } + return result, nil +} + +func (e *execDirCollector) runOne(ctx context.Context, path string) any { + cctx, cancel := context.WithTimeout(ctx, e.timeout); defer cancel() + cmd := osExec.CommandContext(cctx, path) + out, err := cmd.Output() + if err != nil { slog.Warn("execdir: run failed", "path", path, "err", err); return map[string]any{} } + var data any + if err := json.Unmarshal(out, &data); err != nil { slog.Warn("execdir: bad json", "path", path, "err", err); return map[string]any{} } + if data == nil { return map[string]any{} } + return data +} + + diff --git a/src/core/logging/logging.go b/src/core/logging/logging.go new file mode 100644 index 0000000..e798e65 --- /dev/null +++ b/src/core/logging/logging.go @@ -0,0 +1,31 @@ +package logging + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Назначение: Инициализация структурированного логгера (slog) с выводом в stdout. + +import ( + "log/slog" + "os" + "strings" +) + +// Setup настраивает глобальный логгер на stdout с нужным уровнем. +func Setup(level string) *slog.Logger { + lvl := slog.LevelInfo + switch strings.ToLower(strings.TrimSpace(level)) { + case "debug": + lvl = slog.LevelDebug + case "warn", "warning": + lvl = slog.LevelWarn + case "error": + lvl = slog.LevelError + default: + lvl = slog.LevelInfo + } + handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: lvl}) + logger := slog.New(handler) + slog.SetDefault(logger) + return logger +} + + diff --git a/src/core/output/output.go b/src/core/output/output.go new file mode 100644 index 0000000..77dfa9e --- /dev/null +++ b/src/core/output/output.go @@ -0,0 +1,84 @@ +package output + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Назначение: Вывод результатов — stdout и Kafka. + +import ( + "context" + "encoding/json" + "errors" + "os" + "strings" + "time" + + "github.com/segmentio/kafka-go" +) + +// Payload — общий JSON для вывода. +type Payload map[string]any + +// Output — интерфейс механизма доставки результатов. +type Output interface { + Write(ctx context.Context, data Payload) error + Close(ctx context.Context) error +} + +// StdoutOutput — вывод в stdout. +type StdoutOutput struct{} + +func (s *StdoutOutput) Write(_ context.Context, data Payload) error { + enc := json.NewEncoder(os.Stdout) + enc.SetEscapeHTML(false) + return enc.Encode(data) +} + +func (s *StdoutOutput) Close(_ context.Context) error { return nil } + +// KafkaOutput — вывод в Kafka. +type KafkaOutput struct { + writer *kafka.Writer + topic string +} + +// KafkaOptions описывает параметры подключения к Kafka. +type KafkaOptions struct { + Brokers []string + Topic string + ClientID string + Timeout time.Duration + SASLUser string + SASLPass string + EnableTLS bool +} + +// NewKafkaOutput создаёт Kafka writer. +func NewKafkaOutput(opts KafkaOptions) (*KafkaOutput, error) { + if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" { + return nil, errors.New("kafka brokers/topic not configured") + } + w := &kafka.Writer{ + Addr: kafka.TCP(opts.Brokers...), + Topic: opts.Topic, + Balancer: &kafka.LeastBytes{}, + RequiredAcks: kafka.RequireAll, + } + return &KafkaOutput{writer: w, topic: opts.Topic}, nil +} + +func (k *KafkaOutput) Write(ctx context.Context, data Payload) error { + b, err := json.Marshal(data) + if err != nil { + return err + } + msg := kafka.Message{Value: b, Time: time.Now()} + return k.writer.WriteMessages(ctx, msg) +} + +func (k *KafkaOutput) Close(ctx context.Context) error { + if k.writer != nil { + return k.writer.Close() + } + return nil +} + + diff --git a/src/core/runner/runner.go b/src/core/runner/runner.go new file mode 100644 index 0000000..874185e --- /dev/null +++ b/src/core/runner/runner.go @@ -0,0 +1,136 @@ +package runner + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// Назначение: Планировщик опроса коллекторов и публикация результатов. + +import ( + "context" + "log/slog" + "sync" + "time" + + "sensusagent/src/core/collector" + "sensusagent/src/core/output" +) + +// Runner объединяет коллекторы и вывод. +type Runner struct { + collectors []collector.Collector + out output.Output +} + +// New создаёт раннер. +func New(collectors []collector.Collector, out output.Output) *Runner { + return &Runner{collectors: collectors, out: out} +} + +// RunOnce собирает метрики один раз со всех коллекторов и публикует общий JSON. +func (r *Runner) RunOnce(ctx context.Context) { + payload := output.Payload{} + var wg sync.WaitGroup + var mu sync.Mutex + + for _, c := range r.collectors { + if !c.Enabled() { + // Пустой блок при выключенном коллекторе + payload[c.Key()] = map[string]any{} + continue + } + wg.Add(1) + go func(c collector.Collector) { + defer wg.Done() + res, err := c.Collect(ctx) + if err != nil { + slog.Warn("collector error", "name", c.Name(), "err", err) + res = map[string]any{} + } + mu.Lock() + payload[c.Key()] = res + mu.Unlock() + }(c) + } + wg.Wait() + + if isAllEmpty(payload) { + // Ничего не печатаем, если все блоки пустые + return + } + if err := r.out.Write(ctx, payload); err != nil { + slog.Error("output write failed", "err", err) + } +} + +// RunContinuous выполняет опрос в цикле с учетом индивидуальных интервалов. +func (r *Runner) RunContinuous(ctx context.Context) { + // Храним время последнего запуска по коллектору + lastRun := make(map[string]time.Time, len(r.collectors)) + // Минимальный тик для проверки графика + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + latest := output.Payload{} + + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + var wg sync.WaitGroup + var mu sync.Mutex + for _, c := range r.collectors { + if !c.Enabled() { + mu.Lock() + latest[c.Key()] = map[string]any{} + mu.Unlock() + continue + } + lr := lastRun[c.Name()] + if lr.IsZero() || now.Sub(lr) >= c.Interval() { + wg.Add(1) + go func(c collector.Collector) { + defer wg.Done() + res, err := c.Collect(ctx) + if err != nil { + slog.Warn("collector error", "name", c.Name(), "err", err) + res = map[string]any{} + } + mu.Lock() + latest[c.Key()] = res + lastRun[c.Name()] = now + mu.Unlock() + }(c) + } + } + wg.Wait() + if isAllEmpty(latest) { + // Пропускаем вывод, пока нет данных + continue + } + if err := r.out.Write(ctx, latest); err != nil { + slog.Error("output write failed", "err", err) + } + } + } +} + +// isAllEmpty проверяет, что все значения payload являются пустыми объектами +// или nil (что трактуем как пусто), чтобы подавлять бессмысленный вывод {}. +func isAllEmpty(p output.Payload) bool { + if len(p) == 0 { + return true + } + for _, v := range p { + if m, ok := v.(map[string]any); ok { + if len(m) > 0 { + return false + } + continue + } + if v != nil { + return false + } + } + return true +} + + diff --git a/src/main.go b/src/main.go new file mode 100644 index 0000000..b1eedca --- /dev/null +++ b/src/main.go @@ -0,0 +1,164 @@ +package main + +// Автор: Сергей Антропов, сайт: https://devops.org.ru +// SensusAgent — модульный агент сбора метрик с поддержкой встроенных и внешних коллекторов. + +import ( + "context" + "flag" + "fmt" + "log/slog" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/joho/godotenv" + + "sensusagent/src/core/collector" + "sensusagent/src/core/config" + "sensusagent/src/core/logging" + "sensusagent/src/core/output" + "sensusagent/src/core/runner" + + // Регистрируем exec/execdir из ядра + _ "sensusagent/src/core/execcollectors" +) + +func main() { + // Загружаем .env если есть (для локальной разработки) + _ = godotenv.Load() + + var ( + configPath = envOr("CONFIG_PATH", "") + cfgPathFlag string + modeFlag string + once bool + ) + + flag.StringVar(&cfgPathFlag, "config", "", "Путь к config.yaml") + flag.StringVar(&modeFlag, "mode", "", "Режим работы: stdout|kafka|auto") + flag.BoolVar(&once, "once", false, "Единичный сбор метрик и вывод") + flag.Parse() + + if cfgPathFlag != "" { configPath = cfgPathFlag } + + // Разрешаем путь к конфигу: сначала берём из ENV/флага, иначе пытаемся найти локальный ./bin/agent/config.yaml, + // затем абсолютный /bin/agent/config.yaml для контейнера. + configPath = resolveConfigPath(configPath) + + // Настройка логирования + logging.Setup(envOr("LOG_LEVEL", "info")) + + // Загрузка конфигурации + cfg, err := config.Load(configPath) + if err != nil { + slog.Error("config load failed", "err", err) + os.Exit(1) + } + if modeFlag != "" { + cfg.Mode = modeFlag + } + + slog.Info("config loaded", "mode", cfg.Mode, "collectors", len(cfg.Collectors), "config_path", configPath) + + // Построение коллекторов + collectors, err := collector.BuildCollectors(cfg.Collectors) + if err != nil { + slog.Error("build collectors failed", "err", err) + os.Exit(1) + } + + // Выбор вывода + out, err := selectOutput(cfg) + if err != nil { + slog.Error("select output failed", "err", err) + os.Exit(1) + } + defer func() { _ = out.Close(context.Background()) }() + + r := runner.New(collectors, out) + + // Контекст с обработкой SIGINT/SIGTERM + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + if once { + r.RunOnce(ctx) + return + } + r.RunContinuous(ctx) +} + +// selectOutput выбирает способ вывода исходя из режима и окружения. +func selectOutput(cfg *config.AgentConfig) (output.Output, error) { + mode := strings.ToLower(strings.TrimSpace(cfg.Mode)) + if mode == "" || mode == "auto" { + if underSystemd() && cfg.Kafka.Enabled { + mode = "kafka" + } else { + mode = "stdout" + } + } + switch mode { + case "stdout": + slog.Info("using stdout output") + return &output.StdoutOutput{}, nil + case "kafka": + slog.Info("using kafka output") + to := config.MustParseDuration(cfg.Kafka.Timeout, 5*time.Second) + return output.NewKafkaOutput(output.KafkaOptions{ + Brokers: cfg.Kafka.Brokers, + Topic: cfg.Kafka.Topic, + ClientID: cfg.Kafka.ClientID, + Timeout: to, + SASLUser: cfg.Kafka.SASLUser, + SASLPass: cfg.Kafka.SASLPass, + EnableTLS: cfg.Kafka.EnableTLS, + }) + default: + return nil, fmt.Errorf("неизвестный режим: %s", mode) + } +} + +// underSystemd определяет запуск под systemd по окружению. +func underSystemd() bool { + if os.Getenv("INVOCATION_ID") != "" { + return true + } + if os.Getenv("JOURNAL_STREAM") != "" { + return true + } + return false +} + +func envOr(k, def string) string { + v := strings.TrimSpace(os.Getenv(k)) + if v == "" { + return def + } + return v +} + +// resolveConfigPath пытается определить корректный путь к config.yaml. +func resolveConfigPath(explicit string) string { + if strings.TrimSpace(explicit) != "" { + return explicit + } + // 1. Локальный путь разработки + local := filepath.Clean("./bin/agent/config.yaml") + if _, err := os.Stat(local); err == nil { + return local + } + // 2. Путь внутри контейнера/дистрибутива + container := "/bin/agent/config.yaml" + if _, err := os.Stat(container); err == nil { + return container + } + // 3. Фоллбек на локальный + return local +} + +