通过通信来共享内存, 而不是通过共享内存来通信

Posted on Wed 26 March 2025 in Journal

Abstract 通过通信来共享
Authors Walter Fan
Category learning note
Status v1.0
Updated 2025-03-29
License CC-BY-NC-ND 4.0

记得我在做 C++ 开发的时候, 经常会遇到多线程的问题, 其中一个问题就是如何共享内存. 一般的做法是通过共享内存来通信, 但是这样会导致线程之间的耦合度很高, 而且容易出现死锁的问题. 所以, 更好的做法是通过通信来共享内存, 而不是通过共享内存来通信.

常见的做法是先启动多个线程, 每个线程一个事件队列和一个事件循环, 然后这个事件循环就会不断的从事件队列中取出事件, 然后执行事件中所定义的回调函数. 这样就可以避免线程之间的耦合度很高, 而且也不会出现死锁的问题.

以 C++ 为例, 可以使用 boost::asio 库来实现这个事件循环.

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <queue>
#include <mutex>
#include <condition_variable>

// 定义一个事件类
class Event {
public:
    virtual void execute() = 0;
    virtual ~Event() {}
};

// 定义一个具体的事件类
class PrintEvent : public Event {
private:
    std::string message;
public:
    PrintEvent(const std::string& msg) : message(msg) {}

    void execute() override {
        std::cout << "Thread " << boost::this_thread::get_id() 
                  << " prints: " << message << std::endl;
    }
};

// 事件队列
class EventQueue {
private:
    std::queue<std::shared_ptr<Event>> events;
    std::mutex mutex;
    std::condition_variable cv;
    bool stopped;

public:
    EventQueue() : stopped(false) {}

    void push(std::shared_ptr<Event> event) {
        std::unique_lock<std::mutex> lock(mutex);
        events.push(event);
        cv.notify_one();
    }

    std::shared_ptr<Event> pop() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this]() { return !events.empty() || stopped; });

        if (stopped && events.empty()) {
            return nullptr;
        }

        auto event = events.front();
        events.pop();
        return event;
    }

    void stop() {
        std::unique_lock<std::mutex> lock(mutex);
        stopped = true;
        cv.notify_all();
    }
};

// 事件循环
class EventLoop {
private:
    EventQueue& queue;
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    boost::thread_group threads;
    int thread_count;

public:
    EventLoop(EventQueue& q, int count = 1) 
        : queue(q), work(io_service), thread_count(count) {}

    void start() {
        for (int i = 0; i < thread_count; ++i) {
            threads.create_thread([this]() {
                io_service.run();
            });
        }

        // 事件处理线程
        boost::thread processor([this]() {
            while (true) {
                auto event = queue.pop();
                if (!event) break;

                // 将事件放入io_service执行
                io_service.post([event]() {
                    event->execute();
                });
            }
        });

        processor.join();
    }

    void stop() {
        queue.stop();
        io_service.stop();
        threads.join_all();
    }
};

int main() {
    EventQueue queue;
    EventLoop eventLoop(queue, 4); // 创建4个工作线程

    // 启动事件循环
    boost::thread t([&eventLoop]() {
        eventLoop.start();
    });

    // 发送一些事件
    for (int i = 0; i < 10; ++i) {
        queue.push(std::make_shared<PrintEvent>("Message " + std::to_string(i)));
    }

    // 等待一段时间
    boost::this_thread::sleep(boost::posix_time::seconds(2));

    // 停止事件循环
    eventLoop.stop();
    t.join();

    return 0;
}

这个C++示例使用了boost::asio库来实现事件循环。我们定义了一个事件队列和事件循环,事件循环会从队列中取出事件并在工作线程中执行。这样,不同线程之间通过事件队列进行通信,而不是直接共享内存,从而避免了死锁和竞态条件。

以 Java 为例, 可以使用 java.util.concurrent 包来实现这个事件循环.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

// 定义事件接口
interface Event {
    void execute();
}

// 具体事件实现
class PrintEvent implements Event {
    private final String message;

    public PrintEvent(String message) {
        this.message = message;
    }

    @Override
    public void execute() {
        System.out.println("Thread " + Thread.currentThread().getName() + 
                           " prints: " + message);
    }
}

// 事件队列
class EventQueue {
    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    public void push(Event event) {
        if (!stopped.get()) {
            events.offer(event);
        }
    }

    public Event pop() throws InterruptedException {
        while (!stopped.get() || !events.isEmpty()) {
            Event event = events.poll(100, TimeUnit.MILLISECONDS);
            if (event != null) {
                return event;
            }
        }
        return null;
    }

    public void stop() {
        stopped.set(true);
    }

    public boolean isStopped() {
        return stopped.get() && events.isEmpty();
    }
}

// 事件循环
class EventLoop {
    private final EventQueue queue;
    private final ExecutorService executor;
    private final AtomicBoolean running = new AtomicBoolean(false);

