Time-out op een functie-oproep

Ik bel een functie in Python waarvan ik weet dat ik kan stallen en dwingt me om het script opnieuw op te starten.

Hoe noem ik de functie of wat wikkel ik het in, zodat als het langer dan 5 seconden duurt, het script het annuleert en iets anders doet?


Antwoord 1, Autoriteit 100%

U kunt de signaal pakket gebruiken als u op Unix gebruikt:

In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 seconden na de oproep signal.alarm(10), de handler wordt genoemd. Dit verhoogt een uitzondering die u kunt onderscheppen vanaf de reguliere Python-code.

Deze module speelt niet goed met draden (maar dan, wie wel?)

Merk op dat sinds we een uitzondering verhogen wanneer de time-out gebeurt, het kan uiteindelijk worden gepakt en genegeerd in de functie, bijvoorbeeld van een dergelijke functie:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

Antwoord 2, autoriteit 68%

Je kunt multiprocessing.Processgebruiken om precies dat te doen.

Code

import multiprocessing
import time
# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)
if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()
    # Wait for 10 seconds or until process finishes
    p.join(10)
    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."
        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()
        p.join()

Antwoord 3, autoriteit 34%

Hoe roep ik de functie aan of wat moet ik erin stoppen, zodat als het langer dan 5 seconden duurt, het script het annuleert?

Ik heb een gistgeplaatst die deze vraag/het probleem oplost met een decorateur en een threading.Timer. Hier is het met een storing.

Import en instellingen voor compatibiliteit

Het is getest met Python 2 en 3. Het zou ook moeten werken onder Unix/Linux en Windows.

Eerst de invoer. Deze proberen de code consistent te houden, ongeacht de Python-versie:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Gebruik versie-onafhankelijke code:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Nu hebben we onze functionaliteit geïmporteerd uit de standaardbibliotheek.

exit_afterdecorateur

Vervolgens hebben we een functie nodig om de main()uit de onderliggende thread te beëindigen:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

En hier is de decorateur zelf:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Gebruik

En hier is het gebruik dat direct antwoord geeft op uw vraag over afsluiten na 5 seconden!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

De tweede functie-aanroep zal niet eindigen, in plaats daarvan moet het proces worden afgesloten met een traceback!

KeyboardInterruptstopt niet altijd een slapende thread

Houd er rekening mee dat de slaap niet altijd wordt onderbroken door een toetsenbordonderbreking, op Python 2 op Windows, bijvoorbeeld:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')
>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

Het is ook niet waarschijnlijk dat code die wordt uitgevoerd in extensies wordt onderbroken, tenzij er expliciet wordt gecontroleerd op PyErr_CheckSignals(), zie Cython, Python en KeyboardInterrupt genegeerd

Ik zou in ieder geval vermijden om een thread langer dan een seconde te laten slapen – dat is een eon in processortijd.

Hoe roep ik de functie aan of wat stop ik erin zodat als het langer dan 5 seconden duurt, het script het annuleert en iets anders doet?

Om het te vangen en iets anders te doen, kunt u het toetsenboardBEURRUCT vangen.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

Antwoord 4, Autoriteit 22%

Ik heb een ander voorstel dat een pure functie is (met dezelfde API als de inrijgensuggestie) en lijkt te werken (op basis van suggesties op deze thread)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal
    class TimeoutError(Exception):
        pass
    def handler(signum, frame):
        raise TimeoutError()
    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)
    return result

Antwoord 5, Autoriteit 13%

Ik rende deze thread tegen bij het zoeken naar een time-out oproep op unit-tests. Ik heb niets eenvoudigs gevonden in de antwoorden of 3e partijpakketten, dus ik schreef de decorator hieronder waar je naartoe kunt vallen in de code:

import multiprocessing.pool
import functools
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Dan is het zo simpel als dit om een test of een andere functie die je leuk vindt te time-outen:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

Antwoord 6, autoriteit 11%

Het stopitpakket, gevonden op pypi, lijkt goed om te gaan met time-outs.

Ik hou van de @stopit.threading_timeoutable-decorator, die een timeout-parameter toevoegt aan de gedecoreerde functie, die doet wat je verwacht, het stopt de functie.

>

Bekijk het op pypi: https://pypi.python.org/pypi/stopit


Antwoord 7, autoriteit 7%

Er zijn veel suggesties, maar geen enkele gebruikt concurrent.futures, wat volgens mij de meest leesbare manier is om hiermee om te gaan.

from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super eenvoudig te lezen en te onderhouden.

We maken een pool, dienen een enkel proces in en wachten dan tot 5 seconden voordat we een TimeoutError opwerpen die je kunt opvangen en afhandelen zoals je maar wilt.

Inheems in python 3.2+ en gebackporteerd naar 2.7 (pip install futures).

Schakelen tussen threads en processen is net zo eenvoudig als het vervangen van ProcessPoolExecutordoor ThreadPoolExecutor.

