Package SCons :: Module Job

Source Code for Module SCons.Job

  1  """SCons.Job 
  2   
  3  This module defines the Serial and Parallel classes that execute tasks to 
  4  complete a build. The Jobs class provides a higher level interface to start, 
  5  stop, and wait on jobs. 
  6   
  7  """ 

#
# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 The SCons Foundation
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#

__revision__ = "src/engine/SCons/Job.py 2928 2008/04/29 22:44:09 knight"

import SCons.compat

import os
import signal

# Both modules are referenced below (SCons.Errors.BuildError and
# SCons.Warnings.warn), so import them here as well.
import SCons.Errors
import SCons.Warnings


# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on Linux
# is 8 MBytes.

default_stack_size = 256

interrupt_msg = 'Build interrupted.'


class InterruptState:
    def __init__(self):
        self.interrupted = False

    def set(self):
        self.interrupted = True

    def __call__(self):
        return self.interrupted
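
# Illustrative sketch (not part of the original module): InterruptState is a
# shared, callable flag.  The signal handler installed by Jobs calls set();
# Serial, Parallel and Worker poll the flag by calling the instance itself.
def _example_interrupt_state():
    state = InterruptState()
    assert not state()      # no interrupt recorded yet
    state.set()             # what the SIGINT/SIGTERM/SIGHUP handler does
    assert state()          # what the job classes see via __call__()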


class Jobs:
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    def __init__(self, num, taskmaster):
        """
        create 'num' jobs using the given taskmaster.

        If 'num' is 1 or less, then a serial job will be used,
        otherwise a parallel job with 'num' worker threads will
        be used.

        The 'num_jobs' attribute will be set to the actual number of jobs
        allocated. If more than one job is requested but the Parallel
        class can't do it, it gets reset to 1. Wrapping interfaces that
        care should check the value of 'num_jobs' after initialization.
        """

        self.job = None
        if num > 1:
            try:
                stack_size = SCons.Job.stack_size
            except AttributeError:
                stack_size = default_stack_size

            try:
                # Parallel is only defined when the interpreter
                # supports threads; see the ImportError trap below.
                self.job = Parallel(taskmaster, num, stack_size)
                self.num_jobs = num
            except NameError:
                pass
        if self.job is None:
            self.job = Serial(taskmaster)
            self.num_jobs = 1

    def run(self, postfunc=lambda: None):
        """Run the jobs.

        postfunc() will be invoked after the jobs have run. It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as SIGINT,
        SIGTERM or SIGHUP). The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion."""
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            postfunc()
            self._reset_sig_handler()

    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()

    def _setup_sig_handler(self):
        """Set up an interrupt handler so that SCons can shut down cleanly
        in various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster. It
        turns out that it is very difficult to stop the build process
        by asynchronously throwing an exception such as
        KeyboardInterrupt. For example, the Python condition
        variables (threading.Condition) and Queues do not seem to
        be asynchronous-exception-safe. It would require adding
        try/finally blocks and except KeyboardInterrupt handlers
        all over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process. In that case, we
        want the child to exit immediately.
        """
        def handler(signum, stack, self=self, parentpid=os.getpid()):
            if os.getpid() == parentpid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                os._exit(2)

        self.old_sigint = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            pass

    def _reset_sig_handler(self):
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler())."""

        signal.signal(signal.SIGINT, self.old_sigint)
        signal.signal(signal.SIGTERM, self.old_sigterm)
        try:
            signal.signal(signal.SIGHUP, self.old_sighup)
        except AttributeError:
            pass
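
# Illustrative usage sketch (not part of the original module): how a caller
# drives this interface.  The 'taskmaster' and 'postfunc' arguments are
# assumed; in a real build the taskmaster comes from SCons.Taskmaster.
def _example_run_jobs(taskmaster, postfunc=lambda: None):
    jobs = Jobs(4, taskmaster)      # four workers, or the Serial fallback
    jobs.run(postfunc)              # postfunc runs even on SIGINT/SIGTERM/SIGHUP
    return jobs.were_interrupted()  # True if a signal stopped the build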


class Serial:
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds. Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster):
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks. The
        taskmaster's executed() method will be called for each task when it
        is successfully executed, or failed() will be called if it failed to
        execute (e.g. execute() raised an exception)."""

        self.taskmaster = taskmaster
        self.interrupted = InterruptState()

    def start(self):
        """Start the job. This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks. If a task
        fails to execute (i.e. execute() raises an exception), then the job
        will stop."""

        while 1:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except:
                if self.interrupted():
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except:
                        task.exception_set()
                else:
                    task.exception_set()

                # Let the failed() callback function arrange for the
                # build to stop if that's appropriate.
                task.failed()
            else:
                task.executed()

            task.postprocess()
        self.taskmaster.cleanup()
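
# Illustrative sketch (not part of the original module): the smallest Task and
# Taskmaster shapes that satisfy the protocol Serial and Parallel rely on.
# Both class names and the 'funcs' parameter are hypothetical.
class _ExampleTask:
    def __init__(self, func):
        self.func = func
        self.targets = ['<example>']    # targets[0] is used in error reports
    def prepare(self):                  # pre-execution setup; may raise
        pass
    def needs_execute(self):            # returning False skips execute()
        return True
    def execute(self):                  # the actual work; may raise
        self.func()
    def executed(self):                 # success callback
        pass
    def failed(self):                   # failure callback; may stop the build
        pass
    def postprocess(self):              # always runs after executed()/failed()
        pass
    def exception_set(self):            # real tasks record sys.exc_info() here
        pass

class _ExampleTaskmaster:
    def __init__(self, funcs):
        self.tasks = map(_ExampleTask, funcs)
    def next_task(self):                # next task, or None when done
        if self.tasks:
            return self.tasks.pop(0)
        return None
    def stop(self):                     # called from the signal handler
        self.tasks = []
    def cleanup(self):
        pass

# e.g.:  Serial(_ExampleTaskmaster([step_one, step_two])).start()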


# Trap import failure so that everything in the Job module but the
# Parallel class (and its dependent classes) will work if the interpreter
# doesn't support threads.
try:
    import Queue
    import threading
except ImportError:
    pass
else:
    class Worker(threading.Thread):
        """A worker thread waits on a task to be posted to its request queue,
        dequeues the task, executes it, and posts a tuple including the task
        and a boolean indicating whether the task executed successfully."""

        def __init__(self, requestQueue, resultsQueue, interrupted):
            threading.Thread.__init__(self)
            self.setDaemon(1)
            self.requestQueue = requestQueue
            self.resultsQueue = resultsQueue
            self.interrupted = interrupted
            self.start()

        def run(self):
            while 1:
                task = self.requestQueue.get()

                if not task:
                    # The "None" value is used as a sentinel by
                    # ThreadPool.cleanup(). This indicates that there
                    # are no more tasks, so we should quit.
                    break

                try:
                    if self.interrupted():
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    task.execute()
                except:
                    task.exception_set()
                    ok = False
                else:
                    ok = True

                self.resultsQueue.put((task, ok))

    class ThreadPool:
        """This class is responsible for spawning and managing worker threads."""

        def __init__(self, num, stack_size, interrupted):
            """Create the request and reply queues, and 'num' worker threads.

            One must specify the stack size of the worker threads. The
            stack size is specified in kilobytes.
            """
            self.requestQueue = Queue.Queue(0)
            self.resultsQueue = Queue.Queue(0)

            try:
                prev_size = threading.stack_size(stack_size*1024)
            except AttributeError, e:
                # Only print a warning if the stack size has been
                # explicitly set.
                if hasattr(SCons.Job, 'stack_size'):
                    msg = "Setting stack size is unsupported by this version of Python:\n " + \
                        e.args[0]
                    SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
            except ValueError, e:
                msg = "Setting stack size failed:\n " + e.message
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

            # Create worker threads
            self.workers = []
            for _ in range(num):
                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
                self.workers.append(worker)

            # Once we drop Python 1.5 we can change the following to:
            #if 'prev_size' in locals():
            if 'prev_size' in locals().keys():
                threading.stack_size(prev_size)

        def put(self, task):
            """Put task into request queue."""
            self.requestQueue.put(task)

        def get(self):
            """Remove and return a result tuple from the results queue."""
            return self.resultsQueue.get()

        def preparation_failed(self, task):
            """Post a failure result for a task whose prepare() raised."""
            self.resultsQueue.put((task, False))

        def cleanup(self):
            """
            Shuts down the thread pool, giving each worker thread a
            chance to shut down gracefully.
            """
            # For each worker thread, put a sentinel "None" value
            # on the requestQueue (indicating that there's no work
            # to be done) so that each worker thread will get one and
            # terminate gracefully.
            for _ in self.workers:
                self.requestQueue.put(None)

            # Wait for all of the workers to terminate.
            #
            # If we don't do this, later Python versions (2.4, 2.5) often
            # seem to raise exceptions during shutdown. This happens
            # in requestQueue.get(), as an assertion failure that
            # requestQueue.not_full is notified while not acquired,
            # seemingly because the main thread has shut down (or is
            # in the process of doing so) while the workers are still
            # trying to pull sentinels off the requestQueue.
            #
            # Normally these terminations should happen fairly quickly,
            # but we'll stick a one-second timeout on here just in case
            # someone gets hung.
            for worker in self.workers:
                worker.join(1.0)
            self.workers = []
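
    # Illustrative sketch (not part of the original module): the request/result
    # handshake that Parallel.start() performs with the pool.  The 'task'
    # argument is assumed to implement the Task protocol sketched above.
    def _example_pool_roundtrip(task):
        pool = ThreadPool(2, default_stack_size, InterruptState())
        pool.put(task)              # a Worker dequeues and executes the task
        finished, ok = pool.get()   # blocks until the (task, ok) tuple arrives
        pool.cleanup()              # one None sentinel per worker, then join
        return finished, ok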

    class Parallel:
        """This class is used to execute tasks in parallel, and is somewhat
        less efficient than Serial, but is appropriate for parallel builds.

        This class is thread safe.
        """

        def __init__(self, taskmaster, num, stack_size):
            """Create a new parallel job given a taskmaster.

            The taskmaster's next_task() method should return the next
            task that needs to be executed, or None if there are no more
            tasks. The taskmaster's executed() method will be called
            for each task when it is successfully executed, or failed()
            will be called if the task failed to execute (i.e. execute()
            raised an exception).

            Note: calls to taskmaster are serialized, but calls to
            execute() on distinct tasks are not serialized, because
            that is the whole point of parallel jobs: they can execute
            multiple tasks simultaneously."""

            self.taskmaster = taskmaster
            self.interrupted = InterruptState()
            self.tp = ThreadPool(num, stack_size, self.interrupted)

            self.maxjobs = num

        def start(self):
            """Start the job. This will begin pulling tasks from the
            taskmaster and executing them, and return when there are no
            more tasks. If a task fails to execute (i.e. execute() raises
            an exception), then the job will stop."""

            jobs = 0

            while 1:
                # Start up as many available tasks as we're
                # allowed to.
                while jobs < self.maxjobs:
                    task = self.taskmaster.next_task()
                    if task is None:
                        break

                    try:
                        # prepare task for execution
                        task.prepare()
                    except:
                        task.exception_set()
                        task.failed()
                        task.postprocess()
                    else:
                        if task.needs_execute():
                            # dispatch task
                            self.tp.put(task)
                            jobs = jobs + 1
                        else:
                            task.executed()
                            task.postprocess()

                if not task and not jobs: break

                # Let any/all completed tasks finish up before we go
                # back and put the next batch of tasks on the queue.
                while 1:
                    task, ok = self.tp.get()
                    jobs = jobs - 1

                    if ok:
                        task.executed()
                    else:
                        if self.interrupted():
                            try:
                                raise SCons.Errors.BuildError(
                                    task.targets[0], errstr=interrupt_msg)
                            except:
                                task.exception_set()

                        # Let the failed() callback function arrange
                        # for the build to stop if that's appropriate.
                        task.failed()

                    task.postprocess()

                    if self.tp.resultsQueue.empty():
                        break

            self.tp.cleanup()
            self.taskmaster.cleanup()
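
# Illustrative sketch (not part of the original module): Jobs.__init__() looks
# up SCons.Job.stack_size and falls back to default_stack_size (256 kB), so a
# wrapping interface can tune worker-thread stacks before creating any jobs.
# The value and the 'taskmaster' argument here are hypothetical.
def _example_override_stack_size(taskmaster):
    import SCons.Job
    SCons.Job.stack_size = 512          # kilobytes, handed to ThreadPool
    return SCons.Job.Jobs(8, taskmaster)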