1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 __revision__ = "src/engine/SCons/Job.py 3266 2008/08/12 07:31:01 knight"
33
34 import SCons.compat
35
36 import os
37 import signal
38
39
40
41
42
43
44
45
46
47
# Stack size (in kilobytes) used for worker threads unless the caller
# has explicitly set SCons.Job.stack_size (see Jobs.__init__ below).
default_stack_size = 256

# Error string attached to a task's BuildError when the build is
# stopped by a signal (SIGINT/SIGTERM/SIGHUP).
interrupt_msg = 'Build interrupted.'
51
class InterruptState:
    """Records whether the build has been interrupted by a signal.

    One instance is shared between the job engine and the installed
    signal handler: the handler calls set(), and the job classes test
    the flag by calling the instance itself (__call__).
    """

    def __init__(self):
        # Flips to True the first time a signal handler reports an
        # interruption; never reset for the lifetime of the build.
        self.interrupted = False

    def set(self):
        """Mark the build as interrupted."""
        self.interrupted = True

    def __call__(self):
        """Return whether the build has been interrupted."""
        return self.interrupted
63
65 """An instance of this class initializes N jobs, and provides
66 methods for starting, stopping, and waiting on all N jobs.
67 """
68
70 """
71 create 'num' jobs using the given taskmaster.
72
73 If 'num' is 1 or less, then a serial job will be used,
74 otherwise a parallel job with 'num' worker threads will
75 be used.
76
77 The 'num_jobs' attribute will be set to the actual number of jobs
78 allocated. If more than one job is requested but the Parallel
79 class can't do it, it gets reset to 1. Wrapping interfaces that
80 care should check the value of 'num_jobs' after initialization.
81 """
82
83 self.job = None
84 if num > 1:
85 try:
86 stack_size = SCons.Job.stack_size
87 except AttributeError:
88 stack_size = default_stack_size
89
90 try:
91 self.job = Parallel(taskmaster, num, stack_size)
92 self.num_jobs = num
93 except NameError:
94 pass
95 if self.job is None:
96 self.job = Serial(taskmaster)
97 self.num_jobs = 1
98
99 - def run(self, postfunc=lambda: None):
100 """Run the jobs.
101
102 postfunc() will be invoked after the jobs has run. It will be
103 invoked even if the jobs are interrupted by a keyboard
104 interrupt (well, in fact by a signal such as either SIGINT,
105 SIGTERM or SIGHUP). The execution of postfunc() is protected
106 against keyboard interrupts and is guaranteed to run to
107 completion."""
108 self._setup_sig_handler()
109 try:
110 self.job.start()
111 finally:
112 postfunc()
113 self._reset_sig_handler()
114
116 """Returns whether the jobs were interrupted by a signal."""
117 return self.job.interrupted()
118
120 """Setup an interrupt handler so that SCons can shutdown cleanly in
121 various conditions:
122
123 a) SIGINT: Keyboard interrupt
124 b) SIGTERM: kill or system shutdown
125 c) SIGHUP: Controlling shell exiting
126
127 We handle all of these cases by stopping the taskmaster. It
128 turns out that it very difficult to stop the build process
129 by throwing asynchronously an exception such as
130 KeyboardInterrupt. For example, the python Condition
131 variables (threading.Condition) and Queue's do not seem to
132 asynchronous-exception-safe. It would require adding a whole
133 bunch of try/finally block and except KeyboardInterrupt all
134 over the place.
135
136 Note also that we have to be careful to handle the case when
137 SCons forks before executing another process. In that case, we
138 want the child to exit immediately.
139 """
140 def handler(signum, stack, self=self, parentpid=os.getpid()):
141 if os.getpid() == parentpid:
142 self.job.taskmaster.stop()
143 self.job.interrupted.set()
144 else:
145 os._exit(2)
146
147 self.old_sigint = signal.signal(signal.SIGINT, handler)
148 self.old_sigterm = signal.signal(signal.SIGTERM, handler)
149 try:
150 self.old_sighup = signal.signal(signal.SIGHUP, handler)
151 except AttributeError:
152 pass
153
155 """Restore the signal handlers to their previous state (before the
156 call to _setup_sig_handler()."""
157
158 signal.signal(signal.SIGINT, self.old_sigint)
159 signal.signal(signal.SIGTERM, self.old_sigterm)
160 try:
161 signal.signal(signal.SIGHUP, self.old_sighup)
162 except AttributeError:
163 pass
164
166 """This class is used to execute tasks in series, and is more efficient
167 than Parallel, but is only appropriate for non-parallel builds. Only
168 one instance of this class should be in existence at a time.
169
170 This class is not thread safe.
171 """
172
174 """Create a new serial job given a taskmaster.
175
176 The taskmaster's next_task() method should return the next task
177 that needs to be executed, or None if there are no more tasks. The
178 taskmaster's executed() method will be called for each task when it
179 is successfully executed or failed() will be called if it failed to
180 execute (e.g. execute() raised an exception)."""
181
182 self.taskmaster = taskmaster
183 self.interrupted = InterruptState()
184
186 """Start the job. This will begin pulling tasks from the taskmaster
187 and executing them, and return when there are no more tasks. If a task
188 fails to execute (i.e. execute() raises an exception), then the job will
189 stop."""
190
191 while 1:
192 task = self.taskmaster.next_task()
193
194 if task is None:
195 break
196
197 try:
198 task.prepare()
199 if task.needs_execute():
200 task.execute()
201 except:
202 if self.interrupted():
203 try:
204 raise SCons.Errors.BuildError(
205 task.targets[0], errstr=interrupt_msg)
206 except:
207 task.exception_set()
208 else:
209 task.exception_set()
210
211
212
213 task.failed()
214 else:
215 task.executed()
216
217 task.postprocess()
218 self.taskmaster.cleanup()
219
220
221
222
223
# The parallel machinery needs the Queue and threading modules.  If
# this Python has no threading support, Worker is left undefined and
# Jobs.__init__ falls back to the Serial class (via NameError).
try:
    try:
        import queue as Queue       # Python 3 module name
    except ImportError:
        import Queue                # Python 2 module name
    import threading
