runqueue/taskdata.py: Make sure recrdeps tasks include all inter-task dependencies...
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 from bb import msg, data, event, mkdirhier, utils
26 from sets import Set 
27 import bb, os, sys
28 import signal
29 import stat
30
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails.

    The constructor argument is stored as the exception's args, so it has
    to be an iterable; its elements become the entries of the args tuple.
    """
    def __init__(self, x):
        # Hand the elements of x to the base class, which stores them as args
        Exception.__init__(self, *x)
35
36
class RunQueueStats:
    """
    Counters for the tasks handled by the associated runQueue:
    how many completed, how many were skipped and how many failed.
    """
    def __init__(self):
        self.completed = self.skipped = self.failed = 0

    def taskFailed(self):
        """Record one failed task."""
        self.failed += 1

    def taskCompleted(self, number = 1):
        """Record 'number' completed tasks (one by default)."""
        self.completed += number

    def taskSkipped(self, number = 1):
        """Record 'number' skipped tasks (one by default)."""
        self.skipped += number
54
class RunQueueScheduler:
    """
    Decides the order in which the tasks of a run queue are started.
    """
    def __init__(self, runqueue):
        """
        The default scheduler hands out tasks in task number order, so the
        priority map is simply 0 .. number of tasks - 1.
        """
        self.rq = runqueue
        self.prio_map = list(range(len(self.rq.runq_fnid)))

    def next(self):
        """
        Walk the priority map and return the id of the first task that is
        buildable and not already running. Returns None when no task
        qualifies.
        """
        rq = self.rq
        for position in range(len(rq.runq_fnid)):
            candidate = self.prio_map[position]
            # A running task is never looked up in runq_buildable
            if rq.runq_running[candidate] != 1 and rq.runq_buildable[candidate] == 1:
                return candidate
        return None
80
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    def __init__(self, runqueue):
        """
        Build the priority map from runqueue.runq_weight.

        The map lists task ids by descending weight; tasks with the same
        weight are listed with the higher task id first.
        """
        self.rq = runqueue

        # Pair every weight with its task id so a single sort orders the ids.
        # This avoids a list.index() scan per task and needs no sentinel
        # value to mark weights that were already consumed.
        weighted = [(weight, taskid) for taskid, weight in enumerate(self.rq.runq_weight)]
        weighted.sort()
        weighted.reverse()

        self.prio_map = [taskid for (weight, taskid) in weighted]
105
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it is completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in
    force.
    """
    def __init__(self, runqueue):
        """
        Start from the weight-sorted priority map of RunQueueSchedulerSpeed
        and regroup it so that all tasks sharing a fnid are adjacent.

        Groups appear in the order in which their fnid first occurs in the
        weight-sorted map; inside a group the weight order is kept.
        """
        RunQueueSchedulerSpeed.__init__(self, runqueue)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        # One pass over the map: remember the order in which fnids are first
        # seen and collect the tasks of each fnid as they come.
        fnid_order = []
        tasks_by_fnid = {}
        for entry in self.prio_map:
            fnid = self.rq.runq_fnid[entry]
            if fnid not in tasks_by_fnid:
                tasks_by_fnid[fnid] = []
                fnid_order.append(fnid)
            tasks_by_fnid[fnid].append(entry)

        self.prio_map = []
        for fnid in fnid_order:
            self.prio_map.extend(tasks_by_fnid[fnid])
