bitbake/runqueue.py: Sort the list of skipped tasks as it makes searching the list...
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 import copy
26 import os
27 import sys
28 import signal
29 import stat
30 import fcntl
31 import logging
32 import bb
33 from bb import msg, data, event
34
# Logger for the top-level "BitBake" namespace (not referenced in the part of
# this file reviewed here).
bblogger = logging.getLogger("BitBake")
# Logger used by the run queue code below for its info/verbose/debug/error output.
logger = logging.getLogger("BitBake.RunQueue")
37
class RunQueueStats:
    """
    Progress counters for the tasks handled by the associated runQueue
    """
    def __init__(self, total):
        # All counters start at zero; only the overall task count is supplied.
        self.completed = self.skipped = self.failed = self.active = 0
        self.total = total

    def copy(self):
        """
        Return a new stats object carrying the same counter values
        """
        snapshot = self.__class__(self.total)
        snapshot.__dict__.update(self.__dict__)
        return snapshot

    def taskFailed(self):
        """
        Move one task from the active count to the failed count
        """
        self.active -= 1
        self.failed += 1

    def taskCompleted(self, number = 1):
        """
        Move 'number' tasks from the active count to the completed count
        """
        self.active -= number
        self.completed += number

    def taskSkipped(self, number = 1):
        """
        Add 'number' to both the active and the skipped counts
        """
        self.active += number
        self.skipped += number

    def taskActive(self):
        """
        Record one more active task
        """
        self.active += 1
68
# These values indicate the next step due to be run in the
# runQueue state machine
#
# RunQueue.__init__ starts the machine in runQueuePrepare. The numbering
# begins at 2; no states with the values 0 or 1 are defined here.
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueSceneRun = 4
runQueueRunInit = 5
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
runQueueChildProcess = 10
80
class RunQueueScheduler(object):
    """
    Decides the order in which runnable tasks are handed out.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata

        # Identity mapping: priority slot N holds task id N.
        self.prio_map = list(range(len(self.rqdata.runq_fnid)))

    def next_buildable_task(self):
        """
        Walk the priority map and return the id of the first task that is
        buildable, not already running, and whose stamp file is not among the
        stamps of the tasks currently being built. Returns None when no task
        qualifies.
        """
        rq = self.rq
        rqdata = self.rqdata
        for slot in range(len(rqdata.runq_fnid)):
            taskid = self.prio_map[slot]
            if rq.runq_running[taskid] == 1 or rq.runq_buildable[taskid] != 1:
                continue
            fn = rqdata.taskData.fn_index[rqdata.runq_fnid[taskid]]
            stamp = bb.build.stampfile(rqdata.runq_task[taskid], rqdata.dataCache, fn)
            if stamp not in rq.build_stamps.values():
                return taskid
        return None

    def next(self):
        """
        Return the id of the task we should build next, or None when the
        number of active tasks has already reached rq.number_tasks
        """
        if self.rq.stats.active >= self.rq.number_tasks:
            return None
        return self.next_buildable_task()
121
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.

        rqdata.runq_weight must hold one weight per task id. The resulting
        prio_map lists task ids from heaviest to lightest; among tasks of equal
        weight the higher task id comes first.
        """

        self.rq = runqueue
        self.rqdata = rqdata

        weights = self.rqdata.runq_weight

        # sorted() is stable, so equal weights stay in ascending task id order
        # before the reversal. This yields the same order as repeatedly looking
        # up each sorted weight with list.index(), without the quadratic scan
        # or the copies of the weight list.
        self.prio_map = sorted(range(len(weights)), key=lambda taskid: weights[taskid])
        self.prio_map.reverse()
147
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it is completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in
    force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        """
        Start from the weight-ordered priority map and regroup it so that all
        tasks sharing an fnid are adjacent.

        Groups appear in the order their first task occurs in the weight-ordered
        map, and tasks keep their relative weight order inside each group.
        """
        RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        # Single pass: remember the order in which fnids are first seen and
        # collect each fnid's tasks as they appear.
        fnid_order = []
        tasks_by_fnid = {}
        for taskid in self.prio_map:
            fnid = self.rqdata.runq_fnid[taskid]
            if fnid not in tasks_by_fnid:
                tasks_by_fnid[fnid] = []
                fnid_order.append(fnid)
            tasks_by_fnid[fnid].append(taskid)

        self.prio_map = []
        for fnid in fnid_order:
            self.prio_map.extend(tasks_by_fnid[fnid])
