SensusAgent/src/collectors/kubernetes/kubernetes_linux.go

349 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//go:build linux
package main
// Автор: Сергей Антропов, сайт: https://devops.org.ru
// Linux-реализация kubernetes-коллектора: использует kubectl и вывод в JSON.
// Для производительности и устойчивости выбираем краткие поля и агрегируем по минимуму,
// оставляя только запрошенные пользователем метрики.
import (
"context"
"encoding/json"
"os/exec"
"strconv"
"strings"
)
// collectKubernetes builds a cluster summary from kubectl output.
//
// It returns (nil, nil) when kubectl cannot be found on PATH or when every
// section came back empty. Otherwise the returned map contains only the
// sections that have at least one entry. The error result is always nil.
func collectKubernetes(ctx context.Context) (map[string]any, error) {
	if _, lookErr := exec.LookPath("kubectl"); lookErr != nil {
		return nil, nil
	}

	summary := map[string]any{}
	// put records a section under key only when it is non-empty.
	put := func(key string, count int, section any) {
		if count > 0 {
			summary[key] = section
		}
	}

	// Masters / workers (nodes).
	nodes := k8sNodes(ctx)
	put("nodes", len(nodes), nodes)

	// Ingress controllers (popular ones: nginx, traefik), matched by the
	// app.kubernetes.io/name label on Deployments/DaemonSets.
	ingresses := k8sIngressControllers(ctx)
	put("ingress_controllers", len(ingresses), ingresses)

	// Services of type LoadBalancer together with their external address.
	balancers := k8sLoadBalancers(ctx)
	put("load_balancers", len(balancers), balancers)

	// Pods: namespace, name, state, restarts.
	pods := k8sPods(ctx)
	put("pods", len(pods), pods)

	// Namespaces, volumes (PV/PVC), secrets.
	namespaces := k8sNamespaces(ctx)
	put("namespaces", len(namespaces), namespaces)
	volumes := k8sVolumes(ctx)
	put("volumes", len(volumes), volumes)
	secrets := k8sSecrets(ctx)
	put("secrets", len(secrets), secrets)

	// Workloads: Deployments, DaemonSets, StatefulSets, CronJobs.
	deployments := k8sDeployments(ctx)
	put("deployments", len(deployments), deployments)
	daemonSets := k8sDaemonSets(ctx)
	put("daemonsets", len(daemonSets), daemonSets)
	statefulSets := k8sStatefulSets(ctx)
	put("statefulsets", len(statefulSets), statefulSets)
	cronJobs := k8sCronJobs(ctx)
	put("cronjobs", len(cronJobs), cronJobs)

	if len(summary) == 0 {
		return nil, nil
	}
	return summary, nil
}
// run executes bin with args under ctx and returns everything the process
// wrote to standard output. On any execution error (including a non-zero
// exit status) it returns an empty string and that error.
func run(ctx context.Context, bin string, args ...string) (string, error) {
	stdout, err := exec.CommandContext(ctx, bin, args...).Output()
	if err == nil {
		return string(stdout), nil
	}
	return "", err
}
// kubectlJSON runs `kubectl <args...> -o json` and returns the objects found
// in the top-level "items" array of the response.
//
// It returns nil when the command fails, prints nothing, or prints something
// that is not a JSON object. When the response parses but has no usable
// "items", the result is an empty, non-nil slice. Array elements that are not
// JSON objects are skipped.
func kubectlJSON(ctx context.Context, args ...string) []map[string]any {
	// Build a fresh argument slice. Appending directly to the variadic
	// parameter could write "-o json" into the caller's backing array when
	// the caller passes a slice that has spare capacity.
	full := make([]string, 0, len(args)+2)
	full = append(full, args...)
	full = append(full, "-o", "json")

	out, err := run(ctx, "kubectl", full...)
	if err != nil || strings.TrimSpace(out) == "" {
		return nil
	}

	var obj map[string]any
	if err := json.Unmarshal([]byte(out), &obj); err != nil {
		return nil
	}

	items, _ := obj["items"].([]any)
	res := make([]map[string]any, 0, len(items))
	for _, it := range items {
		if m, ok := it.(map[string]any); ok {
			res = append(res, m)
		}
	}
	return res
}
// k8sNodes summarises every node returned by `kubectl get nodes`.
//
// Each entry carries the node name, its first InternalIP address, the kubelet
// version, the status of the "Ready" condition ("Unknown" when no such
// condition is reported), a role ("master" when any label key contains
// node-role.kubernetes.io/master or node-role.kubernetes.io/control-plane,
// otherwise "worker") and cpu_pct/mem_pct. The two percentages are the
// allocatable/capacity ratio taken from the node status, not live usage.
func k8sNodes(ctx context.Context) []map[string]any {
	items := kubectlJSON(ctx, "get", "nodes")
	result := []map[string]any{}
	for _, node := range items {
		metadata, _ := node["metadata"].(map[string]any)
		nodeStatus, _ := node["status"].(map[string]any)
		nodeName, _ := metadata["name"].(string)

		// Take the first address of type InternalIP and stop looking.
		var internalIP string
		addressList, _ := nodeStatus["addresses"].([]any)
		for _, raw := range addressList {
			addr, isMap := raw.(map[string]any)
			if !isMap {
				continue
			}
			if kind, _ := addr["type"].(string); kind != "InternalIP" {
				continue
			}
			internalIP, _ = addr["address"].(string)
			break
		}

		info, _ := nodeStatus["nodeInfo"].(map[string]any)
		kubelet, _ := info["kubeletVersion"].(string)

		// The loop does not stop at the first match, so the last "Ready"
		// condition in the list decides the reported status.
		readiness := "Unknown"
		conditionList, _ := nodeStatus["conditions"].([]any)
		for _, raw := range conditionList {
			cond, isMap := raw.(map[string]any)
			if !isMap {
				continue
			}
			if kind, _ := cond["type"].(string); kind == "Ready" {
				readiness, _ = cond["status"].(string)
			}
		}

		// Simplified CPU/memory percentages derived from allocatable and
		// capacity; both stay 0 when either map is missing.
		var cpuShare, memShare float64
		allocatable, _ := nodeStatus["allocatable"].(map[string]any)
		capacity, _ := nodeStatus["capacity"].(map[string]any)
		if allocatable != nil && capacity != nil {
			cpuShare = quantityPct(allocatable["cpu"], capacity["cpu"], 1000) // CPU compared in millicores
			memShare = quantityPct(allocatable["memory"], capacity["memory"], 1)
		}

		nodeRole := "worker"
		if labelSet, hasLabels := metadata["labels"].(map[string]any); hasLabels {
			for key := range labelSet {
				legacyMaster := strings.Contains(key, "node-role.kubernetes.io/master")
				if legacyMaster || strings.Contains(key, "node-role.kubernetes.io/control-plane") {
					nodeRole = "master"
					break
				}
			}
		}

		result = append(result, map[string]any{
			"name":    nodeName,
			"ip":      internalIP,
			"version": kubelet,
			"status":  readiness,
			"role":    nodeRole,
			"cpu_pct": cpuShare,
			"mem_pct": memShare,
		})
	}
	return result
}
// k8sIngressControllers lists Deployments and DaemonSets in all namespaces
// whose app.kubernetes.io/name label is ingress-nginx,
// nginx-ingress-controller or traefik.
//
// Each entry holds name, namespace, replicas and version:
//   - replicas is status.replicas; DaemonSet status has no such field, so
//     status.desiredNumberScheduled is used when replicas is absent.
//   - version is the app.kubernetes.io/version label when set, otherwise the
//     tag of the first container image in the pod template, otherwise "".
func k8sIngressControllers(ctx context.Context) []map[string]any {
	const sel = "app.kubernetes.io/name in (ingress-nginx,nginx-ingress-controller,traefik)"
	deps := kubectlJSON(ctx, "get", "deployments", "-A", "-l", sel)
	dss := kubectlJSON(ctx, "get", "daemonsets", "-A", "-l", sel)

	res := make([]map[string]any, 0, len(deps)+len(dss))
	for _, group := range [][]map[string]any{deps, dss} {
		for _, o := range group {
			meta, _ := o["metadata"].(map[string]any)
			ns, _ := meta["namespace"].(string)
			name, _ := meta["name"].(string)

			status, _ := o["status"].(map[string]any)
			replicas := intFrom(status["replicas"])
			if _, hasReplicas := status["replicas"]; !hasReplicas {
				// DaemonSets report the pod count here instead.
				replicas = intFrom(status["desiredNumberScheduled"])
			}

			version := ""
			if labels, ok := meta["labels"].(map[string]any); ok {
				version, _ = labels["app.kubernetes.io/version"].(string)
			}
			if version == "" {
				version = ingressImageTag(ingressContainerImage(o))
			}

			res = append(res, map[string]any{"name": name, "namespace": ns, "replicas": replicas, "version": version})
		}
	}
	return res
}

