Evenementensysteem in Python

Welk gebeurtenissysteem voor Python gebruikt u? Ik ben al op de hoogte van pydispatcher, maar ik vroeg me af wat er nog meer kan worden gevonden of vaak wordt gebruikt?

Ik ben niet geïnteresseerd in eventmanagers die deel uitmaken van grote frameworks, ik gebruik liever een kleine kale oplossing die ik gemakkelijk kan uitbreiden.


Antwoord 1, autoriteit 100%

PyPI-pakketten

Vanaf januari 2021 zijn dit de evenementgerelateerde pakketten die beschikbaar zijn op PyPI,
gerangschikt op meest recente releasedatum.

Er is meer

Dat zijn heel wat bibliotheken om uit te kiezen, die heel verschillende terminologie gebruiken (gebeurtenissen, signalen, handlers, verzending van methoden, hooks, …).

Ik probeer een overzicht te houden van de bovenstaande pakketten, plus de technieken die in de antwoorden hier worden genoemd.

Eerst wat terminologie…

Waarnemerpatroon

De meest basale stijl van een gebeurtenissysteem is de ‘bag of handler-methoden’, wat een
eenvoudige implementatie van het Observer-patroon.

Kortom, de handlermethoden (callables) worden opgeslagen in een array en worden elk aangeroepen wanneer de gebeurtenis ‘afgaat’.

Publiceren-Abonneren

Het nadeel van Observer-gebeurtenissystemen is dat u de handlers alleen kunt registreren op het daadwerkelijke evenement
object (of lijst met handlers). Dus bij registratie moet het evenement al bestaan.

Daarom bestaat de tweede stijl van evenementensystemen: de
publiceer-abonneerpatroon.
Hier registreren de handlers zich niet op een gebeurtenisobject (of handlerlijst), maar op een centrale dispatcher.
Ook praten de melders alleen met de coördinator. Waar je naar moet luisteren of wat je moet publiceren is
bepaald door ‘signaal’, wat niets meer is dan een naam (string).

Bemiddelaarpatroon

Misschien ook interessant: het Mediator-patroon.

Haken

Een ‘hook’-systeem wordt meestal gebruikt in de context van applicatie-plug-ins. De
applicatie bevat vaste integratiepunten (haken), en elke plug-in kan
maak verbinding met die haak en voer bepaalde acties uit.

Andere ‘evenementen’

Opmerking: threading.Eventis geen ‘event’ systeem’
in bovenstaande zin. Het is een threadsynchronisatiesysteem waarbij de ene thread wacht tot een andere thread het Event-object ‘signaleert’.

Netwerkberichtenbibliotheken gebruiken vaak ook de term ‘gebeurtenissen’; soms zijn deze vergelijkbaar qua concept; soms niet.
Ze kunnen natuurlijk draad-, proces- en computergrenzen overschrijden. Zie bijv.
pyzmq, pymq,
Twisted, Tornado, gevent, gebeurtenis.

Zwakke referenties

In Python zorgt het vasthouden van een verwijzing naar een methode of object ervoor dat het niet wordt verwijderd
door de vuilnisman. Dit kan wenselijk zijn, maar het kan ook leiden tot geheugenlekken:
de gekoppelde handlers zijn nooit
opgeruimd.

Sommige gebeurtenissystemen gebruiken zwakke referenties in plaats van gewone om dit op te lossen.

Enkele woorden over de verschillende bibliotheken