179
180 class RunQueueData:
181     """
182     BitBake Run Queue implementation
183     """
184     def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
185         self.cooker = cooker
186         self.dataCache = dataCache
187         self.taskData = taskData
188         self.targets = targets
189         self.rq = rq
190
191         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
192         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
193
194         self.reset()
195
196     def reset(self):
197         self.runq_fnid = []
198         self.runq_task = []
199         self.runq_depends = []
200         self.runq_revdeps = []
201         self.runq_hash = []
202
203     def runq_depends_names(self, ids):
204         import re
205         ret = []
206         for id in self.runq_depends[ids]:
207             nam = os.path.basename(self.get_user_idstring(id))
208             nam = re.sub("_[^,]*,", ",", nam)
209             ret.extend([nam])
210         return ret
211
212     def get_user_idstring(self, task, task_name_suffix = ""):
213         fn = self.taskData.fn_index[self.runq_fnid[task]]
214         taskname = self.runq_task[task] + task_name_suffix
215         return "%s, %s" % (fn, taskname)
216
217     def get_task_id(self, fnid, taskname):
218         for listid in xrange(len(self.runq_fnid)):
219             if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
220                 return listid
221         return None
222
223     def circular_depchains_handler(self, tasks):
224         """
225         Some tasks aren't buildable, likely due to circular dependency issues.
226         Identify the circular dependencies and print them in a user readable format.
227         """
228         from copy import deepcopy
229
230         valid_chains = []
231         explored_deps = {}
232         msgs = []
233
234         def chain_reorder(chain):
235             """
236             Reorder a dependency chain so the lowest task id is first
237             """
238             lowest = 0
239             new_chain = []
240             for entry in xrange(len(chain)):
241                 if chain[entry] < chain[lowest]:
242                     lowest = entry
243             new_chain.extend(chain[lowest:])
244             new_chain.extend(chain[:lowest])
245             return new_chain
246
247         def chain_compare_equal(chain1, chain2):
248             """
249             Compare two dependency chains and see if they're the same
250             """
251             if len(chain1) != len(chain2):
252                 return False
253             for index in xrange(len(chain1)):
254                 if chain1[index] != chain2[index]:
255                     return False
256             return True
257
258         def chain_array_contains(chain, chain_array):
259             """
260             Return True if chain_array contains chain
261             """
262             for ch in chain_array:
263                 if chain_compare_equal(ch, chain):
264                     return True
265             return False
266
267         def find_chains(taskid, prev_chain):
268             prev_chain.append(taskid)
269             total_deps = []
270             total_deps.extend(self.runq_revdeps[taskid])
271             for revdep in self.runq_revdeps[taskid]:
272                 if revdep in prev_chain:
273                     idx = prev_chain.index(revdep)
274                     # To prevent duplicates, reorder the chain to start with the lowest taskid
275                     # and search through an array of those we've already printed
276                     chain = prev_chain[idx:]
277                     new_chain = chain_reorder(chain)
278                     if not chain_array_contains(new_chain, valid_chains):
279                         valid_chains.append(new_chain)
280                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
281                         for dep in new_chain:
282                             msgs.append("  Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
283                         msgs.append("\n")
284                     if len(valid_chains) > 10:
285                         msgs.append("Aborted dependency loops search after 10 matches.\n")
286                         return msgs
287                     continue
288                 scan = False
289                 if revdep not in explored_deps:
290                     scan = True
291                 elif revdep in explored_deps[revdep]:
292                     scan = True
293                 else:
294                     for dep in prev_chain:
295                         if dep in explored_deps[revdep]:
296                             scan = True
297                 if scan:
298                     find_chains(revdep, copy.deepcopy(prev_chain))
299                 for dep in explored_deps[revdep]:
300                     if dep not in total_deps:
301                         total_deps.append(dep)
302
303             explored_deps[taskid] = total_deps
304
305         for task in tasks:
306             find_chains(task, [])
307
308         return msgs
309
310     def calculate_task_weights(self, endpoints):
311         """
312         Calculate a number representing the "weight" of each task. Heavier weighted tasks
313         have more dependencies and hence should be executed sooner for maximum speed.
314
315         This function also sanity checks the task list finding tasks that are not
316         possible to execute due to circular dependencies.
317         """
318
319         numTasks = len(self.runq_fnid)
320         weight = []
321         deps_left = []
322         task_done = []
323
324         for listid in xrange(numTasks):
325             task_done.append(False)
326             weight.append(0)
327             deps_left.append(len(self.runq_revdeps[listid]))
328
329         for listid in endpoints:
330             weight[listid] = 1
331             task_done[listid] = True
332
333         while True:
334             next_points = []
335             for listid in endpoints:
336                 for revdep in self.runq_depends[listid]:
337                     weight[revdep] = weight[revdep] + weight[listid]
338                     deps_left[revdep] = deps_left[revdep] - 1
339                     if deps_left[revdep] == 0:
340                         next_points.append(revdep)
341                         task_done[revdep] = True
342             endpoints = next_points
343             if len(next_points) == 0:
344                 break
345
346         # Circular dependency sanity check
347         problem_tasks = []
348         for task in xrange(numTasks):
349             if task_done[task] is False or deps_left[task] != 0:
350                 problem_tasks.append(task)
351                 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
352                 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
353
354         if problem_tasks:
355             message = "Unbuildable tasks were found.\n"
356             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
357             message = message + "Identifying dependency loops (this may take a short while)...\n"
358             logger.error(message)
359
360             msgs = self.circular_depchains_handler(problem_tasks)
361
362             message = "\n"
363             for msg in msgs:
364                 message = message + msg
365             bb.msg.fatal("RunQueue", message)
366
367         return weight
368
369     def prepare(self):
370         """
371         Turn a set of taskData into a RunQueue and compute data needed
372         to optimise the execution order.
373         """
374
375         runq_build = []
376         recursive_tdepends = {}
377         runq_recrdepends = []
378         tdepends_fnid = {}
379
380         taskData = self.taskData
381
382         if len(taskData.tasks_name) == 0:
383             # Nothing to do
384             return 0
385
386         logger.info("Preparing runqueue")
387
388         # Step A - Work out a list of tasks to run
389         #
390         # Taskdata gives us a list of possible providers for every build and run
391         # target ordered by priority. It also gives information on each of those
392         # providers.
393         #
394         # To create the actual list of tasks to execute we fix the list of
395         # providers and then resolve the dependencies into task IDs. This
396         # process is repeated for each type of dependency (tdepends, deptask,
397         # rdeptast, recrdeptask, idepends).
398
399         def add_build_dependencies(depids, tasknames, depends):
400             for depid in depids:
401                 # Won't be in build_targets if ASSUME_PROVIDED
402                 if depid not in taskData.build_targets:
403                     continue
404                 depdata = taskData.build_targets[depid][0]
405                 if depdata is None:
406                     continue
407                 dep = taskData.fn_index[depdata]
408                 for taskname in tasknames:
409                     taskid = taskData.gettask_id(dep, taskname, False)
410                     if taskid is not None:
411                         depends.append(taskid)
412
413         def add_runtime_dependencies(depids, tasknames, depends):
414             for depid in depids:
415                 if depid not in taskData.run_targets:
416                     continue
417                 depdata = taskData.run_targets[depid][0]
418                 if depdata is None:
419                     continue
420                 dep = taskData.fn_index[depdata]
421                 for taskname in tasknames:
422                     taskid = taskData.gettask_id(dep, taskname, False)
423                     if taskid is not None:
424                         depends.append(taskid)
425
426         for task in xrange(len(taskData.tasks_name)):
427             depends = []
428             recrdepends = []
429             fnid = taskData.tasks_fnid[task]
430             fn = taskData.fn_index[fnid]
431             task_deps = self.dataCache.task_deps[fn]
432
433             logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
434
435             if fnid not in taskData.failed_fnids:
436
437                 # Resolve task internal dependencies
438                 #
439                 # e.g. addtask before X after Y
440                 depends = taskData.tasks_tdepends[task]
441
442                 # Resolve 'deptask' dependencies
443                 #
444                 # e.g. do_sometask[deptask] = "do_someothertask"
445                 # (makes sure sometask runs after someothertask of all DEPENDS)
446                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
447                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
448                     add_build_dependencies(taskData.depids[fnid], tasknames, depends)
449
450                 # Resolve 'rdeptask' dependencies
451                 #
452                 # e.g. do_sometask[rdeptask] = "do_someothertask"
453                 # (makes sure sometask runs after someothertask of all RDEPENDS)
454                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
455                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
456                     add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
457
458                 # Resolve inter-task dependencies
459                 #
460                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
461                 # (makes sure sometask runs after targetname's someothertask)
462                 if fnid not in tdepends_fnid:
463                     tdepends_fnid[fnid] = set()
464                 idepends = taskData.tasks_idepends[task]
465                 for (depid, idependtask) in idepends:
466                     if depid in taskData.build_targets:
467                         # Won't be in build_targets if ASSUME_PROVIDED
468                         depdata = taskData.build_targets[depid][0]
469                         if depdata is not None:
470                             dep = taskData.fn_index[depdata]
471                             taskid = taskData.gettask_id(dep, idependtask, False)
472                             if taskid is None:
473                                 bb.msg.fatal("RunQueue", "Task %s in %s depends upon nonexistant task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
474                             depends.append(taskid)
475                             if depdata != fnid:
476                                 tdepends_fnid[fnid].add(taskid)
477
478
479                 # Resolve recursive 'recrdeptask' dependencies (A)
480                 #
481                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
482                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
483                 # We cover the recursive part of the dependencies below
484                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
485                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
486                         recrdepends.append(taskname)
487                         add_build_dependencies(taskData.depids[fnid], [taskname], depends)
488                         add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
489
490                 # Rmove all self references
491                 if task in depends:
492                     newdep = []
493                     logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
494                     for dep in depends:
495                         if task != dep:
496                             newdep.append(dep)
497                     depends = newdep
498
499             self.runq_fnid.append(taskData.tasks_fnid[task])
500             self.runq_task.append(taskData.tasks_name[task])
501             self.runq_depends.append(set(depends))
502             self.runq_revdeps.append(set())
503             self.runq_hash.append("")
504
505             runq_build.append(0)
506             runq_recrdepends.append(recrdepends)
507
508         #
509         # Build a list of recursive cumulative dependencies for each fnid
510         # We do this by fnid, since if A depends on some task in B
511         # we're interested in later tasks B's fnid might have but B itself
512         # doesn't depend on
513         #
514         # Algorithm is O(tasks) + O(tasks)*O(fnids)
515         #
516         reccumdepends = {}
517         for task in xrange(len(self.runq_fnid)):
518             fnid = self.runq_fnid[task]
519             if fnid not in reccumdepends:
520                 if fnid in tdepends_fnid:
521                     reccumdepends[fnid] = tdepends_fnid[fnid]
522                 else:
523                     reccumdepends[fnid] = set()
524             reccumdepends[fnid].update(self.runq_depends[task])
525         for task in xrange(len(self.runq_fnid)):
526             taskfnid = self.runq_fnid[task]
527             for fnid in reccumdepends:
528                 if task in reccumdepends[fnid]:
529                     reccumdepends[fnid].add(task)
530                     if taskfnid in reccumdepends:
531                         reccumdepends[fnid].update(reccumdepends[taskfnid])
532
533
534         # Resolve recursive 'recrdeptask' dependencies (B)
535         #
536         # e.g. do_sometask[recrdeptask] = "do_someothertask"
537         # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
538         for task in xrange(len(self.runq_fnid)):
539             if len(runq_recrdepends[task]) > 0:
540                 taskfnid = self.runq_fnid[task]
541                 for dep in reccumdepends[taskfnid]:
542                     # Ignore self references
543                     if dep == task:
544                         continue
545                     for taskname in runq_recrdepends[task]:
546                         if taskData.tasks_name[dep] == taskname:
547                             self.runq_depends[task].add(dep)
548
549         # Step B - Mark all active tasks
550         #
551         # Start with the tasks we were asked to run and mark all dependencies
552         # as active too. If the task is to be 'forced', clear its stamp. Once
553         # all active tasks are marked, prune the ones we don't need.
554
555         logger.verbose("Marking Active Tasks")
556
557         def mark_active(listid, depth):
558             """
559             Mark an item as active along with its depends
560             (calls itself recursively)
561             """
562
563             if runq_build[listid] == 1:
564                 return
565
566             runq_build[listid] = 1
567
568             depends = self.runq_depends[listid]
569             for depend in depends:
570                 mark_active(depend, depth+1)
571
572         self.target_pairs = []
573         for target in self.targets:
574             targetid = taskData.getbuild_id(target[0])
575
576             if targetid not in taskData.build_targets:
577                 continue
578
579             if targetid in taskData.failed_deps:
580                 continue
581
582             fnid = taskData.build_targets[targetid][0]
583             fn = taskData.fn_index[fnid]
584             self.target_pairs.append((fn, target[1]))
585
586             if fnid in taskData.failed_fnids:
587                 continue
588
589             if target[1] not in taskData.tasks_lookup[fnid]:
590                 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s" % (target[1], target[0]))
591
592             listid = taskData.tasks_lookup[fnid][target[1]]
593
594             mark_active(listid, 1)
595
596         # Step C - Prune all inactive tasks
597         #
598         # Once all active tasks are marked, prune the ones we don't need.
599
600         maps = []
601         delcount = 0
602         for listid in xrange(len(self.runq_fnid)):
603             if runq_build[listid-delcount] == 1:
604                 maps.append(listid-delcount)
605             else:
606                 del self.runq_fnid[listid-delcount]
607                 del self.runq_task[listid-delcount]
608                 del self.runq_depends[listid-delcount]
609                 del runq_build[listid-delcount]
610                 del self.runq_revdeps[listid-delcount]
611                 del self.runq_hash[listid-delcount]
612                 delcount = delcount + 1
613                 maps.append(-1)
614
615         #
616         # Step D - Sanity checks and computation
617         #
618
619         # Check to make sure we still have tasks to run
620         if len(self.runq_fnid) == 0:
621             if not taskData.abort:
622                 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
623             else:
624                 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
625
626         logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
627
628         # Remap the dependencies to account for the deleted tasks
629         # Check we didn't delete a task we depend on
630         for listid in xrange(len(self.runq_fnid)):
631             newdeps = []
632             origdeps = self.runq_depends[listid]
633             for origdep in origdeps:
634                 if maps[origdep] == -1:
635                     bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
636                 newdeps.append(maps[origdep])
637             self.runq_depends[listid] = set(newdeps)
638
639         logger.verbose("Assign Weightings")
640
641         # Generate a list of reverse dependencies to ease future calculations
642         for listid in xrange(len(self.runq_fnid)):
643             for dep in self.runq_depends[listid]:
644                 self.runq_revdeps[dep].add(listid)
645
646         # Identify tasks at the end of dependency chains
647         # Error on circular dependency loops (length two)
648         endpoints = []
649         for listid in xrange(len(self.runq_fnid)):
650             revdeps = self.runq_revdeps[listid]
651             if len(revdeps) == 0:
652                 endpoints.append(listid)
653             for dep in revdeps:
654                 if dep in self.runq_depends[listid]:
655                     #self.dump_data(taskData)
656                     bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
657
658         logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
659
660         # Calculate task weights
661         # Check of higher length circular dependencies
662         self.runq_weight = self.calculate_task_weights(endpoints)
663
664         # Sanity Check - Check for multiple tasks building the same provider
665         prov_list = {}
666         seen_fn = []
667         for task in xrange(len(self.runq_fnid)):
668             fn = taskData.fn_index[self.runq_fnid[task]]
669             if fn in seen_fn:
670                 continue
671             seen_fn.append(fn)
672             for prov in self.dataCache.fn_provides[fn]:
673                 if prov not in prov_list:
674                     prov_list[prov] = [fn]
675                 elif fn not in prov_list[prov]:
676                     prov_list[prov].append(fn)
677         error = False
678         for prov in prov_list:
679             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
680                 error = True
681                 logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))
682
683
684         # Create a whitelist usable by the stamp checks
685         stampfnwhitelist = []
686         for entry in self.stampwhitelist.split():
687             entryid = self.taskData.getbuild_id(entry)
688             if entryid not in self.taskData.build_targets:
689                 continue
690             fnid = self.taskData.build_targets[entryid][0]
691             fn = self.taskData.fn_index[fnid]
692             stampfnwhitelist.append(fn)
693         self.stampfnwhitelist = stampfnwhitelist
694
695         # Interate over the task list looking for tasks with a 'setscene' function
696         self.runq_setscene = []
697         for task in range(len(self.runq_fnid)):
698             setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
699             if not setscene:
700                 continue
701             self.runq_setscene.append(task)
702
703         # Interate over the task list and call into the siggen code
704         dealtwith = set()
705         todeal = set(range(len(self.runq_fnid)))
706         while len(todeal) > 0:
707             for task in todeal.copy():
708                 if len(self.runq_depends[task] - dealtwith) == 0:
709                     dealtwith.add(task)
710                     todeal.remove(task)
711                     procdep = []
712                     for dep in self.runq_depends[task]:
713                         procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
714                     self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
715
716         self.hashes = {}
717         self.hash_deps = {}
718         for task in xrange(len(self.runq_fnid)):
719             identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
720                                     self.runq_task[task])
721             self.hashes[identifier] = self.runq_hash[task]
722             deps = []
723             for dep in self.runq_depends[task]:
724                 depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
725                                            self.runq_task[dep])
726                 deps.append(depidentifier)
727             self.hash_deps[identifier] = deps
728
729         # Remove stamps for targets if force mode active
730         if self.cooker.configuration.force:
731             for (fn, target) in self.target_pairs:
732                 logger.verbose("Remove stamp %s, %s", target, fn)
733                 bb.build.del_stamp(target, self.dataCache, fn)
734
735         return len(self.runq_fnid)
736
737     def dump_data(self, taskQueue):
738         """
739         Dump some debug information on the internal data structures
740         """
741         logger.debug(3, "run_tasks:")
742         for task in xrange(len(self.rqdata.runq_task)):
743             logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
744                          taskQueue.fn_index[self.rqdata.runq_fnid[task]],
745                          self.rqdata.runq_task[task],
746                          self.rqdata.runq_weight[task],
747                          self.rqdata.runq_depends[task],
748                          self.rqdata.runq_revdeps[task])
749
750         logger.debug(3, "sorted_tasks:")
751         for task1 in xrange(len(self.rqdata.runq_task)):
752             if task1 in self.prio_map:
753                 task = self.prio_map[task1]
754                 logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
755                            taskQueue.fn_index[self.rqdata.runq_fnid[task]],
756                            self.rqdata.runq_task[task],
757                            self.rqdata.runq_weight[task],
758                            self.rqdata.runq_depends[task],
759                            self.rqdata.runq_revdeps[task])
760
761 class RunQueue:
762     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
763
764         self.cooker = cooker
765         self.cfgData = cfgData
766         self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
767
768         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, True) or "perfile"
769         self.hashvalidate = bb.data.getVar("BB_HASHCHECK_FUNCTION", cfgData, True) or None
770
771         self.state = runQueuePrepare
772
773     def check_stamps(self):
774         unchecked = {}
775         current = []
776         notcurrent = []
777         buildable = []
778
779         if self.stamppolicy == "perfile":
780             fulldeptree = False
781         else:
782             fulldeptree = True
783             stampwhitelist = []
784             if self.stamppolicy == "whitelist":
785                 stampwhitelist = self.rqdata.stampfnwhitelist
786
787         for task in xrange(len(self.rqdata.runq_fnid)):
788             unchecked[task] = ""
789             if len(self.rqdata.runq_depends[task]) == 0:
790                 buildable.append(task)
791
792         def check_buildable(self, task, buildable):
793             for revdep in self.rqdata.runq_revdeps[task]:
794                 alldeps = 1
795                 for dep in self.rqdata.runq_depends[revdep]:
796                     if dep in unchecked:
797                         alldeps = 0
798                 if alldeps == 1:
799                     if revdep in unchecked:
800                         buildable.append(revdep)
801
802         for task in xrange(len(self.rqdata.runq_fnid)):
803             if task not in unchecked:
804                 continue
805             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
806             taskname = self.rqdata.runq_task[task]
807             stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
808             # If the stamp is missing its not current
809             if not os.access(stampfile, os.F_OK):
810                 del unchecked[task]
811                 notcurrent.append(task)
812                 check_buildable(self, task, buildable)
813                 continue
814             # If its a 'nostamp' task, it's not current
815             taskdep = self.rqdata.dataCache.task_deps[fn]
816             if 'nostamp' in taskdep and task in taskdep['nostamp']:
817                 del unchecked[task]
818                 notcurrent.append(task)
819                 check_buildable(self, task, buildable)
820                 continue
821
822         while (len(buildable) > 0):
823             nextbuildable = []
824             for task in buildable:
825                 if task in unchecked:
826                     fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
827                     taskname = self.rqdata.runq_task[task]
828                     stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
829                     iscurrent = True
830
831                     t1 = os.stat(stampfile)[stat.ST_MTIME]
832                     for dep in self.rqdata.runq_depends[task]:
833                         if iscurrent:
834                             fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
835                             taskname2 = self.rqdata.runq_task[dep]
836                             stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
837                             if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
838                                 if dep in notcurrent:
839                                     iscurrent = False
840                                 else:
841                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
842                                     if t1 < t2:
843                                         iscurrent = False
844                     del unchecked[task]
845                     if iscurrent:
846                         current.append(task)
847                     else:
848                         notcurrent.append(task)
849
850                 check_buildable(self, task, nextbuildable)
851
852             buildable = nextbuildable
853
854         #for task in range(len(self.runq_fnid)):
855         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
856         #    taskname = self.runq_task[task]
857         #    print "%s %s.%s" % (task, taskname, fn)
858
859         #print "Unchecked: %s" % unchecked
860         #print "Current: %s" % current
861         #print "Not current: %s" % notcurrent
862
863         if len(unchecked) > 0:
864             bb.msg.fatal("RunQueue", "check_stamps fatal internal error")
865         return current
866
867     def check_stamp_task(self, task, taskname = None):
868         def get_timestamp(f):
869             try:
870                 if not os.access(f, os.F_OK):
871                     return None
872                 return os.stat(f)[stat.ST_MTIME]
873             except:
874                 return None
875
876         if self.stamppolicy == "perfile":
877             fulldeptree = False
878         else:
879             fulldeptree = True
880             stampwhitelist = []
881             if self.stamppolicy == "whitelist":
882                 stampwhitelist = self.rqdata.stampfnwhitelist
883
884         fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
885         if taskname is None:
886             taskname = self.rqdata.runq_task[task]
887
888         stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
889
890         # If the stamp is missing its not current
891         if not os.access(stampfile, os.F_OK):
892             logger.debug(2, "Stampfile %s not available", stampfile)
893             return False
894         # If its a 'nostamp' task, it's not current
895         taskdep = self.rqdata.dataCache.task_deps[fn]
896         if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
897             logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
898             return False
899
900         if taskname != "do_setscene" and taskname.endswith("_setscene"):
901             return True
902
903         iscurrent = True
904         t1 = get_timestamp(stampfile)
905         for dep in self.rqdata.runq_depends[task]:
906             if iscurrent:
907                 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
908                 taskname2 = self.rqdata.runq_task[dep]
909                 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
910                 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
911                 t2 = get_timestamp(stampfile2)
912                 t3 = get_timestamp(stampfile3)
913                 if t3 and t3 > t2:
914                    continue
915                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
916                     if not t2:
917                         logger.debug(2, 'Stampfile %s does not exist', stampfile2)
918                         iscurrent = False
919                     if t1 < t2:
920                         logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
921                         iscurrent = False
922
923         return iscurrent
924
925     def execute_runqueue(self):
926         """
927         Run the tasks in a queue prepared by rqdata.prepare()
928         Upon failure, optionally try to recover the build using any alternate providers
929         (if the abort on failure configuration option isn't set)
930         """
931
932         retval = 0.5
933
934         if self.state is runQueuePrepare:
935             self.rqexe = RunQueueExecuteDummy(self)
936             if self.rqdata.prepare() == 0:
937                 self.state = runQueueComplete
938             else:
939                 self.state = runQueueSceneInit
940
941         if self.state is runQueueSceneInit:
942             if self.cooker.configuration.dump_signatures:
943                 self.dump_signatures()
944             else:
945                 self.rqexe = RunQueueExecuteScenequeue(self)
946
947         if self.state is runQueueSceneRun:
948             retval = self.rqexe.execute()
949
950         if self.state is runQueueRunInit:
951             logger.info("Executing RunQueue Tasks")
952             self.rqexe = RunQueueExecuteTasks(self)
953             self.state = runQueueRunning
954
955         if self.state is runQueueRunning:
956             retval = self.rqexe.execute()
957
958         if self.state is runQueueCleanUp:
959            self.rqexe.finish()
960
961         if self.state is runQueueFailed:
962             if not self.rqdata.taskData.tryaltconfigs:
963                 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
964             for fnid in self.rqexe.failed_fnids:
965                 self.rqdata.taskData.fail_fnid(fnid)
966             self.rqdata.reset()
967
968         if self.state is runQueueComplete:
969             # All done
970             logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
971             return False
972
973         if self.state is runQueueChildProcess:
974             print("Child process, eeek, shouldn't happen!")
975             return False
976
977         # Loop
978         return retval
979
980     def finish_runqueue(self, now = False):
981         if now:
982             self.rqexe.finish_now()
983         else:
984             self.rqexe.finish()
985
986     def dump_signatures(self):
987         self.state = runQueueComplete
988         done = set()
989         bb.note("Reparsing files to collect dependency data")
990         for task in range(len(self.rqdata.runq_fnid)):
991             if self.rqdata.runq_fnid[task] not in done:
992                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
993                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
994                 done.add(self.rqdata.runq_fnid[task])
995
996         bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
997
998         return
999
1000
1001 class RunQueueExecute:
1002
1003     def __init__(self, rq):
1004         self.rq = rq
1005         self.cooker = rq.cooker
1006         self.cfgData = rq.cfgData
1007         self.rqdata = rq.rqdata
1008
1009         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
1010         self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"
1011
1012         self.runq_buildable = []
1013         self.runq_running = []
1014         self.runq_complete = []
1015         self.build_pids = {}
1016         self.build_pipes = {}
1017         self.build_stamps = {}
1018         self.failed_fnids = []
1019
1020     def runqueue_process_waitpid(self):
1021         """
1022         Return none is there are no processes awaiting result collection, otherwise
1023         collect the process exit codes and close the information pipe.
1024         """
1025         result = os.waitpid(-1, os.WNOHANG)
1026         if result[0] == 0 and result[1] == 0:
1027             return None
1028         task = self.build_pids[result[0]]
1029         del self.build_pids[result[0]]
1030         self.build_pipes[result[0]].close()
1031         del self.build_pipes[result[0]]
1032         # self.build_stamps[result[0]] may not exist when use shared work directory.
1033         if result[0] in self.build_stamps.keys():
1034             del self.build_stamps[result[0]]
1035         if result[1] != 0:
1036             self.task_fail(task, result[1]>>8)
1037         else:
1038             self.task_complete(task)
1039         return True
1040
1041     def finish_now(self):
1042         if self.stats.active:
1043             logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
1044             for k, v in self.build_pids.iteritems():
1045                 try:
1046                     os.kill(-k, signal.SIGTERM)
1047                 except:
1048                     pass
1049         for pipe in self.build_pipes:
1050             self.build_pipes[pipe].read()
1051
1052     def finish(self):
1053         self.rq.state = runQueueCleanUp
1054
1055         for pipe in self.build_pipes:
1056             self.build_pipes[pipe].read()
1057
1058         if self.stats.active > 0:
1059             bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1060             self.runqueue_process_waitpid()
1061             return
1062
1063         if len(self.failed_fnids) != 0:
1064             self.rq.state = runQueueFailed
1065             return
1066
1067         self.rq.state = runQueueComplete
1068         return
1069
1070     def fork_off_task(self, fn, task, taskname, quieterrors=False):
1071         # We need to setup the environment BEFORE the fork, since
1072         # a fork() or exec*() activates PSEUDO...
1073
1074         envbackup = {}
1075         fakeenv = {}
1076         umask = None
1077
1078         taskdep = self.rqdata.dataCache.task_deps[fn]
1079         if 'umask' in taskdep and taskname in taskdep['umask']:
1080             # umask might come in as a number or text string..
1081             try:
1082                  umask = int(taskdep['umask'][taskname],8)
1083             except TypeError:
1084                  umask = taskdep['umask'][taskname]
1085
1086         if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1087             envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
1088             for key, value in (var.split('=') for var in envvars):
1089                 envbackup[key] = os.environ.get(key)
1090                 os.environ[key] = value
1091                 fakeenv[key] = value
1092
1093             fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
1094             for p in fakedirs:
1095                 bb.utils.mkdirhier(p)
1096
1097             logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
1098                             (fn, taskname, ', '.join(fakedirs)))
1099         else:
1100             envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
1101             for key, value in (var.split('=') for var in envvars):
1102                 envbackup[key] = os.environ.get(key)
1103                 os.environ[key] = value
1104                 fakeenv[key] = value
1105
1106         sys.stdout.flush()
1107         sys.stderr.flush()
1108         try:
1109             pipein, pipeout = os.pipe()
1110             pipein = os.fdopen(pipein, 'rb', 4096)
1111             pipeout = os.fdopen(pipeout, 'wb', 0)
1112             pid = os.fork()
1113         except OSError as e:
1114             bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
1115
1116         if pid == 0:
1117             pipein.close()
1118
1119             # Save out the PID so that the event can include it the
1120             # events
1121             bb.event.worker_pid = os.getpid()
1122             bb.event.worker_pipe = pipeout
1123
1124             self.rq.state = runQueueChildProcess
1125             # Make the child the process group leader
1126             os.setpgid(0, 0)
1127             # No stdin
1128             newsi = os.open(os.devnull, os.O_RDWR)
1129             os.dup2(newsi, sys.stdin.fileno())
1130
1131             if umask:
1132                 os.umask(umask)
1133
1134             bb.data.setVar("BB_WORKERCONTEXT", "1", self.cooker.configuration.data)
1135             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
1136             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
1137             bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
1138             ret = 0
1139             try:
1140                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
1141                 the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
1142                 for h in self.rqdata.hashes:
1143                     the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
1144                 for h in self.rqdata.hash_deps:
1145                     the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
1146
1147                 # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 
1148                 # successfully. We also need to unset anything from the environment which shouldn't be there 
1149                 exports = bb.data.exported_vars(the_data)
1150                 bb.utils.empty_environment()
1151                 for e, v in exports:
1152                     os.environ[e] = v
1153                 for e in fakeenv:
1154                     os.environ[e] = fakeenv[e]
1155                     the_data.setVar(e, fakeenv[e])
1156
1157                 if quieterrors:
1158                     the_data.setVarFlag(taskname, "quieterrors", "1")
1159
1160             except Exception as exc:
1161                 if not quieterrors:
1162                     logger.critical(str(exc))
1163                 os._exit(1)
1164             try:
1165                 ret = bb.build.exec_task(fn, taskname, the_data)
1166                 os._exit(ret)
1167             except:
1168                 os._exit(1)
1169         else:
1170             for key, value in envbackup.iteritems():
1171                 if value is None:
1172                     del os.environ[key]
1173                 else:
1174                     os.environ[key] = value
1175
1176         return pid, pipein, pipeout
1177
class RunQueueExecuteDummy(RunQueueExecute):
    """
    Stand-in executor carrying only the owning runqueue and an empty
    statistics object.  The base class __init__ is not called.
    """
    def __init__(self, rq):
        self.stats = RunQueueStats(0)
        self.rq = rq

    def finish(self):
        """Nothing to wait for: mark the runqueue as complete."""
        self.rq.state = runQueueComplete
