跳到主要内容

内核链表创建线程池

· 阅读需 9 分钟
📺尚宇

使用内核链表实现线程池的接口设计

1. 基础知识

关于链表和线程的使用方法可以查看我的技术笔记:数据结构系统编程 ,在这两个页面都可以找到链表和线程的一些基础知识和使用示例。

在 Linux 系统中,线程池是一种用于管理和调度线程的机制。它由一组可以复用的线程组成,这些线程被保存在线程池中,并等待分配任务。

当一个新的任务到达时,线程池会从其中选择一个线程来执行该任务。如果所有线程都正在执行任务,则新任务将被放在队列中等待,直到有线程可用。

使用线程池的优点包括:

  • 可以更有效地利用 CPU 资源,因为线程可以在任务之间复用。
  • 可以更容易地管理线程,因为所有线程都在线程池中。
  • 可以更容易地维护线程的数量,因为可以限制线程池的大小。

线程池的实现原理可以参考这篇笔记:线程池

内核链表的使用方法可以参考这篇笔记:内核链表

下面我们来看看怎么通过内核链表跟线程的组合实现线程池。

2. 线程池接口设计

2.1 线程结构体

  • 接口文档
原型struct thread
功能描述线程节点,包含线程ID,通过内核链表连成一个线程队列,方便后续进行扩展
成员列表pthread_t id; // 线程 ID
struct list_head list; // 小结构体
备注线程队列最终是形成一条内核链表
  • 代码实现
// 线程池中的线程
struct thread {
pthread_t id; // 线程 ID
struct list_head list; // 小结构体
};

2.2 任务结构体

  • 接口文档
原型struct task
功能描述任务节点,包含需要执行的函数及其参数,通过链表连成一个任务队列
成员列表void *(*func)(void *arg);
void *arg;
struct list_head list;
备注任务队列最终是形成一条内核链表
  • 代码实现
// 线程池中的任务
struct task {
void *(*func)(void *); // 任务函数
void *arg; // 任务函数的参数
struct list_head list; // 小结构体
};

2.3 线程池结构体

  • 接口文档
原型thread_pool
功能描述包含线程池的所有信息
成员列表struct list_head threads; // 线程队列
struct list_head queue; // 任务队列
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
unsigned active_threads; // 线程队列中的线程数量
unsigned waiting_tasks; // 任务链队列中等待的任务个数
bool shutdown; // 线程池销毁标记
备注活跃线程个数由用户自行定义,但至少包含一条活跃线程
  • 代码实现
struct thread_pool {
struct list_head threads; // 线程队列
struct list_head queue; // 任务队列
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
unsigned active_threads; // 线程队列中的线程数量
unsigned waiting_tasks; // 任务链队列中等待的任务个数
bool shutdown; // 线程池销毁标记
};

2.4 线程池初始化

  • 接口文档
原型void thread_pool_init(struct thread_pool *pool, int num_threads)
功能描述创建一个新的线程池,包含 num_threads 个活跃线程
参数pool: 线程池指针
num_threads: 初始活跃线程个数(大于等于1)
返回值
所在头文件thread_pool.h
备注线程池最少线程个数为1,最大值不超过 MAX_ACTIVE_THREADS
  • 代码实现
void thread_pool_init(struct thread_pool *pool, int num_threads)
{
int i;

// 检查 num_threads 的值是否超过了最大值
if (num_threads > MAX_ACTIVE_THREADS) {
num_threads = MAX_ACTIVE_THREADS;
}

pool->waiting_tasks = 0; //初始化任务队列中等待的任务个数
pool->active_threads = num_threads; //初始化活跃线程个数
pool->shutdown = false; //初始化线程池销毁标记

INIT_LIST_HEAD(&pool->threads); // 初始化线程队列
INIT_LIST_HEAD(&pool->queue); // 初始化任务队列

// 是否初始化成功
if (!list_empty(&pool->queue) || !list_empty(&pool->threads)) {
fprintf(stderr, "init fail.\n");
exit(1);
}

pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->cond, NULL);

