"""SCons.Job

This module defines the Serial and Parallel classes that execute tasks to
complete a build. The Jobs class provides a higher level interface to start,
stop, and wait on jobs.

"""
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 __revision__ = "src/engine/SCons/Job.py 3603 2008/10/10 05:46:45 scons"
33
import os
import signal

import SCons.Errors
import SCons.Warnings
38
39
40
41
42
43
44
45
46
# explicit_stack_size is a worker-thread stack size (in kilobytes) chosen
# by the caller; None means "use default_stack_size".  It is presumably
# set from a command-line option by code outside this module — not
# visible here, TODO confirm.  See ThreadPool.__init__ below for how the
# value is applied via threading.stack_size().
explicit_stack_size = None
default_stack_size = 256

# Error string attached to the BuildError raised for tasks that were in
# flight when the build was stopped by a signal.
interrupt_msg = 'Build interrupted.'
51
52
class InterruptState:
    """A callable flag recording whether the build was interrupted.

    The Jobs signal handler calls set() when a signal arrives; the job
    and worker objects poll the flag by calling the instance directly.
    """

    def __init__(self):
        # No interrupt has been seen yet.
        self.interrupted = False

    def set(self):
        """Mark the build as interrupted."""
        self.interrupted = True

    def __call__(self):
        """Return True if set() has been called (build was interrupted)."""
        return self.interrupted
62
63
class Jobs:
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    def __init__(self, num, taskmaster):
        """
        Create 'num' jobs using the given taskmaster.

        If 'num' is 1 or less, then a serial job will be used,
        otherwise a parallel job with 'num' worker threads will
        be used.

        The 'num_jobs' attribute will be set to the actual number of jobs
        allocated.  If more than one job is requested but the Parallel
        class can't do it, it gets reset to 1.  Wrapping interfaces that
        care should check the value of 'num_jobs' after initialization.
        """

        self.job = None
        if num > 1:
            stack_size = explicit_stack_size
            if stack_size is None:
                stack_size = default_stack_size

            try:
                self.job = Parallel(taskmaster, num, stack_size)
                self.num_jobs = num
            except NameError:
                # Parallel is only defined when the threading and Queue
                # modules imported successfully; fall back to a serial
                # build below.
                pass
        if self.job is None:
            self.job = Serial(taskmaster)
            self.num_jobs = 1

    def run(self, postfunc=lambda: None):
        """Run the jobs.

        postfunc() will be invoked after the jobs has run. It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as either SIGINT,
        SIGTERM or SIGHUP). The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion."""
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            # postfunc() runs while our handler is still installed, so a
            # signal arriving here cannot raise into it; only afterwards
            # do we restore the previous handlers.
            postfunc()
            self._reset_sig_handler()

    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()

    def _setup_sig_handler(self):
        """Setup an interrupt handler so that SCons can shutdown cleanly in
        various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster. It
        turns out that it is very difficult to stop the build process
        by throwing asynchronously an exception such as
        KeyboardInterrupt. For example, the python Condition
        variables (threading.Condition) and Queues do not seem to be
        asynchronous-exception-safe. It would require adding a whole
        bunch of try/finally blocks and except KeyboardInterrupt all
        over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process. In that case, we
        want the child to exit immediately.
        """
        def handler(signum, stack, self=self, parentpid=os.getpid()):
            if os.getpid() == parentpid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                # We are in a forked child: die immediately so the
                # signal does not unwind into the parent's build logic.
                os._exit(2)

        self.old_sigint  = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            # SIGHUP does not exist on this platform (e.g. Windows).
            pass

    def _reset_sig_handler(self):
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler()."""

        signal.signal(signal.SIGINT, self.old_sigint)
        signal.signal(signal.SIGTERM, self.old_sigterm)
        try:
            signal.signal(signal.SIGHUP, self.old_sighup)
        except AttributeError:
            pass
163
class Serial:
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds. Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster):
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks. The
        taskmaster's executed() method will be called for each task when it
        is successfully executed or failed() will be called if it failed to
        execute (e.g. execute() raised an exception)."""

        self.taskmaster = taskmaster
        # Polled by the Jobs signal handler machinery; see InterruptState.
        self.interrupted = InterruptState()

    def start(self):
        """Start the job. This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks. If a task
        fails to execute (i.e. execute() raises an exception), then the job will
        stop."""

        while 1:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except:
                if self.interrupted():
                    # Re-raise as a BuildError so the failure is recorded
                    # with a meaningful "Build interrupted." message.
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except:
                        task.exception_set()
                else:
                    task.exception_set()

                # Let the failed() callback function arrange for the
                # build to stop if that's appropriate.
                task.failed()
            else:
                task.executed()

            task.postprocess()
        self.taskmaster.cleanup()
