taskdata.py: Improve abort flag handling, fixing several bugs
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8
9 Copyright (C) 2006  Richard Purdie
10
11 This program is free software; you can redistribute it and/or modify it under
12 the terms of the GNU General Public License version 2 as published by the Free 
13 Software Foundation
14
15 This program is distributed in the hope that it will be useful, but WITHOUT
16 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18
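You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
Street, Fifth Floor, Boston, MA 02110-1301 USA.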
20 """
21
22 from bb import msg, data, fetch, event, mkdirhier, utils
23 from sets import Set 
24 import bb, os, sys
25
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The exception arguments are the tuple (fnid, fn, taskname) describing
    the failed task, so handlers can unpack them from the args attribute.
    """

    def __init__(self, fnid, fn, taskname):
        # The base initialiser stores its positional arguments as self.args
        Exception.__init__(self, fnid, fn, taskname)
31
class RunQueue:
    """
    BitBake Run Queue implementation

    The queue is held as a set of parallel lists indexed by run queue
    task id:

      runq_fnid    - file id of the file the task belongs to
      runq_task    - name of the task
      runq_depends - Set of task ids the task depends on
      runq_revdeps - Set of task ids which depend on the task
      runq_weight  - weight used to prioritise the task

    prio_map lists the task ids ordered from the heaviest to the lightest
    weight.
    """
    def __init__(self):
        """
        Create an empty run queue
        """
        self.reset_runqueue()

    def reset_runqueue(self):
        """
        Throw away all queue data so a new queue can be prepared
        """
        self.runq_fnid = []
        self.runq_task = []
        self.runq_depends = []
        self.runq_revdeps = []
        self.runq_weight = []
        self.prio_map = []

    def get_user_idstring(self, task, taskData):
        """
        Return a "<filename>, <taskname>" string describing run queue
        task id task, for use in messages
        """
        fn = taskData.fn_index[self.runq_fnid[task]]
        taskname = self.runq_task[task]
        return "%s, %s" % (fn, taskname)

    def prepare_runqueue(self, cfgData, dataCache, taskData, targets):
        """
        Turn a set of taskData into a RunQueue and compute data needed 
        to optimise the execution order.
        targets is list of paired values - a provider name and the task to run

        cfgData is accepted for symmetry with execute_runqueue and is not
        used here. dataCache supplies the per file task_deps information.
        taskData is only read; the dependency lists it holds are copied
        before being extended.
        The queue is expected to be empty (see reset_runqueue) on entry.
        """

        bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing Runqueue")

        def add_provider_tasks(depids, providers, taskname, depends):
            """
            For each id in depids with an entry in providers, append the
            taskname task of the first listed provider to depends
            """
            for depid in depids:
                if depid in providers:
                    depdata = providers[depid][0]
                    # depdata is a file id used to index fn_index, so 0 is
                    # presumably a valid value; only skip a missing entry
                    if depdata is not None:
                        dep = taskData.fn_index[depdata]
                        depends.append(taskData.gettask_id(dep, taskname))

        def add_recursive_run(rdepid, taskname, depends, rdep_seen):
            """
            Add runtime depends of rdepid to depends, if 
            we've not seen it before
            (calls itself recursively)
            """
            if rdepid in rdep_seen:
                return
            rdep_seen.add(rdepid)
            if rdepid in taskData.run_targets:
                depdata = taskData.run_targets[rdepid][0]
                if depdata is not None:
                    dep = taskData.fn_index[depdata]
                    taskid = taskData.gettask_id(dep, taskname)
                    depends.append(taskid)
                    nextfnid = taskData.tasks_fnid[taskid]
                    for nextdepid in taskData.rdepids[nextfnid]:
                        add_recursive_run(nextdepid, taskname, depends, rdep_seen)

        runq_build = []

        for task in range(len(taskData.tasks_name)):
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = dataCache.task_deps[fn]
            name = taskData.tasks_name[task]

            # Tasks belonging to failed files get no dependencies at all
            # rather than inheriting those of the previous task
            depends = []

            if fnid not in taskData.failed_fnids:

                # Work on a copy so the resolved dependencies do not leak
                # into taskData and pile up when the queue is prepared again
                depends = list(taskData.tasks_tdepends[task])

                # Resolve Depends
                if 'deptask' in task_deps and name in task_deps['deptask']:
                    taskname = task_deps['deptask'][name]
                    add_provider_tasks(taskData.depids[fnid], taskData.build_targets, taskname, depends)

                # Resolve Runtime Depends
                if 'rdeptask' in task_deps and name in task_deps['rdeptask']:
                    taskname = task_deps['rdeptask'][name]
                    add_provider_tasks(taskData.rdepids[fnid], taskData.run_targets, taskname, depends)

                # Resolve Recursive Runtime Depends
                if 'recrdeptask' in task_deps and name in task_deps['recrdeptask']:
                    taskname = task_deps['recrdeptask'][name]
                    rdep_seen = Set()
                    for rdepid in taskData.rdepids[fnid]:
                        add_recursive_run(rdepid, taskname, depends, rdep_seen)

                # Prune self references
                if task in depends:
                    bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, fn, name, depends))
                    depends = [dep for dep in depends if dep != task]

            self.runq_fnid.append(fnid)
            self.runq_task.append(name)
            self.runq_depends.append(Set(depends))
            self.runq_revdeps.append(Set())
            self.runq_weight.append(0)

            runq_build.append(0)

        bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")

        def mark_active(listid):
            """
            Mark an item as active along with its depends
            (uses an explicit stack so long chains cannot exhaust the
            interpreter recursion limit)
            """
            pending = [listid]
            while pending:
                current = pending.pop()
                if runq_build[current] == 1:
                    continue
                runq_build[current] = 1
                pending.extend(self.runq_depends[current])

        for target in targets:
            targetid = taskData.getbuild_id(target[0])
            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            if fnid in taskData.failed_fnids:
                continue

            fnids = taskData.matches_in_list(self.runq_fnid, fnid)
            tasks = taskData.matches_in_list(self.runq_task, target[1])

            listid = taskData.both_contain(fnids, tasks)
            # NOTE(review): assumes both_contain returns None when the file
            # has no task of that name - confirm against taskdata.py
            if listid is None:
                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))

            mark_active(listid)

        # Prune inactive tasks
        # maps translates an old task id into its id after pruning, or -1
        # if the task was dropped
        maps = []
        kept = []
        for listid in range(len(self.runq_fnid)):
            if runq_build[listid] == 1:
                maps.append(len(kept))
                kept.append(listid)
            else:
                maps.append(-1)
        delcount = len(self.runq_fnid) - len(kept)

        self.runq_fnid = [self.runq_fnid[listid] for listid in kept]
        self.runq_task = [self.runq_task[listid] for listid in kept]
        self.runq_depends = [self.runq_depends[listid] for listid in kept]
        self.runq_revdeps = [self.runq_revdeps[listid] for listid in kept]
        self.runq_weight = [self.runq_weight[listid] for listid in kept]

        bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))

        # Renumber the dependencies to match the pruned lists
        for listid in range(len(self.runq_fnid)):
            newdeps = []
            for origdep in self.runq_depends[listid]:
                if maps[origdep] == -1:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = Set(newdeps)

        bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")

        for listid in range(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # runq_weight1 counts the reverse dependencies of a task which have
        # not yet passed their weight on; runq_done flags tasks whose weight
        # is final
        runq_weight1 = [0] * len(self.runq_fnid)
        runq_done = [0] * len(self.runq_fnid)

        # Endpoints are the tasks nothing else depends upon
        endpoints = []
        for listid in range(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                runq_done[listid] = 1
                self.runq_weight[listid] = 1
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
            runq_weight1[listid] = len(revdeps)

        bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))

        # Walk from the endpoints towards the start of the dependency
        # chains. A task becomes an endpoint for the next pass once every
        # task depending on it has added its weight.
        while len(endpoints) > 0:
            next_points = []
            for listid in endpoints:
                for dep in self.runq_depends[listid]:
                    self.runq_weight[dep] = self.runq_weight[dep] + self.runq_weight[listid]
                    runq_weight1[dep] = runq_weight1[dep] - 1
                    if runq_weight1[dep] == 0:
                        next_points.append(dep)
                        runq_done[dep] = 1
            endpoints = next_points

        def print_chain(taskid, seen):
            """
            Report the unprocessed tasks reachable from taskid through
            reverse dependencies
            (calls itself recursively)
            """
            seen.append(taskid)
            for revdep in self.runq_revdeps[taskid]:
                if runq_done[revdep] == 0:
                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep, taskData), self.runq_depends[revdep]))
                    if revdep not in seen:
                        print_chain(revdep, seen)

        # Sanity Checks
        for task in range(len(self.runq_fnid)):
            if runq_done[task] == 0:
                print_chain(task, [])
                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task, taskData)))
            if runq_weight1[task] != 0:
                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task, taskData)))

        # Make a weight sorted map
        # Heaviest task first; tasks of equal weight are listed with the
        # higher task id first
        order = [(weight, listid) for listid, weight in enumerate(self.runq_weight)]
        order.sort()
        order.reverse()
        self.prio_map = [listid for (weight, listid) in order]

    def execute_runqueue(self, cooker, cfgData, dataCache, taskData, runlist):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)

        runlist is the list of targets handed to prepare_runqueue when the
        queue has to be rebuilt after a failure.
        Returns the number of task failures which were recovered from.
        The TaskFailure is re-raised if taskData.abort is set.
        """

        failures = 0
        while 1:
            try:
                self.execute_runqueue_internal(cooker, cfgData, dataCache, taskData)
                return failures
            except bb.runqueue.TaskFailure:
                if taskData.abort:
                    raise
                # args is (fnid, fn, taskname); only the file id is needed
                fnid = sys.exc_info()[1].args[0]
                taskData.fail_fnid(fnid)
                self.reset_runqueue()
                self.prepare_runqueue(cfgData, dataCache, taskData, runlist)
                failures = failures + 1

    def execute_runqueue_internal(self, cooker, cfgData, dataCache, taskData):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Each task is run in a forked child process and up to
        BB_NUMBER_THREADS (default 1) children are kept running at once.
        Raises bb.runqueue.TaskFailure when a child exits with a non zero
        status. Before any exception is passed on, the children still
        running are waited for.
        Returns 0 once no more tasks can be run.
        """

        bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")

        runq_buildable = []
        runq_running = []
        runq_complete = []
        active_builds = 0
        # Maps the pid of a running child to the task id it is executing
        build_pids = {}

        def get_next_task():
            """
            Return the id of the highest priority task that is buildable
            """
            for task in self.prio_map:
                if runq_running[task] == 1:
                    continue
                if runq_buildable[task] == 1:
                    return task
            return None

        def task_complete(task):
            """
            Mark a task as completed
            Look at the reverse dependencies and mark any task with 
            completed dependencies as buildable
            """
            runq_complete[task] = 1
            for revdep in self.runq_revdeps[task]:
                if runq_running[revdep] == 1:
                    continue
                if runq_buildable[revdep] == 1:
                    continue
                alldeps = 1
                for dep in self.runq_depends[revdep]:
                    if runq_complete[dep] != 1:
                        alldeps = 0
                        break
                if alldeps == 1:
                    runq_buildable[revdep] = 1
                    fn = taskData.fn_index[self.runq_fnid[revdep]]
                    taskname = self.runq_task[revdep]
                    bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))

        # Mark initial buildable tasks
        for task in range(len(self.runq_fnid)):
            runq_running.append(0)
            runq_complete.append(0)
            if len(self.runq_depends[task]) == 0:
                runq_buildable.append(1)
            else:
                runq_buildable.append(0)

        number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)

        try:
            while 1:
                task = get_next_task()
                if task is not None:
                    fn = taskData.fn_index[self.runq_fnid[task]]
                    taskname = self.runq_task[task]

                    if bb.build.stamp_is_current_cache(dataCache, fn, taskname):
                        targetid = taskData.gettask_id(fn, taskname)
                        # An up to date task is skipped unless it is an
                        # external target and a forced build was requested
                        if not (targetid in taskData.external_targets and cooker.configuration.force):
                            bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
                            runq_running[task] = 1
                            task_complete(task)
                            continue

                    bb.msg.debug(1, bb.msg.domain.RunQueue, "Running task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
                    try:
                        pid = os.fork()
                    except OSError:
                        # sys.exc_info keeps this parseable by every
                        # Python version
                        e = sys.exc_info()[1]
                        # NOTE(review): relies on bb.msg.fatal not
                        # returning, otherwise pid is unset below
                        bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
                    if pid == 0:
                        # Child process: run the task and report the
                        # outcome through the exit status.
                        # taskname[3:] drops the first three characters
                        # (presumably the "do_" prefix - confirm)
                        cooker.configuration.cmd = taskname[3:]
                        try:
                            cooker.tryBuild(fn, False)
                        except bb.build.EventException:
                            bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                            sys.exit(1)
                        except:
                            sys.exit(1)
                        sys.exit(0)
                    build_pids[pid] = task
                    runq_running[task] = 1
                    active_builds = active_builds + 1
                    if active_builds < number_tasks:
                        continue
                if active_builds > 0:
                    result = os.waitpid(-1, 0)
                    active_builds = active_builds - 1
                    finished = build_pids[result[0]]
                    # The child has exited whatever the outcome, so it must
                    # not be listed as still active by the handler below
                    del build_pids[result[0]]
                    if result[1] != 0:
                        bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (finished, self.get_user_idstring(finished, taskData)))
                        raise bb.runqueue.TaskFailure(self.runq_fnid[finished], taskData.fn_index[self.runq_fnid[finished]], self.runq_task[finished])
                    task_complete(finished)
                    continue
                break
        except SystemExit:
            raise
        except:
            # Deliberately catches everything (including interrupts) so the
            # running children are reaped before the exception is passed on
            bb.msg.error(bb.msg.domain.RunQueue, "Exception received")
            while active_builds > 0:
                bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % active_builds)
                tasknum = 1
                for k, v in build_pids.items():
                    bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v, taskData), k))
                    tasknum = tasknum + 1
                result = os.waitpid(-1, 0)
                del build_pids[result[0]]
                active_builds = active_builds - 1
            raise

        # Sanity Checks
        for task in range(len(self.runq_fnid)):
            if runq_buildable[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
            if runq_running[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
            if runq_complete[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)

        return 0

    def dump_data(self, taskQueue):
        """
        Dump some debug information on the internal data structures

        taskQueue is the task data object whose fn_index is used to turn
        file ids into file names. The tasks are listed in id order and then
        in priority order; the second list is empty until prio_map has been
        computed.
        """
        def dump_task(task):
            bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
                    taskQueue.fn_index[self.runq_fnid[task]], 
                    self.runq_task[task], 
                    self.runq_weight[task], 
                    self.runq_depends[task], 
                    self.runq_revdeps[task]))

        bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
        for task in range(len(self.runq_fnid)):
            dump_task(task)

        bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
        for task in self.prio_map:
            dump_task(task)