// 创建线程池中的线程
for (i = 0; i < num_threads; i++) {
struct thread *thread = (struct thread *)malloc(sizeof(struct thread));

INIT_LIST_HEAD(&thread->list);
// 使用 pthread_create 创建线程
int ret = pthread_create(&thread->id, NULL, thread_pool_main, pool);
if (ret != 0) {
perror("create threads error");
exit(1);
}

// 将新创建的线程添加到线程池中
list_add_tail(&thread->list, &pool->threads);
}
}

2.5 投送任务

  • 接口文档
原型void thread_pool_add_task(struct thread_pool *pool, void *(*func)(void *), void *arg)
功能描述往线程池投送任务
参数pool: 线程池指针
func: 投送至线程池的执行例程
arg: 执行例程 func 的参数,若该执行例程不需要参数可设置为NULL
返回值
所在头文件thread_pool.h
备注任务队列中最大任务个数为 MAX_WAITING_TASKS
  • 代码实现
void thread_pool_add_task(struct thread_pool *pool, void *(*func)(void *), void *arg)
{
// 分配内存给新任务
struct task *new_task = (struct task *) malloc(sizeof(struct task));

if(new_task == NULL) {
perror("allocate memory error");
exit(1);
}

// 初始化任务节点
new_task->func = func;
new_task->arg = arg;

// 获取锁
pthread_mutex_lock(&pool->lock);
// 超过最大任务

if(pool->waiting_tasks >= MAX_WAITING_TASKS) {
pthread_mutex_unlock(&pool->lock);

fprintf(stderr, "too many tasks.\n");
free(new_task);

exit(1);
}

// 将新任务添加到任务队列的末尾
list_add_tail(&new_task->list, &pool->queue);

// 新任务加一
pool->waiting_tasks++;

// 唤醒一个线程,让它开始执行新任务
pthread_cond_signal(&pool->cond);

// 释放锁
pthread_mutex_unlock(&pool->lock);
}

2.6 增加活跃线程

  • 接口文档
原型void thread_pool_add_thread(struct thread_pool *pool, int num_threads)
功能描述增加线程池中活跃线程的个数
参数pool: 需要增加线程的线程池指针
num_threads: 新增线程个数
返回值
所在头文件thread_pool.h
备注增加的活跃线程的数量和之前初始化的线程数量不能大于 MAX_ACTIVE_THREADS
  • 代码实现
void thread_pool_add_thread(struct thread_pool *pool, int num_threads)
{
int i;

// 检查 num_threads 的值是否超过了最大值
if (num_threads > MAX_ACTIVE_THREADS - pool->active_threads) {
num_threads = MAX_ACTIVE_THREADS - pool->active_threads;
}

// 创建新线程
for (i = 0; i < num_threads; i++) {
struct thread *thread = (struct thread *)malloc(sizeof(struct thread));

INIT_LIST_HEAD(&thread->list);
// 使用 pthread_create 创建线程
int ret = pthread_create(&thread->id, NULL, thread_pool_main, pool);
if (ret != 0) {
perror("create threads error");
exit(1);
}

// 将新创建的线程添加到线程池中
list_add_tail(&thread->list, &pool->threads);
}

// 更新线程池中的活跃线程数
pool->active_threads += num_threads;
}

2.7 删除线程

  • 接口文档
原型void thread_pool_remove_thread(struct thread_pool *pool, int num_threads);
功能描述删除线程池中活跃线程的个数
参数pool: 需要删除线程的线程池指针
num_threads: 要删除的线程个数,该参数设置为0时直接返回当前线程池线程总数,对线程池不造成任何其它影响
返回值
所在头文件thread_pool.h
备注1,线程池至少会存在1条活跃线程
2,如果被删除的线程正在执行任务,则将等待其完成任务之后删除
  • 代码实现
void thread_pool_remove_thread(struct thread_pool *pool, int num_threads)
{
int i;
for (i = 0; i < num_threads; i++) {
// 从线程池中获取最后一个线程
struct thread *thread = list_last_entry(&pool->threads, struct thread, list);

// 使用 pthread_cancel 取消线程
int ret = pthread_cancel(thread->id);
if (ret != 0) {
fprintf(stderr, "pthread_cancel failed: %s\n", strerror(ret));
}

// 检查线程是否正在执行任务
if (pool->waiting_tasks > 0) {
// 如果线程正在执行任务,则等待它完成
pthread_join(thread->id, NULL);
}

// 从线程池中删除线程
list_del(&thread->list);
free(thread);

// 更新线程池中的线程数
pool->active_threads--;
}
}