218
219
220
221
222
223 try:
224 import Queue
225 import threading
226 except ImportError:
227 pass
228 else:
229 - class Worker(threading.Thread):
230 """A worker thread waits on a task to be posted to its request queue,
231 dequeues the task, executes it, and posts a tuple including the task
232 and a boolean indicating whether the task executed successfully. """
233
234 - def __init__(self, requestQueue, resultsQueue, interrupted):
235 threading.Thread.__init__(self)
236 self.setDaemon(1)
237 self.requestQueue = requestQueue
238 self.resultsQueue = resultsQueue
239 self.interrupted = interrupted
240 self.start()
241
243 while 1:
244 task = self.requestQueue.get()
245
246 if not task:
247
248
249
250 break
251
252 try:
253 if self.interrupted():
254 raise SCons.Errors.BuildError(
255 task.targets[0], errstr=interrupt_msg)
256 task.execute()
257 except:
258 task.exception_set()
259 ok = False
260 else:
261 ok = True
262
263 self.resultsQueue.put((task, ok))
264
266 """This class is responsible for spawning and managing worker threads."""
267
268 - def __init__(self, num, stack_size, interrupted):
269 """Create the request and reply queues, and 'num' worker threads.
270
271 One must specify the stack size of the worker threads. The
272 stack size is specified in kilobytes.
273 """
274 self.requestQueue = Queue.Queue(0)
275 self.resultsQueue = Queue.Queue(0)
276
277 try:
278 prev_size = threading.stack_size(stack_size*1024)
279 except AttributeError, e:
280
281
282 if not explicit_stack_size is None:
283 msg = "Setting stack size is unsupported by this version of Python:\n " + \
284 e.args[0]
285 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
286 except ValueError, e:
287 msg = "Setting stack size failed:\n " + \
288 e.message
289 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
290
291
292 self.workers = []
293 for _ in range(num):
294 worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
295 self.workers.append(worker)
296
297
298
299 if 'prev_size' in locals().keys():
300 threading.stack_size(prev_size)
301
302 - def put(self, task):
303 """Put task into request queue."""
304 self.requestQueue.put(task)
305
307 """Remove and return a result tuple from the results queue."""
308 return self.resultsQueue.get()
309
311 self.resultsQueue.put((task, False))
312
314 """
315 Shuts down the thread pool, giving each worker thread a
316 chance to shut down gracefully.
317 """
318
319
320
321
322 for _ in self.workers:
323 self.requestQueue.put(None)
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338 for worker in self.workers:
339 worker.join(1.0)
340 self.workers = []
341
class Parallel:
    """This class is used to execute tasks in parallel, and is somewhat
    less efficient than Serial, but is appropriate for parallel builds.

    This class is thread safe.
    """

    def __init__(self, taskmaster, num, stack_size):
        """Create a new parallel job given a taskmaster.

        The taskmaster's next_task() method should return the next
        task that needs to be executed, or None if there are no more
        tasks. The taskmaster's executed() method will be called
        for each task when it is successfully executed or failed()
        will be called if the task failed to execute (i.e. execute()
        raised an exception).

        Note: calls to taskmaster are serialized, but calls to
        execute() on distinct tasks are not serialized, because
        that is the whole point of parallel jobs: they can execute
        multiple tasks simultaneously. """

        self.taskmaster = taskmaster
        # Shared interrupt flag: set by the Jobs signal handler, polled
        # here and by the worker threads.
        self.interrupted = InterruptState()
        self.tp = ThreadPool(num, stack_size, self.interrupted)

        # Upper bound on the number of dispatched, unfinished tasks.
        self.maxjobs = num

    def start(self):
        """Start the job. This will begin pulling tasks from the
        taskmaster and executing them, and return when there are no
        more tasks. If a task fails to execute (i.e. execute() raises
        an exception), then the job will stop."""

        jobs = 0

        while 1:
            # Start up as many available tasks as we're allowed to.
            while jobs < self.maxjobs:
                task = self.taskmaster.next_task()
                if task is None:
                    break

                try:
                    # prepare task for execution
                    task.prepare()
                except:
                    # prepare() failures are handled here directly,
                    # without ever dispatching to the thread pool.
                    task.exception_set()
                    task.failed()
                    task.postprocess()
                else:
                    if task.needs_execute():
                        # dispatch task to a worker thread
                        self.tp.put(task)
                        jobs = jobs + 1
                    else:
                        task.executed()
                        task.postprocess()

            # No pending task and nothing in flight: the build is done.
            if not task and not jobs: break

            # Let any/all completed tasks finish up before we go back
            # and put the next batch of tasks on the queue.
            while 1:
                task, ok = self.tp.get()
                jobs = jobs - 1

                if ok:
                    task.executed()
                else:
                    if self.interrupted():
                        # Record the interruption as a BuildError on the
                        # task so the failure message is meaningful.
                        try:
                            raise SCons.Errors.BuildError(
                                task.targets[0], errstr=interrupt_msg)
                        except:
                            task.exception_set()

                    # Let the failed() callback function arrange for
                    # the build to stop if that's appropriate.
                    task.failed()

                task.postprocess()

                if self.tp.resultsQueue.empty():
                    break

        self.tp.cleanup()
        self.taskmaster.cleanup()
431