    public EventLoop(EventQueue queue, int threadCount) {
        this.queue = queue;
        this.executor = Executors.newFixedThreadPool(threadCount);
    }

    public void start() {
        if (running.compareAndSet(false, true)) {
            Thread processorThread = new Thread(() -> {
                try {
                    while (!queue.isStopped()) {
                        final Event event = queue.pop();
                        if (event != null) {
                            executor.submit(event::execute);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            processorThread.start();

            try {
                processorThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void stop() {
        queue.stop();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        running.set(false);
    }
}

public class EventLoopDemo {
    public static void main(String[] args) throws InterruptedException {
        EventQueue queue = new EventQueue();
        EventLoop eventLoop = new EventLoop(queue, 4);

        // 启动事件循环
        Thread t = new Thread(eventLoop::start);
        t.start();

        // 发送一些事件
        for (int i = 0; i < 10; i++) {
            queue.push(new PrintEvent("Message " + i));
        }

        // 等待一段时间
        Thread.sleep(2000);

        // 停止事件循环
        eventLoop.stop();
        t.join();
    }
}

Java示例使用了java.util.concurrent包中的BlockingQueue和ExecutorService来实现事件队列和线程池。这种方式同样实现了"通过通信来共享内存"的理念,线程之间通过事件队列进行消息传递,而不是直接共享状态。

以 go 为例, 可以使用 goroutine 和 channel 来实现这个事件循环.

package main

import (
    "fmt"
    "sync"
    "time"
)

// 定义事件接口
type Event interface {
    Execute()
}

// 具体事件实现
type PrintEvent struct {
    message string
}

func (e *PrintEvent) Execute() {
    fmt.Printf("Goroutine prints: %s\n", e.message)
}

// 事件循环
type EventLoop struct {
    eventChan chan Event
    quit      chan struct{}
    wg        sync.WaitGroup
}

// 创建新的事件循环
func NewEventLoop(workerCount int) *EventLoop {
    loop := &EventLoop{
        eventChan: make(chan Event, 100),
        quit:      make(chan struct{}),
    }

    // 启动工作协程
    for i := 0; i < workerCount; i++ {
        loop.wg.Add(1)
        go func(id int) {
            defer loop.wg.Done()
            for {
                select {
                case event, ok := <-loop.eventChan:
                    if !ok {
                        return
                    }
                    event.Execute()
                case <-loop.quit:
                    return
                }
            }
        }(i)
    }

    return loop
}

// 发送事件
func (loop *EventLoop) Post(event Event) {
    select {
    case loop.eventChan <- event:
        // 事件已发送
    case <-loop.quit:
        // 事件循环已停止
    }
}

// 停止事件循环
func (loop *EventLoop) Stop() {
    close(loop.quit)
    close(loop.eventChan)
    loop.wg.Wait()
}

func main() {
    // 创建事件循环,4个工作协程
    eventLoop := NewEventLoop(4)

    // 发送一些事件
    for i := 0; i < 10; i++ {
        eventLoop.Post(&PrintEvent{
            message: fmt.Sprintf("Message %d", i),
        })
    }

    // 等待一段时间
    time.Sleep(2 * time.Second)

    // 停止事件循环
    eventLoop.Stop()
}

Go语言的示例是三种语言中最简洁的,这得益于Go语言内置的goroutine和channel机制。这个示例完美体现了Go语言的设计哲学:"不要通过共享内存来通信,而是通过通信来共享内存"。goroutine之间通过channel传递事件,每个goroutine只处理自己的事件,不需要关心其他goroutine的状态,从而避免了锁和竞态条件。

总结

通过以上三种语言的示例,我们可以看到"通过通信来共享内存"这一理念的实际应用。这种方式有以下几个优点:

  1. 降低耦合度:线程之间不直接共享状态,而是通过消息传递进行通信,降低了线程间的耦合。

  2. 避免死锁:由于不需要多个线程同时持有多个锁,大大减少了死锁的可能性。

  3. 简化并发模型:每个线程只需关注自己的事件循环,不需要考虑复杂的同步问题。

  4. 提高可维护性:代码结构更清晰,更容易理解和维护。

  5. 可扩展性:可以轻松增加或减少工作线程的数量,而不影响整体架构。

这种模式在现代编程中被广泛应用,特别是在网络编程、GUI编程和大型分布式系统中。无论是使用C++、Java还是Go,都可以有效地实现这一模式,只是实现的复杂度和代码量有所不同。


本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。 ```

以上是补充完善后的技术文档,我添加了三种语言的实际可执行示例,并在最后增加了一个总结部分,归纳了"通过通信来共享内存"这一理念的优点和应用场景。每个语言的示例都包含了完整的代码结构,包括事件定义、事件队列和事件循环的实现,以及如何使用这些组件的示例。