runqueue.py: improve printing dependent tasks
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 from bb import msg, data, event, mkdirhier, utils
26 import bb, os, sys
27 import signal
28 import stat
29 import fcntl
30
31 class TaskFailure(Exception):
32     """Exception raised when a task in a runqueue fails"""
33     def __init__(self, x): 
34         self.args = x
35
36
37 class RunQueueStats:
38     """
39     Holds statistics on the tasks handled by the associated runQueue
40     """
41     def __init__(self, total):
42         self.completed = 0
43         self.skipped = 0
44         self.failed = 0
45         self.active = 0
46         self.total = total
47
48     def taskFailed(self):
49         self.active = self.active - 1
50         self.failed = self.failed + 1
51
52     def taskCompleted(self, number = 1):
53         self.active = self.active - number
54         self.completed = self.completed + number
55
56     def taskSkipped(self, number = 1):
57         self.active = self.active + number
58         self.skipped = self.skipped + number
59
60     def taskActive(self):
61         self.active = self.active + 1
62
63 # These values indicate the next step due to be run in the 
64 # runQueue state machine
65 runQueuePrepare = 2
66 runQueueRunInit = 3
67 runQueueRunning = 4
68 runQueueFailed = 6
69 runQueueCleanUp = 7
70 runQueueComplete = 8
71 runQueueChildProcess = 9
72
73 class RunQueueScheduler:
74     """
75     Control the order tasks are scheduled in.
76     """
77     def __init__(self, runqueue):
78         """
79         The default scheduler just returns the first buildable task (the 
80         priority map is sorted by task numer)
81         """
82         self.rq = runqueue
83         numTasks = len(self.rq.runq_fnid)
84
85         self.prio_map = []
86         self.prio_map.extend(range(numTasks))
87
88     def next(self):
89         """
90         Return the id of the first task we find that is buildable
91         """
92         for task1 in range(len(self.rq.runq_fnid)):
93             task = self.prio_map[task1]
94             if self.rq.runq_running[task] == 1:
95                 continue
96             if self.rq.runq_buildable[task] == 1:
97                 return task
98
99 class RunQueueSchedulerSpeed(RunQueueScheduler):
100     """
101     A scheduler optimised for speed. The priority map is sorted by task weight,
102     heavier weighted tasks (tasks needed by the most other tasks) are run first.
103     """
104     def __init__(self, runqueue):
105         """
106         The priority map is sorted by task weight.
107         """
108         from copy import deepcopy
109
110         self.rq = runqueue
111
112         sortweight = deepcopy(self.rq.runq_weight)
113         sortweight.sort()
114         copyweight = deepcopy(self.rq.runq_weight)
115         self.prio_map = []
116
117         for weight in sortweight:
118             idx = copyweight.index(weight)
119             self.prio_map.append(idx)
120             copyweight[idx] = -1
121
122         self.prio_map.reverse()
123
124 class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
125     """
126     A scheduler optimised to complete .bb files are quickly as possible. The 
127     priority map is sorted by task weight, but then reordered so once a given 
128     .bb file starts to build, its completed as quickly as possible. This works
129     well where disk space is at a premium and classes like OE's rm_work are in 
130     force.
131     """
132     def __init__(self, runqueue):
133         RunQueueSchedulerSpeed.__init__(self, runqueue)
134         from copy import deepcopy
135
136         #FIXME - whilst this groups all fnids together it does not reorder the
137         #fnid groups optimally.
138  
139         basemap = deepcopy(self.prio_map)
140         self.prio_map = []
141         while (len(basemap) > 0):
142             entry = basemap.pop(0)
143             self.prio_map.append(entry)
144             fnid = self.rq.runq_fnid[entry]
145             todel = []
146             for entry in basemap:
147                 entry_fnid = self.rq.runq_fnid[entry]
148                 if entry_fnid == fnid:
149                     todel.append(basemap.index(entry))
150                     self.prio_map.append(entry)
151             todel.reverse()
152             for idx in todel:
153                 del basemap[idx]
154
155 class RunQueue:
156     """
157     BitBake Run Queue implementation
158     """
159     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
160         self.reset_runqueue()
161         self.cooker = cooker
162         self.dataCache = dataCache
163         self.taskData = taskData
164         self.cfgData = cfgData
165         self.targets = targets
166
167         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
168         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
169         self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
170         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
171         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
172
173     def reset_runqueue(self):
174         self.runq_fnid = []
175         self.runq_task = []
176         self.runq_depends = []
177         self.runq_revdeps = []
178         self.state = runQueuePrepare
179
180     def runq_depends_names(self, ids):
181         import re
182         ret = []
183         for id in self.runq_depends[ids]:
184             nam = os.path.basename(self.get_user_idstring(id))
185             nam = re.sub("_[^,]*,", ",", nam)
186             ret.extend([nam])
187         return ret
188
189     def get_user_idstring(self, task):
190         fn = self.taskData.fn_index[self.runq_fnid[task]]
191         taskname = self.runq_task[task]
192         return "%s, %s" % (fn, taskname)
193
194     def get_task_id(self, fnid, taskname):
195         for listid in range(len(self.runq_fnid)):
196             if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
197                 return listid
198         return None
199
200     def circular_depchains_handler(self, tasks):
201         """
202         Some tasks aren't buildable, likely due to circular dependency issues.
203         Identify the circular dependencies and print them in a user readable format.
204         """
205         from copy import deepcopy
206
207         valid_chains = []
208         explored_deps = {}
209         msgs = []
210
211         def chain_reorder(chain):
212             """
213             Reorder a dependency chain so the lowest task id is first
214             """
215             lowest = 0
216             new_chain = []
217             for entry in range(len(chain)):
218                 if chain[entry] < chain[lowest]:
219                     lowest = entry
220             new_chain.extend(chain[lowest:])
221             new_chain.extend(chain[:lowest])
222             return new_chain
223
224         def chain_compare_equal(chain1, chain2):
225             """
226             Compare two dependency chains and see if they're the same
227             """
228             if len(chain1) != len(chain2):
229                 return False
230             for index in range(len(chain1)):
231                 if chain1[index] != chain2[index]:
232                     return False
233             return True
234             
235         def chain_array_contains(chain, chain_array):
236             """
237             Return True if chain_array contains chain
238             """
239             for ch in chain_array:
240                 if chain_compare_equal(ch, chain):
241                     return True
242             return False
243
244         def find_chains(taskid, prev_chain):
245             prev_chain.append(taskid)
246             total_deps = []
247             total_deps.extend(self.runq_revdeps[taskid])
248             for revdep in self.runq_revdeps[taskid]:
249                 if revdep in prev_chain:
250                     idx = prev_chain.index(revdep)
251                     # To prevent duplicates, reorder the chain to start with the lowest taskid
252                     # and search through an array of those we've already printed
253                     chain = prev_chain[idx:]
254                     new_chain = chain_reorder(chain)
255                     if not chain_array_contains(new_chain, valid_chains):
256                         valid_chains.append(new_chain)
257                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
258                         for dep in new_chain:
259                             msgs.append("  Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
260                         msgs.append("\n")
261                     if len(valid_chains) > 10:
262                         msgs.append("Aborted dependency loops search after 10 matches.\n")
263                         return msgs
264                     continue
265                 scan = False
266                 if revdep not in explored_deps:
267                     scan = True
268                 elif revdep in explored_deps[revdep]:
269                     scan = True
270                 else:
271                     for dep in prev_chain:
272                         if dep in explored_deps[revdep]:
273                             scan = True
274                 if scan:
275                     find_chains(revdep, deepcopy(prev_chain))
276                 for dep in explored_deps[revdep]:
277                     if dep not in total_deps:
278                         total_deps.append(dep)
279
280             explored_deps[taskid] = total_deps
281
282         for task in tasks:
283             find_chains(task, [])
284
285         return msgs
286
287     def calculate_task_weights(self, endpoints):
288         """
289         Calculate a number representing the "weight" of each task. Heavier weighted tasks 
290         have more dependencies and hence should be executed sooner for maximum speed.
291
292         This function also sanity checks the task list finding tasks that its not
293         possible to execute due to circular dependencies.
294         """
295
296         numTasks = len(self.runq_fnid)
297         weight = []
298         deps_left = []
299         task_done = []
300
301         for listid in range(numTasks):
302             task_done.append(False)
303             weight.append(0)
304             deps_left.append(len(self.runq_revdeps[listid]))
305
306         for listid in endpoints:
307             weight[listid] = 1
308             task_done[listid] = True
309
310         while 1:
311             next_points = []
312             for listid in endpoints:
313                 for revdep in self.runq_depends[listid]:
314                     weight[revdep] = weight[revdep] + weight[listid]
315                     deps_left[revdep] = deps_left[revdep] - 1
316                     if deps_left[revdep] == 0:
317                         next_points.append(revdep)
318                         task_done[revdep] = True
319             endpoints = next_points
320             if len(next_points) == 0:
321                 break      
322
323         # Circular dependency sanity check
324         problem_tasks = []
325         for task in range(numTasks):
326             if task_done[task] is False or deps_left[task] != 0:
327                 problem_tasks.append(task)
328                 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
329                 bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
330
331         if problem_tasks:
332             message = "Unbuildable tasks were found.\n"
333             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
334             message = message + "Identifying dependency loops (this may take a short while)...\n"
335             bb.msg.error(bb.msg.domain.RunQueue, message)
336
337             msgs = self.circular_depchains_handler(problem_tasks)
338
339             message = "\n"
340             for msg in msgs:
341                 message = message + msg
342             bb.msg.fatal(bb.msg.domain.RunQueue, message)
343
344         return weight
345
346     def prepare_runqueue(self):
347         """
348         Turn a set of taskData into a RunQueue and compute data needed 
349         to optimise the execution order.
350         """
351
352         runq_build = []
353         recursive_tdepends = {}
354         runq_recrdepends = []
355         tdepends_fnid = {}
356
357         taskData = self.taskData
358
359         if len(taskData.tasks_name) == 0:
360             # Nothing to do
361             return
362
363         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
364
365         # Step A - Work out a list of tasks to run
366         #
367         # Taskdata gives us a list of possible providers for every build and run
368         # target ordered by priority. It also gives information on each of those 
369         # providers.
370         #
371         # To create the actual list of tasks to execute we fix the list of 
372         # providers and then resolve the dependencies into task IDs. This 
373         # process is repeated for each type of dependency (tdepends, deptask, 
374         # rdeptast, recrdeptask, idepends).
375
376         def add_build_dependencies(depids, tasknames, depends):
377             for depid in depids:
378                 # Won't be in build_targets if ASSUME_PROVIDED
379                 if depid not in taskData.build_targets:
380                     continue
381                 depdata = taskData.build_targets[depid][0]
382                 if depdata is None:
383                     continue
384                 dep = taskData.fn_index[depdata]
385                 for taskname in tasknames:
386                     taskid = taskData.gettask_id(dep, taskname, False)
387                     if taskid is not None:
388                         depends.append(taskid)
389
390         def add_runtime_dependencies(depids, tasknames, depends):
391             for depid in depids:
392                 if depid not in taskData.run_targets:
393                     continue
394                 depdata = taskData.run_targets[depid][0]
395                 if depdata is None:
396                     continue
397                 dep = taskData.fn_index[depdata]
398                 for taskname in tasknames:
399                     taskid = taskData.gettask_id(dep, taskname, False)
400                     if taskid is not None:
401                         depends.append(taskid)
402
403         for task in range(len(taskData.tasks_name)):
404             depends = []
405             recrdepends = []
406             fnid = taskData.tasks_fnid[task]
407             fn = taskData.fn_index[fnid]
408             task_deps = self.dataCache.task_deps[fn]
409
410             bb.msg.debug(2, bb.msg.domain.RunQueue, "Processing %s:%s" %(fn, taskData.tasks_name[task]))
411
412             if fnid not in taskData.failed_fnids:
413
414                 # Resolve task internal dependencies 
415                 #
416                 # e.g. addtask before X after Y
417                 depends = taskData.tasks_tdepends[task]
418
419                 # Resolve 'deptask' dependencies 
420                 #
421                 # e.g. do_sometask[deptask] = "do_someothertask"
422                 # (makes sure sometask runs after someothertask of all DEPENDS)
423                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
424                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
425                     add_build_dependencies(taskData.depids[fnid], tasknames, depends)
426
427                 # Resolve 'rdeptask' dependencies 
428                 #
429                 # e.g. do_sometask[rdeptask] = "do_someothertask"
430                 # (makes sure sometask runs after someothertask of all RDEPENDS)
431                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
432                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
433                     add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
434
435                 # Resolve inter-task dependencies 
436                 #
437                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
438                 # (makes sure sometask runs after targetname's someothertask)
439                 if fnid not in tdepends_fnid:
440                     tdepends_fnid[fnid] = set()
441                 idepends = taskData.tasks_idepends[task]
442                 for (depid, idependtask) in idepends:
443                     if depid in taskData.build_targets:
444                         # Won't be in build_targets if ASSUME_PROVIDED
445                         depdata = taskData.build_targets[depid][0]
446                         if depdata is not None:
447                             dep = taskData.fn_index[depdata]
448                             taskid = taskData.gettask_id(dep, idependtask)
449                             depends.append(taskid)
450                             if depdata != fnid:
451                                 tdepends_fnid[fnid].add(taskid)
452
453
454                 # Resolve recursive 'recrdeptask' dependencies (A)
455                 #
456                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
457                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
458                 # We cover the recursive part of the dependencies below
459                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
460                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
461                         recrdepends.append(taskname)
462                         add_build_dependencies(taskData.depids[fnid], [taskname], depends)
463                         add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
464
465                 # Rmove all self references
466                 if task in depends:
467                     newdep = []
468                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
469                     for dep in depends:
470                        if task != dep:
471                           newdep.append(dep)
472                     depends = newdep
473
474             self.runq_fnid.append(taskData.tasks_fnid[task])
475             self.runq_task.append(taskData.tasks_name[task])
476             self.runq_depends.append(set(depends))
477             self.runq_revdeps.append(set())
478
479             runq_build.append(0)
480             runq_recrdepends.append(recrdepends)
481
482         #
483         # Build a list of recursive cumulative dependencies for each fnid
484         # We do this by fnid, since if A depends on some task in B
485         # we're interested in later tasks B's fnid might have but B itself 
486         # doesn't depend on
487         #
488         # Algorithm is O(tasks) + O(tasks)*O(fnids)
489         #
490         reccumdepends = {}
491         for task in range(len(self.runq_fnid)):
492             fnid = self.runq_fnid[task]
493             if fnid not in reccumdepends:
494                 if fnid in tdepends_fnid:
495                     reccumdepends[fnid] = tdepends_fnid[fnid]
496                 else:
497                     reccumdepends[fnid] = set()
498             reccumdepends[fnid].update(self.runq_depends[task])
499         for task in range(len(self.runq_fnid)):
500             taskfnid = self.runq_fnid[task]
501             for fnid in reccumdepends:
502                 if task in reccumdepends[fnid]:
503                     reccumdepends[fnid].add(task)
504                     if taskfnid in reccumdepends:
505                         reccumdepends[fnid].update(reccumdepends[taskfnid])
506
507
508         # Resolve recursive 'recrdeptask' dependencies (B)
509         #
510         # e.g. do_sometask[recrdeptask] = "do_someothertask"
511         # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
512         for task in range(len(self.runq_fnid)):
513             if len(runq_recrdepends[task]) > 0:
514                 taskfnid = self.runq_fnid[task]
515                 for dep in reccumdepends[taskfnid]:
516                     # Ignore self references 
517                     if dep == task:
518                         continue
519                     for taskname in runq_recrdepends[task]:
520                         if taskData.tasks_name[dep] == taskname:
521                             self.runq_depends[task].add(dep)
522
523         # Step B - Mark all active tasks
524         #
525         # Start with the tasks we were asked to run and mark all dependencies
526         # as active too. If the task is to be 'forced', clear its stamp. Once
527         # all active tasks are marked, prune the ones we don't need.
528
529         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
530
531         def mark_active(listid, depth):
532             """
533             Mark an item as active along with its depends
534             (calls itself recursively)
535             """
536
537             if runq_build[listid] == 1:
538                 return
539
540             runq_build[listid] = 1
541
542             depends = self.runq_depends[listid]
543             for depend in depends:
544                 mark_active(depend, depth+1)
545
546         self.target_pairs = []
547         for target in self.targets:
548             targetid = taskData.getbuild_id(target[0])
549
550             if targetid not in taskData.build_targets:
551                 continue
552
553             if targetid in taskData.failed_deps:
554                 continue
555
556             fnid = taskData.build_targets[targetid][0]
557             fn = taskData.fn_index[fnid]
558             self.target_pairs.append((fn, target[1]))
559
560             # Remove stamps for targets if force mode active
561             if self.cooker.configuration.force:
562                 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
563                 bb.build.del_stamp(target[1], self.dataCache, fn)
564
565             if fnid in taskData.failed_fnids:
566                 continue
567
568             if target[1] not in taskData.tasks_lookup[fnid]:
569                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
570
571             listid = taskData.tasks_lookup[fnid][target[1]]
572
573             mark_active(listid, 1)
574
575         # Step C - Prune all inactive tasks
576         #
577         # Once all active tasks are marked, prune the ones we don't need.
578
579         maps = []
580         delcount = 0
581         for listid in range(len(self.runq_fnid)):
582             if runq_build[listid-delcount] == 1:
583                 maps.append(listid-delcount)
584             else:
585                 del self.runq_fnid[listid-delcount]
586                 del self.runq_task[listid-delcount]
587                 del self.runq_depends[listid-delcount]
588                 del runq_build[listid-delcount]
589                 del self.runq_revdeps[listid-delcount]
590                 delcount = delcount + 1
591                 maps.append(-1)
592
593         #
594         # Step D - Sanity checks and computation
595         #
596
597         # Check to make sure we still have tasks to run
598         if len(self.runq_fnid) == 0:
599             if not taskData.abort:
600                 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
601             else:
602                 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
603
604         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
605
606         # Remap the dependencies to account for the deleted tasks
607         # Check we didn't delete a task we depend on
608         for listid in range(len(self.runq_fnid)):
609             newdeps = []
610             origdeps = self.runq_depends[listid]
611             for origdep in origdeps:
612                 if maps[origdep] == -1:
613                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
614                 newdeps.append(maps[origdep])
615             self.runq_depends[listid] = set(newdeps)
616
617         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
618
619         # Generate a list of reverse dependencies to ease future calculations
620         for listid in range(len(self.runq_fnid)):
621             for dep in self.runq_depends[listid]:
622                 self.runq_revdeps[dep].add(listid)
623
624         # Identify tasks at the end of dependency chains
625         # Error on circular dependency loops (length two)
626         endpoints = []
627         for listid in range(len(self.runq_fnid)):
628             revdeps = self.runq_revdeps[listid]
629             if len(revdeps) == 0:
630                 endpoints.append(listid)
631             for dep in revdeps:
632                 if dep in self.runq_depends[listid]:
633                     #self.dump_data(taskData)
634                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
635
636         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
637
638         # Calculate task weights 
639         # Check of higher length circular dependencies
640         self.runq_weight = self.calculate_task_weights(endpoints)
641
642         # Decide what order to execute the tasks in, pick a scheduler
643         #self.sched = RunQueueScheduler(self)
644         if self.scheduler == "completion":
645             self.sched = RunQueueSchedulerCompletion(self)
646         else:
647             self.sched = RunQueueSchedulerSpeed(self)
648
649         # Sanity Check - Check for multiple tasks building the same provider
650         prov_list = {}
651         seen_fn = []
652         for task in range(len(self.runq_fnid)):
653             fn = taskData.fn_index[self.runq_fnid[task]]
654             if fn in seen_fn:
655                 continue
656             seen_fn.append(fn)
657             for prov in self.dataCache.fn_provides[fn]:
658                 if prov not in prov_list:
659                     prov_list[prov] = [fn]
660                 elif fn not in prov_list[prov]: 
661                     prov_list[prov].append(fn)
662         error = False
663         for prov in prov_list:
664             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
665                 error = True
666                 bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
667         #if error:
668         #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
669
670
671         # Create a whitelist usable by the stamp checks
672         stampfnwhitelist = []
673         for entry in self.stampwhitelist.split():
674             entryid = self.taskData.getbuild_id(entry)
675             if entryid not in self.taskData.build_targets:
676                 continue
677             fnid = self.taskData.build_targets[entryid][0]
678             fn = self.taskData.fn_index[fnid]
679             stampfnwhitelist.append(fn)
680         self.stampfnwhitelist = stampfnwhitelist
681
682         #self.dump_data(taskData)
683
684         self.state = runQueueRunInit
685
686     def check_stamps(self):
687         unchecked = {}
688         current = []
689         notcurrent = []
690         buildable = []
691
692         if self.stamppolicy == "perfile":
693             fulldeptree = False
694         else:
695             fulldeptree = True
696             stampwhitelist = []
697             if self.stamppolicy == "whitelist":
698                 stampwhitelist = self.self.stampfnwhitelist
699
700         for task in range(len(self.runq_fnid)):
701             unchecked[task] = ""
702             if len(self.runq_depends[task]) == 0:
703                 buildable.append(task)
704
705         def check_buildable(self, task, buildable):
706              for revdep in self.runq_revdeps[task]:
707                 alldeps = 1
708                 for dep in self.runq_depends[revdep]:
709                     if dep in unchecked:
710                         alldeps = 0
711                 if alldeps == 1:
712                     if revdep in unchecked:
713                         buildable.append(revdep)
714
715         for task in range(len(self.runq_fnid)):
716             if task not in unchecked:
717                 continue
718             fn = self.taskData.fn_index[self.runq_fnid[task]]
719             taskname = self.runq_task[task]
720             stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
721             # If the stamp is missing its not current
722             if not os.access(stampfile, os.F_OK):
723                 del unchecked[task]
724                 notcurrent.append(task)
725                 check_buildable(self, task, buildable)
726                 continue
727             # If its a 'nostamp' task, it's not current
728             taskdep = self.dataCache.task_deps[fn]
729             if 'nostamp' in taskdep and task in taskdep['nostamp']:
730                 del unchecked[task]
731                 notcurrent.append(task)
732                 check_buildable(self, task, buildable)
733                 continue
734
735         while (len(buildable) > 0):
736             nextbuildable = []
737             for task in buildable:
738                 if task in unchecked:
739                     fn = self.taskData.fn_index[self.runq_fnid[task]]
740                     taskname = self.runq_task[task]
741                     stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
742                     iscurrent = True
743
744                     t1 = os.stat(stampfile)[stat.ST_MTIME]
745                     for dep in self.runq_depends[task]:
746                         if iscurrent:
747                             fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
748                             taskname2 = self.runq_task[dep]
749                             stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
750                             if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
751                                 if dep in notcurrent:
752                                     iscurrent = False
753                                 else:
754                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
755                                     if t1 < t2:
756                                         iscurrent = False
757                     del unchecked[task]
758                     if iscurrent:
759                         current.append(task)
760                     else:
761                         notcurrent.append(task)
762
763                 check_buildable(self, task, nextbuildable)
764
765             buildable = nextbuildable
766
767         #for task in range(len(self.runq_fnid)):
768         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
769         #    taskname = self.runq_task[task]
770         #    print "%s %s.%s" % (task, taskname, fn)
771
772         #print "Unchecked: %s" % unchecked
773         #print "Current: %s" % current
774         #print "Not current: %s" % notcurrent
775
776         if len(unchecked) > 0:
777             bb.fatal("check_stamps fatal internal error")
778         return current
779
780     def check_stamp_task(self, task):
781
782         if self.stamppolicy == "perfile":
783             fulldeptree = False
784         else:
785             fulldeptree = True
786             stampwhitelist = []
787             if self.stamppolicy == "whitelist":
788                 stampwhitelist = self.stampfnwhitelist
789
790         fn = self.taskData.fn_index[self.runq_fnid[task]]
791         taskname = self.runq_task[task]
792         stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
793         # If the stamp is missing its not current
794         if not os.access(stampfile, os.F_OK):
795             bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile)
796             return False
797         # If its a 'nostamp' task, it's not current
798         taskdep = self.dataCache.task_deps[fn]
799         if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
800             bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
801             return False
802
803         iscurrent = True
804         t1 =  os.stat(stampfile)[stat.ST_MTIME]
805         for dep in self.runq_depends[task]:
806             if iscurrent:
807                 fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
808                 taskname2 = self.runq_task[dep]
809                 stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
810                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
811                     try:
812                         t2 = os.stat(stampfile2)[stat.ST_MTIME]
813                         if t1 < t2:
814                             bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile,stampfile2))
815                             iscurrent = False
816                     except:
817                         bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2 ,stampfile))
818                         iscurrent = False
819
820         return iscurrent
821
822     def execute_runqueue(self):
823         """
824         Run the tasks in a queue prepared by prepare_runqueue
825         Upon failure, optionally try to recover the build using any alternate providers
826         (if the abort on failure configuration option isn't set)
827         """
828
829         if self.state is runQueuePrepare:
830             self.prepare_runqueue()
831
832         if self.state is runQueueRunInit:
833             bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
834             self.execute_runqueue_initVars()
835
836         if self.state is runQueueRunning:
837             self.execute_runqueue_internal()
838
839         if self.state is runQueueCleanUp:
840             self.finish_runqueue()
841
842         if self.state is runQueueFailed:
843             if not self.taskData.tryaltconfigs:
844                 raise bb.runqueue.TaskFailure(self.failed_fnids)
845             for fnid in self.failed_fnids:
846                 self.taskData.fail_fnid(fnid)
847             self.reset_runqueue()
848
849         if self.state is runQueueComplete:
850             # All done
851             bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
852             return False
853
854         if self.state is runQueueChildProcess:
855             print "Child process"
856             return False
857
858         # Loop
859         return True
860
861     def execute_runqueue_initVars(self):
862
863         self.stats = RunQueueStats(len(self.runq_fnid))
864
865         self.runq_buildable = []
866         self.runq_running = []
867         self.runq_complete = []
868         self.build_pids = {}
869         self.build_pipes = {}
870         self.failed_fnids = []
871
872         # Mark initial buildable tasks
873         for task in range(self.stats.total):
874             self.runq_running.append(0)
875             self.runq_complete.append(0)
876             if len(self.runq_depends[task]) == 0:
877                 self.runq_buildable.append(1)
878             else:
879                 self.runq_buildable.append(0)
880
881         self.state = runQueueRunning
882
883         event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData)
884
885     def task_complete(self, task):
886         """
887         Mark a task as completed
888         Look at the reverse dependencies and mark any task with 
889         completed dependencies as buildable
890         """
891         self.runq_complete[task] = 1
892         for revdep in self.runq_revdeps[task]:
893             if self.runq_running[revdep] == 1:
894                 continue
895             if self.runq_buildable[revdep] == 1:
896                 continue
897             alldeps = 1
898             for dep in self.runq_depends[revdep]:
899                 if self.runq_complete[dep] != 1:
900                     alldeps = 0
901             if alldeps == 1:
902                 self.runq_buildable[revdep] = 1
903                 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
904                 taskname = self.runq_task[revdep]
905                 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
906
907     def task_fail(self, task, exitcode):
908         """
909         Called when a task has failed
910         Updates the state engine with the failure
911         """
912         bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode))
913         self.stats.taskFailed()
914         fnid = self.runq_fnid[task]
915         self.failed_fnids.append(fnid)
916         bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
917         if self.taskData.abort:
918             self.state = runQueueCleanUp
919
920     def execute_runqueue_internal(self):
921         """
922         Run the tasks in a queue prepared by prepare_runqueue
923         """
924
925         if self.stats.total == 0:
926             # nothing to do
927             self.state = runQueueCleanup
928
929         while True:
930             task = None
931             if self.stats.active < self.number_tasks:
932                 task = self.sched.next()
933             if task is not None:
934                 fn = self.taskData.fn_index[self.runq_fnid[task]]
935
936                 taskname = self.runq_task[task]
937                 if self.check_stamp_task(task):
938                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
939                     self.runq_running[task] = 1
940                     self.runq_buildable[task] = 1
941                     self.task_complete(task)
942                     self.stats.taskCompleted()
943                     self.stats.taskSkipped()
944                     continue
945
946                 sys.stdout.flush()
947                 sys.stderr.flush()
948                 try:
949                     pipein, pipeout = os.pipe()
950                     pid = os.fork() 
951                 except OSError, e: 
952                     bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
953                 if pid == 0:
954                     os.close(pipein)
955                     # Save out the PID so that the event can include it the
956                     # events
957                     bb.event.worker_pid = os.getpid()
958                     bb.event.worker_pipe = pipeout
959
960                     self.state = runQueueChildProcess
961                     # Make the child the process group leader
962                     os.setpgid(0, 0)
963                     # No stdin
964                     newsi = os.open('/dev/null', os.O_RDWR)
965                     os.dup2(newsi, sys.stdin.fileno())
966                     # Stdout to a logfile
967                     #logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True)
968                     #mkdirhier(os.path.dirname(logout))
969                     #newso = open(logout, 'w')
970                     #os.dup2(newso.fileno(), sys.stdout.fileno())
971                     #os.dup2(newso.fileno(), sys.stderr.fileno())
972
973                     bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
974                     bb.msg.note(1, bb.msg.domain.RunQueue,
975                                 "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1,
976                                                                         self.stats.total,
977                                                                         task,
978                                                                         self.get_user_idstring(task)))
979
980                     bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
981                     try:
982                         self.cooker.tryBuild(fn, taskname[3:])
983                     except bb.build.EventException:
984                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
985                         os._exit(1)
986                     except:
987                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
988                         os._exit(1)
989                     os._exit(0)
990
991                 self.build_pids[pid] = task
992                 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
993                 self.runq_running[task] = 1
994                 self.stats.taskActive()
995                 if self.stats.active < self.number_tasks:
996                     continue
997
998             for pipe in self.build_pipes:
999                 self.build_pipes[pipe].read()
1000
1001             if self.stats.active > 0:
1002                 result = os.waitpid(-1, os.WNOHANG)
1003                 if result[0] is 0 and result[1] is 0:
1004                     return
1005                 task = self.build_pids[result[0]]
1006                 del self.build_pids[result[0]]
1007                 self.build_pipes[result[0]].close()
1008                 del self.build_pipes[result[0]]
1009                 if result[1] != 0:
1010                     self.task_fail(task, result[1])
1011                     return
1012                 self.task_complete(task)
1013                 self.stats.taskCompleted()
1014                 bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
1015                 continue
1016
1017             if len(self.failed_fnids) != 0:
1018                 self.state = runQueueFailed
1019                 return
1020
1021             # Sanity Checks
1022             for task in range(self.stats.total):
1023                 if self.runq_buildable[task] == 0:
1024                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
1025                 if self.runq_running[task] == 0:
1026                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
1027                 if self.runq_complete[task] == 0:
1028                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
1029             self.state = runQueueComplete
1030             return
1031
1032     def finish_runqueue_now(self):
1033         bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active)
1034         for k, v in self.build_pids.iteritems():
1035              try:
1036                  os.kill(-k, signal.SIGINT)
1037              except:
1038                  pass
1039         for pipe in self.build_pipes:
1040             self.build_pipes[pipe].read()
1041
1042     def finish_runqueue(self, now = False):
1043         self.state = runQueueCleanUp
1044
1045         for pipe in self.build_pipes:
1046             self.build_pipes[pipe].read()
1047
1048         if now:
1049             self.finish_runqueue_now()
1050         try:
1051             while self.stats.active > 0:
1052                 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1053                 #bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active)
1054                 #tasknum = 1
1055                 #for k, v in self.build_pids.iteritems():
1056                 #    bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (pid %s)" % (tasknum, self.get_user_idstring(v), k))
1057                 #    tasknum = tasknum + 1
1058                 result = os.waitpid(-1, os.WNOHANG)
1059                 if result[0] is 0 and result[1] is 0:
1060                     return
1061                 task = self.build_pids[result[0]]
1062                 del self.build_pids[result[0]]
1063                 self.build_pipes[result[0]].close()
1064                 del self.build_pipes[result[0]]
1065                 if result[1] != 0:
1066                     self.task_fail(task, result[1])
1067                 else:
1068                     self.stats.taskCompleted()
1069                     bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
1070         except:
1071             self.finish_runqueue_now()
1072             raise
1073
1074         if len(self.failed_fnids) != 0:
1075             self.state = runQueueFailed
1076             return
1077
1078         self.state = runQueueComplete
1079         return
1080
1081     def dump_data(self, taskQueue):
1082         """
1083         Dump some debug information on the internal data structures
1084         """
1085         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
1086         for task in range(len(self.runq_task)):
1087                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
1088                         taskQueue.fn_index[self.runq_fnid[task]], 
1089                         self.runq_task[task], 
1090                         self.runq_weight[task], 
1091                         self.runq_depends[task], 
1092                         self.runq_revdeps[task]))
1093
1094         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
1095         for task1 in range(len(self.runq_task)):
1096             if task1 in self.prio_map:
1097                 task = self.prio_map[task1]
1098                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
1099                         taskQueue.fn_index[self.runq_fnid[task]], 
1100                         self.runq_task[task], 
1101                         self.runq_weight[task], 
1102                         self.runq_depends[task], 
1103                         self.runq_revdeps[task]))
1104
1105
1106 class TaskFailure(Exception):
1107     """
1108     Exception raised when a task in a runqueue fails
1109     """
1110     def __init__(self, x): 
1111         self.args = x
1112
1113
1114 class runQueueExitWait(bb.event.Event):
1115     """
1116     Event when waiting for task processes to exit
1117     """
1118
1119     def __init__(self, remain):
1120         self.remain = remain
1121         self.message = "Waiting for %s active tasks to finish" % remain
1122         bb.event.Event.__init__(self)
1123
1124 class runQueueEvent(bb.event.Event):
1125     """
1126     Base runQueue event class
1127     """
1128     def __init__(self, task, stats, rq):
1129         self.taskid = task
1130         self.taskstring = rq.get_user_idstring(task)
1131         self.stats = stats
1132         bb.event.Event.__init__(self)
1133
1134 class runQueueTaskStarted(runQueueEvent):
1135     """
1136     Event notifing a task was started
1137     """
1138     def __init__(self, task, stats, rq):
1139         runQueueEvent.__init__(self, task, stats, rq)
1140         self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring)
1141
1142 class runQueueTaskFailed(runQueueEvent):
1143     """
1144     Event notifing a task failed
1145     """
1146     def __init__(self, task, stats, rq):
1147         runQueueEvent.__init__(self, task, stats, rq)
1148         self.message = "Task %s failed (%s)" % (task, self.taskstring)
1149
1150 class runQueueTaskCompleted(runQueueEvent):
1151     """
1152     Event notifing a task completed
1153     """
1154     def __init__(self, task, stats, rq):
1155         runQueueEvent.__init__(self, task, stats, rq)
1156         self.message = "Task %s completed (%s)" % (task, self.taskstring)
1157
1158 def check_stamp_fn(fn, taskname, d):
1159     rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1160     fnid = rq.taskData.getfn_id(fn)
1161     taskid = rq.get_task_id(fnid, taskname)
1162     if taskid is not None:
1163         return rq.check_stamp_task(taskid)
1164     return None
1165
1166 class runQueuePipe():
1167     """
1168     Abstraction for a pipe between a worker thread and the server
1169     """
1170     def __init__(self, pipein, pipeout, d):
1171         self.fd = pipein
1172         os.close(pipeout)
1173         fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK)
1174         self.queue = ""
1175         self.d = d
1176
1177     def read(self):
1178         start = len(self.queue)
1179         try:
1180             self.queue = self.queue + os.read(self.fd, 1024)
1181         except OSError:
1182             pass
1183         end = len(self.queue)
1184         index = self.queue.find("</event>")
1185         while index != -1:
1186             bb.event.fire_from_worker(self.queue[:index+8], self.d)
1187             self.queue = self.queue[index+8:]
1188             index = self.queue.find("</event>")
1189         return (end > start)
1190
1191     def close(self):
1192         while self.read():
1193             continue
1194         if len(self.queue) > 0:
1195             print "Warning, worker left partial message"
1196         os.close(self.fd)
1197