Source code for pyloggr.utils.observable

# encoding: utf-8
__author__ = 'stef'

import logging
from abc import ABCMeta, abstractmethod
from future.utils import with_metaclass
from tornado.ioloop import IOLoop
import ujson


logger = logging.getLogger(__name__)


class Observable(object):
    """
    An observable produces notifications and sends them to observers

    Observers are kept in registration order and each one is stored at most once.
    """

    def __init__(self):
        self.observers = list()
        self.publisher = None

    def register(self, observer):
        """
        Subscribe an observer for future notifications

        Registering an observer that is already subscribed is a no-op.

        :param observer: object that implements the Observer interface
        :type observer: Observer
        """
        if observer not in self.observers:
            self.observers.append(observer)

    def unregister(self, observer):
        """
        Unsubscribe an observer

        Unregistering an observer that is not subscribed is a no-op.

        :param observer: object that implements the Observer interface
        :type observer: Observer
        """
        if observer in self.observers:
            self.observers.remove(observer)

    def unregister_all(self):
        """
        Unsubscribe all observers and drop the reference to the publisher
        """
        # Clear in place so that other references to the list stay valid.
        del self.observers[:]
        self.publisher = None

    def notify_observers(self, d, routing_key=None):
        """
        Notify observers that the observable has a message for them

        Observers are called synchronously, in registration order. An exception
        raised by one observer is logged and does not prevent the remaining
        observers from being notified.

        :param d: message
        :type d: dict
        :param routing_key: unused
        """
        # Iterate over a snapshot: an observer may register/unregister observers
        # (including itself) from inside `notified`, and mutating the list while
        # iterating it would silently skip the following observer.
        for observer in list(self.observers):
            try:
                observer.notified(d)
            except Exception:
                logger.exception("Swallowing exception that happened in some observer")
class NotificationProducer(Observable):
    """
    A NotificationProducer produces some notifications and sends them to RabbitMQ

    Notifications are first delivered to the local observers, then, if a
    publisher has been registered, serialized to JSON and published on the
    ``pyloggr.pubsub`` exchange.
    """

    def register_publisher(self, publisher):
        """
        Set the publisher used to send notifications

        :param publisher: object with a ``publish(exchange, body, routing_key, persistent)``
            method that returns a future (presumably a RabbitMQ publisher -- confirm
            against callers)
        """
        self.publisher = publisher

    def unregister_publisher(self):
        """
        Forget the publisher; later notifications only reach local observers
        """
        self.publisher = None

    def notify_observers(self, d, routing_key=None):
        """
        Notify the local observers, then publish the message if a publisher is set

        The call itself is synchronous. The outcome of the publication is
        checked later in `after_published`, which is scheduled on the current
        IOLoop.

        :param d: a message to send to observers
        :type d: dict
        :param routing_key: routing key for the message; a false value means
            ``pyloggr.generic.notification``
        :type routing_key: str
        """
        # Iterate over a snapshot: an observer may unregister itself from inside
        # `notified`, which would otherwise make the loop skip the next observer.
        for observer in list(self.observers):
            # noinspection PyBroadException
            try:
                observer.notified(d)
            except Exception:
                logger.exception("notify_observers: swallowing exception")

        if not self.publisher:
            logger.debug("No notification queue")
            return

        # Resolve the default before logging so the log shows the key really used.
        if not routing_key:
            routing_key = "pyloggr.generic.notification"
        logger.debug("Sending notification with routing key '%s'", routing_key)

        json_message = ujson.dumps(d)
        future = self.publisher.publish(
            exchange='pyloggr.pubsub',
            body=json_message,
            routing_key=routing_key,
            persistent=False
        )
        IOLoop.current().add_future(future, self.after_published)

    @classmethod
    def after_published(cls, future):
        """
        IOLoop callback: log the failure if publishing the notification failed

        :param future: the future returned by the publisher's ``publish`` call
        """
        # noinspection PyBroadException
        try:
            future.result()
        except Exception:
            # Best effort: a failed notification is logged, never propagated.
            # KeyboardInterrupt/SystemExit are deliberately not caught here.
            logger.exception("Exception happened while publishing notification to RabbitMQ")
class Observer(with_metaclass(ABCMeta, object)):
    """
    Interface for classes that want to receive notifications

    Concrete observers must override `notified`.
    """

    @abstractmethod
    def notified(self, *args, **kwargs):
        """
        Handle a notification; the base implementation does nothing
        """