runqueue: pass a copy of the RunQueueStats to events
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 import copy
26 import os
27 import sys
28 import signal
29 import stat
30 import fcntl
31 import logging
32 import bb
33 from bb import msg, data, event
34
# Module-level loggers: the top-level "BitBake" logger and the
# "BitBake.RunQueue" logger used for the messages emitted by this module.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
37
class RunQueueStats:
    """
    Counters describing the tasks handled by the associated runQueue:
    how many are active, completed, skipped or failed, plus the total.
    """
    def __init__(self, total):
        self.total = total
        self.active = 0
        self.completed = 0
        self.skipped = 0
        self.failed = 0

    def copy(self):
        """
        Return a new stats object carrying the current counter values
        """
        snapshot = self.__class__(self.total)
        for counter in ("completed", "skipped", "failed", "active"):
            setattr(snapshot, counter, getattr(self, counter))
        return snapshot

    def taskFailed(self):
        """
        Move one task from the active count to the failed count
        """
        self.active -= 1
        self.failed += 1

    def taskCompleted(self, number = 1):
        """
        Move 'number' tasks from the active count to the completed count
        """
        self.active -= number
        self.completed += number

    def taskSkipped(self, number = 1):
        """
        Add 'number' to the skipped count; the active count is raised by
        the same amount
        """
        self.active += number
        self.skipped += number

    def taskActive(self):
        """
        Record one more active task
        """
        self.active += 1
71
# These values indicate the next step due to be run in the
# runQueue state machine. A RunQueue starts out in runQueuePrepare
# (see RunQueue.__init__).
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueSceneRun = 4
runQueueRunInit = 5
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
runQueueChildProcess = 10
83
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.prio_map = list(range(len(self.rqdata.runq_fnid)))

    def next_buildable_task(self):
        """
        Walk the priority map in order and return the id of the first task
        that is buildable and not already running (None when there is none)
        """
        for position in xrange(len(self.rqdata.runq_fnid)):
            candidate = self.prio_map[position]
            if self.rq.runq_running[candidate] != 1 and self.rq.runq_buildable[candidate] == 1:
                return candidate
        return None

    def next(self):
        """
        Return the id of the task we should build next, or None when the
        number of active tasks has already reached rq.number_tasks
        """
        if not (self.rq.stats.active < self.rq.number_tasks):
            return None
        return self.next_buildable_task()