Als u het proces op time-out wilt beëindigen, zou ik u voorstellen Pebble .


Antwoord 8, Autoriteit 6%

Geweldig, gemakkelijk te gebruiken en betrouwbaar PYPI Project Time-out-Decorator (https://pypi.org/project/timeout-decorator/ )

installatie :

pip install timeout-decorator

gebruik :

import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i
if __name__ == '__main__':
    mytest()

Antwoord 9, Autoriteit 5%

Ik ben de auteur van WrapT_Timeout_Decorator

De meeste oplossingen die hier worden gepresenteerd, werken wantwagevully onder Linux op de eerste blik – omdat we vork () en signalen () hebben – maar op Windows zien de dingen er een beetje anders uit.
En als het gaat om subThreads op Linux, kunt u geen signalen meer gebruiken.

Om een ​​proces onder Windows te spawnen, moet het paantelen zijn – en vele ingerichte functies of klassenmethoden zijn niet.

Dus je moet een betere kickler gebruiken zoals dille en multiprocess (niet augurk en multiprocessing) – daarom kun je ProcesspoolExcuter (of alleen met beperkte functionaliteit) niet gebruiken.

Voor de time-out zelf – U moet definiëren wat time-out betekent – omdat het in Windows aanzienlijke (en niet te bepalen) tijd zal vergen om het proces voort te brengen. Dit kan lastig zijn bij korte time-outs. Laten we aannemen dat het spawnen van het proces ongeveer 0,5 seconden duurt (gemakkelijk !!!). Als u een time-out van 0,2 seconden geeft, wat moet er dan gebeuren?
Moet de functie na 0,5 + 0,2 seconden een time-out hebben (laat de methode dus 0,2 seconden lopen)?
Of moet het aangeroepen proces na 0,2 seconden een time-out hebben (in dat geval zal de gedecoreerde functie ALTIJD een time-out hebben, omdat het in die tijd niet eens wordt voortgebracht) ?

Ook geneste decorateurs kunnen vervelend zijn en je kunt signalen niet gebruiken in een subthread. Als u een echt universele, platformonafhankelijke decorateur wilt maken, moet dit alles in overweging worden genomen (en worden getest).

Andere problemen zijn het teruggeven van uitzonderingen aan de beller, evenals logproblemen (indien gebruikt in de gedecoreerde functie – inloggen op bestanden in een ander proces wordt NIET ondersteund)

Ik heb geprobeerd alle randgevallen te behandelen. Je zou in het pakket wrapt_timeout_decorator kunnen kijken, of op zijn minst je eigen oplossingen kunnen testen, geïnspireerd door de unittests die daar worden gebruikt.

@Alexis Eggermont – helaas heb ik niet genoeg punten om commentaar te geven – misschien kan iemand anders je op de hoogte stellen – ik denk dat ik je importprobleem heb opgelost.


Antwoord 10, autoriteit 4%

Voortbouwend op en en het verbeteren van het antwoord van @piro , kunt u een contextmanager bouwen. Dit zorgt voor een zeer leesbare code die het alarmsignaal uitschakelt na een succesvolle run (sets signal.alarm(0))

from contextlib import contextmanager
import signal
import time
@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise Exception(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    yield
    signal.alarm(0)
def sleeper(duration):
    time.sleep(duration)
    print('finished')

Voorbeeld van gebruik:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished
In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 
<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)
Exception: block timedout after 2 seconds

Antwoord 11, autoriteit 3%

timeout-decoratorwerkt niet op Windows-systeem, omdat Windows het signalniet goed ondersteunt.

Als u timeout-decorator in het Windows-systeem gebruikt, krijgt u het volgende

AttributeError: module 'signal' has no attribute 'SIGALRM'

Sommigen stelden voor om use_signals=Falsete gebruiken, maar dat werkte niet voor mij.

Auteur @bitranox heeft het volgende pakket gemaakt:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Codevoorbeeld:

import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))
def main():
    mytest('starting')
if __name__ == '__main__':
    main()

Geeft de volgende uitzondering:

TimeoutError: Function mytest timed out after 5 seconds

Antwoord 12, autoriteit 2%

Hoogtepunten

  • Verhoogt TimeoutErrorgebruikt uitzonderingen om te waarschuwen bij time-out – kan eenvoudig worden gewijzigd
  • Cross-platform: Windows & Mac OS X
  • Compatibiliteit: Python 3.6+ (ik heb ook getest op python 2.7 en het werkt met kleine syntaxisaanpassingen)

Voor een volledige uitleg en uitbreiding van parallelle kaarten, zie hier https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

Minimaal voorbeeld

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

en zoals verwacht

>>> bar(2)
2

Volledige code

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill
from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any
class TimeoutError(Exception):
    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__
    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"
