package main
import ( "encoding/json" "fmt" "os" "os/exec" "strconv" "strings" "sync" "sync/atomic" "time"
"github.com/olekukonko/tablewriter" "github.com/olekukonko/tablewriter/renderer" "github.com/olekukonko/tablewriter/tw" )
type CephMonStateQuorum struct { Rank int64 `json:"rank"` Name string `json:"name"` }
type CephMonState struct { Epoch int64 `json:"epoch"` MinMonReleaseName string `json:"min_mon_release_name"` NumMons int64 `json:"num_mons"` Leader string `json:"leader"` Quorum []CephMonStateQuorum `json:"quorum"` }
type CephPool struct { ID int64 `json:"pool_id"` Name string `json:"pool_name"` FlagsNames string `json:"flags_names"` Type int64 `json:"type"` Size int64 `json:"size"` MinSize int64 `json:"min_size"` PGAutoscaleMode string `json:"pg_autoscale_mode"` PGNum int64 `json:"pg_num"` TargetMaxBytes int64 `json:"target_max_bytes"` TargetMaxObjects int64 `json:"target_max_objects"` ApplicationMetadata map[string]interface{} `json:"application_metadata"` }
type CephRBDPoolImage struct { Name string `json:"name"` ID string `json:"id"` Size int64 `json:"size"` Objects int64 `json:"objects"` SnapshotCount int64 `json:"snapshot_count"` BlockNamePrefix string `json:"block_name_prefix"` Format int64 `json:"format"` }
type CephMONSessionAddrItem struct { Type string `json:"type"` Addr string `json:"addr"` Nonce int64 `json:"nonce"` }
type CephMONSessionAddrs struct { AddrVec []CephMONSessionAddrItem `json:"addrvec"` }
type CephMONSession struct { Name string `json:"name"` EntityName string `json:"entity_name"` Addrs CephMONSessionAddrs `json:"addrs"` SocketAddr CephMONSessionAddrItem `json:"socket_addr"` ConType string `json:"con_type"` Open bool `json:"open"` Authenticated bool `json:"authenticated"` GlobalId int64 `json:"global_id"` GlobalIdStatus string `json:"global_id_status"` OsdEpoch int64 `json:"osd_epoch"` RemoteHost string `json:"remote_host"` }
type CephRBDClientMetadata struct { Image CephRBDPoolImage `json:"image"` }
type CephRBDClient struct { Pool string `json:"pool"` Image string `json:"image"` Entity string `json:"entity"` IP string `json:"ip"` Hostname string `json:"hostname"` Type string `json:"type"` Gid int64 `json:"gid"` Metadata string `json:"medata"` }
type ImageTask struct { pool *CephPool img string }
type progressCounter struct { total int64 completed int64 poolName string }
func runCmd(cmd string) (string, error) { cmdobj := exec.Command("bash", "-c", cmd) ret, err := cmdobj.CombinedOutput() if err != nil { return "", fmt.Errorf("run cmd error, cmd: %s, err: %s", cmd, ret) } return string(ret), nil }
func getMONState(conf string) (cret *CephMonState, cerr error) { cmd := fmt.Sprintf("ceph --conf %s --user admin -f json mon stat", conf) ret, err := runCmd(cmd) if err != nil { return nil, fmt.Errorf("get ceph mon state error, cmd: %s", cmd) } var state CephMonState if err := json.Unmarshal([]byte(ret), &state); err != nil { return nil, fmt.Errorf("parse ceph mon state error, cmd: %s, ret: %s", cmd, ret) }
return &state, nil }
func getMONSessions(conf string) (cret map[int64]*CephMONSession, cerr error) { monstat, err := getMONState(conf) if err != nil { return nil, fmt.Errorf("get ceph mon state error") }
sessions := make(map[int64]*CephMONSession) for _, quorum := range monstat.Quorum { cmd := fmt.Sprintf("ceph --conf %s --user admin -f json tell mon.%s sessions", conf, quorum.Name) ret, err := runCmd(cmd) if err != nil { return nil, fmt.Errorf("get ceph mon sessions error, cmd: %s", cmd) } var monsessions []*CephMONSession if err := json.Unmarshal([]byte(ret), &monsessions); err != nil { return nil, fmt.Errorf("parse ceph mon sessions error, cmd: %s, ret: %s", cmd, ret) }
for _, session := range monsessions { if _, exists := sessions[session.GlobalId]; !exists { sessions[session.GlobalId] = session } } }
return sessions, nil }
func getPools(conf string) (cret []*CephPool, cerr error) { cmd := fmt.Sprintf("ceph --conf %s --user admin -f json osd pool ls detail", conf) ret, err := runCmd(cmd) if err != nil { return nil, fmt.Errorf("get ceph osd pool error, cmd: %s", cmd) } var pools []*CephPool if err := json.Unmarshal([]byte(ret), &pools); err != nil { return nil, fmt.Errorf("parse ceph osd pool error, cmd: %s, ret: %s", cmd, ret) }
return pools, nil }
func processImage(conf string, task *ImageTask, monsessions map[int64]*CephMONSession) ([]*CephRBDClient, []error) { pool := task.pool img := task.img
imginfocmd := fmt.Sprintf("rbd --conf %s --id admin info --format json --pool %s --image %s", conf, pool.Name, img) imginforet, err := runCmd(imginfocmd) if err != nil { return nil, []error{fmt.Errorf("get image info error, pool: %s, img: %s: %v", pool.Name, img, err)} }
var imginfo CephRBDPoolImage if err := json.Unmarshal([]byte(imginforet), &imginfo); err != nil { return nil, []error{fmt.Errorf("parse image info error, pool: %s, img: %s: %v", pool.Name, img, err)} }
blocknamepre := strings.Split(imginfo.BlockNamePrefix, ".") if len(blocknamepre) != 2 { return nil, []error{fmt.Errorf("invalid block_name_prefix: %s, pool: %s, img: %s", imginfo.BlockNamePrefix, pool.Name, img)} }
var clients []*CephRBDClient var errs []error cmd := fmt.Sprintf("rados --conf %s --id admin --format json --pool %s listwatchers rbd_header.%s", conf, pool.Name, blocknamepre[1]) ret, err := runCmd(cmd) if err != nil { return nil, []error{fmt.Errorf("get watchers error, pool: %s, img: %s", pool.Name, img)} } for _, imgwatchline := range strings.Split(ret, "\n") { lineinfo := strings.Split(imgwatchline, " ") if len(lineinfo) != 3 { continue }
cwatcherraw1 := strings.Split(lineinfo[0], "=") if len(cwatcherraw1) != 2 { continue } cwatcherraw2 := strings.Split(cwatcherraw1[1], ":") if len(cwatcherraw2) != 2 { continue } ip := cwatcherraw2[0]
gidraw := strings.Split(lineinfo[1], ".") if len(gidraw) != 2 { continue } gid, _ := strconv.ParseInt(gidraw[1], 10, 64)
session, exists := monsessions[gid] if !exists { return nil, []error{fmt.Errorf("get rbd client gid error, pool: %s, img: %s: %v", pool.Name, img, err)} }
client := &CephRBDClient{ IP: ip, Hostname: session.RemoteHost, Type: "", Gid: gid, Entity: session.EntityName, Pool: pool.Name, Image: img, }
clients = append(clients, client) }
return clients, errs }
func showProgress(counter *progressCounter) { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop()
for range ticker.C { completed := atomic.LoadInt64(&counter.completed) total := atomic.LoadInt64(&counter.total)
if total == 0 { continue }
percent := float64(completed) / float64(total) * 100 fmt.Printf("\rProcessing pool %s: %d/%d (%.2f%%)", counter.poolName, completed, total, percent)
if completed >= total { fmt.Printf("\rProcessing pool %s: %d/%d (100.00%%)\n", counter.poolName, total, total) break } } }
func printClientTable(clients []*CephRBDClient) { symbols := tw.NewSymbolCustom("Nature").WithRow("-").WithColumn("|") table := tablewriter.NewTable(os.Stdout, tablewriter.WithRenderer(renderer.NewBlueprint(tw.Rendition{Symbols: symbols}))) table.Header([]string{"Pool", "Image", "Entity", "Client IP", "GID"})
for _, client := range clients { table.Append([]string{ client.Pool, client.Image, client.Entity, client.IP, strconv.FormatInt(client.Gid, 10), }) }
table.Render() }
func main() { if len(os.Args) < 4 { fmt.Println("Usage: go run ./ ceph_config_file ceph_keyring_file concurrency") fmt.Println("Example: go run ./ ceph.conf ceph.client.admin.keyring 20") os.Exit(1) }
cephConfig := os.Args[1] cephKeyring := os.Args[2] concurrency, err := strconv.Atoi(os.Args[3]) if err != nil || concurrency <= 0 { fmt.Println("Error: concurrency must be a positive integer") os.Exit(1) }
for _, file := range []string{cephConfig, cephKeyring} { if _, err := os.Stat(file); err != nil { if os.IsNotExist(err) { fmt.Printf("Error: file not found: %s\n", file) } else { fmt.Printf("Error: accessing file %s: %v\n", file, err) } os.Exit(1) } }
monsessions, err := getMONSessions(cephConfig) if err != nil { fmt.Printf("Error: get ceph mon sessions: %v\n", err) os.Exit(1) }
pools, err := getPools(cephConfig) if err != nil { fmt.Printf("Error: get ceph osd pools: %v\n", err) os.Exit(1) }
var allClients []*CephRBDClient var allErrors []error var wgResults sync.WaitGroup wgResults.Add(1)
errors := make(chan error, 1000) go func() { for err := range errors { allErrors = append(allErrors, err) } wgResults.Done() }()
for _, pool := range pools { if _, exists := pool.ApplicationMetadata["rbd"]; !exists { fmt.Printf("Skipping non-RBD pool: %s\n", pool.Name) continue } fmt.Printf("\nProcessing RBD pool: %s\n", pool.Name)
cmd := fmt.Sprintf("rbd --conf %s --id admin list --format json --pool %s", cephConfig, pool.Name) ret, err := runCmd(cmd) if err != nil { fmt.Printf("Error: get rbd images for pool %s: %v\n", pool.Name, err) continue } var images []string if err := json.Unmarshal([]byte(ret), &images); err != nil { fmt.Printf("Error: parse images for pool %s: %v\n", pool.Name, err) continue } totalImages := len(images) if totalImages == 0 { fmt.Printf("No images found in pool %s\n", pool.Name) continue }
counter := &progressCounter{ total: int64(totalImages), poolName: pool.Name, }
go showProgress(counter)
taskChan := make(chan *ImageTask, totalImages) results := make(chan []*CephRBDClient, totalImages)
for _, img := range images { taskChan <- &ImageTask{ pool: pool, img: img, } } close(taskChan)
var wg sync.WaitGroup wg.Add(concurrency)
for i := 0; i < concurrency; i++ { go func() { defer wg.Done() for task := range taskChan { clients, errs := processImage(cephConfig, task, monsessions)
if len(clients) > 0 { results <- clients }
for _, err := range errs { errors <- err }
atomic.AddInt64(&counter.completed, 1) } }() }
go func() { for clients := range results { allClients = append(allClients, clients...) } }()
wg.Wait() close(results) }
close(errors) wgResults.Wait()
if len(allErrors) > 0 { fmt.Printf("\n\nEncountered %d errors:\n", len(allErrors)) for i, err := range allErrors { fmt.Printf(" [%d] %s\n", i+1, err) } }
fmt.Printf("\nFound %d RBD client connections:\n", len(allClients)) printClientTable(allClients) }
|