1186
1187 class RunQueueExecuteTasks(RunQueueExecute):
    def __init__(self, rq):
        """
        Prepare execution of the real (non-setscene) tasks.

        Initialises the per-task state lists, extends rq.scenequeue_covered
        with tasks whose reverse dependencies are all covered, removes tasks
        from it which have to run anyway, marks the remaining covered tasks
        as skipped, fires a StampUpdate event and instantiates the scheduler
        named by BB_SCHEDULER (fatal error if no scheduler has that name).
        """
        RunQueueExecute.__init__(self, rq)

        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))

        # Mark initial buildable tasks
        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.rqdata.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)
            # A task with reverse dependencies, all of which are already
            # covered, is treated as covered too.
            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
                self.rq.scenequeue_covered.add(task)

        # Repeat the coverage propagation, this time only for tasks whose
        # reverse dependencies all come from the same file as the task.
        # Note that 'found' is overwritten for every candidate task, so
        # whether another pass happens depends on the last candidate seen.
        found = True
        while found:
            found = False
            for task in xrange(self.stats.total):
                if task in self.rq.scenequeue_covered:
                    continue
                if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
                    found = True
                    for revdep in self.rqdata.runq_revdeps[task]:
                        if self.rqdata.runq_fnid[task] != self.rqdata.runq_fnid[revdep]:
                            found = False
                            break
                    if found:
                        self.rq.scenequeue_covered.add(task)

        # Detect when the real task needs to be run anyway by looking to see
        # if any of its dependencies within the same package are scheduled
        # to be run.
        covered_remove = set()
        for task in self.rq.scenequeue_covered:
            task_fnid = self.rqdata.runq_fnid[task]
            for dep in self.rqdata.runq_depends[task]:
                if self.rqdata.runq_fnid[dep] == task_fnid:
                    if dep not in self.rq.scenequeue_covered:
                        covered_remove.add(task)
                        break

        # Removal is done in a second loop because the set above cannot be
        # modified while it is being iterated.  The task's setscene stamp is
        # deleted as well.
        for task in covered_remove:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task] + '_setscene'
            bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
            logger.debug(1, 'Not skipping task %s because it will have to be run anyway', task)
            self.rq.scenequeue_covered.remove(task)

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)

        # Everything still covered is accounted for as skipped
        for task in self.rq.scenequeue_covered:
            self.task_skip(task)

        event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)

        # Pick the scheduler class whose name matches BB_SCHEDULER; the
        # for/else reports an error when none matched.
        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1254
