runqueue: simplify fakeroot environment handling
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 import copy
26 import os
27 import sys
28 import signal
29 import stat
30 import fcntl
31 import logging
32 import bb
33 from bb import msg, data, event
34
35 bblogger = logging.getLogger("BitBake")
36 logger = logging.getLogger("BitBake.RunQueue")
37
38 class RunQueueStats:
39     """
40     Holds statistics on the tasks handled by the associated runQueue
41     """
42     def __init__(self, total):
43         self.completed = 0
44         self.skipped = 0
45         self.failed = 0
46         self.active = 0
47         self.total = total
48
49     def copy(self):
50         obj = self.__class__(self.total)
51         obj.__dict__.update(self.__dict__)
52         return obj
53
54     def taskFailed(self):
55         self.active = self.active - 1
56         self.failed = self.failed + 1
57
58     def taskCompleted(self, number = 1):
59         self.active = self.active - number
60         self.completed = self.completed + number
61
62     def taskSkipped(self, number = 1):
63         self.active = self.active + number
64         self.skipped = self.skipped + number
65
66     def taskActive(self):
67         self.active = self.active + 1
68
69 # These values indicate the next step due to be run in the
70 # runQueue state machine
71 runQueuePrepare = 2
72 runQueueSceneInit = 3
73 runQueueSceneRun = 4
74 runQueueRunInit = 5
75 runQueueRunning = 6
76 runQueueFailed = 7
77 runQueueCleanUp = 8
78 runQueueComplete = 9
79 runQueueChildProcess = 10
80
81 class RunQueueScheduler(object):
82     """
83     Control the order tasks are scheduled in.
84     """
85     name = "basic"
86
87     def __init__(self, runqueue, rqdata):
88         """
89         The default scheduler just returns the first buildable task (the
90         priority map is sorted by task numer)
91         """
92         self.rq = runqueue
93         self.rqdata = rqdata
94         numTasks = len(self.rqdata.runq_fnid)
95
96         self.prio_map = []
97         self.prio_map.extend(range(numTasks))
98
99     def next_buildable_task(self):
100         """
101         Return the id of the first task we find that is buildable
102         """
103         for tasknum in xrange(len(self.rqdata.runq_fnid)):
104             taskid = self.prio_map[tasknum]
105             if self.rq.runq_running[taskid] == 1:
106                 continue
107             if self.rq.runq_buildable[taskid] == 1:
108                 return taskid
109
110     def next(self):
111         """
112         Return the id of the task we should build next
113         """
114         if self.rq.stats.active < self.rq.number_tasks:
115             return self.next_buildable_task()
116
117 class RunQueueSchedulerSpeed(RunQueueScheduler):
118     """
119     A scheduler optimised for speed. The priority map is sorted by task weight,
120     heavier weighted tasks (tasks needed by the most other tasks) are run first.
121     """
122     name = "speed"
123
124     def __init__(self, runqueue, rqdata):
125         """
126         The priority map is sorted by task weight.
127         """
128
129         self.rq = runqueue
130         self.rqdata = rqdata
131
132         sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
133         copyweight = copy.deepcopy(self.rqdata.runq_weight)
134         self.prio_map = []
135
136         for weight in sortweight:
137             idx = copyweight.index(weight)
138             self.prio_map.append(idx)
139             copyweight[idx] = -1
140
141         self.prio_map.reverse()
142
143 class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
144     """
145     A scheduler optimised to complete .bb files are quickly as possible. The
146     priority map is sorted by task weight, but then reordered so once a given
147     .bb file starts to build, its completed as quickly as possible. This works
148     well where disk space is at a premium and classes like OE's rm_work are in
149     force.
150     """
151     name = "completion"
152
153     def __init__(self, runqueue, rqdata):
154         RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
155
156         #FIXME - whilst this groups all fnids together it does not reorder the
157         #fnid groups optimally.
158
159         basemap = copy.deepcopy(self.prio_map)
160         self.prio_map = []
161         while (len(basemap) > 0):
162             entry = basemap.pop(0)
163             self.prio_map.append(entry)
164             fnid = self.rqdata.runq_fnid[entry]
165             todel = []
166             for entry in basemap:
167                 entry_fnid = self.rqdata.runq_fnid[entry]
168                 if entry_fnid == fnid:
169                     todel.append(basemap.index(entry))
170                     self.prio_map.append(entry)
171             todel.reverse()
172             for idx in todel:
173                 del basemap[idx]
174
175 class RunQueueData:
176     """
177     BitBake Run Queue implementation
178     """
179     def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
180         self.cooker = cooker
181         self.dataCache = dataCache
182         self.taskData = taskData
183         self.targets = targets
184         self.rq = rq
185
186         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
187         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
188
189         self.reset()
190
191     def reset(self):
192         self.runq_fnid = []
193         self.runq_task = []
194         self.runq_depends = []
195         self.runq_revdeps = []
196         self.runq_hash = []
197
198     def runq_depends_names(self, ids):
199         import re
200         ret = []
201         for id in self.runq_depends[ids]:
202             nam = os.path.basename(self.get_user_idstring(id))
203             nam = re.sub("_[^,]*,", ",", nam)
204             ret.extend([nam])
205         return ret
206
207     def get_user_idstring(self, task):
208         fn = self.taskData.fn_index[self.runq_fnid[task]]
209         taskname = self.runq_task[task]
210         return "%s, %s" % (fn, taskname)
211
212     def get_task_id(self, fnid, taskname):
213         for listid in xrange(len(self.runq_fnid)):
214             if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
215                 return listid
216         return None
217
218     def circular_depchains_handler(self, tasks):
219         """
220         Some tasks aren't buildable, likely due to circular dependency issues.
221         Identify the circular dependencies and print them in a user readable format.
222         """
223         from copy import deepcopy
224
225         valid_chains = []
226         explored_deps = {}
227         msgs = []
228
229         def chain_reorder(chain):
230             """
231             Reorder a dependency chain so the lowest task id is first
232             """
233             lowest = 0
234             new_chain = []
235             for entry in xrange(len(chain)):
236                 if chain[entry] < chain[lowest]:
237                     lowest = entry
238             new_chain.extend(chain[lowest:])
239             new_chain.extend(chain[:lowest])
240             return new_chain
241
242         def chain_compare_equal(chain1, chain2):
243             """
244             Compare two dependency chains and see if they're the same
245             """
246             if len(chain1) != len(chain2):
247                 return False
248             for index in xrange(len(chain1)):
249                 if chain1[index] != chain2[index]:
250                     return False
251             return True
252
253         def chain_array_contains(chain, chain_array):
254             """
255             Return True if chain_array contains chain
256             """
257             for ch in chain_array:
258                 if chain_compare_equal(ch, chain):
259                     return True
260             return False
261
262         def find_chains(taskid, prev_chain):
263             prev_chain.append(taskid)
264             total_deps = []
265             total_deps.extend(self.runq_revdeps[taskid])
266             for revdep in self.runq_revdeps[taskid]:
267                 if revdep in prev_chain:
268                     idx = prev_chain.index(revdep)
269                     # To prevent duplicates, reorder the chain to start with the lowest taskid
270                     # and search through an array of those we've already printed
271                     chain = prev_chain[idx:]
272                     new_chain = chain_reorder(chain)
273                     if not chain_array_contains(new_chain, valid_chains):
274                         valid_chains.append(new_chain)
275                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
276                         for dep in new_chain:
277                             msgs.append("  Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
278                         msgs.append("\n")
279                     if len(valid_chains) > 10:
280                         msgs.append("Aborted dependency loops search after 10 matches.\n")
281                         return msgs
282                     continue
283                 scan = False
284                 if revdep not in explored_deps:
285                     scan = True
286                 elif revdep in explored_deps[revdep]:
287                     scan = True
288                 else:
289                     for dep in prev_chain:
290                         if dep in explored_deps[revdep]:
291                             scan = True
292                 if scan:
293                     find_chains(revdep, copy.deepcopy(prev_chain))
294                 for dep in explored_deps[revdep]:
295                     if dep not in total_deps:
296                         total_deps.append(dep)
297
298             explored_deps[taskid] = total_deps
299
300         for task in tasks:
301             find_chains(task, [])
302
303         return msgs
304
305     def calculate_task_weights(self, endpoints):
306         """
307         Calculate a number representing the "weight" of each task. Heavier weighted tasks
308         have more dependencies and hence should be executed sooner for maximum speed.
309
310         This function also sanity checks the task list finding tasks that are not
311         possible to execute due to circular dependencies.
312         """
313
314         numTasks = len(self.runq_fnid)
315         weight = []
316         deps_left = []
317         task_done = []
318
319         for listid in xrange(numTasks):
320             task_done.append(False)
321             weight.append(0)
322             deps_left.append(len(self.runq_revdeps[listid]))
323
324         for listid in endpoints:
325             weight[listid] = 1
326             task_done[listid] = True
327
328         while True:
329             next_points = []
330             for listid in endpoints:
331                 for revdep in self.runq_depends[listid]:
332                     weight[revdep] = weight[revdep] + weight[listid]
333                     deps_left[revdep] = deps_left[revdep] - 1
334                     if deps_left[revdep] == 0:
335                         next_points.append(revdep)
336                         task_done[revdep] = True
337             endpoints = next_points
338             if len(next_points) == 0:
339                 break
340
341         # Circular dependency sanity check
342         problem_tasks = []
343         for task in xrange(numTasks):
344             if task_done[task] is False or deps_left[task] != 0:
345                 problem_tasks.append(task)
346                 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
347                 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
348
349         if problem_tasks:
350             message = "Unbuildable tasks were found.\n"
351             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
352             message = message + "Identifying dependency loops (this may take a short while)...\n"
353             logger.error(message)
354
355             msgs = self.circular_depchains_handler(problem_tasks)
356
357             message = "\n"
358             for msg in msgs:
359                 message = message + msg
360             bb.msg.fatal(bb.msg.domain.RunQueue, message)
361
362         return weight
363
364     def prepare(self):
365         """
366         Turn a set of taskData into a RunQueue and compute data needed
367         to optimise the execution order.
368         """
369
370         runq_build = []
371         recursive_tdepends = {}
372         runq_recrdepends = []
373         tdepends_fnid = {}
374
375         taskData = self.taskData
376
377         if len(taskData.tasks_name) == 0:
378             # Nothing to do
379             return 0
380
381         logger.info("Preparing runqueue")
382
383         # Step A - Work out a list of tasks to run
384         #
385         # Taskdata gives us a list of possible providers for every build and run
386         # target ordered by priority. It also gives information on each of those
387         # providers.
388         #
389         # To create the actual list of tasks to execute we fix the list of
390         # providers and then resolve the dependencies into task IDs. This
391         # process is repeated for each type of dependency (tdepends, deptask,
392         # rdeptast, recrdeptask, idepends).
393
394         def add_build_dependencies(depids, tasknames, depends):
395             for depid in depids:
396                 # Won't be in build_targets if ASSUME_PROVIDED
397                 if depid not in taskData.build_targets:
398                     continue
399                 depdata = taskData.build_targets[depid][0]
400                 if depdata is None:
401                     continue
402                 dep = taskData.fn_index[depdata]
403                 for taskname in tasknames:
404                     taskid = taskData.gettask_id(dep, taskname, False)
405                     if taskid is not None:
406                         depends.append(taskid)
407
408         def add_runtime_dependencies(depids, tasknames, depends):
409             for depid in depids:
410                 if depid not in taskData.run_targets:
411                     continue
412                 depdata = taskData.run_targets[depid][0]
413                 if depdata is None:
414                     continue
415                 dep = taskData.fn_index[depdata]
416                 for taskname in tasknames:
417                     taskid = taskData.gettask_id(dep, taskname, False)
418                     if taskid is not None:
419                         depends.append(taskid)
420
421         for task in xrange(len(taskData.tasks_name)):
422             depends = []
423             recrdepends = []
424             fnid = taskData.tasks_fnid[task]
425             fn = taskData.fn_index[fnid]
426             task_deps = self.dataCache.task_deps[fn]
427
428             logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
429
430             if fnid not in taskData.failed_fnids:
431
432                 # Resolve task internal dependencies
433                 #
434                 # e.g. addtask before X after Y
435                 depends = taskData.tasks_tdepends[task]
436
437                 # Resolve 'deptask' dependencies
438                 #
439                 # e.g. do_sometask[deptask] = "do_someothertask"
440                 # (makes sure sometask runs after someothertask of all DEPENDS)
441                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
442                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
443                     add_build_dependencies(taskData.depids[fnid], tasknames, depends)
444
445                 # Resolve 'rdeptask' dependencies
446                 #
447                 # e.g. do_sometask[rdeptask] = "do_someothertask"
448                 # (makes sure sometask runs after someothertask of all RDEPENDS)
449                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
450                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
451                     add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
452
453                 # Resolve inter-task dependencies
454                 #
455                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
456                 # (makes sure sometask runs after targetname's someothertask)
457                 if fnid not in tdepends_fnid:
458                     tdepends_fnid[fnid] = set()
459                 idepends = taskData.tasks_idepends[task]
460                 for (depid, idependtask) in idepends:
461                     if depid in taskData.build_targets:
462                         # Won't be in build_targets if ASSUME_PROVIDED
463                         depdata = taskData.build_targets[depid][0]
464                         if depdata is not None:
465                             dep = taskData.fn_index[depdata]
466                             taskid = taskData.gettask_id(dep, idependtask, False)
467                             if taskid is None:
468                                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s in %s depends upon nonexistant task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
469                             depends.append(taskid)
470                             if depdata != fnid:
471                                 tdepends_fnid[fnid].add(taskid)
472
473
474                 # Resolve recursive 'recrdeptask' dependencies (A)
475                 #
476                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
477                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
478                 # We cover the recursive part of the dependencies below
479                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
480                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
481                         recrdepends.append(taskname)
482                         add_build_dependencies(taskData.depids[fnid], [taskname], depends)
483                         add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
484
485                 # Rmove all self references
486                 if task in depends:
487                     newdep = []
488                     logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
489                     for dep in depends:
490                         if task != dep:
491                             newdep.append(dep)
492                     depends = newdep
493
494             self.runq_fnid.append(taskData.tasks_fnid[task])
495             self.runq_task.append(taskData.tasks_name[task])
496             self.runq_depends.append(set(depends))
497             self.runq_revdeps.append(set())
498             self.runq_hash.append("")
499
500             runq_build.append(0)
501             runq_recrdepends.append(recrdepends)
502
503         #
504         # Build a list of recursive cumulative dependencies for each fnid
505         # We do this by fnid, since if A depends on some task in B
506         # we're interested in later tasks B's fnid might have but B itself
507         # doesn't depend on
508         #
509         # Algorithm is O(tasks) + O(tasks)*O(fnids)
510         #
511         reccumdepends = {}
512         for task in xrange(len(self.runq_fnid)):
513             fnid = self.runq_fnid[task]
514             if fnid not in reccumdepends:
515                 if fnid in tdepends_fnid:
516                     reccumdepends[fnid] = tdepends_fnid[fnid]
517                 else:
518                     reccumdepends[fnid] = set()
519             reccumdepends[fnid].update(self.runq_depends[task])
520         for task in xrange(len(self.runq_fnid)):
521             taskfnid = self.runq_fnid[task]
522             for fnid in reccumdepends:
523                 if task in reccumdepends[fnid]:
524                     reccumdepends[fnid].add(task)
525                     if taskfnid in reccumdepends:
526                         reccumdepends[fnid].update(reccumdepends[taskfnid])
527
528
529         # Resolve recursive 'recrdeptask' dependencies (B)
530         #
531         # e.g. do_sometask[recrdeptask] = "do_someothertask"
532         # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
533         for task in xrange(len(self.runq_fnid)):
534             if len(runq_recrdepends[task]) > 0:
535                 taskfnid = self.runq_fnid[task]
536                 for dep in reccumdepends[taskfnid]:
537                     # Ignore self references
538                     if dep == task:
539                         continue
540                     for taskname in runq_recrdepends[task]:
541                         if taskData.tasks_name[dep] == taskname:
542                             self.runq_depends[task].add(dep)
543
544         # Step B - Mark all active tasks
545         #
546         # Start with the tasks we were asked to run and mark all dependencies
547         # as active too. If the task is to be 'forced', clear its stamp. Once
548         # all active tasks are marked, prune the ones we don't need.
549
550         logger.verbose("Marking Active Tasks")
551
552         def mark_active(listid, depth):
553             """
554             Mark an item as active along with its depends
555             (calls itself recursively)
556             """
557
558             if runq_build[listid] == 1:
559                 return
560
561             runq_build[listid] = 1
562
563             depends = self.runq_depends[listid]
564             for depend in depends:
565                 mark_active(depend, depth+1)
566
567         self.target_pairs = []
568         for target in self.targets:
569             targetid = taskData.getbuild_id(target[0])
570
571             if targetid not in taskData.build_targets:
572                 continue
573
574             if targetid in taskData.failed_deps:
575                 continue
576
577             fnid = taskData.build_targets[targetid][0]
578             fn = taskData.fn_index[fnid]
579             self.target_pairs.append((fn, target[1]))
580
581             if fnid in taskData.failed_fnids:
582                 continue
583
584             if target[1] not in taskData.tasks_lookup[fnid]:
585                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
586
587             listid = taskData.tasks_lookup[fnid][target[1]]
588
589             mark_active(listid, 1)
590
591         # Step C - Prune all inactive tasks
592         #
593         # Once all active tasks are marked, prune the ones we don't need.
594
595         maps = []
596         delcount = 0
597         for listid in xrange(len(self.runq_fnid)):
598             if runq_build[listid-delcount] == 1:
599                 maps.append(listid-delcount)
600             else:
601                 del self.runq_fnid[listid-delcount]
602                 del self.runq_task[listid-delcount]
603                 del self.runq_depends[listid-delcount]
604                 del runq_build[listid-delcount]
605                 del self.runq_revdeps[listid-delcount]
606                 del self.runq_hash[listid-delcount]
607                 delcount = delcount + 1
608                 maps.append(-1)
609
610         #
611         # Step D - Sanity checks and computation
612         #
613
614         # Check to make sure we still have tasks to run
615         if len(self.runq_fnid) == 0:
616             if not taskData.abort:
617                 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
618             else:
619                 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
620
621         logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
622
623         # Remap the dependencies to account for the deleted tasks
624         # Check we didn't delete a task we depend on
625         for listid in xrange(len(self.runq_fnid)):
626             newdeps = []
627             origdeps = self.runq_depends[listid]
628             for origdep in origdeps:
629                 if maps[origdep] == -1:
630                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
631                 newdeps.append(maps[origdep])
632             self.runq_depends[listid] = set(newdeps)
633
634         logger.verbose("Assign Weightings")
635
636         # Generate a list of reverse dependencies to ease future calculations
637         for listid in xrange(len(self.runq_fnid)):
638             for dep in self.runq_depends[listid]:
639                 self.runq_revdeps[dep].add(listid)
640
641         # Identify tasks at the end of dependency chains
642         # Error on circular dependency loops (length two)
643         endpoints = []
644         for listid in xrange(len(self.runq_fnid)):
645             revdeps = self.runq_revdeps[listid]
646             if len(revdeps) == 0:
647                 endpoints.append(listid)
648             for dep in revdeps:
649                 if dep in self.runq_depends[listid]:
650                     #self.dump_data(taskData)
651                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
652
653         logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
654
655         # Calculate task weights
656         # Check of higher length circular dependencies
657         self.runq_weight = self.calculate_task_weights(endpoints)
658
659         # Sanity Check - Check for multiple tasks building the same provider
660         prov_list = {}
661         seen_fn = []
662         for task in xrange(len(self.runq_fnid)):
663             fn = taskData.fn_index[self.runq_fnid[task]]
664             if fn in seen_fn:
665                 continue
666             seen_fn.append(fn)
667             for prov in self.dataCache.fn_provides[fn]:
668                 if prov not in prov_list:
669                     prov_list[prov] = [fn]
670                 elif fn not in prov_list[prov]:
671                     prov_list[prov].append(fn)
672         error = False
673         for prov in prov_list:
674             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
675                 error = True
676                 logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))
677
678
679         # Create a whitelist usable by the stamp checks
680         stampfnwhitelist = []
681         for entry in self.stampwhitelist.split():
682             entryid = self.taskData.getbuild_id(entry)
683             if entryid not in self.taskData.build_targets:
684                 continue
685             fnid = self.taskData.build_targets[entryid][0]
686             fn = self.taskData.fn_index[fnid]
687             stampfnwhitelist.append(fn)
688         self.stampfnwhitelist = stampfnwhitelist
689
690         # Interate over the task list looking for tasks with a 'setscene' function
691         self.runq_setscene = []
692         for task in range(len(self.runq_fnid)):
693             setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
694             if not setscene:
695                 continue
696             self.runq_setscene.append(task)
697
698         # Interate over the task list and call into the siggen code
699         dealtwith = set()
700         todeal = set(range(len(self.runq_fnid)))
701         while len(todeal) > 0:
702             for task in todeal.copy():
703                 if len(self.runq_depends[task] - dealtwith) == 0:
704                     dealtwith.add(task)
705                     todeal.remove(task)
706                     procdep = []
707                     for dep in self.runq_depends[task]:
708                         procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
709                     self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
710
711         self.hashes = {}
712         self.hash_deps = {}
713         for task in xrange(len(self.runq_fnid)):
714             identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
715                                     self.runq_task[task])
716             self.hashes[identifier] = self.runq_hash[task]
717             deps = []
718             for dep in self.runq_depends[task]:
719                 depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
720                                            self.runq_task[dep])
721                 deps.append(depidentifier)
722             self.hash_deps[identifier] = deps
723
724         # Remove stamps for targets if force mode active
725         if self.cooker.configuration.force:
726             for (fn, target) in self.target_pairs:
727                 logger.verbose("Remove stamp %s, %s", target, fn)
728                 bb.build.del_stamp(target, self.dataCache, fn)
729
730         return len(self.runq_fnid)
731
732     def dump_data(self, taskQueue):
733         """
734         Dump some debug information on the internal data structures
735         """
736         logger.debug(3, "run_tasks:")
737         for task in xrange(len(self.rqdata.runq_task)):
738             logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
739                          taskQueue.fn_index[self.rqdata.runq_fnid[task]],
740                          self.rqdata.runq_task[task],
741                          self.rqdata.runq_weight[task],
742                          self.rqdata.runq_depends[task],
743                          self.rqdata.runq_revdeps[task])
744
745         logger.debug(3, "sorted_tasks:")
746         for task1 in xrange(len(self.rqdata.runq_task)):
747             if task1 in self.prio_map:
748                 task = self.prio_map[task1]
749                 logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
750                            taskQueue.fn_index[self.rqdata.runq_fnid[task]],
751                            self.rqdata.runq_task[task],
752                            self.rqdata.runq_weight[task],
753                            self.rqdata.runq_depends[task],
754                            self.rqdata.runq_revdeps[task])
755
756 class RunQueue:
757     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
758
759         self.cooker = cooker
760         self.cfgData = cfgData
761         self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
762
763         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, True) or "perfile"
764         self.hashvalidate = bb.data.getVar("BB_HASHCHECK_FUNCTION", cfgData, True) or None
765
766         self.state = runQueuePrepare
767
768     def check_stamps(self):
769         unchecked = {}
770         current = []
771         notcurrent = []
772         buildable = []
773
774         if self.stamppolicy == "perfile":
775             fulldeptree = False
776         else:
777             fulldeptree = True
778             stampwhitelist = []
779             if self.stamppolicy == "whitelist":
780                 stampwhitelist = self.rqdata.stampfnwhitelist
781
782         for task in xrange(len(self.rqdata.runq_fnid)):
783             unchecked[task] = ""
784             if len(self.rqdata.runq_depends[task]) == 0:
785                 buildable.append(task)
786
787         def check_buildable(self, task, buildable):
788             for revdep in self.rqdata.runq_revdeps[task]:
789                 alldeps = 1
790                 for dep in self.rqdata.runq_depends[revdep]:
791                     if dep in unchecked:
792                         alldeps = 0
793                 if alldeps == 1:
794                     if revdep in unchecked:
795                         buildable.append(revdep)
796
797         for task in xrange(len(self.rqdata.runq_fnid)):
798             if task not in unchecked:
799                 continue
800             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
801             taskname = self.rqdata.runq_task[task]
802             stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
803             # If the stamp is missing its not current
804             if not os.access(stampfile, os.F_OK):
805                 del unchecked[task]
806                 notcurrent.append(task)
807                 check_buildable(self, task, buildable)
808                 continue
809             # If its a 'nostamp' task, it's not current
810             taskdep = self.rqdata.dataCache.task_deps[fn]
811             if 'nostamp' in taskdep and task in taskdep['nostamp']:
812                 del unchecked[task]
813                 notcurrent.append(task)
814                 check_buildable(self, task, buildable)
815                 continue
816
817         while (len(buildable) > 0):
818             nextbuildable = []
819             for task in buildable:
820                 if task in unchecked:
821                     fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
822                     taskname = self.rqdata.runq_task[task]
823                     stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
824                     iscurrent = True
825
826                     t1 = os.stat(stampfile)[stat.ST_MTIME]
827                     for dep in self.rqdata.runq_depends[task]:
828                         if iscurrent:
829                             fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
830                             taskname2 = self.rqdata.runq_task[dep]
831                             stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
832                             if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
833                                 if dep in notcurrent:
834                                     iscurrent = False
835                                 else:
836                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
837                                     if t1 < t2:
838                                         iscurrent = False
839                     del unchecked[task]
840                     if iscurrent:
841                         current.append(task)
842                     else:
843                         notcurrent.append(task)
844
845                 check_buildable(self, task, nextbuildable)
846
847             buildable = nextbuildable
848
849         #for task in range(len(self.runq_fnid)):
850         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
851         #    taskname = self.runq_task[task]
852         #    print "%s %s.%s" % (task, taskname, fn)
853
854         #print "Unchecked: %s" % unchecked
855         #print "Current: %s" % current
856         #print "Not current: %s" % notcurrent
857
858         if len(unchecked) > 0:
859             bb.msg.fatal(bb.msg.domain.RunQueue, "check_stamps fatal internal error")
860         return current
861
862     def check_stamp_task(self, task, taskname = None):
863         def get_timestamp(f):
864             try:
865                 if not os.access(f, os.F_OK):
866                     return None
867                 return os.stat(f)[stat.ST_MTIME]
868             except:
869                 return None
870
871         if self.stamppolicy == "perfile":
872             fulldeptree = False
873         else:
874             fulldeptree = True
875             stampwhitelist = []
876             if self.stamppolicy == "whitelist":
877                 stampwhitelist = self.rqdata.stampfnwhitelist
878
879         fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
880         if taskname is None:
881             taskname = self.rqdata.runq_task[task]
882
883         stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
884
885         # If the stamp is missing its not current
886         if not os.access(stampfile, os.F_OK):
887             logger.debug(2, "Stampfile %s not available", stampfile)
888             return False
889         # If its a 'nostamp' task, it's not current
890         taskdep = self.rqdata.dataCache.task_deps[fn]
891         if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
892             logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
893             return False
894
895         if taskname != "do_setscene" and taskname.endswith("_setscene"):
896             return True
897
898         iscurrent = True
899         t1 = get_timestamp(stampfile)
900         for dep in self.rqdata.runq_depends[task]:
901             if iscurrent:
902                 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
903                 taskname2 = self.rqdata.runq_task[dep]
904                 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
905                 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
906                 t2 = get_timestamp(stampfile2)
907                 t3 = get_timestamp(stampfile3)
908                 if t3 and t3 > t2:
909                    continue
910                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
911                     if not t2:
912                         logger.debug(2, 'Stampfile %s does not exist', stampfile2)
913                         iscurrent = False
914                     if t1 < t2:
915                         logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
916                         iscurrent = False
917
918         return iscurrent
919
920     def execute_runqueue(self):
921         """
922         Run the tasks in a queue prepared by rqdata.prepare()
923         Upon failure, optionally try to recover the build using any alternate providers
924         (if the abort on failure configuration option isn't set)
925         """
926
927         retval = 0.5
928
929         if self.state is runQueuePrepare:
930             self.rqexe = RunQueueExecuteDummy(self)
931             if self.rqdata.prepare() == 0:
932                 self.state = runQueueComplete
933             else:
934                 self.state = runQueueSceneInit
935
936         if self.state is runQueueSceneInit:
937             if self.cooker.configuration.dump_signatures:
938                 self.dump_signatures()
939             else:
940                 self.rqexe = RunQueueExecuteScenequeue(self)
941
942         if self.state is runQueueSceneRun:
943             retval = self.rqexe.execute()
944
945         if self.state is runQueueRunInit:
946             logger.info("Executing RunQueue Tasks")
947             self.rqexe = RunQueueExecuteTasks(self)
948             self.state = runQueueRunning
949
950         if self.state is runQueueRunning:
951             retval = self.rqexe.execute()
952
953         if self.state is runQueueCleanUp:
954            self.rqexe.finish()
955
956         if self.state is runQueueFailed:
957             if not self.rqdata.taskData.tryaltconfigs:
958                 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
959             for fnid in self.rqexe.failed_fnids:
960                 self.rqdata.taskData.fail_fnid(fnid)
961             self.rqdata.reset()
962
963         if self.state is runQueueComplete:
964             # All done
965             logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
966             return False
967
968         if self.state is runQueueChildProcess:
969             print("Child process, eeek, shouldn't happen!")
970             return False
971
972         # Loop
973         return retval
974
975     def finish_runqueue(self, now = False):
976         if now:
977             self.rqexe.finish_now()
978         else:
979             self.rqexe.finish()
980
981     def dump_signatures(self):
982         self.state = runQueueComplete
983         done = set()
984         bb.note("Reparsing files to collect dependency data")
985         for task in range(len(self.rqdata.runq_fnid)):
986             if self.rqdata.runq_fnid[task] not in done:
987                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
988                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
989                 done.add(self.rqdata.runq_fnid[task])
990
991         bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
992
993         return
994
995
996 class RunQueueExecute:
997
998     def __init__(self, rq):
999         self.rq = rq
1000         self.cooker = rq.cooker
1001         self.cfgData = rq.cfgData
1002         self.rqdata = rq.rqdata
1003
1004         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
1005         self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"
1006
1007         self.runq_buildable = []
1008         self.runq_running = []
1009         self.runq_complete = []
1010         self.build_pids = {}
1011         self.build_pipes = {}
1012         self.failed_fnids = []
1013
1014     def runqueue_process_waitpid(self):
1015         """
1016         Return none is there are no processes awaiting result collection, otherwise
1017         collect the process exit codes and close the information pipe.
1018         """
1019         result = os.waitpid(-1, os.WNOHANG)
1020         if result[0] == 0 and result[1] == 0:
1021             return None
1022         task = self.build_pids[result[0]]
1023         del self.build_pids[result[0]]
1024         self.build_pipes[result[0]].close()
1025         del self.build_pipes[result[0]]
1026         if result[1] != 0:
1027             self.task_fail(task, result[1]>>8)
1028         else:
1029             self.task_complete(task)
1030
1031     def finish_now(self):
1032         if self.stats.active:
1033             logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
1034             for k, v in self.build_pids.iteritems():
1035                 try:
1036                     os.kill(-k, signal.SIGTERM)
1037                 except:
1038                     pass
1039         for pipe in self.build_pipes:
1040             self.build_pipes[pipe].read()
1041
1042     def finish(self):
1043         self.rq.state = runQueueCleanUp
1044
1045         for pipe in self.build_pipes:
1046             self.build_pipes[pipe].read()
1047
1048         if self.stats.active > 0:
1049             bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1050             self.runqueue_process_waitpid()
1051             return
1052
1053         if len(self.failed_fnids) != 0:
1054             self.rq.state = runQueueFailed
1055             return
1056
1057         self.rq.state = runQueueComplete
1058         return
1059
1060     def fork_off_task(self, fn, task, taskname, quieterrors=False):
1061         # We need to setup the environment BEFORE the fork, since
1062         # a fork() or exec*() activates PSEUDO...
1063
1064         envbackup = {}
1065
1066         taskdep = self.rqdata.dataCache.task_deps[fn]
1067         if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1068             envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
1069             for key, value in (var.split('=') for var in envvars):
1070                 envbackup[key] = os.environ.get(key)
1071                 os.environ[key] = value
1072
1073             fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
1074             for p in fakedirs:
1075                 bb.utils.mkdirhier(p)
1076
1077             logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
1078                             (fn, taskname, ', '.join(fakedirs)))
1079
1080         sys.stdout.flush()
1081         sys.stderr.flush()
1082         try:
1083             pipein, pipeout = os.pipe()
1084             pipein = os.fdopen(pipein, 'rb', 4096)
1085             pipeout = os.fdopen(pipeout, 'wb', 0)
1086             pid = os.fork()
1087         except OSError as e:
1088             bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
1089
1090         if pid == 0:
1091             pipein.close()
1092
1093             # Save out the PID so that the event can include it the
1094             # events
1095             bb.event.worker_pid = os.getpid()
1096             bb.event.worker_pipe = pipeout
1097
1098             self.rq.state = runQueueChildProcess
1099             # Make the child the process group leader
1100             os.setpgid(0, 0)
1101             # No stdin
1102             newsi = os.open(os.devnull, os.O_RDWR)
1103             os.dup2(newsi, sys.stdin.fileno())
1104
1105             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
1106             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
1107             bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
1108             ret = 0
1109             try:
1110                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
1111                 the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
1112                 os.environ.update(bb.data.exported_vars(the_data))
1113             except Exception as exc:
1114                 if not quieterrors:
1115                     logger.critical(str(exc))
1116                 os._exit(1)
1117             try:
1118                 ret = bb.build.exec_task(fn, taskname, the_data)
1119                 os._exit(ret)
1120             except:
1121                 os._exit(1)
1122         else:
1123             for key, value in envbackup.iteritems():
1124                 if value is None:
1125                     del os.environ[key]
1126                 else:
1127                     os.environ[key] = value
1128
1129         return pid, pipein, pipeout
1130
1131 class RunQueueExecuteDummy(RunQueueExecute):
1132     def __init__(self, rq):
1133         self.rq = rq
1134         self.stats = RunQueueStats(0)
1135
1136     def finish(self):
1137         self.rq.state = runQueueComplete
1138         return
1139
1140 class RunQueueExecuteTasks(RunQueueExecute):
1141     def __init__(self, rq):
1142         RunQueueExecute.__init__(self, rq)
1143
1144         self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
1145
1146         # Mark initial buildable tasks
1147         for task in xrange(self.stats.total):
1148             self.runq_running.append(0)
1149             self.runq_complete.append(0)
1150             if len(self.rqdata.runq_depends[task]) == 0:
1151                 self.runq_buildable.append(1)
1152             else:
1153                 self.runq_buildable.append(0)
1154             if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1155                 self.rq.scenequeue_covered.add(task)
1156
1157         found = True
1158         while found:
1159             found = False
1160             for task in xrange(self.stats.total):
1161                 if task in self.rq.scenequeue_covered:
1162                     continue
1163                 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1164                     self.rq.scenequeue_covered.add(task)
1165                     found = True
1166
1167         logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
1168
1169         for task in self.rq.scenequeue_covered:
1170             self.task_skip(task)
1171
1172         event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
1173
1174         schedulers = self.get_schedulers()
1175         for scheduler in schedulers:
1176             if self.scheduler == scheduler.name:
1177                 self.sched = scheduler(self, self.rqdata)
1178                 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
1179                 break
1180         else:
1181             bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1182                      (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1183
1184
1185     def get_schedulers(self):
1186         schedulers = set(obj for obj in globals().values()
1187                              if type(obj) is type and
1188                                 issubclass(obj, RunQueueScheduler))
1189
1190         user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
1191         if user_schedulers:
1192             for sched in user_schedulers.split():
1193                 if not "." in sched:
1194                     bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1195                     continue
1196
1197                 modname, name = sched.rsplit(".", 1)
1198                 try:
1199                     module = __import__(modname, fromlist=(name,))
1200                 except ImportError, exc:
1201                     logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1202                     raise SystemExit(1)
1203                 else:
1204                     schedulers.add(getattr(module, name))
1205         return schedulers
1206
1207     def task_completeoutright(self, task):
1208         """
1209         Mark a task as completed
1210         Look at the reverse dependencies and mark any task with
1211         completed dependencies as buildable
1212         """
1213         self.runq_complete[task] = 1
1214         for revdep in self.rqdata.runq_revdeps[task]:
1215             if self.runq_running[revdep] == 1:
1216                 continue
1217             if self.runq_buildable[revdep] == 1:
1218                 continue
1219             alldeps = 1
1220             for dep in self.rqdata.runq_depends[revdep]:
1221                 if self.runq_complete[dep] != 1:
1222                     alldeps = 0
1223             if alldeps == 1:
1224                 self.runq_buildable[revdep] = 1
1225                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1226                 taskname = self.rqdata.runq_task[revdep]
1227                 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1228
1229     def task_complete(self, task):
1230         self.stats.taskCompleted()
1231         bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1232         self.task_completeoutright(task)
1233
1234     def task_fail(self, task, exitcode):
1235         """
1236         Called when a task has failed
1237         Updates the state engine with the failure
1238         """
1239         self.stats.taskFailed()
1240         fnid = self.rqdata.runq_fnid[task]
1241         self.failed_fnids.append(fnid)
1242         bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1243         if self.rqdata.taskData.abort:
1244             self.rq.state = runQueueCleanUp
1245
1246     def task_skip(self, task):
1247         self.runq_running[task] = 1
1248         self.runq_buildable[task] = 1
1249         self.task_completeoutright(task)
1250         self.stats.taskCompleted()
1251         self.stats.taskSkipped()
1252
1253     def execute(self):
1254         """
1255         Run the tasks in a queue prepared by rqdata.prepare()
1256         """
1257
1258         if self.stats.total == 0:
1259             # nothing to do
1260             self.rq.state = runQueueCleanUp
1261
1262         task = self.sched.next()
1263         if task is not None:
1264             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1265
1266             taskname = self.rqdata.runq_task[task]
1267             if self.rq.check_stamp_task(task, taskname):
1268                 logger.debug(2, "Stamp current task %s (%s)", task,
1269                                 self.rqdata.get_user_idstring(task))
1270                 self.task_skip(task)
1271                 return True
1272             elif self.cooker.configuration.dry_run:
1273                 self.runq_running[task] = 1
1274                 self.runq_buildable[task] = 1
1275                 self.stats.taskActive()
1276                 self.task_complete(task)
1277                 return True
1278
1279             taskdep = self.rqdata.dataCache.task_deps[fn]
1280             if 'noexec' in taskdep and taskname in taskdep['noexec']:
1281                 startevent = runQueueTaskStarted(task, self.stats, self.rq,
1282                                                  noexec=True)
1283                 bb.event.fire(startevent, self.cfgData)
1284                 self.runq_running[task] = 1
1285                 self.stats.taskActive()
1286                 bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
1287                 self.task_complete(task)
1288                 return True
1289             else:
1290                 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1291                 bb.event.fire(startevent, self.cfgData)
1292
1293             pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
1294
1295             self.build_pids[pid] = task
1296             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1297             self.runq_running[task] = 1
1298             self.stats.taskActive()
1299
1300         for pipe in self.build_pipes:
1301             self.build_pipes[pipe].read()
1302
1303         if self.stats.active > 0:
1304             if self.runqueue_process_waitpid() is None:
1305                 return 0.5
1306             return True
1307
1308         if len(self.failed_fnids) != 0:
1309             self.rq.state = runQueueFailed
1310             return True
1311
1312         # Sanity Checks
1313         for task in xrange(self.stats.total):
1314             if self.runq_buildable[task] == 0:
1315                 logger.error("Task %s never buildable!", task)
1316             if self.runq_running[task] == 0:
1317                 logger.error("Task %s never ran!", task)
1318             if self.runq_complete[task] == 0:
1319                 logger.error("Task %s never completed!", task)
1320         self.rq.state = runQueueComplete
1321         return True
1322
1323 class RunQueueExecuteScenequeue(RunQueueExecute):
1324     def __init__(self, rq):
1325         RunQueueExecute.__init__(self, rq)
1326
1327         self.scenequeue_covered = set()
1328         self.scenequeue_notcovered = set()
1329
1330         # If we don't have any setscene functions, skip this step
1331         if len(self.rqdata.runq_setscene) == 0:
1332             rq.scenequeue_covered = set()
1333             rq.state = runQueueRunInit
1334             return
1335
1336         self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1337
1338         endpoints = {}
1339         sq_revdeps = []
1340         sq_revdeps_new = []
1341         sq_revdeps_squash = []
1342
1343         # We need to construct a dependency graph for the setscene functions. Intermediate
1344         # dependencies between the setscene tasks only complicate the code. This code
1345         # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1346         # only containing the setscene functions.
1347
1348         for task in xrange(self.stats.total):
1349             self.runq_running.append(0)
1350             self.runq_complete.append(0)
1351             self.runq_buildable.append(0)
1352
1353         for task in xrange(len(self.rqdata.runq_fnid)):
1354             sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1355             sq_revdeps_new.append(set())
1356             if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1357                 endpoints[task] = None
1358
1359         for task in self.rqdata.runq_setscene:
1360             for dep in self.rqdata.runq_depends[task]:
1361                     endpoints[dep] = task
1362
1363         def process_endpoints(endpoints):
1364             newendpoints = {}
1365             for point, task in endpoints.items():
1366                 tasks = set()
1367                 if task:
1368                     tasks.add(task)
1369                 if sq_revdeps_new[point]:
1370                     tasks |= sq_revdeps_new[point]
1371                 sq_revdeps_new[point] = set()
1372                 for dep in self.rqdata.runq_depends[point]:
1373                     if point in sq_revdeps[dep]:
1374                         sq_revdeps[dep].remove(point)
1375                     if tasks:
1376                         sq_revdeps_new[dep] |= tasks
1377                     if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1378                         newendpoints[dep] = task
1379             if len(newendpoints) != 0:
1380                 process_endpoints(newendpoints)
1381
1382         process_endpoints(endpoints)
1383
1384         for task in xrange(len(self.rqdata.runq_fnid)):
1385             if task in self.rqdata.runq_setscene:
1386                 deps = set()
1387                 for dep in sq_revdeps_new[task]:
1388                     deps.add(self.rqdata.runq_setscene.index(dep))
1389                 sq_revdeps_squash.append(deps)
1390             elif len(sq_revdeps_new[task]) != 0:
1391                 bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1392
1393         #for task in xrange(len(sq_revdeps_squash)):
1394         #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
1395
1396         self.sq_deps = []
1397         self.sq_revdeps = sq_revdeps_squash
1398         self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1399
1400         for task in xrange(len(self.sq_revdeps)):
1401             self.sq_deps.append(set())
1402         for task in xrange(len(self.sq_revdeps)):
1403             for dep in self.sq_revdeps[task]:
1404                 self.sq_deps[dep].add(task)
1405
1406         for task in xrange(len(self.sq_revdeps)):
1407             if len(self.sq_revdeps[task]) == 0:
1408                 self.runq_buildable[task] = 1
1409
1410         if self.rq.hashvalidate:
1411             sq_hash = []
1412             sq_hashfn = []
1413             sq_fn = []
1414             sq_taskname = []
1415             sq_task = []
1416             noexec = []
1417             for task in xrange(len(self.sq_revdeps)):
1418                 realtask = self.rqdata.runq_setscene[task]
1419                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1420                 taskname = self.rqdata.runq_task[realtask]
1421                 taskdep = self.rqdata.dataCache.task_deps[fn]
1422                 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1423                     noexec.append(task)
1424                     self.task_skip(task)
1425                     bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1426                     continue
1427                 sq_fn.append(fn)
1428                 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1429                 sq_hash.append(self.rqdata.runq_hash[realtask])
1430                 sq_taskname.append(taskname)
1431                 sq_task.append(task)
1432             call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1433             locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.configuration.data }
1434             valid = bb.utils.better_eval(call, locs)
1435
1436             valid_new = []
1437             for v in valid:
1438                 valid_new.append(sq_task[v])
1439
1440             for task in xrange(len(self.sq_revdeps)):
1441                 if task not in valid_new and task not in noexec:
1442                     logger.debug(2, 'No package found, so skipping setscene task %s',
1443                                  self.rqdata.get_user_idstring(task))
1444                     self.task_failoutright(task)
1445
1446         logger.info('Executing SetScene Tasks')
1447
1448         self.rq.state = runQueueSceneRun
1449
1450     def scenequeue_updatecounters(self, task):
1451         for dep in self.sq_deps[task]:
1452             self.sq_revdeps2[dep].remove(task)
1453             if len(self.sq_revdeps2[dep]) == 0:
1454                 self.runq_buildable[dep] = 1
1455
1456     def task_completeoutright(self, task):
1457         """
1458         Mark a task as completed
1459         Look at the reverse dependencies and mark any task with
1460         completed dependencies as buildable
1461         """
1462
1463         index = self.rqdata.runq_setscene[task]
1464         logger.debug(1, 'Found task %s which could be accelerated',
1465                         self.rqdata.get_user_idstring(index))
1466
1467         self.scenequeue_covered.add(task)
1468         self.scenequeue_updatecounters(task)
1469
1470     def task_complete(self, task):
1471         self.stats.taskCompleted()
1472         self.task_completeoutright(task)
1473
1474     def task_fail(self, task, result):
1475         self.stats.taskFailed()
1476         index = self.rqdata.runq_setscene[task]
1477         bb.event.fire(runQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1478         self.scenequeue_notcovered.add(task)
1479         self.scenequeue_updatecounters(task)
1480
1481     def task_failoutright(self, task):
1482         self.runq_running[task] = 1
1483         self.runq_buildable[task] = 1
1484         self.stats.taskCompleted()
1485         self.stats.taskSkipped()
1486         index = self.rqdata.runq_setscene[task]
1487         self.scenequeue_notcovered.add(task)
1488         self.scenequeue_updatecounters(task)
1489
1490     def task_skip(self, task):
1491         self.runq_running[task] = 1
1492         self.runq_buildable[task] = 1
1493         self.task_completeoutright(task)
1494         self.stats.taskCompleted()
1495         self.stats.taskSkipped()
1496
1497     def execute(self):
1498         """
1499         Run the tasks in a queue prepared by prepare_runqueue
1500         """
1501
1502         task = None
1503         if self.stats.active < self.number_tasks:
1504             # Find the next setscene to run
1505             for nexttask in xrange(self.stats.total):
1506                 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1507                     task = nexttask
1508                     break
1509         if task is not None:
1510             realtask = self.rqdata.runq_setscene[task]
1511             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1512
1513             taskname = self.rqdata.runq_task[realtask] + "_setscene"
1514             if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
1515                 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1516                              task, self.rqdata.get_user_idstring(task))
1517                 self.task_failoutright(task)
1518                 return True
1519
1520             if self.cooker.configuration.force:
1521                 for target in self.rqdata.target_pairs:
1522                     if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1523                         self.task_failoutright(task)
1524                         return True
1525
1526             if self.rq.check_stamp_task(realtask, taskname):
1527                 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
1528                              task, self.rqdata.get_user_idstring(realtask))
1529                 self.task_skip(task)
1530                 return True
1531
1532             pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
1533
1534             self.build_pids[pid] = task
1535             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1536             self.runq_running[task] = 1
1537             self.stats.taskActive()
1538             if self.stats.active < self.number_tasks:
1539                 return True
1540
1541         for pipe in self.build_pipes:
1542             self.build_pipes[pipe].read()
1543
1544         if self.stats.active > 0:
1545             if self.runqueue_process_waitpid() is None:
1546                 return 0.5
1547             return True
1548
1549         # Convert scenequeue_covered task numbers into full taskgraph ids
1550         oldcovered = self.scenequeue_covered
1551         self.rq.scenequeue_covered = set()
1552         for task in oldcovered:
1553             self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1554
1555         logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered)
1556
1557         self.rq.state = runQueueRunInit
1558         return True
1559
1560     def fork_off_task(self, fn, task, taskname):
1561         return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
1562
1563 class TaskFailure(Exception):
1564     """
1565     Exception raised when a task in a runqueue fails
1566     """
1567     def __init__(self, x):
1568         self.args = x
1569
1570
1571 class runQueueExitWait(bb.event.Event):
1572     """
1573     Event when waiting for task processes to exit
1574     """
1575
1576     def __init__(self, remain):
1577         self.remain = remain
1578         self.message = "Waiting for %s active tasks to finish" % remain
1579         bb.event.Event.__init__(self)
1580
1581 class runQueueEvent(bb.event.Event):
1582     """
1583     Base runQueue event class
1584     """
1585     def __init__(self, task, stats, rq):
1586         self.taskid = task
1587         self.taskstring = rq.rqdata.get_user_idstring(task)
1588         self.stats = stats.copy()
1589         bb.event.Event.__init__(self)
1590
1591 class runQueueTaskStarted(runQueueEvent):
1592     """
1593     Event notifing a task was started
1594     """
1595     def __init__(self, task, stats, rq, noexec=False):
1596         runQueueEvent.__init__(self, task, stats, rq)
1597         self.noexec = noexec
1598
1599 class runQueueTaskFailed(runQueueEvent):
1600     """
1601     Event notifing a task failed
1602     """
1603     def __init__(self, task, stats, exitcode, rq):
1604         runQueueEvent.__init__(self, task, stats, rq)
1605         self.exitcode = exitcode
1606
1607 class runQueueTaskCompleted(runQueueEvent):
1608     """
1609     Event notifing a task completed
1610     """
1611
1612 def check_stamp_fn(fn, taskname, d):
1613     rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1614     fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
1615     fnid = rqexe.rqdata.taskData.getfn_id(fn)
1616     taskid = rqexe.rqdata.get_task_id(fnid, taskname)
1617     if taskid is not None:
1618         return rqexe.rq.check_stamp_task(taskid)
1619     return None
1620
1621 class runQueuePipe():
1622     """
1623     Abstraction for a pipe between a worker thread and the server
1624     """
1625     def __init__(self, pipein, pipeout, d):
1626         self.input = pipein
1627         pipeout.close()
1628         fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)
1629         self.queue = ""
1630         self.d = d
1631
1632     def read(self):
1633         start = len(self.queue)
1634         try:
1635             self.queue = self.queue + self.input.read(102400)
1636         except (OSError, IOError):
1637             pass
1638         end = len(self.queue)
1639         index = self.queue.find("</event>")
1640         while index != -1:
1641             bb.event.fire_from_worker(self.queue[:index+8], self.d)
1642             self.queue = self.queue[index+8:]
1643             index = self.queue.find("</event>")
1644         return (end > start)
1645
1646     def close(self):
1647         while self.read():
1648             continue
1649         if len(self.queue) > 0:
1650             print("Warning, worker left partial message: %s" % self.queue)
1651         self.input.close()