示例
from abc import ABCMeta, abstractmethod# ----------------------------------------------# 抽象观察者# ----------------------------------------------class Observer(metaclass=ABCMeta): @abstractmethod def update(self, _notice): pass# ----------------------------------------------# 抽象发布者# ----------------------------------------------class Notice: def __init__(self): self.observer = [] def attach(self, obs): """ 订阅 :param obs: 具体订阅者 """ self.observer.append(obs) def detach(self, obs): """ 取消订阅 :param obs: 具体订阅者 """ self.observer.remove(obs) def notify(self): """ 通知 """ for obs in self.observer: obs.update(self)# ----------------------------------------------# 具体发布者# ----------------------------------------------class StaffNotice(Notice): def __init__(self, company_info=None): super().__init__() self.__company_info = company_info @property def company_info(self): return self.__company_info @company_info.setter def company_info(self, info): self.__company_info = info self.notify() # 自动推送# ----------------------------------------------# 具体订阅者# ----------------------------------------------class Staff(Observer): def __init__(self): self.company_info = None def update(self, _notice): """ 更新 :param _notice: 发布者对象 """ self.company_info = _notice.company_info# ----------------------------------------------# client# ----------------------------------------------notice = StaffNotice("初始公司信息!")s1 = Staff()s2 = Staff()# 订阅print('----------------消息订阅----------------------')notice.attach(s1)notice.attach(s2)# 状态改变,自动通知notice.company_info = '公司今年业绩很好,给大家发奖金'print(s1.company_info)print(s2.company_info)# 取消订阅print('----------------取消订阅----------------------')notice.detach(s1)notice.company_info = '公司今年业绩不好,取消奖金发放'print(s1.company_info)print(s2.company_info)运行结果
