1. 简介
生产者和消费者问题是线程模型中的经典问题:
- 生产者和消费者共享同一个存储空间
- 生产者往存储空间中添加产品,消费者从存储空间中取走产品
- 当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞
Python 的内置模块 queue 提供了对生产者和消费者模型的支持,模块 queue 定义了类 Queue,类 Queue 表示一个被生产者和消费者共享的队列,类 Queue 提供如下常用方法:
2. 实现生产者消费者模型
创建生产者线程和消费者线程,使用一个共享队列连接这两个线程,代码如下:
- 1
- 2
- 3
- 4
- 导入 threading 模块和 queue 模块
- 创建共享队列 q
- 1
- 2
- 3
- 4
- 创建生产者线程的入口函数 produce
- 生产者生产 8 个数据
- 调用 q.put(item) 将生产的数据放入到共享队列 q 中
- 1
- 2
- 3
- 4
- 创建消费者线程的入口函数 consume
- 消费者消费 8 个数据
- 调用 q.get() 从共享队列 q 中取走数据
- 1
- 2
- 3
- 4
- 5
- 6
- 创建生产者线程 producer,线程入口为 produce
- 创建消费者线程 consumer,线程入口为 consume
- 启动生产者线程和消费者线程,并等待它们结束
运行程序,输出结果如下:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 生产者生产了 8 个数据:a、b、c、d、e、f、g、h
- 消费者取走了 8 个数据:a、b、c、d、e、f、g、h
3. 实现生产者、计算者、消费者模型
创建生产者、计算者、消费者线程:
- 生产者生产 8 个数据
- 计算者对生产者输出的数据进行加工,将加工后的数据送往消费者
- 消费者取走计算者输出的数据
- 1
- 2
- 3
- 4
- 5
- 导入模块 threading 和模块 queue
- 使用两个共享队列连接这三个线程
- 共享队列 q0 连接生产者和计算者
- 共享队列 q1 连接计算者和消费者
- 1
- 2
- 3
- 4
- 创建生产者线程的入口函数 produce
- 生产者生产 8 个数据
- 调用 q0.put(item) 将生产的数据放入到共享队列 q0 中
- 1
- 2
- 3
- 4
- 5
- 创建计算者线程的入口函数 compute
- 调用 q0.get() 读取生产者输出数据,并进行加工
- 调用 q1.put(item) 将加工后的数据放入到共享队列 q1 中
- 1
- 2
- 3
- 4
- 创建消费者线程的入口函数 consume
- 消费者消费 8 个数据
- 调用 q1.get() 从共享队列 q1 中取走数据
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 创建生产者线程 producer,线程入口为 produce
- 创建计算者线程 computer,线程入口为 compute
- 创建消费者线程 consumer,线程入口为 consume
- 启动生产者线程、计算者线程、消费者线程,并等待它们结束
运行程序,输出结果如下:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 生产者生产了 8 个数据:a、b、c、d、e、f、g、h
- 计算者将数据加工为:A、B、C、D、E、F、G、H
- 消费者取走了 8 个数据:A、B、C、D、E、F、G、H
4. 同步生产者与消费者的推进速度
在生产者、消费者模型中,可能会存在两者推进速度不匹配的问题:生产者生产数据的速度较快,但是,消费者取走数据的速度较慢。
可以使用 queue 的 task_done() 方法和 join() 方法同步生产者与消费者的推进速度:
- 生产者调用 join() 方法,等待队列中所有的数据被取走
- 消费者调用 task_done() 方法,表示取走了队列中的一项数据,当队列为空时,唤醒阻塞在 join() 方法中的生产者
- 1
- 2
- 3
- 4
- 导入 threading 模块和 queue 模块
- 创建共享队列 q
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 创建生产者线程的入口函数 produce
- 首先,生产 4 个数据:A、B、C、D
- 调用 q.put(item) 将它们放入到队列 q 中
- 调用 q.join() 等待消费者将它们全部取走
- 然后,生产 4 个数据:E、F、G、G
- 调用 q.put(item) 将它们放入到队列 q 中
- 调用 q.join() 等待消费者将它们全部取走
- 1
- 2
- 3
- 4
- 5
- 创建消费者线程的入口函数 consume
- 调用 q.get() 从队列 q 中取走一个数据
- 调用 q.task_done(),表示已经从队列 q 中取走了一个数据,当队列为空时,唤醒生产者
- 1
- 2
- 3
- 4
- 创建生产者线程 producer,线程入口为 produce
- 创建消费者线程 consumer,线程入口为 consume
- 启动生产者线程和消费者线程,并等待它们结束
运行程序,输出结果如下:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 生产者生产第一批数据 A、B、C、D,消费者将其取走
- 当第一批数据完全被消费者取走后,生产者才开始生产第二批数据
- 生产者生产第二批数据 E、F、G、H,消费者将其取走