1255
1256     def get_schedulers(self):
1257         schedulers = set(obj for obj in globals().values()
1258                              if type(obj) is type and
1259                                 issubclass(obj, RunQueueScheduler))
1260
1261         user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
1262         if user_schedulers:
1263             for sched in user_schedulers.split():
1264                 if not "." in sched:
1265                     bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1266                     continue
1267
1268                 modname, name = sched.rsplit(".", 1)
1269                 try:
1270                     module = __import__(modname, fromlist=(name,))
1271                 except ImportError as exc:
1272                     logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1273                     raise SystemExit(1)
1274                 else:
1275                     schedulers.add(getattr(module, name))
1276         return schedulers
1277
1278     def task_completeoutright(self, task):
1279         """
1280         Mark a task as completed
1281         Look at the reverse dependencies and mark any task with
1282         completed dependencies as buildable
1283         """
1284         self.runq_complete[task] = 1
1285         for revdep in self.rqdata.runq_revdeps[task]:
1286             if self.runq_running[revdep] == 1:
1287                 continue
1288             if self.runq_buildable[revdep] == 1:
1289                 continue
1290             alldeps = 1
1291             for dep in self.rqdata.runq_depends[revdep]:
1292                 if self.runq_complete[dep] != 1:
1293                     alldeps = 0
1294             if alldeps == 1:
1295                 self.runq_buildable[revdep] = 1
1296                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1297                 taskname = self.rqdata.runq_task[revdep]
1298                 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1299
1300     def task_complete(self, task):
1301         self.stats.taskCompleted()
1302         bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1303         self.task_completeoutright(task)
1304
1305     def task_fail(self, task, exitcode):
1306         """
1307         Called when a task has failed
1308         Updates the state engine with the failure
1309         """
1310         self.stats.taskFailed()
1311         fnid = self.rqdata.runq_fnid[task]
1312         self.failed_fnids.append(fnid)
1313         bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1314         if self.rqdata.taskData.abort:
1315             self.rq.state = runQueueCleanUp
1316
1317     def task_skip(self, task):
1318         self.runq_running[task] = 1
1319         self.runq_buildable[task] = 1
1320         self.task_completeoutright(task)
1321         self.stats.taskCompleted()
1322         self.stats.taskSkipped()
1323
    def execute(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()

        One call handles at most one new task from the scheduler: it is
        skipped if its stamp is current, completed immediately for a dry
        run or a 'noexec' task, and otherwise forked off as a worker.
        When no new task was started (or the worker limit has been reached)
        the worker pipes are read and one finished child is reaped.

        Returns True when progress was made (or the state changed), or 0.5
        when workers are active but none has finished yet.
        """

        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        task = self.sched.next()
        if task is not None:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]

            taskname = self.rqdata.runq_task[task]
            if self.rq.check_stamp_task(task, taskname):
                # Stamp is current: nothing to execute for this task
                logger.debug(2, "Stamp current task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task)
                return True
            elif self.cooker.configuration.dry_run:
                # Dry run: account for the task as if it had run
                self.runq_running[task] = 1
                self.runq_buildable[task] = 1
                self.stats.taskActive()
                self.task_complete(task)
                return True

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                # 'noexec' tasks only get their stamp written, no worker
                # is forked for them
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)

            # Track the worker by pid so its result can be matched up in
            # runqueue_process_waitpid()
            self.build_pids[pid] = task
            self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
            self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
            self.runq_running[task] = 1
            self.stats.taskActive()
            # Below the BB_NUMBER_THREADS limit: return so the next call
            # can start another task straight away
            if self.stats.active < self.number_tasks:
                return True

        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

        if self.stats.active > 0:
            # None from the waitpid helper means no child has exited yet
            if self.runqueue_process_waitpid() is None:
                return 0.5
            return True

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        # Reached only when nothing is active and nothing failed; problems
        # are logged but do not stop the queue from being marked complete.
        for task in xrange(self.stats.total):
            if self.runq_buildable[task] == 0:
                logger.error("Task %s never buildable!", task)
            if self.runq_running[task] == 0:
                logger.error("Task %s never ran!", task)
            if self.runq_complete[task] == 0:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete
        return True
1396
1397 class RunQueueExecuteScenequeue(RunQueueExecute):
1398     def __init__(self, rq):
1399         RunQueueExecute.__init__(self, rq)
1400
1401         self.scenequeue_covered = set()
1402         self.scenequeue_notcovered = set()
1403
1404         # If we don't have any setscene functions, skip this step
1405         if len(self.rqdata.runq_setscene) == 0:
1406             rq.scenequeue_covered = set()
1407             rq.state = runQueueRunInit
1408             return
1409
1410         self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1411
1412         endpoints = {}
1413         sq_revdeps = []
1414         sq_revdeps_new = []
1415         sq_revdeps_squash = []
1416
1417         # We need to construct a dependency graph for the setscene functions. Intermediate
1418         # dependencies between the setscene tasks only complicate the code. This code
1419         # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1420         # only containing the setscene functions.
1421
1422         for task in xrange(self.stats.total):
1423             self.runq_running.append(0)
1424             self.runq_complete.append(0)
1425             self.runq_buildable.append(0)
1426
1427         for task in xrange(len(self.rqdata.runq_fnid)):
1428             sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1429             sq_revdeps_new.append(set())
1430             if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1431                 endpoints[task] = None
1432
1433         for task in self.rqdata.runq_setscene:
1434             for dep in self.rqdata.runq_depends[task]:
1435                     endpoints[dep] = task
1436
1437         def process_endpoints(endpoints):
1438             newendpoints = {}
1439             for point, task in endpoints.items():
1440                 tasks = set()
1441                 if task:
1442                     tasks.add(task)
1443                 if sq_revdeps_new[point]:
1444                     tasks |= sq_revdeps_new[point]
1445                 sq_revdeps_new[point] = set()
1446                 for dep in self.rqdata.runq_depends[point]:
1447                     if point in sq_revdeps[dep]:
1448                         sq_revdeps[dep].remove(point)
1449                     if tasks:
1450                         sq_revdeps_new[dep] |= tasks
1451                     if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1452                         newendpoints[dep] = task
1453             if len(newendpoints) != 0:
1454                 process_endpoints(newendpoints)
1455
1456         process_endpoints(endpoints)
1457
1458         for task in xrange(len(self.rqdata.runq_fnid)):
1459             if task in self.rqdata.runq_setscene:
1460                 deps = set()
1461                 for dep in sq_revdeps_new[task]:
1462                     deps.add(self.rqdata.runq_setscene.index(dep))
1463                 sq_revdeps_squash.append(deps)
1464             elif len(sq_revdeps_new[task]) != 0:
1465                 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1466
1467         #for task in xrange(len(sq_revdeps_squash)):
1468         #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
1469
1470         self.sq_deps = []
1471         self.sq_revdeps = sq_revdeps_squash
1472         self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1473
1474         for task in xrange(len(self.sq_revdeps)):
1475             self.sq_deps.append(set())
1476         for task in xrange(len(self.sq_revdeps)):
1477             for dep in self.sq_revdeps[task]:
1478                 self.sq_deps[dep].add(task)
1479
1480         for task in xrange(len(self.sq_revdeps)):
1481             if len(self.sq_revdeps[task]) == 0:
1482                 self.runq_buildable[task] = 1
1483
1484         if self.rq.hashvalidate:
1485             sq_hash = []
1486             sq_hashfn = []
1487             sq_fn = []
1488             sq_taskname = []
1489             sq_task = []
1490             noexec = []
1491             stamppresent = []
1492             for task in xrange(len(self.sq_revdeps)):
1493                 realtask = self.rqdata.runq_setscene[task]
1494                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1495                 taskname = self.rqdata.runq_task[realtask]
1496                 taskdep = self.rqdata.dataCache.task_deps[fn]
1497
1498                 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1499                     noexec.append(task)
1500                     self.task_skip(task)
1501                     bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1502                     continue
1503
1504                 if self.rq.check_stamp_task(realtask, taskname + "_setscene"):
1505                     logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1506                     stamppresent.append(task)
1507                     self.task_skip(task)
1508                     continue
1509
1510                 sq_fn.append(fn)
1511                 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1512                 sq_hash.append(self.rqdata.runq_hash[realtask])
1513                 sq_taskname.append(taskname)
1514                 sq_task.append(task)
1515             call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1516             locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.configuration.data }
1517             valid = bb.utils.better_eval(call, locs)
1518
1519             valid_new = stamppresent
1520             for v in valid:
1521                 valid_new.append(sq_task[v])
1522
1523             for task in xrange(len(self.sq_revdeps)):
1524                 if task not in valid_new and task not in noexec:
1525                     logger.debug(2, 'No package found, so skipping setscene task %s',
1526                                  self.rqdata.get_user_idstring(task))
1527                     self.task_failoutright(task)
1528
1529         logger.info('Executing SetScene Tasks')
1530
1531         self.rq.state = runQueueSceneRun
1532
1533     def scenequeue_updatecounters(self, task):
1534         for dep in self.sq_deps[task]:
1535             self.sq_revdeps2[dep].remove(task)
1536             if len(self.sq_revdeps2[dep]) == 0:
1537                 self.runq_buildable[dep] = 1
1538
1539     def task_completeoutright(self, task):
1540         """
1541         Mark a task as completed
1542         Look at the reverse dependencies and mark any task with
1543         completed dependencies as buildable
1544         """
1545
1546         index = self.rqdata.runq_setscene[task]
1547         logger.debug(1, 'Found task %s which could be accelerated',
1548                         self.rqdata.get_user_idstring(index))
1549
1550         self.scenequeue_covered.add(task)
1551         self.scenequeue_updatecounters(task)
1552
1553     def task_complete(self, task):
1554         self.stats.taskCompleted()
1555         self.task_completeoutright(task)
1556
1557     def task_fail(self, task, result):
1558         self.stats.taskFailed()
1559         index = self.rqdata.runq_setscene[task]
1560         bb.event.fire(sceneQueueTaskFailed(index, self.stats, result, self), self.cfgData)
1561         self.scenequeue_notcovered.add(task)
1562         self.scenequeue_updatecounters(task)
1563
1564     def task_failoutright(self, task):
1565         self.runq_running[task] = 1
1566         self.runq_buildable[task] = 1
1567         self.stats.taskCompleted()
1568         self.stats.taskSkipped()
1569         index = self.rqdata.runq_setscene[task]
1570         self.scenequeue_notcovered.add(task)
1571         self.scenequeue_updatecounters(task)
1572
1573     def task_skip(self, task):
1574         self.runq_running[task] = 1
1575         self.runq_buildable[task] = 1
1576         self.task_completeoutright(task)
1577         self.stats.taskCompleted()
1578         self.stats.taskSkipped()
1579
1580     def execute(self):
1581         """
1582         Run the tasks in a queue prepared by prepare_runqueue
1583         """
1584
1585         task = None
1586         if self.stats.active < self.number_tasks:
1587             # Find the next setscene to run
1588             for nexttask in xrange(self.stats.total):
1589                 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1590                     task = nexttask
1591                     break
1592         if task is not None:
1593             realtask = self.rqdata.runq_setscene[task]
1594             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1595
1596             taskname = self.rqdata.runq_task[realtask] + "_setscene"
1597             if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
1598                 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1599                              task, self.rqdata.get_user_idstring(realtask))
1600                 self.task_failoutright(task)
1601                 return True
1602
1603             if self.cooker.configuration.force:
1604                 for target in self.rqdata.target_pairs:
1605                     if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1606                         self.task_failoutright(task)
1607                         return True
1608
1609             if self.rq.check_stamp_task(realtask, taskname):
1610                 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
1611                              task, self.rqdata.get_user_idstring(realtask))
1612                 self.task_skip(task)
1613                 return True
1614
1615             pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
1616
1617             self.build_pids[pid] = task
1618             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1619             self.runq_running[task] = 1
1620             self.stats.taskActive()
1621             if self.stats.active < self.number_tasks:
1622                 return True
1623
1624         for pipe in self.build_pipes:
1625             self.build_pipes[pipe].read()
1626
1627         if self.stats.active > 0:
1628             if self.runqueue_process_waitpid() is None:
1629                 return 0.5
1630             return True
1631
1632         # Convert scenequeue_covered task numbers into full taskgraph ids
1633         oldcovered = self.scenequeue_covered
1634         self.rq.scenequeue_covered = set()
1635         for task in oldcovered:
1636             self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1637
1638         logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
1639
1640         self.rq.state = runQueueRunInit
1641         return True
1642
    def fork_off_task(self, fn, task, taskname):
        """
        Fork off a task through RunQueueExecute.fork_off_task, always
        passing quieterrors=True, and return whatever that call returns
        """
        return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
1645
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The value passed in becomes the exception's args, so it has to be
    iterable; each of its items ends up as one element of args.
    """
    def __init__(self, x):
        self.args = tuple(x)
1652
1653
class runQueueExitWait(bb.event.Event):
    """
    Event when waiting for task processes to exit

    remain is kept on the event and reported in its message as the
    number of active tasks still to finish
    """

    def __init__(self, remain):
        self.remain = remain
        text = "Waiting for %s active tasks to finish" % remain
        self.message = text
        bb.event.Event.__init__(self)
1663
class runQueueEvent(bb.event.Event):
    """
    Base runQueue event class

    Keeps the task id, the id string obtained for it from rq.rqdata and
    a copy of the statistics made when the event is created
    """
    def __init__(self, task, stats, rq):
        rqdata = rq.rqdata
        self.taskid = task
        self.taskstring = rqdata.get_user_idstring(task)
        # Copy so that later updates to stats do not show up in this event
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
1673
class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying a task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        # task, stats and rq are handled by runQueueEvent
        runQueueEvent.__init__(self, task, stats, rq)
        # noexec is stored on the event exactly as given
        self.noexec = noexec
1681
class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying a task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        # task, stats and rq are handled by runQueueEvent
        runQueueEvent.__init__(self, task, stats, rq)
        # exitcode is stored on the event exactly as given
        self.exitcode = exitcode
1689
class sceneQueueTaskFailed(runQueueTaskFailed):
    """
    Event notifying a setscene task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        runQueueTaskFailed.__init__(self, task, stats, exitcode, rq)
        # Replace the taskstring set by the base classes with the one
        # obtained by also passing "_setscene" to get_user_idstring
        self.taskstring = rq.rqdata.get_user_idstring(task, "_setscene")
1697
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed
    """
1702
def check_stamp_fn(fn, taskname, d):
    """
    Check the stamp of taskname using the runqueue executor stored in d

    Both the executor and the file name are read from the
    __RUNQUEUE_DO_NOT_USE_EXTERNALLY* variables in d; the fn argument is
    overwritten straight away and so has no effect on the result.

    Returns the result of check_stamp_task for the task, or None when no
    task id is found for it.
    """
    rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
    fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
    rqdata = rqexe.rqdata
    taskid = rqdata.get_task_id(rqdata.taskData.getfn_id(fn), taskname)
    if taskid is None:
        return None
    return rqexe.rq.check_stamp_task(taskid)
1711
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server
    """
    def __init__(self, pipein, pipeout, d):
        """
        Keep pipein for reading, close pipeout and switch pipein to
        non-blocking mode. d is handed on with every event that is fired.
        """
        self.input = pipein
        pipeout.close()
        flags = fcntl.fcntl(self.input, fcntl.F_GETFL)
        fcntl.fcntl(self.input, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.queue = ""
        self.d = d

    def read(self):
        """
        Append whatever can be read from the pipe to the queue and fire
        every complete "</event>" terminated message found in it

        Returns True when the queue grew as a result of the read
        """
        before = len(self.queue)
        try:
            self.queue += self.input.read(102400)
        except (OSError, IOError):
            # Read errors are ignored; the queue is simply left as it was
            pass
        grew = len(self.queue) > before

        while True:
            marker = self.queue.find("</event>")
            if marker == -1:
                break
            # 8 is the length of the "</event>" marker itself
            cut = marker + 8
            bb.event.fire_from_worker(self.queue[:cut], self.d)
            self.queue = self.queue[cut:]
        return grew

    def close(self):
        """
        Read until no new data arrives, warn about any incomplete message
        left in the queue and close the pipe
        """
        while self.read():
            pass
        if self.queue:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()