feat(kubernetes): расширены метрики — узлы (capacity/allocatable + usage), поды (usage, requests/limits, IPs), PV type, kubelet storage usage

This commit is contained in:
Sergey Antropoff 2025-09-08 18:07:45 +03:00
parent fbee229d5d
commit be7e3c3420

View File

@ -20,8 +20,12 @@ func collectKubernetes(ctx context.Context) (map[string]any, error) {
if _, err := exec.LookPath("kubectl"); err != nil { return nil, nil }
res := map[string]any{}
// Метрики из metrics.k8s.io (если доступно)
nodeUsage := k8sNodeUsage(ctx)
podUsage := k8sPodUsage(ctx)
// Masters / Workers (узлы)
if nodes := k8sNodes(ctx); len(nodes) > 0 { res["nodes"] = nodes }
if nodes := k8sNodes(ctx, nodeUsage); len(nodes) > 0 { res["nodes"] = nodes }
// Ingress controllers (Популярные: nginx, traefik; ищем Deployment/DaemonSet с меткой app.kubernetes.io/name)
if ings := k8sIngressControllers(ctx); len(ings) > 0 { res["ingress_controllers"] = ings }
@ -29,14 +33,17 @@ func collectKubernetes(ctx context.Context) (map[string]any, error) {
// LoadBalancers: сервисы типа LoadBalancer с внешним IP + сетевые RX/TX (если доступно через metrics)
if lbs := k8sLoadBalancers(ctx); len(lbs) > 0 { res["load_balancers"] = lbs }
// Pods: name, ns, state, cpu/mem, restarts
if pods := k8sPods(ctx); len(pods) > 0 { res["pods"] = pods }
// Pods: name, ns, state, cpu/mem usage, restarts, requests/limits, IPs
if pods := k8sPods(ctx, podUsage); len(pods) > 0 { res["pods"] = pods }
// Namespaces, Volumes (PV/PVC), Secrets
if nss := k8sNamespaces(ctx); len(nss) > 0 { res["namespaces"] = nss }
if vols := k8sVolumes(ctx); len(vols) > 0 { res["volumes"] = vols }
if secs := k8sSecrets(ctx); len(secs) > 0 { res["secrets"] = secs }
// Использование томов по данным kubelet summary API
if su := k8sStorageUsage(ctx); len(su) > 0 { res["storage_usage"] = su }
// Workloads: Deployments, DaemonSets, StatefulSets, CronJobs
if deps := k8sDeployments(ctx); len(deps) > 0 { res["deployments"] = deps }
if dss := k8sDaemonSets(ctx); len(dss) > 0 { res["daemonsets"] = dss }
@ -69,8 +76,17 @@ func kubectlJSON(ctx context.Context, args ...string) []map[string]any {
return res
}
// kubectlRawJSON выполняет kubectl get --raw для произвольного пути API и парсит JSON
func kubectlRawJSON(ctx context.Context, path string) map[string]any {
out, err := run(ctx, "kubectl", "get", "--raw", path)
if err != nil || strings.TrimSpace(out) == "" { return nil }
var obj map[string]any
if e := json.Unmarshal([]byte(out), &obj); e != nil { return nil }
return obj
}
// k8sNodes собирает сведения о master/worker/taints, версиях и простых ресурсах узлов
func k8sNodes(ctx context.Context) []map[string]any {
func k8sNodes(ctx context.Context, usage map[string]map[string]uint64) []map[string]any {
arr := kubectlJSON(ctx, "get", "nodes")
out := []map[string]any{}
for _, n := range arr {
@ -98,15 +114,21 @@ func k8sNodes(ctx context.Context) []map[string]any {
}
}
}
// Упрощённые CPU/Mem проценты: попробуем вытащить allocatable/capacity
// CPU/RAM: capacity/allocatable + использование (metrics-server)
alloc, _ := status["allocatable"].(map[string]any)
cap, _ := status["capacity"].(map[string]any)
cpuPct := 0.0
memPct := 0.0
if alloc != nil && cap != nil {
cpuPct = quantityPct(alloc["cpu"], cap["cpu"], 1000) // CPU в millicores эвристически
memPct = quantityPct(alloc["memory"], cap["memory"], 1) // Память приблизительно
}
cpuCapacityCores := quantityCores(cap["cpu"]) // float cores
memCapacityBytes := quantityBytes(cap["memory"]) // bytes
cpuAllocCores := quantityCores(alloc["cpu"])
memAllocBytes := quantityBytes(alloc["memory"])
var cpuUsedMillicores uint64
var memUsedBytes uint64
if u, ok := usage[name]; ok { cpuUsedMillicores = u["cpu_m"]; memUsedBytes = u["mem_bytes"] }
cpuUsedCores := float64(cpuUsedMillicores) / 1000.0
cpuUsedPct := 0.0
memUsedPct := 0.0
if cpuCapacityCores > 0 { cpuUsedPct = (cpuUsedCores / cpuCapacityCores) * 100.0 }
if memCapacityBytes > 0 { memUsedPct = (float64(memUsedBytes) / float64(memCapacityBytes)) * 100.0 }
role := "worker"
if labels, ok := meta["labels"].(map[string]any); ok {
for k := range labels {
@ -119,8 +141,14 @@ func k8sNodes(ctx context.Context) []map[string]any {
"version": version,
"status": nodeReady,
"role": role,
"cpu_pct": cpuPct,
"mem_pct": memPct,
"cpu_capacity_cores": cpuCapacityCores,
"mem_capacity_bytes": memCapacityBytes,
"cpu_allocatable_cores": cpuAllocCores,
"mem_allocatable_bytes": memAllocBytes,
"cpu_used_cores": cpuUsedCores,
"mem_used_bytes": memUsedBytes,
"cpu_used_pct": cpuUsedPct,
"mem_used_pct": memUsedPct,
})
}
return out
@ -171,12 +199,13 @@ func k8sLoadBalancers(ctx context.Context) []map[string]any {
}
// k8sPods — основные сведения по подам
func k8sPods(ctx context.Context) []map[string]any {
func k8sPods(ctx context.Context, usage map[string]map[string]uint64) []map[string]any {
pods := kubectlJSON(ctx, "get", "pods", "-A")
out := []map[string]any{}
for _, p := range pods {
meta, _ := p["metadata"].(map[string]any)
status, _ := p["status"].(map[string]any)
spec, _ := p["spec"].(map[string]any)
ns, _ := meta["namespace"].(string)
name, _ := meta["name"].(string)
phase, _ := status["phase"].(string)
@ -186,7 +215,50 @@ func k8sPods(ctx context.Context) []map[string]any {
for _, c := range cs {
if m, ok := c.(map[string]any); ok { restarts += intFrom(m["restartCount"]) }
}
out = append(out, map[string]any{ "namespace": ns, "name": name, "state": phase, "restarts": restarts })
// Внутренние IP (cluster)
ips := []string{}
if ip1, _ := status["podIP"].(string); ip1 != "" { ips = append(ips, ip1) }
if arr, ok := status["podIPs"].([]any); ok {
for _, it := range arr { if m, ok2 := it.(map[string]any); ok2 { if ip, _ := m["ip"].(string); ip != "" { ips = append(ips, ip) } } }
}
// Запрошенные/лимиты ресурсов
cpuReqM, memReqB, cpuLimM, memLimB := uint64(0), uint64(0), uint64(0), uint64(0)
if spec != nil {
if containers, ok := spec["containers"].([]any); ok {
for _, c := range containers {
if mc, ok2 := c.(map[string]any); ok2 {
if res, ok3 := mc["resources"].(map[string]any); ok3 {
if rq, ok4 := res["requests"].(map[string]any); ok4 {
cpuReqM += quantityMillicores(rq["cpu"])
memReqB += quantityBytes(rq["memory"])
}
if lm, ok4 := res["limits"].(map[string]any); ok4 {
cpuLimM += quantityMillicores(lm["cpu"])
memLimB += quantityBytes(lm["memory"])
}
}
}
}
}
}
// Фактическое потребление (metrics-server)
key := ns + "/" + name
cpuUsedM := usageValue(usage, key, "cpu_m")
memUsedB := usageValue(usage, key, "mem_bytes")
out = append(out, map[string]any{
"namespace": ns,
"name": name,
"state": phase,
"restarts": restarts,
"ips": ips,
"cpu_used_m": cpuUsedM,
"mem_used_bytes": memUsedB,
"cpu_requests_m": cpuReqM,
"mem_requests_bytes": memReqB,
"cpu_limits_m": cpuLimM,
"mem_limits_bytes": memLimB,
})
}
return out
}
@ -208,7 +280,8 @@ func k8sVolumes(ctx context.Context) []map[string]any {
name, _ := meta["name"].(string)
cap, _ := spec["capacity"].(map[string]any)
size := quantityToBytes(cap["storage"]) // приблизительно
out = append(out, map[string]any{ "pv": name, "size_bytes": size })
stype := pvType(spec)
out = append(out, map[string]any{ "pv": name, "size_bytes": size, "type": stype })
}
for _, pvc := range pvcs {
meta, _ := pvc["metadata"].(map[string]any)
@ -218,7 +291,8 @@ func k8sVolumes(ctx context.Context) []map[string]any {
rq, _ := spec["resources"].(map[string]any)
reqs, _ := rq["requests"].(map[string]any)
size := quantityToBytes(reqs["storage"]) // приблизительно
out = append(out, map[string]any{ "pvc": name, "namespace": ns, "size_bytes": size })
stClass, _ := spec["storageClassName"].(string)
out = append(out, map[string]any{ "pvc": name, "namespace": ns, "size_bytes": size, "storage_class": stClass })
}
return out
}
@ -303,6 +377,40 @@ func quantityPct(alloc any, cap any, scale int) float64 {
return float64(a) * 100.0 / float64(c)
}
// quantityCores: cpu cores as float from K8s quantity
func quantityCores(v any) float64 {
m := quantityMillicores(v)
return float64(m) / 1000.0
}
// quantityMillicores: cpu in millicores from quantity
func quantityMillicores(v any) uint64 {
s := quantityString(v)
s = strings.TrimSpace(strings.ToLower(s))
if strings.HasSuffix(s, "m") {
n, _ := strconv.ParseFloat(strings.TrimSuffix(s, "m"), 64)
return uint64(n)
}
n, _ := strconv.ParseFloat(s, 64)
return uint64(n * 1000)
}
// quantityBytes: memory in bytes from quantity
func quantityBytes(v any) uint64 {
s := quantityString(v)
return quantityToBytes(s)
}
func quantityString(v any) string {
switch t := v.(type) {
case string:
return t
case map[string]any:
if vs, ok := t["string"].(string); ok { return vs }
}
return ""
}
func parseQuantity(v any, scale int) uint64 {
s := ""
switch t := v.(type) {
@ -345,4 +453,114 @@ func quantityToBytes(v any) uint64 {
func intFrom(v any) int { switch t := v.(type) { case float64: return int(t); case int: return t; case string: i, _ := strconv.Atoi(t); return i; default: return 0 } }
func uint64From(v any) uint64 { switch t := v.(type) { case float64: return uint64(t); case int: return uint64(t); case string: u, _ := strconv.ParseUint(t, 10, 64); return u; default: return 0 } }
// usageValue получает значение метрики из карты usage по ключу
func usageValue(usage map[string]map[string]uint64, key, field string) uint64 {
if m, ok := usage[key]; ok { return m[field] }
return 0
}
// k8sNodeUsage — метрики узлов по metrics.k8s.io
func k8sNodeUsage(ctx context.Context) map[string]map[string]uint64 {
obj := kubectlRawJSON(ctx, "/apis/metrics.k8s.io/v1beta1/nodes")
res := map[string]map[string]uint64{}
if obj == nil { return res }
items, _ := obj["items"].([]any)
for _, it := range items {
m, _ := it.(map[string]any)
meta, _ := m["metadata"].(map[string]any)
name, _ := meta["name"].(string)
usage, _ := m["usage"].(map[string]any)
if name == "" || usage == nil { continue }
cpuM := quantityMillicores(usage["cpu"])
memB := quantityBytes(usage["memory"])
res[name] = map[string]uint64{"cpu_m": cpuM, "mem_bytes": memB}
}
return res
}
// k8sPodUsage — метрики подов по metrics.k8s.io
func k8sPodUsage(ctx context.Context) map[string]map[string]uint64 {
obj := kubectlRawJSON(ctx, "/apis/metrics.k8s.io/v1beta1/pods")
res := map[string]map[string]uint64{}
if obj == nil { return res }
items, _ := obj["items"].([]any)
for _, it := range items {
m, _ := it.(map[string]any)
meta, _ := m["metadata"].(map[string]any)
ns, _ := meta["namespace"].(string)
name, _ := meta["name"].(string)
key := ns + "/" + name
conts, _ := m["containers"].([]any)
var cpuM, memB uint64
for _, c := range conts {
if cm, ok := c.(map[string]any); ok {
if u, ok2 := cm["usage"].(map[string]any); ok2 {
cpuM += quantityMillicores(u["cpu"])
memB += quantityBytes(u["memory"])
}
}
}
res[key] = map[string]uint64{"cpu_m": cpuM, "mem_bytes": memB}
}
return res
}
// pvType — определение типа хранилища PV
func pvType(spec map[string]any) string {
if spec == nil { return "" }
for _, k := range []string{"hostPath","nfs","gcePersistentDisk","awsElasticBlockStore","azureDisk","azureFile","cephfs","cinder","iscsi","local","portworxVolume","vsphereVolume","csi"} {
if _, ok := spec[k]; ok { return k }
}
return ""
}
// k8sStorageUsage — использование томов по данным kubelet summary API
func k8sStorageUsage(ctx context.Context) []map[string]any {
nodeObjs := kubectlJSON(ctx, "get", "nodes")
out := []map[string]any{}
for _, n := range nodeObjs {
meta, _ := n["metadata"].(map[string]any)
nodeName, _ := meta["name"].(string)
if nodeName == "" { continue }
path := "/api/v1/nodes/" + nodeName + "/proxy/stats/summary"
obj := kubectlRawJSON(ctx, path)
if obj == nil { continue }
pods, _ := obj["pods"].([]any)
for _, p := range pods {
pm, _ := p.(map[string]any)
podRef, _ := pm["podRef"].(map[string]any)
pns, _ := podRef["namespace"].(string)
pname, _ := podRef["name"].(string)
vols, _ := pm["volumeStats"].([]any)
for _, v := range vols {
vm, _ := v.(map[string]any)
vname, _ := vm["name"].(string)
pvcRef, _ := vm["pvcRef"].(map[string]any)
pvcName, _ := pvcRef["name"].(string)
fs, _ := vm["fsStats"].(map[string]any)
capB := uint64(0); usedB := uint64(0); availB := uint64(0)
if fs != nil {
capB = uint64From(fs["capacityBytes"])
usedB = uint64From(fs["usedBytes"])
availB = uint64From(fs["availableBytes"])
}
name := pvcName
if name == "" { name = pns + "/" + pname + ":" + vname }
out = append(out, map[string]any{
"name": name,
"namespace": pns,
"pod": pname,
"node": nodeName,
"capacity_bytes": capB,
"used_bytes": usedB,
"available_bytes": availB,
})
}
}
}
return out
}