runqueue.py: Fix the case where BB_NUMBER_THREADS is unset
[bitbake.git] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8
9 Copyright (C) 2006  Richard Purdie
10
11 This program is free software; you can redistribute it and/or modify it under
12 the terms of the GNU General Public License version 2 as published by the Free 
13 Software Foundation
14
15 This program is distributed in the hope that it will be useful, but WITHOUT
16 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License along with
20 """
21
22 from bb import msg, data, fetch, event, mkdirhier, utils
23 import bb, os, sys
24
class TaskFailure(Exception):
    """
    Signals that a task in a runqueue did not complete successfully.

    The file id and the task name of the failing task are carried, in that
    order, as the exception's args tuple.
    """

    def __init__(self, fnid, taskname):
        # Let the base class store (fnid, taskname) as self.args
        Exception.__init__(self, fnid, taskname)
30
class RunQueue:
    """
    BitBake Run Queue implementation

    Turns taskData into a flat, index-addressed list of tasks, works out
    the order in which they should run and executes them in forked child
    processes.

    Every runq_* list is indexed by runqueue task id:
      runq_fnid    - id of the file the task belongs to
      runq_task    - name of the task
      runq_depends - ids of the tasks this task depends on
      runq_revdeps - ids of the tasks that depend on this task
      runq_weight  - scheduling weight (higher weights are run first)
    prio_map holds the task ids ordered from highest to lowest weight.
    """
    def __init__(self):
        self.reset_runqueue()

    def reset_runqueue(self):
        """
        Drop all queue state so prepare_runqueue can start from scratch
        """
        self.runq_fnid = []
        self.runq_task = []
        self.runq_depends = []
        self.runq_revdeps = []
        self.runq_weight = []
        self.prio_map = []

    def _add_recursive_run(self, taskData, taskname, rdepid, depends, rdep_seen):
        """
        Add the task 'taskname' of the provider of runtime target rdepid to
        depends, then do the same for that provider's own runtime depends.
        rdep_seen lists the runtime targets already visited.
        (calls itself recursively)
        """
        # Compare like with like: the ids are stored unconverted, so the
        # former str(rdepid) test could never match an integer id.
        if rdepid in rdep_seen:
            return
        rdep_seen.append(rdepid)
        if rdepid in taskData.run_targets:
            depdata = taskData.run_targets[rdepid][0]
            # 0 is a usable index into fn_index, so test against None
            # rather than for truth
            if depdata is not None:
                dep = taskData.fn_index[depdata]
                taskid = taskData.gettask_id(dep, taskname)
                depends.append(taskid)
                depfnid = taskData.tasks_fnid[taskid]
                for nextdepid in taskData.rdepids[depfnid]:
                    if nextdepid not in rdep_seen:
                        self._add_recursive_run(taskData, taskname, nextdepid, depends, rdep_seen)

    def _resolve_depends(self, taskData, task_deps, task, fnid):
        """
        Return a new list holding the ids of every task that 'task' (which
        belongs to file fnid) depends on: its entries in tasks_tdepends plus
        whatever the 'deptask', 'rdeptask' and 'recrdeptask' settings in
        task_deps add. The list may contain duplicates and 'task' itself.
        """
        thistask = taskData.tasks_name[task]

        # Work on a copy: appending to the list owned by taskData would
        # leave the resolved entries behind for the next prepare_runqueue
        depends = list(taskData.tasks_tdepends[task])

        # Resolve Depends
        if 'deptask' in task_deps and thistask in task_deps['deptask']:
            taskname = task_deps['deptask'][thistask]
            for depid in taskData.depids[fnid]:
                if depid in taskData.build_targets:
                    depdata = taskData.build_targets[depid][0]
                    if depdata is not None:
                        dep = taskData.fn_index[depdata]
                        depends.append(taskData.gettask_id(dep, taskname))

        # Resolve Runtime Depends
        if 'rdeptask' in task_deps and thistask in task_deps['rdeptask']:
            taskname = task_deps['rdeptask'][thistask]
            for depid in taskData.rdepids[fnid]:
                if depid in taskData.run_targets:
                    depdata = taskData.run_targets[depid][0]
                    if depdata is not None:
                        dep = taskData.fn_index[depdata]
                        depends.append(taskData.gettask_id(dep, taskname))

        # Resolve Recursive Runtime Depends
        if 'recrdeptask' in task_deps and thistask in task_deps['recrdeptask']:
            rdep_seen = []
            taskname = task_deps['recrdeptask'][thistask]
            for rdepid in taskData.rdepids[fnid]:
                self._add_recursive_run(taskData, taskname, rdepid, depends, rdep_seen)

        return depends

    def _unique_depends(self, task, depends):
        """
        Return depends without any reference to task itself and without
        repeated entries; the order of first appearance is kept.
        """
        seen = {task: 1}
        unique = []
        for dep in depends:
            if dep not in seen:
                seen[dep] = 1
                unique.append(dep)
        return unique

    def _weight_sorted_map(self, weights):
        """
        Return the indexes of weights ordered from the highest weight to
        the lowest. Among equal weights the higher index comes first.
        """
        order = [(weight, idx) for idx, weight in enumerate(weights)]
        order.sort()
        order.reverse()
        return [idx for (weight, idx) in order]

    def prepare_runqueue(self, cfgData, dataCache, taskData, targets):
        """
        Turn a set of taskData into a RunQueue and compute data needed 
        to optimise the execution order.
        targets is list of paired values - a provider name and the task to run

        cfgData is not used here. Fills in the runq_* lists and prio_map.
        """

        runq_weight1 = []
        runq_build = []
        runq_done = []

        bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing Runqueue")

        for task in range(len(taskData.tasks_name)):
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = dataCache.task_deps[fn]

            # Start every task with its own empty list so that a task of a
            # failed file never inherits the previous task's depends
            depends = []

            if fnid not in taskData.failed_fnids:
                depends = self._resolve_depends(taskData, task_deps, task, fnid)

                #Prune self references
                if task in depends:
                    bb.debug(2, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))

                # Also drop duplicates: the weighting pass below counts one
                # reference per (task, dependency) pair
                depends = self._unique_depends(task, depends)

            self.runq_fnid.append(fnid)
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(depends)
            self.runq_revdeps.append([])
            self.runq_weight.append(0)

            runq_weight1.append(0)
            runq_build.append(0)
            runq_done.append(0)

        bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")

        def mark_active(listid):
            """
            Mark an item as active along with everything it depends on,
            directly or indirectly
            """
            # Explicit stack instead of recursion so that long dependency
            # chains cannot run into the interpreter's recursion limit
            pending = [listid]
            while pending:
                current = pending.pop()
                if runq_build[current] == 1:
                    continue
                runq_build[current] = 1
                pending.extend(self.runq_depends[current])

        for target in targets:
            targetid = taskData.getbuild_id(target[0])
            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            if fnid in taskData.failed_fnids:
                continue

            fnids = taskData.matches_in_list(self.runq_fnid, fnid)
            tasks = taskData.matches_in_list(self.runq_task, target[1])

            listid = taskData.both_contain(fnids, tasks)

            mark_active(listid)

        # Prune inactive tasks
        # maps translates an old task id into its id after pruning,
        # or -1 if the task was dropped
        maps = []
        kept = []
        for listid in range(len(runq_build)):
            if runq_build[listid] == 1:
                maps.append(len(kept))
                kept.append(listid)
            else:
                maps.append(-1)
        delcount = len(maps) - len(kept)

        self.runq_fnid = [self.runq_fnid[listid] for listid in kept]
        self.runq_task = [self.runq_task[listid] for listid in kept]
        self.runq_depends = [self.runq_depends[listid] for listid in kept]
        self.runq_revdeps = [self.runq_revdeps[listid] for listid in kept]
        self.runq_weight = [self.runq_weight[listid] for listid in kept]
        runq_weight1 = [runq_weight1[listid] for listid in kept]
        runq_done = [runq_done[listid] for listid in kept]

        bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))

        # Renumber the dependencies to match the pruned lists
        for listid in range(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = newdeps

        bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")

        for listid in range(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                if listid not in self.runq_revdeps[dep]:
                    self.runq_revdeps[dep].append(listid)

        # Endpoints are the tasks nothing else depends on. runq_weight1
        # counts, per task, the reverse dependencies not yet accounted for.
        endpoints = []
        for listid in range(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                runq_done[listid] = 1
                self.runq_weight[listid] = 1
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.fatal("Task %s has circular dependency on %s" % (dep, listid))
            runq_weight1[listid] = len(revdeps)

        bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))

        # Walk from the endpoints towards the tasks without dependencies,
        # adding each task's weight to every task it depends on. A task is
        # only visited once all of its reverse dependencies have been.
        while endpoints:
            next_points = []
            for listid in endpoints:
                for revdep in self.runq_depends[listid]:
                    self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
                    runq_weight1[revdep] = runq_weight1[revdep] - 1
                    if runq_weight1[revdep] == 0:
                        next_points.append(revdep)
                        runq_done[revdep] = 1
            endpoints = next_points

        # Sanity Checks
        for task in range(len(self.runq_fnid)):
            if runq_done[task] == 0:
                bb.fatal("Task %s not processed!" % task)
            if runq_weight1[task] != 0:
                bb.fatal("Task %s count not zero!" % task)

        # Make a weight sorted map
        self.prio_map = self._weight_sorted_map(self.runq_weight)

    def execute_runqueue(self, cooker, cfgData, dataCache, taskData, runlist):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)

        Returns the number of task failures that were recovered from.
        """

        failures = 0
        while 1:
            try:
                self.execute_runqueue_internal(cooker, cfgData, dataCache, taskData)
                return failures
            except bb.runqueue.TaskFailure:
                # sys.exc_info() keeps this parseable by old and new
                # Python versions alike
                fnid, taskname = sys.exc_info()[1].args
                # Mark the file as failed and rebuild the queue without it
                taskData.fail_fnid(fnid)
                self.reset_runqueue()
                self.prepare_runqueue(cfgData, dataCache, taskData, runlist)
                failures = failures + 1

    def execute_runqueue_internal(self, cooker, cfgData, dataCache, taskData):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Each task is run in a forked child process, with at most
        BB_NUMBER_THREADS (default 1) children active at a time. Raises
        bb.runqueue.TaskFailure when a child exits with a non-zero status.
        Returns 0 once nothing is left to run.
        """

        bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")

        # Per task flags, indexed by task id
        runq_buildable = []
        runq_running = []
        runq_complete = []
        active_builds = 0
        # Maps the pid of a running child to the id of the task it runs
        build_pids = {}

        def get_next_task():
            """
            Return the id of the highest priority task that is buildable
            """
            for task in self.prio_map:
                if runq_running[task] == 1:
                    continue
                if runq_buildable[task] == 1:
                    return task
            return None

        def task_complete(task):
            """
            Mark a task as completed
            Look at the reverse dependencies and mark any task with 
            completed dependencies as buildable
            """
            runq_complete[task] = 1
            for revdep in self.runq_revdeps[task]:
                if runq_running[revdep] == 1:
                    continue
                if runq_buildable[revdep] == 1:
                    continue
                alldeps = 1
                for dep in self.runq_depends[revdep]:
                    if runq_complete[dep] != 1:
                        alldeps = 0
                        break
                if alldeps == 1:
                    runq_buildable[revdep] = 1
                    fn = taskData.fn_index[self.runq_fnid[revdep]]
                    taskname = self.runq_task[revdep]
                    bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))

        def get_user_idstring(task):
            """
            Return "<file name>, <task name>" for use in messages
            """
            fn = taskData.fn_index[self.runq_fnid[task]]
            taskname = self.runq_task[task]
            return "%s, %s" % (fn, taskname)

        # Mark initial buildable tasks
        for task in range(len(self.runq_fnid)):
            runq_running.append(0)
            runq_complete.append(0)
            if len(self.runq_depends[task]) == 0:
                runq_buildable.append(1)
            else:
                runq_buildable.append(0)

        number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)

        try:
            while 1:
                task = get_next_task()
                if task is not None:
                    fn = taskData.fn_index[self.runq_fnid[task]]
                    taskname = self.runq_task[task]

                    if bb.build.stamp_is_current_cache(dataCache, fn, taskname):
                        # Nothing to run, treat the task as done straight away
                        bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, get_user_idstring(task)))
                        runq_running[task] = 1
                        task_complete(task)
                        continue

                    bb.msg.debug(1, bb.msg.domain.RunQueue, "Running task %s (%s)" % (task, get_user_idstring(task)))
                    try:
                        pid = os.fork()
                    except OSError:
                        e = sys.exc_info()[1]
                        bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
                    if pid == 0:
                        # Child process: run the task and report the
                        # outcome through the exit status
                        cooker.configuration.cmd = taskname[3:]
                        try:
                            cooker.tryBuild(fn, False)
                        except bb.build.EventException:
                            bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                            sys.exit(1)
                        except:
                            sys.exit(1)
                        sys.exit(0)
                    build_pids[pid] = task
                    runq_running[task] = 1
                    active_builds = active_builds + 1
                    if active_builds < number_tasks:
                        continue
                if active_builds > 0:
                    # Either all slots are in use or nothing is buildable
                    # yet: wait for one of the children to finish
                    result = os.waitpid(-1, 0)
                    active_builds = active_builds - 1
                    # Forget the child right away so that a failure does not
                    # leave it listed among the still active tasks below
                    finished = build_pids.pop(result[0])
                    if result[1] != 0:
                        bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (finished, get_user_idstring(finished)))
                        raise bb.runqueue.TaskFailure(self.runq_fnid[finished], self.runq_task[finished])
                    task_complete(finished)
                    continue
                break
        except SystemExit:
            # Raised by sys.exit() in the forked children, let it through
            raise
        except:
            bb.error("Exception received")
            # Do not leave children behind: wait for the running ones
            while active_builds > 0:
                bb.note("Waiting for %s active tasks to finish" % active_builds)
                tasknum = 1
                for k, v in build_pids.items():
                    bb.note("%s: %s (%s)" % (tasknum, get_user_idstring(v), k))
                    tasknum = tasknum + 1
                result = os.waitpid(-1, 0)
                del build_pids[result[0]]
                active_builds = active_builds - 1
            if cooker.configuration.abort:
                sys.exit(1)
            raise

        # Sanity Checks
        for task in range(len(self.runq_fnid)):
            if runq_buildable[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
            if runq_running[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
            if runq_complete[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)

        return 0

    def dump_data(self, taskQueue):
        """
        Dump some debug information on the internal data structures

        taskQueue supplies fn_index, used to turn file ids into names.
        The tasks are listed twice: in id order, then in priority order.
        """
        def describe(task):
            return " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task,
                    taskQueue.fn_index[self.runq_fnid[task]],
                    self.runq_task[task],
                    self.runq_weight[task],
                    self.runq_depends[task],
                    self.runq_revdeps[task])

        bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
        for task in range(len(self.runq_fnid)):
            bb.msg.debug(3, bb.msg.domain.RunQueue, describe(task))

        bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
        # prio_map is empty until prepare_runqueue has computed the weights
        for task in self.prio_map:
            bb.msg.debug(3, bb.msg.domain.RunQueue, describe(task))