136
137 class RunQueue:
138     """
139     BitBake Run Queue implementation
140     """
141     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
142         self.reset_runqueue()
143         self.cooker = cooker
144         self.dataCache = dataCache
145         self.taskData = taskData
146         self.targets = targets
147
148         self.cfgdata = cfgData
149         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
150         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
151         self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
152         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
153         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
154
155     def reset_runqueue(self):
156
157         self.runq_fnid = []
158         self.runq_task = []
159         self.runq_depends = []
160         self.runq_revdeps = []
161
162     def get_user_idstring(self, task):
163         fn = self.taskData.fn_index[self.runq_fnid[task]]
164         taskname = self.runq_task[task]
165         return "%s, %s" % (fn, taskname)
166
167     def circular_depchains_handler(self, tasks):
168         """
169         Some tasks aren't buildable, likely due to circular dependency issues.
170         Identify the circular dependencies and print them in a user readable format.
171         """
172         from copy import deepcopy
173
174         valid_chains = []
175         explored_deps = {}
176         msgs = []
177
178         def chain_reorder(chain):
179             """
180             Reorder a dependency chain so the lowest task id is first
181             """
182             lowest = 0
183             new_chain = []
184             for entry in range(len(chain)):
185                 if chain[entry] < chain[lowest]:
186                     lowest = entry
187             new_chain.extend(chain[lowest:])
188             new_chain.extend(chain[:lowest])
189             return new_chain
190
191         def chain_compare_equal(chain1, chain2):
192             """
193             Compare two dependency chains and see if they're the same
194             """
195             if len(chain1) != len(chain2):
196                 return False
197             for index in range(len(chain1)):
198                 if chain1[index] != chain2[index]:
199                     return False
200             return True
201             
202         def chain_array_contains(chain, chain_array):
203             """
204             Return True if chain_array contains chain
205             """
206             for ch in chain_array:
207                 if chain_compare_equal(ch, chain):
208                     return True
209             return False
210
211         def find_chains(taskid, prev_chain):
212             prev_chain.append(taskid)
213             total_deps = []
214             total_deps.extend(self.runq_revdeps[taskid])
215             for revdep in self.runq_revdeps[taskid]:
216                 if revdep in prev_chain:
217                     idx = prev_chain.index(revdep)
218                     # To prevent duplicates, reorder the chain to start with the lowest taskid
219                     # and search through an array of those we've already printed
220                     chain = prev_chain[idx:]
221                     new_chain = chain_reorder(chain)
222                     if not chain_array_contains(new_chain, valid_chains):
223                         valid_chains.append(new_chain)
224                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
225                         for dep in new_chain:
226                             msgs.append("  Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
227                         msgs.append("\n")
228                     if len(valid_chains) > 10:
229                         msgs.append("Aborted dependency loops search after 10 matches.\n")
230                         return msgs
231                     continue
232                 scan = False
233                 if revdep not in explored_deps:
234                     scan = True
235                 elif revdep in explored_deps[revdep]:
236                     scan = True
237                 else:
238                     for dep in prev_chain:
239                         if dep in explored_deps[revdep]:
240                             scan = True
241                 if scan:
242                     find_chains(revdep, deepcopy(prev_chain))
243                 for dep in explored_deps[revdep]:
244                     if dep not in total_deps:
245                         total_deps.append(dep)
246
247             explored_deps[taskid] = total_deps
248
249         for task in tasks:
250             find_chains(task, [])
251
252         return msgs
253
254     def calculate_task_weights(self, endpoints):
255         """
256         Calculate a number representing the "weight" of each task. Heavier weighted tasks 
257         have more dependencies and hence should be executed sooner for maximum speed.
258
259         This function also sanity checks the task list finding tasks that its not
260         possible to execute due to circular dependencies.
261         """
262
263         numTasks = len(self.runq_fnid)
264         weight = []
265         deps_left = []
266         task_done = []
267
268         for listid in range(numTasks):
269             task_done.append(False)
270             weight.append(0)
271             deps_left.append(len(self.runq_revdeps[listid]))
272
273         for listid in endpoints:
274             weight[listid] = 1
275             task_done[listid] = True
276
277         while 1:
278             next_points = []
279             for listid in endpoints:
280                 for revdep in self.runq_depends[listid]:
281                     weight[revdep] = weight[revdep] + weight[listid]
282                     deps_left[revdep] = deps_left[revdep] - 1
283                     if deps_left[revdep] == 0:
284                         next_points.append(revdep)
285                         task_done[revdep] = True
286             endpoints = next_points
287             if len(next_points) == 0:
288                 break      
289
290         # Circular dependency sanity check
291         problem_tasks = []
292         for task in range(numTasks):
293             if task_done[task] is False or deps_left[task] != 0:
294                 problem_tasks.append(task)
295                 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
296                 bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
297
298         if problem_tasks:
299             message = "Unbuildable tasks were found.\n"
300             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
301             message = message + "Identifying dependency loops (this may take a short while)...\n"
302             bb.msg.error(bb.msg.domain.RunQueue, message)
303
304             msgs = self.circular_depchains_handler(problem_tasks)
305
306             message = "\n"
307             for msg in msgs:
308                 message = message + msg
309             bb.msg.fatal(bb.msg.domain.RunQueue, message)
310
311         return weight
312
313     def prepare_runqueue(self):
314         """
315         Turn a set of taskData into a RunQueue and compute data needed 
316         to optimise the execution order.
317         """
318
319         depends = []
320         runq_build = []
321         recursive_tdepends = {}
322
323         taskData = self.taskData
324
325         if len(taskData.tasks_name) == 0:
326             # Nothing to do
327             return
328
329         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
330
331         # Step A - Work out a list of tasks to run
332         #
333         # Taskdata gives us a list of possible providers for a every target 
334         # ordered by priority (build_targets, run_targets). It also gives
335         # information on each of those providers.
336         #
337         # To create the actual list of tasks to execute we fix the list of 
338         # providers and then resolve the dependencies into task IDs. This 
339         # process is repeated for each type of dependency (tdepends, deptask, 
340         # rdeptast, recrdeptask, idepends).
341
342         for task in range(len(taskData.tasks_name)):
343             fnid = taskData.tasks_fnid[task]
344             fn = taskData.fn_index[fnid]
345             task_deps = self.dataCache.task_deps[fn]
346
347             if fnid not in taskData.failed_fnids:
348
349                 # Resolve task internal dependencies 
350                 #
351                 # e.g. addtask before X after Y
352                 depends = taskData.tasks_tdepends[task]
353
354                 # Resolve 'deptask' dependencies 
355                 #
356                 # e.g. do_sometask[deptask] = "do_someothertask"
357                 # (makes sure sometask runs after someothertask of all DEPENDS)
358                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
359                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
360                     for depid in taskData.depids[fnid]:
361                         # Won't be in build_targets if ASSUME_PROVIDED
362                         if depid in taskData.build_targets:
363                             depdata = taskData.build_targets[depid][0]
364                             if depdata is not None:
365                                 dep = taskData.fn_index[depdata]
366                                 for taskname in tasknames:
367                                     depends.append(taskData.gettask_id(dep, taskname))
368
369                 # Resolve 'rdeptask' dependencies 
370                 #
371                 # e.g. do_sometask[rdeptask] = "do_someothertask"
372                 # (makes sure sometask runs after someothertask of all RDEPENDS)
373                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
374                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
375                     for depid in taskData.rdepids[fnid]:
376                         if depid in taskData.run_targets:
377                             depdata = taskData.run_targets[depid][0]
378                             if depdata is not None:
379                                 dep = taskData.fn_index[depdata]
380                                 depends.append(taskData.gettask_id(dep, taskname))
381
382                 # Resolve inter-task dependencies 
383                 #
384                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
385                 # (makes sure sometask runs after targetname's someothertask)
386                 idepends = taskData.tasks_idepends[task]
387                 for (depid, idependtask) in idepends:
388                     if depid in taskData.build_targets:
389                         # Won't be in build_targets if ASSUME_PROVIDED
390                         depdata = taskData.build_targets[depid][0]
391                         if depdata is not None:
392                             dep = taskData.fn_index[depdata]
393                             depends.append(taskData.gettask_id(dep, idependtask))
394
395                 # Create a list of recursive dependent tasks (from tdepends) and cache
396                 def get_recursive_tdepends(task):
397                     if not task:
398                         return []
399                     if task in recursive_tdepends:
400                         return recursive_tdepends[task]
401
402                     fnid = taskData.tasks_fnid[task]
403                     taskids = taskData.gettask_ids(fnid)
404
405                     rectdepends = taskids
406                     nextdeps = taskids
407                     while len(nextdeps) != 0:
408                         newdeps = []
409                         for nextdep in nextdeps:
410                             for tdepend in taskData.tasks_tdepends[nextdep]:
411                                 if tdepend not in rectdepends:
412                                     rectdepends.append(tdepend)
413                                     newdeps.append(tdepend)
414                         nextdeps = newdeps
415                     recursive_tdepends[task] = rectdepends
416                     return rectdepends
417
418                 # Using the list of tdepends for this task create a list of 
419                 # the recursive idepends we have
420                 def get_recursive_idepends(task):
421                     if not task:
422                         return []
423                     rectdepends = get_recursive_tdepends(task)
424
425                     recidepends = []
426                     for tdepend in rectdepends:
427                         for idepend in taskData.tasks_idepends[tdepend]:
428                             recidepends.append(idepend)
429                     return recidepends
430
431                 def add_recursive_build(depid, depfnid):
432                     """
433                     Add build depends of depid to depends
434                     (if we've not see it before)
435                     (calls itself recursively)
436                     """
437                     if str(depid) in dep_seen:
438                         return
439                     dep_seen.append(depid)
440                     if depid in taskData.build_targets:
441                         depdata = taskData.build_targets[depid][0]
442                         if depdata is not None:
443                             dep = taskData.fn_index[depdata]
444                             # Need to avoid creating new tasks here
445                             taskid = taskData.gettask_id(dep, taskname, False)
446                             if taskid is not None:
447                                 depends.append(taskid)
448                                 fnid = taskData.tasks_fnid[taskid]
449                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
450                             else:
451                                 fnid = taskData.getfn_id(dep)
452                             for nextdepid in taskData.depids[fnid]:
453                                 if nextdepid not in dep_seen:
454                                     add_recursive_build(nextdepid, fnid)
455                             for nextdepid in taskData.rdepids[fnid]:
456                                 if nextdepid not in rdep_seen:
457                                     add_recursive_run(nextdepid, fnid)
458                             for (idependid, idependtask) in get_recursive_idepends(taskid):
459                                 if idependid not in dep_seen:
460                                     add_recursive_build(idependid, fnid)
461
462                 def add_recursive_run(rdepid, depfnid):
463                     """
464                     Add runtime depends of rdepid to depends
465                     (if we've not see it before)
466                     (calls itself recursively)
467                     """
468                     if str(rdepid) in rdep_seen:
469                         return
470                     rdep_seen.append(rdepid)
471                     if rdepid in taskData.run_targets:
472                         depdata = taskData.run_targets[rdepid][0]
473                         if depdata is not None:
474                             dep = taskData.fn_index[depdata]
475                             # Need to avoid creating new tasks here
476                             taskid = taskData.gettask_id(dep, taskname, False)
477                             if taskid is not None:
478                                 depends.append(taskid)
479                                 fnid = taskData.tasks_fnid[taskid]
480                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
481                             else:
482                                 fnid = taskData.getfn_id(dep)
483                             for nextdepid in taskData.depids[fnid]:
484                                 if nextdepid not in dep_seen:
485                                     add_recursive_build(nextdepid, fnid)
486                             for nextdepid in taskData.rdepids[fnid]:
487                                 if nextdepid not in rdep_seen:
488                                     add_recursive_run(nextdepid, fnid)
489                             for (idependid, idependtask) in get_recursive_idepends(taskid):
490                                 if idependid not in dep_seen:
491                                     add_recursive_build(idependid, fnid)
492
493                 # Resolve recursive 'recrdeptask' dependencies 
494                 #
495                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
496                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
497                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
498                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
499                         dep_seen = []
500                         rdep_seen = []
501                         idep_seen = []
502                         for depid in taskData.depids[fnid]:
503                             add_recursive_build(depid, fnid)
504                         for rdepid in taskData.rdepids[fnid]:
505                             add_recursive_run(rdepid, fnid)
506                         deptaskid = taskData.gettask_id(fn, taskname, False)
507                         for (idependid, idependtask) in get_recursive_idepends(deptaskid):
508                             add_recursive_build(idependid, fnid)
509
510                 # Rmove all self references
511                 if task in depends:
512                     newdep = []
513                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
514                     for dep in depends:
515                        if task != dep:
516                           newdep.append(dep)
517                     depends = newdep
518
519
520             self.runq_fnid.append(taskData.tasks_fnid[task])
521             self.runq_task.append(taskData.tasks_name[task])
522             self.runq_depends.append(Set(depends))
523             self.runq_revdeps.append(Set())
524
525             runq_build.append(0)
526
527         # Step B - Mark all active tasks
528         #
529         # Start with the tasks we were asked to run and mark all dependencies
530         # as active too. If the task is to be 'forced', clear its stamp. Once
531         # all active tasks are marked, prune the ones we don't need.
532
533         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
534
535         def mark_active(listid, depth):
536             """
537             Mark an item as active along with its depends
538             (calls itself recursively)
539             """
540
541             if runq_build[listid] == 1:
542                 return
543
544             runq_build[listid] = 1
545
546             depends = self.runq_depends[listid]
547             for depend in depends:
548                 mark_active(depend, depth+1)
549
550         self.target_pairs = []
551         for target in self.targets:
552             targetid = taskData.getbuild_id(target[0])
553
554             if targetid not in taskData.build_targets:
555                 continue
556
557             if targetid in taskData.failed_deps:
558                 continue
559
560             fnid = taskData.build_targets[targetid][0]
561             fn = taskData.fn_index[fnid]
562             self.target_pairs.append((fn, target[1]))
563
564             # Remove stamps for targets if force mode active
565             if self.cooker.configuration.force:
566                 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
567                 bb.build.del_stamp(target[1], self.dataCache, fn)
568
569             if fnid in taskData.failed_fnids:
570                 continue
571
572             if target[1] not in taskData.tasks_lookup[fnid]:
573                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
574
575             listid = taskData.tasks_lookup[fnid][target[1]]
576
577             mark_active(listid, 1)
578
579         # Step C - Prune all inactive tasks
580         #
581         # Once all active tasks are marked, prune the ones we don't need.
582
583         maps = []
584         delcount = 0
585         for listid in range(len(self.runq_fnid)):
586             if runq_build[listid-delcount] == 1:
587                 maps.append(listid-delcount)
588             else:
589                 del self.runq_fnid[listid-delcount]
590                 del self.runq_task[listid-delcount]
591                 del self.runq_depends[listid-delcount]
592                 del runq_build[listid-delcount]
593                 del self.runq_revdeps[listid-delcount]
594                 delcount = delcount + 1
595                 maps.append(-1)
596
597         #
598         # Step D - Sanity checks and computation
599         #
600
601         # Check to make sure we still have tasks to run
602         if len(self.runq_fnid) == 0:
603             if not taskData.abort:
604                 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
605             else:               
606                 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
607
608         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
609
610         # Remap the dependencies to account for the deleted tasks
611         # Check we didn't delete a task we depend on
612         for listid in range(len(self.runq_fnid)):
613             newdeps = []
614             origdeps = self.runq_depends[listid]
615             for origdep in origdeps:
616                 if maps[origdep] == -1:
617                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
618                 newdeps.append(maps[origdep])
619             self.runq_depends[listid] = Set(newdeps)
620
621         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
622
623         # Generate a list of reverse dependencies to ease future calculations
624         for listid in range(len(self.runq_fnid)):
625             for dep in self.runq_depends[listid]:
626                 self.runq_revdeps[dep].add(listid)
627
628         # Identify tasks at the end of dependency chains
629         # Error on circular dependency loops (length two)
630         endpoints = []
631         for listid in range(len(self.runq_fnid)):
632             revdeps = self.runq_revdeps[listid]
633             if len(revdeps) == 0:
634                 endpoints.append(listid)
635             for dep in revdeps:
636                 if dep in self.runq_depends[listid]:
637                     #self.dump_data(taskData)
638                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
639
640         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
641
642
643         # Calculate task weights 
644         # Check of higher length circular dependencies
645         self.runq_weight = self.calculate_task_weights(endpoints)
646
647         # Decide what order to execute the tasks in, pick a scheduler
648         #self.sched = RunQueueScheduler(self)
649         if self.scheduler == "completion":
650             self.sched = RunQueueSchedulerCompletion(self)
651         else:
652             self.sched = RunQueueSchedulerSpeed(self)
653
654         # Sanity Check - Check for multiple tasks building the same provider
655         prov_list = {}
656         seen_fn = []
657         for task in range(len(self.runq_fnid)):
658             fn = taskData.fn_index[self.runq_fnid[task]]
659             if fn in seen_fn:
660                 continue
661             seen_fn.append(fn)
662             for prov in self.dataCache.fn_provides[fn]:
663                 if prov not in prov_list:
664                     prov_list[prov] = [fn]
665                 elif fn not in prov_list[prov]: 
666                     prov_list[prov].append(fn)
667         error = False
668         for prov in prov_list:
669             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
670                 error = True
671                 bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
672         #if error:
673         #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
674
675
676         # Create a whitelist usable by the stamp checks
677         stampfnwhitelist = []
678         for entry in self.stampwhitelist.split():
679             entryid = self.taskData.getbuild_id(entry)
680             if entryid not in self.taskData.build_targets:
681                 continue
682             fnid = self.taskData.build_targets[entryid][0]
683             fn = self.taskData.fn_index[fnid]
684             stampfnwhitelist.append(fn)
685         self.stampfnwhitelist = stampfnwhitelist
686
687         #self.dump_data(taskData)
688
689     def check_stamps(self):
690         unchecked = {}
691         current = []
692         notcurrent = []
693         buildable = []
694
695         if self.stamppolicy == "perfile":
696             fulldeptree = False
697         else:
698             fulldeptree = True
699             stampwhitelist = []
700             if self.stamppolicy == "whitelist":
701                 stampwhitelist = self.self.stampfnwhitelist
702
703         for task in range(len(self.runq_fnid)):
704             unchecked[task] = ""
705             if len(self.runq_depends[task]) == 0:
706                 buildable.append(task)
707
708         def check_buildable(self, task, buildable):
709              for revdep in self.runq_revdeps[task]:
710                 alldeps = 1
711                 for dep in self.runq_depends[revdep]:
712                     if dep in unchecked:
713                         alldeps = 0
714                 if alldeps == 1:
715                     if revdep in unchecked:
716                         buildable.append(revdep)
717
718         for task in range(len(self.runq_fnid)):
719             if task not in unchecked:
720                 continue
721             fn = self.taskData.fn_index[self.runq_fnid[task]]
722             taskname = self.runq_task[task]
723             stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
724             # If the stamp is missing its not current
725             if not os.access(stampfile, os.F_OK):
726                 del unchecked[task]
727                 notcurrent.append(task)
728                 check_buildable(self, task, buildable)
729                 continue
730             # If its a 'nostamp' task, it's not current
731             taskdep = self.dataCache.task_deps[fn]
732             if 'nostamp' in taskdep and task in taskdep['nostamp']:
733                 del unchecked[task]
734                 notcurrent.append(task)
735                 check_buildable(self, task, buildable)
736                 continue
737
738         while (len(buildable) > 0):
739             nextbuildable = []
740             for task in buildable:
741                 if task in unchecked:
742                     fn = self.taskData.fn_index[self.runq_fnid[task]]
743                     taskname = self.runq_task[task]
744                     stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
745                     iscurrent = True
746
747                     t1 = os.stat(stampfile)[stat.ST_MTIME]
748                     for dep in self.runq_depends[task]:
749                         if iscurrent:
750                             fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
751                             taskname2 = self.runq_task[dep]
752                             stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
753                             if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
754                                 if dep in notcurrent:
755                                     iscurrent = False
756                                 else:
757                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
758                                     if t1 < t2:
759                                         iscurrent = False
760                     del unchecked[task]
761                     if iscurrent:
762                         current.append(task)
763                     else:
764                         notcurrent.append(task)
765
766                 check_buildable(self, task, nextbuildable)
767
768             buildable = nextbuildable
769
770         #for task in range(len(self.runq_fnid)):
771         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
772         #    taskname = self.runq_task[task]
773         #    print "%s %s.%s" % (task, taskname, fn)
774
775         #print "Unchecked: %s" % unchecked
776         #print "Current: %s" % current
777         #print "Not current: %s" % notcurrent
778
779         if len(unchecked) > 0:
780             bb.fatal("check_stamps fatal internal error")
781         return current
782
783     def check_stamp(self, task):
784
785         if self.stamppolicy == "perfile":
786             fulldeptree = False
787         else:
788             fulldeptree = True
789             stampwhitelist = []
790             if self.stamppolicy == "whitelist":
791                 stampwhitelist = self.stampfnwhitelist
792
793         fn = self.taskData.fn_index[self.runq_fnid[task]]
794         taskname = self.runq_task[task]
795         stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
796         # If the stamp is missing its not current
797         if not os.access(stampfile, os.F_OK):
798             return False
799         # If its a 'nostamp' task, it's not current
800         taskdep = self.dataCache.task_deps[fn]
801         if 'nostamp' in taskdep and task in taskdep['nostamp']:
802             return False
803
804         iscurrent = True
805         t1 =  os.stat(stampfile)[stat.ST_MTIME]
806         for dep in self.runq_depends[task]:
807             if iscurrent:
808                 fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
809                 taskname2 = self.runq_task[dep]
810                 stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
811                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
812                     try:
813                         t2 = os.stat(stampfile2)[stat.ST_MTIME]
814                         if t1 < t2:
815                             iscurrent = False
816                     except:
817                         iscurrent = False
818
819         return iscurrent
820
821     def execute_runqueue(self):
822         """
823         Run the tasks in a queue prepared by prepare_runqueue
824         Upon failure, optionally try to recover the build using any alternate providers
825         (if the abort on failure configuration option isn't set)
826         """
827
828         failures = 0
829         while 1:
830             failed_fnids = []
831             try:
832                 self.execute_runqueue_internal()
833             finally:
834                 if self.master_process:
835                     failed_fnids = self.finish_runqueue()
836             if len(failed_fnids) == 0:
837                 return failures
838             if self.taskData.abort:
839                 raise bb.runqueue.TaskFailure(failed_fnids)
840             for fnid in failed_fnids:
841                 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid],  self.runq_task[fnid])
842                 self.taskData.fail_fnid(fnid)
843                 failures = failures + 1
844             self.reset_runqueue()
845             self.prepare_runqueue()
846
847     def execute_runqueue_initVars(self):
848
849         self.stats = RunQueueStats()
850
851         self.active_builds = 0
852         self.runq_buildable = []
853         self.runq_running = []
854         self.runq_complete = []
855         self.build_pids = {}
856         self.failed_fnids = []
857         self.master_process = True
858
859         # Mark initial buildable tasks
860         for task in range(len(self.runq_fnid)):
861             self.runq_running.append(0)
862             self.runq_complete.append(0)
863             if len(self.runq_depends[task]) == 0:
864                 self.runq_buildable.append(1)
865             else:
866                 self.runq_buildable.append(0)
867
868     def task_complete(self, task):
869         """
870         Mark a task as completed
871         Look at the reverse dependencies and mark any task with 
872         completed dependencies as buildable
873         """
874         self.runq_complete[task] = 1
875         for revdep in self.runq_revdeps[task]:
876             if self.runq_running[revdep] == 1:
877                 continue
878             if self.runq_buildable[revdep] == 1:
879                 continue
880             alldeps = 1
881             for dep in self.runq_depends[revdep]:
882                 if self.runq_complete[dep] != 1:
883                     alldeps = 0
884             if alldeps == 1:
885                 self.runq_buildable[revdep] = 1
886                 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
887                 taskname = self.runq_task[revdep]
888                 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
889
890     def execute_runqueue_internal(self):
891         """
892         Run the tasks in a queue prepared by prepare_runqueue
893         """
894
895         bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
896
897         self.execute_runqueue_initVars()
898
899         if len(self.runq_fnid) == 0:
900             # nothing to do
901             return []
902
903         def sigint_handler(signum, frame):
904             raise KeyboardInterrupt
905
906         event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgdata))
907
908         while True:
909             task = self.sched.next()
910             if task is not None:
911                 fn = self.taskData.fn_index[self.runq_fnid[task]]
912
913                 taskname = self.runq_task[task]
914                 if self.check_stamp(task):
915                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
916                     self.runq_running[task] = 1
917                     self.task_complete(task)
918                     self.stats.taskCompleted()
919                     self.stats.taskSkipped()
920                     continue
921
922                 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
923                 sys.stdout.flush()
924                 sys.stderr.flush()
925                 try: 
926                     pid = os.fork() 
927                 except OSError, e: 
928                     bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
929                 if pid == 0:
930                     # Bypass master process' handling
931                     self.master_process = False
932                     # Stop Ctrl+C being sent to children
933                     # signal.signal(signal.SIGINT, signal.SIG_IGN)
934                     # Make the child the process group leader
935                     os.setpgid(0, 0)
936                     newsi = os.open('/dev/null', os.O_RDWR)
937                     os.dup2(newsi, sys.stdin.fileno())
938                     self.cooker.configuration.cmd = taskname[3:]
939                     try: 
940                         self.cooker.tryBuild(fn)
941                     except bb.build.EventException:
942                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
943                         sys.exit(1)
944                     except:
945                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
946                         raise
947                     sys.exit(0)
948                 self.build_pids[pid] = task
949                 self.runq_running[task] = 1
950                 self.active_builds = self.active_builds + 1
951                 if self.active_builds < self.number_tasks:
952                     continue
953             if self.active_builds > 0:
954                 result = os.waitpid(-1, 0)
955                 self.active_builds = self.active_builds - 1
956                 task = self.build_pids[result[0]]
957                 if result[1] != 0:
958                     del self.build_pids[result[0]]
959                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
960                     self.failed_fnids.append(self.runq_fnid[task])
961                     self.stats.taskFailed()
962                     break
963                 self.task_complete(task)
964                 self.stats.taskCompleted()
965                 del self.build_pids[result[0]]
966                 continue
967             return
968
969     def finish_runqueue(self):
970         try:
971             while self.active_builds > 0:
972                 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
973                 tasknum = 1
974                 for k, v in self.build_pids.iteritems():
975                      bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
976                      tasknum = tasknum + 1
977                 result = os.waitpid(-1, 0)
978                 task = self.build_pids[result[0]]
979                 if result[1] != 0:
980                      bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
981                      self.failed_fnids.append(self.runq_fnid[task])
982                      self.stats.taskFailed()
983                 del self.build_pids[result[0]]
984                 self.active_builds = self.active_builds - 1
985             bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
986             return self.failed_fnids
987         except KeyboardInterrupt:
988             bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
989             for k, v in self.build_pids.iteritems():
990                  try:
991                      os.kill(-k, signal.SIGINT)
992                  except:
993                      pass
994             raise
995
996         # Sanity Checks
997         for task in range(len(self.runq_fnid)):
998             if self.runq_buildable[task] == 0:
999                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
1000             if self.runq_running[task] == 0:
1001                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
1002             if self.runq_complete[task] == 0:
1003                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
1004
1005         bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
1006
1007         return self.failed_fnids
1008
1009     def dump_data(self, taskQueue):
1010         """
1011         Dump some debug information on the internal data structures
1012         """
1013         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
1014         for task in range(len(self.runq_fnid)):
1015                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
1016                         taskQueue.fn_index[self.runq_fnid[task]], 
1017                         self.runq_task[task], 
1018                         self.runq_weight[task], 
1019                         self.runq_depends[task], 
1020                         self.runq_revdeps[task]))
1021
1022         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
1023         for task1 in range(len(self.runq_fnid)):
1024             if task1 in self.prio_map:
1025                 task = self.prio_map[task1]
1026                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
1027                         taskQueue.fn_index[self.runq_fnid[task]], 
1028                         self.runq_task[task], 
1029                         self.runq_weight[task], 
1030                         self.runq_depends[task], 
1031                         self.runq_revdeps[task]))