119
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.

        runqueue is stored as self.rq and rqdata as self.rqdata;
        rqdata.runq_weight supplies one weight per task id and is left
        unmodified.
        """

        self.rq = runqueue
        self.rqdata = rqdata

        weights = self.rqdata.runq_weight

        # Order the task ids by ascending weight. sorted() is stable, so ids
        # with equal weights stay in ascending id order.
        self.prio_map = sorted(range(len(weights)),
                               key=lambda taskid: weights[taskid])

        # Heaviest first. An explicit reverse() (rather than reverse=True,
        # which keeps ties in their original order) also flips the order of
        # equally weighted tasks, so the highest id of a tie comes first.
        self.prio_map.reverse()
145
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it is completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in
    force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        """
        Build the weight-sorted priority map, then regroup it so that all
        tasks sharing an fnid sit next to each other.
        """
        RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.
        self.prio_map = self._group_by_fnid(self.prio_map, self.rqdata.runq_fnid)

    @staticmethod
    def _group_by_fnid(prio_map, runq_fnid):
        """
        Return the entries of prio_map regrouped by fnid.

        Groups appear in the order in which their fnid is first met in
        prio_map, and entries keep their relative order within a group.
        runq_fnid maps a task id to its fnid (fnids are used as dict keys).
        """
        groups = {}
        group_order = []
        for entry in prio_map:
            fnid = runq_fnid[entry]
            if fnid not in groups:
                groups[fnid] = []
                group_order.append(fnid)
            groups[fnid].append(entry)

        grouped = []
        for fnid in group_order:
            grouped.extend(groups[fnid])
        return grouped
177
178 class RunQueueData:
179     """
180     BitBake Run Queue implementation
181     """
182     def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
183         self.cooker = cooker
184         self.dataCache = dataCache
185         self.taskData = taskData
186         self.targets = targets
187         self.rq = rq
188
189         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
190         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
191
192         self.reset()
193
194     def reset(self):
195         self.runq_fnid = []
196         self.runq_task = []
197         self.runq_depends = []
198         self.runq_revdeps = []
199         self.runq_hash = []
200
201     def runq_depends_names(self, ids):
202         import re
203         ret = []
204         for id in self.runq_depends[ids]:
205             nam = os.path.basename(self.get_user_idstring(id))
206             nam = re.sub("_[^,]*,", ",", nam)
207             ret.extend([nam])
208         return ret
209
210     def get_user_idstring(self, task):
211         fn = self.taskData.fn_index[self.runq_fnid[task]]
212         taskname = self.runq_task[task]
213         return "%s, %s" % (fn, taskname)
214
215     def get_task_id(self, fnid, taskname):
216         for listid in xrange(len(self.runq_fnid)):
217             if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
218                 return listid
219         return None
220
221     def circular_depchains_handler(self, tasks):
222         """
223         Some tasks aren't buildable, likely due to circular dependency issues.
224         Identify the circular dependencies and print them in a user readable format.
225         """
226         from copy import deepcopy
227
228         valid_chains = []
229         explored_deps = {}
230         msgs = []
231
232         def chain_reorder(chain):
233             """
234             Reorder a dependency chain so the lowest task id is first
235             """
236             lowest = 0
237             new_chain = []
238             for entry in xrange(len(chain)):
239                 if chain[entry] < chain[lowest]:
240                     lowest = entry
241             new_chain.extend(chain[lowest:])
242             new_chain.extend(chain[:lowest])
243             return new_chain
244
245         def chain_compare_equal(chain1, chain2):
246             """
247             Compare two dependency chains and see if they're the same
248             """
249             if len(chain1) != len(chain2):
250                 return False
251             for index in xrange(len(chain1)):
252                 if chain1[index] != chain2[index]:
253                     return False
254             return True
255
256         def chain_array_contains(chain, chain_array):
257             """
258             Return True if chain_array contains chain
259             """
260             for ch in chain_array:
261                 if chain_compare_equal(ch, chain):
262                     return True
263             return False
264
265         def find_chains(taskid, prev_chain):
266             prev_chain.append(taskid)
267             total_deps = []
268             total_deps.extend(self.runq_revdeps[taskid])
269             for revdep in self.runq_revdeps[taskid]:
270                 if revdep in prev_chain:
271                     idx = prev_chain.index(revdep)
272                     # To prevent duplicates, reorder the chain to start with the lowest taskid
273                     # and search through an array of those we've already printed
274                     chain = prev_chain[idx:]
275                     new_chain = chain_reorder(chain)
276                     if not chain_array_contains(new_chain, valid_chains):
277                         valid_chains.append(new_chain)
278                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
279                         for dep in new_chain:
280                             msgs.append("  Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
281                         msgs.append("\n")
282                     if len(valid_chains) > 10:
283                         msgs.append("Aborted dependency loops search after 10 matches.\n")
284                         return msgs
285                     continue
286                 scan = False
287                 if revdep not in explored_deps:
288                     scan = True
289                 elif revdep in explored_deps[revdep]:
290                     scan = True
291                 else:
292                     for dep in prev_chain:
293                         if dep in explored_deps[revdep]:
294                             scan = True
295                 if scan:
296                     find_chains(revdep, copy.deepcopy(prev_chain))
297                 for dep in explored_deps[revdep]:
298                     if dep not in total_deps:
299                         total_deps.append(dep)
300
301             explored_deps[taskid] = total_deps
302
303         for task in tasks:
304             find_chains(task, [])
305
306         return msgs
307
308     def calculate_task_weights(self, endpoints):
309         """
310         Calculate a number representing the "weight" of each task. Heavier weighted tasks
311         have more dependencies and hence should be executed sooner for maximum speed.
312
313         This function also sanity checks the task list finding tasks that are not
314         possible to execute due to circular dependencies.
315         """
316
317         numTasks = len(self.runq_fnid)
318         weight = []
319         deps_left = []
320         task_done = []
321
322         for listid in xrange(numTasks):
323             task_done.append(False)
324             weight.append(0)
325             deps_left.append(len(self.runq_revdeps[listid]))
326
327         for listid in endpoints:
328             weight[listid] = 1
329             task_done[listid] = True
330
331         while True:
332             next_points = []
333             for listid in endpoints:
334                 for revdep in self.runq_depends[listid]:
335                     weight[revdep] = weight[revdep] + weight[listid]
336                     deps_left[revdep] = deps_left[revdep] - 1
337                     if deps_left[revdep] == 0:
338                         next_points.append(revdep)
339                         task_done[revdep] = True
340             endpoints = next_points
341             if len(next_points) == 0:
342                 break
343
344         # Circular dependency sanity check
345         problem_tasks = []
346         for task in xrange(numTasks):
347             if task_done[task] is False or deps_left[task] != 0:
348                 problem_tasks.append(task)
349                 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
350                 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
351
352         if problem_tasks:
353             message = "Unbuildable tasks were found.\n"
354             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
355             message = message + "Identifying dependency loops (this may take a short while)...\n"
356             logger.error(message)
357
358             msgs = self.circular_depchains_handler(problem_tasks)
359
360             message = "\n"
361             for msg in msgs:
362                 message = message + msg
363             bb.msg.fatal(bb.msg.domain.RunQueue, message)
364
365         return weight
366
    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.

        Returns 0 when taskData holds no tasks, otherwise the number of
        tasks left in the run queue once inactive tasks have been pruned.
        """

        # Per-task flag, set to 1 by mark_active() for tasks that must be kept
        runq_build = []
        # NOTE(review): recursive_tdepends is not referenced again in this method
        recursive_tdepends = {}
        # Per-task list of 'recrdeptask' task names
        runq_recrdepends = []
        # fnid -> set of task ids reached through inter-task (idepends) links
        # that point at a different fnid
        tdepends_fnid = {}

        taskData = self.taskData

        if len(taskData.tasks_name) == 0:
            # Nothing to do
            return 0

        logger.info("Preparing runqueue")

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptask, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends):
            # Append to 'depends' the id of every task in 'tasknames' that
            # exists in the first provider of each build target in 'depids'
            for depid in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depid not in taskData.build_targets:
                    continue
                depdata = taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                dep = taskData.fn_index[depdata]
                for taskname in tasknames:
                    taskid = taskData.gettask_id(dep, taskname, False)
                    if taskid is not None:
                        depends.append(taskid)

        def add_runtime_dependencies(depids, tasknames, depends):
            # Same as add_build_dependencies, but resolved through run_targets
            for depid in depids:
                if depid not in taskData.run_targets:
                    continue
                depdata = taskData.run_targets[depid][0]
                if depdata is None:
                    continue
                dep = taskData.fn_index[depdata]
                for taskname in tasknames:
                    taskid = taskData.gettask_id(dep, taskname, False)
                    if taskid is not None:
                        depends.append(taskid)

        for task in xrange(len(taskData.tasks_name)):
            depends = []
            recrdepends = []
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = self.dataCache.task_deps[fn]

            logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])

            if fnid not in taskData.failed_fnids:

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                # NOTE(review): this binds the object stored in taskData rather
                # than a copy, so the append() calls below also extend
                # taskData.tasks_tdepends[task] - confirm this is intended.
                depends = taskData.tasks_tdepends[task]

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                # NOTE(review): unlike 'deptask' above, the value is not
                # split(), so it is looked up as one task name - confirm.
                if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
                    taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
                    add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                if fnid not in tdepends_fnid:
                    tdepends_fnid[fnid] = set()
                idepends = taskData.tasks_idepends[task]
                for (depid, idependtask) in idepends:
                    if depid in taskData.build_targets:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            dep = taskData.fn_index[depdata]
                            taskid = taskData.gettask_id(dep, idependtask, False)
                            if taskid is None:
                                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s in %s depends upon nonexistant task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
                            depends.append(taskid)
                            if depdata != fnid:
                                tdepends_fnid[fnid].add(taskid)


                # Resolve recursive 'recrdeptask' dependencies (A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
                    for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
                        recrdepends.append(taskname)
                        add_build_dependencies(taskData.depids[fnid], [taskname], depends)
                        add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)

                # Remove all self references
                if task in depends:
                    newdep = []
                    logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
                    for dep in depends:
                        if task != dep:
                            newdep.append(dep)
                    depends = newdep

            # One entry per task is appended to each list, so at this stage
            # the run queue index of a task equals its taskData index
            self.runq_fnid.append(taskData.tasks_fnid[task])
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(set(depends))
            self.runq_revdeps.append(set())
            self.runq_hash.append("")

            runq_build.append(0)
            runq_recrdepends.append(recrdepends)

        #
        # Build a list of recursive cumulative dependencies for each fnid
        # We do this by fnid, since if A depends on some task in B
        # we're interested in later tasks B's fnid might have but B itself
        # doesn't depend on
        #
        # Algorithm is O(tasks) + O(tasks)*O(fnids)
        #
        reccumdepends = {}
        for task in xrange(len(self.runq_fnid)):
            fnid = self.runq_fnid[task]
            if fnid not in reccumdepends:
                if fnid in tdepends_fnid:
                    # The set object is shared with tdepends_fnid, so the
                    # update() below extends that entry too
                    reccumdepends[fnid] = tdepends_fnid[fnid]
                else:
                    reccumdepends[fnid] = set()
            reccumdepends[fnid].update(self.runq_depends[task])
        for task in xrange(len(self.runq_fnid)):
            taskfnid = self.runq_fnid[task]
            for fnid in reccumdepends:
                if task in reccumdepends[fnid]:
                    reccumdepends[fnid].add(task)
                    if taskfnid in reccumdepends:
                        reccumdepends[fnid].update(reccumdepends[taskfnid])


        # Resolve recursive 'recrdeptask' dependencies (B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        for task in xrange(len(self.runq_fnid)):
            if len(runq_recrdepends[task]) > 0:
                taskfnid = self.runq_fnid[task]
                for dep in reccumdepends[taskfnid]:
                    # Ignore self references
                    if dep == task:
                        continue
                    for taskname in runq_recrdepends[task]:
                        if taskData.tasks_name[dep] == taskname:
                            self.runq_depends[task].add(dep)

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        logger.verbose("Marking Active Tasks")

        def mark_active(listid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            # Already marked, which also stops the recursion on loops
            if runq_build[listid] == 1:
                return

            runq_build[listid] = 1

            depends = self.runq_depends[listid]
            for depend in depends:
                mark_active(depend, depth+1)

        # (file name, task name) pairs of the requested targets; used at the
        # end of this method to delete stamps when force mode is active
        self.target_pairs = []
        for target in self.targets:
            targetid = taskData.getbuild_id(target[0])

            if targetid not in taskData.build_targets:
                continue

            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            fn = taskData.fn_index[fnid]
            self.target_pairs.append((fn, target[1]))

            if fnid in taskData.failed_fnids:
                continue

            if target[1] not in taskData.tasks_lookup[fnid]:
                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))

            listid = taskData.tasks_lookup[fnid][target[1]]

            mark_active(listid, 1)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        # maps[old id] is the new index of a task that was kept, or -1 for a
        # task that was removed. The lists shrink while we iterate, so the
        # current position of old id 'listid' is listid-delcount.
        maps = []
        delcount = 0
        for listid in xrange(len(self.runq_fnid)):
            if runq_build[listid-delcount] == 1:
                maps.append(listid-delcount)
            else:
                del self.runq_fnid[listid-delcount]
                del self.runq_task[listid-delcount]
                del self.runq_depends[listid-delcount]
                del runq_build[listid-delcount]
                del self.runq_revdeps[listid-delcount]
                del self.runq_hash[listid-delcount]
                delcount = delcount + 1
                maps.append(-1)

        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runq_fnid) == 0:
            if not taskData.abort:
                bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")

        logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))

        # Remap the dependencies to account for the deleted tasks
        # Check we didn't delete a task we depend on
        for listid in xrange(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = set(newdeps)

        logger.verbose("Assign Weightings")

        # Generate a list of reverse dependencies to ease future calculations
        for listid in xrange(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for listid in xrange(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))

        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

        # Calculate task weights
        # Check of higher length circular dependencies
        self.runq_weight = self.calculate_task_weights(endpoints)

        # Sanity Check - Check for multiple tasks building the same provider
        prov_list = {}
        seen_fn = []
        for task in xrange(len(self.runq_fnid)):
            fn = taskData.fn_index[self.runq_fnid[task]]
            if fn in seen_fn:
                continue
            seen_fn.append(fn)
            for prov in self.dataCache.fn_provides[fn]:
                if prov not in prov_list:
                    prov_list[prov] = [fn]
                elif fn not in prov_list[prov]:
                    prov_list[prov].append(fn)
        # NOTE(review): 'error' is assigned here but not read anywhere in
        # this method, so the condition is only logged.
        error = False
        for prov in prov_list:
            if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
                error = True
                logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))


        # Create a whitelist usable by the stamp checks
        stampfnwhitelist = []
        for entry in self.stampwhitelist.split():
            entryid = self.taskData.getbuild_id(entry)
            if entryid not in self.taskData.build_targets:
                continue
            fnid = self.taskData.build_targets[entryid][0]
            fn = self.taskData.fn_index[fnid]
            stampfnwhitelist.append(fn)
        self.stampfnwhitelist = stampfnwhitelist

        # Iterate over the task list looking for tasks with a 'setscene' function
        self.runq_setscene = []
        for task in range(len(self.runq_fnid)):
            setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
            # NOTE(review): this is a truthiness test, so a returned id of 0
            # would be skipped just like a missing task - TODO confirm that
            # a setscene task can never have id 0.
            if not setscene:
                continue
            self.runq_setscene.append(task)

        # Iterate over the task list and call into the siggen code
        # A task is only handled once all of its dependencies have been dealt
        # with, so the passes repeat until nothing is left in 'todeal'
        dealtwith = set()
        todeal = set(range(len(self.runq_fnid)))
        while len(todeal) > 0:
            for task in todeal.copy():
                if len(self.runq_depends[task] - dealtwith) == 0:
                    dealtwith.add(task)
                    todeal.remove(task)
                    procdep = []
                    for dep in self.runq_depends[task]:
                        procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
                    self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)

        # Index the hashes and dependency names by "<file name>.<task name>"
        self.hashes = {}
        self.hash_deps = {}
        for task in xrange(len(self.runq_fnid)):
            identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
                                    self.runq_task[task])
            self.hashes[identifier] = self.runq_hash[task]
            deps = []
            for dep in self.runq_depends[task]:
                depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
                                           self.runq_task[dep])
                deps.append(depidentifier)
            self.hash_deps[identifier] = deps

        # Remove stamps for targets if force mode active
        if self.cooker.configuration.force:
            for (fn, target) in self.target_pairs:
                logger.verbose("Remove stamp %s, %s", target, fn)
                bb.build.del_stamp(target, self.dataCache, fn)

        return len(self.runq_fnid)
