通过通信来共享内存, 而不是通过共享内存来通信
Posted on Wed 26 March 2025 in Journal
| Abstract | 通过通信来共享 | 
|---|---|
| Authors | Walter Fan | 
| Category | learning note | 
| Status | v1.0 | 
| Updated | 2025-03-29 | 
| License | CC-BY-NC-ND 4.0 | 
记得我在做 C++ 开发的时候, 经常会遇到多线程的问题, 其中一个问题就是如何共享内存. 一般的做法是通过共享内存来通信, 但是这样会导致线程之间的耦合度很高, 而且容易出现死锁的问题. 所以, 更好的做法是通过通信来共享内存, 而不是通过共享内存来通信.
常见的做法是先启动多个线程, 每个线程一个事件队列和一个事件循环, 然后这个事件循环就会不断的从事件队列中取出事件, 然后执行事件中所定义的回调函数. 这样就可以避免线程之间的耦合度很高, 而且也不会出现死锁的问题. 每个线程负责处理自己的事件循环
以 C++ 为例, 可以使用 boost::asio 库来实现这个事件循环.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <queue>
#include <mutex>
#include <condition_variable>
// 定义一个事件类
class Event {
public:
    virtual void execute() = 0;
    virtual ~Event() {}
};
// 定义一个具体的事件类
class PrintEvent : public Event {
private:
    std::string message;
public:
    PrintEvent(const std::string& msg) : message(msg) {}
    void execute() override {
        std::cout << "Thread " << boost::this_thread::get_id() 
                  << " prints: " << message << std::endl;
    }
};
// 事件队列
class EventQueue {
private:
    std::queue<std::shared_ptr<Event>> events;
    std::mutex mutex;
    std::condition_variable cv;
    bool stopped;
public:
    EventQueue() : stopped(false) {}
    void push(std::shared_ptr<Event> event) {
        std::unique_lock<std::mutex> lock(mutex);
        events.push(event);
        cv.notify_one();
    }
    std::shared_ptr<Event> pop() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this]() { return !events.empty() || stopped; });
        if (stopped && events.empty()) {
            return nullptr;
        }
        auto event = events.front();
        events.pop();
        return event;
    }
    void stop() {
        std::unique_lock<std::mutex> lock(mutex);
        stopped = true;
        cv.notify_all();
    }
};
// 事件循环
class EventLoop {
private:
    EventQueue& queue;
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    boost::thread_group threads;
    int thread_count;
public:
    EventLoop(EventQueue& q, int count = 1) 
        : queue(q), work(io_service), thread_count(count) {}
    void start() {
        for (int i = 0; i < thread_count; ++i) {
            threads.create_thread([this]() {
                io_service.run();
            });
        }
        // 事件处理线程
        boost::thread processor([this]() {
            while (true) {
                auto event = queue.pop();
                if (!event) break;
                // 将事件放入io_service执行
                io_service.post([event]() {
                    event->execute();
                });
            }
        });
        processor.join();
    }
    void stop() {
        queue.stop();
        io_service.stop();
        threads.join_all();
    }
};
int main() {
    EventQueue queue;
    EventLoop eventLoop(queue, 4); // 创建4个工作线程
    // 启动事件循环
    boost::thread t([&eventLoop]() {
        eventLoop.start();
    });
    // 发送一些事件
    for (int i = 0; i < 10; ++i) {
        queue.push(std::make_shared<PrintEvent>("Message " + std::to_string(i)));
    }
    // 等待一段时间
    boost::this_thread::sleep(boost::posix_time::seconds(2));
    // 停止事件循环
    eventLoop.stop();
    t.join();
    return 0;
}
这个C++示例使用了boost::asio库来实现事件循环。我们定义了一个事件队列和事件循环,事件循环会从队列中取出事件并在工作线程中执行。这样,不同线程之间通过事件队列进行通信,而不是直接共享内存,从而避免了死锁和竞态条件。
以 Java 为例, 可以使用 java.util.concurrent 包来实现这个事件循环.
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
// 定义事件接口
interface Event {
    void execute();
}
// 具体事件实现
class PrintEvent implements Event {
    private final String message;
    public PrintEvent(String message) {
        this.message = message;
    }
    @Override
    public void execute() {
        System.out.println("Thread " + Thread.currentThread().getName() + 
                           " prints: " + message);
    }
}
// 事件队列
class EventQueue {
    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    public void push(Event event) {
        if (!stopped.get()) {
            events.offer(event);
        }
    }
    public Event pop() throws InterruptedException {
        while (!stopped.get() || !events.isEmpty()) {
            Event event = events.poll(100, TimeUnit.MILLISECONDS);
            if (event != null) {
                return event;
            }
        }
        return null;
    }
    public void stop() {
        stopped.set(true);
    }
    public boolean isStopped() {
        return stopped.get() && events.isEmpty();
    }
}
// 事件循环
class EventLoop {
    private final EventQueue queue;
    private final ExecutorService executor;
    private final AtomicBoolean running = new AtomicBoolean(false);
    public EventLoop(EventQueue queue, int threadCount) {
        this.queue = queue;
        this.executor = Executors.newFixedThreadPool(threadCount);
    }
    public void start() {
        if (running.compareAndSet(false, true)) {
            Thread processorThread = new Thread(() -> {
                try {
                    while (!queue.isStopped()) {
                        final Event event = queue.pop();
                        if (event != null) {
                            executor.submit(event::execute);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            processorThread.start();
            try {
                processorThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    public void stop() {
        queue.stop();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        running.set(false);
    }
}
public class EventLoopDemo {
    public static void main(String[] args) throws InterruptedException {
        EventQueue queue = new EventQueue();
        EventLoop eventLoop = new EventLoop(queue, 4);
        // 启动事件循环
        Thread t = new Thread(eventLoop::start);
        t.start();
        // 发送一些事件
        for (int i = 0; i < 10; i++) {
            queue.push(new PrintEvent("Message " + i));
        }
        // 等待一段时间
        Thread.sleep(2000);
        // 停止事件循环
        eventLoop.stop();
        t.join();
    }
}
Java示例使用了java.util.concurrent包中的BlockingQueue和ExecutorService来实现事件队列和线程池。这种方式同样实现了"通过通信来共享内存"的理念,线程之间通过事件队列进行消息传递,而不是直接共享状态。
以 go 为例, 可以使用 goroutine 和 channel 来实现这个事件循环.
package main
import (
    "fmt"
    "sync"
    "time"
)
// 定义事件接口
type Event interface {
    Execute()
}
// 具体事件实现
type PrintEvent struct {
    message string
}
func (e *PrintEvent) Execute() {
    fmt.Printf("Goroutine prints: %s\n", e.message)
}
// 事件循环
type EventLoop struct {
    eventChan chan Event
    quit      chan struct{}
    wg        sync.WaitGroup
}
// 创建新的事件循环
func NewEventLoop(workerCount int) *EventLoop {
    loop := &EventLoop{
        eventChan: make(chan Event, 100),
        quit:      make(chan struct{}),
    }
    // 启动工作协程
    for i := 0; i < workerCount; i++ {
        loop.wg.Add(1)
        go func(id int) {
            defer loop.wg.Done()
            for {
                select {
                case event, ok := <-loop.eventChan:
                    if !ok {
                        return
                    }
                    event.Execute()
                case <-loop.quit:
                    return
                }
            }
        }(i)
    }
    return loop
}
// 发送事件
func (loop *EventLoop) Post(event Event) {
    select {
    case loop.eventChan <- event:
        // 事件已发送
    case <-loop.quit:
        // 事件循环已停止
    }
}
// 停止事件循环
func (loop *EventLoop) Stop() {
    close(loop.quit)
    close(loop.eventChan)
    loop.wg.Wait()
}
func main() {
    // 创建事件循环,4个工作协程
    eventLoop := NewEventLoop(4)
    // 发送一些事件
    for i := 0; i < 10; i++ {
        eventLoop.Post(&PrintEvent{
            message: fmt.Sprintf("Message %d", i),
        })
    }
    // 等待一段时间
    time.Sleep(2 * time.Second)
    // 停止事件循环
    eventLoop.Stop()
}
Go语言的示例是三种语言中最简洁的,这得益于Go语言内置的goroutine和channel机制。这个示例完美体现了Go语言的设计哲学:"不要通过共享内存来通信,而是通过通信来共享内存"。goroutine之间通过channel传递事件,每个goroutine只处理自己的事件,不需要关心其他goroutine的状态,从而避免了锁和竞态条件。
总结
通过以上三种语言的示例,我们可以看到"通过通信来共享内存"这一理念的实际应用。这种方式有以下几个优点:
- 
降低耦合度:线程之间不直接共享状态,而是通过消息传递进行通信,降低了线程间的耦合。
 - 
避免死锁:由于不需要多个线程同时持有多个锁,大大减少了死锁的可能性。
 - 
简化并发模型:每个线程只需关注自己的事件循环,不需要考虑复杂的同步问题。
 - 
提高可维护性:代码结构更清晰,更容易理解和维护。
 - 
可扩展性:可以轻松增加或减少工作线程的数量,而不影响整体架构。
 
这种模式在现代编程中被广泛应用,特别是在网络编程、GUI编程和大型分布式系统中。无论是使用C++、Java还是Go,都可以有效地实现这一模式,只是实现的复杂度和代码量有所不同。
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。 ```
以上是补充完善后的技术文档,我添加了三种语言的实际可执行示例,并在最后增加了一个总结部分,归纳了"通过通信来共享内存"这一理念的优点和应用场景。每个语言的示例都包含了完整的代码结构,包括事件定义、事件队列和事件循环的实现,以及如何使用这些组件的示例。