本文分享自华为云社区《GaussDB(DWS) 集群通信系列二:stream线程池设计》,作者:半岛里有个小铁盒。
1.前言
适用版本:【8.1.0(及以上)】
GaussDB(DWS)分布式架构的Stream算子作为SQL join操作时频繁发生的执行算子,共存在三种模式:Gather、Redistribute、Broadcast,分别负责CN节点GATHER数据,DN节点REDISTRIBUTE和BROACAST数据。大集群高并发场景下,Stream算子过多可能会导致通信的性能瓶颈,引起性能劣化(2000个stream同时启动,进程初始化耗时从ms级劣化到s级),因此需要尽可能减少Stream算子。但是在某些现场环境下,存在数据倾斜、join查询不包含必要分布键等客观情况,Stream算子无法有效减少,为多表join场景下的查询时延保障带来挑战。因此GaussDB(DWS)对于线程初始化->线程任务执行->线程退出执行的流程方面做了stream线程池优化,减少了线程初始化与线程退出所带来的开销。
2.实现原理
stream线程是临时线程,随query启动和退出,负责stream算子的执行,stream线程初始化和退出都会争抢锁等进程级资源,在stream线程个数无法进一步优化的场景下,需要设计有效方案以减少stream线程初始化和退出的时间代价,将进程初始化耗时稳定在ms级,保障数据库的确定性时延查询。Stream线程池的核心思想是等stream线程执行完计划任务,保留必要且可复用的线程信息,将线程放入线程池中。
线程池中的线程执行过程如上图所示,其具体步骤为:
- 步骤一:线程信息初始化
- 步骤二:线程待唤醒后轻量级初始化(query级初始化)
- 步骤三:线程任务执行
- 步骤四:线程清理
- 返回步骤二:继续等待下条query执行
在返回步骤二时,当线程等待超时、超出线程池容量(最大stream线程个数)、异常时线程已不可用,需要销毁。
其中步骤一中在线程初始化时,需要执行的操作有:线程创建、创建相关内存上下文、信号处理函数注册、内存追踪信息初始化、初始化GUC选项等操作;
步骤二中在线程轻量级/查询级初始化时,需要执行的操作有恢复GUC参数、初始化BackendParams、重置GUC参数等操作。
stream线程池为了高效管理线程的出/入池操作,采用无锁队列实现。定义结构体ThreadSlot保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的database oid、线程执行所需的信息StreamProducer、线程唤醒所需的锁和条件变量。
当线程还未被创建时,初始化一定数量的ThreadSlot数量以预留stream线程,这些ThreadSlot被保存在数组threadSlots中。当stream线程执行完毕,需要将stream线程放置到表征可复用线程的无锁队列,称之为idleRing;当线程因为超时、异常等原因不再复用,需要退出时,将stream线程对应的ThreadSlot放置到表征未创建线程的无锁队列,称之为emptyRing。
idleRing的作用是为了快速获取并复用线程池中的线程,emptyRing的作用是快速获取一个未被使用的ThreadSlot结构,以创建一个新的stream线程。由于stream线程的初始化信息和database是强相关的,如果不保留database相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足database信息匹配。对于设计线程池而言,每一个database都应该对应一个idleRing。
综上所述,基于无锁队列的stream线程池设计如下所示:
从上图可以看出,一个线程池包含预留stream线程结构的threadSlots、一个表征未创建线程的无锁队列emptyRing和表征可复用线程的无锁队列idleRing,由于每个database对应一个idleRing,因此多个idleRing被组织为链表结构。
3.具体实现机制
3.1 数据结构设计
定义结构体ThreadSlot保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的database oid、线程执行所需的信息StreamProducer,StreamProducer是父线程向子线程传递的唯一结构、线程唤醒所需的锁和条件变量。
typedef struct { int status; uint32 idx; ThreadId tid; Oid dbOid; StreamProducer* streamObj; pthread_mutex_t m_mutex; pthread_cond_t m_cond; } ThreadSlot;
定义结构体StreamThreadPool表征线程池,其中size表示线程池中拟预留的ThreadSlot个数,ThreadSlot被保存在threadSlots数组中;无锁队列emptyRing用来保存未创建线程的ThreadSlot,对应地,idleRing用来保存空闲的已创建stream线程的ThreadSlot。结构如下所示:
class StreamThreadPool: public BaseObject { public: StreamThreadPool(); void Init(int num); // streamThreadPool init int Call(StreamProducer* obj); // 获取idle线程 或 create 新线程 bool Wait(); // idle线程等待唤醒或者超时退出 ThreadSlot* GetLocalSlot(); // get streamThreadSlot void SetLocalSlot(int slotIdx); // set streamThreadSlot StreamPool* GetLocalPool(); // 获取streamDBPool 或 新建一个 ThreadSlot* PopSlot(); // 从idleRing/emptyRing获取一slot void PushToEmpty(ThreadSlot* slot); // 将slot直接放入emptyRing void PushToIdle(StreamPool* pool, ThreadSlot* slot); // 将slot直接放入idleRing void LocalPushToIdle(); // 根据状态,将slot放入idleRing void LocalPushToEmpty(); // 根据状态,将slot放入emptyRing int CleanStreamPool(const char *dbName, cleanOption cleanMode); // 根据db信息清线程 void CleanInAllStreamPool(int desNum); // 调整线程池中stream线程个数 int GetStreamNum(); // 获取线程池中stream线程个数 bool Release(); // 判断超时线程是否需要清理 bool TimeoutClean(); // 清理超时idle线程 private: int size; ThreadSlot* threadSlots; ArrayLockFreeQueue emptyRing; StreamPool* PoolListHead; }
定义结构体StreamPool,由于stream线程的初始化信息和database是强相关的,如果不保留database相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足database信息匹配,所以一个emptyRing和一个database相匹配,保存在链表PoolListHead中。
typedef struct StreamPool { Oid dbOid; ArrayLockFreeQueue idleRing; struct StreamPool* next; } StreamPool;
综上,我们可以得到各结构间组织的直观图,如下所示:
上图中threadSlots可以放在idleRing(蓝色)、emptyRing(绿色)和运行空间(黄色)中。
3.2 stream线程状态转移DFA设计
每一个记录线程信息的结构ThreadSlot中都保存了线程当前的状态status,记录线程状态的目的是为了保障线程执行过程的有序控制,也可以通过状态的互斥避免threadSlot不会被两个线程同时使用。
stream线程状态转移用确定性有限状态机(DFA,definite automata)表征,共包含4个状态:
STREAM_SLOT_EXIT、STREAM_SLOT_IDLE、STREAM_SLOT_HOLD和STREAM_SLOT_RUN状态。其物理含义如下:
- STREAM_SLOT_EXIT:线程退出状态,表示线程未被创建或线程已退出;
- STREAM_SLOT_IDLE:线程可复用状态,表示线程在idleRing中,可以被复用;
- STREAM_SLOT_HOLD:线程临时独占状态,表示线程在做进入下一个状态的准备工作;
- STREAM_SLOT_RUN:线程运行状态,表示线程正在执行任务。
状态间转移条件如下所示,图中粗箭头表示状态机主循环部分:
与状态对应的,是slot所处的位置,slot所处的位置有三处,分别是idleRing、emptyRing和运行空间,slot从无锁队列中拿出,运行时所处的位置,我们称之为运行空间。各状态所处的位置情况如下所示:
- STREAM_SLOT_EXIT:idleRing(idle线程超时)、emptyRing(初始化或者FATAL);
- STREAM_SLOT_IDLE:idleRing
- STREAM_SLOT_HOLD:运行空间(从无锁队列中取出)、idleRing(idle线程超时或中断);
- STREAM_SLOT_RUN:运行空间。
Slot的位置变化和状态转移的关系如下,图中粗箭头表示状态机主循环部分:
根据各状态所处的位置情况,从idleRing中取出的slot可能有三种状态:EXIT、IDLE、HOLD。当取出IDLE状态的slot,说明线程可复用;当取出EXIT状态的slot,说明线程已退出,此时需要将slot转存到emptyRing;当取出HOLD状态,说明线程正在被使用,此时需要放回idleRing。
EmptyRing中slot的状态只能是EXIT,运行空间中slot的状态要么是HOLD(刚取出还未运行),要么是RUN(正在运行),不再赘述
3.3 单个stream线程执行流程
Stream线程池中stream线程整体执行流程如下图所示:
stream线程初始化仅初始化一次,执行完query之后,便将连接归还到连接池里,循环执行上图中黄色部分的语句,如果有异常则线程退出,连接销毁,slot归还至emptyRing;如果正常执行结束,将连接中内容清理,避免下个连接误用,并将slot归还至idleRing等待下个连接复用。
那么stream线程复用时如何保持参数的一致性呢,对应上图中的set GUC params阶段。父线程保存自己的guc_variables在syncGucVariables中,syncGucVariables是需要传递给stream的结构用以保证父子线程guc参数的一致。然后父线程在初始化streamProducer时将syncGucVariables保存在该结构中传递。Stream线程根据streamProducer初始化自己的syncGucVariables变量,首先reset所有的guc变量,然后根据syncGucVariables修正自己的variables。
4.外部接口
4.1 GUC参数
max_stream_pool:设置stream线程池能够容纳stream线程的最大个数。该参数8.1.2及以上版本支持。默认值为65535。设置为-1表示不开启stream线程池。该参数支持reload更新,更新规则:设置max_stream_pool小于当前可用线程个数,支持线程个数实时减少;当设置max_stream_pool大于当前idle线程个数,将由业务驱动线程个数的增加
4.2 视图
pg_thread_wait_status:展示了集群所有CN/DN进程内的所有线程的实时 等待状态,是定位集群通信问题最重要的视图
其中对于wait_status列状态说明如下:
-
wait stream task:空闲的stream线程;
-
wait node:等待其他DN的数据,需要关注对端状态;
-
flush data:发送数据给其他DN时因为对端buffer满而阻塞;
-
wait cmd:DN上空闲的postgres线程,等待CN的下一个query;
-
none:未定义状态,极有可能是阻塞原因;
-
synchronize quit:同步退出状态,自身任务已完成,在等待同一个query的其他线程一起退出;
5.通过表象看stream线程池逻辑
【场景一】集群基础行为场景——建立多数据库场景
Create database ***;(建立多库)
分别执行带stream算子的查询;
例:create table test_01(c1 int, c2 int)with(orientation=column) distribute by hash(c1);
insert into test_01 select generate_series(1,100), generate_series(1,100);analyze test_01;
select * from test_01 a, test_01 b, test_01 c, test_01 d, test_01 e, test_01 f where a.c2 =b.c2 and c.c2 = d.c2 and e.c2=f.c2 limit 100;
查询结束,查pgxc_thread_wait_status看DN节点:预期stream线程状态为wait thread cond。且多database之间stream线程不复用。
【场景二】集群基础行为场景——建立多用户场景
Create user ***;(建立多用户)
分别执行带stream算子的查询;(参考场景一示例)
查询结束,查pgxc_thread_wait_status看DN节点:预期stream线程状态为wait thread cond。且多user之间stream线程可以复用。
例:用户一执行完查询,视图中显示共有四个stream线程在线程池,用户二执行同样查询返回正确结果,视图中的stream线程个数不变,且线程号也是一致的,则说明复用。
【场景三】集群基础行为场景——线程清理场景
调整guc参数max_stream_pool的值,观测是否生效;预期:当设置max_stream_pool小于当前idle线程个数,支持线程个数实时减少;当设置max_stream_pool大于当前idle线程个数,将由业务驱动线程个数的增加,但是不会超过max_stream_pool。
执行clean connection(ALL force),查看stream线程是否被清理;预期:该database的stream线程被完全清理。
执行drop database命令,查看stream线程是否被清理;预期:该database的stream线程被完全清理。
6.总结
本文详细介绍了stream连接池及其原理,让我们更好的理解GaussDB(DWS)集群通信中数据交互的具体逻辑,对于GaussDB通信运维也具备一定的参考意义。
点击关注,第一时间了解华为云新鲜技术~