用 C++ 模拟 Mesa Monitor

Monitor(管程)是并发程序的同步方式之一。Monitor 至少有两类,Mesa monitor 和 Hoare monitor。Mesa monitor 在 notify 之后会继续运行,Hoare monitor 在 notify 之后会进行 context switch,来到 wait 的地方开始运行,所以在写 wait 的时候,Mesa monitor 需要这样:


while (locked)
    wait();
        

但是 Hoare Monitor 只需要这样:


if (locked)
    wait();
        

目前还是 Mesa Monitor 最为常见。

实现 monitor 需要语言层面的支持。Java 有 synchronized 关键字,可以用来实现 monitor,但是 C++ 就没有了,不过还是可以用 condition variable 和 RAII,来模拟 Mesa monitor。


#include <mutex>
#include <condition_variable>

class Monitor{
public:
  Monitor():lk{m, std::defer_lock}{}
  void notify(){cv.notify_one();}
  void broadcast(){cv.notify_all();}
  template<typename F>
  void wait(F pred){cv.wait(lk, pred);};
  std::unique_lock<std::mutex> synchronize()
  {
    return std::unique_lock<std::mutex>{m};
  }
private:
  std::mutex m;
  std::unique_lock<std::mutex> lk;
  std::condition_variable cv;
};
        

来看一个简单的例子,用 monitor 实现互斥锁。虽然这里例子没什么实际意义,但是足够简单:

// To compile: g++ -std=c++14 -lpthread MonitorLock.cpp
        
        #include "Monitor.h"
        #include <thread>
        #include <iostream>
        
        using namespace std;
        class MonitorLock{
        public:
          void lock()
          {
            auto lk = m.synchronize(); // unique_lock 会通过 RAII 自动 unlock
            m.wait([&](){return !locked;});
            locked = true;
          }
          void unlock()
          {
            auto lk = m.synchronize();
            locked = false;
            m.notify();
          }
        private:
          Monitor m;
          bool locked = false;
        };
        int main()
        {
          MonitorLock m;
          thread t1{[&](){
            for (int i = 1; i <= 30; i++){
              m.lock();
              cout << "t1: " << i << endl;
              m.unlock();
            }
          }};
          thread t2{[&](){
            for (int i = 1; i <= 30; i++){
              m.lock();
              cout << "t2: " << i << endl;
              m.unlock();
            }
          }};
          t1.join();
          t2.join();
          return 0;
        }
        

另一个例子稍微实用一点,解决生产者消费者问题。

// To compile: g++ -std=c++14 -lpthread ProducerConsumer.cpp
        
        #include "Monitor.h"
        #include <thread>
        #include <iostream>
        #include <queue>
        
        using namespace std;
        template<typename T, int N>
        class ProducerConsumer{
        public:
          void insert(T& item)
          {
            auto lk = m.synchronize(); 
            m.wait([&](){return items.size() < N;}); // if(!full)
            items.push(item);
            if(items.size()  == 1){
              m.notify();
            }
            cout << "insert: " << item << endl;
          }
          T remove()
          {
            auto lk = m.synchronize();
            m.wait([&](){return items.size() > 0;}); // if(!empty)
            auto item = items.front();
            items.pop();
            if(items.size() == N-1){
              m.notify();
            }
            cout << "consume: " << item << endl;
            return item;
          }
        private:
          Monitor m;
          std::queue<T> items;
        };
        int main()
        {
          ProducerConsumer<int, 10> q;
          thread p{[&](){
            for(int i = 1; i < 30; i++){
              q.insert(i);
            }
          }};
          thread c{[&](){
            for(int i = 1; i < 30; i++){
              auto item = q.remove();
            }
          }};
          p.join();
          c.join();
          return 0;
        }
        

上面的例子只适用于单生产者单消费者问题,如果要解决多生产者多消费者问题,一种做法是设置一个 threshold:

// insert()
        if (items.size() >= comsumerThreshold)
          m.broadcast();
        // remove()
        if(items.size() <= producerThreshold)
          m.broadcast()
        

或者更细粒度的控制 condition variable 的使用:

// To compile: g++ -std=c++14 -lpthread ProducerConsumer.cpp
        
        #include <thread>
        #include <iostream>
        #include <queue>
        #include <mutex>
        #include <condition_variable>
        
        using namespace std;
        template<typename T, int N>
        class ProducerConsumer{
        public:
          void insert(T& item)
          {
            std::unique_lock<std::mutex> lk{m};
            insert_cv.wait(lk, [&](){return items.size() < N;}); // if(!full)
        
            items.push(item);
            remove_cv.notify_one();// 如果这里是Hoare monitor就会跳转到正在wait的remove函数,可惜这里是mesa
            cout << "insert: " << item << endl;
          }
          T remove()
          {
            std::unique_lock<std::mutex> lk{m};
            remove_cv.wait(lk, [&](){return items.size() > 0;}); // if(!empty)
        
            auto item = items.front();
            items.pop();
            insert_cv.notify_one();
            cout << "consume: " << item << endl;
            return item;
          }
        private:
          mutex m;
          condition_variable insert_cv, remove_cv;
          std::queue<T> items;
        };
        int main()
        {
          ProducerConsumer<int, 10> q;
          thread p{[&](){
            for(int i = 1; i < 30; i++){
              q.insert(i);
            }
          }};
          thread c{[&](){
            for(int i = 1; i < 30; i++){
              auto item = q.remove();
            }
          }};
          p.join();
          c.join();
          return 0;
        }

©️ 2017-2019 奈卜拉