Fundamentals 11 min read

Observer Pattern and Publish‑Subscribe Pattern in Python with Practical Code Examples

This article explains the concepts, differences, and implementation approaches of the Observer and Publish‑Subscribe design patterns, and provides ten practical Python code examples covering GUI, networking, asynchronous, and distributed scenarios.

Test Development Learning Exchange
Test Development Learning Exchange
Test Development Learning Exchange
Observer Pattern and Publish‑Subscribe Pattern in Python with Practical Code Examples

Observer pattern and publish‑subscribe pattern are common software design patterns used to manage communication and event handling between objects.

Observer pattern defines a one‑to‑many dependency: when the subject’s state changes, all registered observers are notified. It is typically implemented by a subject maintaining a list of observers and invoking their update methods.

Publish‑subscribe pattern also creates a one‑to‑many relationship but introduces a message broker (or queue) to decouple publishers from subscribers, allowing flexible distribution of events.

Both patterns have many Python application scenarios, such as GUI updates, network data changes, game event handling, distributed systems, asynchronous processing, and logging.

The article provides ten practical Python examples demonstrating these patterns:

1. Observer pattern – simple event notification

class Subject:
    def __init__(self):
        self.observers = []

    def attach(self, observer):
        self.observers.append(observer)

    def detach(self, observer):
        self.observers.remove(observer)

    def notify(self, message):
        for observer in self.observers:
            observer.update(message)

class Observer:
    def update(self, message):
        print("Received message:", message)

subject = Subject()
observer1 = Observer()
observer2 = Observer()
subject.attach(observer1)
subject.attach(observer2)
subject.notify("Hello, observers!")

2. Observer pattern – event dispatcher

class Event:
    def __init__(self, name):
        self.name = name

class EventHandler:
    def handle(self, event):
        print("Handling event:", event.name)

class EventDispatcher:
    def __init__(self):
        self.handlers = []

    def add_handler(self, handler):
        self.handlers.append(handler)

    def dispatch(self, event):
        for handler in self.handlers:
            handler.handle(event)

event1 = Event("Event 1")
event2 = Event("Event 2")
handler1 = EventHandler()
handler2 = EventHandler()
dispatcher = EventDispatcher()
dispatcher.add_handler(handler1)
dispatcher.add_handler(handler2)
dispatcher.dispatch(event1)
dispatcher.dispatch(event2)

3. Publish‑subscribe using the pubsub library

from pubsub import pub

def subscriber(message):
    print("Received message:", message)

pub.subscribe(subscriber, "topic")
pub.sendMessage("topic", message="Hello, subscribers!")

4. Publish‑subscribe using redis as a message broker

import redis

class Publisher:
    def __init__(self):
        self.redis = redis.Redis()

    def publish(self, channel, message):
        self.redis.publish(channel, message)

class Subscriber:
    def __init__(self):
        self.redis = redis.Redis()

    def subscribe(self, channel):
        pubsub = self.redis.pubsub()
        pubsub.subscribe(channel)
        for message in pubsub.listen():
            print("Received message:", message["data"])

publisher = Publisher()
subscriber = Subscriber()
publisher.publish("channel", "Hello, subscribers!")
subscriber.subscribe("channel")

5. Observer pattern with tkinter GUI

import tkinter as tk

class Observable:
    def __init__(self):
        self.observers = []

    def attach(self, observer):
        self.observers.append(observer)

    def detach(self, observer):
        self.observers.remove(observer)

    def notify(self, event):
        for observer in self.observers:
            observer.update(event)

class Observer:
    def update(self, event):
        print("Received event:", event)

class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.observable = Observable()
        self.observer = Observer()
        self.observable.attach(self.observer)
        self.button = tk.Button(self, text="Click me", command=self.handle_click)
        self.button.pack()

    def handle_click(self):
        event = "Button clicked"
        self.observable.notify(event)

app = App()
app.mainloop()

6. Observer pattern with asyncio for asynchronous handling

import asyncio

class Subject:
    def __init__(self):
        self.observers = []

    def attach(self, observer):
        self.observers.append(observer)

    def detach(self, observer):
        self.observers.remove(observer)

    def notify(self, message):
        for observer in self.observers:
            observer.update(message)

class Observer:
    def update(self, message):
        print("Received message:", message)

async def main():
    subject = Subject()
    observer1 = Observer()
    observer2 = Observer()
    subject.attach(observer1)
    subject.attach(observer2)
    subject.notify("Hello, observers!")

asyncio.run(main())

7. Publish‑subscribe using flask web framework

from flask import Flask, request
from flask.signals import Namespace

app = Flask(__name__)
my_signals = Namespace()
event_signal = my_signals.signal("event")

@app.route("/publish", methods=["POST"])
def publish():
    message = request.json["message"]
    event_signal.send(message)
    return "Message published"

def subscriber(message):
    print("Received message:", message)

event_signal.connect(subscriber)

if __name__ == "__main__":
    app.run()

8. Observer pattern with threading for multithreaded events

import threading

class Subject:
    def __init__(self):
        self.observers = []

    def attach(self, observer):
        self.observers.append(observer)

    def detach(self, observer):
        self.observers.remove(observer)

    def notify(self, message):
        for observer in self.observers:
            observer.update(message)

class Observer:
    def update(self, message):
        print("Received message:", message)

def worker(subject):
    subject.notify("Hello, observers!")

subject = Subject()
observer1 = Observer()
observer2 = Observer()
subject.attach(observer1)
subject.attach(observer2)

thread = threading.Thread(target=worker, args=(subject,))
thread.start()
thread.join()

9. Publish‑subscribe using zeromq for distributed messaging

import zmq
import threading

context = zmq.Context()

def publisher():
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")
    socket.send_string("Hello, subscribers!")

def subscriber():
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    message = socket.recv_string()
    print("Received message:", message)

publisher_thread = threading.Thread(target=publisher)
subscriber_thread = threading.Thread(target=subscriber)

publisher_thread.start()
subscriber_thread.start()

publisher_thread.join()
subscriber_thread.join()

10. Observer pattern with PySide2 Qt signals

from PySide2.QtCore import QObject, Signal
from PySide2.QtWidgets import QApplication, QPushButton

class Subject(QObject):
    event = Signal(str)

class Observer(QObject):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def update(self, message):
        print(f"{self.name} received message:", message)

app = QApplication([])
subject = Subject()
observer1 = Observer("Observer 1")
observer2 = Observer("Observer 2")
subject.event.connect(observer1.update)
subject.event.connect(observer2.update)
button = QPushButton("Click me")
button.clicked.connect(lambda: subject.event.emit("Hello, observers!"))
button.show()
app.exec_()

These examples illustrate how the Observer and Publish‑Subscribe patterns can be applied across different domains to build scalable, loosely‑coupled systems.

design patternsdistributed systemsAsynchronous ProgrammingEvent HandlingObserver PatternPublish-Subscribe
Test Development Learning Exchange
Written by

Test Development Learning Exchange

Test Development Learning Exchange

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.