Fork me on GitHub

实现一个生产者多个消费者

生产者和消费者模型是现实编码中经常遇到的一种处理数据流通的模式,一般都是一个生产者对应一个消费者,java中的ArrayBlockingQueue实现就是这样。ArrayBlockingQueue实现了一种阻塞队列,当生产者向队列中填充数据时会先判断队列是否已经满了,如果满了就阻塞当前的生产者线程,否则填充数据并唤醒消费者消费队列中数据;当消费者从队列中取数据时判断队列是否为空,如果为空就阻塞当前的消费者队列,否则消费数据并唤醒生产者生产数据向队·列中填充。下面BoundedBuffer是java文档给出示例代码

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }

文档中指出ArrayBlockingQueue已经实现了示例代码的情况。代码中采用了一个锁并生成了两个条件来支持两个线程等待队列,这是为了防止刚刚释放锁的线程再次获得锁。 在实际中我们会遇到生产数据速度比消费者消费数据的速度快的情况,为了使整个的流量提升我们必须尽可能的保证两边的速率一致,这时我们可以想到采用一个生产者多个消费者,但是这要怎么实现呢?我们应该保证生产者写数据时所有的消费者都不能读,不然这会导致数据不一致的情况出现,即写读应该是互斥的。我们的消费应该是可以并行的,所以不能加独占锁,我们想到这是否可以采用读写锁ReentrantReaderWriterLock,由于读写锁的读锁是共享锁,所有的读线程都不会互斥,而写锁是独占的可以用于生产者;但是这是两把锁,我们需要同时只有一个线程对队列进行操作,所以只能加一把独占锁,这样看来我们还是只能使用ReentrantLock。
已有的工具不能满足我们的需求时,我们只能另外再想办法了。由于队列是导致互斥的原因,所以我们想能不能每个线程自己拥有一个buffer,写线程在向队列中填充数据之前先向buffer中填充数据,等达到我们设置的容量时再刷新到队列中,读线程也有自己的buffer,在从队列中读数据时,可以一个读线程从队列中读取数据将buffer填充满,消费者再从buffer中取数据消费,这样就可以实现多个读线程同时消费数据,因为读线程都是从自己的buffer中取数据,这样也保证了读写线程是互斥的操作队列,读线程是并行的读取数据,使得整体的流量提升。


最新评论

    还没有人评论...

当当

友情链接

Powered by Python. Copyright © 2017.

鄂ICP备17010875号. All rights reserved.