runqueue.py: Add BB_SCHEDULER and BB_STAMP_POLICY variables
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 from bb import msg, data, event, mkdirhier, utils
26 from sets import Set 
27 import bb, os, sys
28 import signal
29 import stat
30
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The value handed to the constructor is stored as the exception's args.
    """
    def __init__(self, x):
        # The base class constructor is deliberately not called, args is
        # simply set to the supplied value
        self.args = x
35
36
class RunQueueStats:
    """
    Counters for the tasks handled by the associated runQueue
    (completed, skipped and failed)
    """
    def __init__(self):
        self.completed = self.skipped = self.failed = 0

    def taskFailed(self):
        """Record a single failed task"""
        self.failed += 1

    def taskCompleted(self, number = 1):
        """Record 'number' completed tasks"""
        self.completed += number

    def taskSkipped(self, number = 1):
        """Record 'number' skipped tasks"""
        self.skipped += number
54
class RunQueueScheduler:
    """
    Control the order tasks are scheduled in.
    """
    def __init__(self, runqueue):
        """
        The default scheduler just returns the first buildable task (the 
        priority map is sorted by task number)
        """
        self.rq = runqueue
        # Identity mapping: priority position N maps to task id N
        self.prio_map = list(range(len(self.rq.runq_fnid)))

    def next(self):
        """
        Return the id of the first task we find that is buildable,
        or None if no task is currently buildable
        """
        for position in range(len(self.rq.runq_fnid)):
            candidate = self.prio_map[position]
            # Tasks which are already running are never handed out again
            if self.rq.runq_running[candidate] != 1 and self.rq.runq_buildable[candidate] == 1:
                return candidate
        return None
80
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    def __init__(self, runqueue):
        """
        The priority map is sorted by task weight.

        The heaviest task comes first. Tasks of equal weight are ordered
        with the highest task id first.
        """
        self.rq = runqueue

        # Pair each weight with its task id and sort once. Tuples order by
        # weight and then by task id, so reversing the sorted list gives the
        # heaviest-first order without searching the weight list per task.
        weighted = [(weight, task) for task, weight in enumerate(self.rq.runq_weight)]
        weighted.sort()
        weighted.reverse()

        self.prio_map = [task for (weight, task) in weighted]
105
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The 
    priority map is sorted by task weight, but then reordered so once a given 
    .bb file starts to build, it's completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in 
    force.
    """
    def __init__(self, runqueue):
        """
        Start from the weight sorted priority map and group together all the
        tasks which share a fnid. Groups are placed in the order their first
        task appears in the weight sorted map and the tasks inside a group
        keep their weight sorted order.
        """
        RunQueueSchedulerSpeed.__init__(self, runqueue)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        fnid_order = []
        grouped = {}
        for task in self.prio_map:
            fnid = self.rq.runq_fnid[task]
            if fnid not in grouped:
                # First time this .bb file is seen, this fixes its position
                grouped[fnid] = []
                fnid_order.append(fnid)
            grouped[fnid].append(task)

        self.prio_map = []
        for fnid in fnid_order:
            self.prio_map.extend(grouped[fnid])
