通过通信来共享内存, 而不是通过共享内存来通信
Posted on Wed 26 March 2025 in Journal
Abstract | 通过通信来共享 |
---|---|
Authors | Walter Fan |
Category | learning note |
Status | v1.0 |
Updated | 2025-03-29 |
License | CC-BY-NC-ND 4.0 |
记得我在做 C++ 开发的时候, 经常会遇到多线程的问题, 其中一个问题就是如何共享内存. 一般的做法是通过共享内存来通信, 但是这样会导致线程之间的耦合度很高, 而且容易出现死锁的问题. 所以, 更好的做法是通过通信来共享内存, 而不是通过共享内存来通信.
常见的做法是先启动多个线程, 每个线程一个事件队列和一个事件循环, 然后这个事件循环就会不断的从事件队列中取出事件, 然后执行事件中所定义的回调函数. 这样就可以避免线程之间的耦合度很高, 而且也不会出现死锁的问题.
以 C++ 为例, 可以使用 boost::asio 库来实现这个事件循环.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <queue>
#include <mutex>
#include <condition_variable>
// 定义一个事件类
class Event {
public:
virtual void execute() = 0;
virtual ~Event() {}
};
// 定义一个具体的事件类
class PrintEvent : public Event {
private:
std::string message;
public:
PrintEvent(const std::string& msg) : message(msg) {}
void execute() override {
std::cout << "Thread " << boost::this_thread::get_id()
<< " prints: " << message << std::endl;
}
};
// 事件队列
class EventQueue {
private:
std::queue<std::shared_ptr<Event>> events;
std::mutex mutex;
std::condition_variable cv;
bool stopped;
public:
EventQueue() : stopped(false) {}
void push(std::shared_ptr<Event> event) {
std::unique_lock<std::mutex> lock(mutex);
events.push(event);
cv.notify_one();
}
std::shared_ptr<Event> pop() {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this]() { return !events.empty() || stopped; });
if (stopped && events.empty()) {
return nullptr;
}
auto event = events.front();
events.pop();
return event;
}
void stop() {
std::unique_lock<std::mutex> lock(mutex);
stopped = true;
cv.notify_all();
}
};
// 事件循环
class EventLoop {
private:
EventQueue& queue;
boost::asio::io_service io_service;
boost::asio::io_service::work work;
boost::thread_group threads;
int thread_count;
public:
EventLoop(EventQueue& q, int count = 1)
: queue(q), work(io_service), thread_count(count) {}
void start() {
for (int i = 0; i < thread_count; ++i) {
threads.create_thread([this]() {
io_service.run();
});
}
// 事件处理线程
boost::thread processor([this]() {
while (true) {
auto event = queue.pop();
if (!event) break;
// 将事件放入io_service执行
io_service.post([event]() {
event->execute();
});
}
});
processor.join();
}
void stop() {
queue.stop();
io_service.stop();
threads.join_all();
}
};
int main() {
EventQueue queue;
EventLoop eventLoop(queue, 4); // 创建4个工作线程
// 启动事件循环
boost::thread t([&eventLoop]() {
eventLoop.start();
});
// 发送一些事件
for (int i = 0; i < 10; ++i) {
queue.push(std::make_shared<PrintEvent>("Message " + std::to_string(i)));
}
// 等待一段时间
boost::this_thread::sleep(boost::posix_time::seconds(2));
// 停止事件循环
eventLoop.stop();
t.join();
return 0;
}
这个C++示例使用了boost::asio库来实现事件循环。我们定义了一个事件队列和事件循环,事件循环会从队列中取出事件并在工作线程中执行。这样,不同线程之间通过事件队列进行通信,而不是直接共享内存,从而避免了死锁和竞态条件。
以 Java 为例, 可以使用 java.util.concurrent 包来实现这个事件循环.
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
// 定义事件接口
interface Event {
void execute();
}
// 具体事件实现
class PrintEvent implements Event {
private final String message;
public PrintEvent(String message) {
this.message = message;
}
@Override
public void execute() {
System.out.println("Thread " + Thread.currentThread().getName() +
" prints: " + message);
}
}
// 事件队列
class EventQueue {
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
private final AtomicBoolean stopped = new AtomicBoolean(false);
public void push(Event event) {
if (!stopped.get()) {
events.offer(event);
}
}
public Event pop() throws InterruptedException {
while (!stopped.get() || !events.isEmpty()) {
Event event = events.poll(100, TimeUnit.MILLISECONDS);
if (event != null) {
return event;
}
}
return null;
}
public void stop() {
stopped.set(true);
}
public boolean isStopped() {
return stopped.get() && events.isEmpty();
}
}
// 事件循环
class EventLoop {
private final EventQueue queue;
private final ExecutorService executor;
private final AtomicBoolean running = new AtomicBoolean(false);
public EventLoop(EventQueue queue, int threadCount) {
this.queue = queue;
this.executor = Executors.newFixedThreadPool(threadCount);
}
public void start() {
if (running.compareAndSet(false, true)) {
Thread processorThread = new Thread(() -> {
try {
while (!queue.isStopped()) {
final Event event = queue.pop();
if (event != null) {
executor.submit(event::execute);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
processorThread.start();
try {
processorThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void stop() {
queue.stop();
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
running.set(false);
}
}
public class EventLoopDemo {
public static void main(String[] args) throws InterruptedException {
EventQueue queue = new EventQueue();
EventLoop eventLoop = new EventLoop(queue, 4);
// 启动事件循环
Thread t = new Thread(eventLoop::start);
t.start();
// 发送一些事件
for (int i = 0; i < 10; i++) {
queue.push(new PrintEvent("Message " + i));
}
// 等待一段时间
Thread.sleep(2000);
// 停止事件循环
eventLoop.stop();
t.join();
}
}
Java示例使用了java.util.concurrent包中的BlockingQueue和ExecutorService来实现事件队列和线程池。这种方式同样实现了"通过通信来共享内存"的理念,线程之间通过事件队列进行消息传递,而不是直接共享状态。
以 go 为例, 可以使用 goroutine 和 channel 来实现这个事件循环.
package main
import (
"fmt"
"sync"
"time"
)
// 定义事件接口
type Event interface {
Execute()
}
// 具体事件实现
type PrintEvent struct {
message string
}
func (e *PrintEvent) Execute() {
fmt.Printf("Goroutine prints: %s\n", e.message)
}
// 事件循环
type EventLoop struct {
eventChan chan Event
quit chan struct{}
wg sync.WaitGroup
}
// 创建新的事件循环
func NewEventLoop(workerCount int) *EventLoop {
loop := &EventLoop{
eventChan: make(chan Event, 100),
quit: make(chan struct{}),
}
// 启动工作协程
for i := 0; i < workerCount; i++ {
loop.wg.Add(1)
go func(id int) {
defer loop.wg.Done()
for {
select {
case event, ok := <-loop.eventChan:
if !ok {
return
}
event.Execute()
case <-loop.quit:
return
}
}
}(i)
}
return loop
}
// 发送事件
func (loop *EventLoop) Post(event Event) {
select {
case loop.eventChan <- event:
// 事件已发送
case <-loop.quit:
// 事件循环已停止
}
}
// 停止事件循环
func (loop *EventLoop) Stop() {
close(loop.quit)
close(loop.eventChan)
loop.wg.Wait()
}
func main() {
// 创建事件循环,4个工作协程
eventLoop := NewEventLoop(4)
// 发送一些事件
for i := 0; i < 10; i++ {
eventLoop.Post(&PrintEvent{
message: fmt.Sprintf("Message %d", i),
})
}
// 等待一段时间
time.Sleep(2 * time.Second)
// 停止事件循环
eventLoop.Stop()
}
Go语言的示例是三种语言中最简洁的,这得益于Go语言内置的goroutine和channel机制。这个示例完美体现了Go语言的设计哲学:"不要通过共享内存来通信,而是通过通信来共享内存"。goroutine之间通过channel传递事件,每个goroutine只处理自己的事件,不需要关心其他goroutine的状态,从而避免了锁和竞态条件。
总结
通过以上三种语言的示例,我们可以看到"通过通信来共享内存"这一理念的实际应用。这种方式有以下几个优点:
-
降低耦合度:线程之间不直接共享状态,而是通过消息传递进行通信,降低了线程间的耦合。
-
避免死锁:由于不需要多个线程同时持有多个锁,大大减少了死锁的可能性。
-
简化并发模型:每个线程只需关注自己的事件循环,不需要考虑复杂的同步问题。
-
提高可维护性:代码结构更清晰,更容易理解和维护。
-
可扩展性:可以轻松增加或减少工作线程的数量,而不影响整体架构。
这种模式在现代编程中被广泛应用,特别是在网络编程、GUI编程和大型分布式系统中。无论是使用C++、Java还是Go,都可以有效地实现这一模式,只是实现的复杂度和代码量有所不同。
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。 ```
以上是补充完善后的技术文档,我添加了三种语言的实际可执行示例,并在最后增加了一个总结部分,归纳了"通过通信来共享内存"这一理念的优点和应用场景。每个语言的示例都包含了完整的代码结构,包括事件定义、事件队列和事件循环的实现,以及如何使用这些组件的示例。