2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
5 BitBake 'RunQueue' implementation
7 Handles preparation and execution of a queue of tasks
10 # Copyright (C) 2006-2007 Richard Purdie
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
33 from bb import msg, data, event
35 bblogger = logging.getLogger("BitBake")
36 logger = logging.getLogger("BitBake.RunQueue")
40 Holds statistics on the tasks handled by the associated runQueue
42 def __init__(self, total):
50 obj = self.__class__(self.total)
51 obj.__dict__.update(self.__dict__)
55 self.active = self.active - 1
56 self.failed = self.failed + 1
58 def taskCompleted(self, number = 1):
59 self.active = self.active - number
60 self.completed = self.completed + number
62 def taskSkipped(self, number = 1):
63 self.active = self.active + number
64 self.skipped = self.skipped + number
67 self.active = self.active + 1
69 # These values indicate the next step due to be run in the
70 # runQueue state machine
79 runQueueChildProcess = 10
81 class RunQueueScheduler(object):
83 Control the order tasks are scheduled in.
87 def __init__(self, runqueue, rqdata):
89 The default scheduler just returns the first buildable task (the
90 priority map is sorted by task number)
94 numTasks = len(self.rqdata.runq_fnid)
97 self.prio_map.extend(range(numTasks))
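# prio_map is simply an ordering over task indices; next_buildable_task() walks it
# in order and returns the first entry that is buildable but not already running.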
99 def next_buildable_task(self):
101 Return the id of the first task we find that is buildable
103 for tasknum in xrange(len(self.rqdata.runq_fnid)):
104 taskid = self.prio_map[tasknum]
105 if self.rq.runq_running[taskid] == 1:
107 if self.rq.runq_buildable[taskid] == 1:
108 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
109 taskname = self.rqdata.runq_task[taskid]
110 stamp = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
111 if stamp in self.rq.build_stamps.values():
117 Return the id of the task we should build next
119 if self.rq.stats.active < self.rq.number_tasks:
120 return self.next_buildable_task()
122 class RunQueueSchedulerSpeed(RunQueueScheduler):
124 A scheduler optimised for speed. The priority map is sorted by task weight,
125 heavier weighted tasks (tasks needed by the most other tasks) are run first.
129 def __init__(self, runqueue, rqdata):
131 The priority map is sorted by task weight.
137 sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
138 copyweight = copy.deepcopy(self.rqdata.runq_weight)
141 for weight in sortweight:
142 idx = copyweight.index(weight)
143 self.prio_map.append(idx)
146 self.prio_map.reverse()
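# After the reverse, prio_map runs from the heaviest-weighted task to the lightest,
# so tasks with the most dependents are offered to the executor first.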
148 class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
150 A scheduler optimised to complete .bb files as quickly as possible. The
151 priority map is sorted by task weight, but then reordered so once a given
152 .bb file starts to build, it's completed as quickly as possible. This works
153 well where disk space is at a premium and classes like OE's rm_work are in
158 def __init__(self, runqueue, rqdata):
159 RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
161 #FIXME - whilst this groups all fnids together it does not reorder the
162 #fnid groups optimally.
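# Starting from the speed-sorted map, each entry pulled off the front drags the
# remaining tasks from the same .bb file (same fnid) along with it, so a recipe's
# tasks end up adjacent in the priority map.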
164 basemap = copy.deepcopy(self.prio_map)
166 while (len(basemap) > 0):
167 entry = basemap.pop(0)
168 self.prio_map.append(entry)
169 fnid = self.rqdata.runq_fnid[entry]
171 for entry in basemap:
172 entry_fnid = self.rqdata.runq_fnid[entry]
173 if entry_fnid == fnid:
174 todel.append(basemap.index(entry))
175 self.prio_map.append(entry)
182 BitBake Run Queue implementation
184 def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
186 self.dataCache = dataCache
187 self.taskData = taskData
188 self.targets = targets
191 self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
192 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
199 self.runq_depends = []
200 self.runq_revdeps = []
203 def runq_depends_names(self, ids):
206 for id in self.runq_depends[ids]:
207 nam = os.path.basename(self.get_user_idstring(id))
208 nam = re.sub("_[^,]*,", ",", nam)
212 def get_user_idstring(self, task, task_name_suffix = ""):
213 fn = self.taskData.fn_index[self.runq_fnid[task]]
214 taskname = self.runq_task[task] + task_name_suffix
215 return "%s, %s" % (fn, taskname)
217 def get_task_id(self, fnid, taskname):
218 for listid in xrange(len(self.runq_fnid)):
219 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
223 def circular_depchains_handler(self, tasks):
225 Some tasks aren't buildable, likely due to circular dependency issues.
226 Identify the circular dependencies and print them in a user readable format.
228 from copy import deepcopy
234 def chain_reorder(chain):
236 Reorder a dependency chain so the lowest task id is first
240 for entry in xrange(len(chain)):
241 if chain[entry] < chain[lowest]:
243 new_chain.extend(chain[lowest:])
244 new_chain.extend(chain[:lowest])
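# e.g. the chain [5, 2, 7] is rotated to [2, 7, 5], so the same loop found from a
# different starting point compares equal in chain_compare_equal() below.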
247 def chain_compare_equal(chain1, chain2):
249 Compare two dependency chains and see if they're the same
251 if len(chain1) != len(chain2):
253 for index in xrange(len(chain1)):
254 if chain1[index] != chain2[index]:
258 def chain_array_contains(chain, chain_array):
260 Return True if chain_array contains chain
262 for ch in chain_array:
263 if chain_compare_equal(ch, chain):
267 def find_chains(taskid, prev_chain):
268 prev_chain.append(taskid)
270 total_deps.extend(self.runq_revdeps[taskid])
271 for revdep in self.runq_revdeps[taskid]:
272 if revdep in prev_chain:
273 idx = prev_chain.index(revdep)
274 # To prevent duplicates, reorder the chain to start with the lowest taskid
275 # and search through an array of those we've already printed
276 chain = prev_chain[idx:]
277 new_chain = chain_reorder(chain)
278 if not chain_array_contains(new_chain, valid_chains):
279 valid_chains.append(new_chain)
280 msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
281 for dep in new_chain:
282 msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
284 if len(valid_chains) > 10:
285 msgs.append("Aborted dependency loop search after 10 matches.\n")
289 if revdep not in explored_deps:
291 elif revdep in explored_deps[revdep]:
294 for dep in prev_chain:
295 if dep in explored_deps[revdep]:
298 find_chains(revdep, copy.deepcopy(prev_chain))
299 for dep in explored_deps[revdep]:
300 if dep not in total_deps:
301 total_deps.append(dep)
303 explored_deps[taskid] = total_deps
306 find_chains(task, [])
310 def calculate_task_weights(self, endpoints):
312 Calculate a number representing the "weight" of each task. Heavier weighted tasks
313 have more dependencies and hence should be executed sooner for maximum speed.
315 This function also sanity checks the task list, finding tasks that are not
316 possible to execute due to circular dependencies.
319 numTasks = len(self.runq_fnid)
324 for listid in xrange(numTasks):
325 task_done.append(False)
327 deps_left.append(len(self.runq_revdeps[listid]))
329 for listid in endpoints:
331 task_done[listid] = True
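# Propagate weights backwards from the endpoints: each task adds its own weight to
# every task it depends on, so a task's final weight reflects how much of the build
# depends on it, directly or indirectly.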
335 for listid in endpoints:
336 for revdep in self.runq_depends[listid]:
337 weight[revdep] = weight[revdep] + weight[listid]
338 deps_left[revdep] = deps_left[revdep] - 1
339 if deps_left[revdep] == 0:
340 next_points.append(revdep)
341 task_done[revdep] = True
342 endpoints = next_points
343 if len(next_points) == 0:
346 # Circular dependency sanity check
348 for task in xrange(numTasks):
349 if task_done[task] is False or deps_left[task] != 0:
350 problem_tasks.append(task)
351 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
352 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
355 message = "Unbuildable tasks were found.\n"
356 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
357 message = message + "Identifying dependency loops (this may take a short while)...\n"
358 logger.error(message)
360 msgs = self.circular_depchains_handler(problem_tasks)
364 message = message + msg
365 bb.msg.fatal("RunQueue", message)
371 Turn a set of taskData into a RunQueue and compute data needed
372 to optimise the execution order.
376 recursive_tdepends = {}
377 runq_recrdepends = []
380 taskData = self.taskData
382 if len(taskData.tasks_name) == 0:
386 logger.info("Preparing runqueue")
388 # Step A - Work out a list of tasks to run
390 # Taskdata gives us a list of possible providers for every build and run
391 # target ordered by priority. It also gives information on each of those
394 # To create the actual list of tasks to execute we fix the list of
395 # providers and then resolve the dependencies into task IDs. This
396 # process is repeated for each type of dependency (tdepends, deptask,
397 # rdeptask, recrdeptask, idepends).
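# The helpers below resolve dependency ids to concrete task ids:
# add_build_dependencies() follows the chosen provider for each build-time (DEPENDS)
# target and add_runtime_dependencies() does the same for runtime (RDEPENDS) targets,
# appending any matching task ids for the requested task names to 'depends'.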
399 def add_build_dependencies(depids, tasknames, depends):
401 # Won't be in build_targets if ASSUME_PROVIDED
402 if depid not in taskData.build_targets:
404 depdata = taskData.build_targets[depid][0]
407 dep = taskData.fn_index[depdata]
408 for taskname in tasknames:
409 taskid = taskData.gettask_id(dep, taskname, False)
410 if taskid is not None:
411 depends.append(taskid)
413 def add_runtime_dependencies(depids, tasknames, depends):
415 if depid not in taskData.run_targets:
417 depdata = taskData.run_targets[depid][0]
420 dep = taskData.fn_index[depdata]
421 for taskname in tasknames:
422 taskid = taskData.gettask_id(dep, taskname, False)
423 if taskid is not None:
424 depends.append(taskid)
426 for task in xrange(len(taskData.tasks_name)):
429 fnid = taskData.tasks_fnid[task]
430 fn = taskData.fn_index[fnid]
431 task_deps = self.dataCache.task_deps[fn]
433 logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
435 if fnid not in taskData.failed_fnids:
437 # Resolve task internal dependencies
439 # e.g. addtask before X after Y
440 depends = taskData.tasks_tdepends[task]
442 # Resolve 'deptask' dependencies
444 # e.g. do_sometask[deptask] = "do_someothertask"
445 # (makes sure sometask runs after someothertask of all DEPENDS)
446 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
447 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
448 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
450 # Resolve 'rdeptask' dependencies
452 # e.g. do_sometask[rdeptask] = "do_someothertask"
453 # (makes sure sometask runs after someothertask of all RDEPENDS)
454 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
455 taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
456 add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
458 # Resolve inter-task dependencies
460 # e.g. do_sometask[depends] = "targetname:do_someothertask"
461 # (makes sure sometask runs after targetname's someothertask)
462 if fnid not in tdepends_fnid:
463 tdepends_fnid[fnid] = set()
464 idepends = taskData.tasks_idepends[task]
465 for (depid, idependtask) in idepends:
466 if depid in taskData.build_targets:
467 # Won't be in build_targets if ASSUME_PROVIDED
468 depdata = taskData.build_targets[depid][0]
469 if depdata is not None:
470 dep = taskData.fn_index[depdata]
471 taskid = taskData.gettask_id(dep, idependtask, False)
473 bb.msg.fatal("RunQueue", "Task %s in %s depends upon nonexistent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
474 depends.append(taskid)
476 tdepends_fnid[fnid].add(taskid)
479 # Resolve recursive 'recrdeptask' dependencies (A)
481 # e.g. do_sometask[recrdeptask] = "do_someothertask"
482 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
483 # We cover the recursive part of the dependencies below
484 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
485 for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
486 recrdepends.append(taskname)
487 add_build_dependencies(taskData.depids[fnid], [taskname], depends)
488 add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
490 # Remove all self references
493 logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
499 self.runq_fnid.append(taskData.tasks_fnid[task])
500 self.runq_task.append(taskData.tasks_name[task])
501 self.runq_depends.append(set(depends))
502 self.runq_revdeps.append(set())
503 self.runq_hash.append("")
506 runq_recrdepends.append(recrdepends)
509 # Build a list of recursive cumulative dependencies for each fnid
510 # We do this by fnid, since if A depends on some task in B
511 # we're interested in later tasks B's fnid might have but B itself does not depend on
514 # Algorithm is O(tasks) + O(tasks)*O(fnids)
517 for task in xrange(len(self.runq_fnid)):
518 fnid = self.runq_fnid[task]
519 if fnid not in reccumdepends:
520 if fnid in tdepends_fnid:
521 reccumdepends[fnid] = tdepends_fnid[fnid]
523 reccumdepends[fnid] = set()
524 reccumdepends[fnid].update(self.runq_depends[task])
525 for task in xrange(len(self.runq_fnid)):
526 taskfnid = self.runq_fnid[task]
527 for fnid in reccumdepends:
528 if task in reccumdepends[fnid]:
529 reccumdepends[fnid].add(task)
530 if taskfnid in reccumdepends:
531 reccumdepends[fnid].update(reccumdepends[taskfnid])
534 # Resolve recursive 'recrdeptask' dependencies (B)
536 # e.g. do_sometask[recrdeptask] = "do_someothertask"
537 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
538 for task in xrange(len(self.runq_fnid)):
539 if len(runq_recrdepends[task]) > 0:
540 taskfnid = self.runq_fnid[task]
541 for dep in reccumdepends[taskfnid]:
542 # Ignore self references
545 for taskname in runq_recrdepends[task]:
546 if taskData.tasks_name[dep] == taskname:
547 self.runq_depends[task].add(dep)
549 # Step B - Mark all active tasks
551 # Start with the tasks we were asked to run and mark all dependencies
552 # as active too. If the task is to be 'forced', clear its stamp. Once
553 # all active tasks are marked, prune the ones we don't need.
555 logger.verbose("Marking Active Tasks")
557 def mark_active(listid, depth):
559 Mark an item as active along with its depends
560 (calls itself recursively)
563 if runq_build[listid] == 1:
566 runq_build[listid] = 1
568 depends = self.runq_depends[listid]
569 for depend in depends:
570 mark_active(depend, depth+1)
572 self.target_pairs = []
573 for target in self.targets:
574 targetid = taskData.getbuild_id(target[0])
576 if targetid not in taskData.build_targets:
579 if targetid in taskData.failed_deps:
582 fnid = taskData.build_targets[targetid][0]
583 fn = taskData.fn_index[fnid]
584 self.target_pairs.append((fn, target[1]))
586 if fnid in taskData.failed_fnids:
589 if target[1] not in taskData.tasks_lookup[fnid]:
590 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s" % (target[1], target[0]))
592 listid = taskData.tasks_lookup[fnid][target[1]]
594 mark_active(listid, 1)
596 # Step C - Prune all inactive tasks
598 # Once all active tasks are marked, prune the ones we don't need.
602 for listid in xrange(len(self.runq_fnid)):
603 if runq_build[listid-delcount] == 1:
604 maps.append(listid-delcount)
606 del self.runq_fnid[listid-delcount]
607 del self.runq_task[listid-delcount]
608 del self.runq_depends[listid-delcount]
609 del runq_build[listid-delcount]
610 del self.runq_revdeps[listid-delcount]
611 del self.runq_hash[listid-delcount]
612 delcount = delcount + 1
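# 'maps' translates each original task index to its index after pruning; the
# remapping pass below treats a value of -1 as a dependency on a pruned task and
# raises a fatal error.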
616 # Step D - Sanity checks and computation
619 # Check to make sure we still have tasks to run
620 if len(self.runq_fnid) == 0:
621 if not taskData.abort:
622 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
624 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
626 logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
628 # Remap the dependencies to account for the deleted tasks
629 # Check we didn't delete a task we depend on
630 for listid in xrange(len(self.runq_fnid)):
632 origdeps = self.runq_depends[listid]
633 for origdep in origdeps:
634 if maps[origdep] == -1:
635 bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
636 newdeps.append(maps[origdep])
637 self.runq_depends[listid] = set(newdeps)
639 logger.verbose("Assign Weightings")
641 # Generate a list of reverse dependencies to ease future calculations
642 for listid in xrange(len(self.runq_fnid)):
643 for dep in self.runq_depends[listid]:
644 self.runq_revdeps[dep].add(listid)
646 # Identify tasks at the end of dependency chains
647 # Error on circular dependency loops (length two)
649 for listid in xrange(len(self.runq_fnid)):
650 revdeps = self.runq_revdeps[listid]
651 if len(revdeps) == 0:
652 endpoints.append(listid)
654 if dep in self.runq_depends[listid]:
655 #self.dump_data(taskData)
656 bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
658 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
660 # Calculate task weights
661 # Check for longer circular dependencies (length greater than two)
662 self.runq_weight = self.calculate_task_weights(endpoints)
664 # Sanity Check - Check for multiple tasks building the same provider
667 for task in xrange(len(self.runq_fnid)):
668 fn = taskData.fn_index[self.runq_fnid[task]]
672 for prov in self.dataCache.fn_provides[fn]:
673 if prov not in prov_list:
674 prov_list[prov] = [fn]
675 elif fn not in prov_list[prov]:
676 prov_list[prov].append(fn)
678 for prov in prov_list:
679 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
681 logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))
684 # Create a whitelist usable by the stamp checks
685 stampfnwhitelist = []
686 for entry in self.stampwhitelist.split():
687 entryid = self.taskData.getbuild_id(entry)
688 if entryid not in self.taskData.build_targets:
690 fnid = self.taskData.build_targets[entryid][0]
691 fn = self.taskData.fn_index[fnid]
692 stampfnwhitelist.append(fn)
693 self.stampfnwhitelist = stampfnwhitelist
695 # Iterate over the task list looking for tasks with a 'setscene' function
696 self.runq_setscene = []
697 for task in range(len(self.runq_fnid)):
698 setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
701 self.runq_setscene.append(task)
703 # Iterate over the task list and call into the siggen code
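# Hashes are generated in dependency order: a task is only hashed once every task it
# depends on has been dealt with, since procdep feeds each dependency's file and task
# name into get_taskhash().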
705 todeal = set(range(len(self.runq_fnid)))
706 while len(todeal) > 0:
707 for task in todeal.copy():
708 if len(self.runq_depends[task] - dealtwith) == 0:
712 for dep in self.runq_depends[task]:
713 procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
714 self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
718 for task in xrange(len(self.runq_fnid)):
719 identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
720 self.runq_task[task])
721 self.hashes[identifier] = self.runq_hash[task]
723 for dep in self.runq_depends[task]:
724 depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
726 deps.append(depidentifier)
727 self.hash_deps[identifier] = deps
729 # Remove stamps for targets if force mode active
730 if self.cooker.configuration.force:
731 for (fn, target) in self.target_pairs:
732 logger.verbose("Remove stamp %s, %s", target, fn)
733 bb.build.del_stamp(target, self.dataCache, fn)
735 return len(self.runq_fnid)
737 def dump_data(self, taskQueue):
739 Dump some debug information on the internal data structures
741 logger.debug(3, "run_tasks:")
742 for task in xrange(len(self.rqdata.runq_task)):
743 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
744 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
745 self.rqdata.runq_task[task],
746 self.rqdata.runq_weight[task],
747 self.rqdata.runq_depends[task],
748 self.rqdata.runq_revdeps[task])
750 logger.debug(3, "sorted_tasks:")
751 for task1 in xrange(len(self.rqdata.runq_task)):
752 if task1 in self.prio_map:
753 task = self.prio_map[task1]
754 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
755 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
756 self.rqdata.runq_task[task],
757 self.rqdata.runq_weight[task],
758 self.rqdata.runq_depends[task],
759 self.rqdata.runq_revdeps[task])
762 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
765 self.cfgData = cfgData
766 self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
768 self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, True) or "perfile"
769 self.hashvalidate = bb.data.getVar("BB_HASHCHECK_FUNCTION", cfgData, True) or None
770 self.setsceneverify = bb.data.getVar("BB_SETSCENE_VERIFY_FUNCTION", cfgData, True) or None
772 self.state = runQueuePrepare
774 def check_stamps(self):
780 if self.stamppolicy == "perfile":
785 if self.stamppolicy == "whitelist":
786 stampwhitelist = self.rqdata.stampfnwhitelist
788 for task in xrange(len(self.rqdata.runq_fnid)):
790 if len(self.rqdata.runq_depends[task]) == 0:
791 buildable.append(task)
793 def check_buildable(self, task, buildable):
794 for revdep in self.rqdata.runq_revdeps[task]:
796 for dep in self.rqdata.runq_depends[revdep]:
800 if revdep in unchecked:
801 buildable.append(revdep)
803 for task in xrange(len(self.rqdata.runq_fnid)):
804 if task not in unchecked:
806 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
807 taskname = self.rqdata.runq_task[task]
808 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
809 # If the stamp is missing, it's not current
810 if not os.access(stampfile, os.F_OK):
812 notcurrent.append(task)
813 check_buildable(self, task, buildable)
815 # If it's a 'nostamp' task, it's not current
816 taskdep = self.rqdata.dataCache.task_deps[fn]
817 if 'nostamp' in taskdep and task in taskdep['nostamp']:
819 notcurrent.append(task)
820 check_buildable(self, task, buildable)
823 while (len(buildable) > 0):
825 for task in buildable:
826 if task in unchecked:
827 fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
828 taskname = self.rqdata.runq_task[task]
829 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
832 t1 = os.stat(stampfile)[stat.ST_MTIME]
833 for dep in self.rqdata.runq_depends[task]:
835 fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
836 taskname2 = self.rqdata.runq_task[dep]
837 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
838 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
839 if dep in notcurrent:
842 t2 = os.stat(stampfile2)[stat.ST_MTIME]
849 notcurrent.append(task)
851 check_buildable(self, task, nextbuildable)
853 buildable = nextbuildable
855 #for task in range(len(self.runq_fnid)):
856 # fn = self.taskData.fn_index[self.runq_fnid[task]]
857 # taskname = self.runq_task[task]
858 # print "%s %s.%s" % (task, taskname, fn)
860 #print "Unchecked: %s" % unchecked
861 #print "Current: %s" % current
862 #print "Not current: %s" % notcurrent
864 if len(unchecked) > 0:
865 bb.msg.fatal("RunQueue", "check_stamps fatal internal error")
868 def check_stamp_task(self, task, taskname = None):
869 def get_timestamp(f):
871 if not os.access(f, os.F_OK):
873 return os.stat(f)[stat.ST_MTIME]
877 if self.stamppolicy == "perfile":
882 if self.stamppolicy == "whitelist":
883 stampwhitelist = self.rqdata.stampfnwhitelist
885 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
887 taskname = self.rqdata.runq_task[task]
889 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
891 # If the stamp is missing, it's not current
892 if not os.access(stampfile, os.F_OK):
893 logger.debug(2, "Stampfile %s not available", stampfile)
895 # If it's a 'nostamp' task, it's not current
896 taskdep = self.rqdata.dataCache.task_deps[fn]
897 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
898 logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
901 if taskname != "do_setscene" and taskname.endswith("_setscene"):
905 t1 = get_timestamp(stampfile)
906 for dep in self.rqdata.runq_depends[task]:
908 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
909 taskname2 = self.rqdata.runq_task[dep]
910 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
911 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
912 t2 = get_timestamp(stampfile2)
913 t3 = get_timestamp(stampfile3)
916 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
918 logger.debug(2, 'Stampfile %s does not exist', stampfile2)
921 logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
926 def execute_runqueue(self):
928 Run the tasks in a queue prepared by rqdata.prepare()
929 Upon failure, optionally try to recover the build using any alternate providers
930 (if the abort on failure configuration option isn't set)
935 if self.state is runQueuePrepare:
936 self.rqexe = RunQueueExecuteDummy(self)
937 if self.rqdata.prepare() == 0:
938 self.state = runQueueComplete
940 self.state = runQueueSceneInit
942 if self.state is runQueueSceneInit:
943 if self.cooker.configuration.dump_signatures:
944 self.dump_signatures()
946 self.rqexe = RunQueueExecuteScenequeue(self)
948 if self.state is runQueueSceneRun:
949 retval = self.rqexe.execute()
951 if self.state is runQueueRunInit:
952 logger.info("Executing RunQueue Tasks")
953 self.rqexe = RunQueueExecuteTasks(self)
954 self.state = runQueueRunning
956 if self.state is runQueueRunning:
957 retval = self.rqexe.execute()
959 if self.state is runQueueCleanUp:
962 if self.state is runQueueFailed:
963 if not self.rqdata.taskData.tryaltconfigs:
964 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
965 for fnid in self.rqexe.failed_fnids:
966 self.rqdata.taskData.fail_fnid(fnid)
969 if self.state is runQueueComplete:
971 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
974 if self.state is runQueueChildProcess:
975 print("Child process, eeek, shouldn't happen!")
981 def finish_runqueue(self, now = False):
983 self.rqexe.finish_now()
987 def dump_signatures(self):
988 self.state = runQueueComplete
990 bb.note("Reparsing files to collect dependency data")
991 for task in range(len(self.rqdata.runq_fnid)):
992 if self.rqdata.runq_fnid[task] not in done:
993 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
994 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
995 done.add(self.rqdata.runq_fnid[task])
997 bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
1002 class RunQueueExecute:
1004 def __init__(self, rq):
1006 self.cooker = rq.cooker
1007 self.cfgData = rq.cfgData
1008 self.rqdata = rq.rqdata
1010 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
1011 self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"
1013 self.runq_buildable = []
1014 self.runq_running = []
1015 self.runq_complete = []
1016 self.build_pids = {}
1017 self.build_pipes = {}
1018 self.build_stamps = {}
1019 self.failed_fnids = []
1021 def runqueue_process_waitpid(self):
1023 Return None if there are no processes awaiting result collection; otherwise
1024 collect the process exit codes and close the information pipe.
1026 result = os.waitpid(-1, os.WNOHANG)
1027 if result[0] == 0 and result[1] == 0:
1029 task = self.build_pids[result[0]]
1030 del self.build_pids[result[0]]
1031 self.build_pipes[result[0]].close()
1032 del self.build_pipes[result[0]]
1033 # self.build_stamps[result[0]] may not exist when using a shared work directory.
1034 if result[0] in self.build_stamps.keys():
1035 del self.build_stamps[result[0]]
1037 self.task_fail(task, result[1]>>8)
1039 self.task_complete(task)
1042 def finish_now(self):
1043 if self.stats.active:
1044 logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
1045 for k, v in self.build_pids.iteritems():
1047 os.kill(-k, signal.SIGTERM)
1050 for pipe in self.build_pipes:
1051 self.build_pipes[pipe].read()
1054 self.rq.state = runQueueCleanUp
1056 for pipe in self.build_pipes:
1057 self.build_pipes[pipe].read()
1059 if self.stats.active > 0:
1060 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1061 self.runqueue_process_waitpid()
1064 if len(self.failed_fnids) != 0:
1065 self.rq.state = runQueueFailed
1068 self.rq.state = runQueueComplete
1071 def fork_off_task(self, fn, task, taskname, quieterrors=False):
1072 # We need to set up the environment BEFORE the fork, since
1073 # a fork() or exec*() activates PSEUDO...
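# For 'fakeroot' tasks the fakerootenv variables recorded in the datacache are
# exported into os.environ (and the fakerootdirs created), with the previous values
# saved in envbackup and restored in the parent afterwards; non-fakeroot tasks export
# the fakerootnoenv variables instead.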
1079 taskdep = self.rqdata.dataCache.task_deps[fn]
1080 if 'umask' in taskdep and taskname in taskdep['umask']:
1081 # umask might come in as a number or text string..
1083 umask = int(taskdep['umask'][taskname],8)
1085 umask = taskdep['umask'][taskname]
1087 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1088 envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
1089 for key, value in (var.split('=') for var in envvars):
1090 envbackup[key] = os.environ.get(key)
1091 os.environ[key] = value
1092 fakeenv[key] = value
1094 fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
1096 bb.utils.mkdirhier(p)
1098 logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
1099 (fn, taskname, ', '.join(fakedirs)))
1101 envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
1102 for key, value in (var.split('=') for var in envvars):
1103 envbackup[key] = os.environ.get(key)
1104 os.environ[key] = value
1105 fakeenv[key] = value
1110 pipein, pipeout = os.pipe()
1111 pipein = os.fdopen(pipein, 'rb', 4096)
1112 pipeout = os.fdopen(pipeout, 'wb', 0)
1114 except OSError as e:
1115 bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
1120 # Save out the PID so that events can include it
1122 bb.event.worker_pid = os.getpid()
1123 bb.event.worker_pipe = pipeout
1125 self.rq.state = runQueueChildProcess
1126 # Make the child the process group leader
1129 newsi = os.open(os.devnull, os.O_RDWR)
1130 os.dup2(newsi, sys.stdin.fileno())
1135 bb.data.setVar("BB_WORKERCONTEXT", "1", self.cooker.configuration.data)
1136 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
1137 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
1138 bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
1141 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
1142 the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
1143 for h in self.rqdata.hashes:
1144 the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
1145 for h in self.rqdata.hash_deps:
1146 the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
1148 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
1149 # successfully. We also need to unset anything from the environment which shouldn't be there
1150 exports = bb.data.exported_vars(the_data)
1151 bb.utils.empty_environment()
1152 for e, v in exports:
1155 os.environ[e] = fakeenv[e]
1156 the_data.setVar(e, fakeenv[e])
1159 the_data.setVarFlag(taskname, "quieterrors", "1")
1161 except Exception as exc:
1163 logger.critical(str(exc))
1166 ret = bb.build.exec_task(fn, taskname, the_data)
1171 for key, value in envbackup.iteritems():
1175 os.environ[key] = value
1177 return pid, pipein, pipeout
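# The parent keeps the child's pid and pipe: the executor records them in build_pids
# and build_pipes and later reaps the child in runqueue_process_waitpid().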
1179 class RunQueueExecuteDummy(RunQueueExecute):
1180 def __init__(self, rq):
1182 self.stats = RunQueueStats(0)
1185 self.rq.state = runQueueComplete
1188 class RunQueueExecuteTasks(RunQueueExecute):
1189 def __init__(self, rq):
1190 RunQueueExecute.__init__(self, rq)
1192 self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
1194 # Mark initial buildable tasks
1195 for task in xrange(self.stats.total):
1196 self.runq_running.append(0)
1197 self.runq_complete.append(0)
1198 if len(self.rqdata.runq_depends[task]) == 0:
1199 self.runq_buildable.append(1)
1201 self.runq_buildable.append(0)
1202 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1203 self.rq.scenequeue_covered.add(task)
1208 for task in xrange(self.stats.total):
1209 if task in self.rq.scenequeue_covered:
1211 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1213 for revdep in self.rqdata.runq_revdeps[task]:
1214 if self.rqdata.runq_fnid[task] != self.rqdata.runq_fnid[revdep]:
1219 self.rq.scenequeue_covered.add(task)
1221 logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))
1223 # Allow the metadata to elect for setscene tasks to run anyway
1224 covered_remove = set()
1225 if self.rq.setsceneverify:
1226 call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
1227 locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.configuration.data }
1228 covered_remove = bb.utils.better_eval(call, locs)
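# The function named by BB_SETSCENE_VERIFY_FUNCTION is evaluated with the covered
# task set, the task name/fnid/filename tables and the datastore; any task ids it
# returns are removed from the skip list below and their setscene stamps deleted so
# those tasks run for real.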
1230 for task in covered_remove:
1231 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1232 taskname = self.rqdata.runq_task[task] + '_setscene'
1233 bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
1234 logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
1235 self.rq.scenequeue_covered.remove(task)
1237 logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
1239 event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
1241 schedulers = self.get_schedulers()
1242 for scheduler in schedulers:
1243 if self.scheduler == scheduler.name:
1244 self.sched = scheduler(self, self.rqdata)
1245 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
1248 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
1249 (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1252 def get_schedulers(self):
1253 schedulers = set(obj for obj in globals().values()
1254 if type(obj) is type and
1255 issubclass(obj, RunQueueScheduler))
1257 user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
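# Each BB_SCHEDULERS entry must be a dotted "module.ClassName" reference, e.g. a
# purely illustrative "mymodule.MyScheduler"; entries without a dot are ignored with
# a note, otherwise the module is imported and the named class added to the candidate
# set alongside the built-in schedulers.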
1259 for sched in user_schedulers.split():
1260 if not "." in sched:
1261 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1264 modname, name = sched.rsplit(".", 1)
1266 module = __import__(modname, fromlist=(name,))
1267 except ImportError as exc:
1268 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1271 schedulers.add(getattr(module, name))
1274 def task_completeoutright(self, task):
1276 Mark a task as completed
1277 Look at the reverse dependencies and mark any task with
1278 completed dependencies as buildable
1280 self.runq_complete[task] = 1
1281 for revdep in self.rqdata.runq_revdeps[task]:
1282 if self.runq_running[revdep] == 1:
1284 if self.runq_buildable[revdep] == 1:
1287 for dep in self.rqdata.runq_depends[revdep]:
1288 if self.runq_complete[dep] != 1:
1291 self.runq_buildable[revdep] = 1
1292 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1293 taskname = self.rqdata.runq_task[revdep]
1294 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1296 def task_complete(self, task):
1297 self.stats.taskCompleted()
1298 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1299 self.task_completeoutright(task)
1301 def task_fail(self, task, exitcode):
1303 Called when a task has failed
1304 Updates the state engine with the failure
1306 self.stats.taskFailed()
1307 fnid = self.rqdata.runq_fnid[task]
1308 self.failed_fnids.append(fnid)
1309 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1310 if self.rqdata.taskData.abort:
1311 self.rq.state = runQueueCleanUp
1313 def task_skip(self, task):
1314 self.runq_running[task] = 1
1315 self.runq_buildable[task] = 1
1316 self.task_completeoutright(task)
1317 self.stats.taskCompleted()
1318 self.stats.taskSkipped()
1322 Run the tasks in a queue prepared by rqdata.prepare()
1325 if self.stats.total == 0:
1327 self.rq.state = runQueueCleanUp
1329 task = self.sched.next()
1330 if task is not None:
1331 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1332 taskname = self.rqdata.runq_task[task]
1334 if task in self.rq.scenequeue_covered:
1335 logger.debug(2, "Setscene covered task %s (%s)", task,
1336 self.rqdata.get_user_idstring(task))
1337 self.task_skip(task)
1340 if self.rq.check_stamp_task(task, taskname):
1341 logger.debug(2, "Stamp current task %s (%s)", task,
1342 self.rqdata.get_user_idstring(task))
1343 self.task_skip(task)
1345 elif self.cooker.configuration.dry_run:
1346 self.runq_running[task] = 1
1347 self.runq_buildable[task] = 1
1348 self.stats.taskActive()
1349 self.task_complete(task)
1352 taskdep = self.rqdata.dataCache.task_deps[fn]
1353 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1354 startevent = runQueueTaskStarted(task, self.stats, self.rq,
1356 bb.event.fire(startevent, self.cfgData)
1357 self.runq_running[task] = 1
1358 self.stats.taskActive()
1359 bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
1360 self.task_complete(task)
1363 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1364 bb.event.fire(startevent, self.cfgData)
1366 pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
1368 self.build_pids[pid] = task
1369 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1370 self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1371 self.runq_running[task] = 1
1372 self.stats.taskActive()
1373 if self.stats.active < self.number_tasks:
1376 for pipe in self.build_pipes:
1377 self.build_pipes[pipe].read()
1379 if self.stats.active > 0:
1380 if self.runqueue_process_waitpid() is None:
1384 if len(self.failed_fnids) != 0:
1385 self.rq.state = runQueueFailed
1389 for task in xrange(self.stats.total):
1390 if self.runq_buildable[task] == 0:
1391 logger.error("Task %s never buildable!", task)
1392 if self.runq_running[task] == 0:
1393 logger.error("Task %s never ran!", task)
1394 if self.runq_complete[task] == 0:
1395 logger.error("Task %s never completed!", task)
1396 self.rq.state = runQueueComplete
1399 class RunQueueExecuteScenequeue(RunQueueExecute):
1400 def __init__(self, rq):
1401 RunQueueExecute.__init__(self, rq)
1403 self.scenequeue_covered = set()
1404 self.scenequeue_notcovered = set()
1406 # If we don't have any setscene functions, skip this step
1407 if len(self.rqdata.runq_setscene) == 0:
1408 rq.scenequeue_covered = set()
1409 rq.state = runQueueRunInit
1412 self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1417 sq_revdeps_squash = []
1419 # We need to construct a dependency graph for the setscene functions. Intermediate
1420 # dependencies between the setscene tasks only complicate the code. This code
1421 # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1422 # only containing the setscene functions.
1424 for task in xrange(self.stats.total):
1425 self.runq_running.append(0)
1426 self.runq_complete.append(0)
1427 self.runq_buildable.append(0)
1429 for task in xrange(len(self.rqdata.runq_fnid)):
1430 sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1431 sq_revdeps_new.append(set())
1432 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1433 endpoints[task] = None
1435 for task in self.rqdata.runq_setscene:
1436 for dep in self.rqdata.runq_depends[task]:
1437 endpoints[dep] = task
1439 def process_endpoints(endpoints):
1441 for point, task in endpoints.items():
1445 if sq_revdeps_new[point]:
1446 tasks |= sq_revdeps_new[point]
1447 sq_revdeps_new[point] = set()
1448 for dep in self.rqdata.runq_depends[point]:
1449 if point in sq_revdeps[dep]:
1450 sq_revdeps[dep].remove(point)
1452 sq_revdeps_new[dep] |= tasks
1453 if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1454 newendpoints[dep] = task
1455 if len(newendpoints) != 0:
1456 process_endpoints(newendpoints)
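# process_endpoints() walks backwards from the endpoint tasks, pushing the set of
# setscene tasks that depend on each point down through runq_depends until
# setscene-to-setscene reverse dependencies are all that remain in sq_revdeps_new.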
1458 process_endpoints(endpoints)
1460 for task in xrange(len(self.rqdata.runq_fnid)):
1461 if task in self.rqdata.runq_setscene:
1463 for dep in sq_revdeps_new[task]:
1464 deps.add(self.rqdata.runq_setscene.index(dep))
1465 sq_revdeps_squash.append(deps)
1466 elif len(sq_revdeps_new[task]) != 0:
1467 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1469 #for task in xrange(len(sq_revdeps_squash)):
1470 # print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
1473 self.sq_revdeps = sq_revdeps_squash
1474 self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1476 for task in xrange(len(self.sq_revdeps)):
1477 self.sq_deps.append(set())
1478 for task in xrange(len(self.sq_revdeps)):
1479 for dep in self.sq_revdeps[task]:
1480 self.sq_deps[dep].add(task)
1482 for task in xrange(len(self.sq_revdeps)):
1483 if len(self.sq_revdeps[task]) == 0:
1484 self.runq_buildable[task] = 1
1486 if self.rq.hashvalidate:
1494 for task in xrange(len(self.sq_revdeps)):
1495 realtask = self.rqdata.runq_setscene[task]
1496 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1497 taskname = self.rqdata.runq_task[realtask]
1498 taskdep = self.rqdata.dataCache.task_deps[fn]
1500 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1502 self.task_skip(task)
1503 bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1506 if self.rq.check_stamp_task(realtask, taskname + "_setscene"):
1507 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1508 stamppresent.append(task)
1509 self.task_skip(task)
1513 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1514 sq_hash.append(self.rqdata.runq_hash[realtask])
1515 sq_taskname.append(taskname)
1516 sq_task.append(task)
1517 call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1518 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.configuration.data }
1519 valid = bb.utils.better_eval(call, locs)
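# The function named by BB_HASHCHECK_FUNCTION receives parallel lists of filenames,
# task names, hashes and hash filenames plus the datastore, and returns the indices
# of the setscene tasks whose prebuilt output is available; setscene tasks it does
# not return (and that have no stamp) are failed outright below so the real tasks run.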
1521 valid_new = stamppresent
1523 valid_new.append(sq_task[v])
1525 for task in xrange(len(self.sq_revdeps)):
1526 if task not in valid_new and task not in noexec:
1527 realtask = self.rqdata.runq_setscene[task]
1528 logger.debug(2, 'No package found, so skipping setscene task %s',
1529 self.rqdata.get_user_idstring(realtask))
1530 self.task_failoutright(task)
1532 logger.info('Executing SetScene Tasks')
1534 self.rq.state = runQueueSceneRun
1536 def scenequeue_updatecounters(self, task):
1537 for dep in self.sq_deps[task]:
1538 self.sq_revdeps2[dep].remove(task)
1539 if len(self.sq_revdeps2[dep]) == 0:
1540 self.runq_buildable[dep] = 1
1542 def task_completeoutright(self, task):
1544 Mark a task as completed
1545 Look at the reverse dependencies and mark any task with
1546 completed dependencies as buildable
1549 index = self.rqdata.runq_setscene[task]
1550 logger.debug(1, 'Found task %s which could be accelerated',
1551 self.rqdata.get_user_idstring(index))
1553 self.scenequeue_covered.add(task)
1554 self.scenequeue_updatecounters(task)
1556 def task_complete(self, task):
1557 self.stats.taskCompleted()
1558 self.task_completeoutright(task)
1560 def task_fail(self, task, result):
1561 self.stats.taskFailed()
1562 index = self.rqdata.runq_setscene[task]
1563 bb.event.fire(sceneQueueTaskFailed(index, self.stats, result, self), self.cfgData)
1564 self.scenequeue_notcovered.add(task)
1565 self.scenequeue_updatecounters(task)
1567 def task_failoutright(self, task):
1568 self.runq_running[task] = 1
1569 self.runq_buildable[task] = 1
1570 self.stats.taskCompleted()
1571 self.stats.taskSkipped()
1572 index = self.rqdata.runq_setscene[task]
1573 self.scenequeue_notcovered.add(task)
1574 self.scenequeue_updatecounters(task)
1576 def task_skip(self, task):
1577 self.runq_running[task] = 1
1578 self.runq_buildable[task] = 1
1579 self.task_completeoutright(task)
1580 self.stats.taskCompleted()
1581 self.stats.taskSkipped()
1585 Run the tasks in a queue prepared by prepare_runqueue
1589 if self.stats.active < self.number_tasks:
1590 # Find the next setscene to run
1591 for nexttask in xrange(self.stats.total):
1592 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1595 if task is not None:
1596 realtask = self.rqdata.runq_setscene[task]
1597 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1599 taskname = self.rqdata.runq_task[realtask] + "_setscene"
1600 if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
1601 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1602 task, self.rqdata.get_user_idstring(realtask))
1603 self.task_failoutright(task)
1606 if self.cooker.configuration.force:
1607 for target in self.rqdata.target_pairs:
1608 if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1609 self.task_failoutright(task)
1612 if self.rq.check_stamp_task(realtask, taskname):
1613 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
1614 task, self.rqdata.get_user_idstring(realtask))
1615 self.task_skip(task)
1618 pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
1620 self.build_pids[pid] = task
1621 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1622 self.runq_running[task] = 1
1623 self.stats.taskActive()
1624 if self.stats.active < self.number_tasks:
1627 for pipe in self.build_pipes:
1628 self.build_pipes[pipe].read()
1630 if self.stats.active > 0:
1631 if self.runqueue_process_waitpid() is None:
1635 # Convert scenequeue_covered task numbers into full taskgraph ids
1636 oldcovered = self.scenequeue_covered
1637 self.rq.scenequeue_covered = set()
1638 for task in oldcovered:
1639 self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1641 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
1643 self.rq.state = runQueueRunInit
1646 def fork_off_task(self, fn, task, taskname):
1647 return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
1649 class TaskFailure(Exception):
1651 Exception raised when a task in a runqueue fails
1653 def __init__(self, x):
1657 class runQueueExitWait(bb.event.Event):
1659 Event when waiting for task processes to exit
1662 def __init__(self, remain):
1663 self.remain = remain
1664 self.message = "Waiting for %s active tasks to finish" % remain
1665 bb.event.Event.__init__(self)
1667 class runQueueEvent(bb.event.Event):
1669 Base runQueue event class
1671 def __init__(self, task, stats, rq):
1673 self.taskstring = rq.rqdata.get_user_idstring(task)
1674 self.stats = stats.copy()
1675 bb.event.Event.__init__(self)
1677 class runQueueTaskStarted(runQueueEvent):
1679 Event notifying a task was started
1681 def __init__(self, task, stats, rq, noexec=False):
1682 runQueueEvent.__init__(self, task, stats, rq)
1683 self.noexec = noexec
1685 class runQueueTaskFailed(runQueueEvent):
1687 Event notifying a task failed
1689 def __init__(self, task, stats, exitcode, rq):
1690 runQueueEvent.__init__(self, task, stats, rq)
1691 self.exitcode = exitcode
1693 class sceneQueueTaskFailed(runQueueTaskFailed):
1695 Event notifying a setscene task failed
1697 def __init__(self, task, stats, exitcode, rq):
1698 runQueueTaskFailed.__init__(self, task, stats, exitcode, rq)
1699 self.taskstring = rq.rqdata.get_user_idstring(task, "_setscene")
1701 class runQueueTaskCompleted(runQueueEvent):
1703 Event notifying a task completed
1706 def check_stamp_fn(fn, taskname, d):
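# check_stamp_fn() is the module-level hook called from task context; it recovers the
# executor and target filename that fork_off_task() stashed in the datastore via the
# __RUNQUEUE_DO_NOT_USE_EXTERNALLY variables and uses them to query check_stamp_task().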
1707 rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1708 fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
1709 fnid = rqexe.rqdata.taskData.getfn_id(fn)
1710 taskid = rqexe.rqdata.get_task_id(fnid, taskname)
1711 if taskid is not None:
1712 return rqexe.rq.check_stamp_task(taskid)
1715 class runQueuePipe():
1717 Abstraction for a pipe between a worker thread and the server
1719 def __init__(self, pipein, pipeout, d):
1722 fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)
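# The worker end writes serialised events terminated by "</event>"; read() below
# drains whatever is available from the non-blocking pipe and fires each complete
# message back into the server's event system.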
1727 start = len(self.queue)
1729 self.queue = self.queue + self.input.read(102400)
1730 except (OSError, IOError):
1732 end = len(self.queue)
1733 index = self.queue.find("</event>")
1735 bb.event.fire_from_worker(self.queue[:index+8], self.d)
1736 self.queue = self.queue[index+8:]
1737 index = self.queue.find("</event>")
1738 return (end > start)
1743 if len(self.queue) > 0:
1744 print("Warning, worker left partial message: %s" % self.queue)