From be7e3c3420ca5f4dd6c5f4b671a39f3049c8692b Mon Sep 17 00:00:00 2001 From: Sergey Antropoff Date: Mon, 8 Sep 2025 18:07:45 +0300 Subject: [PATCH] =?UTF-8?q?feat(kubernetes):=20=D1=80=D0=B0=D1=81=D1=88?= =?UTF-8?q?=D0=B8=D1=80=D0=B5=D0=BD=D1=8B=20=D0=BC=D0=B5=D1=82=D1=80=D0=B8?= =?UTF-8?q?=D0=BA=D0=B8=20=E2=80=94=20=D1=83=D0=B7=D0=BB=D1=8B=20(capacity?= =?UTF-8?q?/allocatable=20+=20usage),=20=D0=BF=D0=BE=D0=B4=D1=8B=20(usage,?= =?UTF-8?q?=20requests/limits,=20IPs),=20PV=20type,=20kubelet=20storage=20?= =?UTF-8?q?usage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/collectors/kubernetes/kubernetes_linux.go | 252 ++++++++++++++++-- 1 file changed, 235 insertions(+), 17 deletions(-) diff --git a/src/collectors/kubernetes/kubernetes_linux.go b/src/collectors/kubernetes/kubernetes_linux.go index eb84e08..38f915d 100644 --- a/src/collectors/kubernetes/kubernetes_linux.go +++ b/src/collectors/kubernetes/kubernetes_linux.go @@ -20,8 +20,12 @@ func collectKubernetes(ctx context.Context) (map[string]any, error) { if _, err := exec.LookPath("kubectl"); err != nil { return nil, nil } res := map[string]any{} + // Метрики из metrics.k8s.io (если доступно) + nodeUsage := k8sNodeUsage(ctx) + podUsage := k8sPodUsage(ctx) + // Masters / Workers (узлы) - if nodes := k8sNodes(ctx); len(nodes) > 0 { res["nodes"] = nodes } + if nodes := k8sNodes(ctx, nodeUsage); len(nodes) > 0 { res["nodes"] = nodes } // Ingress controllers (Популярные: nginx, traefik; ищем Deployment/DaemonSet с меткой app.kubernetes.io/name) if ings := k8sIngressControllers(ctx); len(ings) > 0 { res["ingress_controllers"] = ings } @@ -29,14 +33,17 @@ func collectKubernetes(ctx context.Context) (map[string]any, error) { // LoadBalancers: сервисы типа LoadBalancer с внешним IP + сетевые RX/TX (если доступно через metrics) if lbs := k8sLoadBalancers(ctx); len(lbs) > 0 { res["load_balancers"] = lbs } - // Pods: name, ns, state, cpu/mem, restarts - if pods := k8sPods(ctx); len(pods) > 0 { res["pods"] = pods } + // Pods: name, ns, state, cpu/mem usage, restarts, requests/limits, IPs + if pods := k8sPods(ctx, podUsage); len(pods) > 0 { res["pods"] = pods } // Namespaces, Volumes (PV/PVC), Secrets if nss := k8sNamespaces(ctx); len(nss) > 0 { res["namespaces"] = nss } if vols := k8sVolumes(ctx); len(vols) > 0 { res["volumes"] = vols } if secs := k8sSecrets(ctx); len(secs) > 0 { res["secrets"] = secs } + // Использование томов по данным kubelet summary API + if su := k8sStorageUsage(ctx); len(su) > 0 { res["storage_usage"] = su } + // Workloads: Deployments, DaemonSets, StatefulSets, CronJobs if deps := k8sDeployments(ctx); len(deps) > 0 { res["deployments"] = deps } if dss := k8sDaemonSets(ctx); len(dss) > 0 { res["daemonsets"] = dss } @@ -69,8 +76,17 @@ func kubectlJSON(ctx context.Context, args ...string) []map[string]any { return res } +// kubectlRawJSON выполняет kubectl get --raw для произвольного пути API и парсит JSON +func kubectlRawJSON(ctx context.Context, path string) map[string]any { + out, err := run(ctx, "kubectl", "get", "--raw", path) + if err != nil || strings.TrimSpace(out) == "" { return nil } + var obj map[string]any + if e := json.Unmarshal([]byte(out), &obj); e != nil { return nil } + return obj +} + // k8sNodes собирает сведения о master/worker/taints, версиях и простых ресурсах узлов -func k8sNodes(ctx context.Context) []map[string]any { +func k8sNodes(ctx context.Context, usage map[string]map[string]uint64) []map[string]any { arr := kubectlJSON(ctx, "get", "nodes") out := []map[string]any{} for _, n := range arr { @@ -98,15 +114,21 @@ func k8sNodes(ctx context.Context) []map[string]any { } } } - // Упрощённые CPU/Mem проценты: попробуем вытащить allocatable/capacity + // CPU/RAM: capacity/allocatable + использование (metrics-server) alloc, _ := status["allocatable"].(map[string]any) cap, _ := status["capacity"].(map[string]any) - cpuPct := 0.0 - memPct := 0.0 - if alloc != nil && cap != nil { - cpuPct = quantityPct(alloc["cpu"], cap["cpu"], 1000) // CPU в millicores эвристически - memPct = quantityPct(alloc["memory"], cap["memory"], 1) // Память приблизительно - } + cpuCapacityCores := quantityCores(cap["cpu"]) // float cores + memCapacityBytes := quantityBytes(cap["memory"]) // bytes + cpuAllocCores := quantityCores(alloc["cpu"]) + memAllocBytes := quantityBytes(alloc["memory"]) + var cpuUsedMillicores uint64 + var memUsedBytes uint64 + if u, ok := usage[name]; ok { cpuUsedMillicores = u["cpu_m"]; memUsedBytes = u["mem_bytes"] } + cpuUsedCores := float64(cpuUsedMillicores) / 1000.0 + cpuUsedPct := 0.0 + memUsedPct := 0.0 + if cpuCapacityCores > 0 { cpuUsedPct = (cpuUsedCores / cpuCapacityCores) * 100.0 } + if memCapacityBytes > 0 { memUsedPct = (float64(memUsedBytes) / float64(memCapacityBytes)) * 100.0 } role := "worker" if labels, ok := meta["labels"].(map[string]any); ok { for k := range labels { @@ -119,8 +141,14 @@ func k8sNodes(ctx context.Context) []map[string]any { "version": version, "status": nodeReady, "role": role, - "cpu_pct": cpuPct, - "mem_pct": memPct, + "cpu_capacity_cores": cpuCapacityCores, + "mem_capacity_bytes": memCapacityBytes, + "cpu_allocatable_cores": cpuAllocCores, + "mem_allocatable_bytes": memAllocBytes, + "cpu_used_cores": cpuUsedCores, + "mem_used_bytes": memUsedBytes, + "cpu_used_pct": cpuUsedPct, + "mem_used_pct": memUsedPct, }) } return out @@ -171,12 +199,13 @@ func k8sLoadBalancers(ctx context.Context) []map[string]any { } // k8sPods — основные сведения по подам -func k8sPods(ctx context.Context) []map[string]any { +func k8sPods(ctx context.Context, usage map[string]map[string]uint64) []map[string]any { pods := kubectlJSON(ctx, "get", "pods", "-A") out := []map[string]any{} for _, p := range pods { meta, _ := p["metadata"].(map[string]any) status, _ := p["status"].(map[string]any) + spec, _ := p["spec"].(map[string]any) ns, _ := meta["namespace"].(string) name, _ := meta["name"].(string) phase, _ := status["phase"].(string) @@ -186,7 +215,50 @@ func k8sPods(ctx context.Context) []map[string]any { for _, c := range cs { if m, ok := c.(map[string]any); ok { restarts += intFrom(m["restartCount"]) } } - out = append(out, map[string]any{ "namespace": ns, "name": name, "state": phase, "restarts": restarts }) + // Внутренние IP (cluster) + ips := []string{} + if ip1, _ := status["podIP"].(string); ip1 != "" { ips = append(ips, ip1) } + if arr, ok := status["podIPs"].([]any); ok { + for _, it := range arr { if m, ok2 := it.(map[string]any); ok2 { if ip, _ := m["ip"].(string); ip != "" { ips = append(ips, ip) } } } + } + // Запрошенные/лимиты ресурсов + cpuReqM, memReqB, cpuLimM, memLimB := uint64(0), uint64(0), uint64(0), uint64(0) + if spec != nil { + if containers, ok := spec["containers"].([]any); ok { + for _, c := range containers { + if mc, ok2 := c.(map[string]any); ok2 { + if res, ok3 := mc["resources"].(map[string]any); ok3 { + if rq, ok4 := res["requests"].(map[string]any); ok4 { + cpuReqM += quantityMillicores(rq["cpu"]) + memReqB += quantityBytes(rq["memory"]) + } + if lm, ok4 := res["limits"].(map[string]any); ok4 { + cpuLimM += quantityMillicores(lm["cpu"]) + memLimB += quantityBytes(lm["memory"]) + } + } + } + } + } + } + // Фактическое потребление (metrics-server) + key := ns + "/" + name + cpuUsedM := usageValue(usage, key, "cpu_m") + memUsedB := usageValue(usage, key, "mem_bytes") + + out = append(out, map[string]any{ + "namespace": ns, + "name": name, + "state": phase, + "restarts": restarts, + "ips": ips, + "cpu_used_m": cpuUsedM, + "mem_used_bytes": memUsedB, + "cpu_requests_m": cpuReqM, + "mem_requests_bytes": memReqB, + "cpu_limits_m": cpuLimM, + "mem_limits_bytes": memLimB, + }) } return out } @@ -208,7 +280,8 @@ func k8sVolumes(ctx context.Context) []map[string]any { name, _ := meta["name"].(string) cap, _ := spec["capacity"].(map[string]any) size := quantityToBytes(cap["storage"]) // приблизительно - out = append(out, map[string]any{ "pv": name, "size_bytes": size }) + stype := pvType(spec) + out = append(out, map[string]any{ "pv": name, "size_bytes": size, "type": stype }) } for _, pvc := range pvcs { meta, _ := pvc["metadata"].(map[string]any) @@ -218,7 +291,8 @@ func k8sVolumes(ctx context.Context) []map[string]any { rq, _ := spec["resources"].(map[string]any) reqs, _ := rq["requests"].(map[string]any) size := quantityToBytes(reqs["storage"]) // приблизительно - out = append(out, map[string]any{ "pvc": name, "namespace": ns, "size_bytes": size }) + stClass, _ := spec["storageClassName"].(string) + out = append(out, map[string]any{ "pvc": name, "namespace": ns, "size_bytes": size, "storage_class": stClass }) } return out } @@ -303,6 +377,40 @@ func quantityPct(alloc any, cap any, scale int) float64 { return float64(a) * 100.0 / float64(c) } +// quantityCores: cpu cores as float from K8s quantity +func quantityCores(v any) float64 { + m := quantityMillicores(v) + return float64(m) / 1000.0 +} + +// quantityMillicores: cpu in millicores from quantity +func quantityMillicores(v any) uint64 { + s := quantityString(v) + s = strings.TrimSpace(strings.ToLower(s)) + if strings.HasSuffix(s, "m") { + n, _ := strconv.ParseFloat(strings.TrimSuffix(s, "m"), 64) + return uint64(n) + } + n, _ := strconv.ParseFloat(s, 64) + return uint64(n * 1000) +} + +// quantityBytes: memory in bytes from quantity +func quantityBytes(v any) uint64 { + s := quantityString(v) + return quantityToBytes(s) +} + +func quantityString(v any) string { + switch t := v.(type) { + case string: + return t + case map[string]any: + if vs, ok := t["string"].(string); ok { return vs } + } + return "" +} + func parseQuantity(v any, scale int) uint64 { s := "" switch t := v.(type) { @@ -345,4 +453,114 @@ func quantityToBytes(v any) uint64 { func intFrom(v any) int { switch t := v.(type) { case float64: return int(t); case int: return t; case string: i, _ := strconv.Atoi(t); return i; default: return 0 } } +func uint64From(v any) uint64 { switch t := v.(type) { case float64: return uint64(t); case int: return uint64(t); case string: u, _ := strconv.ParseUint(t, 10, 64); return u; default: return 0 } } + +// usageValue получает значение метрики из карты usage по ключу +func usageValue(usage map[string]map[string]uint64, key, field string) uint64 { + if m, ok := usage[key]; ok { return m[field] } + return 0 +} + +// k8sNodeUsage — метрики узлов по metrics.k8s.io +func k8sNodeUsage(ctx context.Context) map[string]map[string]uint64 { + obj := kubectlRawJSON(ctx, "/apis/metrics.k8s.io/v1beta1/nodes") + res := map[string]map[string]uint64{} + if obj == nil { return res } + items, _ := obj["items"].([]any) + for _, it := range items { + m, _ := it.(map[string]any) + meta, _ := m["metadata"].(map[string]any) + name, _ := meta["name"].(string) + usage, _ := m["usage"].(map[string]any) + if name == "" || usage == nil { continue } + cpuM := quantityMillicores(usage["cpu"]) + memB := quantityBytes(usage["memory"]) + res[name] = map[string]uint64{"cpu_m": cpuM, "mem_bytes": memB} + } + return res +} + +// k8sPodUsage — метрики подов по metrics.k8s.io +func k8sPodUsage(ctx context.Context) map[string]map[string]uint64 { + obj := kubectlRawJSON(ctx, "/apis/metrics.k8s.io/v1beta1/pods") + res := map[string]map[string]uint64{} + if obj == nil { return res } + items, _ := obj["items"].([]any) + for _, it := range items { + m, _ := it.(map[string]any) + meta, _ := m["metadata"].(map[string]any) + ns, _ := meta["namespace"].(string) + name, _ := meta["name"].(string) + key := ns + "/" + name + conts, _ := m["containers"].([]any) + var cpuM, memB uint64 + for _, c := range conts { + if cm, ok := c.(map[string]any); ok { + if u, ok2 := cm["usage"].(map[string]any); ok2 { + cpuM += quantityMillicores(u["cpu"]) + memB += quantityBytes(u["memory"]) + } + } + } + res[key] = map[string]uint64{"cpu_m": cpuM, "mem_bytes": memB} + } + return res +} + +// pvType — определение типа хранилища PV +func pvType(spec map[string]any) string { + if spec == nil { return "" } + for _, k := range []string{"hostPath","nfs","gcePersistentDisk","awsElasticBlockStore","azureDisk","azureFile","cephfs","cinder","iscsi","local","portworxVolume","vsphereVolume","csi"} { + if _, ok := spec[k]; ok { return k } + } + return "" +} + +// k8sStorageUsage — использование томов по данным kubelet summary API +func k8sStorageUsage(ctx context.Context) []map[string]any { + nodeObjs := kubectlJSON(ctx, "get", "nodes") + out := []map[string]any{} + for _, n := range nodeObjs { + meta, _ := n["metadata"].(map[string]any) + nodeName, _ := meta["name"].(string) + if nodeName == "" { continue } + path := "/api/v1/nodes/" + nodeName + "/proxy/stats/summary" + obj := kubectlRawJSON(ctx, path) + if obj == nil { continue } + pods, _ := obj["pods"].([]any) + for _, p := range pods { + pm, _ := p.(map[string]any) + podRef, _ := pm["podRef"].(map[string]any) + pns, _ := podRef["namespace"].(string) + pname, _ := podRef["name"].(string) + vols, _ := pm["volumeStats"].([]any) + for _, v := range vols { + vm, _ := v.(map[string]any) + vname, _ := vm["name"].(string) + pvcRef, _ := vm["pvcRef"].(map[string]any) + pvcName, _ := pvcRef["name"].(string) + fs, _ := vm["fsStats"].(map[string]any) + capB := uint64(0); usedB := uint64(0); availB := uint64(0) + if fs != nil { + capB = uint64From(fs["capacityBytes"]) + usedB = uint64From(fs["usedBytes"]) + availB = uint64From(fs["availableBytes"]) + } + name := pvcName + if name == "" { name = pns + "/" + pname + ":" + vname } + out = append(out, map[string]any{ + "name": name, + "namespace": pns, + "pod": pname, + "node": nodeName, + "capacity_bytes": capB, + "used_bytes": usedB, + "available_bytes": availB, + }) + } + } + } + return out +} +