
Source Code for Module SCons.Job

  1  """SCons.Job 
  2   
  3  This module defines the Serial and Parallel classes that execute tasks to 
  4  complete a build. The Jobs class provides a higher level interface to start, 
  5  stop, and wait on jobs. 
  6   
  7  """ 
  8   
  9  # 
 10  # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010 The SCons Foundation 
 11  # 
 12  # Permission is hereby granted, free of charge, to any person obtaining 
 13  # a copy of this software and associated documentation files (the 
 14  # "Software"), to deal in the Software without restriction, including 
 15  # without limitation the rights to use, copy, modify, merge, publish, 
 16  # distribute, sublicense, and/or sell copies of the Software, and to 
 17  # permit persons to whom the Software is furnished to do so, subject to 
 18  # the following conditions: 
 19  # 
 20  # The above copyright notice and this permission notice shall be included 
 21  # in all copies or substantial portions of the Software. 
 22  # 
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY 
 24  # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE 
 25  # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 26  # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 
 27  # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 
 28  # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 
 29  # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
 30  # 
 31   
 32  __revision__ = "src/engine/SCons/Job.py 5110 2010/07/25 16:14:38 bdeegan" 
 33   
 34  import os 
 35  import signal 
 36   
 37  import SCons.Errors 
 38   
# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on Linux
# is 8 MBytes.
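#
# Illustrative arithmetic (an addition, not from the original comments):
# on a 32-bit Linux host with roughly 3 GB of usable address space, 8 MB
# stacks cap a process at about 3072 / 8 = 384 threads, whereas 256 kB
# stacks would permit over ten thousand, so stack reservations stop being
# the limiting factor on -j parallelism.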

explicit_stack_size = None
default_stack_size = 256

interrupt_msg = 'Build interrupted.'

class InterruptState:
    def __init__(self):
        self.interrupted = False

    def set(self):
        self.interrupted = True

    def __call__(self):
        return self.interrupted

class Jobs:
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    def __init__(self, num, taskmaster):
        """
        Create 'num' jobs using the given taskmaster.

        If 'num' is 1 or less, then a serial job will be used,
        otherwise a parallel job with 'num' worker threads will
        be used.

        The 'num_jobs' attribute will be set to the actual number of jobs
        allocated. If more than one job is requested but the Parallel
        class can't do it, it gets reset to 1. Wrapping interfaces that
        care should check the value of 'num_jobs' after initialization.
        """

        self.job = None
        if num > 1:
            stack_size = explicit_stack_size
            if stack_size is None:
                stack_size = default_stack_size

            try:
                self.job = Parallel(taskmaster, num, stack_size)
                self.num_jobs = num
            except NameError:
                # Parallel is only defined when the interpreter supports
                # threads (see the threading import below); fall through
                # to the Serial fallback.
                pass
        if self.job is None:
            self.job = Serial(taskmaster)
            self.num_jobs = 1

    def run(self, postfunc=lambda: None):
        """Run the jobs.

        postfunc() will be invoked after the jobs have run. It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as SIGINT,
        SIGTERM or SIGHUP). The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion."""
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            postfunc()
            self._reset_sig_handler()

    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()

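    # Usage sketch (an illustrative addition; the taskmaster shown is a
    # hypothetical object implementing the next_task()/executed()/failed()/
    # postprocess()/cleanup() interface that Serial and Parallel rely on
    # below, and close_logs is a hypothetical cleanup function):
    #
    #     jobs = Jobs(4, taskmaster)        # Parallel job, 4 workers
    #     jobs.run(postfunc=close_logs)     # close_logs() always runs
    #     if jobs.were_interrupted():
    #         print interrupt_msg
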
    def _setup_sig_handler(self):
        """Set up an interrupt handler so that SCons can shut down cleanly
        in various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster. It
        turns out that it is very difficult to stop the build process
        by asynchronously throwing an exception such as
        KeyboardInterrupt. For example, the Python condition
        variables (threading.Condition) and queues do not seem to be
        asynchronous-exception-safe. It would require adding try/finally
        blocks and except KeyboardInterrupt handlers all over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process. In that case, we
        want the child to exit immediately.
        """
        def handler(signum, stack, self=self, parentpid=os.getpid()):
            if os.getpid() == parentpid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                os._exit(2)

        self.old_sigint = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            # SIGHUP does not exist on this platform (e.g. Windows).
            pass

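    # Why the handler compares pids (an illustrative sketch, not original
    # code): if SCons forks to spawn a subprocess and a signal arrives in
    # the child between fork() and exec(), the child still has this handler
    # installed. The captured parentpid makes the child take the os._exit(2)
    # branch instead of stopping the parent's taskmaster a second time.
    #
    #     pid = os.fork()
    #     if pid == 0:
    #         os.execvp(cmd[0], cmd)    # handler may fire before this exec
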
    def _reset_sig_handler(self):
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler())."""

        signal.signal(signal.SIGINT, self.old_sigint)
        signal.signal(signal.SIGTERM, self.old_sigterm)
        try:
            signal.signal(signal.SIGHUP, self.old_sighup)
        except AttributeError:
            pass

class Serial:
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds. Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster):
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks. The
        taskmaster's executed() method will be called for each task when it
        is successfully executed, or failed() will be called if it failed to
        execute (e.g. execute() raised an exception)."""

        self.taskmaster = taskmaster
        self.interrupted = InterruptState()

    def start(self):
        """Start the job. This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks. If a task
        fails to execute (i.e. execute() raises an exception), then the job
        will stop."""

        while 1:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except:
                if self.interrupted():
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except:
                        task.exception_set()
                else:
                    task.exception_set()

                # Let the failed() callback function arrange for the
                # build to stop if that's appropriate.
                task.failed()
            else:
                task.executed()

            task.postprocess()
        self.taskmaster.cleanup()


# Trap import failure so that everything in the Job module but the
# Parallel class (and its dependent classes) will work if the interpreter
# doesn't support threads.
try:
    import Queue
    import threading
except ImportError:
    pass
else:
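    # Queue protocol sketch (an illustrative addition): the pool and its
    # workers communicate exclusively through two unbounded Queues.
    #
    #     requestQueue.put(task)          # main thread dispatches a task
    #     task, ok = resultsQueue.get()   # main thread collects a result
    #     requestQueue.put(None)          # sentinel: one worker shuts down
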
    class Worker(threading.Thread):
        """A worker thread waits on a task to be posted to its request queue,
        dequeues the task, executes it, and posts a tuple including the task
        and a boolean indicating whether the task executed successfully."""

        def __init__(self, requestQueue, resultsQueue, interrupted):
            threading.Thread.__init__(self)
            self.setDaemon(1)
            self.requestQueue = requestQueue
            self.resultsQueue = resultsQueue
            self.interrupted = interrupted
            self.start()

        def run(self):
            while 1:
                task = self.requestQueue.get()

                if task is None:
                    # The "None" value is used as a sentinel by
                    # ThreadPool.cleanup(). This indicates that there
                    # are no more tasks, so we should quit.
                    break

                try:
                    if self.interrupted():
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    task.execute()
                except:
                    task.exception_set()
                    ok = False
                else:
                    ok = True

                self.resultsQueue.put((task, ok))

    class ThreadPool:
        """This class is responsible for spawning and managing worker threads."""

        def __init__(self, num, stack_size, interrupted):
            """Create the request and reply queues, and 'num' worker threads.

            One must specify the stack size of the worker threads. The
            stack size is specified in kilobytes.
            """
            self.requestQueue = Queue.Queue(0)
            self.resultsQueue = Queue.Queue(0)

            try:
                prev_size = threading.stack_size(stack_size*1024)
            except AttributeError, e:
                # Only print a warning if the stack size has been
                # explicitly set.
                if explicit_stack_size is not None:
                    msg = "Setting stack size is unsupported by this version of Python:\n    " + \
                        e.args[0]
                    SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
            except ValueError, e:
                msg = "Setting stack size failed:\n    " + str(e)
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

            # Create worker threads
            self.workers = []
            for _ in range(num):
                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
                self.workers.append(worker)

            # Once we drop Python 1.5 we can change the following to:
            #if 'prev_size' in locals():
            if 'prev_size' in locals().keys():
                threading.stack_size(prev_size)

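        # Illustrative override (an addition; 512 is an arbitrary example
        # value): callers can raise the per-thread stack for deeply
        # recursive builds by setting the module-level knob before the
        # pool is constructed.
        #
        #     import SCons.Job
        #     SCons.Job.explicit_stack_size = 512   # kilobytes
        #     jobs = SCons.Job.Jobs(8, taskmaster)
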
        def put(self, task):
            """Put task into request queue."""
            self.requestQueue.put(task)

        def get(self):
            """Remove and return a result tuple from the results queue."""
            return self.resultsQueue.get()

        def preparation_failed(self, task):
            """Post a failure result for a task that could not be prepared,
            without dispatching it to a worker."""
            self.resultsQueue.put((task, False))

        def cleanup(self):
            """
            Shuts down the thread pool, giving each worker thread a
            chance to shut down gracefully.
            """
            # For each worker thread, put a sentinel "None" value
            # on the requestQueue (indicating that there's no work
            # to be done) so that each worker thread will get one and
            # terminate gracefully.
            for _ in self.workers:
                self.requestQueue.put(None)

            # Wait for all of the workers to terminate.
            #
            # If we don't do this, later Python versions (2.4, 2.5) often
            # seem to raise exceptions during shutdown. This happens
            # in requestQueue.get(), as an assertion failure that
            # requestQueue.not_full is notified while not acquired,
            # seemingly because the main thread has shut down (or is
            # in the process of doing so) while the workers are still
            # trying to pull sentinels off the requestQueue.
            #
            # Normally these terminations should happen fairly quickly,
            # but we'll stick a one-second timeout on here just in case
            # someone gets hung.
            for worker in self.workers:
                worker.join(1.0)
            self.workers = []

    class Parallel:
        """This class is used to execute tasks in parallel, and is somewhat
        less efficient than Serial, but is appropriate for parallel builds.

        This class is thread safe.
        """

        def __init__(self, taskmaster, num, stack_size):
            """Create a new parallel job given a taskmaster.

            The taskmaster's next_task() method should return the next
            task that needs to be executed, or None if there are no more
            tasks. The taskmaster's executed() method will be called
            for each task when it is successfully executed, or failed()
            will be called if the task failed to execute (i.e. execute()
            raised an exception).

            Note: calls to taskmaster are serialized, but calls to
            execute() on distinct tasks are not serialized, because
            that is the whole point of parallel jobs: they can execute
            multiple tasks simultaneously."""

            self.taskmaster = taskmaster
            self.interrupted = InterruptState()
            self.tp = ThreadPool(num, stack_size, self.interrupted)

            self.maxjobs = num

        def start(self):
            """Start the job. This will begin pulling tasks from the
            taskmaster and executing them, and return when there are no
            more tasks. If a task fails to execute (i.e. execute() raises
            an exception), then the job will stop."""

            jobs = 0

            while 1:
                # Start up as many available tasks as we're
                # allowed to.
                while jobs < self.maxjobs:
                    task = self.taskmaster.next_task()
                    if task is None:
                        break

                    try:
                        # prepare task for execution
                        task.prepare()
                    except:
                        task.exception_set()
                        task.failed()
                        task.postprocess()
                    else:
                        if task.needs_execute():
                            # dispatch task
                            self.tp.put(task)
                            jobs = jobs + 1
                        else:
                            task.executed()
                            task.postprocess()

                if not task and not jobs: break

                # Let any/all completed tasks finish up before we go
                # back and put the next batch of tasks on the queue.
                while 1:
                    task, ok = self.tp.get()
                    jobs = jobs - 1

                    if ok:
                        task.executed()
                    else:
                        if self.interrupted():
                            try:
                                raise SCons.Errors.BuildError(
                                    task.targets[0], errstr=interrupt_msg)
                            except:
                                task.exception_set()

                        # Let the failed() callback function arrange
                        # for the build to stop if that's appropriate.
                        task.failed()

                    task.postprocess()

                    if self.tp.resultsQueue.empty():
                        break

            self.tp.cleanup()
            self.taskmaster.cleanup()

# Local Variables:
# tab-width:4
# indent-tabs-mode:nil
# End:
# vim: set expandtab tabstop=4 shiftwidth=4: