多线程编程

头文件:#include<thread>

创建线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <thread>

void threadFunction() {
std::cout << "Hello from thread!\n";
}
int main() {
// 创建一个线程并执行 threadFunction
std::thread t(threadFunction);
// 主线程继续执行
std::cout << "Hello from main!\n";
// 等待线程 t 完成(join是一个阻塞等待)
t.join();
return 0;
}

分离线程: 不希望主线程等待子线程完成,可以调用detach()方法。分离后的线程将在后台运行,主线程不再与之关联。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <iostream>
#include <thread>
void threadFunction() {
std::cout << "Hello from thread!\n";
}
int main() {
std::thread t(threadFunction);
// 分离线程
t.detach();
// 主线程继续执行
std::cout << "Hello from main!\n";
// 注意:分离后的线程无法再 join
return 0;
}

传递参数给线程函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <thread>
void threadFunction(int x, const std::string& str) {
std::cout << "x = " << x << ", str = " << str << "\n";
}
int main() {
int x = 42;
std::string str = "Hello, World!";
// 创建线程并传递参数
std::thread t(threadFunction, x, str);
t.join();
return 0;
}

std::thread : 提供了一个成员函数 joinable(),用于检查一个线程对象是否可以被 joindetach

std::ref: 创建对象的引用包装器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include<iostream>
#include<thread>
void foo(int& x)
{
x += 1;
std::cout << x << std::endl;
}
int main()
{
int a = 1;
std::thread t(foo, std::ref(a));
t.join();
return 0;
}

子线程调用类的成员函数

直接传递成员函数指针和对象实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <thread>
class MyClass {
public:
void memberFunction(int x) {
std::cout << "Member function called with x = " << x << "\n";
}
};
int main() {
MyClass obj;
// 直接传递成员函数指针和对象实例
std::thread t(&MyClass::memberFunction, &obj, 42);
t.join();
return 0;
}

使用 std::shared_ptr 来管理类对象的生命周期,确保在线程执行期间对象不会被销毁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
#include <memory>

class MyClass {
public:
void func() {
std::cout << "Thread " << std::this_thread::get_id()
<< " started" << std::endl;
}
};
int main() {
std::shared_ptr<MyClass> obj = std::make_shared<MyClass>();
std::thread t(&MyClass::func, obj);
t.join();
return 0;
}

互斥量解决多线程数据共享问题

头文件:#include <mutex>

线程安全的定义:如果多线程程序每一次的运行结果和单线程程序的运行结果始终一样,那么就是线程安全的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <mutex>
int shared_data = 0;
std::mutex mtx;
void func(int n) {
for (int i = 0; i < 100000; ++i) {
mtx.lock(); //加锁
shared_data++;
std::cout << "Thread " << n
<< " increment shared_data to " << shared_data << std::endl;
mtx.unlock(); //解锁
}
}
int main() {
std::thread t1(func, 1);
std::thread t2(func, 2);
t1.join();
t2.join();
std::cout << "Final shared_data = " << shared_data << std::endl;
return 0;
}

互斥量封装类

  • std::lock_guard:作用是在构造时自动锁定互斥锁,在析构时自动解锁互斥锁。

    注:不支持手动控制锁的行为(如延迟锁定、提前解锁等)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    #include <iostream>
    #include <thread>
    #include <mutex>

    std::mutex mtx;

    void printMessage(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx); // 自动加锁
    std::cout << message << '\n';
    // 自动解锁(当 lock 超出作用域时)
    }
    int main() {
    std::thread t1(printMessage, "Hello from thread 1");
    std::thread t2(printMessage, "Hello from thread 2");
    t1.join();
    t2.join();
    return 0;
    }
  • std::unique_lock: 允许对锁进行更细粒度的控制,例如延迟锁定、手动解锁、尝试锁定等。

    注:相对于 std::lock_guard,开销稍大(因为它需要维护更多的状态)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    #include <iostream>
    #include <thread>
    #include <mutex>

    std::mutex mtx;

    void printMessage(const std::string& message) {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟锁定
    lock.lock(); // 手动加锁
    std::cout << message << '\n';
    lock.unlock(); // 手动解锁
    // 锁会在 unique_lock 析构时自动解锁(如果尚未解锁)
    }
    int main() {
    std::thread t1(printMessage, "Hello from thread 1");
    std::thread t2(printMessage, "Hello from thread 2");
    t1.join();
    t2.join();
    return 0;
    }
  • std::lock_guardstd::unique_lock 的对比

    image-20250323210830055