Eventsystemen in waarnemerstijl:

  • zope.eventlaat zien hoe dit werkt (zie Lennarts antwoord). Opmerking: dit voorbeeld ondersteunt niet eens handler-argumenten.
  • LongPoke’s ‘callable list’-implementatie laat zien dat een dergelijk evenementensysteem zeer minimalistisch kan worden geïmplementeerd door list.
  • Felk’s variant EventHookzorgt ook voor de handtekeningen van bellers en bellers.
  • spassig’s EventHook(Michael Foord’s Event Pattern) is een eenvoudige implementatie.
  • Josip’s Valued Lessons Event-klasseis in principe hetzelfde, maar gebruikt een setin plaats van een listom de tas op te bergen, en implementeert __call__die beide redelijke toevoegingen zijn.
  • PyNotifyis qua concept vergelijkbaar en biedt ook aanvullende concepten van variabelen en voorwaarden (‘variabele gewijzigde gebeurtenis’). Startpagina is niet functioneel.
  • axelis in feite een bag-of-handlers met meer functies met betrekking tot threading, foutafhandeling , …
  • python-dispatchvereist dat de even source-klassen afkomstig zijn van pydispatch.Dispatcher.
  • buslaneis gebaseerd op klassen, ondersteunt enkele of meerdere handlers en faciliteert uitgebreide typehints.
  • Pithikos’ Observer/Eventis een lichtgewicht ontwerp.

Bibliotheken publiceren-abonneren:

  • blinkerheeft een aantal handige functies, zoals automatische ontkoppeling en filtering op basis van afzender.
  • PyPubSubis een stabiel pakket en belooft “geavanceerde functies die het debuggen en onderhouden van onderwerpen vergemakkelijken en berichten”.
  • pymitteris een Python-poort van Node.js EventEmitter2 en biedt naamruimten, wildcards en TTL.
  • PyDispatcherlijkt de nadruk te leggen op flexibiliteit met betrekking tot veel-op-veel-publicaties enz. Ondersteunt zwakke referenties.
  • louieis een herwerkte PyDispatcher en zou “in een grote verscheidenheid aan contexten” moeten werken.
  • pypydispatcheris gebaseerd op (je raadt het al…) PyDispatcher en werkt ook in PyPy.
  • django.dispatchis een herschreven PyDispatcher “met een meer beperkte interface, maar hogere prestaties”.
  • pyeventdispatcheris gebaseerd op de event-dispatcher van het Symfony-framework van PHP.
  • dispatcheris geëxtraheerd uit django.dispatch maar wordt behoorlijk oud.
  • Cristian Garcia’s EventMangeris een erg korte implementatie.

Overige:

  • pluggybevat een hook-systeem dat wordt gebruikt door pytest-plug-ins.
  • RxPy3implementeert het waarneembare patroon en maakt het mogelijk om gebeurtenissen samen te voegen, opnieuw te proberen, enz.
  • Qt’s signalen en slots zijn verkrijgbaar bij PyQt
    of PySide2. Ze werken als callback wanneer ze in dezelfde thread worden gebruikt,
    of als gebeurtenissen (met behulp van een gebeurtenislus) tussen twee verschillende threads. Signalen en slots hebben de beperking dat ze
    werken alleen in objecten van klassen die zijn afgeleid van QObject.

Antwoord 2, autoriteit 45%

Ik heb het op deze manier gedaan:

