Ceph 命令注册及执行流程
一、介绍
本文的内容基于 Ceph v20.2.0 版本进行分析。
分析 ceph -h 的输出信息可以看到,其支持两种类型的命令,分别是 Local Commands 和 Monitor Commands。其中 Local Commands 比较典型的就是 ceph daemon {type.id|path} <cmd> 命令,用于直接和本地组件的 socket 进行通信,执行一些命令。而 Monitor Commands 则是将相关命令发送给 Monitor,有些是由 Monitor 自身处理该命令并返回,也有一些只是经由 Monitor 中转给其他组件执行(比如 ceph tell osd.0 * 等命令)。
- 对于 Local Commands,我们分析每个组件(MON/MGR/OSD/MDS)的 admin socket 命令的注册和执行流程;
- 对于 Monitor Commands,我们分析
二、Local Commands 分析
对于本地的命令,我们仅分析每个组件的 admin socket 的命令注册和执行的流程。注意:虽然 ceph daemon {type.id|path} <cmd> 和 ceph tell <type.id> <args>... 可以达到相同的效果,但是两个命令的执行链路并不相同。
2.1、命令注册流程
- 所有注册的命令存储在 AdminSocket 类的成员变量 hooks 中,其类型为 std::multimap<std::string, hook_info, std::less<>>;
- 所有 admin socket 命令都是通过执行 AdminSocket::register_command 函数注册的,注册时允许重复 key ,但是要求相同命令的 desc 要不同;
- 之后通过调用 common_init_finish 和 AdminSocket::init 函数来初始化对应的 Unix Socket;
- AdminSocket::init 函数内部会启动一个 admin_socket 的线程来接受和处理 socket 请求,对应的线程入口函数为 AdminSocket::entry ;
|
2.1.1、MON
monitor 服务的启动服务入口为 src/ceph_mon.cc 文件。
注册命令的函数:
- CephContext::CephContext 构造函数 : 内部注册了一些各组件通用的命令,比如
config等命令,注意其中有些命令的 help 字段内容为空,当使用客户端获取可执行的命令,这些命令并不会在命令列表中展示,但是仍然可以被执行; - MempoolObs::MempoolObs 构造函数 : 其中仅注册了一个
dump_mempools命令; - AdminSocket::init 函数 : 内部注册了一些各组件通用的命令,比如
version,git_version,help等命令,其中help命令在输出可用命令时会过滤每个已注册命令的 help 字段,如果为空则不输出对应的命令; - AsyncMessenger::AsyncMessenger 构造函数 : 其中仅注册了一个
messenger dump命令; - Monitor::preinit 函数 : 其中注册了大量的使用 COMMAND_WITH_FLAG 宏定义的带有 FLAG(TELL) 的命令;
注册命令的函数调用链路:
|
注册命令的函数调用链路示意图:
graph TD
%% 定义样式
classDef outerNode fill:#e1f5fe,stroke:#01579b,stroke-width:1px
classDef middleNode fill:#fff3e0,stroke:#e65100,stroke-width:1px
classDef finalNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:1px
%% 主入口节点
main[main]:::outerNode
%% 最长的中间链路(居中)
main --> global_init[global_init]:::middleNode
global_init --> global_pre_init[global_pre_init]:::middleNode
global_pre_init --> common_preinit[common_preinit]:::middleNode
common_preinit --> CephContext_CephContext[CephContext::CephContext]:::middleNode
CephContext_CephContext --> register_command1[register_command]:::finalNode
%% 从CephContext::CephContext分叉的链路
CephContext_CephContext --> MempoolObs_MempoolObs[MempoolObs::MempoolObs]:::middleNode
MempoolObs_MempoolObs --> register_command2[register_command]:::finalNode
%% 从main分叉的较短链路(右侧)
main --> common_init_finish[common_init_finish]:::middleNode
common_init_finish --> CephContext_start_service_thread[CephContext::start_service_thread]:::middleNode
CephContext_start_service_thread --> AdminSocket_init[AdminSocket::init]:::middleNode
AdminSocket_init --> register_command3[register_command]:::finalNode
%% 从main分叉的较短链路(左侧)
main --> Messenger_create[Messenger::create]:::middleNode
Messenger_create --> AsyncMessenger_AsyncMessenger[AsyncMessenger::AsyncMessenger]:::middleNode
AsyncMessenger_AsyncMessenger --> register_command4[register_command]:::finalNode
%% 从main分叉的最短链路(最左侧)
main --> Monitor_preinit[Monitor::preinit]:::middleNode
Monitor_preinit --> register_command5[register_command]:::finalNode
%% 对齐所有最终节点
register_command1
register_command2
register_command3
register_command4
register_command5相关代码:
|
2.1.2、MGR
manager 服务的启动服务入口为 src/ceph_mgr.cc 文件。
注册命令的函数:
- CephContext::CephContext 构造函数 : 内部注册了一些各组件通用的命令,比如
config等命令,注意其中有些命令的 help 字段内容为空,当使用客户端获取可执行的命令,这些命令并不会在命令列表中展示,但是仍然可以被执行; - MempoolObs::MempoolObs 构造函数 : 其中仅注册了一个
dump_mempools命令; - AdminSocket::init 函数 : 内部注册了一些各组件通用的命令,比如
version,git_version,help等命令,其中help命令在输出可用命令时会过滤每个已注册命令的 help 字段,如果为空则不输出对应的命令; - AsyncMessenger::AsyncMessenger 构造函数 : 其中仅注册了一个
messenger dump命令; - MgrStandby::init 函数 : 其中仅注册了一个
status命令; - MonClient::init 函数 : 其中仅注册了一个
rotate-key命令; - Objecter::init 函数 : 其中仅注册了一个
objecter_requests命令; - Mgr::init 函数 : 其中仅注册了一个
mgr_status命令; - DaemonServer::init 函数 : 其中注册了一些使用
dump_为前缀的命令; - ClusterState::final_init 函数 : 其中仅注册了一个
dump_osd_network命令;
注册命令的函数调用链路:
|
注册命令的函数调用链路示意图:
graph TD
%% 定义样式
classDef outerNode fill:#e1f5fe,stroke:#01579b,stroke-width:1px
classDef middleNode fill:#fff3e0,stroke:#e65100,stroke-width:1px
classDef finalNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:1px
%% 最外层节点
main["main"]:::outerNode
%% 中间节点
global_init["global_init"]:::middleNode
global_pre_init["global_pre_init"]:::middleNode
common_preinit["common_preinit"]:::middleNode
CephContext_CephContext1["CephContext::CephContext"]:::middleNode
MempoolObs_MempoolObs["MempoolObs::MempoolObs"]:::middleNode
common_init_finish["common_init_finish"]:::middleNode
CephContext_start_service_thread["CephContext::start_service_thread"]:::middleNode
AdminSocket_init["AdminSocket::init"]:::middleNode
MgrStandby_MgrStandby["MgrStandby::MgrStandby"]:::middleNode
Messenger_create["Messenger::create"]:::middleNode
AsyncMessenger_AsyncMessenger["AsyncMessenger::AsyncMessenger"]:::middleNode
MgrStandby_init["MgrStandby::init"]:::middleNode
MonClient_init["MonClient::init"]:::middleNode
Objecter_init["Objecter::init"]:::middleNode
Finisher_start["Finisher::start"]:::middleNode
Finisher_finisher_thread_entry["Finisher::finisher_thread_entry"]:::middleNode
Mgr_init["Mgr::init"]:::middleNode
DaemonServer_init["DaemonServer::init"]:::middleNode
ClusterState_final_init["ClusterState::final_init"]:::middleNode
%% 最终节点
register_command["register_command"]:::finalNode
%% 最长链路(中间)
main --> global_init
global_init --> global_pre_init
global_pre_init --> common_preinit
common_preinit --> CephContext_CephContext1
CephContext_CephContext1 --> register_command
%% 左侧较短链路
main --> common_init_finish
common_init_finish --> CephContext_start_service_thread
CephContext_start_service_thread --> AdminSocket_init
AdminSocket_init --> register_command
main --> MgrStandby_MgrStandby
MgrStandby_MgrStandby --> Messenger_create
Messenger_create --> AsyncMessenger_AsyncMessenger
AsyncMessenger_AsyncMessenger --> register_command
%% 右侧较短链路
main --> MgrStandby_init
MgrStandby_init --> register_command
MgrStandby_init --> MonClient_init
MonClient_init --> register_command
MgrStandby_init --> Objecter_init
Objecter_init --> register_command
%% 带执行信息的链路
MgrStandby_init --> Finisher_start
Finisher_start -- "执行创建线程逻辑" --> Finisher_finisher_thread_entry
Finisher_finisher_thread_entry -- "执行另一个线程中调用 Mgr::background_init 函数添加的 Mgr::init 函数" --> Mgr_init
%% Mgr_init 相关链路
Mgr_init --> register_command
Mgr_init --> DaemonServer_init
DaemonServer_init --> register_command
Mgr_init --> ClusterState_final_init
ClusterState_final_init --> register_command
%% CephContext_CephContext1 的另一个分支
CephContext_CephContext1 --> MempoolObs_MempoolObs
MempoolObs_MempoolObs --> register_commandMgr::init 相关流程:
- Manager 给 Monitor 发送订阅
mgrmap的消息; - Monitor 给 Manager 回复
mgrmap的订阅消息; - Manager 收到 Monitor 的消息后,调用
MgrStandby::handle_mgr_map函数处理消息,必要时通过创建Mgr对象并交由finisher线程执行Mgr::init操作。
2.1.3、OSD
osd 服务的启动服务入口为 src/ceph_osd.cc 文件。
注册命令的函数:
- CephContext::CephContext 构造函数 : 内部注册了一些各组件通用的命令,比如
config等命令,注意其中有些命令的 help 字段内容为空,当使用客户端获取可执行的命令,这些命令并不会在命令列表中展示,但是仍然可以被执行; - MempoolObs::MempoolObs 构造函数 : 其中仅注册了一个
dump_mempools命令; - AdminSocket::init 函数 : 内部注册了一些各组件通用的命令,比如
version,git_version,help等命令,其中help命令在输出可用命令时会过滤每个已注册命令的 help 字段,如果为空则不输出对应的命令; - AsyncMessenger::AsyncMessenger 构造函数 : 其中仅注册了一个
messenger dump命令; - MonClient::init 函数 : 其中仅注册了一个
rotate-key命令; - BlueStore::SocketHook::create 函数 : 其中注册了
bluestore bluefs device info,bluefs stats等命令; - BlueStore::BlueStore::SocketHook 构造函数 : 其中注册了
bluestore collections,bluestore list等命令; - AllocatorBase::SocketHook::SocketHook 构造函数 : 其中注册了
bluestore allocator dump block等后缀为block的命令; - OSD::final_init 函数 : 其中注册了大量的命令;
- Objecter::init 函数 : 其中仅注册了一个
objecter_requests命令;
注册命令的函数调用链路:
|
注册命令的函数调用链路示意图:
graph TD
%% 最外层节点
main["main"]:::outer
%% 第一层调用
main --> global_init["global_init"]
main --> common_init_finish["common_init_finish"]
main --> OSD_init["OSD::init"]
main --> OSD_final_init["OSD::final_init"]
main --> OSD_mkfs["OSD::mkfs"]
main --> ObjectStore_create["ObjectStore::create"]
main --> OSD_OSD["OSD::OSD"]
%% global_init 分支
global_init --> global_pre_init["global_pre_init"]
global_pre_init --> common_preinit["common_preinit"]
common_preinit --> CephContext_CephContext["CephContext::CephContext"]
CephContext_CephContext --> register_command_1["register_command"]
CephContext_CephContext --> MempoolObs_MempoolObs["MempoolObs::MempoolObs"]
MempoolObs_MempoolObs --> register_command_2["register_command"]
%% common_init_finish 分支
common_init_finish --> CephContext_start_service_thread["CephContext::start_service_thread"]
CephContext_start_service_thread --> AdminSocket_init["AdminSocket::init"]
AdminSocket_init --> register_command_3["register_command"]
%% OSD::init 分支(第一个)
OSD_init --> MonClient_init["MonClient::init"]
MonClient_init --> register_command_4["register_command"]
%% OSD::init 分支(第二个)
OSD_init --> BlueStore_mount["BlueStore::mount"]
BlueStore_mount --> BlueStore_mount_inner["BlueStore::_mount"]
BlueStore_mount_inner --> BlueStore_open_db_and_around["BlueStore::_open_db_and_around"]
BlueStore_open_db_and_around --> BlueStore_init_alloc["BlueStore::_init_alloc"]
BlueStore_init_alloc --> BlueStore_create_alloc["BlueStore::_create_alloc"]
BlueStore_create_alloc --> Allocator_create["Allocator::create"]
Allocator_create -- "创建对象时执行父类的构造函数" --> AllocatorBase_AllocatorBase["AllocatorBase::AllocatorBase"]
AllocatorBase_AllocatorBase --> AllocatorBase_SocketHook_SocketHook["AllocatorBase::SocketHook::SocketHook"]
AllocatorBase_SocketHook_SocketHook --> register_command_8["register_command"]
%% OSD::final_init 分支
OSD_final_init --> register_command_5["register_command"]
%% OSD::mkfs 分支(最长链路)
OSD_mkfs --> BlueStore_mkfs["BlueStore::mkfs"]
BlueStore_mkfs --> BlueStore_open_db["BlueStore::_open_db"]
BlueStore_open_db --> BlueStore_prepare_db_environment["BlueStore::_prepare_db_environment"]
BlueStore_prepare_db_environment --> BlueStore_open_bluefs["BlueStore::_open_bluefs"]
BlueStore_open_bluefs --> BlueStore_minimal_open_bluefs["BlueStore::_minimal_open_bluefs"]
BlueStore_minimal_open_bluefs --> BlueFS_BlueFS["BlueFS::BlueFS"]
BlueFS_BlueFS --> BlueFS_SocketHook_create["BlueFS::SocketHook::create"]
BlueFS_SocketHook_create --> register_command_6["register_command"]
%% ObjectStore::create 分支
ObjectStore_create --> BlueStore_BlueStore["BlueStore::BlueStore"]
BlueStore_BlueStore --> BlueStore_BlueStore_SocketHook["BlueStore::BlueStore::SocketHook"]
BlueStore_BlueStore_SocketHook --> register_command_7["register_command"]
%% OSD::OSD 分支
OSD_OSD --> OSDService_OSDService["OSDService::OSDService"]
OSDService_OSDService --> Objecter_init["Objecter::init"]
Objecter_init --> register_command_9["register_command"]
%% 样式定义
classDef outer fill:#e1f5fe,stroke:#01579b,stroke-width:1px
classDef inner fill:#fff3e0,stroke:#e65100,stroke-width:1px
classDef final fill:#e8f5e8,stroke:#2e7d32,stroke-width:1px
%% 节点分类
class main outer
class register_command_1,register_command_2,register_command_3,register_command_4,register_command_5,register_command_6,register_command_7,register_command_8,register_command_9 final
class global_init,common_init_finish,OSD_init,OSD_final_init,OSD_mkfs,ObjectStore_create,OSD_OSD,global_pre_init,common_preinit,CephContext_CephContext,MempoolObs_MempoolObs,CephContext_start_service_thread,AdminSocket_init,MonClient_init,BlueStore_mount,BlueStore_mount_inner,BlueStore_open_db_and_around,BlueStore_init_alloc,BlueStore_create_alloc,Allocator_create,AllocatorBase_AllocatorBase,AllocatorBase_SocketHook_SocketHook,BlueStore_mkfs,BlueStore_open_db,BlueStore_prepare_db_environment,BlueStore_open_bluefs,BlueStore_minimal_open_bluefs,BlueFS_BlueFS,BlueFS_SocketHook_create,BlueStore_BlueStore,BlueStore_BlueStore_SocketHook,OSDService_OSDService,Objecter_init inner2.1.4、MDS
mds 服务的启动服务入口为 src/ceph_mds.cc 文件。
注册命令的函数:
- CephContext::CephContext 构造函数 : 内部注册了一些各组件通用的命令,比如
config等命令,注意其中有些命令的 help 字段内容为空,当使用客户端获取可执行的命令,这些命令并不会在命令列表中展示,但是仍然可以被执行; - MempoolObs::MempoolObs 构造函数 : 其中仅注册了一个
dump_mempools命令; - AdminSocket::init 函数 : 内部注册了一些各组件通用的命令,比如
version,git_version,help等命令,其中help命令在输出可用命令时会过滤每个已注册命令的 help 字段,如果为空则不输出对应的命令; - AsyncMessenger::AsyncMessenger 构造函数 : 其中仅注册了一个
messenger dump命令; - MonClient::init 函数 : 其中仅注册了一个
rotate-key命令; - MDSDaemon::set_up_admin_socket 函数 : 其中注册了大量的命令;
- Objecter::init 函数 : 其中仅注册了一个
objecter_requests命令;
注册命令的函数调用链路:
|
注册命令的函数调用链路示意图:
graph TB
%% 定义样式
classDef outerNode fill:#e1f5f5,stroke:#01579b,stroke-width:1px
classDef middleNode fill:#fff3e0,stroke:#e65100,stroke-width:1px
classDef finalNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:1px
%% 最外层节点
main["main"]:::outerNode
%% 中间节点
global_init["global_init"]:::middleNode
global_pre_init["global_pre_init"]:::middleNode
common_preinit["common_preinit"]:::middleNode
CephContext_Ctor["CephContext::CephContext"]:::middleNode
MempoolObs_Ctor["MempoolObs::MempoolObs"]:::middleNode
common_init_finish["common_init_finish"]:::middleNode
CephContext_start_service["CephContext::start_service_thread"]:::middleNode
AdminSocket_init["AdminSocket::init"]:::middleNode
Messenger_create["Messenger::create"]:::middleNode
AsyncMessenger_Ctor["AsyncMessenger::AsyncMessenger"]:::middleNode
MDSDaemon_init["MDSDaemon::init"]:::middleNode
MonClient_init["MonClient::init"]:::middleNode
MDSDaemon_set_up_admin["MDSDaemon::set_up_admin_socket"]:::middleNode
Messenger_add_dispatcher["Messenger::add_dispatcher_head"]:::middleNode
Messenger_ready["Messenger::ready(纯虚函数)"]:::middleNode
AsyncMessenger_ready["AsyncMessenger::ready"]:::middleNode
DispatchQueue_start["DispatchQueue::start"]:::middleNode
create_dispatch_thread["创建 dispatch_thread 线程"]:::middleNode
DispatchQueue_entry["DispatchQueue::entry"]:::middleNode
Messenger_deliver["Messenger::ms_deliver_dispatch"]:::middleNode
MDSDaemon_dispatch2["MDSDaemon::ms_dispatch2"]:::middleNode
MDSDaemon_handle_core["MDSDaemon::handle_core_message"]:::middleNode
handle_mds_map["MDSDaemon::handle_mds_map"]:::middleNode
create_MDSRankDispatcher["创建MDSRankDispatcher对象"]:::middleNode
MDSRankDispatcher_init["MDSRankDispatcher::init"]:::middleNode
Objecter_init["Objecter::init"]:::middleNode
%% 最终节点
register_command["register_command"]:::finalNode
%% 调用关系 - 主要长链路(中间)
main --> global_init
global_init --> global_pre_init
global_pre_init --> common_preinit
common_preinit --> CephContext_Ctor
CephContext_Ctor --> register_command
%% 左侧较短链路
main --> common_init_finish
common_init_finish --> CephContext_start_service
CephContext_start_service --> AdminSocket_init
AdminSocket_init --> register_command
main --> Messenger_create
Messenger_create --> AsyncMessenger_Ctor
AsyncMessenger_Ctor --> register_command
%% 右侧较短链路
main --> MDSDaemon_init
MDSDaemon_init --> MonClient_init
MonClient_init --> register_command
MDSDaemon_init --> MDSDaemon_set_up_admin
MDSDaemon_set_up_admin --> register_command
%% 中间最长链路(继续)
MDSDaemon_init --> Messenger_add_dispatcher
Messenger_add_dispatcher --> Messenger_ready
Messenger_ready --> AsyncMessenger_ready
AsyncMessenger_ready --> DispatchQueue_start
DispatchQueue_start --> create_dispatch_thread
create_dispatch_thread --> DispatchQueue_entry
DispatchQueue_entry --> Messenger_deliver
Messenger_deliver --> MDSDaemon_dispatch2
MDSDaemon_dispatch2 --> MDSDaemon_handle_core
MDSDaemon_handle_core --> handle_mds_map
handle_mds_map --> create_MDSRankDispatcher
create_MDSRankDispatcher --> MDSRankDispatcher_init
MDSRankDispatcher_init --> Objecter_init
Objecter_init --> register_command
%% 额外的调用关系
CephContext_Ctor --> MempoolObs_Ctor
MempoolObs_Ctor --> register_command
%% 添加注释说明
linkStyle 0,1,2,3 stroke:#01579b,stroke-width:1px
linkStyle 4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23 stroke:#e65100,stroke-width:1px2.2、命令执行流程
2.2.1、admin socket 初始化流程
MON/MGR/OSD/MDS 等组件 admin socket 的命令执行流程基本一致。大致就是创建对应的 Unix Socket 以及对应的处理线程,然后等待接受命令并执行。
执行流程:
- 函数调用链路: main -> common_init_finish -> CephContext::start_service_thread -> AdminSocket::init;
- 其中,AdminSocket::init 函数内部创建 Unix Socket ,并启动 AdminSocket::entry 线程(线程名为 admin_socket);
- 之后 common_init_finish 函数在调用 CephContext::start_service_thread 函数之后,再根据需要调整对应 Unix Socket 的 owner 和 mode ;
初始化流程示意图:
sequenceDiagram
participant main as main函数
participant common as common_init_finish
participant ceph as CephContext::start_service_thread
participant admin as AdminSocket::init
participant socket as Unix Socket
participant thread as admin_socket线程
Note over main,thread: Admin Socket 初始化时序
main->>common: 调用
activate common
common->>ceph: start_service_thread()
activate ceph
ceph->>admin: init()
activate admin
admin->>socket: 创建Unix Socket
activate socket
socket-->>admin: Socket创建成功
deactivate socket
admin->>thread: 启动AdminSocket::entry线程
activate thread
admin-->>ceph: 返回
deactivate admin
ceph-->>common: 返回
deactivate ceph
%% common_init_finish内部继续执行权限调整
Note over common: common_init_finish内部继续执行
common->>common: 调整Socket owner
common->>common: 调整Socket mode
Note over common: 权限调整完成
%% 线程开始工作(与权限调整并行)
thread->>thread: 等待接受命令并执行
Note over thread: 线程持续运行
common-->>main: 返回
deactivate common
Note over main,thread: 初始化完成,等待命令2.2.2、接收并执行命令流程
我们可以在 AdminSocket::entry 函数中看到使用 poll 同时监听了两个文件描述符:m_sock_fd(对外服务的Unix域套接字,接受客户端连接),m_wakeup_rd_fd(管道或eventfd的读端,用于内部线程间通信,触发 do_tell_queue 处理)。两者有不同的执行链路。
两个fd的对比:
| 特性 | m_sock_fd (外部路径) | m_wakeup_rd_fd (内部路径) |
|---|---|---|
| 触发源 | 外部管理工具 | Ceph内部组件线程 |
| 协议 | asok协议(JSON/二进制) | 内部消息协议(MCommand等) |
| 执行模式 | 同步等待结果 | 异步回调 |
| 响应方式 | 通过同一socket返回 | 通过原消息连接返回 |
| 主要用途 | 运维管理、监控 | 集群内部控制、状态同步 |
| 处理函数 | do_accept() | do_tell_queue() |
| 队列机制 | 无队列,直接处理 | 使用 tell_queue 缓冲 |
m_sock_fd - 外部客户端请求处理时序图:
sequenceDiagram
participant Client as 外部客户端<br/>(ceph daemon)
participant Socket as Unix Socket<br/>(/var/run/ceph/*.asok)
participant AdminSocket as AdminSocket::entry()
participant do_accept as do_accept()
participant ExecCmd as execute_command<br/>(同步版本)
participant Hook as 命令钩子<br/>(hook->call_async)
participant Formatter as 格式化器
Note over Client,Formatter: 外部请求处理流程
Client->>Socket: 1. connect() 建立连接
AdminSocket->>AdminSocket: 2. poll() 检测到m_sock_fd有POLLIN事件
AdminSocket->>do_accept: 3. 调用do_accept()
do_accept->>Socket: 4. accept() 接受连接
Socket-->>do_accept: 5. 返回connection_fd
Client->>do_accept: 6. 发送命令数据<br/>(JSON或旧协议)
loop 读取完整命令
do_accept->>do_accept: 7. safe_recv() 逐字节读取
alt 遇到终止符(\n或\0)
do_accept->>do_accept: 8. 构建命令字符串c
else 缓冲区溢出
do_accept->>Socket: 关闭连接并返回错误
end
end
do_accept->>ExecCmd: 9. 调用execute_command(c)
ExecCmd->>ExecCmd: 10. 创建锁/条件变量<br/>等待异步完成
ExecCmd->>Hook: 11. 异步调用hook->call_async()
Hook->>Hook: 12. 执行实际命令逻辑
Hook-->>ExecCmd: 13. 回调返回结果(rval, err, out)
ExecCmd-->>do_accept: 14. 返回执行结果
do_accept->>Formatter: 15. 格式化错误信息(如果需要)
do_accept->>Client: 16. 发送响应长度(htonl(out.length()))
do_accept->>Client: 17. 发送响应数据(out.send_fd())
do_accept->>Socket: 18. closesocket(connection_fd)
Client->>Client: 19. 解析并显示结果m_wakeup_rd_fd - 内部线程唤醒处理时序图:
sequenceDiagram
participant Internal as 内部线程<br/>(如OSD线程)
participant Queue as tell_queue<br/>tell_legacy_queue
participant WakePipe as 唤醒管道<br/>(pipe/eventfd)
participant AdminSocket as AdminSocket::entry()
participant do_tell_queue as do_tell_queue()
participant ExecCmd as execute_command<br/>(异步版本)
participant Hook as 命令钩子<br/>(hook->call_async)
participant Reply as 回复消息构造
Note over Internal,Reply: 内部唤醒处理流程
Internal->>Queue: 1. 获取tell_lock锁
Internal->>Queue: 2. 将MCommand/MMonCommand<br/>放入对应队列
Internal->>Queue: 3. 释放tell_lock锁
Internal->>WakePipe: 4. 向m_wakeup_wr_fd写入1字节
AdminSocket->>AdminSocket: 5. poll() 检测到m_wakeup_rd_fd有POLLIN事件
AdminSocket->>WakePipe: 6. safe_recv() 读取唤醒字节
AdminSocket->>do_tell_queue: 7. 调用do_tell_queue()
do_tell_queue->>Queue: 8. 获取tell_lock锁
do_tell_queue->>Queue: 9. 交换队列内容到局部变量
do_tell_queue->>Queue: 10. 释放tell_lock锁
loop 处理每个队列中的消息
do_tell_queue->>ExecCmd: 11. 调用execute_command(异步版本)
ExecCmd->>Hook: 12. 异步调用hook->call_async()
Hook->>Hook: 13. 执行实际命令逻辑
Hook-->>ExecCmd: 14. 回调返回结果(rval, err, outbl)
ExecCmd-->>do_tell_queue: 15. 在回调中构造回复消息
alt MCommand类型
do_tell_queue->>Reply: 16. 创建MCommandReply
else MMonCommand类型
do_tell_queue->>Reply: 17. 创建MMonCommandAck
end
do_tell_queue->>Internal: 18. 通过原连接发送回复消息<br/>(m->get_connection()->send_message())
end
Note over AdminSocket: 处理完成后继续poll()等待三、Monitor Commands 分析
3.1、命令注册流程
当执行 ceph -h 命令后,其实是向 Monitor 发送了 get_command_descriptions 命令, 对应的处理函数为 Monitor::handle_command ,相关的处理代码如下。可以看到其中含有了 mon 的 leader_mon_commands 和 mgr 的一些命令。
因此 Monitor Commands 包含两部分:
- Monitor 相关的命令: 其中包括操作 Ceph 集群的各种类型的命令;
- Manager 上报的命令: Manager 通过与 Monitor 进行通信,上报的关于 Manager 自身以及其内部启用的 Python 模块的命令;
|
3.1.1、Monitor 相关的命令
leader_mon_commands 的设置流程:
- Monitor::Monitor 构造函数内部,使用 src/mon/MonCommands.h 中定义组合而成的 mon_commands 变量,来设置 leader_mon_commands 变量(这个地方的含义是,在选举完成前,我们暂时接受所有命令。这仅意味着在选举期间我们不会以 EINVAL 错误拒绝命令;任何真正重要的命令都会等待我们获得法定人数等条件满足后,重新尝试(并重新验证)。
- 当 Monitor 赢得选举的时候,使用 Monitor::set_leader_commands 函数来设置 leader_mon_commands 变量;
3.1.2、Manager 上报的命令
Mgr 向 Mon 发送命令描述信息的两个触发方式:
- 主动触发:
- 介绍: 当 mgr 初始化的时候就会向 mon 发送 MSG_MGR_BEACON 类型的消息(其中包含 mgr 和 py_module_registry 的命令信息) ;
- 函数调用链路: MgrStandby::init -> MgrStandby::tick -> MgrStandby::send_beacon -发送 MSG_MGR_BEACON 类型的消息-> mon ;
- 被动触发:
- 介绍: 当 mgr 收到 MSG_MGR_MAP 类型的消息后,会向 mon 发送 MSG_MGR_BEACON 类型的消息;
- 函数调用链路: MgrStandby::ms_dispatch2 -收到 MSG_MGR_MAP 消息-> MgrStandby::handle_mgr_map -> MgrStandby::send_beacon -发送 MSG_MGR_BEACON 类型的消息-> mon ;
Manager 主动向 Monitor 注册命令描述信息的流程示意图:
sequenceDiagram
participant MgrStandby
participant Monitor
participant MgrMonitor
participant Paxos
participant Storage
%% 触发路径
Note over MgrStandby: 触发条件
MgrStandby->>MgrStandby: send_beacon()
MgrStandby->>Monitor: MSG_MGR_BEACON
Note over MgrStandby,Monitor: 消息包含command_descs命令信息
%% Monitor接收解析
Monitor->>Monitor: decode_message()
Note over Monitor: 解析出MMgrBeacon.command_descs
%% 准备阶段
Monitor->>MgrMonitor: prepare_beacon()
MgrMonitor->>MgrMonitor: get_command_descs()
Note over MgrMonitor: command_descs → pending_command_descs
%% 写入存储
MgrMonitor->>Storage: 事务写入command_descs_prefix
Note over MgrMonitor: 包含command_descs信息
Note over MgrMonitor: 添加C_Committed回调
%% 提交刷新
Note over MgrMonitor: C_Committed::finish()
MgrMonitor->>Paxos: commit_finish()
Paxos->>Monitor: refresh_from_paxos()
Monitor->>MgrMonitor: update_from_paxos()
MgrMonitor->>Storage: 读取command_descs_prefix
Storage-->>MgrMonitor: command_descs数据3.2、命令执行流程
3.2.1、Client 侧执行流程
函数调用链路:
|
validate_command 函数执行流程:
- 评分筛选阶段: 从所有命令签名中筛选出与输入参数最匹配的候选命令;
- 匹配度计算: 使用 matchnum() 计算匹配度(支持部分匹配);
- 完全匹配优先: 完全匹配的命令获得额外 0.5 加分;
- 分数阈值: 只保留达到当前最高分的命令;
- 多候选收集: 相同分数的命令都保留
- 排序阶段: 对候选命令进行排序,优化后续验证效率;
- 评分函数: grade() 计算命令的必需参数总数;
- 排序规则:按必需参数数量升序排列;
- 精确验证阶段: 对排序后的候选命令进行精确验证,找到真正匹配的命令;
- 顺序尝试: 按排序顺序逐一验证;
- 短路优化: 一旦验证成功立即退出循环;
- 结果处理阶段: 根据验证结果提供反馈并返回最终结果;
- 成功情况: 返回解析后的参数字典;
- 失败情况: 命令找到但参数错误(显示具体错误和命令帮助), 完全无匹配:显示最接近的10个命令(过滤废弃/隐藏命令);
validate_command 函数执行流程示意图:
sequenceDiagram
participant U as 用户
participant VC as validate_command
participant M as matchnum
participant V as validate
participant S as 系统输出
U->>VC: 调用(sigdict, args, verbose)
Note over VC: 阶段1: 评分筛选
loop 遍历sigdict中每个cmd
VC->>M: matchnum(args, sig, partial=True)
M-->>VC: 返回分数matched
VC->>M: matchnum(args, sig, partial=False)
M-->>VC: 返回完全匹配分数
alt 完全匹配奖励
VC->>VC: matched += 0.5
end
alt 更新最佳匹配
VC->>VC: 更新best_match_cnt和bestcmds
end
alt verbose模式
VC->>S: 输出"better match: X > Y: command"
end
end
Note over VC: 阶段2: 排序
VC->>VC: 按必需参数数量排序bestcmds
alt verbose模式
VC->>S: 输出"bestcmds_sorted:"和列表
end
Note over VC: 阶段3: 精确验证
loop 遍历bestcmds_sorted
VC->>V: validate(args, sig, flags)
alt 验证成功
V-->>VC: 返回valid_dict
VC->>VC: found = cmd, break循环
else 验证失败
V--xVC: 抛出异常
alt ArgumentPrefix异常
VC->>VC: 继续尝试下一个
else ArgumentMissing异常
VC->>VC: 记录ex, 如果只有一个候选则found=cmd
VC->>VC: break循环
else ArgumentTooFew异常
VC->>VC: verbose时输出提示,继续尝试
else ArgumentError异常
VC->>VC: 记录ex, found = cmd
VC->>VC: break循环
end
end
end
Note over VC: 阶段4: 结果处理
alt found不为空
alt valid_dict不为空
VC-->>U: 返回valid_dict
else
VC->>S: 输出"Invalid command:"和ex
VC->>S: 输出命令帮助
VC-->>U: 返回空字典
end
else
VC->>VC: 过滤已弃用/隐藏命令
VC->>VC: 限制最多10个建议
VC->>S: no valid command found. Closest matches:
loop 遍历bestcmds
VC->>S: 输出命令签名
end
VC-->>U: 返回空字典
endmatchnum 函数执行流程:
- 从左到右扫描用户输入;
- 按模板顺序尝试匹配每个参数;
- 必需参数是关卡:失败就结束,成功就加分;
- 可选参数是通道:失败就跳过,成功就通过。注意:如果可选参数匹配失败了,则下一次匹配的时候还会匹配当前命令的这个参数(因为把匹配失败的当前参数又放回了队列头部),相当于实现非贪婪匹配机制(可选参数失败时回退,让其他规则尝试);
- 最后返回:成功闯过了多少必需参数关卡,就返回多少数值;
3.2.2、Server 侧执行流程
执行流程详解:
- 基础验证 :
- 命令类型断言 : 确保请求是命令类型;
- FSID 校验 : 验证消息中的集群 FSID 与当前 Monitor 的集群 ID 匹配,防止跨集群操作;
- 会话检查 : 确保命令来自一个已建立的客户端会话,丢弃“游离”消息;
- 命令非空检查 : 确保命令内容不为空;
- 命令解析与验证 :
- JSON 解析 : 将客户端发送的 JSON 格式命令字符串解析为内部的 cmdmap(命令参数映射);
- 提取前缀 : 从 cmdmap 中获取命令的 prefix(例如 “osd dump”, “status”), prefix 是命令的唯一标识符;
- 特殊命令处理 : “get_command_descriptions” 命令用于获取所有支持的命令描述,直接在此处理并返回;
- 命令查找与兼容性检查
- 模块提取 : 从 prefix 中提取第一个单词作为命令模块(如 osd, mds, mon);
- 三层命令查找 :
- Leader 命令集 (leader_mon_commands) : 集群领导者支持的所有命令;
- Mgr 命令集 (mgrmon()->get_command_descs()) : 由 Ceph Manager 模块管理的命令;
- 本地命令集 (get_local_commands) : 当前 Monitor 实例支持的命令(考虑特性兼容性);
- 领导者/追随者逻辑处理 : 如果当前 Monitor 不是领导者,则通常可以转发命令给领导者,但是如果执行的命令不在本地命令集中且命令不允许转发或者与领导者的命令不兼容,则不转发并返回错误;
- 权限与审计 :
- 废弃命令检查 : 拒绝执行已废弃或过时的命令;
- 权限验证 (_allowed_command): 基于客户端的会话、身份(entity_name,如 client.admin)和能力(Capabilities)验证其是否有权执行该命令,涉及 ‘w’(写)或 ‘x’(执行)权限的命令被视为“读写命令”;
- 审计日志 : 所有命令(除少数配置命令外)都会被记录到审计日志 (audit_clog) 中,区分读写操作的日志级别;
- 命令路由与分发 : 这是函数的主体,根据 prefix 或 module 将命令分发给对应的 Monitor 子服务 或直接处理;
- 代理到 Manager : 如果命令标记为 is_mgr(),则代理给 mgr_client 执行,并设有字节数配额防止过载;
- 分发给专用监视器 : 执行对应的 PaxosService::dispatch 函数;
- mdsmon() : 处理 mds 或 fs 相关命令;
- osdmon() : 处理 osd、pg map、pg repeer 命令;
- configmon() : 处理 config 配置命令;
- monmon() : 处理大部分 mon 命令(除明确列出的几个);
- healthmon() : 处理 health 子命令;
- authmon() : 处理 auth 认证和授权命令;
- logmon() : 处理 log 日志命令;
- kvmon() : 处理 config-key 键值存储命令;
- mgrmon() : 处理 mgr 管理器命令;
- nvmegwmon() : 处理 nvme-gw 网关命令;
- Monitor 自身处理的命令 :一系列核心的、跨模块的或状态查询命令由 Monitor 类直接处理;这是函数中 else if 链最长的部分,包括:
- fsid : 返回集群 FSID;
- mon scrub : 启动 Monitor 存储区清理;
- time-sync-status : 返回集群时间同步状态;
- status / health / df : 返回集群状态、健康详情或使用情况;
- report : 生成包含集群各方面信息的详细报告;
- node ls : 列出各种类型的守护进程节点;
- features : 列出集群中激活的特性;
- mon metadata / versions / count-metadata : 查询 Monitor 元数据或版本信息;
- quorum_status : 返回仲裁状态;
- mon ok-to-stop / ok-to-add-offline / ok-to-rm : 安全性地检查 Monitor 节点操作;
- version / versions : 返回 Ceph 版本信息;
简要执行流程图:
sequenceDiagram
participant C as Client
participant M as Monitor
participant L as Leader
participant S as SubService
C->>M: MMonCommand 请求
Note over M: 1. 基础验证
M->>M: FSID/会话/命令检查
Note over M: 2. 命令解析
M->>M: 解析JSON,提取prefix
alt 特殊命令
M->>C: 直接返回
end
Note over M: 3. 领导者检查
alt M不是领导者
M->>L: 转发请求
L->>C: 处理并返回
end
Note over M: 4. 权限检查
M->>M: 权限验证
Note over M: 5. 命令分发
alt Mgr命令
M->>M: 代理到Manager
else 模块命令
M->>S: 分发到子服务
S->>M: 返回结果
else Monitor命令
M->>M: 直接处理
end
M->>C: 返回结果

