diff --git a/Makefile b/Makefile
index 108bf57..600c0b5 100644
--- a/Makefile
+++ b/Makefile
@@ -46,7 +46,7 @@ collectors:
 	else \
 	docker run --rm -v $$PWD:/workspace -w /workspace \
 		-e GOOS=linux -e GOARCH=amd64 -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \
-		sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/macos ./src/collectors/macos && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/system ./src/collectors/system && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/hba ./src/collectors/hba && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/sensors ./src/collectors/sensors && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/docker ./src/collectors/docker && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/gpu ./src/collectors/gpu"; \
+		sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/macos ./src/collectors/macos && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/system ./src/collectors/system && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/hba ./src/collectors/hba && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/sensors ./src/collectors/sensors && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/docker ./src/collectors/docker && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/gpu ./src/collectors/gpu && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/kubernetes ./src/collectors/kubernetes"; \
 	fi
 	@# Make sure the scripts are executable
 	@chmod +x ./bin/agent/collectors/*.sh 2>/dev/null || true
@@ -61,7 +61,7 @@ collectors-linux: # Cross-build collectors for Linux
 	@mkdir -p ./bin/agent/collectors .cache/go-build .cache/go-mod; \
 	docker run --rm -v $$PWD:/workspace -w /workspace -e GOOS=linux -e GOARCH=amd64 -e GOCACHE=/workspace/.cache/go-build -e GOMODCACHE=/workspace/.cache/go-mod golang:1.22 \
-		sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/macos ./src/collectors/macos && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/system ./src/collectors/system && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/hba ./src/collectors/hba && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/sensors ./src/collectors/sensors && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/docker ./src/collectors/docker && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/gpu ./src/collectors/gpu"
+		sh -c "go mod tidy >/dev/null 2>&1 && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/uptime ./src/collectors/uptime && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/macos ./src/collectors/macos && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/system ./src/collectors/system && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/hba ./src/collectors/hba && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/sensors ./src/collectors/sensors && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/docker ./src/collectors/docker && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/gpu ./src/collectors/gpu && CGO_ENABLED=0 go build -trimpath -o ./bin/agent/collectors/kubernetes ./src/collectors/kubernetes"
 
 collectors-windows: # Cross-build collectors for Windows
diff --git a/bin/agent/config.yaml b/bin/agent/config.yaml
index 6792e9e..59ad16a 100644
--- a/bin/agent/config.yaml
+++ b/bin/agent/config.yaml
@@ -73,10 +73,18 @@ collectors:
     enabled: true
     type: exec
    key: gpu
-    interval: "30s"
-    timeout: "8s"
+    interval: "60s"
+    timeout: "30s"
     exec: "./collectors/gpu"
     platforms: [linux]
+  kubernetes:
+    enabled: false
+    type: exec
+    key: kubernetes
+    interval: "60s"
+    timeout: "12s"
+    exec: "./collectors/kubernetes"
+    platforms: [linux]
diff --git a/src/collectors/kubernetes/kubernetes_linux.go b/src/collectors/kubernetes/kubernetes_linux.go
new file mode 100644
index 0000000..eb84e08
--- /dev/null
+++ b/src/collectors/kubernetes/kubernetes_linux.go
@@ -0,0 +1,348 @@
+//go:build linux
+
+package main
+
+// Author: Sergey Antropov, website: https://devops.org.ru
+// Linux implementation of the kubernetes collector: shells out to kubectl and
+// parses its JSON output. For performance and robustness we keep the fields
+// short and aggregate only minimally, returning just the requested metrics.
+
+import (
+	"context"
+	"encoding/json"
+	"os/exec"
+	"strconv"
+	"strings"
+)
+
+// collectKubernetes gathers a cluster-wide summary. If kubectl is not on the
+// PATH, the collector silently reports nothing.
+func collectKubernetes(ctx context.Context) (map[string]any, error) {
+	if _, err := exec.LookPath("kubectl"); err != nil { return nil, nil }
+	res := map[string]any{}
+
+	// Masters / workers (nodes)
+	if nodes := k8sNodes(ctx); len(nodes) > 0 { res["nodes"] = nodes }
+
+	// Ingress controllers (popular ones: nginx, traefik; we look for
+	// Deployments/DaemonSets carrying the app.kubernetes.io/name label)
+	if ings := k8sIngressControllers(ctx); len(ings) > 0 { res["ingress_controllers"] = ings }
+
+	// LoadBalancers: Services of type LoadBalancer with an external address
+	if lbs := k8sLoadBalancers(ctx); len(lbs) > 0 { res["load_balancers"] = lbs }
+
+	// Pods: name, namespace, phase, restarts
+	if pods := k8sPods(ctx); len(pods) > 0 { res["pods"] = pods }
+
+	// Namespaces, volumes (PV/PVC), secrets
+	if nss := k8sNamespaces(ctx); len(nss) > 0 { res["namespaces"] = nss }
+	if vols := k8sVolumes(ctx); len(vols) > 0 { res["volumes"] = vols }
+	if secs := k8sSecrets(ctx); len(secs) > 0 { res["secrets"] = secs }
+
+	// Workloads: Deployments, DaemonSets, StatefulSets, CronJobs
+	if deps := k8sDeployments(ctx); len(deps) > 0 { res["deployments"] = deps }
+	if dss := k8sDaemonSets(ctx); len(dss) > 0 { res["daemonsets"] = dss }
+	if sfs := k8sStatefulSets(ctx); len(sfs) > 0 { res["statefulsets"] = sfs }
+	if cjs := k8sCronJobs(ctx); len(cjs) > 0 { res["cronjobs"] = cjs }
+
+	if len(res) == 0 { return nil, nil }
+	return res, nil
+}
+
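+// A sketch of the emitted payload, assuming a reachable cluster (the field
+// sets below are illustrative, not a contract; empty sections are omitted):
+//
+//	{
+//	  "nodes":       [{"name": "node-1", "ip": "10.0.0.1", "version": "v1.29.0", "status": "True", "role": "worker", "cpu_pct": 98.7, "mem_pct": 96.2}],
+//	  "pods":        [{"namespace": "default", "name": "web-0", "state": "Running", "restarts": 2}],
+//	  "deployments": [{"namespace": "default", "name": "web", "ready": 3, "replicas": 3}]
+//	}
+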
+// Helpers for running external commands
+func run(ctx context.Context, bin string, args ...string) (string, error) {
+	cmd := exec.CommandContext(ctx, bin, args...)
+	b, err := cmd.Output()
+	if err != nil { return "", err }
+	return string(b), nil
+}
+
+// kubectlJSON runs `kubectl <args> -o json` and returns the decoded list items.
+func kubectlJSON(ctx context.Context, args ...string) []map[string]any {
+	base := append(args, "-o", "json")
+	out, err := run(ctx, "kubectl", base...)
+	if err != nil || strings.TrimSpace(out) == "" { return nil }
+	var obj map[string]any
+	if e := json.Unmarshal([]byte(out), &obj); e != nil { return nil }
+	items, _ := obj["items"].([]any)
+	res := []map[string]any{}
+	for _, it := range items {
+		if m, ok := it.(map[string]any); ok { res = append(res, m) }
+	}
+	return res
+}
+
+// k8sNodes reports master/worker roles, versions and coarse node resources.
+func k8sNodes(ctx context.Context) []map[string]any {
+	arr := kubectlJSON(ctx, "get", "nodes")
+	out := []map[string]any{}
+	for _, n := range arr {
+		meta, _ := n["metadata"].(map[string]any)
+		status, _ := n["status"].(map[string]any)
+		name, _ := meta["name"].(string)
+		addresses, _ := status["addresses"].([]any)
+		ip := ""
+		for _, a := range addresses {
+			if m, ok := a.(map[string]any); ok {
+				if t, _ := m["type"].(string); t == "InternalIP" {
+					ip, _ = m["address"].(string)
+					break
+				}
+			}
+		}
+		nodeInfo, _ := status["nodeInfo"].(map[string]any)
+		version, _ := nodeInfo["kubeletVersion"].(string)
+		conditions, _ := status["conditions"].([]any)
+		nodeReady := "Unknown"
+		for _, c := range conditions {
+			if m, ok := c.(map[string]any); ok {
+				if t, _ := m["type"].(string); t == "Ready" {
+					nodeReady, _ = m["status"].(string)
+				}
+			}
+		}
+		// Simplified CPU/mem percentages: allocatable as a share of capacity.
+		// This is the fraction of the node available to pods, not live
+		// utilization (that would require metrics-server).
+		alloc, _ := status["allocatable"].(map[string]any)
+		cap, _ := status["capacity"].(map[string]any)
+		cpuPct := 0.0
+		memPct := 0.0
+		if alloc != nil && cap != nil {
+			cpuPct = quantityPct(alloc["cpu"], cap["cpu"], 1000) // CPU compared in millicores
+			memPct = quantityPct(alloc["memory"], cap["memory"], 1) // memory compared in bytes
+		}
+		role := "worker"
+		if labels, ok := meta["labels"].(map[string]any); ok {
+			for k := range labels {
+				if strings.Contains(k, "node-role.kubernetes.io/master") || strings.Contains(k, "node-role.kubernetes.io/control-plane") { role = "master"; break }
+			}
+		}
+		out = append(out, map[string]any{
+			"name":    name,
+			"ip":      ip,
+			"version": version,
+			"status":  nodeReady,
+			"role":    role,
+			"cpu_pct": cpuPct,
+			"mem_pct": memPct,
+		})
+	}
+	return out
+}
+
+// k8sIngressControllers reports versions/replicas of popular ingress controllers.
+func k8sIngressControllers(ctx context.Context) []map[string]any {
+	// Look for Deployments/DaemonSets with the conventional labels
+	sel := "app.kubernetes.io/name in (ingress-nginx,nginx-ingress-controller,traefik)"
+	deps := kubectlJSON(ctx, "get", "deployments", "-A", "-l", sel)
+	dss := kubectlJSON(ctx, "get", "daemonsets", "-A", "-l", sel)
+	res := []map[string]any{}
+	for _, o := range append(deps, dss...) {
+		meta, _ := o["metadata"].(map[string]any)
+		ns, _ := meta["namespace"].(string)
+		name, _ := meta["name"].(string)
+		// Replicas; a DaemonSet has no status.replicas, so fall back to the
+		// number of scheduled pods.
+		status, _ := o["status"].(map[string]any)
+		replicas := intFrom(status["replicas"])
+		if replicas == 0 { replicas = intFrom(status["desiredNumberScheduled"]) }
+		// Version: take the recommended app.kubernetes.io/version label when present.
+		version := ""
+		if labels, ok := meta["labels"].(map[string]any); ok {
+			if v, ok2 := labels["app.kubernetes.io/version"].(string); ok2 { version = v }
+		}
+		res = append(res, map[string]any{"name": name, "namespace": ns, "replicas": replicas, "version": version})
+	}
+	return res
+}
+
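+// Note: the selector above only matches controllers that carry the
+// conventional app.kubernetes.io/name label; installs without it are missed.
+// A manual equivalent for checking what the collector would see (illustrative):
+//
+//	kubectl get deployments,daemonsets -A \
+//	  -l 'app.kubernetes.io/name in (ingress-nginx,nginx-ingress-controller,traefik)'
+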
+// k8sLoadBalancers — Services of type LoadBalancer with their external address.
+func k8sLoadBalancers(ctx context.Context) []map[string]any {
+	svcs := kubectlJSON(ctx, "get", "svc", "-A")
+	out := []map[string]any{}
+	for _, s := range svcs {
+		spec, _ := s["spec"].(map[string]any)
+		if t, _ := spec["type"].(string); strings.ToLower(t) != "loadbalancer" { continue }
+		meta, _ := s["metadata"].(map[string]any)
+		ns, _ := meta["namespace"].(string)
+		name, _ := meta["name"].(string)
+		status, _ := s["status"].(map[string]any)
+		lb, _ := status["loadBalancer"].(map[string]any)
+		ing, _ := lb["ingress"].([]any)
+		ext := ""
+		if len(ing) > 0 {
+			if m, ok := ing[0].(map[string]any); ok {
+				ext, _ = m["ip"].(string)
+				// Some clouds publish a hostname instead of an IP
+				if ext == "" { ext, _ = m["hostname"].(string) }
+			}
+		}
+		out = append(out, map[string]any{"namespace": ns, "name": name, "external_ip": ext})
+	}
+	return out
+}
+
+// k8sPods — basic per-pod information.
+func k8sPods(ctx context.Context) []map[string]any {
+	pods := kubectlJSON(ctx, "get", "pods", "-A")
+	out := []map[string]any{}
+	for _, p := range pods {
+		meta, _ := p["metadata"].(map[string]any)
+		status, _ := p["status"].(map[string]any)
+		ns, _ := meta["namespace"].(string)
+		name, _ := meta["name"].(string)
+		phase, _ := status["phase"].(string)
+		// Restart counts, summed over all containers
+		cs, _ := status["containerStatuses"].([]any)
+		restarts := 0
+		for _, c := range cs {
+			if m, ok := c.(map[string]any); ok { restarts += intFrom(m["restartCount"]) }
+		}
+		out = append(out, map[string]any{"namespace": ns, "name": name, "state": phase, "restarts": restarts})
+	}
+	return out
+}
+
+func k8sNamespaces(ctx context.Context) []string {
+	nss := kubectlJSON(ctx, "get", "ns")
+	out := []string{}
+	for _, n := range nss {
+		if meta, ok := n["metadata"].(map[string]any); ok {
+			if name, _ := meta["name"].(string); name != "" { out = append(out, name) }
+		}
+	}
+	return out
+}
+
+func k8sVolumes(ctx context.Context) []map[string]any {
+	pvs := kubectlJSON(ctx, "get", "pv")
+	pvcs := kubectlJSON(ctx, "get", "pvc", "-A")
+	out := []map[string]any{}
+	for _, pv := range pvs {
+		meta, _ := pv["metadata"].(map[string]any)
+		spec, _ := pv["spec"].(map[string]any)
+		name, _ := meta["name"].(string)
+		cap, _ := spec["capacity"].(map[string]any)
+		size := quantityToBytes(cap["storage"]) // approximate
+		out = append(out, map[string]any{"pv": name, "size_bytes": size})
+	}
+	for _, pvc := range pvcs {
+		meta, _ := pvc["metadata"].(map[string]any)
+		spec, _ := pvc["spec"].(map[string]any)
+		name, _ := meta["name"].(string)
+		ns, _ := meta["namespace"].(string)
+		rq, _ := spec["resources"].(map[string]any)
+		reqs, _ := rq["requests"].(map[string]any)
+		size := quantityToBytes(reqs["storage"]) // approximate
+		out = append(out, map[string]any{"pvc": name, "namespace": ns, "size_bytes": size})
+	}
+	return out
+}
+
+func k8sSecrets(ctx context.Context) []map[string]any {
+	secs := kubectlJSON(ctx, "get", "secrets", "-A")
+	out := []map[string]any{}
+	for _, s := range secs {
+		meta, _ := s["metadata"].(map[string]any)
+		ns, _ := meta["namespace"].(string)
+		name, _ := meta["name"].(string)
+		t, _ := s["type"].(string)
+		out = append(out, map[string]any{"namespace": ns, "name": name, "type": t})
+	}
+	return out
+}
+
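+// Each workload helper below is a thin reduction over the equivalent of
+//
+//	kubectl get <kind> -A -o json
+//
+// keeping only namespace/name plus a readiness pair (or the schedule, for CronJobs).
+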
map[string]any{ "namespace": ns, "name": name, "ready": ready, "replicas": replicas }) + } + return out +} + +func k8sDaemonSets(ctx context.Context) []map[string]any { + dss := kubectlJSON(ctx, "get", "daemonsets", "-A") + out := []map[string]any{} + for _, d := range dss { + meta, _ := d["metadata"].(map[string]any) + status, _ := d["status"].(map[string]any) + ns, _ := meta["namespace"].(string) + name, _ := meta["name"].(string) + ready := intFrom(status["numberReady"]) + desired := intFrom(status["desiredNumberScheduled"]) + out = append(out, map[string]any{ "namespace": ns, "name": name, "ready": ready, "desired": desired }) + } + return out +} + +func k8sStatefulSets(ctx context.Context) []map[string]any { + sfs := kubectlJSON(ctx, "get", "statefulsets", "-A") + out := []map[string]any{} + for _, s := range sfs { + meta, _ := s["metadata"].(map[string]any) + status, _ := s["status"].(map[string]any) + ns, _ := meta["namespace"].(string) + name, _ := meta["name"].(string) + ready := intFrom(status["readyReplicas"]) + replicas := intFrom(status["replicas"]) + out = append(out, map[string]any{ "namespace": ns, "name": name, "ready": ready, "replicas": replicas }) + } + return out +} + +func k8sCronJobs(ctx context.Context) []map[string]any { + cjs := kubectlJSON(ctx, "get", "cronjobs", "-A") + out := []map[string]any{} + for _, c := range cjs { + meta, _ := c["metadata"].(map[string]any) + ns, _ := meta["namespace"].(string) + name, _ := meta["name"].(string) + spec, _ := c["spec"].(map[string]any) + sch, _ := spec["schedule"].(string) + out = append(out, map[string]any{ "namespace": ns, "name": name, "schedule": sch }) + } + return out +} + +// quantityPct — приблизительный процент: allocatable/capacity +func quantityPct(alloc any, cap any, scale int) float64 { + a := parseQuantity(alloc, scale) + c := parseQuantity(cap, scale) + if c == 0 { return 0 } + return float64(a) * 100.0 / float64(c) +} + +func parseQuantity(v any, scale int) uint64 { + s := "" + switch t := v.(type) { + case string: + s = t + case map[string]any: + if vs, ok := t["string"].(string); ok { s = vs } + } + s = strings.TrimSpace(strings.ToLower(s)) + s = strings.TrimSuffix(s, "m") // millicores + s = strings.TrimSuffix(s, "mi") // mebibytes + s = strings.TrimSuffix(s, "gi") // gibibytes + s = strings.TrimSuffix(s, "g") + s = strings.TrimSuffix(s, "m") + n, _ := strconv.ParseFloat(s, 64) + if strings.HasSuffix(strings.ToLower(s), "gi") || strings.HasSuffix(strings.ToLower(s), "g") { n *= 1024 * 1024 * 1024 } + if strings.HasSuffix(strings.ToLower(s), "mi") { n *= 1024 * 1024 } + if scale == 1000 { return uint64(n * 1000) } + return uint64(n) +} + +func quantityToBytes(v any) uint64 { + s := "" + switch t := v.(type) { + case string: s = t + case map[string]any: + if vs, ok := t["string"].(string); ok { s = vs } + } + s = strings.TrimSpace(strings.ToLower(s)) + mult := float64(1) + for _, suf := range []struct{K string; M float64}{ + {"ki", 1024}, {"mi", 1024*1024}, {"gi", 1024*1024*1024}, + {"k", 1000}, {"m", 1000*1000}, {"g", 1000*1000*1000}, + } { + if strings.HasSuffix(s, suf.K) { mult = suf.M; s = strings.TrimSuffix(s, suf.K); break } + } + f, _ := strconv.ParseFloat(s, 64) + return uint64(f * mult) +} + +func intFrom(v any) int { switch t := v.(type) { case float64: return int(t); case int: return t; case string: i, _ := strconv.Atoi(t); return i; default: return 0 } } + + diff --git a/src/collectors/kubernetes/kubernetes_unsupported.go b/src/collectors/kubernetes/kubernetes_unsupported.go new file mode 
100644
index 0000000..d6f7301
--- /dev/null
+++ b/src/collectors/kubernetes/kubernetes_unsupported.go
@@ -0,0 +1,12 @@
+//go:build !linux
+
+package main
+
+// Author: Sergey Antropov, website: https://devops.org.ru
+// Stub for unsupported platforms: returns an empty result.
+
+import "context"
+
+func collectKubernetes(ctx context.Context) (map[string]any, error) { return nil, nil }
diff --git a/src/collectors/kubernetes/main.go b/src/collectors/kubernetes/main.go
new file mode 100644
index 0000000..05e4ec6
--- /dev/null
+++ b/src/collectors/kubernetes/main.go
@@ -0,0 +1,43 @@
+package main
+
+// Author: Sergey Antropov, website: https://devops.org.ru
+// The kubernetes collector: gathers aggregated cluster information via kubectl.
+// Requires a configured kubeconfig / cluster access for the user the agent runs as.
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+)
+
+// collectKubernetes is implemented per platform.
+
+func main() {
+	// The timeout can be overridden via the COLLECTOR_TIMEOUT environment variable
+	timeout := parseDurationOr("COLLECTOR_TIMEOUT", 12*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	data, err := collectKubernetes(ctx)
+	if err != nil || data == nil {
+		fmt.Println("{}")
+		return
+	}
+	enc := json.NewEncoder(os.Stdout)
+	enc.SetEscapeHTML(false)
+	_ = enc.Encode(data)
+}
+
+// parseDurationOr parses a duration from an environment variable, falling back to the default.
+func parseDurationOr(env string, def time.Duration) time.Duration {
+	v := strings.TrimSpace(os.Getenv(env))
+	if v == "" { return def }
+	d, err := time.ParseDuration(v)
+	if err != nil { return def }
+	return d
+}
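+
+// Manual check (illustrative): the agent executes this binary and reads a
+// single JSON object from stdout; the same can be done by hand on a host
+// with a working kubectl:
+//
+//	COLLECTOR_TIMEOUT=20s ./bin/agent/collectors/kubernetes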