std::call_once

std::call_once 用于确保某个函数或代码块在多线程环境中只被执行一次。

std::call_once 的主要作用是:

  • 线程安全的单次执行:确保某个函数或代码块在多个线程中只会被执行一次。
  • 避免竞态条件:在多线程环境下,多个线程可能同时尝试执行某段代码,std::call_once 可以保证只有一个线程能够成功执行,其他线程会等待该操作完成后再继续。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>
#include <mutex>
std::once_flag flag; // 定义一个 once_flag 对象
void initialize() {
std::cout << "Initialization function executed by thread "
<< std::this_thread::get_id() << std::endl;
}
void thread_func() {
std::call_once(flag, initialize); // 确保 initialize 函数只被调用一次
}
int main() {
std::thread t1(thread_func);
std::thread t2(thread_func);
std::thread t3(thread_func);
t1.join();
t2.join();
t3.join();
return 0;
}

使用场景:

  • 延迟初始化:在某些情况下,对象的初始化成本较高,或者只有在需要时才进行初始化。可以使用 std::call_once 来确保初始化逻辑只执行一次。
  • 单例模式:在实现线程安全的单例模式时,可以使用 std::call_once 来确保实例化过程只发生一次。

注:std::call_once只能在线程函数中才能使用

条件变量

条件变量(std::condition_variable是一个用于线程间通信的同步原语。允许一个线程等待某个条件成立,而另一个线程可以在该条件满足时通知等待的线程继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

std::queue<int> q;
std::mutex mtx;
std::condition_variable cond_var;

void producer() {
int data = 0;
while (true) {
std::unique_lock<std::mutex> lock(mtx);
q.push(data++);
cond_var.notify_one(); // 通知正在等待的消费者线程
lock.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟生产时间
}
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cond_var.wait(lock, [](){return !q.empty();}); // 等待直到队列非空
int value = q.front();
q.pop();
lock.unlock();
std::cout << "Consumed: " << value << std::endl;
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}

解释:

  • 生产者线程:不断地将数据添加到队列中,并通过 cond_var.notify_one() 通知可能正在等待的消费者线程,表示有新的数据可以处理了。
  • 消费者线程:首先尝试获取互斥锁,然后调用 cond_var.wait(lock, condition) 方法检查队列是否为空。如果队列为空,则当前线程会释放锁并进入等待状态,直到被生产者线程通知且条件满足(即队列非空)。一旦条件满足,消费者线程重新获取锁,并从队列中取出数据进行处理。

线程池

线程池(Thread Pool)是一种用于管理线程的机制,旨在减少频繁创建和销毁线程的开销。它预先创建一组线程,并将任务提交到一个任务队列中,线程从队列中取出任务并执行。这种方式特别适用于需要频繁执行短任务的场景,因为线程池可以避免每次任务执行时都创建新线程所带来的性能开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>

class ThreadPool {
public:
ThreadPool(int numThreads) : stop(false) {
for (int i = 0; i < numThreads; ++i) {
threads.emplace_back([this] {
while (true) {//每个线程会不断地运行,直到满足特定的退出条件(如线程池关闭且任务队列为空)
std::unique_lock<std::mutex> lock(mutex);
//stop || !tasks.empty()是解锁的条件
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {//线程退出的条件
return;
}
std::function<void()> task(std::move(tasks.front()));
tasks.pop();
lock.unlock();
task();
}
});
}
}

~ThreadPool() {
{
std::unique_lock<std::mutex> lock(mutex);
stop = true;
}
condition.notify_all();//唤醒所有等待的线程。
for (std::thread& thread : threads) {//等待所有线程完成其当前任务并退出
thread.join();
}
}

template<typename F, typename... Args>
void enqueue(F&& f, Args&&... args) { //f是函数,args是参数
//将用户提交的任务(函数 f 和其参数 args...)封装成一个无参数、无返回值的函数对象
//std::bind 的作用是将函数及其参数绑定在一起,生成一个无参数的函数对象,从而满足任务队列的要求。
std::function<void()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
{
std::unique_lock<std::mutex> lock(mutex);
tasks.emplace(std::move(task));
}
condition.notify_one();
}

private:
std::vector<std::thread> threads;//保存线程池中的所有线程。
std::queue<std::function<void()>> tasks;//任务队列
std::mutex mutex;//用于保护对共享资源(如任务队列)的访问,确保线程安全
std::condition_variable condition;//条件变量,用于在线程间进行同步。线程会在任务队列为空时等待,直到有新任务被加入队列。
bool stop;//标志位,用于指示线程池是否停止运行。当线程池析构时,会设置该标志为 true,通知所有线程退出。
};

int main() {
ThreadPool pool(4);
for (int i = 0; i < 8; ++i) {
pool.enqueue([i] {
std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Task " << i << " is done" << std::endl;
});
}
return 0;
}

异步并发机制

std::asyncstd::future

  • std::async: 启动一个异步任务,返回一个std::future对象
  • 两种启动策略:
    • std::launch::async:立即在新线程中执行
    • std::launch::deferred:延迟执行,直到调用future.get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <future>
#include <thread>

int compute() {
std::this_thread::sleep_for(std::chrono::seconds(2));
return 42;
}

int main() {
// 异步执行compute函数
std::future<int> result = std::async(std::launch::async, compute);

// 可以做其他工作...
std::cout << "Doing other work...\n";

// 获取结果(会阻塞直到结果就绪)
int value = result.get();
std::cout << "Result: " << value << std::endl;

return 0;
}
  • std::future: 表示异步操作的结果
    • get():获取结果,会阻塞直到结果就绪
    • wait():等待结果就绪
    • valid():检查future是否关联了共享状态
    • share():转换为std::shared_future

std::packaged_task: 是一个可调用的对象包装器,它将函数调用与future关联起来。

  • std::async更灵活,可以控制任务的执行方式

  • 可以多次使用(通过重置)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    #include <iostream>
    #include <future>
    #include <thread>
    #include <functional>

    int compute(int x, int y) {
    return x + y;
    }
    int main() {
    // 创建一个packaged_task,包装compute函数
    std::packaged_task<int(int,int)> task(compute);
    // 获取与任务关联的future
    std::future<int> result = task.get_future();
    // 在另一个线程中执行任务
    std::thread t(std::move(task), 10, 20);
    t.detach(); // 或使用t.join()
    // 获取结果
    std::cout << "Result: " << result.get() << std::endl;
    return 0;
    }

std::promise: 提供了更底层的设置异步结果的方式,允许显式设置值或异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <future>
#include <thread>

void compute(std::promise<int>&& prom) {
try {
// 模拟计算
std::this_thread::sleep_for(std::chrono::seconds(1));
// 设置结果
prom.set_value(42);
} catch (...) {
// 捕获异常并存储到promise中
prom.set_exception(std::current_exception());
}
}
int main() {
std::promise<int> prom;
std::future<int> result = prom.get_future();
std::thread t(compute, std::move(prom));
// 获取结果
try {
std::cout << "Result: " << result.get() << std::endl;
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
t.join();
return 0;
}