# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
"""
BitBake 'RunQueue' implementation

Handles preparation and execution of a queue of tasks
"""
# Copyright (C) 2006-2007 Richard Purdie
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
33 from bb import msg, data, event
35 bblogger = logging.getLogger("BitBake")
36 logger = logging.getLogger("BitBake.RunQueue")
40 Holds statistics on the tasks handled by the associated runQueue
42 def __init__(self, total):
50 obj = self.__class__(self.total)
51 obj.__dict__.update(self.__dict__)
55 self.active = self.active - 1
56 self.failed = self.failed + 1
def taskCompleted(self, number = 1):
    """Record that 'number' tasks finished running.

    Moves them out of the active set and into the completed tally.
    """
    self.active -= number
    self.completed += number
def taskSkipped(self, number = 1):
    """Record that 'number' tasks were skipped.

    NOTE(review): 'active' is incremented (not decremented) here —
    presumably skipped tasks are subsequently passed through
    taskCompleted(), which balances the count; confirm against callers.
    """
    self.active += number
    self.skipped += number
67 self.active = self.active + 1
69 # These values indicate the next step due to be run in the
70 # runQueue state machine
79 runQueueChildProcess = 10
81 class RunQueueScheduler(object):
83 Control the order tasks are scheduled in.
87 def __init__(self, runqueue, rqdata):
89 The default scheduler just returns the first buildable task (the
90 priority map is sorted by task numer)
94 numTasks = len(self.rqdata.runq_fnid)
97 self.prio_map.extend(range(numTasks))
99 def next_buildable_task(self):
101 Return the id of the first task we find that is buildable
103 for tasknum in xrange(len(self.rqdata.runq_fnid)):
104 taskid = self.prio_map[tasknum]
105 if self.rq.runq_running[taskid] == 1:
107 if self.rq.runq_buildable[taskid] == 1:
108 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
109 taskname = self.rqdata.runq_task[taskid]
110 stamp = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
111 if stamp in self.rq.build_stamps.values():
117 Return the id of the task we should build next
119 if self.rq.stats.active < self.rq.number_tasks:
120 return self.next_buildable_task()
122 class RunQueueSchedulerSpeed(RunQueueScheduler):
124 A scheduler optimised for speed. The priority map is sorted by task weight,
125 heavier weighted tasks (tasks needed by the most other tasks) are run first.
129 def __init__(self, runqueue, rqdata):
131 The priority map is sorted by task weight.
137 sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
138 copyweight = copy.deepcopy(self.rqdata.runq_weight)
141 for weight in sortweight:
142 idx = copyweight.index(weight)
143 self.prio_map.append(idx)
146 self.prio_map.reverse()
148 class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
150 A scheduler optimised to complete .bb files are quickly as possible. The
151 priority map is sorted by task weight, but then reordered so once a given
152 .bb file starts to build, its completed as quickly as possible. This works
153 well where disk space is at a premium and classes like OE's rm_work are in
158 def __init__(self, runqueue, rqdata):
159 RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
161 #FIXME - whilst this groups all fnids together it does not reorder the
162 #fnid groups optimally.
164 basemap = copy.deepcopy(self.prio_map)
166 while (len(basemap) > 0):
167 entry = basemap.pop(0)
168 self.prio_map.append(entry)
169 fnid = self.rqdata.runq_fnid[entry]
171 for entry in basemap:
172 entry_fnid = self.rqdata.runq_fnid[entry]
173 if entry_fnid == fnid:
174 todel.append(basemap.index(entry))
175 self.prio_map.append(entry)
182 BitBake Run Queue implementation
184 def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
186 self.dataCache = dataCache
187 self.taskData = taskData
188 self.targets = targets
191 self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
192 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
199 self.runq_depends = []
200 self.runq_revdeps = []
203 def runq_depends_names(self, ids):
206 for id in self.runq_depends[ids]:
207 nam = os.path.basename(self.get_user_idstring(id))
208 nam = re.sub("_[^,]*,", ",", nam)
def get_user_idstring(self, task, task_name_suffix = ""):
    """Return a user-readable "<recipe file>, <task name>" identifier
    for the runqueue task with id 'task'.

    'task_name_suffix' is appended to the task name (e.g. "_setscene").
    """
    recipe_file = self.taskData.fn_index[self.runq_fnid[task]]
    return "%s, %s" % (recipe_file, self.runq_task[task] + task_name_suffix)