class Event(list):
    """Event subscription.
    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.
    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)
    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)
    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

Echter, zoals met al het andere dat ik heb gezien, is hier geen automatisch gegenereerde pydoc voor, en geen handtekeningen, wat echt klote is.


Antwoord 3, autoriteit 29%

We gebruiken een EventHook zoals voorgesteld door Michael Foord in zijn Event Pattern:

Voeg EventHooks toe aan je lessen met:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()
theBroadcaster = MyBroadcaster()
# add a listener to the event
theBroadcaster.onChange += myFunction
# remove listener from the event
theBroadcaster.onChange -= myFunction
# fire event
theBroadcaster.onChange.fire()

We voegen de functionaliteit toe om alle luisteraars van een object te verwijderen naar Michaels klasse en eindigden met dit:

class EventHook(object):
    def __init__(self):
        self.__handlers = []
    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self
    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self
    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)
    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler

Antwoord 4, autoriteit 10%

Ik gebruik zope.event. Het zijn de meest kale botten die je je kunt voorstellen. 🙂
In feite is hier de volledige broncode:

subscribers = []
def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Houd er rekening mee dat u bijvoorbeeld geen berichten tussen processen kunt verzenden. Het is geen berichtensysteem, alleen een evenementensysteem, niets meer en niets minder.


Antwoord 5, autoriteit 7%

Ik vond dit kleine script op Waardevolle lessen. Het lijkt precies de juiste eenvoud / vermogensverhouding te hebben waar ik naar op zoek ben. Peter Thatcher is de auteur van de volgende code (er wordt geen licentie genoemd).

class Event:
    def __init__(self):
        self.handlers = set()
    def handle(self, handler):
        self.handlers.add(handler)
        return self
    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self
    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)
    def getHandlerCount(self):
        return len(self.handlers)
    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount
class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()
    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)
def log_file_change(source_path):
    print "%r changed." % (source_path,)
def log_file_change2(source_path):
    print "%r changed!" % (source_path,)
watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

Antwoord 6, autoriteit 5%

Hier is een minimaal ontwerp dat goed zou moeten werken. Wat je moet doen is gewoon Observerin een klas erven en daarna observe(event_name, callback_fn)gebruiken om naar een specifieke gebeurtenis te luisteren. Telkens wanneer die specifieke gebeurtenis ergens in de code wordt geactiveerd (bijv. Event('USB connected')), wordt de bijbehorende callback geactiveerd.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})
class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Voorbeeld:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")
# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)
# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')

Antwoord 7, autoriteit 4%

Ik heb een klasse EventManagergemaakt (code aan het einde). De syntaxis is als volgt:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )
#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )
#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )
#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun
#Delete an event
del EventManager.eventName
#Fire the event
EventManager.eventName()

Hier is een voorbeeld:

def hello(name):
    print "Hello {}".format(name)
def greetings(name):
    print "Greetings {}".format(name)
EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello
print "\nInitial salute"
EventManager.salute('Oscar')
print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Uitvoer:

Eerste groet
Groetjes Oscar
Hallo Oscar

Verwijder nu de begroetingen
Hallo Oscar

EventManger-code:

class EventManager:
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
        def __iadd__(self,func):
            self.functions.append(func)
            return self
        def __isub__(self,func):
            self.functions.remove(func)
            return self
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        Example:
        def hello(): print "Hello ",
        def world(): print "World"
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        EventManager.salute()
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        cls.__dict__.update(kvargs)

Antwoord 8, autoriteit 3%

Je kunt eens kijken op pymitter(pypi). Het is een kleine benadering met één bestand (~ 250 locs)
“met namespaces, wildcards en TTL”.

Hier is een eenvoudig voorbeeld:

from pymitter import EventEmitter
ee = EventEmitter()
# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg
# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)
# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"
ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

Antwoord 9, autoriteit 3%

Ik heb een variatie gemaakt op de minimalistische benadering van Longpoke die ook zorgt voor de handtekeningen voor zowel bellers als bellers:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.
    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}
    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)
    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []
    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self
    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self
    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)
    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

Antwoord 10

Als ik code in pyQt gebruik, gebruik ik het QT-sockets/signalen-paradigma, hetzelfde geldt voor django

Als ik asynchrone I/O doe, gebruik ik de native select-module

Als ik een SAX-python-parser gebruik, gebruik ik de gebeurtenis-API van SAX. Het lijkt er dus op dat ik het slachtoffer ben van de onderliggende API 🙂

Misschien moet je jezelf afvragen wat je verwacht van het raamwerk/module voor evenementen. Mijn persoonlijke voorkeur gaat uit naar het gebruik van het Socket/Signal-paradigma van QT. meer informatie hierover vindt u hier


Antwoord 11

Hier is nog een moduleter overweging. Het lijkt een haalbare keuze voor meer veeleisende toepassingen.

Py-notify is een Python-pakket
tools aanreiken voor implementatie
Observer programmeerpatroon. Deze
tools omvatten signalen, voorwaarden en
variabelen.

Signalen zijn lijsten met handlers die:
gebeld wanneer het signaal wordt uitgezonden.
Voorwaarden zijn in principe booleaans
variabelen gekoppeld aan een signaal dat
wordt uitgezonden wanneer de voorwaarde staat:
veranderingen. Ze kunnen worden gecombineerd met:
standaard logische operatoren (niet, en,
enz.) in samengestelde omstandigheden.
Variabelen kunnen, in tegenstelling tot omstandigheden, vasthouden
elk Python-object, niet alleen booleans,
maar ze kunnen niet worden gecombineerd.


Antwoord 12

Als je een eventbus nodig hebt die over proces- of netwerkgrenzen heen werkt, kun je PyMQproberen. Het ondersteunt momenteel pub/sub, berichtenwachtrijen en synchrone RPC. De standaardversie werkt bovenop een Redis-backend, dus je hebt een draaiende Redis-server nodig. Er is ook een in-memory backend om te testen. Je kunt ook je eigen backend schrijven.

import pymq
# common code
class MyEvent:
    pass
# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')
# publisher code
pymq.publish(MyEvent())
# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

Om het systeem te initialiseren:

from pymq.provider.redis import RedisConfig
# starts a new thread with a Redis event loop
pymq.init(RedisConfig())
# main application control loop
pymq.shutdown()

Disclaimer: ik ben de auteur van deze bibliotheek


Antwoord 13

Als je meer gecompliceerde dingen wilt doen, zoals het samenvoegen van gebeurtenissen of het opnieuw proberen, kun je het Observable-patroon gebruiken en een volwassen bibliotheek die dat implementeert. https://github.com/ReactiveX/RxPY. Observables zijn heel gebruikelijk in Javascript en Java en zijn erg handig om te gebruiken voor sommige asynchrone taken.

from rx import Observable, Observer
def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()
class PrintObserver(Observer):
    def on_next(self, value):
        print("Received {0}".format(value))
    def on_completed(self):
        print("Done!")
    def on_error(self, error):
        print("Error Occurred: {0}".format(error))
source = Observable.create(push_five_strings)
source.subscribe(PrintObserver())

UITVOER:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

Antwoord 14

Enige tijd geleden heb ik een bibliotheek geschreven die nuttig voor je zou kunnen zijn.
Hiermee kunt u lokale en wereldwijde luisteraars hebben, meerdere verschillende manieren om ze te registreren, uitvoeringsprioriteit enzovoort.

from pyeventdispatcher import register
register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)
dispatch(Event("foo.bar", {"id": 1}))
# first second

Kijk eens naar pyeventdispatcher


Antwoord 15

Een ander handig pakket is events. Het bevat de kern van het abonnement op evenementen en het afvuren van evenementen en voelt aan als een “natuurlijk” onderdeel van de taal. Het lijkt op de C#-taal, die een handige manier biedt om gebeurtenissen te declareren, erop te abonneren en te activeren. Technisch gezien is een gebeurtenis een “slot” waaraan callback-functies (event-handlers) kunnen worden gekoppeld – een proces dat wordt aangeduid als abonneren op een gebeurtenis.

# Define a callback function
def something_changed(reason):
    print "something changed because %s" % reason
# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed

Wanneer de gebeurtenis wordt geactiveerd, worden alle gekoppelde gebeurtenishandlers in volgorde aangeroepen. Voer een oproep op het slot uit om de gebeurtenis te activeren:

events.on_change('it had to happen')

Dit levert het volgende op:

'something changed because it had to happen'

Meer documentatie is te vinden in de github-repoof de documentatie.


Antwoord 16

Je kunt de module buslaneproberen.

Deze bibliotheek maakt de implementatie van een op berichten gebaseerd systeem eenvoudiger. Het ondersteunt commando’s (enkele handler) en gebeurtenissen (0 of meerdere handlers). Buslane gebruikt annotaties van het Python-type om de handler correct te registreren.

Eenvoudig voorbeeld:

from dataclasses import dataclass
from buslane.commands import Command, CommandHandler, CommandBus
@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str
class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):
    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='[email protected]',
            password='secret',
        )
command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='[email protected]',
    password='secret',
))

Om buslane te installeren, gebruik je gewoon pip:

$ pip install buslane

Other episodes