SeriesPlugin 1.0: First public version
[enigma2-plugins.git] / seriesplugin / src / CancelableThread.py
1 # by betonme @2012
2
3 from Queue import Queue
4 import threading
5 import inspect
6 import ctypes
7
8 from time import time
9
10 # Localization
11 from . import _
12
13 # Plugin internal
14 from Logger import splog
15
16
17 ## {{{ http://code.activestate.com/recipes/465057/ (r1)
18 from threading import Lock
19
20 myLock = Lock()
21
22 def synchronized(lock):
23     """ Synchronization decorator. """
24     def wrap(f):
25         def newFunction(*args, **kw):
26             lock.acquire()
27             try:
28                 return f(*args, **kw)
29             finally:
30                                 lock.release()
31         return newFunction
32     return wrap
33 ## end of http://code.activestate.com/recipes/465057/ }}}
34
35
36 class QueueWithTimeOut(Queue):
37         def __init__(self):
38                 Queue.__init__(self)
39         
40         def join_with_timeout(self, timeout):
41                 self.all_tasks_done.acquire()
42                 endtime = time() + timeout
43                 #splog("SeriesPluginWorker for while")
44                 while self.unfinished_tasks:
45                         remaining = endtime - time()
46                         #splog("SeriesPluginWorker while", remaining)
47                         if remaining <= 0.0:
48                                 break
49                         #splog("SeriesPluginWorker before all_tasks_done wait")
50                         self.all_tasks_done.wait(remaining)
51                 #splog("SeriesPluginWorker before all_tasks_done release")
52                 self.all_tasks_done.release()
53                 
54                 # Release our semaphore
55                 try: myLock.release()
56                 except: pass
57
58
59 def _async_raise(tid, exctype):
60         """raises the exception, performs cleanup if needed"""
61         if not inspect.isclass(exctype):
62                 raise TypeError("Only types can be raised (not instances)")
63         res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
64         if res == 0:
65                 raise ValueError("invalid thread id")
66         elif res != 1:
67                 # """if it returns a number greater than one, you're in trouble, 
68                 # and you should call it again with exc=NULL to revert the effect"""
69                 ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
70                 raise SystemError("PyThreadState_SetAsyncExc failed")
71
72
73 class CancelableThread(threading.Thread):
74         def _get_my_tid(self):
75                 """determines this (self's) thread id"""
76                 if not self.isAlive():
77                         raise threading.ThreadError("the thread is not active")
78                 
79                 # do we have it cached?
80                 if hasattr(self, "_thread_id"):
81                         return self._thread_id
82                 
83                 # no, look for it in the _active dict
84                 for tid, tobj in threading._active.items():
85                         if tobj is self:
86                                 self._thread_id = tid
87                                 return tid
88                 
89                 raise AssertionError("could not determine the thread's id")
90         
91         def raise_exc(self, exctype):
92                 """raises the given exception type in the context of this thread"""
93                 _async_raise(self._get_my_tid(), exctype)
94         
95         def terminate(self):
96                 # Release our semaphore
97                 try: myLock.release()
98                 except: pass
99                 
100                 """raises SystemExit in the context of the given thread, which should 
101                 cause the thread to exit silently (unless caught)"""
102                 self.raise_exc(SystemExit)