Ik heb meerdere threads die hetzelfde proces uitvoeren en die elkaar moeten kunnen informeren dat er de komende n seconden niet aan iets moet worden gewerkt. Het is echter niet het einde van de wereld als ze dat wel doen.
Mijn doel is om een string en een TTL door te geven aan de cache en alle strings die in de cache zitten als een lijst op te halen. De cache kan in het geheugen blijven en de TTL’s zijn niet langer dan 20 seconden.
Heeft iemand suggesties voor hoe dit kan worden bereikt?
Antwoord 1, autoriteit 100%
Als je geen derde bibliotheken wilt gebruiken, kun je nog een parameter aan je dure functie toevoegen: ttl_hash=None
. Deze nieuwe parameter is de zogenaamde “tijdgevoelige hash”, het enige doel is om lru_cache
te beïnvloeden.
Bijvoorbeeld:
from functools import lru_cache
import time
@lru_cache()
def my_expensive_function(a, b, ttl_hash=None):
del ttl_hash # to emphasize we don't use it and to shut pylint up
return a + b # horrible CPU load...
def get_ttl_hash(seconds=3600):
"""Return the same value withing `seconds` time period"""
return round(time.time() / seconds)
# somewhere in your code...
res = my_expensive_function(2, 2, ttl_hash=get_ttl_hash())
# cache will be updated once in an hour
Antwoord 2, autoriteit 99%
De OP gebruikt python 2.7 maar als je python 3 gebruikt, is ExpiringDict
genoemd in het geaccepteerde antwoord momenteel, nou ja, verlopen. De laatste toezegging aan de github-repowas 17 juni 2017 en er is een openstaand probleem dat werkt niet met Python 3.5
Vanaf 1 september 2020 is er een recenter onderhouden project cachetools.
pip install cachetools
from cachetools import TTLCache
cache = TTLCache(maxsize=10, ttl=360)
cache['apple'] = 'top dog'
...
>>> cache['apple']
'top dog'
... after 360 seconds...
>>> cache['apple']
KeyError exception raised
ttl
is de tijd om te leven in seconden.
Antwoord 3, autoriteit 27%
U kunt de module ExpiringDict
gebruiken:
De kern van de bibliotheek is de klasse
ExpiringDict
, een geordend woordenboek met automatisch verlopende waarden voor cachingdoeleinden.
In de beschrijving wordt niet gesproken over multithreading, dus gebruik een Lock
om niet te knoeien.
Antwoord 4, autoriteit 27%
Met betrekking tot een verlopende in-memory cache, voor algemeen gebruik, is een algemeen ontwerppatroon om dit niet via een woordenboek te doen, maar via een functie- of methode-decorateur. Achter de schermen wordt een cache-woordenboek beheerd. Als zodanig vormt dit antwoord enigszins een aanvulling op het antwoord door Gebruikerdat een woordenboek gebruikt in plaats van een decorateur .
De ttl_cache
-decorateur in cachetools
werkt veel op functools.lru_cache
, maar met een tijd om te leven.
import cachetools.func
@cachetools.func.ttl_cache(maxsize=128, ttl=10 * 60)
def example_function(key):
return get_expensively_computed_value(key)
class ExampleClass:
EXP = 2
@classmethod
@cachetools.func.ttl_cache()
def example_classmethod(cls, i):
return i * cls.EXP
@staticmethod
@cachetools.func.ttl_cache()
def example_staticmethod(i):
return i * 3
Antwoord 5, autoriteit 13%
Ik ben helemaal weg van het idee van @iutinvg, ik wilde het gewoon wat verder uitdiepen; ontkoppel het van het moeten weten om de ttl
in elke functie door te geven en maak er gewoon een decorateur van, zodat u er niet over hoeft na te denken. Als je django
, py3
hebt en geen zin hebt om pip afhankelijkheden te installeren, probeer dit dan eens.
import time
from django.utils.functional import lazy
from functools import lru_cache, partial, update_wrapper
def lru_cache_time(seconds, maxsize=None):
"""
Adds time aware caching to lru_cache
"""
def wrapper(func):
# Lazy function that makes sure the lru_cache() invalidate after X secs
ttl_hash = lazy(lambda: round(time.time() / seconds), int)()
@lru_cache(maxsize)
def time_aware(__ttl, *args, **kwargs):
"""
Main wrapper, note that the first argument ttl is not passed down.
This is because no function should bother to know this that
this is here.
"""
def wrapping(*args, **kwargs):
return func(*args, **kwargs)
return wrapping(*args, **kwargs)
return update_wrapper(partial(time_aware, ttl_hash), func)
return wrapper
Bewijzen dat het werkt (met voorbeelden):
@lru_cache_time(seconds=10)
def meaning_of_life():
"""
This message should show up if you call help().
"""
print('this better only show up once!')
return 42
@lru_cache_time(seconds=10)
def multiply(a, b):
"""
This message should show up if you call help().
"""
print('this better only show up once!')
return a * b
# This is a test, prints a `.` for every second, there should be 10s
# between each "this better only show up once!" *2 because of the two functions.
for _ in range(20):
meaning_of_life()
multiply(50, 99991)
print('.')
time.sleep(1)
Antwoord 6, autoriteit 10%
Ik weet dat dit een beetje oud is, maar voor degenen die geïnteresseerd zijn in geen afhankelijkheden van derden, dit is een kleine omhulling rond de ingebouwde functools.lru_cache
(ik merkte Javier’s soortgelijk antwoordna het schrijven van dit, maar dacht ik post het toch omdat dit geen Django vereist):
import functools
import time
def time_cache(max_age, maxsize=128, typed=False):
"""Least-recently-used cache decorator with time-based cache invalidation.
Args:
max_age: Time to live for cached results (in seconds).
maxsize: Maximum cache size (see `functools.lru_cache`).
typed: Cache on distinct input types (see `functools.lru_cache`).
"""
def _decorator(fn):
@functools.lru_cache(maxsize=maxsize, typed=typed)
def _new(*args, __time_salt, **kwargs):
return fn(*args, **kwargs)
@functools.wraps(fn)
def _wrapped(*args, **kwargs):
return _new(*args, **kwargs, __time_salt=int(time.time() / max_age))
return _wrapped
return _decorator
En het gebruik ervan:
@time_cache(10)
def expensive(a: int):
"""An expensive function."""
time.sleep(1 + a)
print("Starting...")
expensive(1)
print("Again...")
expensive(1)
print("Done")
NB dit gebruikt time.time
en komt met al zijn voorbehouden. Misschien wilt u in plaats daarvan time.monotonic
gebruiken, indien beschikbaar/gepast.
Antwoord 7, autoriteit 6%
Als u pakketten van derden wilt vermijden, kunt u een aangepaste timed_lru_cache-decorateur, die voortbouwt op de lru_cache-decorateur.
Het onderstaande is standaard ingesteld op een levensduur van 20 seconden en een maximale grootte van 128. Houd er rekening mee dat de volledige cache na 20 seconden verloopt, niet afzonderlijke items.
from datetime import datetime, timedelta
from functools import lru_cache, wraps
def timed_lru_cache(seconds: int = 20, maxsize: int = 128):
def wrapper_cache(func):
func = lru_cache(maxsize=maxsize)(func)
func.lifetime = timedelta(seconds=seconds)
func.expiration = datetime.utcnow() + func.lifetime
@wraps(func)
def wrapped_func(*args, **kwargs):
if datetime.utcnow() >= func.expiration:
func.cache_clear()
func.expiration = datetime.utcnow() + func.lifetime
return func(*args, **kwargs)
return wrapped_func
return wrapper_cache
Voeg dan gewoon @timed_lru_cache
toe boven je functie en je bent klaar om te gaan:
@timed_lru_cache
def my_function():
# code goes here...
Antwoord 8, autoriteit 4%
Zoiets?
from time import time, sleep
import itertools
from threading import Thread, RLock
import signal
class CacheEntry():
def __init__(self, string, ttl=20):
self.string = string
self.expires_at = time() + ttl
self._expired = False
def expired(self):
if self._expired is False:
return (self.expires_at < time())
else:
return self._expired
class CacheList():
def __init__(self):
self.entries = []
self.lock = RLock()
def add_entry(self, string, ttl=20):
with self.lock:
self.entries.append(CacheEntry(string, ttl))
def read_entries(self):
with self.lock:
self.entries = list(itertools.dropwhile(lambda x:x.expired(), self.entries))
return self.entries
def read_entries(name, slp, cachelist):
while True:
print "{}: {}".format(name, ",".join(map(lambda x:x.string, cachelist.read_entries())))
sleep(slp)
def add_entries(name, ttl, cachelist):
s = 'A'
while True:
cachelist.add_entry(s, ttl)
print("Added ({}): {}".format(name, s))
sleep(1)
s += 'A'
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
cl = CacheList()
print_threads = []
print_threads.append(Thread(None, read_entries, args=('t1', 1, cl)))
# print_threads.append(Thread(None, read_entries, args=('t2', 2, cl)))
# print_threads.append(Thread(None, read_entries, args=('t3', 3, cl)))
adder_thread = Thread(None, add_entries, args=('a1', 2, cl))
adder_thread.start()
for t in print_threads:
t.start()
for t in print_threads:
t.join()
adder_thread.join()
Antwoord 9
Je kunt ook gaan voor dictttl, met MutableMapping, OrderedDict en defaultDict(list)
Initialiseer een gewoon dictaat waarbij elke toets een ttl van 30 seconden heeft
data = {'a': 1, 'b': 2}
dict_ttl = DictTTL(30, data)
Geordende Dict
data = {'a': 1, 'b': 2}
dict_ttl = OrderedDictTTL(30, data)
defaultDict(lijst)
dict_ttl = DefaultDictTTL(30)
data = {'a': [10, 20], 'b': [1, 2]}
[dict_ttl.append_values(k, v) for k, v in data.items()]