except ImportError:
    pass
else:
    class Worker(threading.Thread):
        """A worker thread waits on a task to be posted to its request queue,
        dequeues the task, executes it, and posts a tuple including the task
        and a boolean indicating whether the task executed successfully. """

        def __init__(self, requestQueue, resultsQueue, interrupted):
            threading.Thread.__init__(self)
            # Daemonize so a wedged worker cannot keep the process
            # alive at exit (equivalent to the deprecated setDaemon(1)).
            self.daemon = True
            self.requestQueue = requestQueue
            self.resultsQueue = resultsQueue
            self.interrupted = interrupted
            self.start()

        def run(self):
            while 1:
                task = self.requestQueue.get()

                if not task:
                    # The "None" value is used as a sentinel by
                    # ThreadPool.cleanup().  It indicates that there
                    # are no more tasks, so this worker should quit.
                    break

                try:
                    # Fail fast if the build was interrupted while this
                    # task was waiting in the queue.
                    if self.interrupted():
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    task.execute()
                except:
                    task.exception_set()
                    ok = False
                else:
                    ok = True

                self.resultsQueue.put((task, ok))
265
267 """This class is responsible for spawning and managing worker threads."""
268
269 - def __init__(self, num, stack_size, interrupted):
270 """Create the request and reply queues, and 'num' worker threads.
271
272 One must specify the stack size of the worker threads. The
273 stack size is specified in kilobytes.
274 """
275 self.requestQueue = Queue.Queue(0)
276 self.resultsQueue = Queue.Queue(0)
277
278 try:
279 prev_size = threading.stack_size(stack_size*1024)
280 except AttributeError, e:
281
282
283 if hasattr(SCons.Job, 'stack_size'):
284 msg = "Setting stack size is unsupported by this version of Python:\n " + \
285 e.args[0]
286 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
287 except ValueError, e:
288 msg = "Setting stack size failed:\n " + \
289 e.message
290 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
291
292
293 self.workers = []
294 for _ in range(num):
295 worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
296 self.workers.append(worker)
297
298
299
300 if 'prev_size' in locals().keys():
301 threading.stack_size(prev_size)
302
303 - def put(self, task):
304 """Put task into request queue."""
305 self.requestQueue.put(task)
306
308 """Remove and return a result tuple from the results queue."""
309 return self.resultsQueue.get()
310
312 self.resultsQueue.put((task, False))
313
315 """
316 Shuts down the thread pool, giving each worker thread a
317 chance to shut down gracefully.
318 """
319
320
321
322
323 for _ in self.workers:
324 self.requestQueue.put(None)
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339 for worker in self.workers:
340 worker.join(1.0)
341 self.workers = []
342
344 """This class is used to execute tasks in parallel, and is somewhat
345 less efficient than Serial, but is appropriate for parallel builds.
346
347 This class is thread safe.
348 """
349
350 - def __init__(self, taskmaster, num, stack_size):
351 """Create a new parallel job given a taskmaster.
352
353 The taskmaster's next_task() method should return the next
354 task that needs to be executed, or None if there are no more
355 tasks. The taskmaster's executed() method will be called
356 for each task when it is successfully executed or failed()
357 will be called if the task failed to execute (i.e. execute()
358 raised an exception).
359
360 Note: calls to taskmaster are serialized, but calls to
361 execute() on distinct tasks are not serialized, because
362 that is the whole point of parallel jobs: they can execute
363 multiple tasks simultaneously. """
364
365 self.taskmaster = taskmaster
366 self.interrupted = InterruptState()
367 self.tp = ThreadPool(num, stack_size, self.interrupted)
368
369 self.maxjobs = num
370
372 """Start the job. This will begin pulling tasks from the
373 taskmaster and executing them, and return when there are no
374 more tasks. If a task fails to execute (i.e. execute() raises
375 an exception), then the job will stop."""
376
377 jobs = 0
378
379 while 1:
380
381
382 while jobs < self.maxjobs:
383 task = self.taskmaster.next_task()
384 if task is None:
385 break
386
387 try:
388
389 task.prepare()
390 except:
391 task.exception_set()
392 task.failed()
393 task.postprocess()
394 else:
395 if task.needs_execute():
396
397 self.tp.put(task)
398 jobs = jobs + 1
399 else:
400 task.executed()
401 task.postprocess()
402
403 if not task and not jobs: break
404
405
406
407 while 1:
408 task, ok = self.tp.get()
409 jobs = jobs - 1
410
411 if ok:
412 task.executed()
413 else:
414 if self.interrupted():
415 try:
416 raise SCons.Errors.BuildError(
417 task.targets[0], errstr=interrupt_msg)
418 except:
419 task.exception_set()
420
421
422
423 task.failed()
424
425 task.postprocess()
426
427 if self.tp.resultsQueue.empty():
428 break
429
430 self.tp.cleanup()
431 self.taskmaster.cleanup()
432