734
735     def dump_data(self, taskQueue):
736         """
737         Dump some debug information on the internal data structures
738         """
739         logger.debug(3, "run_tasks:")
740         for task in xrange(len(self.rqdata.runq_task)):
741             logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
742                          taskQueue.fn_index[self.rqdata.runq_fnid[task]],
743                          self.rqdata.runq_task[task],
744                          self.rqdata.runq_weight[task],
745                          self.rqdata.runq_depends[task],
746                          self.rqdata.runq_revdeps[task])
747
748         logger.debug(3, "sorted_tasks:")
749         for task1 in xrange(len(self.rqdata.runq_task)):
750             if task1 in self.prio_map:
751                 task = self.prio_map[task1]
752                 logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
753                            taskQueue.fn_index[self.rqdata.runq_fnid[task]],
754                            self.rqdata.runq_task[task],
755                            self.rqdata.runq_weight[task],
756                            self.rqdata.runq_depends[task],
757                            self.rqdata.runq_revdeps[task])
758
class RunQueue:
    """
    Owns the RunQueueData for a build and steps the runqueue state machine
    (prepare, setscene, run, clean up) one call at a time
    """
    def __init__(self, cooker, cfgData, dataCache, taskData, targets):

        self.cooker = cooker
        self.cfgData = cfgData
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)

        self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, True) or "perfile"
        self.hashvalidate = bb.data.getVar("BB_HASHCHECK_FUNCTION", cfgData, True) or None

        self.state = runQueuePrepare

    def _stamp_policy(self):
        """
        Decode self.stamppolicy into a (fulldeptree, stampwhitelist) pair

        "perfile" only compares stamps of tasks from the same file; any other
        policy compares across files, and "whitelist" additionally exempts
        the files listed in rqdata.stampfnwhitelist.
        """
        if self.stamppolicy == "perfile":
            return False, []
        if self.stamppolicy == "whitelist":
            return True, self.rqdata.stampfnwhitelist
        return True, []

    def check_stamps(self):
        """
        Classify every task in the runqueue by comparing stamp files in
        dependency order and return the list of tasks whose stamps are current

        A task is not current when its stamp is missing, when it is a
        'nostamp' task, or when a dependency covered by the stamp policy is
        itself not current or has a newer stamp.
        """
        rqdata = self.rqdata
        fulldeptree, stampwhitelist = self._stamp_policy()
        numtasks = len(rqdata.runq_fnid)

        unchecked = set(xrange(numtasks))
        current = []
        notcurrent = set()
        # Tasks without dependencies can be examined straight away
        buildable = [task for task in xrange(numtasks)
                     if len(rqdata.runq_depends[task]) == 0]

        def stamp_for(task):
            # (file name, task name, stamp file path) for a runqueue task id
            fn = rqdata.taskData.fn_index[rqdata.runq_fnid[task]]
            taskname = rqdata.runq_task[task]
            stampfile = bb.parse.siggen.stampfile(rqdata.dataCache.stamp[fn], fn, taskname)
            return fn, taskname, stampfile

        def check_buildable(task, queue):
            # Queue each still-unchecked reverse dependency of task whose own
            # dependencies have all been classified
            for revdep in rqdata.runq_revdeps[task]:
                if revdep not in unchecked:
                    continue
                if any(dep in unchecked for dep in rqdata.runq_depends[revdep]):
                    continue
                queue.append(revdep)

        # First pass: tasks that can be ruled out without looking at their
        # dependencies
        for task in xrange(numtasks):
            fn, taskname, stampfile = stamp_for(task)
            if os.access(stampfile, os.F_OK):
                taskdep = rqdata.dataCache.task_deps[fn]
                # 'nostamp' lists task names, so test the name rather than
                # the numeric task id
                if not ('nostamp' in taskdep and taskname in taskdep['nostamp']):
                    continue
            unchecked.remove(task)
            notcurrent.add(task)
            check_buildable(task, buildable)

        # Second pass: walk the graph, comparing each stamp against the
        # stamps of the dependencies selected by the stamp policy
        while buildable:
            nextbuildable = []
            for task in buildable:
                if task in unchecked:
                    fn, taskname, stampfile = stamp_for(task)
                    t1 = os.stat(stampfile)[stat.ST_MTIME]
                    iscurrent = True
                    for dep in rqdata.runq_depends[task]:
                        fn2, taskname2, stampfile2 = stamp_for(dep)
                        if fn != fn2 and not (fulldeptree and fn2 not in stampwhitelist):
                            continue
                        if dep in notcurrent or t1 < os.stat(stampfile2)[stat.ST_MTIME]:
                            iscurrent = False
                            break
                    unchecked.remove(task)
                    if iscurrent:
                        current.append(task)
                    else:
                        notcurrent.add(task)

                check_buildable(task, nextbuildable)

            buildable = nextbuildable

        if unchecked:
            bb.msg.fatal(bb.msg.domain.RunQueue, "check_stamps fatal internal error")
        return current

    def check_stamp_task(self, task, taskname = None):
        """
        Return True if the stamp for a single task is current

        task is the runqueue task id; taskname overrides the task name used
        to locate the stamp (callers pass the "_setscene" variant this way).
        The dependencies examined are always those of the runqueue task.
        """
        def get_timestamp(f):
            # Modification time of f, or None if it is missing or unreadable
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except OSError:
                return None

        fulldeptree, stampwhitelist = self._stamp_policy()

        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
        if taskname is None:
            taskname = self.rqdata.runq_task[task]

        stampfile = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn], fn, taskname)

        # If the stamp is missing its not current
        if not os.access(stampfile, os.F_OK):
            logger.debug(2, "Stampfile %s not available", stampfile)
            return False
        # If its a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCache.task_deps[fn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
            return False

        # An existing stamp for a *_setscene variant is accepted without
        # comparing it against the dependencies
        if taskname != "do_setscene" and taskname.endswith("_setscene"):
            return True

        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runq_depends[task]:
            fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
            taskname2 = self.rqdata.runq_task[dep]
            stampfile2 = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn2], fn2, taskname2)
            stampfile3 = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn2], fn2, taskname2 + "_setscene")
            t2 = get_timestamp(stampfile2)
            t3 = get_timestamp(stampfile3)
            # A setscene stamp newer than (or in place of) the dependency's
            # real stamp means the dependency does not invalidate this task.
            # None is tested explicitly instead of relying on Python 2's
            # ordering of None against integers.
            if t3 and (t2 is None or t3 > t2):
                continue
            if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                iscurrent = True
                if not t2:
                    logger.debug(2, 'Stampfile %s does not exist', stampfile2)
                    iscurrent = False
                if t2 is not None and (t1 is None or t1 < t2):
                    logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
                    iscurrent = False
                if not iscurrent:
                    return False

        return True

    def execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)

        Advances the state machine by one step.  Returns False once the
        queue is complete; otherwise returns the executor's result for this
        step, or 0.5 when no executor ran.
        """

        retval = 0.5

        if self.state is runQueuePrepare:
            self.rqexe = RunQueueExecuteDummy(self)
            # Compare by value: "is 0" only works through an interpreter
            # implementation detail
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit

        if self.state is runQueueSceneInit:
            if self.cooker.configuration.dump_signatures:
                self.dump_signatures()
            else:
                self.rqexe = RunQueueExecuteScenequeue(self)

        if self.state is runQueueSceneRun:
            retval = self.rqexe.execute()

        if self.state is runQueueRunInit:
            logger.info("Executing RunQueue Tasks")
            self.rqexe = RunQueueExecuteTasks(self)
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            self.rqexe.finish()

        if self.state is runQueueFailed:
            if not self.rqdata.taskData.tryaltconfigs:
                raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
            # Mark the failed files as failed in taskData and start over
            for fnid in self.rqexe.failed_fnids:
                self.rqdata.taskData.fail_fnid(fnid)
            self.rqdata.reset()

        if self.state is runQueueComplete:
            # All done
            logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
            return False

        if self.state is runQueueChildProcess:
            print("Child process, eeek, shouldn't happen!")
            return False

        # Loop
        return retval

    def finish_runqueue(self, now = False):
        """
        Ask the current executor to shut down: immediately (finish_now) when
        now is true, otherwise through its normal finish() path
        """
        if now:
            self.rqexe.finish_now()
        else:
            self.rqexe.finish()

    def dump_signatures(self):
        """
        Reparse each file in the runqueue once, have the signature generator
        dump its signatures, and mark the runqueue as complete
        """
        self.state = runQueueComplete
        done = set()
        bb.note("Reparsing files to collect dependency data")
        for fnid in self.rqdata.runq_fnid:
            if fnid in done:
                continue
            fn = self.rqdata.taskData.fn_index[fnid]
            # The parsed datastore itself is not used here; the file is
            # loaded only for the effects of parsing it
            bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
            done.add(fnid)

        bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
997
998
class RunQueueExecute:
    """
    Shared machinery for the runqueue executors: per-task state lists,
    forking of worker processes and collection of their exit status

    The methods here rely on self.stats, task_complete() and task_fail(),
    none of which this class defines itself.
    """

    def __init__(self, rq):
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
        self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"

        self.runq_buildable = []
        self.runq_running = []
        self.runq_complete = []
        # Both dictionaries are keyed by worker pid
        self.build_pids = {}
        self.build_pipes = {}
        self.failed_fnids = []

    def runqueue_process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection.
        Otherwise collect one process exit code, close its information pipe,
        report the result through task_complete() or task_fail() and return
        True.
        """
        pid, status = os.waitpid(-1, os.WNOHANG)
        if pid == 0 and status == 0:
            return None
        task = self.build_pids.pop(pid)
        self.build_pipes[pid].close()
        del self.build_pipes[pid]
        if status != 0:
            # The exit code is carried in the high byte of the wait status
            self.task_fail(task, status >> 8)
        else:
            self.task_complete(task)
        # Let callers tell "collected a child" apart from "nothing to collect"
        return True

    def finish_now(self):
        """
        Send SIGTERM to the process group of every outstanding worker, then
        read whatever is pending on the workers' pipes
        """
        if self.stats.active:
            logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
            for pid in self.build_pids:
                try:
                    os.kill(-pid, signal.SIGTERM)
                except OSError:
                    # Best effort: the signal could not be delivered
                    pass
        for pipe in self.build_pipes.values():
            pipe.read()

    def finish(self):
        """
        Move the runqueue into clean-up: read the worker pipes and, while
        tasks are still active, fire runQueueExitWait and collect at most one
        exited worker.  Once nothing is active, set the final state to failed
        or complete.
        """
        self.rq.state = runQueueCleanUp

        for pipe in self.build_pipes.values():
            pipe.read()

        if self.stats.active > 0:
            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
            self.runqueue_process_waitpid()
            return

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return

        self.rq.state = runQueueComplete

    def fork_off_task(self, fn, task, taskname, quieterrors=False):
        """
        Fork a worker process that executes taskname from file fn

        Returns (pid, pipein, pipeout) in the parent.  The child never
        returns: it exits with status 0 on success or 1 if the task raised,
        logging the exception unless quieterrors is set.
        """
        # Flush before forking so buffered output is not emitted twice
        sys.stdout.flush()
        sys.stderr.flush()
        try:
            pipein, pipeout = os.pipe()
            pipein = os.fdopen(pipein, 'rb', 4096)
            pipeout = os.fdopen(pipeout, 'wb', 0)
            pid = os.fork()
        except OSError as e:
            bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
        if pid == 0:
            pipein.close()

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_pipe = pipeout

            self.rq.state = runQueueChildProcess
            # Make the child the process group leader
            os.setpgid(0, 0)
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
            bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
            bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
                the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
                os.environ.update(bb.data.exported_vars(the_data))
                bb.build.exec_task(fn, taskname, the_data)
            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            os._exit(0)
        return pid, pipein, pipeout
