runqueue.py: Message fix from hrw
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 from bb import msg, data, fetch, event, mkdirhier, utils
26 from sets import Set 
27 import bb, os, sys
28 import signal
29
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The sequence handed to the constructor becomes the exception's args
    (execute_runqueue raises it with the list of failed fnids).
    """
    def __init__(self, x):
        # Unpacking x stores its items as the args tuple
        Exception.__init__(self, *x)
34
35
class RunQueueStats:
    """
    Simple counters describing what happened to the tasks of a runQueue

    completed - tasks marked as completed
    skipped   - tasks marked as skipped
    failed    - tasks marked as failed
    """
    def __init__(self):
        # All counters start from zero
        self.completed = self.skipped = self.failed = 0

    def taskFailed(self):
        """Count one more failed task"""
        self.failed += 1

    def taskCompleted(self):
        """Count one more completed task"""
        self.completed += 1

    def taskSkipped(self):
        """Count one more skipped task"""
        self.skipped += 1
53
54 class RunQueue:
55     """
56     BitBake Run Queue implementation
57     """
58     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
59         self.reset_runqueue()
60         self.cooker = cooker
61         self.dataCache = dataCache
62         self.taskData = taskData
63         self.targets = targets
64
65         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
66
67     def reset_runqueue(self):
68
69         self.runq_fnid = []
70         self.runq_task = []
71         self.runq_depends = []
72         self.runq_revdeps = []
73         self.runq_weight = []
74         self.prio_map = []
75
76     def get_user_idstring(self, task):
77         fn = self.taskData.fn_index[self.runq_fnid[task]]
78         taskname = self.runq_task[task]
79         return "%s, %s" % (fn, taskname)
80
81     def prepare_runqueue(self):
82         """
83         Turn a set of taskData into a RunQueue and compute data needed 
84         to optimise the execution order.
85         """
86
87         depends = []
88         runq_weight1 = []
89         runq_build = []
90         runq_done = []
91
92         taskData = self.taskData
93
94         if len(taskData.tasks_name) == 0:
95             # Nothing to do
96             return
97
98         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
99
100         for task in range(len(taskData.tasks_name)):
101             fnid = taskData.tasks_fnid[task]
102             fn = taskData.fn_index[fnid]
103             task_deps = self.dataCache.task_deps[fn]
104
105             if fnid not in taskData.failed_fnids:
106
107                 depends = taskData.tasks_tdepends[task]
108
109                 # Resolve Depends
110                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
111                     taskname = task_deps['deptask'][taskData.tasks_name[task]]
112                     for depid in taskData.depids[fnid]:
113                         if depid in taskData.build_targets:
114                             depdata = taskData.build_targets[depid][0]
115                             if depdata:
116                                 dep = taskData.fn_index[depdata]
117                                 depends.append(taskData.gettask_id(dep, taskname))
118
119                 # Resolve Runtime Depends
120                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
121                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
122                     for depid in taskData.rdepids[fnid]:
123                         if depid in taskData.run_targets:
124                             depdata = taskData.run_targets[depid][0]
125                             if depdata:
126                                 dep = taskData.fn_index[depdata]
127                                 depends.append(taskData.gettask_id(dep, taskname))
128
129                 idepends = taskData.tasks_idepends[task]
130                 for idepend in idepends:
131                     depid = int(idepend.split(":")[0])
132                     if depid in taskData.build_targets:
133                         depdata = taskData.build_targets[depid][0]
134                         if depdata:
135                             dep = taskData.fn_index[depdata]
136                             depends.append(taskData.gettask_id(dep, idepend.split(":")[1]))
137
138                 def add_recursive_build(depid):
139                     """
140                     Add build depends of depid to depends
141                     (if we've not see it before)
142                     (calls itself recursively)
143                     """
144                     if str(depid) in dep_seen:
145                         return
146                     dep_seen.append(depid)
147                     if depid in taskData.build_targets:
148                         depdata = taskData.build_targets[depid][0]
149                         if depdata:
150                             dep = taskData.fn_index[depdata]
151                             # Need to avoid creating new tasks here
152                             taskid = taskData.gettask_id(dep, taskname, False)
153                             if taskid:
154                                 depends.append(taskid)
155                                 fnid = taskData.tasks_fnid[taskid]
156                             else:
157                                 fnid = taskData.getfn_id(dep)
158                             for nextdepid in taskData.depids[fnid]:
159                                 if nextdepid not in dep_seen:
160                                     add_recursive_build(nextdepid)
161                             for nextdepid in taskData.rdepids[fnid]:
162                                 if nextdepid not in rdep_seen:
163                                     add_recursive_run(nextdepid)
164
165                 def add_recursive_run(rdepid):
166                     """
167                     Add runtime depends of rdepid to depends
168                     (if we've not see it before)
169                     (calls itself recursively)
170                     """
171                     if str(rdepid) in rdep_seen:
172                         return
173                     rdep_seen.append(rdepid)
174                     if rdepid in taskData.run_targets:
175                         depdata = taskData.run_targets[rdepid][0]
176                         if depdata:
177                             dep = taskData.fn_index[depdata]
178                             # Need to avoid creating new tasks here
179                             taskid = taskData.gettask_id(dep, taskname, False)
180                             if taskid:
181                                 depends.append(taskid)
182                                 fnid = taskData.tasks_fnid[taskid]
183                             else:
184                                 fnid = taskData.getfn_id(dep)
185                             for nextdepid in taskData.depids[fnid]:
186                                 if nextdepid not in dep_seen:
187                                     add_recursive_build(nextdepid)
188                             for nextdepid in taskData.rdepids[fnid]:
189                                 if nextdepid not in rdep_seen:
190                                     add_recursive_run(nextdepid)
191
192
193                 # Resolve Recursive Runtime Depends
194                 # Also includes all Build Depends (and their runtime depends)
195                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
196                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
197                         dep_seen = []
198                         rdep_seen = []
199                         for depid in taskData.depids[fnid]:
200                             add_recursive_build(depid)
201                         for rdepid in taskData.rdepids[fnid]:
202                             add_recursive_run(rdepid)
203
204                 #Prune self references
205                 if task in depends:
206                     newdep = []
207                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
208                     for dep in depends:
209                        if task != dep:
210                           newdep.append(dep)
211                     depends = newdep
212
213
214             self.runq_fnid.append(taskData.tasks_fnid[task])
215             self.runq_task.append(taskData.tasks_name[task])
216             self.runq_depends.append(Set(depends))
217             self.runq_revdeps.append(Set())
218             self.runq_weight.append(0)
219
220             runq_weight1.append(0)
221             runq_build.append(0)
222             runq_done.append(0)
223
224         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
225
226         def mark_active(listid, depth):
227             """
228             Mark an item as active along with its depends
229             (calls itself recursively)
230             """
231
232             if runq_build[listid] == 1:
233                 return
234
235             runq_build[listid] = 1
236
237             depends = self.runq_depends[listid]
238             for depend in depends:
239                 mark_active(depend, depth+1)
240
241         for target in self.targets:
242             targetid = taskData.getbuild_id(target[0])
243
244             if targetid not in taskData.build_targets:
245                 continue
246
247             if targetid in taskData.failed_deps:
248                 continue
249
250             fnid = taskData.build_targets[targetid][0]
251
252             # Remove stamps for targets if force mode active
253             if self.cooker.configuration.force:
254                 fn = taskData.fn_index[fnid]
255                 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
256                 bb.build.del_stamp(target[1], self.dataCache, fn)
257
258             if fnid in taskData.failed_fnids:
259                 continue
260
261             listid = taskData.tasks_lookup[fnid][target[1]]
262
263             mark_active(listid, 1)
264
265         # Prune inactive tasks
266         maps = []
267         delcount = 0
268         for listid in range(len(self.runq_fnid)):
269             if runq_build[listid-delcount] == 1:
270                 maps.append(listid-delcount)
271             else:
272                 del self.runq_fnid[listid-delcount]
273                 del self.runq_task[listid-delcount]
274                 del self.runq_depends[listid-delcount]
275                 del self.runq_weight[listid-delcount]
276                 del runq_weight1[listid-delcount]
277                 del runq_build[listid-delcount]
278                 del runq_done[listid-delcount]
279                 del self.runq_revdeps[listid-delcount]
280                 delcount = delcount + 1
281                 maps.append(-1)
282
283         if len(self.runq_fnid) == 0:
284             if not taskData.abort:
285                 bb.msg.note(1, bb.msg.domain.RunQueue, "All possible tasks have been run but build incomplete (--continue mode). See errors above for incomplete tasks.")
286                 return
287             bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
288
289         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
290
291         for listid in range(len(self.runq_fnid)):
292             newdeps = []
293             origdeps = self.runq_depends[listid]
294             for origdep in origdeps:
295                 if maps[origdep] == -1:
296                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
297                 newdeps.append(maps[origdep])
298             self.runq_depends[listid] = Set(newdeps)
299
300         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
301
302         for listid in range(len(self.runq_fnid)):
303             for dep in self.runq_depends[listid]:
304                 self.runq_revdeps[dep].add(listid)
305
306         endpoints = []
307         for listid in range(len(self.runq_fnid)):
308             revdeps = self.runq_revdeps[listid]
309             if len(revdeps) == 0:
310                 runq_done[listid] = 1
311                 self.runq_weight[listid] = 1
312                 endpoints.append(listid)
313             for dep in revdeps:
314                 if dep in self.runq_depends[listid]:
315                     #self.dump_data(taskData)
316                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
317             runq_weight1[listid] = len(revdeps)
318
319         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
320
321         while 1:
322             next_points = []
323             for listid in endpoints:
324                 for revdep in self.runq_depends[listid]:
325                     self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
326                     runq_weight1[revdep] = runq_weight1[revdep] - 1
327                     if runq_weight1[revdep] == 0:
328                         next_points.append(revdep)
329                         runq_done[revdep] = 1
330             endpoints = next_points
331             if len(next_points) == 0:
332                 break           
333
334         # Sanity Checks
335         for task in range(len(self.runq_fnid)):
336             if runq_done[task] == 0:
337                 seen = []
338                 deps_seen = []
339                 def print_chain(taskid, finish):
340                     seen.append(taskid)
341                     for revdep in self.runq_revdeps[taskid]:
342                         if runq_done[revdep] == 0 and revdep not in seen and not finish:
343                             bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep), self.runq_depends[revdep]))
344                             if revdep in deps_seen:
345                                 bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep)))
346                                 finish = True
347                                 return
348                             for dep in self.runq_depends[revdep]:
349                                 deps_seen.append(dep)
350                             print_chain(revdep, finish)
351                 print_chain(task, False)
352                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task)))
353             if runq_weight1[task] != 0:
354                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task)))
355
356         # Make a weight sorted map
357         from copy import deepcopy
358
359         sortweight = deepcopy(self.runq_weight)
360         sortweight.sort()
361         copyweight = deepcopy(self.runq_weight)
362         self.prio_map = []
363
364         for weight in sortweight:
365             idx = copyweight.index(weight)
366             self.prio_map.append(idx)
367             copyweight[idx] = -1
368         self.prio_map.reverse()
369
370         #self.dump_data(taskData)
371
372     def execute_runqueue(self):
373         """
374         Run the tasks in a queue prepared by prepare_runqueue
375         Upon failure, optionally try to recover the build using any alternate providers
376         (if the abort on failure configuration option isn't set)
377         """
378
379         failures = 0
380         while 1:
381             failed_fnids = []
382             try:
383                 self.execute_runqueue_internal()
384             finally:
385                 if self.master_process:
386                     failed_fnids = self.finish_runqueue()
387             if len(failed_fnids) == 0:
388                 return failures
389             if self.taskData.abort:
390                 raise bb.runqueue.TaskFailure(failed_fnids)
391             for fnid in failed_fnids:
392                 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid],  self.runq_task[fnid])
393                 self.taskData.fail_fnid(fnid)
394                 failures = failures + 1
395             self.reset_runqueue()
396             self.prepare_runqueue()
397
398     def execute_runqueue_initVars(self):
399
400         self.stats = RunQueueStats()
401
402         self.active_builds = 0
403         self.runq_buildable = []
404         self.runq_running = []
405         self.runq_complete = []
406         self.build_pids = {}
407         self.failed_fnids = []
408         self.master_process = True
409
410         # Mark initial buildable tasks
411         for task in range(len(self.runq_fnid)):
412             self.runq_running.append(0)
413             self.runq_complete.append(0)
414             if len(self.runq_depends[task]) == 0:
415                 self.runq_buildable.append(1)
416             else:
417                 self.runq_buildable.append(0)
418
419     def task_complete(self, task):
420         """
421         Mark a task as completed
422         Look at the reverse dependencies and mark any task with 
423         completed dependencies as buildable
424         """
425         self.runq_complete[task] = 1
426         for revdep in self.runq_revdeps[task]:
427             if self.runq_running[revdep] == 1:
428                 continue
429             if self.runq_buildable[revdep] == 1:
430                 continue
431             alldeps = 1
432             for dep in self.runq_depends[revdep]:
433                 if self.runq_complete[dep] != 1:
434                     alldeps = 0
435             if alldeps == 1:
436                 self.runq_buildable[revdep] = 1
437                 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
438                 taskname = self.runq_task[revdep]
439                 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
440
441     def get_next_task(self):
442         """
443         Return the id of the highest priority task that is buildable
444         """
445         for task1 in range(len(self.runq_fnid)):
446             task = self.prio_map[task1]
447             if self.runq_running[task] == 1:
448                 continue
449             if self.runq_buildable[task] == 1:
450                 return task
451         return None
452
    def execute_runqueue_internal(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Tasks are taken from get_next_task. A task whose stamp is current
        is marked complete without running it; any other task is run in a
        forked child process. At most self.number_tasks children are
        started before the loop waits for one of them to exit.

        The loop ends when no task is buildable and no child is active, or
        as soon as a child exits with a non-zero status. In the latter case
        the fnid is added to self.failed_fnids and any children still
        running are left for finish_runqueue.

        NOTE(review): returns [] for an empty queue and None otherwise;
        execute_runqueue ignores the return value.
        """

        bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")

        self.execute_runqueue_initVars()

        if len(self.runq_fnid) == 0:
            # nothing to do
            return []

        # NOTE(review): this handler is defined but never installed or
        # called anywhere in this method
        def sigint_handler(signum, frame):
            raise KeyboardInterrupt

        while True:
            task = self.get_next_task()
            if task is not None:
                fn = self.taskData.fn_index[self.runq_fnid[task]]

                taskname = self.runq_task[task]
                if bb.build.stamp_is_current(taskname, self.dataCache, fn):
                    # Nothing to run: the task counts as both completed
                    # and skipped in the statistics
                    bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
                    self.runq_running[task] = 1
                    self.task_complete(task)
                    self.stats.taskCompleted()
                    self.stats.taskSkipped()
                    continue

                bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
                try:
                    pid = os.fork()
                except OSError, e:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
                if pid == 0:
                    # Child process: every path through this branch ends
                    # in sys.exit or a re-raised exception
                    # Bypass master process' handling
                    self.master_process = False
                    # Stop Ctrl+C being sent to children
                    # signal.signal(signal.SIGINT, signal.SIG_IGN)
                    # Make the child the process group leader
                    os.setpgid(0, 0)
                    # Replace stdin with /dev/null
                    newsi = os.open('/dev/null', os.O_RDWR)
                    os.dup2(newsi, sys.stdin.fileno())
                    # The command is the task name without its first three
                    # characters (presumably a "do_" prefix - TODO confirm)
                    self.cooker.configuration.cmd = taskname[3:]
                    try:
                        self.cooker.tryBuild(fn, False)
                    except bb.build.EventException:
                        bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                        sys.exit(1)
                    except:
                        bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                        raise
                    sys.exit(0)
                # Master process: remember which task the child is running
                self.build_pids[pid] = task
                self.runq_running[task] = 1
                self.active_builds = self.active_builds + 1
                if self.active_builds < self.number_tasks:
                    # Still below the limit, try to start another task
                    continue
            if self.active_builds > 0:
                # Block until any child exits; result is (pid, exit status)
                result = os.waitpid(-1, 0)
                self.active_builds = self.active_builds - 1
                task = self.build_pids[result[0]]
                if result[1] != 0:
                    del self.build_pids[result[0]]
                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
                    self.failed_fnids.append(self.runq_fnid[task])
                    self.stats.taskFailed()
                    # Stop scheduling after the first failure
                    break
                self.task_complete(task)
                self.stats.taskCompleted()
                del self.build_pids[result[0]]
                continue
            # No buildable task and no active child: the queue is finished
            return
527
528     def finish_runqueue(self):
529         try:
530             while self.active_builds > 0:
531                 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
532                 tasknum = 1
533                 for k, v in self.build_pids.iteritems():
534                      bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
535                      tasknum = tasknum + 1
536                 result = os.waitpid(-1, 0)
537                 task = self.build_pids[result[0]]
538                 if result[1] != 0:
539                      bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
540                      self.failed_fnids.append(self.runq_fnid[task])
541                      self.stats.taskFailed()
542                 del self.build_pids[result[0]]
543                 self.active_builds = self.active_builds - 1
544             bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
545             return self.failed_fnids
546         except KeyboardInterrupt:
547             bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
548             for k, v in self.build_pids.iteritems():
549                  try:
550                      os.kill(-k, signal.SIGINT)
551                  except:
552                      pass
553             raise
554
555         # Sanity Checks
556         for task in range(len(self.runq_fnid)):
557             if self.runq_buildable[task] == 0:
558                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
559             if self.runq_running[task] == 0:
560                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
561             if self.runq_complete[task] == 0:
562                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
563
564         bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
565
566         return self.failed_fnids
567
568     def dump_data(self, taskQueue):
569         """
570         Dump some debug information on the internal data structures
571         """
572         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
573         for task in range(len(self.runq_fnid)):
574                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
575                         taskQueue.fn_index[self.runq_fnid[task]], 
576                         self.runq_task[task], 
577                         self.runq_weight[task], 
578                         self.runq_depends[task], 
579                         self.runq_revdeps[task]))
580
581         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
582         for task1 in range(len(self.runq_fnid)):
583             if task1 in self.prio_map:
584                 task = self.prio_map[task1]
585                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
586                         taskQueue.fn_index[self.runq_fnid[task]], 
587                         self.runq_task[task], 
588                         self.runq_weight[task], 
589                         self.runq_depends[task], 
590                         self.runq_revdeps[task]))