217 def get_task_id(self, fnid, taskname):
218 for listid in xrange(len(self.runq_fnid)):
219 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
223 def circular_depchains_handler(self, tasks):
225 Some tasks aren't buildable, likely due to circular dependency issues.
226 Identify the circular dependencies and print them in a user readable format.
228 from copy import deepcopy
234 def chain_reorder(chain):
236 Reorder a dependency chain so the lowest task id is first
240 for entry in xrange(len(chain)):
241 if chain[entry] < chain[lowest]:
243 new_chain.extend(chain[lowest:])
244 new_chain.extend(chain[:lowest])
247 def chain_compare_equal(chain1, chain2):
249 Compare two dependency chains and see if they're the same
251 if len(chain1) != len(chain2):
253 for index in xrange(len(chain1)):
254 if chain1[index] != chain2[index]:
258 def chain_array_contains(chain, chain_array):
260 Return True if chain_array contains chain
262 for ch in chain_array:
263 if chain_compare_equal(ch, chain):
267 def find_chains(taskid, prev_chain):
268 prev_chain.append(taskid)
270 total_deps.extend(self.runq_revdeps[taskid])
271 for revdep in self.runq_revdeps[taskid]:
272 if revdep in prev_chain:
273 idx = prev_chain.index(revdep)
274 # To prevent duplicates, reorder the chain to start with the lowest taskid
275 # and search through an array of those we've already printed
276 chain = prev_chain[idx:]
277 new_chain = chain_reorder(chain)
278 if not chain_array_contains(new_chain, valid_chains):
279 valid_chains.append(new_chain)
280 msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
281 for dep in new_chain:
282 msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
284 if len(valid_chains) > 10:
285 msgs.append("Aborted dependency loops search after 10 matches.\n")
289 if revdep not in explored_deps:
291 elif revdep in explored_deps[revdep]:
294 for dep in prev_chain:
295 if dep in explored_deps[revdep]:
298 find_chains(revdep, copy.deepcopy(prev_chain))
299 for dep in explored_deps[revdep]:
300 if dep not in total_deps:
301 total_deps.append(dep)
303 explored_deps[taskid] = total_deps
306 find_chains(task, [])
310 def calculate_task_weights(self, endpoints):
312 Calculate a number representing the "weight" of each task. Heavier weighted tasks
313 have more dependencies and hence should be executed sooner for maximum speed.
315 This function also sanity checks the task list finding tasks that are not
316 possible to execute due to circular dependencies.
319 numTasks = len(self.runq_fnid)
324 for listid in xrange(numTasks):
325 task_done.append(False)
327 deps_left.append(len(self.runq_revdeps[listid]))
329 for listid in endpoints:
331 task_done[listid] = True
335 for listid in endpoints:
336 for revdep in self.runq_depends[listid]:
337 weight[revdep] = weight[revdep] + weight[listid]
338 deps_left[revdep] = deps_left[revdep] - 1
339 if deps_left[revdep] == 0:
340 next_points.append(revdep)
341 task_done[revdep] = True
342 endpoints = next_points
343 if len(next_points) == 0:
346 # Circular dependency sanity check
348 for task in xrange(numTasks):
349 if task_done[task] is False or deps_left[task] != 0:
350 problem_tasks.append(task)
351 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
352 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
355 message = "Unbuildable tasks were found.\n"
356 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
357 message = message + "Identifying dependency loops (this may take a short while)...\n"
358 logger.error(message)
360 msgs = self.circular_depchains_handler(problem_tasks)
364 message = message + msg
365 bb.msg.fatal("RunQueue", message)
371 Turn a set of taskData into a RunQueue and compute data needed
372 to optimise the execution order.
376 recursive_tdepends = {}
377 runq_recrdepends = []
380 taskData = self.taskData
382 if len(taskData.tasks_name) == 0:
386 logger.info("Preparing runqueue")
388 # Step A - Work out a list of tasks to run
390 # Taskdata gives us a list of possible providers for every build and run
391 # target ordered by priority. It also gives information on each of those
394 # To create the actual list of tasks to execute we fix the list of
395 # providers and then resolve the dependencies into task IDs. This
396 # process is repeated for each type of dependency (tdepends, deptask,
397 # rdeptast, recrdeptask, idepends).
399 def add_build_dependencies(depids, tasknames, depends):
401 # Won't be in build_targets if ASSUME_PROVIDED
402 if depid not in taskData.build_targets:
404 depdata = taskData.build_targets[depid][0]
407 dep = taskData.fn_index[depdata]
408 for taskname in tasknames:
409 taskid = taskData.gettask_id(dep, taskname, False)
410 if taskid is not None:
411 depends.append(taskid)
413 def add_runtime_dependencies(depids, tasknames, depends):
415 if depid not in taskData.run_targets:
417 depdata = taskData.run_targets[depid][0]
420 dep = taskData.fn_index[depdata]
421 for taskname in tasknames:
422 taskid = taskData.gettask_id(dep, taskname, False)
423 if taskid is not None:
424 depends.append(taskid)
426 for task in xrange(len(taskData.tasks_name)):
429 fnid = taskData.tasks_fnid[task]
430 fn = taskData.fn_index[fnid]
431 task_deps = self.dataCache.task_deps[fn]
433 logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
435 if fnid not in taskData.failed_fnids:
437 # Resolve task internal dependencies
439 # e.g. addtask before X after Y
440 depends = taskData.tasks_tdepends[task]
442 # Resolve 'deptask' dependencies
444 # e.g. do_sometask[deptask] = "do_someothertask"
445 # (makes sure sometask runs after someothertask of all DEPENDS)
446 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
447 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
448 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
450 # Resolve 'rdeptask' dependencies
452 # e.g. do_sometask[rdeptask] = "do_someothertask"
453 # (makes sure sometask runs after someothertask of all RDEPENDS)
454 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
455 taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
456 add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
458 # Resolve inter-task dependencies
460 # e.g. do_sometask[depends] = "targetname:do_someothertask"
461 # (makes sure sometask runs after targetname's someothertask)
462 if fnid not in tdepends_fnid:
463 tdepends_fnid[fnid] = set()
464 idepends = taskData.tasks_idepends[task]
465 for (depid, idependtask) in idepends:
466 if depid in taskData.build_targets:
467 # Won't be in build_targets if ASSUME_PROVIDED
468 depdata = taskData.build_targets[depid][0]
469 if depdata is not None:
470 dep = taskData.fn_index[depdata]
471 taskid = taskData.gettask_id(dep, idependtask, False)
473 bb.msg.fatal("RunQueue", "Task %s in %s depends upon nonexistant task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
474 depends.append(taskid)
476 tdepends_fnid[fnid].add(taskid)
479 # Resolve recursive 'recrdeptask' dependencies (A)
481 # e.g. do_sometask[recrdeptask] = "do_someothertask"
482 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
483 # We cover the recursive part of the dependencies below
484 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
485 for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
486 recrdepends.append(taskname)
487 add_build_dependencies(taskData.depids[fnid], [taskname], depends)
488 add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
490 # Rmove all self references
493 logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
499 self.runq_fnid.append(taskData.tasks_fnid[task])
500 self.runq_task.append(taskData.tasks_name[task])
501 self.runq_depends.append(set(depends))
502 self.runq_revdeps.append(set())
503 self.runq_hash.append("")
506 runq_recrdepends.append(recrdepends)
509 # Build a list of recursive cumulative dependencies for each fnid
510 # We do this by fnid, since if A depends on some task in B
511 # we're interested in later tasks B's fnid might have but B itself
514 # Algorithm is O(tasks) + O(tasks)*O(fnids)
517 for task in xrange(len(self.runq_fnid)):
518 fnid = self.runq_fnid[task]
519 if fnid not in reccumdepends:
520 if fnid in tdepends_fnid:
521 reccumdepends[fnid] = tdepends_fnid[fnid]
523 reccumdepends[fnid] = set()
524 reccumdepends[fnid].update(self.runq_depends[task])
525 for task in xrange(len(self.runq_fnid)):
526 taskfnid = self.runq_fnid[task]
527 for fnid in reccumdepends:
528 if task in reccumdepends[fnid]:
529 reccumdepends[fnid].add(task)
530 if taskfnid in reccumdepends:
531 reccumdepends[fnid].update(reccumdepends[taskfnid])
534 # Resolve recursive 'recrdeptask' dependencies (B)
536 # e.g. do_sometask[recrdeptask] = "do_someothertask"
537 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
538 for task in xrange(len(self.runq_fnid)):
539 if len(runq_recrdepends[task]) > 0:
540 taskfnid = self.runq_fnid[task]
541 for dep in reccumdepends[taskfnid]:
542 # Ignore self references
545 for taskname in runq_recrdepends[task]:
546 if taskData.tasks_name[dep] == taskname:
547 self.runq_depends[task].add(dep)
549 # Step B - Mark all active tasks
551 # Start with the tasks we were asked to run and mark all dependencies
552 # as active too. If the task is to be 'forced', clear its stamp. Once
553 # all active tasks are marked, prune the ones we don't need.
555 logger.verbose("Marking Active Tasks")
557 def mark_active(listid, depth):
559 Mark an item as active along with its depends
560 (calls itself recursively)
563 if runq_build[listid] == 1:
566 runq_build[listid] = 1
568 depends = self.runq_depends[listid]
569 for depend in depends:
570 mark_active(depend, depth+1)
572 self.target_pairs = []
573 for target in self.targets:
574 targetid = taskData.getbuild_id(target[0])
576 if targetid not in taskData.build_targets:
579 if targetid in taskData.failed_deps:
582 fnid = taskData.build_targets[targetid][0]
583 fn = taskData.fn_index[fnid]
584 self.target_pairs.append((fn, target[1]))
586 if fnid in taskData.failed_fnids:
589 if target[1] not in taskData.tasks_lookup[fnid]:
590 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s" % (target[1], target[0]))
592 listid = taskData.tasks_lookup[fnid][target[1]]
594 mark_active(listid, 1)
596 # Step C - Prune all inactive tasks
598 # Once all active tasks are marked, prune the ones we don't need.
602 for listid in xrange(len(self.runq_fnid)):
603 if runq_build[listid-delcount] == 1:
604 maps.append(listid-delcount)
606 del self.runq_fnid[listid-delcount]
607 del self.runq_task[listid-delcount]
608 del self.runq_depends[listid-delcount]
609 del runq_build[listid-delcount]
610 del self.runq_revdeps[listid-delcount]
611 del self.runq_hash[listid-delcount]
612 delcount = delcount + 1
616 # Step D - Sanity checks and computation
619 # Check to make sure we still have tasks to run
620 if len(self.runq_fnid) == 0:
621 if not taskData.abort:
622 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
624 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
626 logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
628 # Remap the dependencies to account for the deleted tasks
629 # Check we didn't delete a task we depend on
630 for listid in xrange(len(self.runq_fnid)):
632 origdeps = self.runq_depends[listid]
633 for origdep in origdeps:
634 if maps[origdep] == -1:
635 bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
636 newdeps.append(maps[origdep])
637 self.runq_depends[listid] = set(newdeps)
639 logger.verbose("Assign Weightings")
641 # Generate a list of reverse dependencies to ease future calculations
642 for listid in xrange(len(self.runq_fnid)):
643 for dep in self.runq_depends[listid]:
644 self.runq_revdeps[dep].add(listid)
646 # Identify tasks at the end of dependency chains
647 # Error on circular dependency loops (length two)
649 for listid in xrange(len(self.runq_fnid)):
650 revdeps = self.runq_revdeps[listid]
651 if len(revdeps) == 0:
652 endpoints.append(listid)
654 if dep in self.runq_depends[listid]:
655 #self.dump_data(taskData)
656 bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
658 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
660 # Calculate task weights
661 # Check of higher length circular dependencies
662 self.runq_weight = self.calculate_task_weights(endpoints)
664 # Sanity Check - Check for multiple tasks building the same provider
667 for task in xrange(len(self.runq_fnid)):
668 fn = taskData.fn_index[self.runq_fnid[task]]
672 for prov in self.dataCache.fn_provides[fn]:
673 if prov not in prov_list:
674 prov_list[prov] = [fn]
675 elif fn not in prov_list[prov]:
676 prov_list[prov].append(fn)
678 for prov in prov_list:
679 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
681 logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))
684 # Create a whitelist usable by the stamp checks
685 stampfnwhitelist = []
686 for entry in self.stampwhitelist.split():
687 entryid = self.taskData.getbuild_id(entry)
688 if entryid not in self.taskData.build_targets:
690 fnid = self.taskData.build_targets[entryid][0]
691 fn = self.taskData.fn_index[fnid]
692 stampfnwhitelist.append(fn)
693 self.stampfnwhitelist = stampfnwhitelist
695 # Interate over the task list looking for tasks with a 'setscene' function
696 self.runq_setscene = []
697 for task in range(len(self.runq_fnid)):
698 setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
701 self.runq_setscene.append(task)
703 # Interate over the task list and call into the siggen code
705 todeal = set(range(len(self.runq_fnid)))
706 while len(todeal) > 0:
707 for task in todeal.copy():
708 if len(self.runq_depends[task] - dealtwith) == 0:
712 for dep in self.runq_depends[task]:
713 procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
714 self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
718 for task in xrange(len(self.runq_fnid)):
719 identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
720 self.runq_task[task])
721 self.hashes[identifier] = self.runq_hash[task]
723 for dep in self.runq_depends[task]:
724 depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
726 deps.append(depidentifier)
727 self.hash_deps[identifier] = deps
729 # Remove stamps for targets if force mode active
730 if self.cooker.configuration.force:
731 for (fn, target) in self.target_pairs:
732 logger.verbose("Remove stamp %s, %s", target, fn)
733 bb.build.del_stamp(target, self.dataCache, fn)
735 return len(self.runq_fnid)
737 def dump_data(self, taskQueue):
739 Dump some debug information on the internal data structures
741 logger.debug(3, "run_tasks:")
742 for task in xrange(len(self.rqdata.runq_task)):
743 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
744 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
745 self.rqdata.runq_task[task],
746 self.rqdata.runq_weight[task],
747 self.rqdata.runq_depends[task],
748 self.rqdata.runq_revdeps[task])
750 logger.debug(3, "sorted_tasks:")
751 for task1 in xrange(len(self.rqdata.runq_task)):
752 if task1 in self.prio_map:
753 task = self.prio_map[task1]
754 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
755 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
756 self.rqdata.runq_task[task],
757 self.rqdata.runq_weight[task],
758 self.rqdata.runq_depends[task],
759 self.rqdata.runq_revdeps[task])
762 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
765 self.cfgData = cfgData
766 self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
768 self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, True) or "perfile"
769 self.hashvalidate = bb.data.getVar("BB_HASHCHECK_FUNCTION", cfgData, True) or None
771 self.state = runQueuePrepare
773 def check_stamps(self):
779 if self.stamppolicy == "perfile":
784 if self.stamppolicy == "whitelist":
785 stampwhitelist = self.rqdata.stampfnwhitelist
787 for task in xrange(len(self.rqdata.runq_fnid)):
789 if len(self.rqdata.runq_depends[task]) == 0:
790 buildable.append(task)
792 def check_buildable(self, task, buildable):
793 for revdep in self.rqdata.runq_revdeps[task]:
795 for dep in self.rqdata.runq_depends[revdep]:
799 if revdep in unchecked:
800 buildable.append(revdep)
802 for task in xrange(len(self.rqdata.runq_fnid)):
803 if task not in unchecked:
805 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
806 taskname = self.rqdata.runq_task[task]
807 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
808 # If the stamp is missing its not current
809 if not os.access(stampfile, os.F_OK):
811 notcurrent.append(task)
812 check_buildable(self, task, buildable)
814 # If its a 'nostamp' task, it's not current
815 taskdep = self.rqdata.dataCache.task_deps[fn]
816 if 'nostamp' in taskdep and task in taskdep['nostamp']:
818 notcurrent.append(task)
819 check_buildable(self, task, buildable)
822 while (len(buildable) > 0):
824 for task in buildable:
825 if task in unchecked:
826 fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
827 taskname = self.rqdata.runq_task[task]
828 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
831 t1 = os.stat(stampfile)[stat.ST_MTIME]
832 for dep in self.rqdata.runq_depends[task]:
834 fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
835 taskname2 = self.rqdata.runq_task[dep]
836 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
837 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
838 if dep in notcurrent:
841 t2 = os.stat(stampfile2)[stat.ST_MTIME]
848 notcurrent.append(task)
850 check_buildable(self, task, nextbuildable)
852 buildable = nextbuildable
854 #for task in range(len(self.runq_fnid)):
855 # fn = self.taskData.fn_index[self.runq_fnid[task]]
856 # taskname = self.runq_task[task]
857 # print "%s %s.%s" % (task, taskname, fn)
859 #print "Unchecked: %s" % unchecked
860 #print "Current: %s" % current
861 #print "Not current: %s" % notcurrent
863 if len(unchecked) > 0:
864 bb.msg.fatal("RunQueue", "check_stamps fatal internal error")
867 def check_stamp_task(self, task, taskname = None):
868 def get_timestamp(f):
870 if not os.access(f, os.F_OK):
872 return os.stat(f)[stat.ST_MTIME]
876 if self.stamppolicy == "perfile":
881 if self.stamppolicy == "whitelist":
882 stampwhitelist = self.rqdata.stampfnwhitelist
884 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
886 taskname = self.rqdata.runq_task[task]
888 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
890 # If the stamp is missing its not current
891 if not os.access(stampfile, os.F_OK):
892 logger.debug(2, "Stampfile %s not available", stampfile)
894 # If its a 'nostamp' task, it's not current
895 taskdep = self.rqdata.dataCache.task_deps[fn]
896 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
897 logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
900 if taskname != "do_setscene" and taskname.endswith("_setscene"):
904 t1 = get_timestamp(stampfile)
905 for dep in self.rqdata.runq_depends[task]:
907 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
908 taskname2 = self.rqdata.runq_task[dep]
909 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
910 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
911 t2 = get_timestamp(stampfile2)
912 t3 = get_timestamp(stampfile3)
915 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
917 logger.debug(2, 'Stampfile %s does not exist', stampfile2)
920 logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
925 def execute_runqueue(self):
927 Run the tasks in a queue prepared by rqdata.prepare()
928 Upon failure, optionally try to recover the build using any alternate providers
929 (if the abort on failure configuration option isn't set)
934 if self.state is runQueuePrepare:
935 self.rqexe = RunQueueExecuteDummy(self)
936 if self.rqdata.prepare() == 0:
937 self.state = runQueueComplete
939 self.state = runQueueSceneInit
941 if self.state is runQueueSceneInit:
942 if self.cooker.configuration.dump_signatures:
943 self.dump_signatures()
945 self.rqexe = RunQueueExecuteScenequeue(self)
947 if self.state is runQueueSceneRun:
948 retval = self.rqexe.execute()
950 if self.state is runQueueRunInit:
951 logger.info("Executing RunQueue Tasks")
952 self.rqexe = RunQueueExecuteTasks(self)
953 self.state = runQueueRunning
955 if self.state is runQueueRunning:
956 retval = self.rqexe.execute()
958 if self.state is runQueueCleanUp:
961 if self.state is runQueueFailed:
962 if not self.rqdata.taskData.tryaltconfigs:
963 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
964 for fnid in self.rqexe.failed_fnids:
965 self.rqdata.taskData.fail_fnid(fnid)
968 if self.state is runQueueComplete:
970 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
973 if self.state is runQueueChildProcess:
974 print("Child process, eeek, shouldn't happen!")
980 def finish_runqueue(self, now = False):
982 self.rqexe.finish_now()
986 def dump_signatures(self):
987 self.state = runQueueComplete
989 bb.note("Reparsing files to collect dependency data")
990 for task in range(len(self.rqdata.runq_fnid)):
991 if self.rqdata.runq_fnid[task] not in done:
992 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
993 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
994 done.add(self.rqdata.runq_fnid[task])
996 bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
1001 class RunQueueExecute:
1003 def __init__(self, rq):
1005 self.cooker = rq.cooker
1006 self.cfgData = rq.cfgData
1007 self.rqdata = rq.rqdata
1009 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
1010 self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"
1012 self.runq_buildable = []
1013 self.runq_running = []
1014 self.runq_complete = []
1015 self.build_pids = {}
1016 self.build_pipes = {}
1017 self.build_stamps = {}
1018 self.failed_fnids = []
1020 def runqueue_process_waitpid(self):
1022 Return none is there are no processes awaiting result collection, otherwise
1023 collect the process exit codes and close the information pipe.
1025 result = os.waitpid(-1, os.WNOHANG)
1026 if result[0] == 0 and result[1] == 0:
1028 task = self.build_pids[result[0]]
1029 del self.build_pids[result[0]]
1030 self.build_pipes[result[0]].close()
1031 del self.build_pipes[result[0]]
1032 # self.build_stamps[result[0]] may not exist when use shared work directory.
1033 if result[0] in self.build_stamps.keys():
1034 del self.build_stamps[result[0]]
1036 self.task_fail(task, result[1]>>8)
1038 self.task_complete(task)
1041 def finish_now(self):
1042 if self.stats.active:
1043 logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
1044 for k, v in self.build_pids.iteritems():
1046 os.kill(-k, signal.SIGTERM)
1049 for pipe in self.build_pipes:
1050 self.build_pipes[pipe].read()
1053 self.rq.state = runQueueCleanUp
1055 for pipe in self.build_pipes:
1056 self.build_pipes[pipe].read()
1058 if self.stats.active > 0:
1059 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1060 self.runqueue_process_waitpid()
1063 if len(self.failed_fnids) != 0:
1064 self.rq.state = runQueueFailed
1067 self.rq.state = runQueueComplete
1070 def fork_off_task(self, fn, task, taskname, quieterrors=False):
1071 # We need to setup the environment BEFORE the fork, since
1072 # a fork() or exec*() activates PSEUDO...
1078 taskdep = self.rqdata.dataCache.task_deps[fn]
1079 if 'umask' in taskdep and taskname in taskdep['umask']:
1080 # umask might come in as a number or text string..
1082 umask = int(taskdep['umask'][taskname],8)
1084 umask = taskdep['umask'][taskname]
1086 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1087 envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
1088 for key, value in (var.split('=') for var in envvars):
1089 envbackup[key] = os.environ.get(key)
1090 os.environ[key] = value
1091 fakeenv[key] = value
1093 fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
1095 bb.utils.mkdirhier(p)
1097 logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
1098 (fn, taskname, ', '.join(fakedirs)))
1100 envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
1101 for key, value in (var.split('=') for var in envvars):
1102 envbackup[key] = os.environ.get(key)
1103 os.environ[key] = value
1104 fakeenv[key] = value
1109 pipein, pipeout = os.pipe()
1110 pipein = os.fdopen(pipein, 'rb', 4096)
1111 pipeout = os.fdopen(pipeout, 'wb', 0)
1113 except OSError as e:
1114 bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
1119 # Save out the PID so that the event can include it the
1121 bb.event.worker_pid = os.getpid()
1122 bb.event.worker_pipe = pipeout
1124 self.rq.state = runQueueChildProcess
1125 # Make the child the process group leader
1128 newsi = os.open(os.devnull, os.O_RDWR)
1129 os.dup2(newsi, sys.stdin.fileno())
1134 bb.data.setVar("BB_WORKERCONTEXT", "1", self.cooker.configuration.data)
1135 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
1136 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
1137 bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
1140 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
1141 the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
1142 for h in self.rqdata.hashes:
1143 the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
1144 for h in self.rqdata.hash_deps:
1145 the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
1147 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
1148 # successfully. We also need to unset anything from the environment which shouldn't be there
1149 exports = bb.data.exported_vars(the_data)
1150 bb.utils.empty_environment()
1151 for e, v in exports:
1154 os.environ[e] = fakeenv[e]
1155 the_data.setVar(e, fakeenv[e])
1158 the_data.setVarFlag(taskname, "quieterrors", "1")
1160 except Exception as exc:
1162 logger.critical(str(exc))
1165 ret = bb.build.exec_task(fn, taskname, the_data)
1170 for key, value in envbackup.iteritems():
1174 os.environ[key] = value
1176 return pid, pipein, pipeout
class RunQueueExecuteDummy(RunQueueExecute):
    """
    Placeholder executor used when there is nothing to run: it reports an
    empty task count and drives the runqueue state straight to completion.
    """
    def __init__(self, rq):
        # NOTE(review): self.rq is read below but its assignment is not
        # visible in this view -- confirm it is set before use.
        self.stats = RunQueueStats(0)
        # Nothing to do, so mark the whole queue as finished.
        self.rq.state = runQueueComplete
