LibMR
是一款适用于 Redis 集群的 Map Reduce
(分发Redis命令并获取结果)的依赖库。它基于 libevent 的事件机制,通过使用多个线程池来分发异步任务,目前已经被 RedisTimeSeries
等模块使用。
一、简介
二、架构设计
2.1、相关命令
由于该依赖库在编译时可以设置自定义的 modulename
,因此可以避免在不同模块中使用的冲突。
- modulename.INNERCOMMUNICATION : 从其他分片中获取消息;
- modulename.HELLO : 获取当前实例的集群id,仅集群模式下才可以调用;
- modulename.REFRESHCLUSTER : 更新当前集群的拓扑信息
- modulename.CLUSTERSET : 强制设置集群的拓扑信息,该信息只会更改该依赖库中记录的拓扑信息,并不会影响实际的 Redis 集群;
- modulename.CLUSTERSETFROMSHARD : 与
*.CLUSTERSET
命令类似,该命令为非强制的,即如果已知的集群信息非空则不会设置;
- modulename.INFOCLUSTER : 返回记录的当前集群的拓扑信息;
- modulename.NETWORKTEST : 向记录的集群的所有节点发送
test msg
消息;
以上命令的实现逻辑基本上都是:
- 阻塞当前调用客户端 ( ``RedisModule_BlockClient` 函数);
- 新增并立刻激活事件,调用相关函数处理相关逻辑;
- 取消阻塞并回复客户端 (
RedisModule_UnblockClient
函数);
2.2、数据结构
2.2.1、全局数据结构
全局的数据结构记录了所有需要执行的任务信息,在使用该依赖库之前必须将其初始化。
相关的初始化函数:
- MR_Init : 初始化全局
mrCtx
结构体;
- MR_RegisterObject : 注册自定义的对象类型,对应
mrCtx
中的 objectTypesDict
成员变量;
- MR_RegisterReader : 注册自定义的读取器,对应
mrCtx
中的 readerDict
成员变量;
typedef struct mr_thpool_* mr_threadpool; struct MRCtx { size_t lastExecutionId; mr_dict* executionsDict; MRObjectType** objectTypesDict;
mr_dict* readerDict; mr_dict* mappersDict; mr_dict* filtersDict; mr_dict* accumulatorsDict; mr_threadpool executionsThreadPool; MRStats stats; } mrCtx;
typedef struct MRStats { size_t nMissedExecutions; size_t nMaxIdleReached; } MRStats;
typedef struct MRObjectType { char* type; size_t id; ObjectFree free; ObjectDuplicate dup; ObjectSerialize serialize; ObjectDeserialize deserialize; ObjectToString tostring; } MRObjectType;
|
2.2.2、任务执行数据结构
引用该依赖库 Redis 模块可以通过调用 MR_Run
函数来执行自定义的任务(向集群中的任意分片发送命令),在任务的执行过程中需要使用到以下的数据结构。
相关的执行函数:
- MR_Run : 外部调用的任务执行入口;
- MR_RunExecution : 任务不在集群中执行时调用;
- MR_ExecutionDistribute : 任务需要在集群中执行时调用;
#define REDISMODULE_NODE_ID_LEN 40 #define ID_LEN REDISMODULE_NODE_ID_LEN + sizeof(size_t) #define STR_ID_LEN REDISMODULE_NODE_ID_LEN + 13
struct Execution { int flags; size_t refCount; char id[ID_LEN]; char idStr[STR_ID_LEN]; Step* steps; pthread_mutex_t eLock; mr_list* tasks;
size_t nRecieved; size_t nCompleted; Record** results; Record** errors;
ExecutionCallbacks callbacks; MR_LoopTaskCtx* timeoutTask; size_t timeoutMS; };
struct Step { int flags; ExecutionBuilderStep bStep; union { MapStep map; FilterStep filter; ReadStep read; CollectStep collect; ReshuffleStep reshuffle; AccumulateStep accumulate; }; size_t index; struct Step* child; };
typedef struct mr_list { mr_listNode *head; mr_listNode *tail; void *(*dup)(void *ptr); void (*free)(void *ptr); int (*match)(void *ptr, void *key); unsigned long len; } mr_list;
typedef struct ExecutionCallbacks { ExecutionCallbackData done; ExecutionCallbackData resume; ExecutionCallbackData hold; } ExecutionCallbacks;
typedef struct ExecutionCallbackData { void* pd; ExecutionCallback callback; } ExecutionCallbackData;
|
2.2.3、线程池数据结构
线程池中包含许多要执行的任务,下面列出了关于线程池中一个任务相关的数据结构。由于多个线程同时消费一个任务队列,因此需要加锁处理。
线程池的相关函数:
- mr_thpool_init : 线程初始化,由函数 MR_Init 调用;
- mr_thpool_add_work : 线程池中新增任务;
- mr_thpool_destroy : 销毁线程池中任务;
typedef struct mr_thread { int id; pthread_t pthread; struct mr_thpool_* thpool_p; } mr_thread;
typedef struct mr_thpool_ { mr_thread** threads; volatile int num_threads_alive; volatile int num_threads_working; pthread_mutex_t thcount_lock; pthread_cond_t threads_all_idle; mr_jobqueue jobqueue; } mr_thpool_;
typedef struct mr_jobqueue { pthread_mutex_t rwmutex; mr_job* front; mr_job* rear; mr_bsem* has_jobs; int len; } mr_jobqueue;
typedef struct mr_job { struct mr_job* prev; void (*function)(void* arg); void* arg; } mr_job;
typedef struct mr_bsem { pthread_mutex_t mutex; pthread_cond_t cond; int v; } mr_bsem;
|
2.2.4、消息发送数据结构
线程池在处理任务时,需要将任务封装成一个 Message 的数据结构,然后发送该消息。
消息发送的相关函数:
- MR_ClusterSendMsgTask : 根据任务类型给对应实例发送任务消息;
- MR_ClusterSendMsgToNode : 给特定节点发送消息,如果节点连接状态异常则会将该消息临时存储与
pendingMessages
列表中;
typedef struct SendMsg { size_t refCount; union { char idToSend[41]; size_t slotToSend; }; SendMsgType sendMsgType; size_t function; char* msg; size_t msgLen; } SendMsg;
typedef enum SendMsgType { SendMsgType_BySlot, SendMsgType_ById, SendMsgType_ToAll } SendMsgType;
|
2.3、任务类型
三、持久化
无持久化的相关逻辑。
四、实践
五、问题与思考