1102
class RunQueueExecuteDummy(RunQueueExecute):
    """
    Placeholder executor used before any real executor exists: it carries
    empty statistics and finishing it completes the runqueue at once
    """
    def __init__(self, rq):
        # RunQueueExecute.__init__ is not called here; only the two
        # attributes below are set up
        self.rq = rq
        self.stats = RunQueueStats(0)

    def finish(self):
        """
        Nothing is running, so go straight to the complete state
        """
        self.rq.state = runQueueComplete
1111
class RunQueueExecuteTasks(RunQueueExecute):
    """
    Executor for the real tasks of the runqueue: skips what the setscene
    stage covered, then starts tasks as the scheduler hands them out
    """
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))

        # Mark initial buildable tasks
        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.rqdata.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)
            if self._revdeps_covered(task):
                self.rq.scenequeue_covered.add(task)

        # Keep extending the covered set until it stops growing: a task
        # becomes covered once everything that depends on it is covered
        found = True
        while found:
            found = False
            for task in xrange(self.stats.total):
                if task in self.rq.scenequeue_covered:
                    continue
                if self._revdeps_covered(task):
                    self.rq.scenequeue_covered.add(task)
                    found = True

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)

        for task in self.rq.scenequeue_covered:
            self.task_skip(task)

        event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)

        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

    def _revdeps_covered(self, task):
        """
        True if task has reverse dependencies and all of them are in the
        runqueue's scenequeue_covered set
        """
        revdeps = self.rqdata.runq_revdeps[task]
        return len(revdeps) > 0 and revdeps.issubset(self.rq.scenequeue_covered)

    def get_schedulers(self):
        """
        Return the set of available scheduler classes: every
        RunQueueScheduler subclass defined in this module plus any classes
        named (as "module.Class") in BB_SCHEDULERS

        Exits via SystemExit(1) if a module named in BB_SCHEDULERS cannot be
        imported.
        """
        schedulers = set(obj for obj in globals().values()
                             if type(obj) is type and
                                issubclass(obj, RunQueueScheduler))

        user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
        if user_schedulers:
            for sched in user_schedulers.split():
                if "." not in sched:
                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
                    continue

                modname, name = sched.rsplit(".", 1)
                try:
                    module = __import__(modname, fromlist=(name,))
                except ImportError as exc:
                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
                    raise SystemExit(1)
                else:
                    schedulers.add(getattr(module, name))
        return schedulers

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.rqdata.runq_revdeps[task]:
            if self.runq_running[revdep] == 1 or self.runq_buildable[revdep] == 1:
                continue
            if all(self.runq_complete[dep] == 1 for dep in self.rqdata.runq_depends[revdep]):
                self.runq_buildable[revdep] = 1
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                taskname = self.rqdata.runq_task[revdep]
                logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)

    def task_complete(self, task):
        """
        Called when a task has finished successfully: update the statistics,
        fire runQueueTaskCompleted and release the task's reverse dependencies
        """
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        fnid = self.rqdata.runq_fnid[task]
        self.failed_fnids.append(fnid)
        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
        if self.rqdata.taskData.abort:
            self.rq.state = runQueueCleanUp

    def task_skip(self, task):
        """
        Treat a task as done without running it and count it as both
        completed and skipped
        """
        self.runq_running[task] = 1
        self.runq_buildable[task] = 1
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()

    def execute(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()

        Handles at most one new task per call.  Returns True when progress
        was made (or a final state was set) and 0.5 when tasks are active but
        none was collected.
        """

        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        task = self.sched.next()
        if task is not None:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]

            if self.rq.check_stamp_task(task, taskname):
                logger.debug(2, "Stamp current task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task)
                return True

            if self.cooker.configuration.dry_run:
                # Pretend the task ran and succeeded
                self.runq_running[task] = 1
                self.runq_buildable[task] = 1
                self.stats.taskActive()
                self.task_complete(task)
                return True

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                # Nothing to execute: write the stamp and complete in-process
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
                self.task_complete(task)
                return True

            startevent = runQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)

            self.build_pids[pid] = task
            self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
            self.runq_running[task] = 1
            self.stats.taskActive()

        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

        if self.stats.active > 0:
            if self.runqueue_process_waitpid() is None:
                return 0.5
            return True

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        for task in xrange(self.stats.total):
            if self.runq_buildable[task] == 0:
                logger.error("Task %s never buildable!", task)
            if self.runq_running[task] == 0:
                logger.error("Task %s never ran!", task)
            if self.runq_complete[task] == 0:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete
        return True
1294
1295 class RunQueueExecuteScenequeue(RunQueueExecute):
1296     def __init__(self, rq):
1297         RunQueueExecute.__init__(self, rq)
1298
1299         self.scenequeue_covered = set()
1300         self.scenequeue_notcovered = set()
1301
1302         # If we don't have any setscene functions, skip this step
1303         if len(self.rqdata.runq_setscene) == 0:
1304             rq.scenequeue_covered = set()
1305             rq.state = runQueueRunInit
1306             return
1307
1308         self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1309
1310         endpoints = {}
1311         sq_revdeps = []
1312         sq_revdeps_new = []
1313         sq_revdeps_squash = []
1314
1315         # We need to construct a dependency graph for the setscene functions. Intermediate
1316         # dependencies between the setscene tasks only complicate the code. This code
1317         # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1318         # only containing the setscene functions.
1319
1320         for task in xrange(self.stats.total):
1321             self.runq_running.append(0)
1322             self.runq_complete.append(0)
1323             self.runq_buildable.append(0)
1324
1325         for task in xrange(len(self.rqdata.runq_fnid)):
1326             sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1327             sq_revdeps_new.append(set())
1328             if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1329                 endpoints[task] = None
1330
1331         for task in self.rqdata.runq_setscene:
1332             for dep in self.rqdata.runq_depends[task]:
1333                     endpoints[dep] = task
1334
1335         def process_endpoints(endpoints):
1336             newendpoints = {}
1337             for point, task in endpoints.items():
1338                 tasks = set()
1339                 if task:
1340                     tasks.add(task)
1341                 if sq_revdeps_new[point]:
1342                     tasks |= sq_revdeps_new[point]
1343                 sq_revdeps_new[point] = set()
1344                 for dep in self.rqdata.runq_depends[point]:
1345                     if point in sq_revdeps[dep]:
1346                         sq_revdeps[dep].remove(point)
1347                     if tasks:
1348                         sq_revdeps_new[dep] |= tasks
1349                     if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1350                         newendpoints[dep] = task
1351             if len(newendpoints) != 0:
1352                 process_endpoints(newendpoints)
1353
1354         process_endpoints(endpoints)
1355
1356         for task in xrange(len(self.rqdata.runq_fnid)):
1357             if task in self.rqdata.runq_setscene:
1358                 deps = set()
1359                 for dep in sq_revdeps_new[task]:
1360                     deps.add(self.rqdata.runq_setscene.index(dep))
1361                 sq_revdeps_squash.append(deps)
1362             elif len(sq_revdeps_new[task]) != 0:
1363                 bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1364
1365         #for task in xrange(len(sq_revdeps_squash)):
1366         #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
1367
1368         self.sq_deps = []
1369         self.sq_revdeps = sq_revdeps_squash
1370         self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1371
1372         for task in xrange(len(self.sq_revdeps)):
1373             self.sq_deps.append(set())
1374         for task in xrange(len(self.sq_revdeps)):
1375             for dep in self.sq_revdeps[task]:
1376                 self.sq_deps[dep].add(task)
1377
1378         for task in xrange(len(self.sq_revdeps)):
1379             if len(self.sq_revdeps[task]) == 0:
1380                 self.runq_buildable[task] = 1
1381
1382         if self.rq.hashvalidate:
1383             sq_hash = []
1384             sq_hashfn = []
1385             sq_fn = []
1386             sq_taskname = []
1387             sq_task = []
1388             noexec = []
1389             for task in xrange(len(self.sq_revdeps)):
1390                 realtask = self.rqdata.runq_setscene[task]
1391                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1392                 taskname = self.rqdata.runq_task[realtask]
1393                 taskdep = self.rqdata.dataCache.task_deps[fn]
1394                 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1395                     noexec.append(task)
1396                     self.task_skip(task)
1397                     bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1398                     continue
1399                 sq_fn.append(fn)
1400                 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1401                 sq_hash.append(self.rqdata.runq_hash[realtask])
1402                 sq_taskname.append(taskname)
1403                 sq_task.append(task)
1404             call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1405             locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.configuration.data }
1406             valid = bb.utils.better_eval(call, locs)
1407
1408             valid_new = []
1409             for v in valid:
1410                 valid_new.append(sq_task[v])
1411
1412             for task in xrange(len(self.sq_revdeps)):
1413                 if task not in valid_new and task not in noexec:
1414                     logger.debug(2, 'No package found, so skipping setscene task %s',
1415                                  self.rqdata.get_user_idstring(task))
1416                     self.task_failoutright(task)
1417
1418         logger.info('Executing SetScene Tasks')
1419
1420         self.rq.state = runQueueSceneRun
1421
1422     def scenequeue_updatecounters(self, task):
1423         for dep in self.sq_deps[task]:
1424             self.sq_revdeps2[dep].remove(task)
1425             if len(self.sq_revdeps2[dep]) == 0:
1426                 self.runq_buildable[dep] = 1
1427
1428     def task_completeoutright(self, task):
1429         """
1430         Mark a task as completed
1431         Look at the reverse dependencies and mark any task with
1432         completed dependencies as buildable
1433         """
1434
1435         index = self.rqdata.runq_setscene[task]
1436         logger.debug(1, 'Found task %s which could be accelerated',
1437                         self.rqdata.get_user_idstring(index))
1438
1439         self.scenequeue_covered.add(task)
1440         self.scenequeue_updatecounters(task)
1441
1442     def task_complete(self, task):
1443         self.stats.taskCompleted()
1444         self.task_completeoutright(task)
1445
1446     def task_fail(self, task, result):
1447         self.stats.taskFailed()
1448         index = self.rqdata.runq_setscene[task]
1449         bb.event.fire(runQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1450         self.scenequeue_notcovered.add(task)
1451         self.scenequeue_updatecounters(task)
1452
1453     def task_failoutright(self, task):
1454         self.runq_running[task] = 1
1455         self.runq_buildable[task] = 1
1456         self.stats.taskCompleted()
1457         self.stats.taskSkipped()
1458         index = self.rqdata.runq_setscene[task]
1459         self.scenequeue_notcovered.add(task)
1460         self.scenequeue_updatecounters(task)
1461
1462     def task_skip(self, task):
1463         self.runq_running[task] = 1
1464         self.runq_buildable[task] = 1
1465         self.task_completeoutright(task)
1466         self.stats.taskCompleted()
1467         self.stats.taskSkipped()
1468
1469     def execute(self):
1470         """
1471         Run the tasks in a queue prepared by prepare_runqueue
1472         """
1473
1474         task = None
1475         if self.stats.active < self.number_tasks:
1476             # Find the next setscene to run
1477             for nexttask in xrange(self.stats.total):
1478                 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1479                     task = nexttask
1480                     break
1481         if task is not None:
1482             realtask = self.rqdata.runq_setscene[task]
1483             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1484
1485             taskname = self.rqdata.runq_task[realtask] + "_setscene"
1486             if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
1487                 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1488                              task, self.rqdata.get_user_idstring(task))
1489                 self.task_failoutright(task)
1490                 return True
1491
1492             if self.cooker.configuration.force:
1493                 for target in self.rqdata.target_pairs:
1494                     if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1495                         self.task_failoutright(task)
1496                         return True
1497
1498             if self.rq.check_stamp_task(realtask, taskname):
1499                 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
1500                              task, self.rqdata.get_user_idstring(realtask))
1501                 self.task_skip(task)
1502                 return True
1503
1504             pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
1505
1506             self.build_pids[pid] = task
1507             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1508             self.runq_running[task] = 1
1509             self.stats.taskActive()
1510             if self.stats.active < self.number_tasks:
1511                 return True
1512
1513         for pipe in self.build_pipes:
1514             self.build_pipes[pipe].read()
1515
1516         if self.stats.active > 0:
1517             if self.runqueue_process_waitpid() is None:
1518                 return 0.5
1519             return True
1520
1521         # Convert scenequeue_covered task numbers into full taskgraph ids
1522         oldcovered = self.scenequeue_covered
1523         self.rq.scenequeue_covered = set()
1524         for task in oldcovered:
1525             self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1526
1527         logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered)
1528
1529         self.rq.state = runQueueRunInit
1530         return True
1531
1532     def fork_off_task(self, fn, task, taskname):
1533         return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
1534
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The iterable passed to the constructor becomes the exception's args.
    """
    def __init__(self, x):
        # Assigning to args stores a tuple built from the items of x, so
        # the conversion is simply spelled out here.
        self.args = tuple(x)
1541
1542
class runQueueExitWait(bb.event.Event):
    """
    Event fired while waiting for task processes to exit

    remain is stored on the event and interpolated into its message.
    """

    def __init__(self, remain):
        self.remain = remain
        text = "Waiting for %s active tasks to finish" % remain
        self.message = text
        bb.event.Event.__init__(self)
1552
class runQueueEvent(bb.event.Event):
    """
    Base class for runQueue events

    Stores the task id, its user-readable id string as returned by
    rq.rqdata.get_user_idstring(), and a copy of the stats.
    """
    def __init__(self, task, stats, rq):
        self.taskid = task
        rqdata = rq.rqdata
        self.taskstring = rqdata.get_user_idstring(task)
        # Keep a copy of the stats rather than a reference to the
        # caller's object.
        snapshot = stats.copy()
        self.stats = snapshot
        bb.event.Event.__init__(self)
1562
class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying that a task was started

    Carries the noexec flag supplied by the caller in addition to the
    base event fields.
    """
    def __init__(self, task, stats, rq, noexec=False):
        # Let the base event set up its fields first, then add the flag.
        runQueueEvent.__init__(self, task, stats, rq)
        self.noexec = noexec
1570
class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying that a task failed

    Carries the exitcode supplied by the caller in addition to the base
    event fields.
    """
    def __init__(self, task, stats, exitcode, rq):
        # Note the argument order: exitcode comes before rq here.
        runQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode
1578
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying that a task completed

    Adds nothing to the base runQueueEvent fields.
    """
1582     """
1583
def check_stamp_fn(fn, taskname, d):
    """
    Check the stamp of taskname using the runqueue state stored in d

    The executor and the filename are both read from variables in d; the
    fn argument is overwritten before it is used. Returns the result of
    check_stamp_task() for the task, or None when no task id is found.
    """
    rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
    fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
    rqdata = rqexe.rqdata
    taskid = rqdata.get_task_id(rqdata.taskData.getfn_id(fn), taskname)
    if taskid is None:
        return None
    return rqexe.rq.check_stamp_task(taskid)
1592
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server

    Data read from the input end is buffered in self.queue; each chunk
    up to and including a "</event>" marker is handed to
    bb.event.fire_from_worker().
    """
    def __init__(self, pipein, pipeout, d):
        self.input = pipein
        # Only the input end is used by this object.
        pipeout.close()
        # Add O_NONBLOCK to whatever flags the input end already has.
        flags = fcntl.fcntl(self.input, fcntl.F_GETFL)
        fcntl.fcntl(self.input, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.queue = ""
        self.d = d

    def read(self):
        """
        Pull pending data from the pipe and fire every complete event

        Returns True if the buffer grew during this call, else False.
        """
        before = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError):
            # A failed read just means nothing is appended this time.
            pass
        grew = len(self.queue) > before

        marker = "</event>"
        while True:
            pos = self.queue.find(marker)
            if pos == -1:
                break
            cut = pos + len(marker)
            bb.event.fire_from_worker(self.queue[:cut], self.d)
            self.queue = self.queue[cut:]
        return grew

    def close(self):
        """
        Drain the pipe, warn about any leftover partial message, and
        close the input end
        """
        while self.read():
            pass
        if self.queue:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()