class RunQueueExecuteTasks(RunQueueExecute):
    """
    Executor for the real (non-setscene) tasks of the runqueue.

    On construction it seeds the per-task buildable/running/complete state
    arrays, prunes the scenequeue "covered" set (tasks satisfiable from
    prebuilt state), and selects a scheduler; the state machine then calls
    into this object repeatedly to fork off one task at a time.
    """
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)
        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))

        # Mark initial buildable tasks
        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.rqdata.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
                # NOTE(review): this unconditional append(0) looks like it
                # belongs in an 'else:' branch that is not visible here.
                self.runq_buildable.append(0)
            # A task whose reverse dependencies are all already covered by
            # the setscene pass can itself be treated as covered.
            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
                self.rq.scenequeue_covered.add(task)

        # Propagate "covered" status down through the dependency graph.
        for task in xrange(self.stats.total):
            if task in self.rq.scenequeue_covered:
                # NOTE(review): branch body not visible here -- presumably
                # a 'continue'; confirm against the full file.
            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
                # Only consider propagation across recipe (fnid) boundaries.
                for revdep in self.rqdata.runq_revdeps[task]:
                    if self.rqdata.runq_fnid[task] != self.rqdata.runq_fnid[revdep]:
                        # NOTE(review): loop/guard bodies not visible here.
                self.rq.scenequeue_covered.add(task)

        # Detect when the real task needs to be run anyway by looking to see
        # if any of its intra-recipe dependencies were NOT covered by the
        # setscene pass -- in that case its setscene result cannot be trusted.
        covered_remove = set()
        for task in self.rq.scenequeue_covered:
            task_fnid = self.rqdata.runq_fnid[task]
            for dep in self.rqdata.runq_depends[task]:
                if self.rqdata.runq_fnid[dep] == task_fnid:
                    if dep not in self.rq.scenequeue_covered:
                        covered_remove.add(task)

        # Un-cover those tasks: delete their setscene stamp so they rerun.
        for task in covered_remove:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task] + '_setscene'
            bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
            logger.debug(1, 'Not skipping task %s because it will have to be run anyway', task)
            self.rq.scenequeue_covered.remove(task)

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)

        # Everything still covered can be skipped outright.
        for task in self.rq.scenequeue_covered:
            self.task_skip(task)

        event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)

        # Instantiate the scheduler whose .name matches self.scheduler.
        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
            # NOTE(review): this fatal presumably fires only when no
            # scheduler matched (loop break/else not visible here).
            bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

    def get_schedulers(self):
        """
        Return the set of available scheduler classes: every
        RunQueueScheduler subclass in this module plus any classes named
        (as module.ClassName) in the BB_SCHEDULERS variable.
        """
        schedulers = set(obj for obj in globals().values()
                             if type(obj) is type and
                                 issubclass(obj, RunQueueScheduler))

        user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
        for sched in user_schedulers.split():
            # User entries must be importable dotted names.
            if not "." in sched:
                bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)

            modname, name = sched.rsplit(".", 1)
            module = __import__(modname, fromlist=(name,))
            except ImportError as exc:
                logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
            schedulers.add(getattr(module, name))
            # NOTE(review): the 'try:' for the import and the final
            # 'return schedulers' are not visible in this view.

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.rqdata.runq_revdeps[task]:
            if self.runq_running[revdep] == 1:
                # NOTE(review): guard bodies (presumably 'continue') and the
                # all-deps-complete accumulator are not visible in this view.
            if self.runq_buildable[revdep] == 1:
            # A reverse dependency is buildable only once every one of its
            # dependencies has completed.
            for dep in self.rqdata.runq_depends[revdep]:
                if self.runq_complete[dep] != 1:
                self.runq_buildable[revdep] = 1
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                taskname = self.rqdata.runq_task[revdep]
                logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)

    def task_complete(self, task):
        """Record a successful task: update stats, fire the completion
        event, and unblock reverse dependencies."""
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        fnid = self.rqdata.runq_fnid[task]
        self.failed_fnids.append(fnid)
        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
        # Only abort the whole queue if the user asked for abort-on-failure.
        if self.rqdata.taskData.abort:
            self.rq.state = runQueueCleanUp

    def task_skip(self, task):
        """Mark a task as run and complete without executing it."""
        self.runq_running[task] = 1
        self.runq_buildable[task] = 1
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()

        # Run the tasks in a queue prepared by rqdata.prepare()
        # NOTE(review): the enclosing 'def execute(self):' header and its
        # docstring delimiters are not visible in this view.
        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        # Ask the scheduler for the next buildable task.
        task = self.sched.next()
        if task is not None:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]

            taskname = self.rqdata.runq_task[task]
            if self.rq.check_stamp_task(task, taskname):
                # Stamp is current: nothing to rebuild.
                logger.debug(2, "Stamp current task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task)
            elif self.cooker.configuration.dry_run:
                # Dry run: pretend the task ran successfully.
                self.runq_running[task] = 1
                self.runq_buildable[task] = 1
                self.stats.taskActive()
                self.task_complete(task)

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                # noexec tasks only need their stamp written, no fork.
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                # NOTE(review): continuation argument (noexec=True?) not
                # visible in this view.
                bb.event.fire(startevent, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
                self.task_complete(task)
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            # Fork a worker process for the task and track its pid/pipe/stamp.
            pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)

            self.build_pids[pid] = task
            self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
            self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
            self.runq_running[task] = 1
            self.stats.taskActive()
            if self.stats.active < self.number_tasks:
                # NOTE(review): branch body (presumably 'return True') not
                # visible in this view.

        # Drain any events the workers have written back.
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

        if self.stats.active > 0:
            if self.runqueue_process_waitpid() is None:

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed

        # Sanity checks: by this point every task should have been
        # buildable, run, and completed.
        for task in xrange(self.stats.total):
            if self.runq_buildable[task] == 0:
                logger.error("Task %s never buildable!", task)
            if self.runq_running[task] == 0:
                logger.error("Task %s never ran!", task)
            if self.runq_complete[task] == 0:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete
class RunQueueExecuteScenequeue(RunQueueExecute):
    """
    Executor for the setscene ("_setscene") variants of tasks.

    Builds a collapsed dependency graph containing only the setscene
    tasks, determines which can be satisfied from prebuilt state (via the
    hash-validation hook and existing stamps), runs them, and records the
    covered tasks so the real run can skip them.
    """
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        # Tasks whose setscene variant succeeded / failed, respectively.
        self.scenequeue_covered = set()
        self.scenequeue_notcovered = set()

        # If we don't have any setscene functions, skip this step
        if len(self.rqdata.runq_setscene) == 0:
            rq.scenequeue_covered = set()
            rq.state = runQueueRunInit

        self.stats = RunQueueStats(len(self.rqdata.runq_setscene))

        sq_revdeps_squash = []

        # We need to construct a dependency graph for the setscene functions. Intermediate
        # dependencies between the setscene tasks only complicate the code. This code
        # therefore aims to collapse the huge runqueue dependency tree into a smaller one
        # only containing the setscene functions.

        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            self.runq_buildable.append(0)

        # Seed the endpoint set: leaves of the reverse-dependency graph
        # that are not themselves setscene tasks.
        for task in xrange(len(self.rqdata.runq_fnid)):
            sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
            sq_revdeps_new.append(set())
            if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
                endpoints[task] = None

        # Dependencies of setscene tasks are also endpoints, tagged with
        # the setscene task that depends on them.
        for task in self.rqdata.runq_setscene:
            for dep in self.rqdata.runq_depends[task]:
                endpoints[dep] = task

        def process_endpoints(endpoints):
            # Walk backwards from each endpoint, accumulating for every
            # ordinary task the set of setscene tasks that depend on it.
            for point, task in endpoints.items():
                if sq_revdeps_new[point]:
                    tasks |= sq_revdeps_new[point]
                sq_revdeps_new[point] = set()
                for dep in self.rqdata.runq_depends[point]:
                    if point in sq_revdeps[dep]:
                        sq_revdeps[dep].remove(point)
                        sq_revdeps_new[dep] |= tasks
                    if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
                        newendpoints[dep] = task
            # Recurse until no new endpoints are produced.
            if len(newendpoints) != 0:
                process_endpoints(newendpoints)

        process_endpoints(endpoints)

        # Collapse: keep only edges between setscene tasks, re-indexed
        # into the setscene task numbering.
        for task in xrange(len(self.rqdata.runq_fnid)):
            if task in self.rqdata.runq_setscene:
                for dep in sq_revdeps_new[task]:
                    deps.add(self.rqdata.runq_setscene.index(dep))
                sq_revdeps_squash.append(deps)
            elif len(sq_revdeps_new[task]) != 0:
                bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")

        #for task in xrange(len(sq_revdeps_squash)):
        #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])

        self.sq_revdeps = sq_revdeps_squash
        # Working copy consumed by scenequeue_updatecounters().
        self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)

        # Build the forward map: sq_deps[dep] = tasks that depend on dep.
        for task in xrange(len(self.sq_revdeps)):
            self.sq_deps.append(set())
        for task in xrange(len(self.sq_revdeps)):
            for dep in self.sq_revdeps[task]:
                self.sq_deps[dep].add(task)

        # Tasks with no outstanding reverse deps are immediately buildable.
        for task in xrange(len(self.sq_revdeps)):
            if len(self.sq_revdeps[task]) == 0:
                self.runq_buildable[task] = 1

        if self.rq.hashvalidate:
            # Ask the configured hash-validation hook which setscene tasks
            # can be fulfilled from prebuilt state.
            for task in xrange(len(self.sq_revdeps)):
                realtask = self.rqdata.runq_setscene[task]
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
                taskname = self.rqdata.runq_task[realtask]
                taskdep = self.rqdata.dataCache.task_deps[fn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    # noexec tasks just need their setscene stamp written.
                    self.task_skip(task)
                    bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)

                if self.rq.check_stamp_task(realtask, taskname + "_setscene"):
                    logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
                    stamppresent.append(task)
                    self.task_skip(task)

                sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
                sq_hash.append(self.rqdata.runq_hash[realtask])
                sq_taskname.append(taskname)
                sq_task.append(task)
            # Evaluate the user-supplied validation function by name.
            call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
            locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.configuration.data }
            valid = bb.utils.better_eval(call, locs)

            # Tasks with an existing stamp are valid regardless of the hook.
            valid_new = stamppresent
                valid_new.append(sq_task[v])

            # Anything not validated fails outright: its real task will run.
            for task in xrange(len(self.sq_revdeps)):
                if task not in valid_new and task not in noexec:
                    logger.debug(2, 'No package found, so skipping setscene task %s',
                                 self.rqdata.get_user_idstring(task))
                    self.task_failoutright(task)

        logger.info('Executing SetScene Tasks')

        self.rq.state = runQueueSceneRun

    def scenequeue_updatecounters(self, task):
        """Task finished (covered or not): remove it from each dependent's
        outstanding set and mark newly unblocked tasks buildable."""
        for dep in self.sq_deps[task]:
            self.sq_revdeps2[dep].remove(task)
            if len(self.sq_revdeps2[dep]) == 0:
                self.runq_buildable[dep] = 1

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        index = self.rqdata.runq_setscene[task]
        logger.debug(1, 'Found task %s which could be accelerated',
                        self.rqdata.get_user_idstring(index))

        self.scenequeue_covered.add(task)
        self.scenequeue_updatecounters(task)

    def task_complete(self, task):
        """Record a successful setscene task and unblock its dependents."""
        self.stats.taskCompleted()
        self.task_completeoutright(task)

    def task_fail(self, task, result):
        """A failed setscene task is not fatal: note it as not-covered and
        let its dependents proceed (the real task will run instead)."""
        self.stats.taskFailed()
        index = self.rqdata.runq_setscene[task]
        bb.event.fire(sceneQueueTaskFailed(index, self.stats, result, self), self.cfgData)
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task)

    def task_failoutright(self, task):
        """Mark a setscene task as run-but-not-covered without executing it."""
        self.runq_running[task] = 1
        self.runq_buildable[task] = 1
        self.stats.taskCompleted()
        self.stats.taskSkipped()
        # NOTE(review): 'index' is computed but unused in the visible code.
        index = self.rqdata.runq_setscene[task]
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task)

    def task_skip(self, task):
        """Mark a setscene task as run-and-covered without executing it."""
        self.runq_running[task] = 1
        self.runq_buildable[task] = 1
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()

        # Run the tasks in a queue prepared by prepare_runqueue
        # NOTE(review): the enclosing 'def execute(self):' header and its
        # docstring delimiters are not visible in this view.
        if self.stats.active < self.number_tasks:
            # Find the next setscene to run
            for nexttask in xrange(self.stats.total):
                if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
                    # NOTE(review): selection body (task = nexttask / break)
                    # is not visible in this view.
            if task is not None:
                realtask = self.rqdata.runq_setscene[task]
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]

                taskname = self.rqdata.runq_task[realtask] + "_setscene"
                # If the real task's stamp is already current, the setscene
                # variant is pointless.
                if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
                    logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
                                 task, self.rqdata.get_user_idstring(realtask))
                    self.task_failoutright(task)

                # Forced targets always rerun for real.
                if self.cooker.configuration.force:
                    for target in self.rqdata.target_pairs:
                        if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                            self.task_failoutright(task)

                if self.rq.check_stamp_task(realtask, taskname):
                    logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
                                 task, self.rqdata.get_user_idstring(realtask))
                    self.task_skip(task)

                # Fork a worker for the setscene variant of the real task.
                pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)

                self.build_pids[pid] = task
                self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                if self.stats.active < self.number_tasks:
                    # NOTE(review): branch body (presumably 'return True')
                    # is not visible in this view.

        # Drain any events the workers have written back.
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

        if self.stats.active > 0:
            if self.runqueue_process_waitpid() is None:

        # Convert scenequeue_covered task numbers into full taskgraph ids
        oldcovered = self.scenequeue_covered
        self.rq.scenequeue_covered = set()
        for task in oldcovered:
            self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])

        logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))

        # Hand over to the real-task execution phase.
        self.rq.state = runQueueRunInit

    def fork_off_task(self, fn, task, taskname):
        # Setscene tasks routinely fail (no prebuilt object available), so
        # quiet their error output.
        return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails
    """
    def __init__(self, x):
        """NOTE(review): constructor body is not visible in this view."""
class runQueueExitWait(bb.event.Event):
    """
    Event when waiting for task processes to exit
    """
    def __init__(self, remain):
        # Number of tasks still active.
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
        bb.event.Event.__init__(self)
class runQueueEvent(bb.event.Event):
    """
    Base runQueue event class
    """
    def __init__(self, task, stats, rq):
        # Human-readable identifier string for the task.
        self.taskstring = rq.rqdata.get_user_idstring(task)
        # Snapshot of the stats at event time; copied so later stat
        # updates don't mutate an already-fired event.
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying a task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        # True when the task is stamp-only (noexec) and no work was forked.
        self.noexec = noexec
class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying a task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        # Exit code of the failed task's worker process.
        self.exitcode = exitcode
class sceneQueueTaskFailed(runQueueTaskFailed):
    """
    Event notifying a setscene task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        runQueueTaskFailed.__init__(self, task, stats, exitcode, rq)
        # Re-derive the id string with the "_setscene" suffix so the
        # message names the setscene variant, not the real task.
        self.taskstring = rq.rqdata.get_user_idstring(task, "_setscene")
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed
    """
def check_stamp_fn(fn, taskname, d):
    """
    Stamp-check helper callable from task context: retrieves the runqueue
    executor stashed in the datastore and asks it whether the stamp for
    (fn, taskname) is current. Returns the executor's answer, or falls
    through (None) when the task id cannot be resolved.
    """
    rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
    # The real filename is also stashed in the datastore and overrides
    # the fn argument.
    fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
    fnid = rqexe.rqdata.taskData.getfn_id(fn)
    taskid = rqexe.rqdata.get_task_id(fnid, taskname)
    if taskid is not None:
        return rqexe.rq.check_stamp_task(taskid)
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server
    """
    def __init__(self, pipein, pipeout, d):
        # NOTE(review): assignments of self.input / self.queue / self.d are
        # not visible in this view; self.input is presumed to be the read
        # end of the pipe.
        # Make the read end non-blocking so read() can poll without stalling.
        fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)

    # NOTE(review): 'def read(self):' header is not visible in this view.
        start = len(self.queue)
        self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError):
        end = len(self.queue)
        # Dispatch every complete "</event>"-terminated message (the 8
        # accounts for the length of the closing tag) to the server.
        index = self.queue.find("</event>")
            bb.event.fire_from_worker(self.queue[:index+8], self.d)
            self.queue = self.queue[index+8:]
            index = self.queue.find("</event>")
        # True when new bytes were consumed from the pipe this call.
        return (end > start)

    # NOTE(review): 'def close(self):' header is not visible in this view.
        if len(self.queue) > 0:
            print("Warning, worker left partial message: %s" % self.queue)