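The Power and Challenges of Concurrent Programming
Welcome to C++ concurrent programming. Multicore processors are now standard, and making full use of them is key to program performance. Concurrency lets a program make progress on several tasks at once, much as several waiters in a restaurant serve different tables at the same time, which raises overall throughput. It also brings new problems that need careful handling: race conditions, deadlocks, and data synchronization.
The standard thread library introduced in C++11 changed concurrent programming in C++ by providing portable threads, mutexes, condition variables, and related tools for building efficient, reliable multithreaded applications. Whether you are writing a high-performance server, a game engine, or a scientific computing application, concurrency is an essential skill.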
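Thread Basics: Creating and Managing Threads
Three Ways to Create a Thread
C++11 simplifies thread creation with the std::thread class, which accepts any callable: a function pointer, a function object, or a lambda expression: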
#include <iostream>
#include <thread>
#include <vector>

// 1. Function pointer
void threadFunction(int id) {
    std::cout << "Thread " << id << " running\n";
}

// 2. Function object
class ThreadFunctor {
public:
    void operator()(int id) {
        std::cout << "Functor thread " << id << " running\n";
    }
};

int main() {
    std::vector<std::thread> threads;

    // Create the threads
    threads.emplace_back(threadFunction, 1);   // function pointer
    threads.emplace_back(ThreadFunctor(), 2);  // function object

    // 3. Lambda expression
    threads.emplace_back([](int id) {
        std::cout << "Lambda thread " << id << " running\n";
    }, 3);

    // Wait for all threads to finish
    for (auto& t : threads) {
        t.join();
    }

    std::cout << "All threads finished\n";
    return 0;
}
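Managing and Detaching Threads
A thread can be detached after creation so that it runs in the background, independently of its std::thread object: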
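#include <iostream>
#include <thread>
#include <chrono>

void backgroundTask() {
    for (int i = 0; i < 5; ++i) {
        std::cout << "Background task running... (" << i + 1 << "/5)\n";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

int main() {
    std::thread worker(backgroundTask);
    worker.detach();  // detach the thread so it runs independently

    // The main thread carries on with other work
    std::cout << "Main thread continues...\n";
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Main thread about to exit\n";

    // Note: returning from main ends the process, which also ends any
    // detached thread. Here the background task needs about 5 seconds but
    // main exits after about 3, so the task is cut off before reaching 5/5.
    return 0;
}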
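Detaching gives up any way to wait for the thread. As a contrast, here is a minimal sketch of the opposite policy: a small RAII wrapper that joins its thread in the destructor, so the work is guaranteed to finish before the enclosing scope exits. The names JoiningThread and countInBackground are hypothetical and exist only for this illustration (C++20's std::jthread offers similar join-on-destruction behavior).

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>

// Hypothetical helper: owns a std::thread and joins it on destruction.
class JoiningThread {
public:
    explicit JoiningThread(std::thread t) : thread_(std::move(t)) {}

    JoiningThread(const JoiningThread&) = delete;
    JoiningThread& operator=(const JoiningThread&) = delete;

    ~JoiningThread() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }

private:
    std::thread thread_;
};

// Hypothetical demo: performs `steps` increments on a background thread and
// returns the counter after the wrapper has joined that thread.
int countInBackground(int steps) {
    assert(steps >= 0);
    std::atomic<int> counter{0};
    {
        JoiningThread worker{std::thread([&counter, steps] {
            for (int i = 0; i < steps; ++i) {
                ++counter;
            }
        })};
    }  // ~JoiningThread joins here, so the lambda has finished
    return counter.load();
}
```

Calling countInBackground(1000) from main returns only after the background thread has completed all of its increments, which is exactly the guarantee that detach() gives up.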
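Shared Data and Synchronization
Mutexes: Protecting Shared Data
A mutex is the most basic synchronization mechanism; it prevents multiple threads from accessing a shared resource at the same time: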
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx;  // global mutex
int sharedCounter = 0;

void incrementCounter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // RAII-style lock: released automatically at the end of each iteration
        std::lock_guard<std::mutex> lock(mtx);
        ++sharedCounter;
    }
}

int main() {
    const int numThreads = 5;
    const int iterationsPerThread = 10000;
    std::vector<std::thread> threads;

    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementCounter, iterationsPerThread);
    }

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Final counter value: " << sharedCounter
              << " (expected " << numThreads * iterationsPerThread << ")\n";
    return 0;
}
More Flexible Locking: unique_lock
std::unique_lock is more flexible than lock_guard: it supports deferred locking and manual unlocking:
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
std::vector<int> sharedData;

void processData(int id) {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);  // deferred: not locked yet

    // Do some work that does not need the lock
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    lock.lock();  // lock explicitly
    sharedData.push_back(id);
    lock.unlock();  // unlock early

    // Continue with work that does not need the lock
}
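Recursive Mutexes
When a function may call itself recursively, or otherwise needs to lock the same mutex more than once on the same thread, use a recursive mutex: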
#include <mutex>
#include <iostream>

std::recursive_mutex recursiveMtx;

void recursiveFunction(int depth) {
    // The same thread may lock a recursive_mutex again while already holding it
    std::lock_guard<std::recursive_mutex> lock(recursiveMtx);
    std::cout << "Depth: " << depth << "\n";
    if (depth > 0) {
        recursiveFunction(depth - 1);
    }
}

int main() {
    recursiveFunction(3);
    return 0;
}
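Advanced Synchronization
Condition Variables: Communication Between Threads
A condition variable lets a thread wait until a particular condition holds: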
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> dataQueue;
bool producerFinished = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> lock(mtx);
            dataQueue.push(i);
            std::cout << "Produced: " << i << "\n";
        }
        cv.notify_one();  // wake one waiting consumer
    }
    {
        std::lock_guard<std::mutex> lock(mtx);
        producerFinished = true;
    }
    cv.notify_all();  // wake all consumers
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        // Wait until the queue is non-empty or the producer has finished
        cv.wait(lock, [] { return !dataQueue.empty() || producerFinished; });

        if (producerFinished && dataQueue.empty()) {
            break;  // exit condition
        }

        if (!dataQueue.empty()) {
            int data = dataQueue.front();
            dataQueue.pop();
            lock.unlock();  // unlock early, before the slow part
            std::cout << "Consumer " << id << " consumed: " << data << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    }
}

int main() {
    std::thread prod(producer);
    std::thread cons1(consumer, 1);
    std::thread cons2(consumer, 2);

    prod.join();
    cons1.join();
    cons2.join();
    return 0;
}
Atomic Operations: Lock-Free Programming
Atomic types provide thread-safe operations without a mutex:
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

std::atomic<int> atomicCounter(0);

void atomicIncrement(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // Relaxed ordering is enough here: only the final count matters
        atomicCounter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    const int numThreads = 5;
    const int iterationsPerThread = 100000;
    std::vector<std::thread> threads;

    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(atomicIncrement, iterationsPerThread);
    }

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Atomic counter value: " << atomicCounter
              << " (expected " << numThreads * iterationsPerThread << ")\n";
    return 0;
}
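fetch_add covers simple counters, but updates that depend on the current value are usually written as a compare-exchange loop. The sketch below shows that pattern under the assumption that we want to track a running maximum across threads. The names atomicMax and maxAcrossThreads are hypothetical helpers introduced for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: raise `target` to `value` if `value` is larger.
void atomicMax(std::atomic<int>& target, int value) {
    int current = target.load(std::memory_order_relaxed);
    // On failure, compare_exchange_weak stores the latest value in `current`,
    // so the loop re-checks the condition against up-to-date data.
    while (current < value &&
           !target.compare_exchange_weak(current, value,
                                         std::memory_order_relaxed)) {
    }
}

// Hypothetical demo: thread `id` offers the values id, id + numThreads, ...
// below `limit`. Returns the largest value offered by any thread.
int maxAcrossThreads(int numThreads, int limit) {
    assert(numThreads > 0 && limit > 0);
    std::atomic<int> maxSeen{0};
    std::vector<std::thread> threads;
    for (int id = 0; id < numThreads; ++id) {
        threads.emplace_back([&maxSeen, id, numThreads, limit] {
            for (int v = id; v < limit; v += numThreads) {
                atomicMax(maxSeen, v);
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return maxSeen.load();
}
```

Together the threads offer every value from 0 to limit - 1, so the result should be limit - 1 regardless of how the threads interleave.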
The Memory Ordering Model
C++ offers several memory ordering options that trade performance against consistency guarantees:
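#include <atomic>
#include <thread>
#include <iostream>

std::atomic<int> x(0), y(0);
int r1, r2;

void thread1() {
    x.store(1, std::memory_order_release);  // release semantics
}

void thread2() {
    y.store(1, std::memory_order_release);  // release semantics
}

void thread3() {
    r1 = x.load(std::memory_order_acquire);  // acquire semantics
    r2 = y.load(std::memory_order_acquire);  // acquire semantics
}

int main() {
    // x and y are written by two independent threads, so every combination
    // of (r1, r2) is a legal outcome. Seeing r1 == 1 and r2 == 0 only means
    // thread3 read x after thread1's store and read y before thread2's
    // store; it is an ordinary interleaving, not evidence of reordering.
    while (true) {
        x = 0;
        y = 0;
        r1 = 0;
        r2 = 0;

        std::thread t1(thread1);
        std::thread t2(thread2);
        std::thread t3(thread3);
        t1.join();
        t2.join();
        t3.join();

        if (r1 == 1 && r2 == 0) {
            std::cout << "Observed interleaving: r1=" << r1 << ", r2=" << r2 << "\n";
            break;
        }
    }
    return 0;
}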
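One guarantee that release/acquire ordering does provide is easiest to see with a single flag guarding a payload. In this sketch (the names passMessage, payload, ready, and observed are illustrative assumptions), a reader that sees ready == true through an acquire load is guaranteed to also see the plain write to payload that preceded the release store.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical demo: returns the value that the reader thread observed.
int passMessage(int value) {
    int payload = 0;                 // plain, non-atomic data
    std::atomic<bool> ready{false};  // flag that publishes the payload
    int observed = -1;

    std::thread writer([&] {
        payload = value;                               // 1. write the data
        ready.store(true, std::memory_order_release);  // 2. publish it
    });

    std::thread reader([&] {
        // Spin until the flag is set. The acquire load synchronizes with the
        // release store, so the earlier write to payload is visible here.
        while (!ready.load(std::memory_order_acquire)) {
            std::this_thread::yield();
        }
        observed = payload;
    });

    writer.join();
    reader.join();
    assert(observed == value);  // guaranteed by the release/acquire pairing
    return observed;
}
```

If both operations on ready used memory_order_relaxed instead, the access to payload would no longer be ordered relative to the flag, and reading it in the reader thread would be a data race.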
Asynchronous Tasks and Futures
std::async and std::future
std::async and std::future simplify asynchronous operations:
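#include <cstddef>
#include <functional>
#include <iostream>
#include <future>
#include <vector>
#include <numeric>

// Sum the elements in [start, end). The result is long long because the
// total of 1..1000000 (500000500000) does not fit in a 32-bit int.
long long partialSum(const std::vector<int>& data, std::size_t start, std::size_t end) {
    return std::accumulate(data.begin() + start, data.begin() + end, 0LL);
}

int main() {
    std::vector<int> data(1000000);
    std::iota(data.begin(), data.end(), 1);  // fill with 1 to 1000000

    // Start the asynchronous task on the first half
    auto future1 = std::async(std::launch::async, partialSum,
                              std::ref(data), 0, data.size() / 2);

    // Meanwhile, the main thread sums the second half
    long long sum2 = partialSum(data, data.size() / 2, data.size());

    // Fetch the asynchronous result (blocks until it is ready)
    long long sum1 = future1.get();

    std::cout << "Total: " << sum1 + sum2
              << " (expected " << 1000000LL * 1000001 / 2 << ")\n";
    return 0;
}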
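std::promise and std::future
std::promise lets one thread set a value (or an exception) explicitly; another thread retrieves it through the associated std::future: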
#include <exception>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <future>
#include <cmath>
#include <utility>

void computeSqrt(std::promise<double>&& prom, double value) {
    try {
        if (value < 0) {
            throw std::runtime_error("cannot take the square root of a negative number");
        }
        prom.set_value(std::sqrt(value));  // store the result
    } catch (...) {
        prom.set_exception(std::current_exception());  // store the exception
    }
}

int main() {
    std::promise<double> prom;
    auto future = prom.get_future();

    std::thread worker(computeSqrt, std::move(prom), -9.0);

    try {
        double result = future.get();  // rethrows the stored exception, if any
        std::cout << "Result: " << result << "\n";
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << "\n";
    }

    worker.join();
    return 0;
}
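Implementing a Thread Pool
A thread pool reuses a fixed set of worker threads, which avoids the cost of creating and destroying a thread for every task: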
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <chrono>
#include <memory>
#include <stdexcept>
#include <type_traits>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : stop(false) {
        for (size_t i = 0; i < numThreads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queueMutex);
                        condition.wait(lock, [this] {
                            return stop || !tasks.empty();
                        });
                        // Exit only once the queue has been drained
                        if (stop && tasks.empty()) {
                            return;
                        }
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();  // run the task outside the lock
                }
            });
        }
    }

    // Note: std::result_of is deprecated in C++17 and removed in C++20;
    // std::invoke_result is its replacement there.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        // packaged_task is move-only, so wrap it in a shared_ptr to make the
        // queued std::function copyable
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            if (stop) {
                throw std::runtime_error("thread pool has been stopped");
            }
            tasks.emplace([task] { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i) {
        results.emplace_back(pool.enqueue([i] {
            std::cout << "Task " << i << " started\n";
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "Task " << i << " finished\n";
            return i * i;
        }));
    }

    for (auto& result : results) {
        std::cout << "Result: " << result.get() << std::endl;
    }
    return 0;
}
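Parallel Algorithms (C++17)
C++17 introduced parallel execution policies, which let standard algorithms run in parallel with a one-argument change: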
#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>
#include <numeric>  // std::reduce
#include <cmath>
#include <chrono>

int main() {
    const size_t size = 10000000;
    std::vector<double> data(size);

    // Initialize the data
    for (size_t i = 0; i < size; ++i) {
        data[i] = std::sin(i * 0.001);
    }

    // Time the sequential sort
    auto start = std::chrono::high_resolution_clock::now();
    std::sort(data.begin(), data.end());
    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "Sequential sort took: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms\n";

    // Reset the data
    for (size_t i = 0; i < size; ++i) {
        data[i] = std::sin(i * 0.001);
    }

    // Time the parallel sort
    start = std::chrono::high_resolution_clock::now();
    std::sort(std::execution::par, data.begin(), data.end());
    end = std::chrono::high_resolution_clock::now();
    std::cout << "Parallel sort took: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms\n";

    // Parallel transform
    std::vector<double> result(size);
    std::transform(std::execution::par,
                   data.begin(), data.end(),
                   result.begin(),
                   [](double x) { return x * x; });

    // Parallel reduction
    double sum = std::reduce(std::execution::par,
                             result.begin(), result.end(),
                             0.0);
    std::cout << "Sum of squares: " << sum << "\n";
    return 0;
}
Practice Project: A Concurrent Web Request Processor
Let's implement a concurrent system that handles multiple web requests:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <random>
#include <chrono>
#include <queue>
#include <atomic>
#include <string>
#include <utility>

// Simulated web request
struct WebRequest {
    int id;
    std::string url;
};

// Simulated web response
struct WebResponse {
    int requestId;
    int statusCode;
    std::string content;
};

// Simulate fetching a web response
WebResponse fetchUrl(const WebRequest& request) {
    // Simulate network latency
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> delayDist(100, 500);
    std::this_thread::sleep_for(std::chrono::milliseconds(delayDist(gen)));

    // Simulated response
    return {
        request.id,
        200,  // HTTP OK
        "Response body: " + request.url
    };
}

class RequestProcessor {
public:
    RequestProcessor(size_t workerCount) : stop(false) {
        for (size_t i = 0; i < workerCount; ++i) {
            workers.emplace_back(&RequestProcessor::workerThread, this);
        }
    }

    ~RequestProcessor() {
        {
            // Set the flag while holding the mutex so that a worker cannot
            // miss the wakeup between checking the predicate and blocking
            std::lock_guard<std::mutex> lock(queueMutex);
            stop = true;
        }
        cv.notify_all();
        for (auto& worker : workers) {
            if (worker.joinable()) worker.join();
        }
    }

    std::future<WebResponse> submitRequest(WebRequest request) {
        std::promise<WebResponse> prom;
        auto future = prom.get_future();
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            requestQueue.emplace(std::move(request), std::move(prom));
        }
        cv.notify_one();
        return future;
    }

    void workerThread() {
        // Loop until stop is set AND the queue is drained, so that no queued
        // promise is abandoned without a value
        while (true) {
            std::pair<WebRequest, std::promise<WebResponse>> task;
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                cv.wait(lock, [this] {
                    return stop || !requestQueue.empty();
                });
                if (stop && requestQueue.empty()) {
                    return;
                }
                task = std::move(requestQueue.front());
                requestQueue.pop();
            }

            // Handle the request
            WebResponse response = fetchUrl(task.first);

            // Update the statistics before publishing the result, so the
            // count already includes this request once its future is ready
            processedCount.fetch_add(1, std::memory_order_relaxed);
            task.second.set_value(std::move(response));
        }
    }

    int getProcessedCount() const {
        return processedCount.load();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::pair<WebRequest, std::promise<WebResponse>>> requestQueue;
    std::mutex queueMutex;
    std::condition_variable cv;
    std::atomic<bool> stop;
    std::atomic<int> processedCount{0};
};

int main() {
    const int numRequests = 20;
    const int numWorkers = 4;

    RequestProcessor processor(numWorkers);
    std::vector<std::future<WebResponse>> futures;

    // Submit the requests
    for (int i = 0; i < numRequests; ++i) {
        WebRequest request{i, "https://example.com/page/" + std::to_string(i)};
        futures.emplace_back(processor.submitRequest(std::move(request)));
    }

    // Collect the results
    for (int i = 0; i < numRequests; ++i) {
        WebResponse response = futures[i].get();
        std::cout << "Request " << response.requestId
                  << " done, status: " << response.statusCode
                  << ", content: " << response.content.substr(0, 20) << "..." << std::endl;
    }

    std::cout << "Total requests processed: " << processor.getProcessedCount() << std::endl;
    return 0;
}
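Concurrency Best Practices
- Prefer high-level abstractions, such as std::async and the parallel algorithms
- Minimize shared data: use thread-local storage or message passing
- Manage resources with RAII: release locks and thread resources automatically
- Avoid deadlocks:
  - Acquire locks in a fixed order
  - Use std::scoped_lock to acquire multiple locks at once (C++17)
  - Set lock timeouts
- Performance considerations:
  - Avoid excessive synchronization
  - Use lock-free data structures
  - Consider cache coherence
- Testing and debugging:
  - Use thread analysis tools (such as TSan)
  - Write deterministic tests
  - Simulate different scheduling orders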
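The deadlock-avoidance advice above can be illustrated with a small transfer between two accounts. This is a minimal sketch that assumes a C++17 compiler; Account, transfer, and runTransfers are hypothetical names. std::scoped_lock acquires both mutexes using a deadlock-avoidance algorithm, so two threads transferring in opposite directions cannot deadlock on lock order.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical account type: a balance guarded by its own mutex.
struct Account {
    std::mutex mtx;
    long balance = 0;
};

// Moves `amount` from one account to the other while holding both locks.
void transfer(Account& from, Account& to, long amount) {
    assert(&from != &to);  // locking the same mutex twice is not allowed
    std::scoped_lock lock(from.mtx, to.mtx);  // locks both without deadlock
    from.balance -= amount;
    to.balance += amount;
}

// Hypothetical demo: two threads transfer in opposite directions.
// Returns the combined balance, which the transfers must preserve.
long runTransfers(int rounds) {
    Account a;
    Account b;
    a.balance = 1000;
    b.balance = 1000;

    std::thread t1([&] {
        for (int i = 0; i < rounds; ++i) transfer(a, b, 1);
    });
    std::thread t2([&] {
        for (int i = 0; i < rounds; ++i) transfer(b, a, 2);
    });
    t1.join();
    t2.join();

    return a.balance + b.balance;
}
```

Locking from.mtx and then to.mtx with two separate lock_guard objects would acquire the mutexes in opposite orders in the two threads, which is the classic recipe for a deadlock; scoped_lock removes the need to reason about order by hand.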
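Where to Go Next
Now that you have the basics of C++ concurrent programming, you can explore:
- C++20 coroutines: a lighter-weight concurrency model
- Lock-free data structures and algorithms
- GPU parallel computing (such as CUDA or OpenCL)
- Distributed systems programming
- Higher-level synchronization primitives, such as semaphores and barriers
The next article takes a closer look at modern C++ features, including lambda expressions, move semantics, and constexpr, which make C++ code more concise and efficient.
Remember that concurrent programming is both an art and a science. Real mastery comes from practice: try extending today's web request processor with rate limiting, error handling, or a priority queue. Every challenge is a chance to sharpen your concurrency skills.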