Redis相关库学习 - LibMR

LibMR 是一款适用于 Redis 集群的 Map Reduce (分发Redis命令并获取结果)的依赖库。它基于 libevent 的事件机制,通过使用多个线程池来分发异步任务,目前已经被 RedisTimeSeries 等模块使用。

一、简介

二、架构设计

2.1、相关命令

由于该依赖库在编译时可以设置自定义的 modulename ,因此可以避免在不同模块中使用的冲突。

  • modulename.INNERCOMMUNICATION : 从其他分片中获取消息;
  • modulename.HELLO : 获取当前实例的集群id,仅集群模式下才可以调用;
  • modulename.REFRESHCLUSTER : 更新当前集群的拓扑信息
  • modulename.CLUSTERSET : 强制设置集群的拓扑信息,该信息只会更改该依赖库中记录的拓扑信息,并不会影响实际的 Redis 集群;
  • modulename.CLUSTERSETFROMSHARD : 与 *.CLUSTERSET 命令类似,该命令为非强制的,即如果已知的集群信息非空则不会设置;
  • modulename.INFOCLUSTER : 返回记录的当前集群的拓扑信息;
  • modulename.NETWORKTEST : 向记录的集群的所有节点发送 test msg 消息;

以上命令的实现逻辑基本上都是:

  • 阻塞当前调用客户端 ( ``RedisModule_BlockClient` 函数);
  • 新增并立刻激活事件,调用相关函数处理相关逻辑;
  • 取消阻塞并回复客户端 ( RedisModule_UnblockClient 函数);

2.2、数据结构

2.2.1、全局数据结构

全局的数据结构记录了所有需要执行的任务信息,在使用该依赖库之前必须将其初始化。

相关的初始化函数:

  • MR_Init : 初始化全局 mrCtx 结构体;
  • MR_RegisterObject : 注册自定义的对象类型,对应 mrCtx 中的 objectTypesDict 成员变量;
  • MR_RegisterReader : 注册自定义的读取器,对应 mrCtx 中的 readerDict 成员变量;
// 全局变量
typedef struct mr_thpool_* mr_threadpool;
struct MRCtx {
size_t lastExecutionId; // 执行ID,每创建一个任务时该执行ID加1
mr_dict* executionsDict; // 执行器集合,任务开始时加入,结束时移除
MRObjectType** objectTypesDict; // 自定义的对象集合,开始时初始化,并且后续保持不变

mr_dict* readerDict; // 注册的读集合
mr_dict* mappersDict; // 注册的映射集合
mr_dict* filtersDict; // 注册的过滤器集合
mr_dict* accumulatorsDict; // 注册的累加器集合
mr_threadpool executionsThreadPool; // 线程池指针
MRStats stats; // 执行状态
} mrCtx;

// 全局状态
typedef struct MRStats {
size_t nMissedExecutions; // 找不到对应执行器的计数
size_t nMaxIdleReached; // 执行超时的计数
} MRStats;

// 自定义对象类型
typedef struct MRObjectType {
char* type;
size_t id;
ObjectFree free;
ObjectDuplicate dup;
ObjectSerialize serialize;
ObjectDeserialize deserialize;
ObjectToString tostring;
} MRObjectType;

2.2.2、任务执行数据结构

引用该依赖库 Redis 模块可以通过调用 MR_Run 函数来执行自定义的任务(向集群中的任意分片发送命令),在任务的执行过程中需要使用到以下的数据结构。

相关的执行函数:

  • MR_Run : 外部调用的任务执行入口;
  • MR_RunExecution : 任务不在集群中执行时调用;
  • MR_ExecutionDistribute : 任务需要在集群中执行时调用;
#define REDISMODULE_NODE_ID_LEN  40
#define ID_LEN REDISMODULE_NODE_ID_LEN + sizeof(size_t) // 48个字节
#define STR_ID_LEN REDISMODULE_NODE_ID_LEN + 13

// 执行任务
struct Execution {
int flags; // 特征Flag,目前有两个特征:已经初始化,仅本地执行的命令
size_t refCount; // 引用计数
char id[ID_LEN]; // 执行ID值,前40个字节记录实例ID,后8个字节记录递增的任务ID
char idStr[STR_ID_LEN]; // 执行ID值,格式为 %40s-%lld
Step* steps; // 执行任务的步骤,容量为10
pthread_mutex_t eLock; // 关键步骤所需要的锁
mr_list* tasks; // 执行的任务列表

size_t nRecieved; // 收到的回复计数,相关函数: MR_AckExecution
size_t nCompleted; // 完成计数,相关函数: MR_NotifyDone
Record** results; // 结果集,容量为10
Record** errors; // 错误结果集,容量为10

ExecutionCallbacks callbacks; // 回调函数
MR_LoopTaskCtx* timeoutTask; // 执行超时的任务
size_t timeoutMS; // 任务执行超时时间,默认5s
};

// 执行任务的步骤
struct Step {
int flags;
ExecutionBuilderStep bStep;
union {
MapStep map;
FilterStep filter;
ReadStep read;
CollectStep collect;
ReshuffleStep reshuffle;
AccumulateStep accumulate;
};
size_t index;
struct Step* child;
};

// 执行的任务列表
typedef struct mr_list {
mr_listNode *head;
mr_listNode *tail;
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned long len;
} mr_list;

// 执行任务的回调函数
typedef struct ExecutionCallbacks {
ExecutionCallbackData done;
ExecutionCallbackData resume;
ExecutionCallbackData hold;
} ExecutionCallbacks;

typedef struct ExecutionCallbackData {
void* pd;
ExecutionCallback callback;
} ExecutionCallbackData;

2.2.3、线程池数据结构

线程池中包含许多要执行的任务,下面列出了关于线程池中一个任务相关的数据结构。由于多个线程同时消费一个任务队列,因此需要加锁处理。

线程池的相关函数:

  • mr_thpool_init : 线程初始化,由函数 MR_Init 调用;
  • mr_thpool_add_work : 线程池中新增任务;
  • mr_thpool_destroy : 销毁线程池中任务;
// 线程
typedef struct mr_thread {
int id; // 外部指定的线程索引
pthread_t pthread; // 线程指针
struct mr_thpool_* thpool_p; // 线程池指针
} mr_thread;

// 线程池
typedef struct mr_thpool_ {
mr_thread** threads; // 线程指针数组
volatile int num_threads_alive; // 存活的线程数
volatile int num_threads_working; // 工作的线程数
pthread_mutex_t thcount_lock; // 线程信息锁,变更 num_threads_alive/working 时会用到
pthread_cond_t threads_all_idle; // 线程空闲等待条件变量
mr_jobqueue jobqueue; // 任务队列,所有线程共用一个任务队列
} mr_thpool_;

// 线程池任务队列
typedef struct mr_jobqueue {
pthread_mutex_t rwmutex; // 从队列中新增/读取任务时的锁
mr_job* front; // 队列最前方的指针
mr_job* rear; // 队列最后方的指针
mr_bsem* has_jobs; // 二进制信号量,用于标记队列中是否有任务
int len; // 队列中任务数量
} mr_jobqueue;

// 线程池中任务
typedef struct mr_job {
struct mr_job* prev; // 前一个任务指针
void (*function)(void* arg); // 任务的函数指针
void* arg; // 任务的函数指针中函数的参数
} mr_job;

// 二进制信号量
typedef struct mr_bsem {
pthread_mutex_t mutex; // 锁
pthread_cond_t cond; // 条件变量
int v; // 该值为1代表队列中有任务,为0代表队列中无任务
} mr_bsem;

2.2.4、消息发送数据结构

线程池在处理任务时,需要将任务封装成一个 Message 的数据结构,然后发送该消息。

消息发送的相关函数:

  • MR_ClusterSendMsgTask : 根据任务类型给对应实例发送任务消息;
  • MR_ClusterSendMsgToNode : 给特定节点发送消息,如果节点连接状态异常则会将该消息临时存储与 pendingMessages 列表中;
// 发送消息结构体
typedef struct SendMsg {
size_t refCount; // 引用计数,非线程安全
union {
char idToSend[41]; // 目标实例的ID
size_t slotToSend; // 目标实例的SlotID
};
SendMsgType sendMsgType; // 发送消息的类型,目前有三种类型
size_t function; // 执行函数的ID
char* msg; // 消息内容
size_t msgLen; // 消息长度
} SendMsg;

// 发送消息类型
typedef enum SendMsgType {
SendMsgType_BySlot, // 发送给对应SlotID的所在实例
SendMsgType_ById, // 发送给目标实例
SendMsgType_ToAll // 发送给所有实例
} SendMsgType;

2.3、任务类型

三、持久化

无持久化的相关逻辑。

四、实践

五、问题与思考

作者: bugwz
链接: https://bugwz.com/2022/10/22/redislibrary-libmr/
声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 咕咕