Initial commit: SensusAgent core, collectors, build, docs
This commit is contained in:
commit
ad409950b8
13
.gitignore
vendored
Normal file
13
.gitignore
vendored
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
bin/agent/agent
|
||||||
|
bin/agent/agent-darwin
|
||||||
|
bin/agent/agent.exe
|
||||||
|
bin/agent/sensusagent
|
||||||
|
bin/agent/sensusagent-darwin
|
||||||
|
bin/agent/sensusagent.exe
|
||||||
|
bin/agent/collectors/*
|
||||||
|
!bin/agent/collectors/sample.sh
|
||||||
|
|
||||||
|
go.sum
|
||||||
|
.idea/
|
||||||
|
.vscode/
|
||||||
|
.DS_Store
|
16
.golangci.yml
Normal file
16
.golangci.yml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
run:
|
||||||
|
timeout: 3m
|
||||||
|
tests: true
|
||||||
|
linters:
|
||||||
|
enable:
|
||||||
|
- govet
|
||||||
|
- gofmt
|
||||||
|
- gocyclo
|
||||||
|
- revive
|
||||||
|
- errcheck
|
||||||
|
- staticcheck
|
||||||
|
issues:
|
||||||
|
exclude-use-default: false
|
||||||
|
max-issues-per-linter: 0
|
||||||
|
max-same-issues: 0
|
||||||
|
|
22
Dockerfile
Normal file
22
Dockerfile
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
# syntax=docker/dockerfile:1
|
||||||
|
|
||||||
|
FROM golang:1.22 AS build
|
||||||
|
WORKDIR /src
|
||||||
|
COPY go.mod ./
|
||||||
|
RUN go mod download
|
||||||
|
COPY . .
|
||||||
|
RUN go mod tidy && \
|
||||||
|
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /bin/agent/sensusagent ./src && \
|
||||||
|
mkdir -p /bin/agent/collectors && \
|
||||||
|
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /bin/agent/collectors/uptime ./src/collectors/uptime && \
|
||||||
|
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /bin/agent/collectors/macos ./src/collectors/macos
|
||||||
|
|
||||||
|
FROM alpine:3.20
|
||||||
|
WORKDIR /
|
||||||
|
COPY --from=build /bin/agent /bin/agent
|
||||||
|
RUN adduser -D -H -s /sbin/nologin agent && \
|
||||||
|
chmod +x /bin/agent/agent && \
|
||||||
|
chmod -R +x /bin/collectors || true
|
||||||
|
USER agent
|
||||||
|
ENTRYPOINT ["/bin/agent/sensusagent"]
|
||||||
|
|
100
Makefile
Normal file
100
Makefile
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
SHELL := /bin/sh
|
||||||
|
PROJECT_NAME := agent
|
||||||
|
|
||||||
|
.PHONY: build build-linux build-darwin build-windows run agent test lint docker-clean collectors collectors-darwin collectors-linux collectors-windows
|
||||||
|
|
||||||
|
build:
|
||||||
|
# Платформозависимая сборка агента (как у collectors)
|
||||||
|
@mkdir -p ./bin/agent; \
|
||||||
|
if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \
|
||||||
|
ARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/'); \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$ARCH golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent-darwin ./src"; \
|
||||||
|
else \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent ./src"; \
|
||||||
|
fi
|
||||||
|
|
||||||
|
build-darwin:
|
||||||
|
# Кросс-сборка darwin (arm64) бинаря агента
|
||||||
|
@mkdir -p ./bin/agent; \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=arm64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent-darwin ./src"
|
||||||
|
|
||||||
|
build-linux:
|
||||||
|
# Кросс-сборка linux (amd64) бинаря агента
|
||||||
|
@mkdir -p ./bin/agent; \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent ./src"
|
||||||
|
|
||||||
|
build-windows:
|
||||||
|
# Кросс-сборка windows (amd64) бинаря агента
|
||||||
|
@mkdir -p ./bin/agent; \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=windows -e GOARCH=amd64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/agent.exe ./src"
|
||||||
|
|
||||||
|
collectors:
|
||||||
|
# Сборка коллекторов для текущей платформы (через Docker)
|
||||||
|
@mkdir -p ./bin/agent/collectors
|
||||||
|
@if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \
|
||||||
|
ARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/'); \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$ARCH golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos ./src/collectors/macos"; \
|
||||||
|
else \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos ./src/collectors/macos"; \
|
||||||
|
fi
|
||||||
|
|
||||||
|
collectors-darwin:
|
||||||
|
# Кросс-сборка коллекторов для macOS
|
||||||
|
@mkdir -p ./bin/agent/collectors
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=arm64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime-darwin ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos-darwin ./src/collectors/macos"
|
||||||
|
|
||||||
|
collectors-linux:
|
||||||
|
# Кросс-сборка коллекторов для Linux
|
||||||
|
@mkdir -p ./bin/agent/collectors
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime-linux ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos-linux ./src/collectors/macos"
|
||||||
|
|
||||||
|
collectors-windows:
|
||||||
|
# Кросс-сборка коллекторов для Windows
|
||||||
|
@mkdir -p ./bin/agent/collectors
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=windows -e GOARCH=amd64 golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -o ./bin/agent/collectors/uptime-windows.exe ./src/collectors/uptime && CGO_ENABLED=0 go build -o ./bin/agent/collectors/macos-windows.exe ./src/collectors/macos"
|
||||||
|
|
||||||
|
run: collectors
|
||||||
|
# На macOS собираем darwin-бинарь в Docker и запускаем на хосте (для доступа к macOS API);
|
||||||
|
# на других ОС — полностью в контейнере Linux.
|
||||||
|
@if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \
|
||||||
|
mkdir -p ./bin/agent; \
|
||||||
|
ARCH=$$(/usr/bin/uname -m | sed 's/x86_64/amd64/; s/arm64/arm64/'); \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=darwin -e GOARCH=$$ARCH golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && go build -o /workspace/bin/agent/agent-darwin ./src >/dev/null 2>&1"; \
|
||||||
|
LOG_LEVEL=error CONFIG_PATH=./bin/agent/config.yaml ./bin/agent/agent-darwin --once --mode stdout | docker run --rm -i ghcr.io/jqlang/jq:latest; \
|
||||||
|
else \
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace golang:1.22 \
|
||||||
|
sh -c "go mod tidy >/dev/null 2>&1 && go build -o /tmp/agent ./src >/dev/null 2>&1 && LOG_LEVEL=error CONFIG_PATH=./bin/agent/config.yaml /tmp/agent --once --mode stdout" | docker run --rm -i ghcr.io/jqlang/jq:latest; \
|
||||||
|
fi
|
||||||
|
|
||||||
|
agent: collectors build
|
||||||
|
# Запуск агента с ВСЕМИ собранными коллекторами (не one-shot)
|
||||||
|
@if [ "$$(/usr/bin/uname -s)" = "Darwin" ]; then \
|
||||||
|
LOG_LEVEL=info CONFIG_PATH=./bin/agent/config.yaml ./bin/agent/agent-darwin --mode stdout; \
|
||||||
|
else \
|
||||||
|
LOG_LEVEL=info CONFIG_PATH=./bin/agent/config.yaml ./bin/agent/agent --mode stdout; \
|
||||||
|
fi
|
||||||
|
|
||||||
|
test:
|
||||||
|
# Юнит-тесты в Docker без использования локальной машины
|
||||||
|
docker run --rm -v $$PWD:/workspace -w /workspace golang:1.22 \
|
||||||
|
sh -c "go test ./..."
|
||||||
|
|
||||||
|
lint:
|
||||||
|
# Линт кода через golangci-lint в Docker
|
||||||
|
docker run --rm -v $$PWD:/app -w /app golangci/golangci-lint:latest \
|
||||||
|
golangci-lint run --timeout 3m
|
||||||
|
|
||||||
|
docker-clean:
|
||||||
|
docker rmi -f $(PROJECT_NAME):build || true
|
||||||
|
|
18
README.md
Normal file
18
README.md
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
## SensusAgent
|
||||||
|
|
||||||
|
Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
|
||||||
|
Модульный агент мониторинга на Go 1.22+. Поддерживает встроенные и внешние (exec) коллекторы, вывод в stdout и Kafka, systemd unit.
|
||||||
|
|
||||||
|
### Структура
|
||||||
|
- src/core — ядро (конфиг, логирование, реестр коллекторов, раннер, вывод)
|
||||||
|
- src/collectors — исходники Go-коллекторов (сборка в bin/agent/collectors)
|
||||||
|
- bin/agent — бинарник и конфиги
|
||||||
|
|
||||||
|
### Сборка и запуск
|
||||||
|
Подробно см. `app/docs/README.md`.
|
||||||
|
|
||||||
|
### Конфигурация
|
||||||
|
Все настройки, включая коллекторы, находятся в `bin/agent/config.yaml`.
|
||||||
|
|
||||||
|
|
14
app/docs/README.md
Normal file
14
app/docs/README.md
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
## SensusAgent — документация
|
||||||
|
|
||||||
|
Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
|
||||||
|
SensusAgent — лёгкий модульный агент мониторинга (Go 1.22+), собирающий метрики через внешние коллектора (исполняемые файлы) и агрегирующий результат в JSON (stdout или Kafka).
|
||||||
|
|
||||||
|
Содержание:
|
||||||
|
- Обзор архитектуры: см. `overview.md`
|
||||||
|
- Конфигурация: см. `config.md`
|
||||||
|
- Коллекторы: см. `collectors.md`
|
||||||
|
- Сборка и запуск: см. `build_and_run.md`
|
||||||
|
- Деплой (Docker, systemd, Kafka): см. `deploy.md`
|
||||||
|
|
||||||
|
|
31
app/docs/build_and_run.md
Normal file
31
app/docs/build_and_run.md
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
### Сборка и запуск
|
||||||
|
|
||||||
|
Требования: Docker.
|
||||||
|
|
||||||
|
Сборка коллекторов и запуск one-shot:
|
||||||
|
```bash
|
||||||
|
make run
|
||||||
|
```
|
||||||
|
|
||||||
|
Длительный запуск (stdout):
|
||||||
|
```bash
|
||||||
|
make agent
|
||||||
|
```
|
||||||
|
|
||||||
|
Сборка агента под платформы:
|
||||||
|
```bash
|
||||||
|
make build # под текущую (darwin/linux)
|
||||||
|
make build-linux # linux/amd64
|
||||||
|
make build-darwin # darwin/arm64
|
||||||
|
make build-windows # windows/amd64
|
||||||
|
```
|
||||||
|
|
||||||
|
Сборка коллекторов под платформы:
|
||||||
|
```bash
|
||||||
|
make collectors # текущая платформа
|
||||||
|
make collectors-linux # linux/amd64
|
||||||
|
make collectors-darwin # darwin/arm64
|
||||||
|
make collectors-windows # windows/amd64
|
||||||
|
```
|
||||||
|
|
||||||
|
|
14
app/docs/collectors.md
Normal file
14
app/docs/collectors.md
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
### Коллекторы
|
||||||
|
|
||||||
|
Расположение бинарников: `bin/agent/collectors/`
|
||||||
|
|
||||||
|
Добавление собственного коллектора на Go:
|
||||||
|
1. Создайте каталог `src/collectors/<name>/` с `main.go`, печатающим JSON в stdout.
|
||||||
|
2. Сборка:
|
||||||
|
- `make collectors` — соберёт под текущую платформу в `bin/agent/collectors/<name>`
|
||||||
|
- `make collectors-linux|collectors-darwin|collectors-windows` — кросс-сборка
|
||||||
|
3. Добавьте в `bin/agent/config.yaml` блок с `type: exec` и `exec: ./bin/agent/collectors/<name>`.
|
||||||
|
|
||||||
|
Требования к выводу: корректный JSON на stdout. В случае ошибки — пустой JSON `{}`.
|
||||||
|
|
||||||
|
|
41
app/docs/config.md
Normal file
41
app/docs/config.md
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
### Конфигурация
|
||||||
|
|
||||||
|
Файл: `bin/agent/config.yaml`
|
||||||
|
|
||||||
|
Поля:
|
||||||
|
- `mode`: `auto|stdout|kafka`
|
||||||
|
- `log_level`: `debug|info|warn|error`
|
||||||
|
- `kafka`: блок настроек Kafka (для режима kafka)
|
||||||
|
- `collectors`: словарь коллекторов
|
||||||
|
|
||||||
|
Описание коллектора:
|
||||||
|
```yaml
|
||||||
|
<name>:
|
||||||
|
enabled: true
|
||||||
|
type: exec # запуск внешнего файла
|
||||||
|
key: <json_key> # ключ в итоговом JSON
|
||||||
|
interval: "10s" # период опроса
|
||||||
|
timeout: "5s" # таймаут запуска коллектора
|
||||||
|
exec: "./bin/agent/collectors/<binary_or_script>"
|
||||||
|
```
|
||||||
|
|
||||||
|
Пример:
|
||||||
|
```yaml
|
||||||
|
collectors:
|
||||||
|
uptime:
|
||||||
|
enabled: true
|
||||||
|
type: exec
|
||||||
|
key: uptime
|
||||||
|
interval: "10s"
|
||||||
|
timeout: "5s"
|
||||||
|
exec: "./bin/agent/collectors/uptime"
|
||||||
|
macos:
|
||||||
|
enabled: true
|
||||||
|
type: exec
|
||||||
|
key: macos
|
||||||
|
interval: "30s"
|
||||||
|
timeout: "10s"
|
||||||
|
exec: "./bin/agent/collectors/macos"
|
||||||
|
```
|
||||||
|
|
||||||
|
|
30
app/docs/deploy.md
Normal file
30
app/docs/deploy.md
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
### Деплой
|
||||||
|
|
||||||
|
Docker Compose (stdout): см. `docker-compose.yml`.
|
||||||
|
|
||||||
|
Systemd:
|
||||||
|
1. Скопируйте бинарь `bin/agent/agent` и `bin/agent/config.yaml` на сервер.
|
||||||
|
2. Установите юнит `sensusagent.service` (обновите путь при необходимости):
|
||||||
|
```ini
|
||||||
|
[Unit]
|
||||||
|
Description=SensusAgent metrics collector
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
Environment=CONFIG_PATH=/bin/agent/config.yaml
|
||||||
|
ExecStart=/bin/agent/agent --mode kafka
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=3
|
||||||
|
User=nobody
|
||||||
|
Group=nogroup
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
|
```
|
||||||
|
3. `systemctl daemon-reload && systemctl enable --now sensusagent`.
|
||||||
|
|
||||||
|
Kafka:
|
||||||
|
- Настройте блок `kafka` в `config.yaml` (brokers/topic/timeout…).
|
||||||
|
|
||||||
|
|
17
app/docs/overview.md
Normal file
17
app/docs/overview.md
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
### Обзор
|
||||||
|
|
||||||
|
SensusAgent — агент, выполняющий внешние коллектора из каталога `bin/agent/collectors` согласно `config.yaml` и объединяющий их JSON-вывод в один документ.
|
||||||
|
|
||||||
|
- Плагинная архитектура: добавьте исполняемый файл — опишите его в конфиге — агент начнёт его опрашивать.
|
||||||
|
- Два вывода: stdout (standalone) и Kafka (systemd/production).
|
||||||
|
- Высокая устойчивость: при ошибках коллектора агент возвращает пустой блок и продолжает работу, не падая.
|
||||||
|
|
||||||
|
Основные компоненты:
|
||||||
|
- `src/core/config` — конфиг.
|
||||||
|
- `src/core/collector` — интерфейсы, реестр.
|
||||||
|
- `src/core/runner` — планировщик/агрегация.
|
||||||
|
- `src/core/output` — stdout/Kafka.
|
||||||
|
- `src/core/execcollectors` — поддержка exec/execdir в ядре.
|
||||||
|
- `src/collectors/*` — исходники Go-коллекторов (собираются в `bin/agent/collectors`).
|
||||||
|
|
||||||
|
|
8
bin/agent/collectors/sample.sh
Normal file
8
bin/agent/collectors/sample.sh
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
#!/usr/bin/env sh
|
||||||
|
# Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
# Пример внешнего коллектора: выводит JSON с фиктивными данными.
|
||||||
|
|
||||||
|
set -eu
|
||||||
|
echo '{"sample": {"ok": true, "ts": '"$(date +%s)"'}}'
|
||||||
|
|
||||||
|
|
38
bin/agent/config.yaml
Normal file
38
bin/agent/config.yaml
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
# Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
# Общая конфигурация агента SensusAgent
|
||||||
|
|
||||||
|
mode: auto # stdout | kafka | auto
|
||||||
|
log_level: info
|
||||||
|
|
||||||
|
kafka:
|
||||||
|
enabled: false
|
||||||
|
brokers: ["kafka:9092"]
|
||||||
|
topic: "sensus.metrics"
|
||||||
|
client_id: "sensusagent"
|
||||||
|
enable_tls: false
|
||||||
|
timeout: "5s"
|
||||||
|
|
||||||
|
collectors:
|
||||||
|
uptime:
|
||||||
|
enabled: true
|
||||||
|
type: exec
|
||||||
|
key: uptime
|
||||||
|
interval: "10s"
|
||||||
|
timeout: "5s"
|
||||||
|
exec: "./bin/agent/collectors/uptime"
|
||||||
|
macos:
|
||||||
|
enabled: true
|
||||||
|
type: exec
|
||||||
|
key: macos
|
||||||
|
interval: "30s"
|
||||||
|
timeout: "10s"
|
||||||
|
exec: "./bin/agent/collectors/macos"
|
||||||
|
sample:
|
||||||
|
enabled: true
|
||||||
|
type: exec
|
||||||
|
key: sample
|
||||||
|
interval: "30s"
|
||||||
|
timeout: "5s"
|
||||||
|
exec: "./bin/agent/collectors/sample.sh"
|
||||||
|
|
||||||
|
|
38
docker-compose.yml
Normal file
38
docker-compose.yml
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
version: "3.9"
|
||||||
|
services:
|
||||||
|
agent:
|
||||||
|
build: .
|
||||||
|
image: sensusagent:dev
|
||||||
|
container_name: sensusagent
|
||||||
|
environment:
|
||||||
|
- CONFIG_PATH=/bin/agent/config.yaml
|
||||||
|
- LOG_LEVEL=debug
|
||||||
|
volumes:
|
||||||
|
- ./bin/agent:/bin/agent:ro
|
||||||
|
entrypoint: ["/bin/agent/sensusagent", "--mode", "stdout"]
|
||||||
|
deploy:
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpus: '0.50'
|
||||||
|
memory: 256M
|
||||||
|
|
||||||
|
kafka:
|
||||||
|
image: bitnami/kafka:3.7
|
||||||
|
ports:
|
||||||
|
- "9092:9092"
|
||||||
|
environment:
|
||||||
|
- KAFKA_ENABLE_KRAFT=yes
|
||||||
|
- KAFKA_CFG_PROCESS_ROLES=broker,controller
|
||||||
|
- KAFKA_CFG_NODE_ID=1
|
||||||
|
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
|
||||||
|
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
|
||||||
|
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
|
||||||
|
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
|
||||||
|
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
|
||||||
|
- KAFKA_CFG_MESSAGE_MAX_BYTES=20000000
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "bash", "-c", "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | cat"]
|
||||||
|
interval: 10s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 10
|
||||||
|
|
14
go.mod
Normal file
14
go.mod
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
module sensusagent
|
||||||
|
|
||||||
|
go 1.22
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/joho/godotenv v1.5.1
|
||||||
|
github.com/segmentio/kafka-go v0.4.46
|
||||||
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/klauspost/compress v1.15.9 // indirect
|
||||||
|
github.com/pierrec/lz4/v4 v4.1.15 // indirect
|
||||||
|
)
|
17
sensusagent.service
Normal file
17
sensusagent.service
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
[Unit]
|
||||||
|
Description=SensusAgent metrics collector
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
Environment=CONFIG_PATH=/bin/agent/config.yaml
|
||||||
|
ExecStart=/bin/agent/agent --mode kafka
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=3
|
||||||
|
User=nobody
|
||||||
|
Group=nogroup
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
|
|
||||||
|
|
298
src/collectors/macos/macos_darwin.go
Normal file
298
src/collectors/macos/macos_darwin.go
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
//go:build darwin
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
execpkg "os/exec"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func collectInfo(ctx context.Context) (map[string]any, error) {
|
||||||
|
// CPU
|
||||||
|
cores := toIntSafe(sysctlTrim(ctx, "hw.ncpu"))
|
||||||
|
cpuUsage := cpuUsagePercent(ctx)
|
||||||
|
|
||||||
|
// RAM
|
||||||
|
totalMem := toInt64Safe(sysctlTrim(ctx, "hw.memsize"))
|
||||||
|
usedMem := memUsedBytes(ctx, totalMem)
|
||||||
|
ram := map[string]any{
|
||||||
|
"total_bytes": totalMem,
|
||||||
|
"used_bytes": usedMem,
|
||||||
|
"used_percent": percent(usedMem, totalMem),
|
||||||
|
}
|
||||||
|
|
||||||
|
// LOAD AVG
|
||||||
|
l1, l5, l15 := loadAvg(ctx)
|
||||||
|
|
||||||
|
// DISKS usage per mount
|
||||||
|
disks, totalDisk, usedDisk := disksByMount(ctx)
|
||||||
|
|
||||||
|
// NVMe/SSD/HDD counts via system_profiler
|
||||||
|
nvmeCount := countNVMe(ctx)
|
||||||
|
ssdCount, hddCount := countSataTypes(ctx)
|
||||||
|
|
||||||
|
// GPU info with VRAM totals and attempt to get usage via ioreg
|
||||||
|
gpus, gpuCount := gpuInfo(ctx)
|
||||||
|
|
||||||
|
result := map[string]any{
|
||||||
|
"cpu": map[string]any{
|
||||||
|
"cores": cores,
|
||||||
|
"usage_percent": cpuUsage,
|
||||||
|
},
|
||||||
|
"ram": ram,
|
||||||
|
"disks": map[string]any{
|
||||||
|
"by_mount": disks,
|
||||||
|
"total_bytes": totalDisk,
|
||||||
|
"used_bytes": usedDisk,
|
||||||
|
"nvme": nvmeCount,
|
||||||
|
"ssd": ssdCount,
|
||||||
|
"hdd": hddCount,
|
||||||
|
},
|
||||||
|
"gpu": map[string]any{
|
||||||
|
"count": gpuCount,
|
||||||
|
"devices": gpus,
|
||||||
|
},
|
||||||
|
"load": map[string]any{
|
||||||
|
"1m": l1,
|
||||||
|
"5m": l5,
|
||||||
|
"15m": l15,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runJSONSafe(ctx context.Context, name string, args ...string) (string, error) {
|
||||||
|
out, err := execpkg.CommandContext(ctx, name, args...).Output()
|
||||||
|
if err != nil { return "", err }
|
||||||
|
b := out
|
||||||
|
for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r' || b[len(b)-1] == ' ') { b = b[:len(b)-1] }
|
||||||
|
return string(b), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func sysctlTrim(ctx context.Context, key string) string {
|
||||||
|
s, _ := runJSONSafe(ctx, "/usr/sbin/sysctl", "-n", key)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func cpuUsagePercent(ctx context.Context) float64 {
|
||||||
|
// top -l 1 -n 0 -> line: "CPU usage: 6.75% user, 5.03% sys, 88.20% idle"
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/usr/bin/top", "-l", "1", "-n", "0").Output()
|
||||||
|
if err != nil { return 0 }
|
||||||
|
lines := strings.Split(string(out), "\n")
|
||||||
|
for _, ln := range lines {
|
||||||
|
if strings.Contains(ln, "CPU usage:") {
|
||||||
|
// extract numbers before "% user" and "% sys"
|
||||||
|
parts := strings.Split(ln, ",")
|
||||||
|
var user, sys float64
|
||||||
|
for _, p := range parts {
|
||||||
|
p = strings.TrimSpace(p)
|
||||||
|
if strings.Contains(p, "user") {
|
||||||
|
fmt.Sscanf(p, "CPU usage: %f%% user", &user)
|
||||||
|
} else if strings.Contains(p, "sys") {
|
||||||
|
fmt.Sscanf(p, "%f%% sys", &sys)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return user + sys
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func memUsedBytes(ctx context.Context, total int64) int64 {
|
||||||
|
// vm_stat + pagesize
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/usr/bin/vm_stat").Output()
|
||||||
|
if err != nil { return 0 }
|
||||||
|
psStr := sysctlTrim(ctx, "hw.pagesize")
|
||||||
|
pageSize := toInt64Safe(psStr)
|
||||||
|
if pageSize <= 0 { pageSize = 4096 }
|
||||||
|
var freePages int64
|
||||||
|
lines := strings.Split(string(out), "\n")
|
||||||
|
for _, ln := range lines {
|
||||||
|
if strings.HasPrefix(strings.TrimSpace(ln), "Pages free:") {
|
||||||
|
// format: Pages free: 12345.
|
||||||
|
ln = strings.TrimSuffix(ln, ".")
|
||||||
|
fields := strings.Fields(ln)
|
||||||
|
if len(fields) > 2 {
|
||||||
|
freePages = toInt64Safe(fields[len(fields)-1])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free := freePages * pageSize
|
||||||
|
if total > 0 && total > free { return total - free }
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadAvg(ctx context.Context) (float64, float64, float64) {
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/usr/sbin/sysctl", "-n", "vm.loadavg").Output()
|
||||||
|
if err != nil { return 0, 0, 0 }
|
||||||
|
s := string(out)
|
||||||
|
// format: { 1.21 1.34 1.56 }
|
||||||
|
s = strings.TrimSpace(strings.Trim(s, "{}"))
|
||||||
|
fields := strings.Fields(s)
|
||||||
|
if len(fields) >= 3 {
|
||||||
|
return toFloat(fields[0]), toFloat(fields[1]), toFloat(fields[2])
|
||||||
|
}
|
||||||
|
return 0, 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func disksByMount(ctx context.Context) ([]map[string]any, int64, int64) {
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/bin/df", "-kP").Output()
|
||||||
|
if err != nil { return nil, 0, 0 }
|
||||||
|
lines := strings.Split(string(out), "\n")
|
||||||
|
if len(lines) <= 1 { return nil, 0, 0 }
|
||||||
|
var res []map[string]any
|
||||||
|
var total, used int64
|
||||||
|
for i := 1; i < len(lines); i++ {
|
||||||
|
ln := strings.TrimSpace(lines[i])
|
||||||
|
if ln == "" { continue }
|
||||||
|
fields := strings.Fields(ln)
|
||||||
|
if len(fields) < 6 { continue }
|
||||||
|
fs := fields[0]
|
||||||
|
// size used avail capacity mountpoint
|
||||||
|
sz := toInt64Safe(fields[1]) * 1024
|
||||||
|
us := toInt64Safe(fields[2]) * 1024
|
||||||
|
mp := fields[5]
|
||||||
|
// Исключаем docker-файловые системы/маунты
|
||||||
|
lfs, lmp := strings.ToLower(fs), strings.ToLower(mp)
|
||||||
|
if strings.Contains(lfs, "docker") || strings.Contains(lmp, "docker") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(fs, "/dev/") {
|
||||||
|
res = append(res, map[string]any{
|
||||||
|
"filesystem": fs,
|
||||||
|
"mountpoint": mp,
|
||||||
|
"size_bytes": sz,
|
||||||
|
"used_bytes": us,
|
||||||
|
"free_bytes": (toInt64Safe(fields[3]) * 1024),
|
||||||
|
"used_percent": percent(us, sz),
|
||||||
|
})
|
||||||
|
total += sz
|
||||||
|
used += us
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res, total, used
|
||||||
|
}
|
||||||
|
|
||||||
|
func countNVMe(ctx context.Context) int {
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/usr/sbin/system_profiler", "-json", "SPNVMeDataType").Output()
|
||||||
|
if err != nil { return 0 }
|
||||||
|
var mp map[string]any
|
||||||
|
if json.Unmarshal(out, &mp) != nil { return 0 }
|
||||||
|
if arr, ok := mp["SPNVMeDataType"].([]any); ok { return len(arr) }
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func countSataTypes(ctx context.Context) (int, int) {
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/usr/sbin/system_profiler", "-json", "SPSerialATADataType").Output()
|
||||||
|
if err != nil { return 0, 0 }
|
||||||
|
var mp map[string]any
|
||||||
|
if json.Unmarshal(out, &mp) != nil { return 0, 0 }
|
||||||
|
ssd, hdd := 0, 0
|
||||||
|
// Heuristic scan for "Solid State" vs others in nested objects
|
||||||
|
b, _ := json.Marshal(mp)
|
||||||
|
s := string(b)
|
||||||
|
if strings.Contains(strings.ToLower(s), "solid state") { ssd++ }
|
||||||
|
if ssd == 0 && strings.Contains(strings.ToLower(s), "rotational") { hdd++ }
|
||||||
|
return ssd, hdd
|
||||||
|
}
|
||||||
|
|
||||||
|
func gpuInfo(ctx context.Context) ([]map[string]any, int) {
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/usr/sbin/system_profiler", "-json", "SPDisplaysDataType").Output()
|
||||||
|
if err != nil { return nil, 0 }
|
||||||
|
var mp map[string]any
|
||||||
|
if json.Unmarshal(out, &mp) != nil { return nil, 0 }
|
||||||
|
var devices []map[string]any
|
||||||
|
if arr, ok := mp["SPDisplaysDataType"].([]any); ok {
|
||||||
|
for _, it := range arr {
|
||||||
|
if m, ok := it.(map[string]any); ok {
|
||||||
|
name := ""
|
||||||
|
if v, ok := m["_name"].(string); ok { name = v }
|
||||||
|
// VRAM может приходить как VRAM (Total): "8 GB"
|
||||||
|
var vramBytes int64
|
||||||
|
for k, vv := range m {
|
||||||
|
if strings.HasPrefix(strings.ToLower(k), "vram") {
|
||||||
|
if s, ok := vv.(string); ok { vramBytes = parseSizeToBytes(s) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
used := gpuMemUsedBytes(ctx)
|
||||||
|
devices = append(devices, map[string]any{
|
||||||
|
"model": name,
|
||||||
|
"vram_bytes": vramBytes,
|
||||||
|
"used_bytes": used,
|
||||||
|
"used_percent": percent(used, vramBytes),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return devices, len(devices)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseSizeToBytes(s string) int64 {
|
||||||
|
// Пример: "8 GB", "1536 MB"
|
||||||
|
s = strings.TrimSpace(strings.ToUpper(s))
|
||||||
|
parts := strings.Fields(s)
|
||||||
|
if len(parts) < 2 { return 0 }
|
||||||
|
val := toFloat(parts[0])
|
||||||
|
unit := parts[1]
|
||||||
|
switch unit {
|
||||||
|
case "TB":
|
||||||
|
return int64(val * 1024 * 1024 * 1024 * 1024)
|
||||||
|
case "GB":
|
||||||
|
return int64(val * 1024 * 1024 * 1024)
|
||||||
|
case "MB":
|
||||||
|
return int64(val * 1024 * 1024)
|
||||||
|
case "KB":
|
||||||
|
return int64(val * 1024)
|
||||||
|
default:
|
||||||
|
return int64(val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func gpuMemUsedBytes(ctx context.Context) int64 {
|
||||||
|
// На macOS общедоступных CLI для точного VRAM usage нет.
|
||||||
|
// Попытаемся через ioreg получить IOFBMemoryUsage (может быть недоступно). Если не получится — 0.
|
||||||
|
out, err := execpkg.CommandContext(ctx, "/usr/sbin/ioreg", "-l").Output()
|
||||||
|
if err != nil { return 0 }
|
||||||
|
// Ищем строки вида: "IOFBMemoryUsage" = 12345678
|
||||||
|
var used int64
|
||||||
|
lines := strings.Split(string(out), "\n")
|
||||||
|
for _, ln := range lines {
|
||||||
|
if strings.Contains(ln, "IOFBMemoryUsage") {
|
||||||
|
var v int64
|
||||||
|
if _, err := fmt.Sscanf(ln, "%*s = %d", &v); err == nil {
|
||||||
|
if v > used { used = v }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return used
|
||||||
|
}
|
||||||
|
|
||||||
|
func toIntSafe(s string) int {
|
||||||
|
i, _ := strconv.Atoi(strings.TrimSpace(s))
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
func toInt64Safe(s string) int64 {
|
||||||
|
s = strings.TrimSpace(s)
|
||||||
|
if s == "" { return 0 }
|
||||||
|
// try int
|
||||||
|
if i, err := strconv.ParseInt(s, 10, 64); err == nil { return i }
|
||||||
|
// sometimes with trailing chars
|
||||||
|
var i64 int64
|
||||||
|
fmt.Sscanf(s, "%d", &i64)
|
||||||
|
return i64
|
||||||
|
}
|
||||||
|
func toFloat(s string) float64 {
|
||||||
|
f, _ := strconv.ParseFloat(strings.TrimSpace(s), 64)
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
func percent(part int64, total int64) float64 {
|
||||||
|
if total <= 0 { return 0 }
|
||||||
|
return (float64(part) / float64(total)) * 100.0
|
||||||
|
}
|
||||||
|
|
||||||
|
|
12
src/collectors/macos/macos_unsupported.go
Normal file
12
src/collectors/macos/macos_unsupported.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
//go:build !darwin
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func collectInfo(ctx context.Context) (map[string]any, error) { return nil, fmt.Errorf("unsupported OS") }
|
||||||
|
|
||||||
|
|
40
src/collectors/macos/main.go
Normal file
40
src/collectors/macos/main.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Коллектор macOS hardware info на Go: sysctl + system_profiler в JSON.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// collectInfo реализуется в файлах с билд-тегами.
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
timeout := parseDurationOr("COLLECTOR_TIMEOUT", 8*time.Second)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
data, err := collectInfo(ctx)
|
||||||
|
if err != nil || data == nil {
|
||||||
|
fmt.Println("{}")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
enc := json.NewEncoder(os.Stdout)
|
||||||
|
enc.SetEscapeHTML(false)
|
||||||
|
_ = enc.Encode(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseDurationOr(env string, def time.Duration) time.Duration {
|
||||||
|
v := strings.TrimSpace(os.Getenv(env))
|
||||||
|
if v == "" { return def }
|
||||||
|
d, err := time.ParseDuration(v)
|
||||||
|
if err != nil { return def }
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
|
63
src/collectors/uptime/main.go
Normal file
63
src/collectors/uptime/main.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Коллектор uptime на Go. Печатает JSON c полями seconds и human.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// collectUptime реализуется в файлах с билд-тегами под конкретные ОС.
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
timeout := parseDurationOr("COLLECTOR_TIMEOUT", 5*time.Second)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
secs, err := collectUptime(ctx)
|
||||||
|
if err != nil || secs < 0 {
|
||||||
|
fmt.Println("{}")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
out := map[string]any{
|
||||||
|
"seconds": secs,
|
||||||
|
"human": humanize(time.Duration(secs) * time.Second),
|
||||||
|
}
|
||||||
|
enc := json.NewEncoder(os.Stdout)
|
||||||
|
enc.SetEscapeHTML(false)
|
||||||
|
_ = enc.Encode(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func humanize(d time.Duration) string {
|
||||||
|
total := int64(d.Seconds())
|
||||||
|
if total < 0 { total = 0 }
|
||||||
|
days := total / 86400
|
||||||
|
hours := (total % 86400) / 3600
|
||||||
|
mins := (total % 3600) / 60
|
||||||
|
secs := total % 60
|
||||||
|
parts := []string{}
|
||||||
|
if days > 0 { parts = append(parts, fmt.Sprintf("%dd", days)) }
|
||||||
|
if hours > 0 { parts = append(parts, fmt.Sprintf("%dh", hours)) }
|
||||||
|
if mins > 0 { parts = append(parts, fmt.Sprintf("%dm", mins)) }
|
||||||
|
parts = append(parts, fmt.Sprintf("%ds", secs))
|
||||||
|
return strings.Join(parts, " ")
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseDurationOr(env string, def time.Duration) time.Duration {
|
||||||
|
v := strings.TrimSpace(os.Getenv(env))
|
||||||
|
if v == "" {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
d, err := time.ParseDuration(v)
|
||||||
|
if err != nil {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
|
22
src/collectors/uptime/uptime_darwin.go
Normal file
22
src/collectors/uptime/uptime_darwin.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
//go:build darwin
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
execpkg "os/exec"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func collectUptime(ctx context.Context) (int64, error) {
|
||||||
|
b, err := execpkg.CommandContext(ctx, "/usr/sbin/sysctl", "-n", "kern.boottime").Output()
|
||||||
|
if err != nil { return 0, err }
|
||||||
|
var sec int64
|
||||||
|
_, err = fmt.Sscanf(string(b), "{ sec = %d", &sec)
|
||||||
|
if err != nil { return 0, err }
|
||||||
|
now := time.Now().Unix()
|
||||||
|
return now - sec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
23
src/collectors/uptime/uptime_linux.go
Normal file
23
src/collectors/uptime/uptime_linux.go
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
//go:build linux
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func collectUptime(ctx context.Context) (int64, error) {
|
||||||
|
b, err := os.ReadFile("/proc/uptime")
|
||||||
|
if err != nil { return 0, err }
|
||||||
|
fields := strings.Fields(strings.TrimSpace(string(b)))
|
||||||
|
if len(fields) == 0 { return 0, fmt.Errorf("no fields") }
|
||||||
|
var f float64
|
||||||
|
_, err = fmt.Sscanf(fields[0], "%f", &f)
|
||||||
|
if err != nil { return 0, err }
|
||||||
|
return int64(f), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
12
src/collectors/uptime/uptime_unsupported.go
Normal file
12
src/collectors/uptime/uptime_unsupported.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
//go:build !linux && !darwin
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func collectUptime(ctx context.Context) (int64, error) { return 0, fmt.Errorf("unsupported OS") }
|
||||||
|
|
||||||
|
|
67
src/core/collector/registry.go
Normal file
67
src/core/collector/registry.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package collector
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Назначение: Реестр коллекторов и базовые интерфейсы.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"sensusagent/src/core/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Result — универсальное представление данных от коллектора.
|
||||||
|
type Result map[string]any
|
||||||
|
|
||||||
|
// Collector — интерфейс любого коллектора метрик.
|
||||||
|
// Регистрация производится через init() с помощью RegisterFactory.
|
||||||
|
type Collector interface {
|
||||||
|
Name() string
|
||||||
|
Key() string
|
||||||
|
Interval() time.Duration
|
||||||
|
Enabled() bool
|
||||||
|
Collect(ctx context.Context) (Result, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Factory создает коллектор из его конфигурации.
|
||||||
|
type Factory func(name string, cfg config.CollectorConfig) (Collector, error)
|
||||||
|
|
||||||
|
var (
|
||||||
|
registryMu sync.RWMutex
|
||||||
|
registry = map[string]Factory{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// RegisterFactory регистрирует фабрику коллектора по его типу (cfg.Type).
|
||||||
|
func RegisterFactory(collectorType string, f Factory) {
|
||||||
|
registryMu.Lock()
|
||||||
|
defer registryMu.Unlock()
|
||||||
|
registry[collectorType] = f
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildCollectors создает экземпляры коллекторов на основании конфигурации.
|
||||||
|
func BuildCollectors(cfgs map[string]config.CollectorConfig) ([]Collector, error) {
|
||||||
|
registryMu.RLock()
|
||||||
|
defer registryMu.RUnlock()
|
||||||
|
|
||||||
|
collectors := make([]Collector, 0, len(cfgs))
|
||||||
|
for name, c := range cfgs {
|
||||||
|
f, ok := registry[c.Type]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("неизвестный тип коллектора '%s' для '%s'", c.Type, name)
|
||||||
|
}
|
||||||
|
inst, err := f(name, c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
collectors = append(collectors, inst)
|
||||||
|
}
|
||||||
|
return collectors, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrNoData возвращается, если коллектор не смог получить данные (для пустого блока).
|
||||||
|
var ErrNoData = errors.New("no data")
|
||||||
|
|
||||||
|
|
127
src/core/config/config.go
Normal file
127
src/core/config/config.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Назначение: Загрузка и валидация конфигурации агента SensusAgent.
|
||||||
|
// Комментарии на русском языке. Подробное логирование предусмотрено на уровне вызывающего кода.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
yaml "gopkg.in/yaml.v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KafkaConfig описывает настройки подключения к Kafka.
|
||||||
|
type KafkaConfig struct {
|
||||||
|
Enabled bool `yaml:"enabled"`
|
||||||
|
Brokers []string `yaml:"brokers"`
|
||||||
|
Topic string `yaml:"topic"`
|
||||||
|
ClientID string `yaml:"client_id"`
|
||||||
|
SASLUser string `yaml:"sasl_user"`
|
||||||
|
SASLPass string `yaml:"sasl_pass"`
|
||||||
|
EnableTLS bool `yaml:"enable_tls"`
|
||||||
|
Timeout string `yaml:"timeout"` // человекочитаемый интервал, например "5s"
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollectorConfig описывает конфигурацию конкретного коллектора.
|
||||||
|
type CollectorConfig struct {
|
||||||
|
Enabled bool `yaml:"enabled"`
|
||||||
|
Type string `yaml:"type"` // builtin | exec
|
||||||
|
Key string `yaml:"key"` // ключ JSON, под которым будет возвращаться блок метрик
|
||||||
|
Interval string `yaml:"interval"` // человекочитаемый интервал, например "10s"
|
||||||
|
Exec string `yaml:"exec"` // путь/команда для внешнего коллектора
|
||||||
|
Timeout string `yaml:"timeout"` // таймаут выполнения
|
||||||
|
Extra map[string]any `yaml:"extra"` // произвольные параметры коллектора
|
||||||
|
}
|
||||||
|
|
||||||
|
// AgentConfig — корневая конфигурация агента.
|
||||||
|
type AgentConfig struct {
|
||||||
|
Mode string `yaml:"mode"` // stdout | kafka | auto
|
||||||
|
LogLevel string `yaml:"log_level"` // debug | info | warn | error
|
||||||
|
Kafka KafkaConfig `yaml:"kafka"`
|
||||||
|
Collectors map[string]CollectorConfig `yaml:"collectors"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load загружает основную конфигурацию из одного файла config.yaml.
|
||||||
|
func Load(configPath string) (*AgentConfig, error) {
|
||||||
|
cfg := &AgentConfig{
|
||||||
|
Mode: "auto",
|
||||||
|
LogLevel: "info",
|
||||||
|
Kafka: KafkaConfig{
|
||||||
|
Enabled: false,
|
||||||
|
Brokers: nil,
|
||||||
|
Topic: "sensus.metrics",
|
||||||
|
ClientID: "sensusagent",
|
||||||
|
EnableTLS: false,
|
||||||
|
Timeout: "5s",
|
||||||
|
},
|
||||||
|
Collectors: map[string]CollectorConfig{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := readYAMLFileIfExists(configPath, cfg); err != nil {
|
||||||
|
return nil, fmt.Errorf("чтение config.yaml: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Нормализуем и валидируем интервалы
|
||||||
|
for name, c := range cfg.Collectors {
|
||||||
|
if strings.TrimSpace(c.Key) == "" {
|
||||||
|
c.Key = name
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(c.Type) == "" {
|
||||||
|
c.Type = "builtin"
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(c.Interval) == "" {
|
||||||
|
c.Interval = "15s"
|
||||||
|
} else if _, err := time.ParseDuration(c.Interval); err != nil {
|
||||||
|
return nil, fmt.Errorf("collector %s: некорректный interval '%s': %w", name, c.Interval, err)
|
||||||
|
}
|
||||||
|
if c.Timeout != "" {
|
||||||
|
if _, err := time.ParseDuration(c.Timeout); err != nil {
|
||||||
|
return nil, fmt.Errorf("collector %s: некорректный timeout '%s': %w", name, c.Timeout, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cfg.Collectors[name] = c
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// readYAMLFileIfExists читает YAML в переданную структуру, если файл существует.
|
||||||
|
func readYAMLFileIfExists(path string, out any) error {
|
||||||
|
if strings.TrimSpace(path) == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b, err := os.ReadFile(filepath.Clean(path))
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, fs.ErrNotExist) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(b) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := yaml.Unmarshal(b, out); err != nil {
|
||||||
|
return fmt.Errorf("yaml unmarshal: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustParseDuration безопасно парсит duration с дефолтом.
|
||||||
|
func MustParseDuration(s string, def time.Duration) time.Duration {
|
||||||
|
if strings.TrimSpace(s) == "" {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
d, err := time.ParseDuration(s)
|
||||||
|
if err != nil {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
|
71
src/core/execcollectors/exec.go
Normal file
71
src/core/execcollectors/exec.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
package execcollectors
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Назначение: Коллектор, запускающий внешнюю команду/скрипт и парсящий stdout как JSON.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
osExec "os/exec"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"sensusagent/src/core/collector"
|
||||||
|
"sensusagent/src/core/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
const collectorType = "exec"
|
||||||
|
|
||||||
|
type execCollector struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
interval time.Duration
|
||||||
|
enabled bool
|
||||||
|
command string
|
||||||
|
timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() { collector.RegisterFactory(collectorType, newExecCollector) }
|
||||||
|
|
||||||
|
func newExecCollector(name string, cfg config.CollectorConfig) (collector.Collector, error) {
|
||||||
|
interval := config.MustParseDuration(cfg.Interval, 15*time.Second)
|
||||||
|
timeout := config.MustParseDuration(cfg.Timeout, 10*time.Second)
|
||||||
|
if strings.TrimSpace(cfg.Exec) == "" {
|
||||||
|
return nil, errors.New("exec collector: не задана команда exec")
|
||||||
|
}
|
||||||
|
key := cfg.Key
|
||||||
|
if strings.TrimSpace(key) == "" { key = name }
|
||||||
|
return &execCollector{
|
||||||
|
name: name,
|
||||||
|
key: key,
|
||||||
|
interval: interval,
|
||||||
|
enabled: cfg.Enabled,
|
||||||
|
command: cfg.Exec,
|
||||||
|
timeout: timeout,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *execCollector) Name() string { return e.name }
|
||||||
|
func (e *execCollector) Key() string { return e.key }
|
||||||
|
func (e *execCollector) Interval() time.Duration { return e.interval }
|
||||||
|
func (e *execCollector) Enabled() bool { return e.enabled }
|
||||||
|
|
||||||
|
func (e *execCollector) Collect(ctx context.Context) (collector.Result, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, e.timeout)
|
||||||
|
defer cancel()
|
||||||
|
cmd := osExec.CommandContext(ctx, "sh", "-c", e.command)
|
||||||
|
out, err := cmd.Output()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("exec failed: %w", err)
|
||||||
|
}
|
||||||
|
var data map[string]any
|
||||||
|
if err := json.Unmarshal(out, &data); err != nil {
|
||||||
|
return nil, collector.ErrNoData
|
||||||
|
}
|
||||||
|
if data == nil { data = map[string]any{} }
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
92
src/core/execcollectors/execdir.go
Normal file
92
src/core/execcollectors/execdir.go
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
package execcollectors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
osExec "os/exec"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"sensusagent/src/core/collector"
|
||||||
|
"sensusagent/src/core/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
const dirType = "execdir"
|
||||||
|
|
||||||
|
type execDirCollector struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
interval time.Duration
|
||||||
|
enabled bool
|
||||||
|
dir string
|
||||||
|
pattern string
|
||||||
|
timeout time.Duration
|
||||||
|
wrap bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() { collector.RegisterFactory(dirType, newExecDirCollector) }
|
||||||
|
|
||||||
|
func newExecDirCollector(name string, cfg config.CollectorConfig) (collector.Collector, error) {
|
||||||
|
key := cfg.Key
|
||||||
|
if strings.TrimSpace(key) == "" { key = name }
|
||||||
|
dir := "./bin/collectors"
|
||||||
|
pattern := "*"
|
||||||
|
wrap := true
|
||||||
|
if v, ok := cfg.Extra["dir"].(string); ok && strings.TrimSpace(v) != "" { dir = v }
|
||||||
|
if v, ok := cfg.Extra["pattern"].(string); ok && strings.TrimSpace(v) != "" { pattern = v }
|
||||||
|
if v, ok := cfg.Extra["wrap"].(bool); ok { wrap = v }
|
||||||
|
return &execDirCollector{
|
||||||
|
name: name, key: key, interval: config.MustParseDuration(cfg.Interval, 15*time.Second),
|
||||||
|
enabled: cfg.Enabled, dir: dir, pattern: pattern, timeout: config.MustParseDuration(cfg.Timeout, 10*time.Second), wrap: wrap,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *execDirCollector) Name() string { return e.name }
|
||||||
|
func (e *execDirCollector) Key() string { return e.key }
|
||||||
|
func (e *execDirCollector) Interval() time.Duration { return e.interval }
|
||||||
|
func (e *execDirCollector) Enabled() bool { return e.enabled }
|
||||||
|
|
||||||
|
func (e *execDirCollector) Collect(ctx context.Context) (collector.Result, error) {
|
||||||
|
entries, err := os.ReadDir(e.dir)
|
||||||
|
if err != nil { slog.Warn("execdir: read dir failed", "dir", e.dir, "err", err); return map[string]any{}, nil }
|
||||||
|
candidates := make([]string, 0, len(entries))
|
||||||
|
for _, entry := range entries {
|
||||||
|
if entry.IsDir() { continue }
|
||||||
|
name := entry.Name()
|
||||||
|
match, _ := filepath.Match(e.pattern, name)
|
||||||
|
if !match { continue }
|
||||||
|
path := filepath.Join(e.dir, name)
|
||||||
|
if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 { candidates = append(candidates, path) }
|
||||||
|
}
|
||||||
|
sort.Strings(candidates)
|
||||||
|
result := map[string]any{}
|
||||||
|
for _, path := range candidates {
|
||||||
|
sub := e.runOne(ctx, path)
|
||||||
|
if e.wrap {
|
||||||
|
key := filepath.Base(path)
|
||||||
|
result[key] = sub
|
||||||
|
} else {
|
||||||
|
if m, ok := sub.(map[string]any); ok {
|
||||||
|
for k, v := range m { result[k] = v }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *execDirCollector) runOne(ctx context.Context, path string) any {
|
||||||
|
cctx, cancel := context.WithTimeout(ctx, e.timeout); defer cancel()
|
||||||
|
cmd := osExec.CommandContext(cctx, path)
|
||||||
|
out, err := cmd.Output()
|
||||||
|
if err != nil { slog.Warn("execdir: run failed", "path", path, "err", err); return map[string]any{} }
|
||||||
|
var data any
|
||||||
|
if err := json.Unmarshal(out, &data); err != nil { slog.Warn("execdir: bad json", "path", path, "err", err); return map[string]any{} }
|
||||||
|
if data == nil { return map[string]any{} }
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
|
31
src/core/logging/logging.go
Normal file
31
src/core/logging/logging.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package logging
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Назначение: Инициализация структурированного логгера (slog) с выводом в stdout.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Setup настраивает глобальный логгер на stdout с нужным уровнем.
|
||||||
|
func Setup(level string) *slog.Logger {
|
||||||
|
lvl := slog.LevelInfo
|
||||||
|
switch strings.ToLower(strings.TrimSpace(level)) {
|
||||||
|
case "debug":
|
||||||
|
lvl = slog.LevelDebug
|
||||||
|
case "warn", "warning":
|
||||||
|
lvl = slog.LevelWarn
|
||||||
|
case "error":
|
||||||
|
lvl = slog.LevelError
|
||||||
|
default:
|
||||||
|
lvl = slog.LevelInfo
|
||||||
|
}
|
||||||
|
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: lvl})
|
||||||
|
logger := slog.New(handler)
|
||||||
|
slog.SetDefault(logger)
|
||||||
|
return logger
|
||||||
|
}
|
||||||
|
|
||||||
|
|
84
src/core/output/output.go
Normal file
84
src/core/output/output.go
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
package output
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Назначение: Вывод результатов — stdout и Kafka.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/segmentio/kafka-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Payload — общий JSON для вывода.
|
||||||
|
type Payload map[string]any
|
||||||
|
|
||||||
|
// Output — интерфейс механизма доставки результатов.
|
||||||
|
type Output interface {
|
||||||
|
Write(ctx context.Context, data Payload) error
|
||||||
|
Close(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// StdoutOutput — вывод в stdout.
|
||||||
|
type StdoutOutput struct{}
|
||||||
|
|
||||||
|
func (s *StdoutOutput) Write(_ context.Context, data Payload) error {
|
||||||
|
enc := json.NewEncoder(os.Stdout)
|
||||||
|
enc.SetEscapeHTML(false)
|
||||||
|
return enc.Encode(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StdoutOutput) Close(_ context.Context) error { return nil }
|
||||||
|
|
||||||
|
// KafkaOutput — вывод в Kafka.
|
||||||
|
type KafkaOutput struct {
|
||||||
|
writer *kafka.Writer
|
||||||
|
topic string
|
||||||
|
}
|
||||||
|
|
||||||
|
// KafkaOptions описывает параметры подключения к Kafka.
|
||||||
|
type KafkaOptions struct {
|
||||||
|
Brokers []string
|
||||||
|
Topic string
|
||||||
|
ClientID string
|
||||||
|
Timeout time.Duration
|
||||||
|
SASLUser string
|
||||||
|
SASLPass string
|
||||||
|
EnableTLS bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewKafkaOutput создаёт Kafka writer.
|
||||||
|
func NewKafkaOutput(opts KafkaOptions) (*KafkaOutput, error) {
|
||||||
|
if len(opts.Brokers) == 0 || strings.TrimSpace(opts.Topic) == "" {
|
||||||
|
return nil, errors.New("kafka brokers/topic not configured")
|
||||||
|
}
|
||||||
|
w := &kafka.Writer{
|
||||||
|
Addr: kafka.TCP(opts.Brokers...),
|
||||||
|
Topic: opts.Topic,
|
||||||
|
Balancer: &kafka.LeastBytes{},
|
||||||
|
RequiredAcks: kafka.RequireAll,
|
||||||
|
}
|
||||||
|
return &KafkaOutput{writer: w, topic: opts.Topic}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KafkaOutput) Write(ctx context.Context, data Payload) error {
|
||||||
|
b, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
msg := kafka.Message{Value: b, Time: time.Now()}
|
||||||
|
return k.writer.WriteMessages(ctx, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KafkaOutput) Close(ctx context.Context) error {
|
||||||
|
if k.writer != nil {
|
||||||
|
return k.writer.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
136
src/core/runner/runner.go
Normal file
136
src/core/runner/runner.go
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
package runner
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// Назначение: Планировщик опроса коллекторов и публикация результатов.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"sensusagent/src/core/collector"
|
||||||
|
"sensusagent/src/core/output"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Runner объединяет коллекторы и вывод.
|
||||||
|
type Runner struct {
|
||||||
|
collectors []collector.Collector
|
||||||
|
out output.Output
|
||||||
|
}
|
||||||
|
|
||||||
|
// New создаёт раннер.
|
||||||
|
func New(collectors []collector.Collector, out output.Output) *Runner {
|
||||||
|
return &Runner{collectors: collectors, out: out}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunOnce собирает метрики один раз со всех коллекторов и публикует общий JSON.
|
||||||
|
func (r *Runner) RunOnce(ctx context.Context) {
|
||||||
|
payload := output.Payload{}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var mu sync.Mutex
|
||||||
|
|
||||||
|
for _, c := range r.collectors {
|
||||||
|
if !c.Enabled() {
|
||||||
|
// Пустой блок при выключенном коллекторе
|
||||||
|
payload[c.Key()] = map[string]any{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(c collector.Collector) {
|
||||||
|
defer wg.Done()
|
||||||
|
res, err := c.Collect(ctx)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("collector error", "name", c.Name(), "err", err)
|
||||||
|
res = map[string]any{}
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
payload[c.Key()] = res
|
||||||
|
mu.Unlock()
|
||||||
|
}(c)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if isAllEmpty(payload) {
|
||||||
|
// Ничего не печатаем, если все блоки пустые
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := r.out.Write(ctx, payload); err != nil {
|
||||||
|
slog.Error("output write failed", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunContinuous выполняет опрос в цикле с учетом индивидуальных интервалов.
|
||||||
|
func (r *Runner) RunContinuous(ctx context.Context) {
|
||||||
|
// Храним время последнего запуска по коллектору
|
||||||
|
lastRun := make(map[string]time.Time, len(r.collectors))
|
||||||
|
// Минимальный тик для проверки графика
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
latest := output.Payload{}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case now := <-ticker.C:
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var mu sync.Mutex
|
||||||
|
for _, c := range r.collectors {
|
||||||
|
if !c.Enabled() {
|
||||||
|
mu.Lock()
|
||||||
|
latest[c.Key()] = map[string]any{}
|
||||||
|
mu.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lr := lastRun[c.Name()]
|
||||||
|
if lr.IsZero() || now.Sub(lr) >= c.Interval() {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(c collector.Collector) {
|
||||||
|
defer wg.Done()
|
||||||
|
res, err := c.Collect(ctx)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("collector error", "name", c.Name(), "err", err)
|
||||||
|
res = map[string]any{}
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
latest[c.Key()] = res
|
||||||
|
lastRun[c.Name()] = now
|
||||||
|
mu.Unlock()
|
||||||
|
}(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if isAllEmpty(latest) {
|
||||||
|
// Пропускаем вывод, пока нет данных
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := r.out.Write(ctx, latest); err != nil {
|
||||||
|
slog.Error("output write failed", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// isAllEmpty проверяет, что все значения payload являются пустыми объектами
|
||||||
|
// или nil (что трактуем как пусто), чтобы подавлять бессмысленный вывод {}.
|
||||||
|
func isAllEmpty(p output.Payload) bool {
|
||||||
|
if len(p) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
for _, v := range p {
|
||||||
|
if m, ok := v.(map[string]any); ok {
|
||||||
|
if len(m) > 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if v != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
|
164
src/main.go
Normal file
164
src/main.go
Normal file
@ -0,0 +1,164 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// Автор: Сергей Антропов, сайт: https://devops.org.ru
|
||||||
|
// SensusAgent — модульный агент сбора метрик с поддержкой встроенных и внешних коллекторов.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/joho/godotenv"
|
||||||
|
|
||||||
|
"sensusagent/src/core/collector"
|
||||||
|
"sensusagent/src/core/config"
|
||||||
|
"sensusagent/src/core/logging"
|
||||||
|
"sensusagent/src/core/output"
|
||||||
|
"sensusagent/src/core/runner"
|
||||||
|
|
||||||
|
// Регистрируем exec/execdir из ядра
|
||||||
|
_ "sensusagent/src/core/execcollectors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Загружаем .env если есть (для локальной разработки)
|
||||||
|
_ = godotenv.Load()
|
||||||
|
|
||||||
|
var (
|
||||||
|
configPath = envOr("CONFIG_PATH", "")
|
||||||
|
cfgPathFlag string
|
||||||
|
modeFlag string
|
||||||
|
once bool
|
||||||
|
)
|
||||||
|
|
||||||
|
flag.StringVar(&cfgPathFlag, "config", "", "Путь к config.yaml")
|
||||||
|
flag.StringVar(&modeFlag, "mode", "", "Режим работы: stdout|kafka|auto")
|
||||||
|
flag.BoolVar(&once, "once", false, "Единичный сбор метрик и вывод")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
if cfgPathFlag != "" { configPath = cfgPathFlag }
|
||||||
|
|
||||||
|
// Разрешаем путь к конфигу: сначала берём из ENV/флага, иначе пытаемся найти локальный ./bin/agent/config.yaml,
|
||||||
|
// затем абсолютный /bin/agent/config.yaml для контейнера.
|
||||||
|
configPath = resolveConfigPath(configPath)
|
||||||
|
|
||||||
|
// Настройка логирования
|
||||||
|
logging.Setup(envOr("LOG_LEVEL", "info"))
|
||||||
|
|
||||||
|
// Загрузка конфигурации
|
||||||
|
cfg, err := config.Load(configPath)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("config load failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
if modeFlag != "" {
|
||||||
|
cfg.Mode = modeFlag
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("config loaded", "mode", cfg.Mode, "collectors", len(cfg.Collectors), "config_path", configPath)
|
||||||
|
|
||||||
|
// Построение коллекторов
|
||||||
|
collectors, err := collector.BuildCollectors(cfg.Collectors)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("build collectors failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Выбор вывода
|
||||||
|
out, err := selectOutput(cfg)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("select output failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer func() { _ = out.Close(context.Background()) }()
|
||||||
|
|
||||||
|
r := runner.New(collectors, out)
|
||||||
|
|
||||||
|
// Контекст с обработкой SIGINT/SIGTERM
|
||||||
|
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if once {
|
||||||
|
r.RunOnce(ctx)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.RunContinuous(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// selectOutput выбирает способ вывода исходя из режима и окружения.
|
||||||
|
func selectOutput(cfg *config.AgentConfig) (output.Output, error) {
|
||||||
|
mode := strings.ToLower(strings.TrimSpace(cfg.Mode))
|
||||||
|
if mode == "" || mode == "auto" {
|
||||||
|
if underSystemd() && cfg.Kafka.Enabled {
|
||||||
|
mode = "kafka"
|
||||||
|
} else {
|
||||||
|
mode = "stdout"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
switch mode {
|
||||||
|
case "stdout":
|
||||||
|
slog.Info("using stdout output")
|
||||||
|
return &output.StdoutOutput{}, nil
|
||||||
|
case "kafka":
|
||||||
|
slog.Info("using kafka output")
|
||||||
|
to := config.MustParseDuration(cfg.Kafka.Timeout, 5*time.Second)
|
||||||
|
return output.NewKafkaOutput(output.KafkaOptions{
|
||||||
|
Brokers: cfg.Kafka.Brokers,
|
||||||
|
Topic: cfg.Kafka.Topic,
|
||||||
|
ClientID: cfg.Kafka.ClientID,
|
||||||
|
Timeout: to,
|
||||||
|
SASLUser: cfg.Kafka.SASLUser,
|
||||||
|
SASLPass: cfg.Kafka.SASLPass,
|
||||||
|
EnableTLS: cfg.Kafka.EnableTLS,
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("неизвестный режим: %s", mode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// underSystemd определяет запуск под systemd по окружению.
|
||||||
|
func underSystemd() bool {
|
||||||
|
if os.Getenv("INVOCATION_ID") != "" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if os.Getenv("JOURNAL_STREAM") != "" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func envOr(k, def string) string {
|
||||||
|
v := strings.TrimSpace(os.Getenv(k))
|
||||||
|
if v == "" {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolveConfigPath пытается определить корректный путь к config.yaml.
|
||||||
|
func resolveConfigPath(explicit string) string {
|
||||||
|
if strings.TrimSpace(explicit) != "" {
|
||||||
|
return explicit
|
||||||
|
}
|
||||||
|
// 1. Локальный путь разработки
|
||||||
|
local := filepath.Clean("./bin/agent/config.yaml")
|
||||||
|
if _, err := os.Stat(local); err == nil {
|
||||||
|
return local
|
||||||
|
}
|
||||||
|
// 2. Путь внутри контейнера/дистрибутива
|
||||||
|
container := "/bin/agent/config.yaml"
|
||||||
|
if _, err := os.Stat(container); err == nil {
|
||||||
|
return container
|
||||||
|
}
|
||||||
|
// 3. Фоллбек на локальный
|
||||||
|
return local
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user