runqueue.py: Fix debug message to reference the correct task
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 import copy
26 import os
27 import sys
28 import signal
29 import stat
30 import fcntl
31 import logging
32 import bb
33 from bb import msg, data, event
34
35 bblogger = logging.getLogger("BitBake")
36 logger = logging.getLogger("BitBake.RunQueue")
37
38 class RunQueueStats:
39     """
40     Holds statistics on the tasks handled by the associated runQueue
41     """
42     def __init__(self, total):
43         self.completed = 0
44         self.skipped = 0
45         self.failed = 0
46         self.active = 0
47         self.total = total
48
49     def copy(self):
50         obj = self.__class__(self.total)
51         obj.__dict__.update(self.__dict__)
52         return obj
53
54     def taskFailed(self):
55         self.active = self.active - 1
56         self.failed = self.failed + 1
57
58     def taskCompleted(self, number = 1):
59         self.active = self.active - number
60         self.completed = self.completed + number
61
62     def taskSkipped(self, number = 1):
63         self.active = self.active + number
64         self.skipped = self.skipped + number
65
66     def taskActive(self):
67         self.active = self.active + 1
68
69 # These values indicate the next step due to be run in the
70 # runQueue state machine
71 runQueuePrepare = 2
72 runQueueSceneInit = 3
73 runQueueSceneRun = 4
74 runQueueRunInit = 5
75 runQueueRunning = 6
76 runQueueFailed = 7
77 runQueueCleanUp = 8
78 runQueueComplete = 9
79 runQueueChildProcess = 10
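# Roughly, a successful build steps through:
#   runQueuePrepare -> runQueueSceneInit -> runQueueSceneRun -> runQueueRunInit
#   -> runQueueRunning -> runQueueCleanUp -> runQueueComplete
# (see RunQueue.execute_runqueue() below)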
80
81 class RunQueueScheduler(object):
82     """
83     Control the order tasks are scheduled in.
84     """
85     name = "basic"
86
87     def __init__(self, runqueue, rqdata):
88         """
89         The default scheduler just returns the first buildable task (the
90         priority map is sorted by task number)
91         """
92         self.rq = runqueue
93         self.rqdata = rqdata
94         numTasks = len(self.rqdata.runq_fnid)
95
96         self.prio_map = []
97         self.prio_map.extend(range(numTasks))
98
99     def next_buildable_task(self):
100         """
101         Return the id of the first task we find that is buildable
102         """
103         for tasknum in xrange(len(self.rqdata.runq_fnid)):
104             taskid = self.prio_map[tasknum]
105             if self.rq.runq_running[taskid] == 1:
106                 continue
107             if self.rq.runq_buildable[taskid] == 1:
108                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
109                 taskname = self.rqdata.runq_task[taskid]
110                 stamp = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
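                # Skip tasks whose stamp file is already being written by a
                # currently running task, so two tasks never race on one stamp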
111                 if stamp in self.rq.build_stamps.values():
112                     continue
113                 return taskid
114
115     def next(self):
116         """
117         Return the id of the task we should build next
118         """
119         if self.rq.stats.active < self.rq.number_tasks:
120             return self.next_buildable_task()
121
122 class RunQueueSchedulerSpeed(RunQueueScheduler):
123     """
124     A scheduler optimised for speed. The priority map is sorted by task weight;
125     heavier-weighted tasks (tasks needed by the most other tasks) are run first.
126     """
127     name = "speed"
128
129     def __init__(self, runqueue, rqdata):
130         """
131         The priority map is sorted by task weight.
132         """
133
134         self.rq = runqueue
135         self.rqdata = rqdata
136
137         sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
138         copyweight = copy.deepcopy(self.rqdata.runq_weight)
139         self.prio_map = []
140
141         for weight in sortweight:
142             idx = copyweight.index(weight)
143             self.prio_map.append(idx)
144             copyweight[idx] = -1
145
146         self.prio_map.reverse()
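        # prio_map now lists task ids heaviest first, i.e. the tasks most
        # other tasks depend on are scheduled earliest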
147
148 class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
149     """
150     A scheduler optimised to complete .bb files as quickly as possible. The
151     priority map is sorted by task weight, but then reordered so that once a given
152     .bb file starts to build, it is completed as quickly as possible. This works
153     well where disk space is at a premium and classes like OE's rm_work are in
154     force.
155     """
156     name = "completion"
157
158     def __init__(self, runqueue, rqdata):
159         RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
160
161         #FIXME - whilst this groups all fnids together it does not reorder the
162         #fnid groups optimally.
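        # The loop below walks the speed-sorted map and, each time it emits a
        # task, immediately pulls forward every other task from the same .bb
        # file (fnid) so that recipe can finish as soon as it has started.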
163
164         basemap = copy.deepcopy(self.prio_map)
165         self.prio_map = []
166         while (len(basemap) > 0):
167             entry = basemap.pop(0)
168             self.prio_map.append(entry)
169             fnid = self.rqdata.runq_fnid[entry]
170             todel = []
171             for entry in basemap:
172                 entry_fnid = self.rqdata.runq_fnid[entry]
173                 if entry_fnid == fnid:
174                     todel.append(basemap.index(entry))
175                     self.prio_map.append(entry)
176             todel.reverse()
177             for idx in todel:
178                 del basemap[idx]
179
180 class RunQueueData:
181     """
182     BitBake Run Queue implementation
183     """
184     def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
185         self.cooker = cooker
186         self.dataCache = dataCache
187         self.taskData = taskData
188         self.targets = targets
189         self.rq = rq
190
191         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
192         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
193
194         self.reset()
195
196     def reset(self):
197         self.runq_fnid = []
198         self.runq_task = []
199         self.runq_depends = []
200         self.runq_revdeps = []
201         self.runq_hash = []
202
203     def runq_depends_names(self, ids):
204         import re
205         ret = []
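        # Strip the filename suffix so each entry reads "recipe, taskname",
        # e.g. "foo_1.0.bb, do_compile" becomes "foo, do_compile"
        # (illustrative names)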
206         for id in self.runq_depends[ids]:
207             nam = os.path.basename(self.get_user_idstring(id))
208             nam = re.sub("_[^,]*,", ",", nam)
209             ret.extend([nam])
210         return ret
211
212     def get_user_idstring(self, task, task_name_suffix = ""):
213         fn = self.taskData.fn_index[self.runq_fnid[task]]
214         taskname = self.runq_task[task] + task_name_suffix
215         return "%s, %s" % (fn, taskname)
216
217     def get_task_id(self, fnid, taskname):
218         for listid in xrange(len(self.runq_fnid)):
219             if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
220                 return listid
221         return None
222
223     def circular_depchains_handler(self, tasks):
224         """
225         Some tasks aren't buildable, likely due to circular dependency issues.
226         Identify the circular dependencies and print them in a user readable format.
227         """
228         from copy import deepcopy
229
230         valid_chains = []
231         explored_deps = {}
232         msgs = []
233
234         def chain_reorder(chain):
235             """
236             Reorder a dependency chain so the lowest task id is first
237             """
238             lowest = 0
239             new_chain = []
240             for entry in xrange(len(chain)):
241                 if chain[entry] < chain[lowest]:
242                     lowest = entry
243             new_chain.extend(chain[lowest:])
244             new_chain.extend(chain[:lowest])
245             return new_chain
246
247         def chain_compare_equal(chain1, chain2):
248             """
249             Compare two dependency chains and see if they're the same
250             """
251             if len(chain1) != len(chain2):
252                 return False
253             for index in xrange(len(chain1)):
254                 if chain1[index] != chain2[index]:
255                     return False
256             return True
257
258         def chain_array_contains(chain, chain_array):
259             """
260             Return True if chain_array contains chain
261             """
262             for ch in chain_array:
263                 if chain_compare_equal(ch, chain):
264                     return True
265             return False
266
267         def find_chains(taskid, prev_chain):
268             prev_chain.append(taskid)
269             total_deps = []
270             total_deps.extend(self.runq_revdeps[taskid])
271             for revdep in self.runq_revdeps[taskid]:
272                 if revdep in prev_chain:
273                     idx = prev_chain.index(revdep)
274                     # To prevent duplicates, reorder the chain to start with the lowest taskid
275                     # and search through an array of those we've already printed
276                     chain = prev_chain[idx:]
277                     new_chain = chain_reorder(chain)
278                     if not chain_array_contains(new_chain, valid_chains):
279                         valid_chains.append(new_chain)
280                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
281                         for dep in new_chain:
282                             msgs.append("  Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
283                         msgs.append("\n")
284                     if len(valid_chains) > 10:
285                         msgs.append("Aborted dependency loops search after 10 matches.\n")
286                         return msgs
287                     continue
288                 scan = False
289                 if revdep not in explored_deps:
290                     scan = True
291                 elif revdep in explored_deps[revdep]:
292                     scan = True
293                 else:
294                     for dep in prev_chain:
295                         if dep in explored_deps[revdep]:
296                             scan = True
297                 if scan:
298                     find_chains(revdep, copy.deepcopy(prev_chain))
299                 for dep in explored_deps[revdep]:
300                     if dep not in total_deps:
301                         total_deps.append(dep)
302
303             explored_deps[taskid] = total_deps
304
305         for task in tasks:
306             find_chains(task, [])
307
308         return msgs
309
310     def calculate_task_weights(self, endpoints):
311         """
312         Calculate a number representing the "weight" of each task. Heavier weighted tasks
313         have more tasks depending on them and hence should be executed sooner for maximum speed.
314
315         This function also sanity checks the task list finding tasks that are not
316         possible to execute due to circular dependencies.
317         """
318
319         numTasks = len(self.runq_fnid)
320         weight = []
321         deps_left = []
322         task_done = []
323
324         for listid in xrange(numTasks):
325             task_done.append(False)
326             weight.append(0)
327             deps_left.append(len(self.runq_revdeps[listid]))
328
329         for listid in endpoints:
330             weight[listid] = 1
331             task_done[listid] = True
332
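        # Walk backwards from the endpoints: each pass pushes a task's weight
        # onto the tasks it depends on, so a task's final weight reflects how
        # much of the build (transitively) depends on it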
333         while True:
334             next_points = []
335             for listid in endpoints:
336                 for revdep in self.runq_depends[listid]:
337                     weight[revdep] = weight[revdep] + weight[listid]
338                     deps_left[revdep] = deps_left[revdep] - 1
339                     if deps_left[revdep] == 0:
340                         next_points.append(revdep)
341                         task_done[revdep] = True
342             endpoints = next_points
343             if len(next_points) == 0:
344                 break
345
346         # Circular dependency sanity check
347         problem_tasks = []
348         for task in xrange(numTasks):
349             if task_done[task] is False or deps_left[task] != 0:
350                 problem_tasks.append(task)
351                 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
352                 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
353
354         if problem_tasks:
355             message = "Unbuildable tasks were found.\n"
356             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
357             message = message + "Identifying dependency loops (this may take a short while)...\n"
358             logger.error(message)
359
360             msgs = self.circular_depchains_handler(problem_tasks)
361
362             message = "\n"
363             for msg in msgs:
364                 message = message + msg
365             bb.msg.fatal("RunQueue", message)
366
367         return weight
368
369     def prepare(self):
370         """
371         Turn a set of taskData into a RunQueue and compute data needed
372         to optimise the execution order.
373         """
374
375         runq_build = []
376         recursive_tdepends = {}
377         runq_recrdepends = []
378         tdepends_fnid = {}
379
380         taskData = self.taskData
381
382         if len(taskData.tasks_name) == 0:
383             # Nothing to do
384             return 0
385
386         logger.info("Preparing runqueue")
387
388         # Step A - Work out a list of tasks to run
389         #
390         # Taskdata gives us a list of possible providers for every build and run
391         # target ordered by priority. It also gives information on each of those
392         # providers.
393         #
394         # To create the actual list of tasks to execute we fix the list of
395         # providers and then resolve the dependencies into task IDs. This
396         # process is repeated for each type of dependency (tdepends, deptask,
397         # rdeptask, recrdeptask, idepends).
398
399         def add_build_dependencies(depids, tasknames, depends):
400             for depid in depids:
401                 # Won't be in build_targets if ASSUME_PROVIDED
402                 if depid not in taskData.build_targets:
403                     continue
404                 depdata = taskData.build_targets[depid][0]
405                 if depdata is None:
406                     continue
407                 dep = taskData.fn_index[depdata]
408                 for taskname in tasknames:
409                     taskid = taskData.gettask_id(dep, taskname, False)
410                     if taskid is not None:
411                         depends.append(taskid)
412
413         def add_runtime_dependencies(depids, tasknames, depends):
414             for depid in depids:
415                 if depid not in taskData.run_targets:
416                     continue
417                 depdata = taskData.run_targets[depid][0]
418                 if depdata is None:
419                     continue
420                 dep = taskData.fn_index[depdata]
421                 for taskname in tasknames:
422                     taskid = taskData.gettask_id(dep, taskname, False)
423                     if taskid is not None:
424                         depends.append(taskid)
425
426         for task in xrange(len(taskData.tasks_name)):
427             depends = []
428             recrdepends = []
429             fnid = taskData.tasks_fnid[task]
430             fn = taskData.fn_index[fnid]
431             task_deps = self.dataCache.task_deps[fn]
432
433             logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
434
435             if fnid not in taskData.failed_fnids:
436
437                 # Resolve task internal dependencies
438                 #
439                 # e.g. addtask before X after Y
440                 depends = taskData.tasks_tdepends[task]
441
442                 # Resolve 'deptask' dependencies
443                 #
444                 # e.g. do_sometask[deptask] = "do_someothertask"
445                 # (makes sure sometask runs after someothertask of all DEPENDS)
446                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
447                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
448                     add_build_dependencies(taskData.depids[fnid], tasknames, depends)
449
450                 # Resolve 'rdeptask' dependencies
451                 #
452                 # e.g. do_sometask[rdeptask] = "do_someothertask"
453                 # (makes sure sometask runs after someothertask of all RDEPENDS)
454                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
455                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
456                     add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
457
458                 # Resolve inter-task dependencies
459                 #
460                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
461                 # (makes sure sometask runs after targetname's someothertask)
462                 if fnid not in tdepends_fnid:
463                     tdepends_fnid[fnid] = set()
464                 idepends = taskData.tasks_idepends[task]
465                 for (depid, idependtask) in idepends:
466                     if depid in taskData.build_targets:
467                         # Won't be in build_targets if ASSUME_PROVIDED
468                         depdata = taskData.build_targets[depid][0]
469                         if depdata is not None:
470                             dep = taskData.fn_index[depdata]
471                             taskid = taskData.gettask_id(dep, idependtask, False)
472                             if taskid is None:
473                                 bb.msg.fatal("RunQueue", "Task %s in %s depends upon nonexistent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
474                             depends.append(taskid)
475                             if depdata != fnid:
476                                 tdepends_fnid[fnid].add(taskid)
477
478
479                 # Resolve recursive 'recrdeptask' dependencies (A)
480                 #
481                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
482                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
483                 # We cover the recursive part of the dependencies below
484                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
485                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
486                         recrdepends.append(taskname)
487                         add_build_dependencies(taskData.depids[fnid], [taskname], depends)
488                         add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
489
490                 # Remove all self references
491                 if task in depends:
492                     newdep = []
493                     logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
494                     for dep in depends:
495                         if task != dep:
496                             newdep.append(dep)
497                     depends = newdep
498
499             self.runq_fnid.append(taskData.tasks_fnid[task])
500             self.runq_task.append(taskData.tasks_name[task])
501             self.runq_depends.append(set(depends))
502             self.runq_revdeps.append(set())
503             self.runq_hash.append("")
504
505             runq_build.append(0)
506             runq_recrdepends.append(recrdepends)
507
508         #
509         # Build a list of recursive cumulative dependencies for each fnid
510         # We do this by fnid, since if A depends on some task in B
511         # we're interested in later tasks B's fnid might have but B itself
512         # doesn't depend on
513         #
514         # Algorithm is O(tasks) + O(tasks)*O(fnids)
515         #
516         reccumdepends = {}
517         for task in xrange(len(self.runq_fnid)):
518             fnid = self.runq_fnid[task]
519             if fnid not in reccumdepends:
520                 if fnid in tdepends_fnid:
521                     reccumdepends[fnid] = tdepends_fnid[fnid]
522                 else:
523                     reccumdepends[fnid] = set()
524             reccumdepends[fnid].update(self.runq_depends[task])
525         for task in xrange(len(self.runq_fnid)):
526             taskfnid = self.runq_fnid[task]
527             for fnid in reccumdepends:
528                 if task in reccumdepends[fnid]:
529                     reccumdepends[fnid].add(task)
530                     if taskfnid in reccumdepends:
531                         reccumdepends[fnid].update(reccumdepends[taskfnid])
532
533
534         # Resolve recursive 'recrdeptask' dependencies (B)
535         #
536         # e.g. do_sometask[recrdeptask] = "do_someothertask"
537         # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
538         for task in xrange(len(self.runq_fnid)):
539             if len(runq_recrdepends[task]) > 0:
540                 taskfnid = self.runq_fnid[task]
541                 for dep in reccumdepends[taskfnid]:
542                     # Ignore self references
543                     if dep == task:
544                         continue
545                     for taskname in runq_recrdepends[task]:
546                         if taskData.tasks_name[dep] == taskname:
547                             self.runq_depends[task].add(dep)
548
549         # Step B - Mark all active tasks
550         #
551         # Start with the tasks we were asked to run and mark all dependencies
552         # as active too. If the task is to be 'forced', clear its stamp. Once
553         # all active tasks are marked, prune the ones we don't need.
554
555         logger.verbose("Marking Active Tasks")
556
557         def mark_active(listid, depth):
558             """
559             Mark an item as active along with its depends
560             (calls itself recursively)
561             """
562
563             if runq_build[listid] == 1:
564                 return
565
566             runq_build[listid] = 1
567
568             depends = self.runq_depends[listid]
569             for depend in depends:
570                 mark_active(depend, depth+1)
571
572         self.target_pairs = []
573         for target in self.targets:
574             targetid = taskData.getbuild_id(target[0])
575
576             if targetid not in taskData.build_targets:
577                 continue
578
579             if targetid in taskData.failed_deps:
580                 continue
581
582             fnid = taskData.build_targets[targetid][0]
583             fn = taskData.fn_index[fnid]
584             self.target_pairs.append((fn, target[1]))
585
586             if fnid in taskData.failed_fnids:
587                 continue
588
589             if target[1] not in taskData.tasks_lookup[fnid]:
590                 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s" % (target[1], target[0]))
591
592             listid = taskData.tasks_lookup[fnid][target[1]]
593
594             mark_active(listid, 1)
595
596         # Step C - Prune all inactive tasks
597         #
598         # Once all active tasks are marked, prune the ones we don't need.
599
600         maps = []
601         delcount = 0
602         for listid in xrange(len(self.runq_fnid)):
603             if runq_build[listid-delcount] == 1:
604                 maps.append(listid-delcount)
605             else:
606                 del self.runq_fnid[listid-delcount]
607                 del self.runq_task[listid-delcount]
608                 del self.runq_depends[listid-delcount]
609                 del runq_build[listid-delcount]
610                 del self.runq_revdeps[listid-delcount]
611                 del self.runq_hash[listid-delcount]
612                 delcount = delcount + 1
613                 maps.append(-1)
614
615         #
616         # Step D - Sanity checks and computation
617         #
618
619         # Check to make sure we still have tasks to run
620         if len(self.runq_fnid) == 0:
621             if not taskData.abort:
622                 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
623             else:
624                 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
625
626         logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
627
628         # Remap the dependencies to account for the deleted tasks
629         # Check we didn't delete a task we depend on
630         for listid in xrange(len(self.runq_fnid)):
631             newdeps = []
632             origdeps = self.runq_depends[listid]
633             for origdep in origdeps:
634                 if maps[origdep] == -1:
635                     bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
636                 newdeps.append(maps[origdep])
637             self.runq_depends[listid] = set(newdeps)
638
639         logger.verbose("Assign Weightings")
640
641         # Generate a list of reverse dependencies to ease future calculations
642         for listid in xrange(len(self.runq_fnid)):
643             for dep in self.runq_depends[listid]:
644                 self.runq_revdeps[dep].add(listid)
645
646         # Identify tasks at the end of dependency chains
647         # Error on circular dependency loops (length two)
648         endpoints = []
649         for listid in xrange(len(self.runq_fnid)):
650             revdeps = self.runq_revdeps[listid]
651             if len(revdeps) == 0:
652                 endpoints.append(listid)
653             for dep in revdeps:
654                 if dep in self.runq_depends[listid]:
655                     #self.dump_data(taskData)
656                     bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
657
658         logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
659
660         # Calculate task weights
661         # Check for higher-length circular dependencies
662         self.runq_weight = self.calculate_task_weights(endpoints)
663
664         # Sanity Check - Check for multiple tasks building the same provider
665         prov_list = {}
666         seen_fn = []
667         for task in xrange(len(self.runq_fnid)):
668             fn = taskData.fn_index[self.runq_fnid[task]]
669             if fn in seen_fn:
670                 continue
671             seen_fn.append(fn)
672             for prov in self.dataCache.fn_provides[fn]:
673                 if prov not in prov_list:
674                     prov_list[prov] = [fn]
675                 elif fn not in prov_list[prov]:
676                     prov_list[prov].append(fn)
677         error = False
678         for prov in prov_list:
679             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
680                 error = True
681                 logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))
682
683
684         # Create a whitelist usable by the stamp checks
685         stampfnwhitelist = []
686         for entry in self.stampwhitelist.split():
687             entryid = self.taskData.getbuild_id(entry)
688             if entryid not in self.taskData.build_targets:
689                 continue
690             fnid = self.taskData.build_targets[entryid][0]
691             fn = self.taskData.fn_index[fnid]
692             stampfnwhitelist.append(fn)
693         self.stampfnwhitelist = stampfnwhitelist
694
695         # Iterate over the task list looking for tasks with a 'setscene' function
696         self.runq_setscene = []
697         for task in range(len(self.runq_fnid)):
698             setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
699             if not setscene:
700                 continue
701             self.runq_setscene.append(task)
702
703         # Iterate over the task list and call into the siggen code
704         dealtwith = set()
705         todeal = set(range(len(self.runq_fnid)))
706         while len(todeal) > 0:
707             for task in todeal.copy():
708                 if len(self.runq_depends[task] - dealtwith) == 0:
709                     dealtwith.add(task)
710                     todeal.remove(task)
711                     procdep = []
712                     for dep in self.runq_depends[task]:
713                         procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
714                     self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
715
716         self.hashes = {}
717         self.hash_deps = {}
718         for task in xrange(len(self.runq_fnid)):
719             identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
720                                     self.runq_task[task])
721             self.hashes[identifier] = self.runq_hash[task]
722             deps = []
723             for dep in self.runq_depends[task]:
724                 depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
725                                            self.runq_task[dep])
726                 deps.append(depidentifier)
727             self.hash_deps[identifier] = deps
728
729         # Remove stamps for targets if force mode active
730         if self.cooker.configuration.force:
731             for (fn, target) in self.target_pairs:
732                 logger.verbose("Remove stamp %s, %s", target, fn)
733                 bb.build.del_stamp(target, self.dataCache, fn)
734
735         return len(self.runq_fnid)
736
737     def dump_data(self, taskQueue):
738         """
739         Dump some debug information on the internal data structures
740         """
741         logger.debug(3, "run_tasks:")
742         for task in xrange(len(self.rqdata.runq_task)):
743             logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
744                          taskQueue.fn_index[self.rqdata.runq_fnid[task]],
745                          self.rqdata.runq_task[task],
746                          self.rqdata.runq_weight[task],
747                          self.rqdata.runq_depends[task],
748                          self.rqdata.runq_revdeps[task])
749
750         logger.debug(3, "sorted_tasks:")
751         for task1 in xrange(len(self.rqdata.runq_task)):
752             if task1 in self.prio_map:
753                 task = self.prio_map[task1]
754                 logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
755                            taskQueue.fn_index[self.rqdata.runq_fnid[task]],
756                            self.rqdata.runq_task[task],
757                            self.rqdata.runq_weight[task],
758                            self.rqdata.runq_depends[task],
759                            self.rqdata.runq_revdeps[task])
760
761 class RunQueue:
762     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
763
764         self.cooker = cooker
765         self.cfgData = cfgData
766         self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
767
768         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, True) or "perfile"
769         self.hashvalidate = bb.data.getVar("BB_HASHCHECK_FUNCTION", cfgData, True) or None
770         self.setsceneverify = bb.data.getVar("BB_SETSCENE_VERIFY_FUNCTION", cfgData, True) or None
771
772         self.state = runQueuePrepare
773
774     def check_stamps(self):
775         unchecked = {}
776         current = []
777         notcurrent = []
778         buildable = []
779
780         if self.stamppolicy == "perfile":
781             fulldeptree = False
782         else:
783             fulldeptree = True
784             stampwhitelist = []
785             if self.stamppolicy == "whitelist":
786                 stampwhitelist = self.rqdata.stampfnwhitelist
787
788         for task in xrange(len(self.rqdata.runq_fnid)):
789             unchecked[task] = ""
790             if len(self.rqdata.runq_depends[task]) == 0:
791                 buildable.append(task)
792
793         def check_buildable(self, task, buildable):
794             for revdep in self.rqdata.runq_revdeps[task]:
795                 alldeps = 1
796                 for dep in self.rqdata.runq_depends[revdep]:
797                     if dep in unchecked:
798                         alldeps = 0
799                 if alldeps == 1:
800                     if revdep in unchecked:
801                         buildable.append(revdep)
802
803         for task in xrange(len(self.rqdata.runq_fnid)):
804             if task not in unchecked:
805                 continue
806             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
807             taskname = self.rqdata.runq_task[task]
808             stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
809             # If the stamp is missing, it's not current
810             if not os.access(stampfile, os.F_OK):
811                 del unchecked[task]
812                 notcurrent.append(task)
813                 check_buildable(self, task, buildable)
814                 continue
815             # If it's a 'nostamp' task, it's not current
816             taskdep = self.rqdata.dataCache.task_deps[fn]
817             if 'nostamp' in taskdep and task in taskdep['nostamp']:
818                 del unchecked[task]
819                 notcurrent.append(task)
820                 check_buildable(self, task, buildable)
821                 continue
822
823         while (len(buildable) > 0):
824             nextbuildable = []
825             for task in buildable:
826                 if task in unchecked:
827                     fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
828                     taskname = self.rqdata.runq_task[task]
829                     stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
830                     iscurrent = True
831
832                     t1 = os.stat(stampfile)[stat.ST_MTIME]
833                     for dep in self.rqdata.runq_depends[task]:
834                         if iscurrent:
835                             fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
836                             taskname2 = self.rqdata.runq_task[dep]
837                             stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
838                             if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
839                                 if dep in notcurrent:
840                                     iscurrent = False
841                                 else:
842                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
843                                     if t1 < t2:
844                                         iscurrent = False
845                     del unchecked[task]
846                     if iscurrent:
847                         current.append(task)
848                     else:
849                         notcurrent.append(task)
850
851                 check_buildable(self, task, nextbuildable)
852
853             buildable = nextbuildable
854
855         #for task in range(len(self.runq_fnid)):
856         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
857         #    taskname = self.runq_task[task]
858         #    print "%s %s.%s" % (task, taskname, fn)
859
860         #print "Unchecked: %s" % unchecked
861         #print "Current: %s" % current
862         #print "Not current: %s" % notcurrent
863
864         if len(unchecked) > 0:
865             bb.msg.fatal("RunQueue", "check_stamps fatal internal error")
866         return current
867
868     def check_stamp_task(self, task, taskname = None):
869         def get_timestamp(f):
870             try:
871                 if not os.access(f, os.F_OK):
872                     return None
873                 return os.stat(f)[stat.ST_MTIME]
874             except:
875                 return None
876
877         if self.stamppolicy == "perfile":
878             fulldeptree = False
879         else:
880             fulldeptree = True
881             stampwhitelist = []
882             if self.stamppolicy == "whitelist":
883                 stampwhitelist = self.rqdata.stampfnwhitelist
884
885         fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
886         if taskname is None:
887             taskname = self.rqdata.runq_task[task]
888
889         stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
890
891         # If the stamp is missing, it's not current
892         if not os.access(stampfile, os.F_OK):
893             logger.debug(2, "Stampfile %s not available", stampfile)
894             return False
895         # If it's a 'nostamp' task, it's not current
896         taskdep = self.rqdata.dataCache.task_deps[fn]
897         if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
898             logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
899             return False
900
901         if taskname != "do_setscene" and taskname.endswith("_setscene"):
902             return True
903
904         iscurrent = True
905         t1 = get_timestamp(stampfile)
906         for dep in self.rqdata.runq_depends[task]:
907             if iscurrent:
908                 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
909                 taskname2 = self.rqdata.runq_task[dep]
910                 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
911                 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
912                 t2 = get_timestamp(stampfile2)
913                 t3 = get_timestamp(stampfile3)
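                # If the dependency's _setscene stamp is newer than its normal
                # stamp, treat it as covered by the setscene run and skip the
                # timestamp comparison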
914                 if t3 and t3 > t2:
915                     continue
916                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
917                     if not t2:
918                         logger.debug(2, 'Stampfile %s does not exist', stampfile2)
919                         iscurrent = False
920                     if t1 < t2:
921                         logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
922                         iscurrent = False
923
924         return iscurrent
925
926     def execute_runqueue(self):
927         """
928         Run the tasks in a queue prepared by rqdata.prepare()
929         Upon failure, optionally try to recover the build using any alternate providers
930         (if the abort on failure configuration option isn't set)
931         """
932
933         retval = 0.5
934
935         if self.state is runQueuePrepare:
936             self.rqexe = RunQueueExecuteDummy(self)
937             if self.rqdata.prepare() == 0:
938                 self.state = runQueueComplete
939             else:
940                 self.state = runQueueSceneInit
941
942         if self.state is runQueueSceneInit:
943             if self.cooker.configuration.dump_signatures:
944                 self.dump_signatures()
945             else:
946                 self.rqexe = RunQueueExecuteScenequeue(self)
947
948         if self.state is runQueueSceneRun:
949             retval = self.rqexe.execute()
950
951         if self.state is runQueueRunInit:
952             logger.info("Executing RunQueue Tasks")
953             self.rqexe = RunQueueExecuteTasks(self)
954             self.state = runQueueRunning
955
956         if self.state is runQueueRunning:
957             retval = self.rqexe.execute()
958
959         if self.state is runQueueCleanUp:
960            self.rqexe.finish()
961
962         if self.state is runQueueFailed:
963             if not self.rqdata.taskData.tryaltconfigs:
964                 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
965             for fnid in self.rqexe.failed_fnids:
966                 self.rqdata.taskData.fail_fnid(fnid)
967             self.rqdata.reset()
968
969         if self.state is runQueueComplete:
970             # All done
971             logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
972             return False
973
974         if self.state is runQueueChildProcess:
975             print("Child process, eeek, shouldn't happen!")
976             return False
977
978         # Loop
979         return retval
980
981     def finish_runqueue(self, now = False):
982         if now:
983             self.rqexe.finish_now()
984         else:
985             self.rqexe.finish()
986
987     def dump_signatures(self):
988         self.state = runQueueComplete
989         done = set()
990         bb.note("Reparsing files to collect dependency data")
991         for task in range(len(self.rqdata.runq_fnid)):
992             if self.rqdata.runq_fnid[task] not in done:
993                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
994                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
995                 done.add(self.rqdata.runq_fnid[task])
996
997         bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
998
999         return
1000
1001
1002 class RunQueueExecute:
1003
1004     def __init__(self, rq):
1005         self.rq = rq
1006         self.cooker = rq.cooker
1007         self.cfgData = rq.cfgData
1008         self.rqdata = rq.rqdata
1009
1010         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
1011         self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"
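        # e.g. BB_NUMBER_THREADS = "4" allows up to four tasks to run at once
        # and BB_SCHEDULER = "completion" selects RunQueueSchedulerCompletion
        # (illustrative values)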
1012
1013         self.runq_buildable = []
1014         self.runq_running = []
1015         self.runq_complete = []
1016         self.build_pids = {}
1017         self.build_pipes = {}
1018         self.build_stamps = {}
1019         self.failed_fnids = []
1020
1021     def runqueue_process_waitpid(self):
1022         """
1023         Return None if there are no processes awaiting result collection, otherwise
1024         collect the process exit codes and close the information pipe.
1025         """
1026         result = os.waitpid(-1, os.WNOHANG)
1027         if result[0] == 0 and result[1] == 0:
1028             return None
1029         task = self.build_pids[result[0]]
1030         del self.build_pids[result[0]]
1031         self.build_pipes[result[0]].close()
1032         del self.build_pipes[result[0]]
1033         # self.build_stamps[result[0]] may not exist when using a shared work directory.
1034         if result[0] in self.build_stamps.keys():
1035             del self.build_stamps[result[0]]
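        # result[1] is the raw waitpid() status word; the child's exit code is
        # held in its high byte, hence the >> 8 below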
1036         if result[1] != 0:
1037             self.task_fail(task, result[1]>>8)
1038         else:
1039             self.task_complete(task)
1040         return True
1041
1042     def finish_now(self):
1043         if self.stats.active:
1044             logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
1045             for k, v in self.build_pids.iteritems():
1046                 try:
1047                     os.kill(-k, signal.SIGTERM)
1048                 except:
1049                     pass
1050         for pipe in self.build_pipes:
1051             self.build_pipes[pipe].read()
1052
1053     def finish(self):
1054         self.rq.state = runQueueCleanUp
1055
1056         for pipe in self.build_pipes:
1057             self.build_pipes[pipe].read()
1058
1059         if self.stats.active > 0:
1060             bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1061             self.runqueue_process_waitpid()
1062             return
1063
1064         if len(self.failed_fnids) != 0:
1065             self.rq.state = runQueueFailed
1066             return
1067
1068         self.rq.state = runQueueComplete
1069         return
1070
1071     def fork_off_task(self, fn, task, taskname, quieterrors=False):
1072         # We need to setup the environment BEFORE the fork, since
1073         # a fork() or exec*() activates PSEUDO...
1074
1075         envbackup = {}
1076         fakeenv = {}
1077         umask = None
1078
1079         taskdep = self.rqdata.dataCache.task_deps[fn]
1080         if 'umask' in taskdep and taskname in taskdep['umask']:
1081             # umask might come in as a number or text string..
1082             try:
1083                  umask = int(taskdep['umask'][taskname],8)
1084             except TypeError:
1085                  umask = taskdep['umask'][taskname]
1086
1087         if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1088             envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
1089             for key, value in (var.split('=') for var in envvars):
1090                 envbackup[key] = os.environ.get(key)
1091                 os.environ[key] = value
1092                 fakeenv[key] = value
1093
1094             fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
1095             for p in fakedirs:
1096                 bb.utils.mkdirhier(p)
1097
1098             logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
1099                             (fn, taskname, ', '.join(fakedirs)))
1100         else:
1101             envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
1102             for key, value in (var.split('=') for var in envvars):
1103                 envbackup[key] = os.environ.get(key)
1104                 os.environ[key] = value
1105                 fakeenv[key] = value
1106
1107         sys.stdout.flush()
1108         sys.stderr.flush()
1109         try:
1110             pipein, pipeout = os.pipe()
1111             pipein = os.fdopen(pipein, 'rb', 4096)
1112             pipeout = os.fdopen(pipeout, 'wb', 0)
1113             pid = os.fork()
1114         except OSError as e:
1115             bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
1116
1117         if pid == 0:
1118             pipein.close()
1119
1120             # Save out the PID so that we can include it in the
1121             # events
1122             bb.event.worker_pid = os.getpid()
1123             bb.event.worker_pipe = pipeout
1124
1125             self.rq.state = runQueueChildProcess
1126             # Make the child the process group leader
1127             os.setpgid(0, 0)
1128             # No stdin
1129             newsi = os.open(os.devnull, os.O_RDWR)
1130             os.dup2(newsi, sys.stdin.fileno())
1131
1132             if umask:
1133                 os.umask(umask)
1134
1135             bb.data.setVar("BB_WORKERCONTEXT", "1", self.cooker.configuration.data)
1136             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
1137             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
1138             bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
1139             ret = 0
1140             try:
1141                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
1142                 the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
1143                 for h in self.rqdata.hashes:
1144                     the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
1145                 for h in self.rqdata.hash_deps:
1146                     the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
1147
1148                 # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 
1149                 # successfully. We also need to unset anything from the environment which shouldn't be there 
1150                 exports = bb.data.exported_vars(the_data)
1151                 bb.utils.empty_environment()
1152                 for e, v in exports:
1153                     os.environ[e] = v
1154                 for e in fakeenv:
1155                     os.environ[e] = fakeenv[e]
1156                     the_data.setVar(e, fakeenv[e])
1157
1158                 if quieterrors:
1159                     the_data.setVarFlag(taskname, "quieterrors", "1")
1160
1161             except Exception as exc:
1162                 if not quieterrors:
1163                     logger.critical(str(exc))
1164                 os._exit(1)
1165             try:
1166                 ret = bb.build.exec_task(fn, taskname, the_data)
1167                 os._exit(ret)
1168             except:
1169                 os._exit(1)
1170         else:
1171             for key, value in envbackup.iteritems():
1172                 if value is None:
1173                     del os.environ[key]
1174                 else:
1175                     os.environ[key] = value
1176
1177         return pid, pipein, pipeout
1178
1179 class RunQueueExecuteDummy(RunQueueExecute):
1180     def __init__(self, rq):
1181         self.rq = rq
1182         self.stats = RunQueueStats(0)
1183
1184     def finish(self):
1185         self.rq.state = runQueueComplete
1186         return
1187
1188 class RunQueueExecuteTasks(RunQueueExecute):
1189     def __init__(self, rq):
1190         RunQueueExecute.__init__(self, rq)
1191
1192         self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
1193
1194         # Mark initial buildable tasks
1195         for task in xrange(self.stats.total):
1196             self.runq_running.append(0)
1197             self.runq_complete.append(0)
1198             if len(self.rqdata.runq_depends[task]) == 0:
1199                 self.runq_buildable.append(1)
1200             else:
1201                 self.runq_buildable.append(0)
1202             if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1203                 self.rq.scenequeue_covered.add(task)
1204
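        # Iteratively grow the covered set: a task whose dependents are all
        # already covered and belong to the same recipe (fnid) can be skipped too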
1205         found = True
1206         while found:
1207             found = False
1208             for task in xrange(self.stats.total):
1209                 if task in self.rq.scenequeue_covered:
1210                     continue
1211                 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1212                     ok = True
1213                     for revdep in self.rqdata.runq_revdeps[task]:
1214                         if self.rqdata.runq_fnid[task] != self.rqdata.runq_fnid[revdep]:
1215                             ok = False
1216                             break
1217                     if ok:
1218                         found = True
1219                         self.rq.scenequeue_covered.add(task)
1220
1221         logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))
1222
1223         # Allow the metadata to elect for setscene tasks to run anyway
1224         covered_remove = set()
1225         if self.rq.setsceneverify:
1226             call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
1227             locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.configuration.data }
1228             covered_remove = bb.utils.better_eval(call, locs)
1229
1230         for task in covered_remove:
1231             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1232             taskname = self.rqdata.runq_task[task] + '_setscene'
1233             bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
1234             logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
1235             self.rq.scenequeue_covered.remove(task)
1236
1237         logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
1238
1239         event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
1240
1241         schedulers = self.get_schedulers()
1242         for scheduler in schedulers:
1243             if self.scheduler == scheduler.name:
1244                 self.sched = scheduler(self, self.rqdata)
1245                 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
1246                 break
1247         else:
1248             bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1249                      (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1250
1251
1252     def get_schedulers(self):
1253         schedulers = set(obj for obj in globals().values()
1254                              if type(obj) is type and
1255                                 issubclass(obj, RunQueueScheduler))
1256
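        # BB_SCHEDULERS can add extra scheduler classes by dotted name, e.g.
        # BB_SCHEDULERS = "mymodule.MyScheduler" (illustrative) imports
        # MyScheduler from mymodule and adds it to the candidates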
1257         user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
1258         if user_schedulers:
1259             for sched in user_schedulers.split():
1260                 if not "." in sched:
1261                     bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1262                     continue
1263
1264                 modname, name = sched.rsplit(".", 1)
1265                 try:
1266                     module = __import__(modname, fromlist=(name,))
1267                 except ImportError as exc:
1268                     logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1269                     raise SystemExit(1)
1270                 else:
1271                     schedulers.add(getattr(module, name))
1272         return schedulers
1273
1274     def task_completeoutright(self, task):
1275         """
1276         Mark a task as completed
1277         Look at the reverse dependencies and mark any task with
1278         completed dependencies as buildable
1279         """
1280         self.runq_complete[task] = 1
1281         for revdep in self.rqdata.runq_revdeps[task]:
1282             if self.runq_running[revdep] == 1:
1283                 continue
1284             if self.runq_buildable[revdep] == 1:
1285                 continue
1286             alldeps = 1
1287             for dep in self.rqdata.runq_depends[revdep]:
1288                 if self.runq_complete[dep] != 1:
1289                     alldeps = 0
1290             if alldeps == 1:
1291                 self.runq_buildable[revdep] = 1
1292                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1293                 taskname = self.rqdata.runq_task[revdep]
1294                 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1295
1296     def task_complete(self, task):
1297         self.stats.taskCompleted()
1298         bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1299         self.task_completeoutright(task)
1300
1301     def task_fail(self, task, exitcode):
1302         """
1303         Called when a task has failed
1304         Updates the state engine with the failure
1305         """
1306         self.stats.taskFailed()
1307         fnid = self.rqdata.runq_fnid[task]
1308         self.failed_fnids.append(fnid)
1309         bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1310         if self.rqdata.taskData.abort:
1311             self.rq.state = runQueueCleanUp
1312
1313     def task_skip(self, task):
1314         self.runq_running[task] = 1
1315         self.runq_buildable[task] = 1
1316         self.task_completeoutright(task)
1317         self.stats.taskCompleted()
1318         self.stats.taskSkipped()
1319
1320     def execute(self):
1321         """
1322         Run the tasks in a queue prepared by rqdata.prepare()
1323         """
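             # Overview of one pass through this method: ask the scheduler for the
             # next buildable task; tasks already covered by the setscene run or
             # with a current stamp are skipped, dry-run and 'noexec' tasks are
             # completed without forking a worker (noexec ones just get a stamp),
             # and anything else is forked off and tracked in build_pids,
             # build_pipes and build_stamps.  Once number_tasks processes are
             # active, no more are launched; the worker pipes are drained and
             # children reaped instead.  A return of True means "call again
             # immediately"; 0.5 appears to be used by the caller as a short poll
             # delay while child processes are still running.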
1324
1325         if self.stats.total == 0:
1326             # nothing to do
1327             self.rq.state = runQueueCleanUp
1328
1329         task = self.sched.next()
1330         if task is not None:
1331             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1332             taskname = self.rqdata.runq_task[task]
1333
1334             if task in self.rq.scenequeue_covered:
1335                 logger.debug(2, "Setscene covered task %s (%s)", task,
1336                                 self.rqdata.get_user_idstring(task))
1337                 self.task_skip(task)
1338                 return True
1339
1340             if self.rq.check_stamp_task(task, taskname):
1341                 logger.debug(2, "Stamp current task %s (%s)", task,
1342                                 self.rqdata.get_user_idstring(task))
1343                 self.task_skip(task)
1344                 return True
1345             elif self.cooker.configuration.dry_run:
1346                 self.runq_running[task] = 1
1347                 self.runq_buildable[task] = 1
1348                 self.stats.taskActive()
1349                 self.task_complete(task)
1350                 return True
1351
1352             taskdep = self.rqdata.dataCache.task_deps[fn]
1353             if 'noexec' in taskdep and taskname in taskdep['noexec']:
1354                 startevent = runQueueTaskStarted(task, self.stats, self.rq,
1355                                                  noexec=True)
1356                 bb.event.fire(startevent, self.cfgData)
1357                 self.runq_running[task] = 1
1358                 self.stats.taskActive()
1359                 bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
1360                 self.task_complete(task)
1361                 return True
1362             else:
1363                 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1364                 bb.event.fire(startevent, self.cfgData)
1365
1366             pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
1367
1368             self.build_pids[pid] = task
1369             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1370             self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1371             self.runq_running[task] = 1
1372             self.stats.taskActive()
1373             if self.stats.active < self.number_tasks:
1374                 return True
1375
1376         for pipe in self.build_pipes:
1377             self.build_pipes[pipe].read()
1378
1379         if self.stats.active > 0:
1380             if self.runqueue_process_waitpid() is None:
1381                 return 0.5
1382             return True
1383
1384         if len(self.failed_fnids) != 0:
1385             self.rq.state = runQueueFailed
1386             return True
1387
1388         # Sanity Checks
1389         for task in xrange(self.stats.total):
1390             if self.runq_buildable[task] == 0:
1391                 logger.error("Task %s never buildable!", task)
1392             if self.runq_running[task] == 0:
1393                 logger.error("Task %s never ran!", task)
1394             if self.runq_complete[task] == 0:
1395                 logger.error("Task %s never completed!", task)
1396         self.rq.state = runQueueComplete
1397         return True
1398
1399 class RunQueueExecuteScenequeue(RunQueueExecute):
1400     def __init__(self, rq):
1401         RunQueueExecute.__init__(self, rq)
1402
1403         self.scenequeue_covered = set()
1404         self.scenequeue_notcovered = set()
1405
1406         # If we don't have any setscene functions, skip this step
1407         if len(self.rqdata.runq_setscene) == 0:
1408             rq.scenequeue_covered = set()
1409             rq.state = runQueueRunInit
1410             return
1411
1412         self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1413
1414         endpoints = {}
1415         sq_revdeps = []
1416         sq_revdeps_new = []
1417         sq_revdeps_squash = []
1418
1419         # We need to construct a dependency graph for the setscene functions. Intermediate
1420         # dependencies between the setscene tasks only complicate the code. This code
1421         # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1422         # only containing the setscene functions.
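             # Worked example (illustrative): if setscene-capable task C depends on
             # plain task B, which in turn depends on setscene-capable task A, the
             # collapse below records C as a setscene-level reverse dependency of A
             # and B drops out of the squashed graph entirely.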
1423
1424         for task in xrange(self.stats.total):
1425             self.runq_running.append(0)
1426             self.runq_complete.append(0)
1427             self.runq_buildable.append(0)
1428
1429         for task in xrange(len(self.rqdata.runq_fnid)):
1430             sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1431             sq_revdeps_new.append(set())
1432             if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1433                 endpoints[task] = None
1434
1435         for task in self.rqdata.runq_setscene:
1436             for dep in self.rqdata.runq_depends[task]:
1437                 endpoints[dep] = task
1438
1439         def process_endpoints(endpoints):
1440             newendpoints = {}
1441             for point, task in endpoints.items():
1442                 tasks = set()
1443                 if task:
1444                     tasks.add(task)
1445                 if sq_revdeps_new[point]:
1446                     tasks |= sq_revdeps_new[point]
1447                 sq_revdeps_new[point] = set()
1448                 for dep in self.rqdata.runq_depends[point]:
1449                     if point in sq_revdeps[dep]:
1450                         sq_revdeps[dep].remove(point)
1451                     if tasks:
1452                         sq_revdeps_new[dep] |= tasks
1453                     if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1454                         newendpoints[dep] = task
1455             if len(newendpoints) != 0:
1456                 process_endpoints(newendpoints)
1457
1458         process_endpoints(endpoints)
1459
1460         for task in xrange(len(self.rqdata.runq_fnid)):
1461             if task in self.rqdata.runq_setscene:
1462                 deps = set()
1463                 for dep in sq_revdeps_new[task]:
1464                     deps.add(self.rqdata.runq_setscene.index(dep))
1465                 sq_revdeps_squash.append(deps)
1466             elif len(sq_revdeps_new[task]) != 0:
1467                 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1468
1469         #for task in xrange(len(sq_revdeps_squash)):
1470         #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
1471
1472         self.sq_deps = []
1473         self.sq_revdeps = sq_revdeps_squash
1474         self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1475
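             # sq_revdeps maps each setscene task to the setscene tasks that depend
             # on it; sq_deps (built below) is the inverse, mapping a task to the
             # setscene tasks it depends on.  The scenequeue is walked "backwards":
             # only tasks with no reverse dependencies start out buildable, and
             # scenequeue_updatecounters() marks a task buildable (via the working
             # copy sq_revdeps2) once everything that depends on it has resolved.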
1476         for task in xrange(len(self.sq_revdeps)):
1477             self.sq_deps.append(set())
1478         for task in xrange(len(self.sq_revdeps)):
1479             for dep in self.sq_revdeps[task]:
1480                 self.sq_deps[dep].add(task)
1481
1482         for task in xrange(len(self.sq_revdeps)):
1483             if len(self.sq_revdeps[task]) == 0:
1484                 self.runq_buildable[task] = 1
1485
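             # If a hash validation function is configured (self.rq.hashvalidate,
             # set elsewhere from the metadata), it is evaluated below with parallel
             # lists describing every setscene task that is neither noexec nor
             # already stamped, and must return the indices of the tasks whose
             # prebuilt output is available.  An illustrative sketch of a compatible
             # function -- the cache path and the name are made up, and note that
             # the "sq_task" argument is bound to the task *names* list in locs:
             #
             #     import os
             #
             #     def example_setscene_check(sq_fn, sq_task, sq_hash, sq_hashfn, d):
             #         valid = []
             #         for i in range(len(sq_fn)):
             #             if os.path.exists("/hypothetical/sstate/%s_%s" % (sq_hash[i], sq_task[i])):
             #                 valid.append(i)
             #         return valid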
1486         if self.rq.hashvalidate:
1487             sq_hash = []
1488             sq_hashfn = []
1489             sq_fn = []
1490             sq_taskname = []
1491             sq_task = []
1492             noexec = []
1493             stamppresent = []
1494             for task in xrange(len(self.sq_revdeps)):
1495                 realtask = self.rqdata.runq_setscene[task]
1496                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1497                 taskname = self.rqdata.runq_task[realtask]
1498                 taskdep = self.rqdata.dataCache.task_deps[fn]
1499
1500                 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1501                     noexec.append(task)
1502                     self.task_skip(task)
1503                     bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1504                     continue
1505
1506                 if self.rq.check_stamp_task(realtask, taskname + "_setscene"):
1507                     logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1508                     stamppresent.append(task)
1509                     self.task_skip(task)
1510                     continue
1511
1512                 sq_fn.append(fn)
1513                 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1514                 sq_hash.append(self.rqdata.runq_hash[realtask])
1515                 sq_taskname.append(taskname)
1516                 sq_task.append(task)
1517             call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1518             locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.configuration.data }
1519             valid = bb.utils.better_eval(call, locs)
1520
1521             valid_new = stamppresent
1522             for v in valid:
1523                 valid_new.append(sq_task[v])
1524
1525             for task in xrange(len(self.sq_revdeps)):
1526                 if task not in valid_new and task not in noexec:
1527                     realtask = self.rqdata.runq_setscene[task]
1528                     logger.debug(2, 'No package found, so skipping setscene task %s',
1529                                  self.rqdata.get_user_idstring(realtask))
1530                     self.task_failoutright(task)
1531
1532         logger.info('Executing SetScene Tasks')
1533
1534         self.rq.state = runQueueSceneRun
1535
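         # Called whenever a setscene task is resolved (covered, failed or skipped):
         # drop it from the pending reverse-dependency sets of the tasks it depends
         # on, and mark any of those whose set becomes empty as buildable.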
1536     def scenequeue_updatecounters(self, task):
1537         for dep in self.sq_deps[task]:
1538             self.sq_revdeps2[dep].remove(task)
1539             if len(self.sq_revdeps2[dep]) == 0:
1540                 self.runq_buildable[dep] = 1
1541
1542     def task_completeoutright(self, task):
1543         """
1544         Mark a task as completed
1545         Look at the reverse dependencies and mark any task with
1546         completed dependencies as buildable
1547         """
1548
1549         index = self.rqdata.runq_setscene[task]
1550         logger.debug(1, 'Found task %s which could be accelerated',
1551                         self.rqdata.get_user_idstring(index))
1552
1553         self.scenequeue_covered.add(task)
1554         self.scenequeue_updatecounters(task)
1555
1556     def task_complete(self, task):
1557         self.stats.taskCompleted()
1558         self.task_completeoutright(task)
1559
1560     def task_fail(self, task, result):
1561         self.stats.taskFailed()
1562         index = self.rqdata.runq_setscene[task]
1563         bb.event.fire(sceneQueueTaskFailed(index, self.stats, result, self), self.cfgData)
1564         self.scenequeue_notcovered.add(task)
1565         self.scenequeue_updatecounters(task)
1566
1567     def task_failoutright(self, task):
1568         self.runq_running[task] = 1
1569         self.runq_buildable[task] = 1
1570         self.stats.taskCompleted()
1571         self.stats.taskSkipped()
1572         index = self.rqdata.runq_setscene[task]
1573         self.scenequeue_notcovered.add(task)
1574         self.scenequeue_updatecounters(task)
1575
1576     def task_skip(self, task):
1577         self.runq_running[task] = 1
1578         self.runq_buildable[task] = 1
1579         self.task_completeoutright(task)
1580         self.stats.taskCompleted()
1581         self.stats.taskSkipped()
1582
1583     def execute(self):
1584         """
1585         Run the setscene tasks in a queue prepared by the __init__ above
1586         """
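             # One pass: pick the first buildable, not-yet-run setscene task.  It is
             # failed outright (so the real task will run) if the real task's stamp
             # is already current or the target is being force-rebuilt, skipped if
             # its _setscene stamp is current, and otherwise forked as the
             # "<task>_setscene" variant.  Once everything has resolved, the local
             # setscene indices in scenequeue_covered are translated back into full
             # runqueue task ids for the main execution phase.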
1587
1588         task = None
1589         if self.stats.active < self.number_tasks:
1590             # Find the next setscene to run
1591             for nexttask in xrange(self.stats.total):
1592                 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1593                     task = nexttask
1594                     break
1595         if task is not None:
1596             realtask = self.rqdata.runq_setscene[task]
1597             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1598
1599             taskname = self.rqdata.runq_task[realtask] + "_setscene"
1600             if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
1601                 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1602                              task, self.rqdata.get_user_idstring(realtask))
1603                 self.task_failoutright(task)
1604                 return True
1605
1606             if self.cooker.configuration.force:
1607                 for target in self.rqdata.target_pairs:
1608                     if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1609                         self.task_failoutright(task)
1610                         return True
1611
1612             if self.rq.check_stamp_task(realtask, taskname):
1613                 logger.debug(2, 'Setscene stamp current for task %s(%s), so skipping it and its dependencies',
1614                              task, self.rqdata.get_user_idstring(realtask))
1615                 self.task_skip(task)
1616                 return True
1617
1618             pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
1619
1620             self.build_pids[pid] = task
1621             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1622             self.runq_running[task] = 1
1623             self.stats.taskActive()
1624             if self.stats.active < self.number_tasks:
1625                 return True
1626
1627         for pipe in self.build_pipes:
1628             self.build_pipes[pipe].read()
1629
1630         if self.stats.active > 0:
1631             if self.runqueue_process_waitpid() is None:
1632                 return 0.5
1633             return True
1634
1635         # Convert scenequeue_covered task numbers into full taskgraph ids
1636         oldcovered = self.scenequeue_covered
1637         self.rq.scenequeue_covered = set()
1638         for task in oldcovered:
1639             self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1640
1641         logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
1642
1643         self.rq.state = runQueueRunInit
1644         return True
1645
1646     def fork_off_task(self, fn, task, taskname):
1647         return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
1648
1649 class TaskFailure(Exception):
1650     """
1651     Exception raised when a task in a runqueue fails
1652     """
1653     def __init__(self, x):
1654         self.args = x
1655
1656
1657 class runQueueExitWait(bb.event.Event):
1658     """
1659     Event when waiting for task processes to exit
1660     """
1661
1662     def __init__(self, remain):
1663         self.remain = remain
1664         self.message = "Waiting for %s active tasks to finish" % remain
1665         bb.event.Event.__init__(self)
1666
1667 class runQueueEvent(bb.event.Event):
1668     """
1669     Base runQueue event class
1670     """
1671     def __init__(self, task, stats, rq):
1672         self.taskid = task
1673         self.taskstring = rq.rqdata.get_user_idstring(task)
1674         self.stats = stats.copy()
1675         bb.event.Event.__init__(self)
1676
1677 class runQueueTaskStarted(runQueueEvent):
1678     """
1679     Event notifying a task was started
1680     """
1681     def __init__(self, task, stats, rq, noexec=False):
1682         runQueueEvent.__init__(self, task, stats, rq)
1683         self.noexec = noexec
1684
1685 class runQueueTaskFailed(runQueueEvent):
1686     """
1687     Event notifying a task failed
1688     """
1689     def __init__(self, task, stats, exitcode, rq):
1690         runQueueEvent.__init__(self, task, stats, rq)
1691         self.exitcode = exitcode
1692
1693 class sceneQueueTaskFailed(runQueueTaskFailed):
1694     """
1695     Event notifying a setscene task failed
1696     """
1697     def __init__(self, task, stats, exitcode, rq):
1698         runQueueTaskFailed.__init__(self, task, stats, exitcode, rq)
1699         self.taskstring = rq.rqdata.get_user_idstring(task, "_setscene")
1700
1701 class runQueueTaskCompleted(runQueueEvent):
1702     """
1703     Event notifying a task completed
1704     """
1705
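     # Helper exposed to metadata code: it recovers the live RunQueueExecute
     # instance and the current recipe filename from datastore variables that are
     # presumably stashed when the task worker is forked (see fork_off_task), then
     # answers whether the given task's stamp is current.  Note that the fn
     # argument is immediately overridden by the stashed value.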
1706 def check_stamp_fn(fn, taskname, d):
1707     rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1708     fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
1709     fnid = rqexe.rqdata.taskData.getfn_id(fn)
1710     taskid = rqexe.rqdata.get_task_id(fnid, taskname)
1711     if taskid is not None:
1712         return rqexe.rq.check_stamp_task(taskid)
1713     return None
1714
1715 class runQueuePipe():
1716     """
1717     Abstraction for a pipe between a worker process and the server
1718     """
1719     def __init__(self, pipein, pipeout, d):
1720         self.input = pipein
1721         pipeout.close()
1722         fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)
1723         self.queue = ""
1724         self.d = d
1725
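         # read() pulls whatever the worker has written (non-blocking, in chunks of
         # up to 102400 bytes) into an internal buffer, then fires every complete
         # message -- messages are delimited by a trailing "</event>" marker -- via
         # bb.event.fire_from_worker(), keeping any partial tail buffered for the
         # next call.  It returns True if any new data arrived.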
1726     def read(self):
1727         start = len(self.queue)
1728         try:
1729             self.queue = self.queue + self.input.read(102400)
1730         except (OSError, IOError):
1731             pass
1732         end = len(self.queue)
1733         index = self.queue.find("</event>")
1734         while index != -1:
1735             bb.event.fire_from_worker(self.queue[:index+8], self.d)
1736             self.queue = self.queue[index+8:]
1737             index = self.queue.find("</event>")
1738         return (end > start)
1739
1740     def close(self):
1741         while self.read():
1742             continue
1743         if len(self.queue) > 0:
1744             print("Warning, worker left partial message: %s" % self.queue)
1745         self.input.close()