// ingressContainerImage returns spec.template.spec.containers[0].image of a
// workload object, or "" when any step of that path is missing.
func ingressContainerImage(o map[string]any) string {
	spec, _ := o["spec"].(map[string]any)
	tmpl, _ := spec["template"].(map[string]any)
	podSpec, _ := tmpl["spec"].(map[string]any)
	containers, _ := podSpec["containers"].([]any)
	if len(containers) == 0 {
		return ""
	}
	first, _ := containers[0].(map[string]any)
	image, _ := first["image"].(string)
	return image
}

// ingressImageTag extracts the tag from an image reference such as
// "registry:5000/repo/name:v1.2.3@sha256:...". It returns "" when the
// reference has no tag.
func ingressImageTag(image string) string {
	if at := strings.IndexByte(image, '@'); at >= 0 {
		image = image[:at] // drop the digest
	}
	colon := strings.LastIndexByte(image, ':')
	// A colon before the last slash belongs to a registry port, not a tag.
	if colon < 0 || colon < strings.LastIndexByte(image, '/') {
		return ""
	}
	return image[colon+1:]
}
// k8sLoadBalancers lists Services of type LoadBalancer in all namespaces.
//
// Each entry holds namespace, name and external_ip. external_ip is the first
// non-empty status.loadBalancer.ingress[].ip; when no ingress entry has an
// IP, the first non-empty hostname is reported instead (some providers
// publish only a DNS name). This matches what kubectl shows in its
// EXTERNAL-IP column. It is "" while the balancer is still unprovisioned.
func k8sLoadBalancers(ctx context.Context) []map[string]any {
	svcs := kubectlJSON(ctx, "get", "svc", "-A")
	out := []map[string]any{}
	for _, s := range svcs {
		spec, _ := s["spec"].(map[string]any)
		if t, _ := spec["type"].(string); !strings.EqualFold(t, "loadbalancer") {
			continue
		}
		meta, _ := s["metadata"].(map[string]any)
		ns, _ := meta["namespace"].(string)
		name, _ := meta["name"].(string)

		status, _ := s["status"].(map[string]any)
		lb, _ := status["loadBalancer"].(map[string]any)
		ing, _ := lb["ingress"].([]any)

		ext, host := "", ""
		for _, raw := range ing {
			entry, ok := raw.(map[string]any)
			if !ok {
				continue
			}
			if ip, _ := entry["ip"].(string); ip != "" {
				ext = ip
				break
			}
			if h, _ := entry["hostname"].(string); h != "" && host == "" {
				host = h
			}
		}
		if ext == "" {
			ext = host
		}

		out = append(out, map[string]any{"namespace": ns, "name": name, "external_ip": ext})
	}
	return out
}
// k8sPods reports basic facts about every pod in all namespaces: namespace,
// name, state (status.phase) and restarts (the sum of restartCount over the
// entries of status.containerStatuses).
func k8sPods(ctx context.Context) []map[string]any {
	items := kubectlJSON(ctx, "get", "pods", "-A")
	summaries := []map[string]any{}
	for i := range items {
		metadata, _ := items[i]["metadata"].(map[string]any)
		podStatus, _ := items[i]["status"].(map[string]any)

		// Total restarts across the listed container statuses.
		totalRestarts := 0
		containerList, _ := podStatus["containerStatuses"].([]any)
		for _, raw := range containerList {
			container, isMap := raw.(map[string]any)
			if !isMap {
				continue
			}
			totalRestarts += intFrom(container["restartCount"])
		}

		namespace, _ := metadata["namespace"].(string)
		podName, _ := metadata["name"].(string)
		podPhase, _ := podStatus["phase"].(string)
		summaries = append(summaries, map[string]any{
			"namespace": namespace,
			"name":      podName,
			"state":     podPhase,
			"restarts":  totalRestarts,
		})
	}
	return summaries
}
// k8sNamespaces returns the names of all namespaces reported by
// `kubectl get ns`. Items without metadata or with an empty name are skipped.
func k8sNamespaces(ctx context.Context) []string {
	names := []string{}
	for _, item := range kubectlJSON(ctx, "get", "ns") {
		metadata, isMap := item["metadata"].(map[string]any)
		if !isMap {
			continue
		}
		nsName, _ := metadata["name"].(string)
		if nsName == "" {
			continue
		}
		names = append(names, nsName)
	}
	return names
}
// k8sVolumes lists persistent volumes followed by persistent volume claims.
//
// PV entries have the keys "pv" and "size_bytes" (from spec.capacity.storage).
// PVC entries have "pvc", "namespace" and "size_bytes" (from
// spec.resources.requests.storage). Sizes are converted by quantityToBytes.
func k8sVolumes(ctx context.Context) []map[string]any {
	persistentVolumes := kubectlJSON(ctx, "get", "pv")
	claims := kubectlJSON(ctx, "get", "pvc", "-A")

	entries := []map[string]any{}
	for _, item := range persistentVolumes {
		metadata, _ := item["metadata"].(map[string]any)
		pvSpec, _ := item["spec"].(map[string]any)
		pvName, _ := metadata["name"].(string)
		capacity, _ := pvSpec["capacity"].(map[string]any)
		entries = append(entries, map[string]any{"pv": pvName, "size_bytes": quantityToBytes(capacity["storage"])})
	}
	for _, item := range claims {
		metadata, _ := item["metadata"].(map[string]any)
		claimSpec, _ := item["spec"].(map[string]any)
		claimName, _ := metadata["name"].(string)
		namespace, _ := metadata["namespace"].(string)
		resources, _ := claimSpec["resources"].(map[string]any)
		requests, _ := resources["requests"].(map[string]any)
		entries = append(entries, map[string]any{"pvc": claimName, "namespace": namespace, "size_bytes": quantityToBytes(requests["storage"])})
	}
	return entries
}
// k8sSecrets lists the namespace, name and type of every secret in all
// namespaces. Only those three fields are copied into the result.
//
// NOTE(review): the objects are fetched with `-o json`, so the secret
// payloads pass through this process even though they are not reported.
func k8sSecrets(ctx context.Context) []map[string]any {
	listed := []map[string]any{}
	for _, secret := range kubectlJSON(ctx, "get", "secrets", "-A") {
		metadata, _ := secret["metadata"].(map[string]any)
		namespace, _ := metadata["namespace"].(string)
		secretName, _ := metadata["name"].(string)
		secretType, _ := secret["type"].(string)
		listed = append(listed, map[string]any{
			"namespace": namespace,
			"name":      secretName,
			"type":      secretType,
		})
	}
	return listed
}
// k8sDeployments lists every Deployment in all namespaces with its
// namespace, name, ready (status.readyReplicas) and replicas
// (status.replicas) counts.
func k8sDeployments(ctx context.Context) []map[string]any {
	listed := []map[string]any{}
	for _, deployment := range kubectlJSON(ctx, "get", "deployments", "-A") {
		metadata, _ := deployment["metadata"].(map[string]any)
		depStatus, _ := deployment["status"].(map[string]any)
		namespace, _ := metadata["namespace"].(string)
		depName, _ := metadata["name"].(string)
		listed = append(listed, map[string]any{
			"namespace": namespace,
			"name":      depName,
			"ready":     intFrom(depStatus["readyReplicas"]),
			"replicas":  intFrom(depStatus["replicas"]),
		})
	}
	return listed
}
// k8sDaemonSets lists every DaemonSet in all namespaces with its namespace,
// name, ready (status.numberReady) and desired
// (status.desiredNumberScheduled) counts.
func k8sDaemonSets(ctx context.Context) []map[string]any {
	listed := []map[string]any{}
	for _, daemonSet := range kubectlJSON(ctx, "get", "daemonsets", "-A") {
		metadata, _ := daemonSet["metadata"].(map[string]any)
		dsStatus, _ := daemonSet["status"].(map[string]any)
		namespace, _ := metadata["namespace"].(string)
		dsName, _ := metadata["name"].(string)
		listed = append(listed, map[string]any{
			"namespace": namespace,
			"name":      dsName,
			"ready":     intFrom(dsStatus["numberReady"]),
			"desired":   intFrom(dsStatus["desiredNumberScheduled"]),
		})
	}
	return listed
}
// k8sStatefulSets lists every StatefulSet in all namespaces with its
// namespace, name, ready (status.readyReplicas) and replicas
// (status.replicas) counts.
func k8sStatefulSets(ctx context.Context) []map[string]any {
	listed := []map[string]any{}
	for _, statefulSet := range kubectlJSON(ctx, "get", "statefulsets", "-A") {
		metadata, _ := statefulSet["metadata"].(map[string]any)
		setStatus, _ := statefulSet["status"].(map[string]any)
		namespace, _ := metadata["namespace"].(string)
		setName, _ := metadata["name"].(string)
		listed = append(listed, map[string]any{
			"namespace": namespace,
			"name":      setName,
			"ready":     intFrom(setStatus["readyReplicas"]),
			"replicas":  intFrom(setStatus["replicas"]),
		})
	}
	return listed
}
// k8sCronJobs lists every CronJob in all namespaces with its namespace, name
// and schedule (spec.schedule).
func k8sCronJobs(ctx context.Context) []map[string]any {
	listed := []map[string]any{}
	for _, cronJob := range kubectlJSON(ctx, "get", "cronjobs", "-A") {
		metadata, _ := cronJob["metadata"].(map[string]any)
		jobSpec, _ := cronJob["spec"].(map[string]any)
		namespace, _ := metadata["namespace"].(string)
		jobName, _ := metadata["name"].(string)
		schedule, _ := jobSpec["schedule"].(string)
		listed = append(listed, map[string]any{
			"namespace": namespace,
			"name":      jobName,
			"schedule":  schedule,
		})
	}
	return listed
}
// quantityPct returns alloc as a percentage of cap, where both are
// Kubernetes resource quantities such as "3920m", "4" or "16384Ki".
//
// scale is forwarded to parseQuantity: 1000 compares CPU in millicores,
// 1 compares memory in bytes. The result is 0 when cap is missing,
// unparsable or zero. This is the allocatable/capacity ratio, not usage.
func quantityPct(alloc any, cap any, scale int) float64 {
	total := parseQuantity(cap, scale)
	if total == 0 {
		return 0
	}
	return float64(parseQuantity(alloc, scale)) * 100.0 / float64(total)
}