def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))
def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout
    Args:
        func: the function
        timeout: The timeout in seconds
    """
    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')
    if func is None:
        return functools.partial(killer_call, timeout=timeout)
    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners
if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x
    print(bar(2))
    bar(10)

Opmerkingen

U moet in de functie in de functie importeren vanwege de manier dillWERKEN.

Dit betekent ook dat deze functies mogelijk niet compatibel zijn met doctestals er invoer in uw doelfuncties zijn. U krijgt een probleem met __import__niet gevonden.


Antwoord 13

We kunnen signalen voor hetzelfde gebruiken. Ik denk dat het onderstaande voorbeeld nuttig voor u zal zijn. Het is heel eenvoudig in vergelijking met draden.

import signal
def timeout(signum, frame):
    raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
    main()
except myException:
    print "whoops"

Antwoord 14

Een andere oplossing met Asyncio:

Als u de achtergrondtaak wilt annuleren en niet alleen time-out op het lopende hoofdcode, hebt u een expliciete communicatie nodig van de hoofddraad om de code van de taak te vragen om te annuleren, zoals een threading.event ()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor
class SingletonTimeOut:
    pool = None
    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e
    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool
    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()
# ---------------
TIME_OUT = float('0.2')  # seconds
def toto(input_items,nb_predictions):
    return 1
to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")
results = SingletonTimeOut.run(to_run, TIME_OUT)

Antwoord 15

#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

Antwoord 16

Voor het geval het voor iemand nuttig is, voortbouwend op het antwoord van @piro, heb ik een functiedecorateur gemaakt:

import time
import signal
from functools import wraps
def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register an handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")
            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)
            # Define a timeout for your function
            signal.alarm(timeout_secs)
            result = None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                raise exc
            finally:
                # disable the signal alarm
                signal.alarm(0)
            return result
        return time_limited
    return wrapper

De wrapper gebruiken op een functie met een 20 secondstime-out ziet er iets uit als:

   @timeout(20)
    def my_slow_or_never_ending_function(name):
        while True:
            time.sleep(1)
            print(f"Yet another second passed {name}...")
    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as e:
        print(f"ERROR: {e}")

Antwoord 17

Ik had behoefte aan nestbaar getimed interrupts (welke sigalarm niet kan doen) die niet geblokkeerd raken door Time.Sleep (die de op thread gebaseerde aanpak niet kan doen). Ik heb uiteindelijk het kopiëren en licht wijzigen van de code van hier: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

De code zelf:

#!/usr/bin/python
# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
"""alarm.py: Permits multiple SIGALRM events to be queued.
Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""
import heapq
import signal
from time import time
__version__ = '$Revision: 2539 $'.split()[1]
alarmlist = []
__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_
class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass
def __clear_alarm():
    """Clear an existing alarm.
    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.
    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()
def alarm(sec, func, *args, **keys):
    """Set an alarm.
    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()
def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.
    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

en een gebruiksvoorbeeld:

import alarm
from time import sleep
try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

Antwoord 18

Ik heb hetzelfde probleem gehad, maar mijn situatie is dat ik aan de subthread moet werken, het signaal werkte niet voor mij, dus schreef ik een python-pakket: time-out-timer om dit probleem op te lossen, ondersteuning voor gebruik als context of decorateur, gebruik de signaal- of subthread-module om een time-outonderbreking te activeren:

from timeout_timer import timeout, TimeoutInterrupt
class TimeoutInterruptNested(TimeoutInterrupt):
    pass
def test_timeout_nested_loop_both_timeout(timer="thread"):
    cnt = 0
    try:
        with timeout(5, timer=timer):
            try:
                with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                    sleep(2)
            except TimeoutInterruptNested:
                cnt += 1
            time.sleep(10)
    except TimeoutInterrupt:
        cnt += 1
    assert cnt == 2

zie meer: https://github.com/dozysun/timeout-timer


Antwoord 19

Hier is een kleine verbetering ten opzichte van de gegeven thread-gebaseerde oplossing.

De onderstaande code ondersteunt uitzonderingen:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]
    return ["RESULT", result]
def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default
    if it.result[0] == "exception":
        raise it.result[1]
    return it.result[1]

Het aanroepen met een time-out van 5 seconden:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

Antwoord 20

Hier is een POSIX-versie die veel van de eerdere antwoorden combineert om de volgende functies te bieden:

  1. Subprocessen die de uitvoering blokkeren.
  2. Gebruik van de time-outfunctie op klassenlidfuncties.
  3. Strenge eis voor opzegtermijn.

Hier is de code en enkele testgevallen:

import threading
import signal
import os
import time
class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """
def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)
    # Killing the main thread
    os.kill(pid, signal.SIGTERM)
def handle_term(signum, frame):
    raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False
    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()
    if terminate:
        raise BaseException("xxx")
    return result
### Test cases
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337
def really_long_function():
    time.sleep(10)
def really_long_function2():
    os.system("sleep 787")
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337
# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)
t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)
t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
    def __init__(self):
        self.value = 0
    def set(self, v):
        self.value = v
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9

Other episodes