2.8 销毁线程池

  • 接口文档
原型void thread_pool_destroy(thread_pool *pool);
功能描述阻塞等待所有任务完成,然后立即销毁整个线程池,释放所有资源和内存
参数pool:将要销毁的线程池
返回值成功返回true,失败返回false
所在头文件thread_pool.h
备注
  • 代码实现
void thread_pool_destroy(struct thread_pool *pool)
{
pool->shutdown = true; //线程池的销毁标记
pthread_cond_broadcast(&pool->cond); //唤醒所有线程
int i;
// 销毁线程池中的所有线程
for (i = 1; i <= pool->active_threads; i++) {
struct thread *thread = list_entry(pool->threads.next, struct thread, list);

// 使用 pthread_join 等待线程结束
int ret = pthread_join(thread->id, NULL);
if(ret != 0) {
printf("join id[%d] error: %s\n", i, strerror(errno));
} else {
printf("[%ld] is joined, i=%d\n", thread->id, i);
}
// 从线程池中删除已退出的线程
list_del(&thread->list);
free(thread);
}

// 重置任务队列
INIT_LIST_HEAD(&pool->queue);
// 销毁锁和条件变量
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);

// 释放线程池占用的内存
// free(pool);
}

2.9 线程入口函数

  • 用来执行任务的线程函数:
void *thread_pool_main(void *arg)
{
struct thread_pool *pool = (struct thread_pool *) arg;
struct task *task;

while (1) {
// 访问任务队列前加锁,为防止取消后死锁,注册处理例程 handle
pthread_cleanup_push(handler, (void *)&pool->lock); //防止死锁
// 获取锁
pthread_mutex_lock(&pool->lock);

// 如果任务队列为空,则阻塞等待
while (list_empty(&pool->queue) && !pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->lock);
}

//如果任务为空,线程池被销毁,则立即释放互斥锁并退出
if(list_empty(&pool->queue) && pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}

// 将队列中的第一个任务取出
task = list_first_entry(&pool->queue, struct task, list);
list_del(&task->list);
pool->waiting_tasks--; // 任务数量减1
// 释放锁
pthread_mutex_unlock(&pool->lock);
// 任务取走,解锁,并弹栈 handle(但不执行它)
pthread_cleanup_pop(0);

//执行任务期间拒绝取消请求
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
task->func(task->arg); // 执行任务
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

free(task); //释放资源
}
pthread_exit(NULL);
}
  • 注册死锁处理函数
static void handler(void *arg)
{
// 响应取消请求之后自动处理:释放互斥锁
pthread_mutex_unlock((pthread_mutex_t *)arg);
}

3. 使用示例

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "thread_pool.h"

// 任务1
void *task(void *arg)
{
printf("Task 1 is running...\n");

// 任务1休眠一段时间
sleep(1);

printf("Task 1 is done.\n");

//pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
// 创建线程池
struct thread_pool pool;
thread_pool_init(&pool, 4);
printf("当前线程数为:%u\n", pool.active_threads);
sleep(1);

// 删除线程
thread_pool_remove_thread(&pool, 2);
printf("当前线程数为:%u\n", pool.active_threads);
sleep(1);

// 添加线程到线程池中
thread_pool_add_thread(&pool, 3);
printf("当前线程数为:%u\n", pool.active_threads);
sleep(1);

// 添加任务到线程池中
thread_pool_add_task(&pool, task, NULL);
sleep(1);
thread_pool_add_task(&pool, task, NULL);
sleep(1);
thread_pool_add_task(&pool, task, NULL);
sleep(1);

// 主线程休眠一段时间,让任务完成
sleep(1);
printf("\033[31m""---------Function: [%s]---------Line: [%d]---------\n""\033[m", __FUNCTION__, __LINE__);
// 清理线程池
thread_pool_destroy(&pool);

printf("\033[31m""---------Function: [%s]---------Line: [%d]---------\n""\033[m", __FUNCTION__, __LINE__);

return 0;
}

4. 源码

想要源码的话,不妨上我的🍍GitHub看看吧!

Loading Comments...