feat: улучшен коллектор proxvmservices - добавлено обнаружение нод кластеров

- Добавлено поле cluster_nodes в структуру ServiceInfo для вывода IP всех нод кластера
- Реализовано извлечение нод для PostgreSQL с Patroni через patronictl list
- Реализовано извлечение нод для etcd кластера через etcdctl member list
- Реализовано извлечение нод для Kubernetes кластера через kubectl get nodes
- Реализовано извлечение нод для Redis кластера через redis-cli cluster nodes
- Реализовано извлечение нод для ClickHouse кластера через system.clusters
- Реализовано извлечение нод для RabbitMQ кластера через rabbitmqctl cluster_status
- Реализовано извлечение нод для Kafka кластера (базовая реализация)
- Реализовано извлечение нод для MongoDB кластера через rs.status()
- Добавлены вспомогательные функции: extractIPFromURL, isValidIP, resolveHostname
- Исправлена паника в extractClusterNodes при работе с nil указателями
- Протестировано на удаленных серверах

Автор: Сергей Антропов
Сайт: https://devops.org.ru
This commit is contained in:
Sergey Antropoff 2025-09-15 17:19:54 +03:00
parent 5fa101dfff
commit ceab977da1

View File

@ -19,14 +19,15 @@ import (
// ServiceInfo представляет информацию о сервисе // ServiceInfo представляет информацию о сервисе
type ServiceInfo struct { type ServiceInfo struct {
Name string `json:"name"` Name string `json:"name"`
Type string `json:"type"` // standalone/cluster Type string `json:"type"` // standalone/cluster
Status string `json:"status"` // running/stopped/unknown Status string `json:"status"` // running/stopped/unknown
Version string `json:"version"` Version string `json:"version"`
Ports []int `json:"ports"` Ports []int `json:"ports"`
Config map[string]any `json:"config"` Config map[string]any `json:"config"`
Cluster interface{} `json:"cluster,omitempty"` Cluster interface{} `json:"cluster,omitempty"`
Connections []ConnectionInfo `json:"connections,omitempty"` ClusterNodes []string `json:"cluster_nodes,omitempty"` // IP всех нод кластера
Connections []ConnectionInfo `json:"connections,omitempty"`
} }
// ConnectionInfo представляет информацию о соединениях // ConnectionInfo представляет информацию о соединениях
@ -206,15 +207,19 @@ func detectPostgreSQL() *ServiceInfo {
// Получаем информацию о репликации // Получаем информацию о репликации
connections := getPostgreSQLConnections() connections := getPostgreSQLConnections()
// Извлекаем IP всех нод кластера
clusterNodes := extractClusterNodes(cluster)
return &ServiceInfo{ return &ServiceInfo{
Name: "postgresql", Name: "postgresql",
Type: determineServiceType(cluster), Type: determineServiceType(cluster),
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: config, Config: config,
Cluster: cluster, Cluster: cluster,
Connections: connections, ClusterNodes: clusterNodes,
Connections: connections,
} }
} }
@ -472,6 +477,331 @@ func determineServiceType(cluster interface{}) string {
return "standalone" return "standalone"
} }
// extractClusterNodes извлекает IP всех нод кластера из различных типов кластеров
func extractClusterNodes(cluster interface{}) []string {
var nodes []string
if cluster == nil {
return nodes
}
// Для Patroni кластера
if patroniCluster, ok := cluster.(*PatroniClusterInfo); ok && patroniCluster != nil {
for _, member := range patroniCluster.Members {
if member.Host != "" {
nodes = append(nodes, member.Host)
}
}
return nodes
}
// Для etcd кластера
if etcdCluster, ok := cluster.(*EtcdClusterInfo); ok && etcdCluster != nil {
for _, member := range etcdCluster.Members {
// Извлекаем IP из client_urls
for _, url := range member.ClientURLs {
if ip := extractIPFromURL(url); ip != "" {
nodes = append(nodes, ip)
}
}
}
return nodes
}
// Для Kubernetes кластера - ноды уже получены в detectKubernetes
// и переданы через clusterNodes, поэтому здесь ничего не делаем
return nodes
}
// extractIPFromURL извлекает IP из URL (например, "http://10.14.246.77:2379" -> "10.14.246.77")
func extractIPFromURL(url string) string {
// Убираем протокол
if strings.Contains(url, "://") {
parts := strings.Split(url, "://")
if len(parts) > 1 {
url = parts[1]
}
}
// Извлекаем IP:порт
if strings.Contains(url, ":") {
parts := strings.Split(url, ":")
if len(parts) > 0 {
ip := parts[0]
// Проверяем, что это валидный IP
if isValidIP(ip) {
return ip
}
}
}
return ""
}
// isValidIP проверяет, является ли строка валидным IP адресом
func isValidIP(ip string) bool {
parts := strings.Split(ip, ".")
if len(parts) != 4 {
return false
}
for _, part := range parts {
if num, err := strconv.Atoi(part); err != nil || num < 0 || num > 255 {
return false
}
}
return true
}
// getKubernetesNodes получает IP всех нод Kubernetes кластера
func getKubernetesNodes() []string {
var nodes []string
// Получаем список нод
output, err := runCommand("kubectl", "get", "nodes", "-o", "jsonpath={.items[*].status.addresses[?(@.type==\"InternalIP\")].address}")
if err != nil {
// Пробуем альтернативный способ
output, err = runCommand("kubectl", "get", "nodes", "-o", "wide", "--no-headers")
if err != nil {
return nodes
}
// Парсим вывод kubectl get nodes -o wide
lines := strings.Split(output, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
// Парсим строку типа: "kube-dbrain-node01 Ready control-plane,master 236d v1.28.2 10.14.246.75 10.14.246.75 Ubuntu 22.04.3 LTS 5.15.0-91-generic containerd://1.7.6"
parts := strings.Fields(line)
if len(parts) >= 6 {
// IP обычно в 6-м поле (индекс 5)
ip := parts[5]
if isValidIP(ip) {
nodes = append(nodes, ip)
}
}
}
return nodes
}
// Парсим JSONPath вывод
if output != "" {
ips := strings.Fields(output)
for _, ip := range ips {
if isValidIP(ip) {
nodes = append(nodes, ip)
}
}
}
return nodes
}
// getRedisClusterNodes получает ноды Redis кластера
func getRedisClusterNodes() []string {
var nodes []string
// Пробуем получить информацию о кластере
output, err := runCommand("redis-cli", "cluster", "nodes")
if err != nil {
// Если не кластер, возвращаем только localhost
return []string{"127.0.0.1"}
}
// Парсим вывод cluster nodes
lines := strings.Split(output, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
// Парсим строку типа: "abc123... 127.0.0.1:7000@17000 master - 0 1234567890 1 connected 0-5460"
parts := strings.Fields(line)
if len(parts) >= 2 {
// IP:порт находится во втором поле
hostPort := parts[1]
if strings.Contains(hostPort, ":") {
ip := strings.Split(hostPort, ":")[0]
if isValidIP(ip) {
nodes = append(nodes, ip)
}
}
}
}
// Если не удалось получить ноды кластера, возвращаем localhost
if len(nodes) == 0 {
nodes = []string{"127.0.0.1"}
}
return nodes
}
// getClickHouseClusterNodes получает ноды ClickHouse кластера
func getClickHouseClusterNodes() []string {
var nodes []string
// Пробуем получить информацию о кластере через system.clusters
output, err := runCommand("clickhouse-client", "--query", "SELECT host_name FROM system.clusters")
if err != nil {
// Если не удалось, возвращаем localhost
return []string{"127.0.0.1"}
}
// Парсим вывод
lines := strings.Split(output, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
// Проверяем, является ли это IP адресом
if isValidIP(line) {
nodes = append(nodes, line)
}
}
// Если не удалось получить ноды кластера, возвращаем localhost
if len(nodes) == 0 {
nodes = []string{"127.0.0.1"}
}
return nodes
}
// getRabbitMQClusterNodes получает ноды RabbitMQ кластера
func getRabbitMQClusterNodes() []string {
var nodes []string
// Пробуем получить информацию о кластере
output, err := runCommand("rabbitmqctl", "cluster_status")
if err != nil {
// Если не удалось, возвращаем localhost
return []string{"127.0.0.1"}
}
// Парсим вывод cluster_status
lines := strings.Split(output, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.Contains(line, "running_nodes") {
// Извлекаем ноды из строки типа: "running_nodes,['rabbit@node1','rabbit@node2']"
re := regexp.MustCompile(`'rabbit@([^']+)'`)
matches := re.FindAllStringSubmatch(line, -1)
for _, match := range matches {
if len(match) > 1 {
hostname := match[1]
// Пробуем разрешить hostname в IP
if ip := resolveHostname(hostname); ip != "" {
nodes = append(nodes, ip)
} else {
nodes = append(nodes, hostname)
}
}
}
break
}
}
// Если не удалось получить ноды кластера, возвращаем localhost
if len(nodes) == 0 {
nodes = []string{"127.0.0.1"}
}
return nodes
}
// resolveHostname пытается разрешить hostname в IP адрес
func resolveHostname(hostname string) string {
// Простая проверка, является ли это уже IP
if isValidIP(hostname) {
return hostname
}
// Пробуем разрешить через nslookup или getent
output, err := runCommand("getent", "hosts", hostname)
if err != nil {
return ""
}
// Парсим вывод getent hosts
parts := strings.Fields(output)
if len(parts) > 0 {
ip := parts[0]
if isValidIP(ip) {
return ip
}
}
return ""
}
// getKafkaClusterNodes получает ноды Kafka кластера
func getKafkaClusterNodes() []string {
var nodes []string
// Пробуем получить информацию о кластере через kafka-topics
_, err := runCommand("kafka-topics", "--bootstrap-server", "localhost:9092", "--list")
if err != nil {
// Если не удалось, возвращаем localhost
return []string{"127.0.0.1"}
}
// Для Kafka пока возвращаем localhost, так как получение нод кластера
// требует более сложной логики через JMX или конфигурационные файлы
// TODO: улучшить обнаружение нод Kafka кластера
nodes = []string{"127.0.0.1"}
return nodes
}
// getMongoDBClusterNodes получает ноды MongoDB кластера
func getMongoDBClusterNodes() []string {
var nodes []string
// Пробуем получить информацию о кластере
output, err := runCommand("mongosh", "--quiet", "--eval", "rs.status().members.map(m => m.name)")
if err != nil {
// Если не удалось, возвращаем localhost
return []string{"127.0.0.1"}
}
// Парсим вывод MongoDB
lines := strings.Split(output, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" || line == "null" {
continue
}
// Убираем кавычки и скобки
line = strings.Trim(line, "[]\"'")
if strings.Contains(line, ":") {
// Извлекаем hostname:port
hostname := strings.Split(line, ":")[0]
if ip := resolveHostname(hostname); ip != "" {
nodes = append(nodes, ip)
} else {
nodes = append(nodes, hostname)
}
}
}
// Если не удалось получить ноды кластера, возвращаем localhost
if len(nodes) == 0 {
nodes = []string{"127.0.0.1"}
}
return nodes
}
// detectEtcd обнаруживает etcd // detectEtcd обнаруживает etcd
func detectEtcd() *ServiceInfo { func detectEtcd() *ServiceInfo {
// Проверяем процессы // Проверяем процессы
@ -491,14 +821,18 @@ func detectEtcd() *ServiceInfo {
// Проверяем кластер // Проверяем кластер
cluster := getEtcdCluster() cluster := getEtcdCluster()
// Извлекаем IP всех нод кластера
clusterNodes := extractClusterNodes(cluster)
return &ServiceInfo{ return &ServiceInfo{
Name: "etcd", Name: "etcd",
Type: "cluster", Type: "cluster",
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: config, Config: config,
Cluster: cluster, Cluster: cluster,
ClusterNodes: clusterNodes,
} }
} }
@ -643,7 +977,7 @@ func parseEtcdMembers(output string) []EtcdMember {
return members return members
} }
// Заглушки для остальных сервисов (пока не реализованы) // detectRedis обнаруживает Redis (standalone или cluster)
func detectRedis() *ServiceInfo { func detectRedis() *ServiceInfo {
if !isProcessRunning("redis-server") { if !isProcessRunning("redis-server") {
return nil return nil
@ -661,13 +995,21 @@ func detectRedis() *ServiceInfo {
} }
} }
// Проверяем, является ли это Redis кластером
clusterNodes := getRedisClusterNodes()
serviceType := "standalone"
if len(clusterNodes) > 1 {
serviceType = "cluster"
}
return &ServiceInfo{ return &ServiceInfo{
Name: "redis", Name: "redis",
Type: "standalone", Type: serviceType,
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: make(map[string]any), Config: make(map[string]any),
ClusterNodes: clusterNodes,
} }
} }
@ -688,13 +1030,21 @@ func detectClickHouse() *ServiceInfo {
} }
} }
// Получаем ноды ClickHouse кластера
clusterNodes := getClickHouseClusterNodes()
serviceType := "standalone"
if len(clusterNodes) > 1 {
serviceType = "cluster"
}
return &ServiceInfo{ return &ServiceInfo{
Name: "clickhouse", Name: "clickhouse",
Type: "standalone", Type: serviceType,
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: make(map[string]any), Config: make(map[string]any),
ClusterNodes: clusterNodes,
} }
} }
@ -715,13 +1065,21 @@ func detectRabbitMQ() *ServiceInfo {
} }
} }
// Получаем ноды RabbitMQ кластера
clusterNodes := getRabbitMQClusterNodes()
serviceType := "standalone"
if len(clusterNodes) > 1 {
serviceType = "cluster"
}
return &ServiceInfo{ return &ServiceInfo{
Name: "rabbitmq", Name: "rabbitmq",
Type: "standalone", Type: serviceType,
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: make(map[string]any), Config: make(map[string]any),
ClusterNodes: clusterNodes,
} }
} }
@ -733,13 +1091,21 @@ func detectKafka() *ServiceInfo {
ports := getListeningPorts(9092) ports := getListeningPorts(9092)
version := "unknown" version := "unknown"
// Получаем ноды Kafka кластера
clusterNodes := getKafkaClusterNodes()
serviceType := "standalone"
if len(clusterNodes) > 1 {
serviceType = "cluster"
}
return &ServiceInfo{ return &ServiceInfo{
Name: "kafka", Name: "kafka",
Type: "standalone", Type: serviceType,
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: make(map[string]any), Config: make(map[string]any),
ClusterNodes: clusterNodes,
} }
} }
@ -760,13 +1126,21 @@ func detectMongoDB() *ServiceInfo {
} }
} }
// Получаем ноды MongoDB кластера
clusterNodes := getMongoDBClusterNodes()
serviceType := "standalone"
if len(clusterNodes) > 1 {
serviceType = "cluster"
}
return &ServiceInfo{ return &ServiceInfo{
Name: "mongodb", Name: "mongodb",
Type: "standalone", Type: serviceType,
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: make(map[string]any), Config: make(map[string]any),
ClusterNodes: clusterNodes,
} }
} }
@ -787,12 +1161,16 @@ func detectKubernetes() *ServiceInfo {
} }
} }
// Получаем информацию о нодах кластера
clusterNodes := getKubernetesNodes()
return &ServiceInfo{ return &ServiceInfo{
Name: "kubernetes", Name: "kubernetes",
Type: "cluster", Type: "cluster",
Status: "running", Status: "running",
Version: version, Version: version,
Ports: ports, Ports: ports,
Config: make(map[string]any), Config: make(map[string]any),
ClusterNodes: clusterNodes,
} }
} }