// parseQuantity converts a Kubernetes quantity to an integer expressed in
// 1/scale units: scale 1000 turns "3920m" into 3920 and "4" into 4000
// (millicores); scale 1 turns "16384Ki" into 16777216 (bytes).
//
// v may be a string or a map with a "string" key. A scale below 1 is treated
// as 1. Missing, unparsable, negative or non-finite input yields 0.
func parseQuantity(v any, scale int) uint64 {
	if scale < 1 {
		scale = 1
	}
	return scaledQuantity(v, float64(scale))
}

// quantityToBytes converts a Kubernetes quantity such as "10Gi", "1Ti" or
// "5G" to a whole number of base units (bytes for storage and memory).
//
// v may be a string or a map with a "string" key. Missing, unparsable,
// negative or non-finite input yields 0.
func quantityToBytes(v any) uint64 {
	return scaledQuantity(v, 1)
}

// scaledQuantity parses v as a Kubernetes quantity and returns its value
// multiplied by scale, rounded to the nearest integer and clamped to the
// uint64 range.
func scaledQuantity(v any, scale float64) uint64 {
	var text string
	switch t := v.(type) {
	case string:
		text = t
	case map[string]any:
		text, _ = t["string"].(string)
	}
	text = strings.TrimSpace(text)
	if text == "" {
		return 0
	}

	number, mult, div := splitQuantitySuffix(text)
	f, err := strconv.ParseFloat(number, 64)
	// ParseFloat accepts "nan" and "inf"; !(f > 0) rejects NaN, zero and
	// negatives, and the upper bound rejects +Inf.
	const maxFinite = 1.7976931348623157e308
	if err != nil || !(f > 0) || f > maxFinite {
		return 0
	}

	// Multiply before dividing so that values like "3920m" at scale 1000
	// stay exact; the +0.5 rounds to nearest instead of truncating.
	f = f*scale*mult/div + 0.5
	if f >= 18446744073709551615.0 {
		return ^uint64(0)
	}
	return uint64(f)
}