136
137 class RunQueue:
138     """
139     BitBake Run Queue implementation
140     """
141     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
142         self.reset_runqueue()
143         self.cooker = cooker
144         self.dataCache = dataCache
145         self.taskData = taskData
146         self.targets = targets
147
148         self.cfgdata = cfgData
149         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
150         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
151         self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
152         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
153
154     def reset_runqueue(self):
155
156         self.runq_fnid = []
157         self.runq_task = []
158         self.runq_depends = []
159         self.runq_revdeps = []
160
161     def get_user_idstring(self, task):
162         fn = self.taskData.fn_index[self.runq_fnid[task]]
163         taskname = self.runq_task[task]
164         return "%s, %s" % (fn, taskname)
165
166     def circular_depchains_handler(self, tasks):
167         """
168         Some tasks aren't buildable, likely due to circular dependency issues.
169         Identify the circular dependencies and print them in a user readable format.
170         """
171         from copy import deepcopy
172
173         valid_chains = []
174         explored_deps = {}
175         msgs = []
176
177         def chain_reorder(chain):
178             """
179             Reorder a dependency chain so the lowest task id is first
180             """
181             lowest = 0
182             new_chain = []
183             for entry in range(len(chain)):
184                 if chain[entry] < chain[lowest]:
185                     lowest = entry
186             new_chain.extend(chain[lowest:])
187             new_chain.extend(chain[:lowest])
188             return new_chain
189
190         def chain_compare_equal(chain1, chain2):
191             """
192             Compare two dependency chains and see if they're the same
193             """
194             if len(chain1) != len(chain2):
195                 return False
196             for index in range(len(chain1)):
197                 if chain1[index] != chain2[index]:
198                     return False
199             return True
200             
201         def chain_array_contains(chain, chain_array):
202             """
203             Return True if chain_array contains chain
204             """
205             for ch in chain_array:
206                 if chain_compare_equal(ch, chain):
207                     return True
208             return False
209
210         def find_chains(taskid, prev_chain):
211             prev_chain.append(taskid)
212             total_deps = []
213             total_deps.extend(self.runq_revdeps[taskid])
214             for revdep in self.runq_revdeps[taskid]:
215                 if revdep in prev_chain:
216                     idx = prev_chain.index(revdep)
217                     # To prevent duplicates, reorder the chain to start with the lowest taskid
218                     # and search through an array of those we've already printed
219                     chain = prev_chain[idx:]
220                     new_chain = chain_reorder(chain)
221                     if not chain_array_contains(new_chain, valid_chains):
222                         valid_chains.append(new_chain)
223                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
224                         for dep in new_chain:
225                             msgs.append("  Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
226                         msgs.append("\n")
227                     if len(valid_chains) > 10:
228                         msgs.append("Aborted dependency loops search after 10 matches.\n")
229                         return msgs
230                     continue
231                 scan = False
232                 if revdep not in explored_deps:
233                     scan = True
234                 elif revdep in explored_deps[revdep]:
235                     scan = True
236                 else:
237                     for dep in prev_chain:
238                         if dep in explored_deps[revdep]:
239                             scan = True
240                 if scan:
241                     find_chains(revdep, deepcopy(prev_chain))
242                 for dep in explored_deps[revdep]:
243                     if dep not in total_deps:
244                         total_deps.append(dep)
245
246             explored_deps[taskid] = total_deps
247
248         for task in tasks:
249             find_chains(task, [])
250
251         return msgs
252
253     def calculate_task_weights(self, endpoints):
254         """
255         Calculate a number representing the "weight" of each task. Heavier weighted tasks 
256         have more dependencies and hence should be executed sooner for maximum speed.
257
258         This function also sanity checks the task list finding tasks that its not
259         possible to execute due to circular dependencies.
260         """
261
262         numTasks = len(self.runq_fnid)
263         weight = []
264         deps_left = []
265         task_done = []
266
267         for listid in range(numTasks):
268             task_done.append(False)
269             weight.append(0)
270             deps_left.append(len(self.runq_revdeps[listid]))
271
272         for listid in endpoints:
273             weight[listid] = 1
274             task_done[listid] = True
275
276         while 1:
277             next_points = []
278             for listid in endpoints:
279                 for revdep in self.runq_depends[listid]:
280                     weight[revdep] = weight[revdep] + weight[listid]
281                     deps_left[revdep] = deps_left[revdep] - 1
282                     if deps_left[revdep] == 0:
283                         next_points.append(revdep)
284                         task_done[revdep] = True
285             endpoints = next_points
286             if len(next_points) == 0:
287                 break      
288
289         # Circular dependency sanity check
290         problem_tasks = []
291         for task in range(numTasks):
292             if task_done[task] is False or deps_left[task] != 0:
293                 problem_tasks.append(task)
294                 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
295                 bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
296
297         if problem_tasks:
298             message = "Unbuildable tasks were found.\n"
299             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
300             message = message + "Identifying dependency loops (this may take a short while)...\n"
301             bb.msg.error(bb.msg.domain.RunQueue, message)
302
303             msgs = self.circular_depchains_handler(problem_tasks)
304
305             message = "\n"
306             for msg in msgs:
307                 message = message + msg
308             bb.msg.fatal(bb.msg.domain.RunQueue, message)
309
310         return weight
311
312     def prepare_runqueue(self):
313         """
314         Turn a set of taskData into a RunQueue and compute data needed 
315         to optimise the execution order.
316         """
317
318         depends = []
319         runq_build = []
320
321         taskData = self.taskData
322
323         if len(taskData.tasks_name) == 0:
324             # Nothing to do
325             return
326
327         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
328
329         # Step A - Work out a list of tasks to run
330         #
331         # Taskdata gives us a list of possible providers for a every target 
332         # ordered by priority (build_targets, run_targets). It also gives
333         # information on each of those providers.
334         #
335         # To create the actual list of tasks to execute we fix the list of 
336         # providers and then resolve the dependencies into task IDs. This 
337         # process is repeated for each type of dependency (tdepends, deptask, 
338         # rdeptast, recrdeptask, idepends).
339
340         for task in range(len(taskData.tasks_name)):
341             fnid = taskData.tasks_fnid[task]
342             fn = taskData.fn_index[fnid]
343             task_deps = self.dataCache.task_deps[fn]
344
345             if fnid not in taskData.failed_fnids:
346
347                 # Resolve task internal dependencies 
348                 #
349                 # e.g. addtask before X after Y
350                 depends = taskData.tasks_tdepends[task]
351
352                 # Resolve 'deptask' dependencies 
353                 #
354                 # e.g. do_sometask[deptask] = "do_someothertask"
355                 # (makes sure sometask runs after someothertask of all DEPENDS)
356                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
357                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
358                     for depid in taskData.depids[fnid]:
359                         # Won't be in build_targets if ASSUME_PROVIDED
360                         if depid in taskData.build_targets:
361                             depdata = taskData.build_targets[depid][0]
362                             if depdata is not None:
363                                 dep = taskData.fn_index[depdata]
364                                 for taskname in tasknames:
365                                     depends.append(taskData.gettask_id(dep, taskname))
366
367                 # Resolve 'rdeptask' dependencies 
368                 #
369                 # e.g. do_sometask[rdeptask] = "do_someothertask"
370                 # (makes sure sometask runs after someothertask of all RDEPENDS)
371                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
372                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
373                     for depid in taskData.rdepids[fnid]:
374                         if depid in taskData.run_targets:
375                             depdata = taskData.run_targets[depid][0]
376                             if depdata is not None:
377                                 dep = taskData.fn_index[depdata]
378                                 depends.append(taskData.gettask_id(dep, taskname))
379
380                 # Resolve inter-task dependencies 
381                 #
382                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
383                 # (makes sure sometask runs after targetname's someothertask)
384                 idepends = taskData.tasks_idepends[task]
385                 for idepend in idepends:
386                     depid = int(idepend.split(":")[0])
387                     if depid in taskData.build_targets:
388                         # Won't be in build_targets if ASSUME_PROVIDED
389                         depdata = taskData.build_targets[depid][0]
390                         if depdata is not None:
391                             dep = taskData.fn_index[depdata]
392                             depends.append(taskData.gettask_id(dep, idepend.split(":")[1]))
393
394                 def add_recursive_build(depid, depfnid):
395                     """
396                     Add build depends of depid to depends
397                     (if we've not see it before)
398                     (calls itself recursively)
399                     """
400                     if str(depid) in dep_seen:
401                         return
402                     dep_seen.append(depid)
403                     if depid in taskData.build_targets:
404                         depdata = taskData.build_targets[depid][0]
405                         if depdata is not None:
406                             dep = taskData.fn_index[depdata]
407                             idepends = []
408                             # Need to avoid creating new tasks here
409                             taskid = taskData.gettask_id(dep, taskname, False)
410                             if taskid is not None:
411                                 depends.append(taskid)
412                                 fnid = taskData.tasks_fnid[taskid]
413                                 idepends = taskData.tasks_idepends[taskid]
414                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
415                             else:
416                                 fnid = taskData.getfn_id(dep)
417                             for nextdepid in taskData.depids[fnid]:
418                                 if nextdepid not in dep_seen:
419                                     add_recursive_build(nextdepid, fnid)
420                             for nextdepid in taskData.rdepids[fnid]:
421                                 if nextdepid not in rdep_seen:
422                                     add_recursive_run(nextdepid, fnid)
423                             for idepend in idepends:
424                                 nextdepid = int(idepend.split(":")[0])
425                                 if nextdepid not in dep_seen:
426                                     add_recursive_build(nextdepid, fnid)
427
428                 def add_recursive_run(rdepid, depfnid):
429                     """
430                     Add runtime depends of rdepid to depends
431                     (if we've not see it before)
432                     (calls itself recursively)
433                     """
434                     if str(rdepid) in rdep_seen:
435                         return
436                     rdep_seen.append(rdepid)
437                     if rdepid in taskData.run_targets:
438                         depdata = taskData.run_targets[rdepid][0]
439                         if depdata is not None:
440                             dep = taskData.fn_index[depdata]
441                             idepends = []
442                             # Need to avoid creating new tasks here
443                             taskid = taskData.gettask_id(dep, taskname, False)
444                             if taskid is not None:
445                                 depends.append(taskid)
446                                 fnid = taskData.tasks_fnid[taskid]
447                                 idepends = taskData.tasks_idepends[taskid]
448                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
449                             else:
450                                 fnid = taskData.getfn_id(dep)
451                             for nextdepid in taskData.depids[fnid]:
452                                 if nextdepid not in dep_seen:
453                                     add_recursive_build(nextdepid, fnid)
454                             for nextdepid in taskData.rdepids[fnid]:
455                                 if nextdepid not in rdep_seen:
456                                     add_recursive_run(nextdepid, fnid)
457                             for idepend in idepends:
458                                 nextdepid = int(idepend.split(":")[0])
459                                 if nextdepid not in dep_seen:
460                                     add_recursive_build(nextdepid, fnid)
461
462                 # Resolve recursive 'recrdeptask' dependencies 
463                 #
464                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
465                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
466                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
467                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
468                         dep_seen = []
469                         rdep_seen = []
470                         idep_seen = []
471                         for depid in taskData.depids[fnid]:
472                             add_recursive_build(depid, fnid)
473                         for rdepid in taskData.rdepids[fnid]:
474                             add_recursive_run(rdepid, fnid)
475                         for idepend in idepends:
476                             depid = int(idepend.split(":")[0])
477                             add_recursive_build(depid, fnid)
478
479                 # Rmove all self references
480                 if task in depends:
481                     newdep = []
482                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
483                     for dep in depends:
484                        if task != dep:
485                           newdep.append(dep)
486                     depends = newdep
487
488
489             self.runq_fnid.append(taskData.tasks_fnid[task])
490             self.runq_task.append(taskData.tasks_name[task])
491             self.runq_depends.append(Set(depends))
492             self.runq_revdeps.append(Set())
493
494             runq_build.append(0)
495
496         # Step B - Mark all active tasks
497         #
498         # Start with the tasks we were asked to run and mark all dependencies
499         # as active too. If the task is to be 'forced', clear its stamp. Once
500         # all active tasks are marked, prune the ones we don't need.
501
502         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
503
504         def mark_active(listid, depth):
505             """
506             Mark an item as active along with its depends
507             (calls itself recursively)
508             """
509
510             if runq_build[listid] == 1:
511                 return
512
513             runq_build[listid] = 1
514
515             depends = self.runq_depends[listid]
516             for depend in depends:
517                 mark_active(depend, depth+1)
518
519         self.target_pairs = []
520         for target in self.targets:
521             targetid = taskData.getbuild_id(target[0])
522
523             if targetid not in taskData.build_targets:
524                 continue
525
526             if targetid in taskData.failed_deps:
527                 continue
528
529             fnid = taskData.build_targets[targetid][0]
530             fn = taskData.fn_index[fnid]
531             self.target_pairs.append((fn, target[1]))
532
533             # Remove stamps for targets if force mode active
534             if self.cooker.configuration.force:
535                 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
536                 bb.build.del_stamp(target[1], self.dataCache, fn)
537
538             if fnid in taskData.failed_fnids:
539                 continue
540
541             if target[1] not in taskData.tasks_lookup[fnid]:
542                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
543
544             listid = taskData.tasks_lookup[fnid][target[1]]
545
546             mark_active(listid, 1)
547
548         # Step C - Prune all inactive tasks
549         #
550         # Once all active tasks are marked, prune the ones we don't need.
551
552         maps = []
553         delcount = 0
554         for listid in range(len(self.runq_fnid)):
555             if runq_build[listid-delcount] == 1:
556                 maps.append(listid-delcount)
557             else:
558                 del self.runq_fnid[listid-delcount]
559                 del self.runq_task[listid-delcount]
560                 del self.runq_depends[listid-delcount]
561                 del runq_build[listid-delcount]
562                 del self.runq_revdeps[listid-delcount]
563                 delcount = delcount + 1
564                 maps.append(-1)
565
566         #
567         # Step D - Sanity checks and computation
568         #
569
570         # Check to make sure we still have tasks to run
571         if len(self.runq_fnid) == 0:
572             if not taskData.abort:
573                 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
574             else:               
575                 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
576
577         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
578
579         # Remap the dependencies to account for the deleted tasks
580         # Check we didn't delete a task we depend on
581         for listid in range(len(self.runq_fnid)):
582             newdeps = []
583             origdeps = self.runq_depends[listid]
584             for origdep in origdeps:
585                 if maps[origdep] == -1:
586                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
587                 newdeps.append(maps[origdep])
588             self.runq_depends[listid] = Set(newdeps)
589
590         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
591
592         # Generate a list of reverse dependencies to ease future calculations
593         for listid in range(len(self.runq_fnid)):
594             for dep in self.runq_depends[listid]:
595                 self.runq_revdeps[dep].add(listid)
596
597         # Identify tasks at the end of dependency chains
598         # Error on circular dependency loops (length two)
599         endpoints = []
600         for listid in range(len(self.runq_fnid)):
601             revdeps = self.runq_revdeps[listid]
602             if len(revdeps) == 0:
603                 endpoints.append(listid)
604             for dep in revdeps:
605                 if dep in self.runq_depends[listid]:
606                     #self.dump_data(taskData)
607                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
608
609         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
610
611
612         # Calculate task weights 
613         # Check of higher length circular dependencies
614         self.runq_weight = self.calculate_task_weights(endpoints)
615
616         # Decide what order to execute the tasks in, pick a scheduler
617         #self.sched = RunQueueScheduler(self)
618         if self.scheduler == "completion":
619             self.sched = RunQueueSchedulerCompletion(self)
620         else:
621             self.sched = RunQueueSchedulerSpeed(self)
622
623         # Sanity Check - Check for multiple tasks building the same provider
624         prov_list = {}
625         seen_fn = []
626         for task in range(len(self.runq_fnid)):
627             fn = taskData.fn_index[self.runq_fnid[task]]
628             if fn in seen_fn:
629                 continue
630             seen_fn.append(fn)
631             for prov in self.dataCache.fn_provides[fn]:
632                 if prov not in prov_list:
633                     prov_list[prov] = [fn]
634                 elif fn not in prov_list[prov]: 
635                     prov_list[prov].append(fn)
636         error = False
637         for prov in prov_list:
638             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
639                 error = True
640                 bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
641         #if error:
642         #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
643
644         #self.dump_data(taskData)
645
646     def check_stamps(self):
647         unchecked = {}
648         current = []
649         notcurrent = []
650         buildable = []
651
652         if self.stamppolicy == "perfile":
653             fulldeptree = False
654         else:
655             fulldeptree = True
656
657         for task in range(len(self.runq_fnid)):
658             unchecked[task] = ""
659             if len(self.runq_depends[task]) == 0:
660                 buildable.append(task)
661
662         for task in range(len(self.runq_fnid)):
663             if task not in unchecked:
664                 continue
665             fn = self.taskData.fn_index[self.runq_fnid[task]]
666             taskname = self.runq_task[task]
667             stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
668             # If the stamp is missing its not current
669             if not os.access(stampfile, os.F_OK):
670                 del unchecked[task]
671                 notcurrent.append(task)
672                 continue
673             # If its a 'nostamp' task, it's not current
674             taskdep = self.dataCache.task_deps[fn]
675             if 'nostamp' in taskdep and task in taskdep['nostamp']:
676                 del unchecked[task]
677                 notcurrent.append(task)
678                 continue
679
680         while (len(buildable) > 0):
681             nextbuildable = []
682             for task in buildable:
683                 if task in unchecked:
684                     fn = self.taskData.fn_index[self.runq_fnid[task]]
685                     taskname = self.runq_task[task]
686                     stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
687                     iscurrent = True
688
689                     t1 = os.stat(stampfile)[stat.ST_MTIME]
690                     for dep in self.runq_depends[task]:
691                         if iscurrent:
692                             fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
693                             taskname2 = self.runq_task[dep]
694                             stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
695                             if fulldeptree or fn == fn2:
696                                 if dep in notcurrent:
697                                     iscurrent = False
698                                 else:
699                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
700                                     if t1 < t2:
701                                         iscurrent = False
702                     del unchecked[task]
703                     if iscurrent:
704                         current.append(task)
705                     else:
706                         notcurrent.append(task)
707
708                 for revdep in self.runq_revdeps[task]:
709                     alldeps = 1
710                     for dep in self.runq_depends[revdep]:
711                         if dep in unchecked:
712                             alldeps = 0
713                     if alldeps == 1:
714                         if revdep in unchecked:
715                             nextbuildable.append(revdep)
716
717             buildable = nextbuildable
718
719         #for task in range(len(self.runq_fnid)):
720         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
721         #    taskname = self.runq_task[task]
722         #    print "%s %s.%s" % (task, taskname, fn)
723
724         #print "Unchecked: %s" % unchecked
725         #print "Current: %s" % current
726         #print "Not current: %s" % notcurrent
727
728         if len(unchecked) > 0:
729             bb.fatal("check_stamps fatal internal error")
730         return current
731
732
733     def execute_runqueue(self):
734         """
735         Run the tasks in a queue prepared by prepare_runqueue
736         Upon failure, optionally try to recover the build using any alternate providers
737         (if the abort on failure configuration option isn't set)
738         """
739
740         failures = 0
741         while 1:
742             failed_fnids = []
743             try:
744                 self.execute_runqueue_internal()
745             finally:
746                 if self.master_process:
747                     failed_fnids = self.finish_runqueue()
748             if len(failed_fnids) == 0:
749                 return failures
750             if self.taskData.abort:
751                 raise bb.runqueue.TaskFailure(failed_fnids)
752             for fnid in failed_fnids:
753                 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid],  self.runq_task[fnid])
754                 self.taskData.fail_fnid(fnid)
755                 failures = failures + 1
756             self.reset_runqueue()
757             self.prepare_runqueue()
758
759     def execute_runqueue_initVars(self):
760
761         self.stats = RunQueueStats()
762
763         self.active_builds = 0
764         self.runq_buildable = []
765         self.runq_running = []
766         self.runq_complete = []
767         self.build_pids = {}
768         self.failed_fnids = []
769         self.master_process = True
770
771         # Mark initial buildable tasks
772         for task in range(len(self.runq_fnid)):
773             self.runq_running.append(0)
774             self.runq_complete.append(0)
775             if len(self.runq_depends[task]) == 0:
776                 self.runq_buildable.append(1)
777             else:
778                 self.runq_buildable.append(0)
779
780     def task_complete(self, task):
781         """
782         Mark a task as completed
783         Look at the reverse dependencies and mark any task with 
784         completed dependencies as buildable
785         """
786         self.runq_complete[task] = 1
787         for revdep in self.runq_revdeps[task]:
788             if self.runq_running[revdep] == 1:
789                 continue
790             if self.runq_buildable[revdep] == 1:
791                 continue
792             alldeps = 1
793             for dep in self.runq_depends[revdep]:
794                 if self.runq_complete[dep] != 1:
795                     alldeps = 0
796             if alldeps == 1:
797                 self.runq_buildable[revdep] = 1
798                 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
799                 taskname = self.runq_task[revdep]
800                 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
801
802     def execute_runqueue_internal(self):
803         """
804         Run the tasks in a queue prepared by prepare_runqueue
805         """
806
807         bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
808
809         self.execute_runqueue_initVars()
810
811         if len(self.runq_fnid) == 0:
812             # nothing to do
813             return []
814
815         def sigint_handler(signum, frame):
816             raise KeyboardInterrupt
817
818         event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgdata))
819
820         # Find out which tasks have current stamps which we can skip when the
821         # time comes
822         currentstamps = self.check_stamps()
823         self.stats.taskSkipped(len(currentstamps))
824         self.stats.taskCompleted(len(currentstamps))
825
826         while True:
827             task = self.sched.next()
828             if task is not None:
829                 fn = self.taskData.fn_index[self.runq_fnid[task]]
830
831                 taskname = self.runq_task[task]
832                 if task in currentstamps:
833                 #if bb.build.stamp_is_current(taskname, self.dataCache, fn):
834                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
835                     self.runq_running[task] = 1
836                     self.task_complete(task)
837                     #self.stats.taskCompleted()
838                     #self.stats.taskSkipped()
839                     continue
840
841                 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
842                 try: 
843                     pid = os.fork() 
844                 except OSError, e: 
845                     bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
846                 if pid == 0:
847                     # Bypass master process' handling
848                     self.master_process = False
849                     # Stop Ctrl+C being sent to children
850                     # signal.signal(signal.SIGINT, signal.SIG_IGN)
851                     # Make the child the process group leader
852                     os.setpgid(0, 0)
853                     newsi = os.open('/dev/null', os.O_RDWR)
854                     os.dup2(newsi, sys.stdin.fileno())
855                     self.cooker.configuration.cmd = taskname[3:]
856                     try: 
857                         self.cooker.tryBuild(fn)
858                     except bb.build.EventException:
859                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
860                         sys.exit(1)
861                     except:
862                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
863                         raise
864                     sys.exit(0)
865                 self.build_pids[pid] = task
866                 self.runq_running[task] = 1
867                 self.active_builds = self.active_builds + 1
868                 if self.active_builds < self.number_tasks:
869                     continue
870             if self.active_builds > 0:
871                 result = os.waitpid(-1, 0)
872                 self.active_builds = self.active_builds - 1
873                 task = self.build_pids[result[0]]
874                 if result[1] != 0:
875                     del self.build_pids[result[0]]
876                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
877                     self.failed_fnids.append(self.runq_fnid[task])
878                     self.stats.taskFailed()
879                     break
880                 self.task_complete(task)
881                 self.stats.taskCompleted()
882                 del self.build_pids[result[0]]
883                 continue
884             return
885
886     def finish_runqueue(self):
887         try:
888             while self.active_builds > 0:
889                 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
890                 tasknum = 1
891                 for k, v in self.build_pids.iteritems():
892                      bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
893                      tasknum = tasknum + 1
894                 result = os.waitpid(-1, 0)
895                 task = self.build_pids[result[0]]
896                 if result[1] != 0:
897                      bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
898                      self.failed_fnids.append(self.runq_fnid[task])
899                      self.stats.taskFailed()
900                 del self.build_pids[result[0]]
901                 self.active_builds = self.active_builds - 1
902             bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
903             return self.failed_fnids
904         except KeyboardInterrupt:
905             bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
906             for k, v in self.build_pids.iteritems():
907                  try:
908                      os.kill(-k, signal.SIGINT)
909                  except:
910                      pass
911             raise
912
913         # Sanity Checks
914         for task in range(len(self.runq_fnid)):
915             if self.runq_buildable[task] == 0:
916                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
917             if self.runq_running[task] == 0:
918                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
919             if self.runq_complete[task] == 0:
920                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
921
922         bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
923
924         return self.failed_fnids
925
926     def dump_data(self, taskQueue):
927         """
928         Dump some debug information on the internal data structures
929         """
930         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
931         for task in range(len(self.runq_fnid)):
932                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
933                         taskQueue.fn_index[self.runq_fnid[task]], 
934                         self.runq_task[task], 
935                         self.runq_weight[task], 
936                         self.runq_depends[task], 
937                         self.runq_revdeps[task]))
938
939         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
940         for task1 in range(len(self.runq_fnid)):
941             if task1 in self.prio_map:
942                 task = self.prio_map[task1]
943                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
944                         taskQueue.fn_index[self.runq_fnid[task]], 
945                         self.runq_task[task], 
946                         self.runq_weight[task], 
947                         self.runq_depends[task], 
948                         self.runq_revdeps[task]))