Fix more incorrect usages of 'is'
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 import copy
26 import os
27 import sys
28 import signal
29 import stat
30 import fcntl
31 import logging
32 import bb
33 from bb import msg, data, event
34
35 bblogger = logging.getLogger("BitBake")
36 logger = logging.getLogger("BitBake.RunQueue")
37
38 class RunQueueStats:
39     """
40     Holds statistics on the tasks handled by the associated runQueue
41     """
42     def __init__(self, total):
43         self.completed = 0
44         self.skipped = 0
45         self.failed = 0
46         self.active = 0
47         self.total = total
48
49     def copy(self):
50         obj = self.__class__(self.total)
51         obj.__dict__.update(self.__dict__)
52         return obj
53
54     def taskFailed(self):
55         self.active = self.active - 1
56         self.failed = self.failed + 1
57
58     def taskCompleted(self, number = 1):
59         self.active = self.active - number
60         self.completed = self.completed + number
61
62     def taskSkipped(self, number = 1):
63         self.active = self.active + number
64         self.skipped = self.skipped + number
65
66     def taskActive(self):
67         self.active = self.active + 1
68
69 # These values indicate the next step due to be run in the
70 # runQueue state machine
71 runQueuePrepare = 2
72 runQueueSceneInit = 3
73 runQueueSceneRun = 4
74 runQueueRunInit = 5
75 runQueueRunning = 6
76 runQueueFailed = 7
77 runQueueCleanUp = 8
78 runQueueComplete = 9
79 runQueueChildProcess = 10
80
81 class RunQueueScheduler(object):
82     """
83     Control the order tasks are scheduled in.
84     """
85     name = "basic"
86
87     def __init__(self, runqueue, rqdata):
88         """
89         The default scheduler just returns the first buildable task (the
90         priority map is sorted by task numer)
91         """
92         self.rq = runqueue
93         self.rqdata = rqdata
94         numTasks = len(self.rqdata.runq_fnid)
95
96         self.prio_map = []
97         self.prio_map.extend(range(numTasks))
98
99     def next_buildable_task(self):
100         """
101         Return the id of the first task we find that is buildable
102         """
103         for tasknum in xrange(len(self.rqdata.runq_fnid)):
104             taskid = self.prio_map[tasknum]
105             if self.rq.runq_running[taskid] == 1:
106                 continue
107             if self.rq.runq_buildable[taskid] == 1:
108                 return taskid
109
110     def next(self):
111         """
112         Return the id of the task we should build next
113         """
114         if self.rq.stats.active < self.rq.number_tasks:
115             return self.next_buildable_task()
116
117 class RunQueueSchedulerSpeed(RunQueueScheduler):
118     """
119     A scheduler optimised for speed. The priority map is sorted by task weight,
120     heavier weighted tasks (tasks needed by the most other tasks) are run first.
121     """
122     name = "speed"
123
124     def __init__(self, runqueue, rqdata):
125         """
126         The priority map is sorted by task weight.
127         """
128
129         self.rq = runqueue
130         self.rqdata = rqdata
131
132         sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
133         copyweight = copy.deepcopy(self.rqdata.runq_weight)
134         self.prio_map = []
135
136         for weight in sortweight:
137             idx = copyweight.index(weight)
138             self.prio_map.append(idx)
139             copyweight[idx] = -1
140
141         self.prio_map.reverse()
142
143 class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
144     """
145     A scheduler optimised to complete .bb files are quickly as possible. The
146     priority map is sorted by task weight, but then reordered so once a given
147     .bb file starts to build, its completed as quickly as possible. This works
148     well where disk space is at a premium and classes like OE's rm_work are in
149     force.
150     """
151     name = "completion"
152
153     def __init__(self, runqueue, rqdata):
154         RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
155
156         #FIXME - whilst this groups all fnids together it does not reorder the
157         #fnid groups optimally.
158
159         basemap = copy.deepcopy(self.prio_map)
160         self.prio_map = []
161         while (len(basemap) > 0):
162             entry = basemap.pop(0)
163             self.prio_map.append(entry)
164             fnid = self.rqdata.runq_fnid[entry]
165             todel = []
166             for entry in basemap:
167                 entry_fnid = self.rqdata.runq_fnid[entry]
168                 if entry_fnid == fnid:
169                     todel.append(basemap.index(entry))
170                     self.prio_map.append(entry)
171             todel.reverse()
172             for idx in todel:
173                 del basemap[idx]
174
175 class RunQueueData:
176     """
177     BitBake Run Queue implementation
178     """
179     def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
180         self.cooker = cooker
181         self.dataCache = dataCache
182         self.taskData = taskData
183         self.targets = targets
184         self.rq = rq
185
186         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
187         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
188
189         self.reset()
190
191     def reset(self):
192         self.runq_fnid = []
193         self.runq_task = []
194         self.runq_depends = []
195         self.runq_revdeps = []
196         self.runq_hash = []
197
198     def runq_depends_names(self, ids):
199         import re
200         ret = []
201         for id in self.runq_depends[ids]:
202             nam = os.path.basename(self.get_user_idstring(id))
203             nam = re.sub("_[^,]*,", ",", nam)
204             ret.extend([nam])
205         return ret
206
207     def get_user_idstring(self, task):
208         fn = self.taskData.fn_index[self.runq_fnid[task]]
209         taskname = self.runq_task[task]
210         return "%s, %s" % (fn, taskname)
211
212     def get_task_id(self, fnid, taskname):
213         for listid in xrange(len(self.runq_fnid)):
214             if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
215                 return listid
216         return None
217
218     def circular_depchains_handler(self, tasks):
219         """
220         Some tasks aren't buildable, likely due to circular dependency issues.
221         Identify the circular dependencies and print them in a user readable format.
222         """
223         from copy import deepcopy
224
225         valid_chains = []
226         explored_deps = {}
227         msgs = []
228
229         def chain_reorder(chain):
230             """
231             Reorder a dependency chain so the lowest task id is first
232             """
233             lowest = 0
234             new_chain = []
235             for entry in xrange(len(chain)):
236                 if chain[entry] < chain[lowest]:
237                     lowest = entry
238             new_chain.extend(chain[lowest:])
239             new_chain.extend(chain[:lowest])
240             return new_chain
241
242         def chain_compare_equal(chain1, chain2):
243             """
244             Compare two dependency chains and see if they're the same
245             """
246             if len(chain1) != len(chain2):
247                 return False
248             for index in xrange(len(chain1)):
249                 if chain1[index] != chain2[index]:
250                     return False
251             return True
252
253         def chain_array_contains(chain, chain_array):
254             """
255             Return True if chain_array contains chain
256             """
257             for ch in chain_array:
258                 if chain_compare_equal(ch, chain):
259                     return True
260             return False
261
262         def find_chains(taskid, prev_chain):
263             prev_chain.append(taskid)
264             total_deps = []
265             total_deps.extend(self.runq_revdeps[taskid])
266             for revdep in self.runq_revdeps[taskid]:
267                 if revdep in prev_chain:
268                     idx = prev_chain.index(revdep)
269                     # To prevent duplicates, reorder the chain to start with the lowest taskid
270                     # and search through an array of those we've already printed
271                     chain = prev_chain[idx:]
272                     new_chain = chain_reorder(chain)
273                     if not chain_array_contains(new_chain, valid_chains):
274                         valid_chains.append(new_chain)
275                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
276                         for dep in new_chain:
277                             msgs.append("  Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
278                         msgs.append("\n")
279                     if len(valid_chains) > 10:
280                         msgs.append("Aborted dependency loops search after 10 matches.\n")
281                         return msgs
282                     continue
283                 scan = False
284                 if revdep not in explored_deps:
285                     scan = True
286                 elif revdep in explored_deps[revdep]:
287                     scan = True
288                 else:
289                     for dep in prev_chain:
290                         if dep in explored_deps[revdep]:
291                             scan = True
292                 if scan:
293                     find_chains(revdep, copy.deepcopy(prev_chain))
294                 for dep in explored_deps[revdep]:
295                     if dep not in total_deps:
296                         total_deps.append(dep)
297
298             explored_deps[taskid] = total_deps
299
300         for task in tasks:
301             find_chains(task, [])
302
303         return msgs
304
305     def calculate_task_weights(self, endpoints):
306         """
307         Calculate a number representing the "weight" of each task. Heavier weighted tasks
308         have more dependencies and hence should be executed sooner for maximum speed.
309
310         This function also sanity checks the task list finding tasks that are not
311         possible to execute due to circular dependencies.
312         """
313
314         numTasks = len(self.runq_fnid)
315         weight = []
316         deps_left = []
317         task_done = []
318
319         for listid in xrange(numTasks):
320             task_done.append(False)
321             weight.append(0)
322             deps_left.append(len(self.runq_revdeps[listid]))
323
324         for listid in endpoints:
325             weight[listid] = 1
326             task_done[listid] = True
327
328         while True:
329             next_points = []
330             for listid in endpoints:
331                 for revdep in self.runq_depends[listid]:
332                     weight[revdep] = weight[revdep] + weight[listid]
333                     deps_left[revdep] = deps_left[revdep] - 1
334                     if deps_left[revdep] == 0:
335                         next_points.append(revdep)
336                         task_done[revdep] = True
337             endpoints = next_points
338             if len(next_points) == 0:
339                 break
340
341         # Circular dependency sanity check
342         problem_tasks = []
343         for task in xrange(numTasks):
344             if task_done[task] is False or deps_left[task] != 0:
345                 problem_tasks.append(task)
346                 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
347                 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
348
349         if problem_tasks:
350             message = "Unbuildable tasks were found.\n"
351             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
352             message = message + "Identifying dependency loops (this may take a short while)...\n"
353             logger.error(message)
354
355             msgs = self.circular_depchains_handler(problem_tasks)
356
357             message = "\n"
358             for msg in msgs:
359                 message = message + msg
360             bb.msg.fatal(bb.msg.domain.RunQueue, message)
361
362         return weight
363
364     def prepare(self):
365         """
366         Turn a set of taskData into a RunQueue and compute data needed
367         to optimise the execution order.
368         """
369
370         runq_build = []
371         recursive_tdepends = {}
372         runq_recrdepends = []
373         tdepends_fnid = {}
374
375         taskData = self.taskData
376
377         if len(taskData.tasks_name) == 0:
378             # Nothing to do
379             return 0
380
381         logger.info("Preparing runqueue")
382
383         # Step A - Work out a list of tasks to run
384         #
385         # Taskdata gives us a list of possible providers for every build and run
386         # target ordered by priority. It also gives information on each of those
387         # providers.
388         #
389         # To create the actual list of tasks to execute we fix the list of
390         # providers and then resolve the dependencies into task IDs. This
391         # process is repeated for each type of dependency (tdepends, deptask,
392         # rdeptast, recrdeptask, idepends).
393
394         def add_build_dependencies(depids, tasknames, depends):
395             for depid in depids:
396                 # Won't be in build_targets if ASSUME_PROVIDED
397                 if depid not in taskData.build_targets:
398                     continue
399                 depdata = taskData.build_targets[depid][0]
400                 if depdata is None:
401                     continue
402                 dep = taskData.fn_index[depdata]
403                 for taskname in tasknames:
404                     taskid = taskData.gettask_id(dep, taskname, False)
405                     if taskid is not None:
406                         depends.append(taskid)
407
408         def add_runtime_dependencies(depids, tasknames, depends):
409             for depid in depids:
410                 if depid not in taskData.run_targets:
411                     continue
412                 depdata = taskData.run_targets[depid][0]
413                 if depdata is None:
414                     continue
415                 dep = taskData.fn_index[depdata]
416                 for taskname in tasknames:
417                     taskid = taskData.gettask_id(dep, taskname, False)
418                     if taskid is not None:
419                         depends.append(taskid)
420
421         for task in xrange(len(taskData.tasks_name)):
422             depends = []
423             recrdepends = []
424             fnid = taskData.tasks_fnid[task]
425             fn = taskData.fn_index[fnid]
426             task_deps = self.dataCache.task_deps[fn]
427
428             logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
429
430             if fnid not in taskData.failed_fnids:
431
432                 # Resolve task internal dependencies
433                 #
434                 # e.g. addtask before X after Y
435                 depends = taskData.tasks_tdepends[task]
436
437                 # Resolve 'deptask' dependencies
438                 #
439                 # e.g. do_sometask[deptask] = "do_someothertask"
440                 # (makes sure sometask runs after someothertask of all DEPENDS)
441                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
442                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
443                     add_build_dependencies(taskData.depids[fnid], tasknames, depends)
444
445                 # Resolve 'rdeptask' dependencies
446                 #
447                 # e.g. do_sometask[rdeptask] = "do_someothertask"
448                 # (makes sure sometask runs after someothertask of all RDEPENDS)
449                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
450                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
451                     add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
452
453                 # Resolve inter-task dependencies
454                 #
455                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
456                 # (makes sure sometask runs after targetname's someothertask)
457                 if fnid not in tdepends_fnid:
458                     tdepends_fnid[fnid] = set()
459                 idepends = taskData.tasks_idepends[task]
460                 for (depid, idependtask) in idepends:
461                     if depid in taskData.build_targets:
462                         # Won't be in build_targets if ASSUME_PROVIDED
463                         depdata = taskData.build_targets[depid][0]
464                         if depdata is not None:
465                             dep = taskData.fn_index[depdata]
466                             taskid = taskData.gettask_id(dep, idependtask, False)
467                             if taskid is None:
468                                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s in %s depends upon nonexistant task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
469                             depends.append(taskid)
470                             if depdata != fnid:
471                                 tdepends_fnid[fnid].add(taskid)
472
473
474                 # Resolve recursive 'recrdeptask' dependencies (A)
475                 #
476                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
477                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
478                 # We cover the recursive part of the dependencies below
479                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
480                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
481                         recrdepends.append(taskname)
482                         add_build_dependencies(taskData.depids[fnid], [taskname], depends)
483                         add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
484
485                 # Rmove all self references
486                 if task in depends:
487                     newdep = []
488                     logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
489                     for dep in depends:
490                         if task != dep:
491                             newdep.append(dep)
492                     depends = newdep
493
494             self.runq_fnid.append(taskData.tasks_fnid[task])
495             self.runq_task.append(taskData.tasks_name[task])
496             self.runq_depends.append(set(depends))
497             self.runq_revdeps.append(set())
498             self.runq_hash.append("")
499
500             runq_build.append(0)
501             runq_recrdepends.append(recrdepends)
502
503         #
504         # Build a list of recursive cumulative dependencies for each fnid
505         # We do this by fnid, since if A depends on some task in B
506         # we're interested in later tasks B's fnid might have but B itself
507         # doesn't depend on
508         #
509         # Algorithm is O(tasks) + O(tasks)*O(fnids)
510         #
511         reccumdepends = {}
512         for task in xrange(len(self.runq_fnid)):
513             fnid = self.runq_fnid[task]
514             if fnid not in reccumdepends:
515                 if fnid in tdepends_fnid:
516                     reccumdepends[fnid] = tdepends_fnid[fnid]
517                 else:
518                     reccumdepends[fnid] = set()
519             reccumdepends[fnid].update(self.runq_depends[task])
520         for task in xrange(len(self.runq_fnid)):
521             taskfnid = self.runq_fnid[task]
522             for fnid in reccumdepends:
523                 if task in reccumdepends[fnid]:
524                     reccumdepends[fnid].add(task)
525                     if taskfnid in reccumdepends:
526                         reccumdepends[fnid].update(reccumdepends[taskfnid])
527
528
529         # Resolve recursive 'recrdeptask' dependencies (B)
530         #
531         # e.g. do_sometask[recrdeptask] = "do_someothertask"
532         # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
533         for task in xrange(len(self.runq_fnid)):
534             if len(runq_recrdepends[task]) > 0:
535                 taskfnid = self.runq_fnid[task]
536                 for dep in reccumdepends[taskfnid]:
537                     # Ignore self references
538                     if dep == task:
539                         continue
540                     for taskname in runq_recrdepends[task]:
541                         if taskData.tasks_name[dep] == taskname:
542                             self.runq_depends[task].add(dep)
543
544         # Step B - Mark all active tasks
545         #
546         # Start with the tasks we were asked to run and mark all dependencies
547         # as active too. If the task is to be 'forced', clear its stamp. Once
548         # all active tasks are marked, prune the ones we don't need.
549
550         logger.verbose("Marking Active Tasks")
551
552         def mark_active(listid, depth):
553             """
554             Mark an item as active along with its depends
555             (calls itself recursively)
556             """
557
558             if runq_build[listid] == 1:
559                 return
560
561             runq_build[listid] = 1
562
563             depends = self.runq_depends[listid]
564             for depend in depends:
565                 mark_active(depend, depth+1)
566
567         self.target_pairs = []
568         for target in self.targets:
569             targetid = taskData.getbuild_id(target[0])
570
571             if targetid not in taskData.build_targets:
572                 continue
573
574             if targetid in taskData.failed_deps:
575                 continue
576
577             fnid = taskData.build_targets[targetid][0]
578             fn = taskData.fn_index[fnid]
579             self.target_pairs.append((fn, target[1]))
580
581             if fnid in taskData.failed_fnids:
582                 continue
583
584             if target[1] not in taskData.tasks_lookup[fnid]:
585                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
586
587             listid = taskData.tasks_lookup[fnid][target[1]]
588
589             mark_active(listid, 1)
590
591         # Step C - Prune all inactive tasks
592         #
593         # Once all active tasks are marked, prune the ones we don't need.
594
595         maps = []
596         delcount = 0
597         for listid in xrange(len(self.runq_fnid)):
598             if runq_build[listid-delcount] == 1:
599                 maps.append(listid-delcount)
600             else:
601                 del self.runq_fnid[listid-delcount]
602                 del self.runq_task[listid-delcount]
603                 del self.runq_depends[listid-delcount]
604                 del runq_build[listid-delcount]
605                 del self.runq_revdeps[listid-delcount]
606                 del self.runq_hash[listid-delcount]
607                 delcount = delcount + 1
608                 maps.append(-1)
609
610         #
611         # Step D - Sanity checks and computation
612         #
613
614         # Check to make sure we still have tasks to run
615         if len(self.runq_fnid) == 0:
616             if not taskData.abort:
617                 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
618             else:
619                 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
620
621         logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
622
623         # Remap the dependencies to account for the deleted tasks
624         # Check we didn't delete a task we depend on
625         for listid in xrange(len(self.runq_fnid)):
626             newdeps = []
627             origdeps = self.runq_depends[listid]
628             for origdep in origdeps:
629                 if maps[origdep] == -1:
630                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
631                 newdeps.append(maps[origdep])
632             self.runq_depends[listid] = set(newdeps)
633
634         logger.verbose("Assign Weightings")
635
636         # Generate a list of reverse dependencies to ease future calculations
637         for listid in xrange(len(self.runq_fnid)):
638             for dep in self.runq_depends[listid]:
639                 self.runq_revdeps[dep].add(listid)
640
641         # Identify tasks at the end of dependency chains
642         # Error on circular dependency loops (length two)
643         endpoints = []
644         for listid in xrange(len(self.runq_fnid)):
645             revdeps = self.runq_revdeps[listid]
646             if len(revdeps) == 0:
647                 endpoints.append(listid)
648             for dep in revdeps:
649                 if dep in self.runq_depends[listid]:
650                     #self.dump_data(taskData)
651                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
652
653         logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
654
655         # Calculate task weights
656         # Check of higher length circular dependencies
657         self.runq_weight = self.calculate_task_weights(endpoints)
658
659         # Sanity Check - Check for multiple tasks building the same provider
660         prov_list = {}
661         seen_fn = []
662         for task in xrange(len(self.runq_fnid)):
663             fn = taskData.fn_index[self.runq_fnid[task]]
664             if fn in seen_fn:
665                 continue
666             seen_fn.append(fn)
667             for prov in self.dataCache.fn_provides[fn]:
668                 if prov not in prov_list:
669                     prov_list[prov] = [fn]
670                 elif fn not in prov_list[prov]:
671                     prov_list[prov].append(fn)
672         error = False
673         for prov in prov_list:
674             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
675                 error = True
676                 logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))
677
678
679         # Create a whitelist usable by the stamp checks
680         stampfnwhitelist = []
681         for entry in self.stampwhitelist.split():
682             entryid = self.taskData.getbuild_id(entry)
683             if entryid not in self.taskData.build_targets:
684                 continue
685             fnid = self.taskData.build_targets[entryid][0]
686             fn = self.taskData.fn_index[fnid]
687             stampfnwhitelist.append(fn)
688         self.stampfnwhitelist = stampfnwhitelist
689
690         # Interate over the task list looking for tasks with a 'setscene' function
691         self.runq_setscene = []
692         for task in range(len(self.runq_fnid)):
693             setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
694             if not setscene:
695                 continue
696             self.runq_setscene.append(task)
697
698         # Interate over the task list and call into the siggen code
699         dealtwith = set()
700         todeal = set(range(len(self.runq_fnid)))
701         while len(todeal) > 0:
702             for task in todeal.copy():
703                 if len(self.runq_depends[task] - dealtwith) == 0:
704                     dealtwith.add(task)
705                     todeal.remove(task)
706                     procdep = []
707                     for dep in self.runq_depends[task]:
708                         procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
709                     self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
710
711         self.hashes = {}
712         self.hash_deps = {}
713         for task in xrange(len(self.runq_fnid)):
714             identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
715                                     self.runq_task[task])
716             self.hashes[identifier] = self.runq_hash[task]
717             deps = []
718             for dep in self.runq_depends[task]:
719                 depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
720                                            self.runq_task[dep])
721                 deps.append(depidentifier)
722             self.hash_deps[identifier] = deps
723
724         # Remove stamps for targets if force mode active
725         if self.cooker.configuration.force:
726             for (fn, target) in self.target_pairs:
727                 logger.verbose("Remove stamp %s, %s", target, fn)
728                 bb.build.del_stamp(target, self.dataCache, fn)
729
730         return len(self.runq_fnid)
731
732     def dump_data(self, taskQueue):
733         """
734         Dump some debug information on the internal data structures
735         """
736         logger.debug(3, "run_tasks:")
737         for task in xrange(len(self.rqdata.runq_task)):
738             logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
739                          taskQueue.fn_index[self.rqdata.runq_fnid[task]],
740                          self.rqdata.runq_task[task],
741                          self.rqdata.runq_weight[task],
742                          self.rqdata.runq_depends[task],
743                          self.rqdata.runq_revdeps[task])
744
745         logger.debug(3, "sorted_tasks:")
746         for task1 in xrange(len(self.rqdata.runq_task)):
747             if task1 in self.prio_map:
748                 task = self.prio_map[task1]
749                 logger.debug(3, " (%s)%s - %s: %s   Deps %s RevDeps %s", task,
750                            taskQueue.fn_index[self.rqdata.runq_fnid[task]],
751                            self.rqdata.runq_task[task],
752                            self.rqdata.runq_weight[task],
753                            self.rqdata.runq_depends[task],
754                            self.rqdata.runq_revdeps[task])
755
756 class RunQueue:
757     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
758
759         self.cooker = cooker
760         self.cfgData = cfgData
761         self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
762
763         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, True) or "perfile"
764         self.hashvalidate = bb.data.getVar("BB_HASHCHECK_FUNCTION", cfgData, True) or None
765
766         self.state = runQueuePrepare
767
768     def check_stamps(self):
769         unchecked = {}
770         current = []
771         notcurrent = []
772         buildable = []
773
774         if self.stamppolicy == "perfile":
775             fulldeptree = False
776         else:
777             fulldeptree = True
778             stampwhitelist = []
779             if self.stamppolicy == "whitelist":
780                 stampwhitelist = self.rqdata.stampfnwhitelist
781
782         for task in xrange(len(self.rqdata.runq_fnid)):
783             unchecked[task] = ""
784             if len(self.rqdata.runq_depends[task]) == 0:
785                 buildable.append(task)
786
787         def check_buildable(self, task, buildable):
788             for revdep in self.rqdata.runq_revdeps[task]:
789                 alldeps = 1
790                 for dep in self.rqdata.runq_depends[revdep]:
791                     if dep in unchecked:
792                         alldeps = 0
793                 if alldeps == 1:
794                     if revdep in unchecked:
795                         buildable.append(revdep)
796
797         for task in xrange(len(self.rqdata.runq_fnid)):
798             if task not in unchecked:
799                 continue
800             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
801             taskname = self.rqdata.runq_task[task]
802             stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
803             # If the stamp is missing its not current
804             if not os.access(stampfile, os.F_OK):
805                 del unchecked[task]
806                 notcurrent.append(task)
807                 check_buildable(self, task, buildable)
808                 continue
809             # If its a 'nostamp' task, it's not current
810             taskdep = self.rqdata.dataCache.task_deps[fn]
811             if 'nostamp' in taskdep and task in taskdep['nostamp']:
812                 del unchecked[task]
813                 notcurrent.append(task)
814                 check_buildable(self, task, buildable)
815                 continue
816
817         while (len(buildable) > 0):
818             nextbuildable = []
819             for task in buildable:
820                 if task in unchecked:
821                     fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
822                     taskname = self.rqdata.runq_task[task]
823                     stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
824                     iscurrent = True
825
826                     t1 = os.stat(stampfile)[stat.ST_MTIME]
827                     for dep in self.rqdata.runq_depends[task]:
828                         if iscurrent:
829                             fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
830                             taskname2 = self.rqdata.runq_task[dep]
831                             stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
832                             if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
833                                 if dep in notcurrent:
834                                     iscurrent = False
835                                 else:
836                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
837                                     if t1 < t2:
838                                         iscurrent = False
839                     del unchecked[task]
840                     if iscurrent:
841                         current.append(task)
842                     else:
843                         notcurrent.append(task)
844
845                 check_buildable(self, task, nextbuildable)
846
847             buildable = nextbuildable
848
849         #for task in range(len(self.runq_fnid)):
850         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
851         #    taskname = self.runq_task[task]
852         #    print "%s %s.%s" % (task, taskname, fn)
853
854         #print "Unchecked: %s" % unchecked
855         #print "Current: %s" % current
856         #print "Not current: %s" % notcurrent
857
858         if len(unchecked) > 0:
859             bb.msg.fatal(bb.msg.domain.RunQueue, "check_stamps fatal internal error")
860         return current
861
862     def check_stamp_task(self, task, taskname = None):
863         def get_timestamp(f):
864             try:
865                 if not os.access(f, os.F_OK):
866                     return None
867                 return os.stat(f)[stat.ST_MTIME]
868             except:
869                 return None
870
871         if self.stamppolicy == "perfile":
872             fulldeptree = False
873         else:
874             fulldeptree = True
875             stampwhitelist = []
876             if self.stamppolicy == "whitelist":
877                 stampwhitelist = self.rqdata.stampfnwhitelist
878
879         fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
880         if taskname is None:
881             taskname = self.rqdata.runq_task[task]
882
883         stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
884
885         # If the stamp is missing its not current
886         if not os.access(stampfile, os.F_OK):
887             logger.debug(2, "Stampfile %s not available", stampfile)
888             return False
889         # If its a 'nostamp' task, it's not current
890         taskdep = self.rqdata.dataCache.task_deps[fn]
891         if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
892             logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
893             return False
894
895         if taskname != "do_setscene" and taskname.endswith("_setscene"):
896             return True
897
898         iscurrent = True
899         t1 = get_timestamp(stampfile)
900         for dep in self.rqdata.runq_depends[task]:
901             if iscurrent:
902                 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
903                 taskname2 = self.rqdata.runq_task[dep]
904                 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
905                 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
906                 t2 = get_timestamp(stampfile2)
907                 t3 = get_timestamp(stampfile3)
908                 if t3 and t3 > t2:
909                    continue
910                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
911                     if not t2:
912                         logger.debug(2, 'Stampfile %s does not exist', stampfile2)
913                         iscurrent = False
914                     if t1 < t2:
915                         logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
916                         iscurrent = False
917
918         return iscurrent
919
920     def execute_runqueue(self):
921         """
922         Run the tasks in a queue prepared by rqdata.prepare()
923         Upon failure, optionally try to recover the build using any alternate providers
924         (if the abort on failure configuration option isn't set)
925         """
926
927         retval = 0.5
928
929         if self.state is runQueuePrepare:
930             self.rqexe = RunQueueExecuteDummy(self)
931             if self.rqdata.prepare() == 0:
932                 self.state = runQueueComplete
933             else:
934                 self.state = runQueueSceneInit
935
936         if self.state is runQueueSceneInit:
937             if self.cooker.configuration.dump_signatures:
938                 self.dump_signatures()
939             else:
940                 self.rqexe = RunQueueExecuteScenequeue(self)
941
942         if self.state is runQueueSceneRun:
943             retval = self.rqexe.execute()
944
945         if self.state is runQueueRunInit:
946             logger.info("Executing RunQueue Tasks")
947             self.rqexe = RunQueueExecuteTasks(self)
948             self.state = runQueueRunning
949
950         if self.state is runQueueRunning:
951             retval = self.rqexe.execute()
952
953         if self.state is runQueueCleanUp:
954            self.rqexe.finish()
955
956         if self.state is runQueueFailed:
957             if not self.rqdata.taskData.tryaltconfigs:
958                 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
959             for fnid in self.rqexe.failed_fnids:
960                 self.rqdata.taskData.fail_fnid(fnid)
961             self.rqdata.reset()
962
963         if self.state is runQueueComplete:
964             # All done
965             logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
966             return False
967
968         if self.state is runQueueChildProcess:
969             print("Child process, eeek, shouldn't happen!")
970             return False
971
972         # Loop
973         return retval
974
975     def finish_runqueue(self, now = False):
976         if now:
977             self.rqexe.finish_now()
978         else:
979             self.rqexe.finish()
980
981     def dump_signatures(self):
982         self.state = runQueueComplete
983         done = set()
984         bb.note("Reparsing files to collect dependency data")
985         for task in range(len(self.rqdata.runq_fnid)):
986             if self.rqdata.runq_fnid[task] not in done:
987                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
988                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
989                 done.add(self.rqdata.runq_fnid[task])
990
991         bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
992
993         return
994
995
996 class RunQueueExecute:
997
998     def __init__(self, rq):
999         self.rq = rq
1000         self.cooker = rq.cooker
1001         self.cfgData = rq.cfgData
1002         self.rqdata = rq.rqdata
1003
1004         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
1005         self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"
1006
1007         self.runq_buildable = []
1008         self.runq_running = []
1009         self.runq_complete = []
1010         self.build_pids = {}
1011         self.build_pipes = {}
1012         self.failed_fnids = []
1013
1014     def runqueue_process_waitpid(self):
1015         """
1016         Return none is there are no processes awaiting result collection, otherwise
1017         collect the process exit codes and close the information pipe.
1018         """
1019         result = os.waitpid(-1, os.WNOHANG)
1020         if result[0] == 0 and result[1] == 0:
1021             return None
1022         task = self.build_pids[result[0]]
1023         del self.build_pids[result[0]]
1024         self.build_pipes[result[0]].close()
1025         del self.build_pipes[result[0]]
1026         if result[1] != 0:
1027             self.task_fail(task, result[1]>>8)
1028         else:
1029             self.task_complete(task)
1030
1031     def finish_now(self):
1032         if self.stats.active:
1033             logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
1034             for k, v in self.build_pids.iteritems():
1035                 try:
1036                     os.kill(-k, signal.SIGTERM)
1037                 except:
1038                     pass
1039         for pipe in self.build_pipes:
1040             self.build_pipes[pipe].read()
1041
1042     def finish(self):
1043         self.rq.state = runQueueCleanUp
1044
1045         for pipe in self.build_pipes:
1046             self.build_pipes[pipe].read()
1047
1048         if self.stats.active > 0:
1049             bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1050             self.runqueue_process_waitpid()
1051             return
1052
1053         if len(self.failed_fnids) != 0:
1054             self.rq.state = runQueueFailed
1055             return
1056
1057         self.rq.state = runQueueComplete
1058         return
1059
1060     def fork_off_task(self, fn, task, taskname, quieterrors=False):
1061         sys.stdout.flush()
1062         sys.stderr.flush()
1063         try:
1064             pipein, pipeout = os.pipe()
1065             pipein = os.fdopen(pipein, 'rb', 4096)
1066             pipeout = os.fdopen(pipeout, 'wb', 0)
1067             pid = os.fork()
1068         except OSError as e:
1069             bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
1070         if pid == 0:
1071             pipein.close()
1072
1073             # Save out the PID so that the event can include it the
1074             # events
1075             bb.event.worker_pid = os.getpid()
1076             bb.event.worker_pipe = pipeout
1077
1078             self.rq.state = runQueueChildProcess
1079             # Make the child the process group leader
1080             os.setpgid(0, 0)
1081             # No stdin
1082             newsi = os.open(os.devnull, os.O_RDWR)
1083             os.dup2(newsi, sys.stdin.fileno())
1084
1085             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
1086             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
1087             bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
1088             ret = 0
1089             try:
1090                 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
1091                 the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
1092                 os.environ.update(bb.data.exported_vars(the_data))
1093             except Exception as exc:
1094                 if not quieterrors:
1095                     logger.critical(str(exc))
1096                 os._exit(1)
1097             try:
1098                 ret = bb.build.exec_task(fn, taskname, the_data)
1099                 os._exit(ret)
1100             except:
1101                 os._exit(1)
1102
1103         return pid, pipein, pipeout
1104
1105 class RunQueueExecuteDummy(RunQueueExecute):
1106     def __init__(self, rq):
1107         self.rq = rq
1108         self.stats = RunQueueStats(0)
1109
1110     def finish(self):
1111         self.rq.state = runQueueComplete
1112         return
1113
1114 class RunQueueExecuteTasks(RunQueueExecute):
1115     def __init__(self, rq):
1116         RunQueueExecute.__init__(self, rq)
1117
1118         self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
1119
1120         # Mark initial buildable tasks
1121         for task in xrange(self.stats.total):
1122             self.runq_running.append(0)
1123             self.runq_complete.append(0)
1124             if len(self.rqdata.runq_depends[task]) == 0:
1125                 self.runq_buildable.append(1)
1126             else:
1127                 self.runq_buildable.append(0)
1128             if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1129                 self.rq.scenequeue_covered.add(task)
1130
1131         found = True
1132         while found:
1133             found = False
1134             for task in xrange(self.stats.total):
1135                 if task in self.rq.scenequeue_covered:
1136                     continue
1137                 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1138                     self.rq.scenequeue_covered.add(task)
1139                     found = True
1140
1141         logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
1142
1143         for task in self.rq.scenequeue_covered:
1144             self.task_skip(task)
1145
1146         event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
1147
1148         schedulers = self.get_schedulers()
1149         for scheduler in schedulers:
1150             if self.scheduler == scheduler.name:
1151                 self.sched = scheduler(self, self.rqdata)
1152                 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
1153                 break
1154         else:
1155             bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1156                      (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1157
1158
1159     def get_schedulers(self):
1160         schedulers = set(obj for obj in globals().values()
1161                              if type(obj) is type and
1162                                 issubclass(obj, RunQueueScheduler))
1163
1164         user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
1165         if user_schedulers:
1166             for sched in user_schedulers.split():
1167                 if not "." in sched:
1168                     bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1169                     continue
1170
1171                 modname, name = sched.rsplit(".", 1)
1172                 try:
1173                     module = __import__(modname, fromlist=(name,))
1174                 except ImportError, exc:
1175                     logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1176                     raise SystemExit(1)
1177                 else:
1178                     schedulers.add(getattr(module, name))
1179         return schedulers
1180
1181     def task_completeoutright(self, task):
1182         """
1183         Mark a task as completed
1184         Look at the reverse dependencies and mark any task with
1185         completed dependencies as buildable
1186         """
1187         self.runq_complete[task] = 1
1188         for revdep in self.rqdata.runq_revdeps[task]:
1189             if self.runq_running[revdep] == 1:
1190                 continue
1191             if self.runq_buildable[revdep] == 1:
1192                 continue
1193             alldeps = 1
1194             for dep in self.rqdata.runq_depends[revdep]:
1195                 if self.runq_complete[dep] != 1:
1196                     alldeps = 0
1197             if alldeps == 1:
1198                 self.runq_buildable[revdep] = 1
1199                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1200                 taskname = self.rqdata.runq_task[revdep]
1201                 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1202
1203     def task_complete(self, task):
1204         self.stats.taskCompleted()
1205         bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1206         self.task_completeoutright(task)
1207
1208     def task_fail(self, task, exitcode):
1209         """
1210         Called when a task has failed
1211         Updates the state engine with the failure
1212         """
1213         self.stats.taskFailed()
1214         fnid = self.rqdata.runq_fnid[task]
1215         self.failed_fnids.append(fnid)
1216         bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1217         if self.rqdata.taskData.abort:
1218             self.rq.state = runQueueCleanUp
1219
1220     def task_skip(self, task):
1221         self.runq_running[task] = 1
1222         self.runq_buildable[task] = 1
1223         self.task_completeoutright(task)
1224         self.stats.taskCompleted()
1225         self.stats.taskSkipped()
1226
1227     def execute(self):
1228         """
1229         Run the tasks in a queue prepared by rqdata.prepare()
1230         """
1231
1232         if self.stats.total == 0:
1233             # nothing to do
1234             self.rq.state = runQueueCleanUp
1235
1236         task = self.sched.next()
1237         if task is not None:
1238             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1239
1240             taskname = self.rqdata.runq_task[task]
1241             if self.rq.check_stamp_task(task, taskname):
1242                 logger.debug(2, "Stamp current task %s (%s)", task,
1243                                 self.rqdata.get_user_idstring(task))
1244                 self.task_skip(task)
1245                 return True
1246             elif self.cooker.configuration.dry_run:
1247                 self.runq_running[task] = 1
1248                 self.runq_buildable[task] = 1
1249                 self.stats.taskActive()
1250                 self.task_complete(task)
1251                 return True
1252
1253             taskdep = self.rqdata.dataCache.task_deps[fn]
1254             if 'noexec' in taskdep and taskname in taskdep['noexec']:
1255                 startevent = runQueueTaskStarted(task, self.stats, self.rq,
1256                                                  noexec=True)
1257                 bb.event.fire(startevent, self.cfgData)
1258                 self.runq_running[task] = 1
1259                 self.stats.taskActive()
1260                 bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
1261                 self.task_complete(task)
1262                 return True
1263             else:
1264                 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1265                 bb.event.fire(startevent, self.cfgData)
1266
1267             pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
1268
1269             self.build_pids[pid] = task
1270             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1271             self.runq_running[task] = 1
1272             self.stats.taskActive()
1273
1274         for pipe in self.build_pipes:
1275             self.build_pipes[pipe].read()
1276
1277         if self.stats.active > 0:
1278             if self.runqueue_process_waitpid() is None:
1279                 return 0.5
1280             return True
1281
1282         if len(self.failed_fnids) != 0:
1283             self.rq.state = runQueueFailed
1284             return True
1285
1286         # Sanity Checks
1287         for task in xrange(self.stats.total):
1288             if self.runq_buildable[task] == 0:
1289                 logger.error("Task %s never buildable!", task)
1290             if self.runq_running[task] == 0:
1291                 logger.error("Task %s never ran!", task)
1292             if self.runq_complete[task] == 0:
1293                 logger.error("Task %s never completed!", task)
1294         self.rq.state = runQueueComplete
1295         return True
1296
1297 class RunQueueExecuteScenequeue(RunQueueExecute):
1298     def __init__(self, rq):
1299         RunQueueExecute.__init__(self, rq)
1300
1301         self.scenequeue_covered = set()
1302         self.scenequeue_notcovered = set()
1303
1304         # If we don't have any setscene functions, skip this step
1305         if len(self.rqdata.runq_setscene) == 0:
1306             rq.scenequeue_covered = set()
1307             rq.state = runQueueRunInit
1308             return
1309
1310         self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1311
1312         endpoints = {}
1313         sq_revdeps = []
1314         sq_revdeps_new = []
1315         sq_revdeps_squash = []
1316
1317         # We need to construct a dependency graph for the setscene functions. Intermediate
1318         # dependencies between the setscene tasks only complicate the code. This code
1319         # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1320         # only containing the setscene functions.
1321
1322         for task in xrange(self.stats.total):
1323             self.runq_running.append(0)
1324             self.runq_complete.append(0)
1325             self.runq_buildable.append(0)
1326
1327         for task in xrange(len(self.rqdata.runq_fnid)):
1328             sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1329             sq_revdeps_new.append(set())
1330             if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1331                 endpoints[task] = None
1332
1333         for task in self.rqdata.runq_setscene:
1334             for dep in self.rqdata.runq_depends[task]:
1335                     endpoints[dep] = task
1336
1337         def process_endpoints(endpoints):
1338             newendpoints = {}
1339             for point, task in endpoints.items():
1340                 tasks = set()
1341                 if task:
1342                     tasks.add(task)
1343                 if sq_revdeps_new[point]:
1344                     tasks |= sq_revdeps_new[point]
1345                 sq_revdeps_new[point] = set()
1346                 for dep in self.rqdata.runq_depends[point]:
1347                     if point in sq_revdeps[dep]:
1348                         sq_revdeps[dep].remove(point)
1349                     if tasks:
1350                         sq_revdeps_new[dep] |= tasks
1351                     if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1352                         newendpoints[dep] = task
1353             if len(newendpoints) != 0:
1354                 process_endpoints(newendpoints)
1355
1356         process_endpoints(endpoints)
1357
1358         for task in xrange(len(self.rqdata.runq_fnid)):
1359             if task in self.rqdata.runq_setscene:
1360                 deps = set()
1361                 for dep in sq_revdeps_new[task]:
1362                     deps.add(self.rqdata.runq_setscene.index(dep))
1363                 sq_revdeps_squash.append(deps)
1364             elif len(sq_revdeps_new[task]) != 0:
1365                 bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1366
1367         #for task in xrange(len(sq_revdeps_squash)):
1368         #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
1369
1370         self.sq_deps = []
1371         self.sq_revdeps = sq_revdeps_squash
1372         self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1373
1374         for task in xrange(len(self.sq_revdeps)):
1375             self.sq_deps.append(set())
1376         for task in xrange(len(self.sq_revdeps)):
1377             for dep in self.sq_revdeps[task]:
1378                 self.sq_deps[dep].add(task)
1379
1380         for task in xrange(len(self.sq_revdeps)):
1381             if len(self.sq_revdeps[task]) == 0:
1382                 self.runq_buildable[task] = 1
1383
1384         if self.rq.hashvalidate:
1385             sq_hash = []
1386             sq_hashfn = []
1387             sq_fn = []
1388             sq_taskname = []
1389             sq_task = []
1390             noexec = []
1391             for task in xrange(len(self.sq_revdeps)):
1392                 realtask = self.rqdata.runq_setscene[task]
1393                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1394                 taskname = self.rqdata.runq_task[realtask]
1395                 taskdep = self.rqdata.dataCache.task_deps[fn]
1396                 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1397                     noexec.append(task)
1398                     self.task_skip(task)
1399                     bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1400                     continue
1401                 sq_fn.append(fn)
1402                 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1403                 sq_hash.append(self.rqdata.runq_hash[realtask])
1404                 sq_taskname.append(taskname)
1405                 sq_task.append(task)
1406             call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1407             locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.configuration.data }
1408             valid = bb.utils.better_eval(call, locs)
1409
1410             valid_new = []
1411             for v in valid:
1412                 valid_new.append(sq_task[v])
1413
1414             for task in xrange(len(self.sq_revdeps)):
1415                 if task not in valid_new and task not in noexec:
1416                     logger.debug(2, 'No package found, so skipping setscene task %s',
1417                                  self.rqdata.get_user_idstring(task))
1418                     self.task_failoutright(task)
1419
1420         logger.info('Executing SetScene Tasks')
1421
1422         self.rq.state = runQueueSceneRun
1423
1424     def scenequeue_updatecounters(self, task):
1425         for dep in self.sq_deps[task]:
1426             self.sq_revdeps2[dep].remove(task)
1427             if len(self.sq_revdeps2[dep]) == 0:
1428                 self.runq_buildable[dep] = 1
1429
1430     def task_completeoutright(self, task):
1431         """
1432         Mark a task as completed
1433         Look at the reverse dependencies and mark any task with
1434         completed dependencies as buildable
1435         """
1436
1437         index = self.rqdata.runq_setscene[task]
1438         logger.debug(1, 'Found task %s which could be accelerated',
1439                         self.rqdata.get_user_idstring(index))
1440
1441         self.scenequeue_covered.add(task)
1442         self.scenequeue_updatecounters(task)
1443
1444     def task_complete(self, task):
1445         self.stats.taskCompleted()
1446         self.task_completeoutright(task)
1447
1448     def task_fail(self, task, result):
1449         self.stats.taskFailed()
1450         index = self.rqdata.runq_setscene[task]
1451         bb.event.fire(runQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1452         self.scenequeue_notcovered.add(task)
1453         self.scenequeue_updatecounters(task)
1454
1455     def task_failoutright(self, task):
1456         self.runq_running[task] = 1
1457         self.runq_buildable[task] = 1
1458         self.stats.taskCompleted()
1459         self.stats.taskSkipped()
1460         index = self.rqdata.runq_setscene[task]
1461         self.scenequeue_notcovered.add(task)
1462         self.scenequeue_updatecounters(task)
1463
1464     def task_skip(self, task):
1465         self.runq_running[task] = 1
1466         self.runq_buildable[task] = 1
1467         self.task_completeoutright(task)
1468         self.stats.taskCompleted()
1469         self.stats.taskSkipped()
1470
1471     def execute(self):
1472         """
1473         Run the tasks in a queue prepared by prepare_runqueue
1474         """
1475
1476         task = None
1477         if self.stats.active < self.number_tasks:
1478             # Find the next setscene to run
1479             for nexttask in xrange(self.stats.total):
1480                 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1481                     task = nexttask
1482                     break
1483         if task is not None:
1484             realtask = self.rqdata.runq_setscene[task]
1485             fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1486
1487             taskname = self.rqdata.runq_task[realtask] + "_setscene"
1488             if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
1489                 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1490                              task, self.rqdata.get_user_idstring(task))
1491                 self.task_failoutright(task)
1492                 return True
1493
1494             if self.cooker.configuration.force:
1495                 for target in self.rqdata.target_pairs:
1496                     if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1497                         self.task_failoutright(task)
1498                         return True
1499
1500             if self.rq.check_stamp_task(realtask, taskname):
1501                 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
1502                              task, self.rqdata.get_user_idstring(realtask))
1503                 self.task_skip(task)
1504                 return True
1505
1506             pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
1507
1508             self.build_pids[pid] = task
1509             self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1510             self.runq_running[task] = 1
1511             self.stats.taskActive()
1512             if self.stats.active < self.number_tasks:
1513                 return True
1514
1515         for pipe in self.build_pipes:
1516             self.build_pipes[pipe].read()
1517
1518         if self.stats.active > 0:
1519             if self.runqueue_process_waitpid() is None:
1520                 return 0.5
1521             return True
1522
1523         # Convert scenequeue_covered task numbers into full taskgraph ids
1524         oldcovered = self.scenequeue_covered
1525         self.rq.scenequeue_covered = set()
1526         for task in oldcovered:
1527             self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1528
1529         logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered)
1530
1531         self.rq.state = runQueueRunInit
1532         return True
1533
1534     def fork_off_task(self, fn, task, taskname):
1535         return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
1536
1537 class TaskFailure(Exception):
1538     """
1539     Exception raised when a task in a runqueue fails
1540     """
1541     def __init__(self, x):
1542         self.args = x
1543
1544
1545 class runQueueExitWait(bb.event.Event):
1546     """
1547     Event when waiting for task processes to exit
1548     """
1549
1550     def __init__(self, remain):
1551         self.remain = remain
1552         self.message = "Waiting for %s active tasks to finish" % remain
1553         bb.event.Event.__init__(self)
1554
1555 class runQueueEvent(bb.event.Event):
1556     """
1557     Base runQueue event class
1558     """
1559     def __init__(self, task, stats, rq):
1560         self.taskid = task
1561         self.taskstring = rq.rqdata.get_user_idstring(task)
1562         self.stats = stats.copy()
1563         bb.event.Event.__init__(self)
1564
1565 class runQueueTaskStarted(runQueueEvent):
1566     """
1567     Event notifing a task was started
1568     """
1569     def __init__(self, task, stats, rq, noexec=False):
1570         runQueueEvent.__init__(self, task, stats, rq)
1571         self.noexec = noexec
1572
1573 class runQueueTaskFailed(runQueueEvent):
1574     """
1575     Event notifing a task failed
1576     """
1577     def __init__(self, task, stats, exitcode, rq):
1578         runQueueEvent.__init__(self, task, stats, rq)
1579         self.exitcode = exitcode
1580
1581 class runQueueTaskCompleted(runQueueEvent):
1582     """
1583     Event notifing a task completed
1584     """
1585
1586 def check_stamp_fn(fn, taskname, d):
1587     rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1588     fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
1589     fnid = rqexe.rqdata.taskData.getfn_id(fn)
1590     taskid = rqexe.rqdata.get_task_id(fnid, taskname)
1591     if taskid is not None:
1592         return rqexe.rq.check_stamp_task(taskid)
1593     return None
1594
1595 class runQueuePipe():
1596     """
1597     Abstraction for a pipe between a worker thread and the server
1598     """
1599     def __init__(self, pipein, pipeout, d):
1600         self.input = pipein
1601         pipeout.close()
1602         fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)
1603         self.queue = ""
1604         self.d = d
1605
1606     def read(self):
1607         start = len(self.queue)
1608         try:
1609             self.queue = self.queue + self.input.read(102400)
1610         except (OSError, IOError):
1611             pass
1612         end = len(self.queue)
1613         index = self.queue.find("</event>")
1614         while index != -1:
1615             bb.event.fire_from_worker(self.queue[:index+8], self.d)
1616             self.queue = self.queue[index+8:]
1617             index = self.queue.find("</event>")
1618         return (end > start)
1619
1620     def close(self):
1621         while self.read():
1622             continue
1623         if len(self.queue) > 0:
1624             print("Warning, worker left partial message: %s" % self.queue)
1625         self.input.close()