Introduction to the Producer‑Consumer Model with Queue and JoinableQueue in Python
This article explains the producer‑consumer pattern, why it is needed, and demonstrates two Python implementations using a standard Queue and a JoinableQueue, including full code samples, execution results, and discussion of graceful termination for multiple consumers.
The producer‑consumer model separates data generation (producer) from data processing (consumer) to avoid inefficiencies when their speeds differ. When a producer is faster, consumers must wait for data; when consumers are faster, producers must wait for consumption. Introducing a queue decouples the two, eliminating the need for explicit locks.
Implementation with a standard Queue
By creating a multiprocessing.Queue , producers put items into the queue and consumers retrieve them, achieving decoupling. The following code shows a simple producer and consumer using this approach:
<code>from multiprocessing import Process, Queue
import time
# Consumer method
def consumer(q, name):
while True:
res = q.get()
# if res is None: break
print("%s ate %s" % (name, res))
# Producer method
def producer(q, name, food):
for i in range(3):
time.sleep(1) # simulate production delay
res = "%s %s" % (food, i)
print("%s produced %s" % (name, res))
q.put(res)
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q, "kelly", "watermelon"))
c1 = Process(target=consumer, args=(q, "peter",))
p1.start()
c1.start()
print("main process")
</code>Running this code reveals a problem: the consumer blocks forever on q.get()</) because the producer never sends a termination signal.</p><p>To solve this, a sentinel value (e.g., <code>None ) can be placed in the queue after production, and the consumer exits when it receives the sentinel. Enabling the commented lines and adding q.put(None) after the producers finish resolves the issue.
When multiple consumers are present, a sentinel must be sent for each consumer, which can become cumbersome.
Implementation with JoinableQueue
JoinableQueue extends Queue with task_done() and join() methods, allowing producers to wait until all queued items are processed without explicit sentinel values.
<code>from multiprocessing import Process, JoinableQueue
import time
# Consumer method
def consumer(q, name):
while True:
res = q.get()
if res is None:
break
print("%s ate %s" % (name, res))
q.task_done()
# Producer method
def producer(q, name, food):
for i in range(3):
time.sleep(1)
res = "%s %s" % (food, i)
print("%s produced %s" % (name, res))
q.put(res)
q.join() # wait for all items to be processed
if __name__ == "__main__":
q = JoinableQueue()
p1 = Process(target=producer, args=(q, "kelly", "watermelon"))
p2 = Process(target=producer, args=(q, "kelly2", "banana"))
c1 = Process(target=consumer, args=(q, "peter",))
c2 = Process(target=consumer, args=(q, "peter2",))
c3 = Process(target=consumer, args=(q, "peter3",))
p1.start(); p2.start(); c1.start(); c2.start(); c3.start()
p1.join(); p2.join()
# Send sentinel values for each consumer
q.put(None); q.put(None); q.put(None)
print("main process")
</code>The execution result shows that the producers finish only after all items have been consumed, and the consumers exit cleanly after receiving the sentinel values. This demonstrates a robust way to implement the producer‑consumer pattern in Python.
Python Programming Learning Circle
A global community of Chinese Python developers offering technical articles, columns, original video tutorials, and problem sets. Topics include web full‑stack development, web scraping, data analysis, natural language processing, image processing, machine learning, automated testing, DevOps automation, and big data.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.