// splitQuantitySuffix separates the numeric part of a quantity from its unit
// suffix and returns the factor as a multiplier/divisor pair.
//
// Binary suffixes (Ki, Mi, Gi, Ti, Pi, Ei) are matched case-insensitively.
// Decimal suffixes follow Kubernetes casing where it matters: "m" is milli
// while "M" is mega, and only an upper-case trailing "E" means exa, so that
// exponent notation like "1e3" still parses as a plain number. When no known
// suffix is present the input is returned unchanged with a factor of 1.
func splitQuantitySuffix(s string) (number string, mult, div float64) {
	n := len(s)
	if n > 2 && (s[n-1] == 'i' || s[n-1] == 'I') {
		var binary float64
		switch s[n-2] {
		case 'K', 'k':
			binary = 1 << 10
		case 'M', 'm':
			binary = 1 << 20
		case 'G', 'g':
			binary = 1 << 30
		case 'T', 't':
			binary = 1 << 40
		case 'P', 'p':
			binary = 1 << 50
		case 'E', 'e':
			binary = 1 << 60
		}
		if binary != 0 {
			return s[:n-2], binary, 1
		}
	}
	if n > 1 {
		switch s[n-1] {
		case 'n':
			return s[:n-1], 1, 1e9
		case 'u':
			return s[:n-1], 1, 1e6
		case 'm':
			return s[:n-1], 1, 1e3
		case 'k', 'K':
			return s[:n-1], 1e3, 1
		case 'M':
			return s[:n-1], 1e6, 1
		case 'G', 'g':
			return s[:n-1], 1e9, 1
		case 'T', 't':
			return s[:n-1], 1e12, 1
		case 'P', 'p':
			return s[:n-1], 1e15, 1
		case 'E':
			return s[:n-1], 1e18, 1
		}
	}
	return s, 1, 1
}
// intFrom converts a loosely typed value to int. A float64 is truncated, an
// int is returned unchanged, and a string is parsed with strconv.Atoi whose
// error is deliberately ignored (non-numeric text yields 0). Any other type,
// including nil, yields 0.
func intFrom(v any) int {
	if f, isFloat := v.(float64); isFloat {
		return int(f)
	}
	if i, isInt := v.(int); isInt {
		return i
	}
	if s, isString := v.(string); isString {
		parsed, _ := strconv.Atoi(s)
		return parsed
	}
	return 0
}