C++20 线程支持库:jthread 与同步原语
C++20 线程支持库:jthread 与同步原语
C++11 引入的 std::thread 解决了"有没有"的问题,但留下了两个设计缺陷:析构时如果没 join 也没 detach 就直接 std::terminate(),以及无法从外部优雅地请求线程停止。C++20 引入了一系列新设施来弥补这些缺陷——std::jthread 的 RAII 线程管理、stop_token 的协作取消机制,以及 latch、barrier、counting_semaphore 等更高层的同步原语。
1. std::jthread:RAII 线程
std::thread 最危险的设计是它的析构函数:
void dangerous() {
std::thread t([] { std::this_thread::sleep_for(10s); });
// 如果这里抛异常,t.~thread() 调用 std::terminate() → 整个程序挂掉
t.join();
}std::jthread 修正了这个行为——析构时自动 join()(如果线程 joinable()):
void safe() {
std::jthread t([] { std::this_thread::sleep_for(10s); });
// 即使异常抛出,t.~jthread() 也会自动 join
}
void explicit_stop() {
std::jthread t([] { std::this_thread::sleep_for(10s); });
// 也可以显式操作:
// t.join(); // OK
// t.detach(); // OK(需要主动选择)
// 默认析构 = request_stop() + join()
}jthread 的第二个关键改进是内建的 stop token 支持。
2. stop_token:协作取消
传统取消线程的方法是通过一个 std::atomic<bool> 标志位:
std::atomic<bool> stop_flag{false};
std::thread worker([&stop_flag] {
while (!stop_flag.load()) {
do_work();
}
});
stop_flag = true;
worker.join();问题:如果 do_work() 内部有一个长时间阻塞的操作,stop_flag 检查不到。
std::stop_token / std::stop_source / std::stop_callback 提供了标准化的协作取消基础设施:
void worker(std::stop_token token) {
while (!token.stop_requested()) {
// 阻塞式等待,但可以响应停止请求
std::unique_lock lock(mutex_);
if (cv_.wait_for(lock, 500ms,
[&] { return data_ready || token.stop_requested(); })) {
if (token.stop_requested()) return;
process();
}
}
}
int main() {
std::jthread t(worker); // jthread 自动传递 stop_token
// ...
t.request_stop(); // 发出停止请求
// t.~jthread() 自动 join
}jthread 构造函数自动接受一个 (std::stop_token) 作为第一个参数的 callable。如果 callable 的第一个参数是 std::stop_token,jthread 内部创建的 stop_source 会自动把 token 传进去。
std::stop_callback:在停止时注册回调
void interruptible_worker(std::stop_token token) {
int fd = open_device();
std::stop_callback cb(token, [fd] {
close(fd); // 强制关闭阻塞的 IO,让工作线程从 read() 中醒来
});
while (!token.stop_requested()) {
char buf[1024];
auto n = read(fd, buf, sizeof(buf)); // 阻塞 IO
if (n > 0) process(buf, n);
}
}stop_callback 在 stop_requested() 变为 true 时执行,只执行一次。这在需要"中断一个阻塞的系统调用"的场景中至关重要——你不能靠轮询 stop_token 来中断 read(),但可以通过回调来 close() 文件描述符,让 read() 以错误返回。
3. std::latch:一次性同步点
std::latch 是一个只能减少、不可重置的计数器。它允许一个或多个线程等待,直到计数器归零:
std::latch done{3}; // 初始计数为 3
// 分发到 3 个线程
for (int i = 0; i < 3; ++i) {
std::jthread([&done, i] {
do_work(i);
done.count_down(); // 原子减 1,归零时唤醒所有等待者
}).detach();
}
done.wait(); // 阻塞直到 count == 0
// 所有 3 个线程都已经完成了各自的工作典型场景:
- 多阶段初始化的完成信号
- 等待所有 worker 完成一轮工作后汇总
- 一次性闸门:主线程创建 N 个 worker,等全部 ready 后再一起开始
latch vs barrier
latch 是一次性的,barrier 可以反复使用。latch 用于"等所有事情完成"的场景,barrier 用于"每轮迭代都要同步"的场景。
4. std::barrier:可重复的同步壁垒
std::barrier 是 latch 的可重用版本:
constexpr int kThreads = 4;
std::barrier sync_point{kThreads}; // 需要 kThreads 个到达者
auto phase = [&](int id) {
for (int round = 0; round < 10; ++round) {
do_computation(round, id);
// 等待所有线程完成这一轮
sync_point.arrive_and_wait();
// 所有线程都到了这里——这一轮的结果是确定性的
exchange_results(round, id);
sync_point.arrive_and_wait(); // 再同步一次
}
};barrier 还支持一个完成函数(completion function),在每次所有线程到达后、释放它们之前执行:
auto on_phase_complete = [round = 0]() mutable {
std::cout << "Phase " << round++ << " done\n";
};
std::barrier sync_point{kThreads, on_phase_complete};完成函数会在参与同步的线程之一上执行(不是独立线程)。
5. std::counting_semaphore:信号量
C++20 终于有了标准的信号量,而且是两种:
std::counting_semaphore<N>:最大计数为N的信号量(N是编译期常量)std::binary_semaphore:等价于counting_semaphore<1>
// 经典的生产者-消费者(有界缓冲)
std::counting_semaphore<8> slots{8}; // 初始有 8 个空位
std::counting_semaphore<8> items{0}; // 初始有 0 个产品
void producer() {
for (int i = 0; i < 100; ++i) {
slots.acquire(); // 等空位(减 1)
buffer_[write_pos_] = produce(i);
write_pos_ = (write_pos_ + 1) % 8;
items.release(); // 新产品就绪(加 1)
}
}
void consumer() {
for (int i = 0; i < 100; ++i) {
items.acquire(); // 等产品(减 1)
auto data = buffer_[read_pos_];
read_pos_ = (read_pos_ + 1) % 8;
slots.release(); // 释放空位(加 1)
consume(data);
}
}信号量是最基础的同步原语,mutex + condition_variable 可以模拟它,但手工实现往往更冗长且易错。对于简单的"允许最多 N 个并发访问"场景,信号量是最直接的表达。
6. 选型速查
| 场景 | 选择 | 原因 |
|---|---|---|
| 启动线程,需要自动 join | std::jthread | RAII 析构,避免 terminate |
| 需要请求线程停止 | std::jthread + std::stop_token | 标准化协作取消 |
| 需要中断阻塞的 IO 来响应停止 | stop_token + stop_callback | 回调中 close fd |
| N 个线程完成工作后一起继续 | std::barrier | 可重复,支持完成函数 |
| 一次性等待 N 个完成通知 | std::latch | 比 barrier 更轻量 |
| 限制最大并发数 / 产品-空位 | std::counting_semaphore | 最简语义 |
总结
C++20 的线程支持库是对 C++11 并发体系的"补课"而非"替代"。std::jthread 和 stop_token 修正了 std::thread 的设计缺陷,latch/barrier/counting_semaphore 填补了同步原语的空缺,让实现 fork-join、管道、有界缓冲等常见并发模式变得不需要手写条件变量和状态管理。
在实际项目中,将 std::thread 替换为 std::jthread 几乎总是正确的默认选择。