Package SCons :: Module Job
[hide private]
[frames] | no frames]

Source Code for Module SCons.Job

  1  """SCons.Job 
  2   
  3  This module defines the Serial and Parallel classes that execute tasks to 
  4  complete a build. The Jobs class provides a higher level interface to start, 
  5  stop, and wait on jobs. 
  6   
  7  """ 
  8   
  9  # 
 10  # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 The SCons Foundation 
 11  # 
 12  # Permission is hereby granted, free of charge, to any person obtaining 
 13  # a copy of this software and associated documentation files (the 
 14  # "Software"), to deal in the Software without restriction, including 
 15  # without limitation the rights to use, copy, modify, merge, publish, 
 16  # distribute, sublicense, and/or sell copies of the Software, and to 
 17  # permit persons to whom the Software is furnished to do so, subject to 
 18  # the following conditions: 
 19  # 
 20  # The above copyright notice and this permission notice shall be included 
 21  # in all copies or substantial portions of the Software. 
 22  # 
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY 
 24  # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE 
 25  # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 26  # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 
 27  # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 
 28  # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 
 29  # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
 30  # 
 31   
 32  __revision__ = "src/engine/SCons/Job.py  2013/03/03 09:48:35 garyo" 
 33   
 34  import SCons.compat 
 35   
 36  import os 
 37  import signal 
 38   
 39  import SCons.Errors 
 40   
 41  # The default stack size (in kilobytes) of the threads used to execute 
 42  # jobs in parallel. 
 43  # 
 44  # We use a stack size of 256 kilobytes. The default on some platforms 
 45  # is too large and prevents us from creating enough threads to fully 
 46  # parallelized the build. For example, the default stack size on linux 
 47  # is 8 MBytes. 
 48   
 49  explicit_stack_size = None 
 50  default_stack_size = 256 
 51   
 52  interrupt_msg = 'Build interrupted.' 
 53   
 54   
55 -class InterruptState(object):
56 - def __init__(self):
57 self.interrupted = False
58
59 - def set(self):
60 self.interrupted = True
61
62 - def __call__(self):
63 return self.interrupted
64 65
66 -class Jobs(object):
67 """An instance of this class initializes N jobs, and provides 68 methods for starting, stopping, and waiting on all N jobs. 69 """ 70
71 - def __init__(self, num, taskmaster):
72 """ 73 create 'num' jobs using the given taskmaster. 74 75 If 'num' is 1 or less, then a serial job will be used, 76 otherwise a parallel job with 'num' worker threads will 77 be used. 78 79 The 'num_jobs' attribute will be set to the actual number of jobs 80 allocated. If more than one job is requested but the Parallel 81 class can't do it, it gets reset to 1. Wrapping interfaces that 82 care should check the value of 'num_jobs' after initialization. 83 """ 84 85 self.job = None 86 if num > 1: 87 stack_size = explicit_stack_size 88 if stack_size is None: 89 stack_size = default_stack_size 90 91 try: 92 self.job = Parallel(taskmaster, num, stack_size) 93 self.num_jobs = num 94 except NameError: 95 pass 96 if self.job is None: 97 self.job = Serial(taskmaster) 98 self.num_jobs = 1
99
100 - def run(self, postfunc=lambda: None):
101 """Run the jobs. 102 103 postfunc() will be invoked after the jobs has run. It will be 104 invoked even if the jobs are interrupted by a keyboard 105 interrupt (well, in fact by a signal such as either SIGINT, 106 SIGTERM or SIGHUP). The execution of postfunc() is protected 107 against keyboard interrupts and is guaranteed to run to 108 completion.""" 109 self._setup_sig_handler() 110 try: 111 self.job.start() 112 finally: 113 postfunc() 114 self._reset_sig_handler()
115
116 - def were_interrupted(self):
117 """Returns whether the jobs were interrupted by a signal.""" 118 return self.job.interrupted()
119
120 - def _setup_sig_handler(self):
121 """Setup an interrupt handler so that SCons can shutdown cleanly in 122 various conditions: 123 124 a) SIGINT: Keyboard interrupt 125 b) SIGTERM: kill or system shutdown 126 c) SIGHUP: Controlling shell exiting 127 128 We handle all of these cases by stopping the taskmaster. It 129 turns out that it very difficult to stop the build process 130 by throwing asynchronously an exception such as 131 KeyboardInterrupt. For example, the python Condition 132 variables (threading.Condition) and queue's do not seem to 133 asynchronous-exception-safe. It would require adding a whole 134 bunch of try/finally block and except KeyboardInterrupt all 135 over the place. 136 137 Note also that we have to be careful to handle the case when 138 SCons forks before executing another process. In that case, we 139 want the child to exit immediately. 140 """ 141 def handler(signum, stack, self=self, parentpid=os.getpid()): 142 if os.getpid() == parentpid: 143 self.job.taskmaster.stop() 144 self.job.interrupted.set() 145 else: 146 os._exit(2)
147 148 self.old_sigint = signal.signal(signal.SIGINT, handler) 149 self.old_sigterm = signal.signal(signal.SIGTERM, handler) 150 try: 151 self.old_sighup = signal.signal(signal.SIGHUP, handler) 152 except AttributeError: 153 pass
154
155 - def _reset_sig_handler(self):
156 """Restore the signal handlers to their previous state (before the 157 call to _setup_sig_handler().""" 158 159 signal.signal(signal.SIGINT, self.old_sigint) 160 signal.signal(signal.SIGTERM, self.old_sigterm) 161 try: 162 signal.signal(signal.SIGHUP, self.old_sighup) 163 except AttributeError: 164 pass
165
166 -class Serial(object):
167 """This class is used to execute tasks in series, and is more efficient 168 than Parallel, but is only appropriate for non-parallel builds. Only 169 one instance of this class should be in existence at a time. 170 171 This class is not thread safe. 172 """ 173
174 - def __init__(self, taskmaster):
175 """Create a new serial job given a taskmaster. 176 177 The taskmaster's next_task() method should return the next task 178 that needs to be executed, or None if there are no more tasks. The 179 taskmaster's executed() method will be called for each task when it 180 is successfully executed or failed() will be called if it failed to 181 execute (e.g. execute() raised an exception).""" 182 183 self.taskmaster = taskmaster 184 self.interrupted = InterruptState()
185
186 - def start(self):
187 """Start the job. This will begin pulling tasks from the taskmaster 188 and executing them, and return when there are no more tasks. If a task 189 fails to execute (i.e. execute() raises an exception), then the job will 190 stop.""" 191 192 while True: 193 task = self.taskmaster.next_task() 194 195 if task is None: 196 break 197 198 try: 199 task.prepare() 200 if task.needs_execute(): 201 task.execute() 202 except: 203 if self.interrupted(): 204 try: 205 raise SCons.Errors.BuildError( 206 task.targets[0], errstr=interrupt_msg) 207 except: 208 task.exception_set() 209 else: 210 task.exception_set() 211 212 # Let the failed() callback function arrange for the 213 # build to stop if that's appropriate. 214 task.failed() 215 else: 216 task.executed() 217 218 task.postprocess() 219 self.taskmaster.cleanup()
220 221 222 # Trap import failure so that everything in the Job module but the 223 # Parallel class (and its dependent classes) will work if the interpreter 224 # doesn't support threads. 225 try: 226 import queue 227 import threading 228 except ImportError: 229 pass 230 else:
231 - class Worker(threading.Thread):
232 """A worker thread waits on a task to be posted to its request queue, 233 dequeues the task, executes it, and posts a tuple including the task 234 and a boolean indicating whether the task executed successfully. """ 235
236 - def __init__(self, requestQueue, resultsQueue, interrupted):
237 threading.Thread.__init__(self) 238 self.setDaemon(1) 239 self.requestQueue = requestQueue 240 self.resultsQueue = resultsQueue 241 self.interrupted = interrupted 242 self.start()
243
244 - def run(self):
245 while True: 246 task = self.requestQueue.get() 247 248 if task is None: 249 # The "None" value is used as a sentinel by 250 # ThreadPool.cleanup(). This indicates that there 251 # are no more tasks, so we should quit. 252 break 253 254 try: 255 if self.interrupted(): 256 raise SCons.Errors.BuildError( 257 task.targets[0], errstr=interrupt_msg) 258 task.execute() 259 except: 260 task.exception_set() 261 ok = False 262 else: 263 ok = True 264 265 self.resultsQueue.put((task, ok))
266
267 - class ThreadPool(object):
268 """This class is responsible for spawning and managing worker threads.""" 269
270 - def __init__(self, num, stack_size, interrupted):
271 """Create the request and reply queues, and 'num' worker threads. 272 273 One must specify the stack size of the worker threads. The 274 stack size is specified in kilobytes. 275 """ 276 self.requestQueue = queue.Queue(0) 277 self.resultsQueue = queue.Queue(0) 278 279 try: 280 prev_size = threading.stack_size(stack_size*1024) 281 except AttributeError, e: 282 # Only print a warning if the stack size has been 283 # explicitly set. 284 if not explicit_stack_size is None: 285 msg = "Setting stack size is unsupported by this version of Python:\n " + \ 286 e.args[0] 287 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) 288 except ValueError, e: 289 msg = "Setting stack size failed:\n " + str(e) 290 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) 291 292 # Create worker threads 293 self.workers = [] 294 for _ in range(num): 295 worker = Worker(self.requestQueue, self.resultsQueue, interrupted) 296 self.workers.append(worker) 297 298 if 'prev_size' in locals(): 299 threading.stack_size(prev_size)
300
301 - def put(self, task):
302 """Put task into request queue.""" 303 self.requestQueue.put(task)
304
305 - def get(self):
306 """Remove and return a result tuple from the results queue.""" 307 return self.resultsQueue.get()
308
309 - def preparation_failed(self, task):
310 self.resultsQueue.put((task, False))
311
312 - def cleanup(self):
313 """ 314 Shuts down the thread pool, giving each worker thread a 315 chance to shut down gracefully. 316 """ 317 # For each worker thread, put a sentinel "None" value 318 # on the requestQueue (indicating that there's no work 319 # to be done) so that each worker thread will get one and 320 # terminate gracefully. 321 for _ in self.workers: 322 self.requestQueue.put(None) 323 324 # Wait for all of the workers to terminate. 325 # 326 # If we don't do this, later Python versions (2.4, 2.5) often 327 # seem to raise exceptions during shutdown. This happens 328 # in requestQueue.get(), as an assertion failure that 329 # requestQueue.not_full is notified while not acquired, 330 # seemingly because the main thread has shut down (or is 331 # in the process of doing so) while the workers are still 332 # trying to pull sentinels off the requestQueue. 333 # 334 # Normally these terminations should happen fairly quickly, 335 # but we'll stick a one-second timeout on here just in case 336 # someone gets hung. 337 for worker in self.workers: 338 worker.join(1.0) 339 self.workers = []
340
341 - class Parallel(object):
342 """This class is used to execute tasks in parallel, and is somewhat 343 less efficient than Serial, but is appropriate for parallel builds. 344 345 This class is thread safe. 346 """ 347
348 - def __init__(self, taskmaster, num, stack_size):
349 """Create a new parallel job given a taskmaster. 350 351 The taskmaster's next_task() method should return the next 352 task that needs to be executed, or None if there are no more 353 tasks. The taskmaster's executed() method will be called 354 for each task when it is successfully executed or failed() 355 will be called if the task failed to execute (i.e. execute() 356 raised an exception). 357 358 Note: calls to taskmaster are serialized, but calls to 359 execute() on distinct tasks are not serialized, because 360 that is the whole point of parallel jobs: they can execute 361 multiple tasks simultaneously. """ 362 363 self.taskmaster = taskmaster 364 self.interrupted = InterruptState() 365 self.tp = ThreadPool(num, stack_size, self.interrupted) 366 367 self.maxjobs = num
368
369 - def start(self):
370 """Start the job. This will begin pulling tasks from the 371 taskmaster and executing them, and return when there are no 372 more tasks. If a task fails to execute (i.e. execute() raises 373 an exception), then the job will stop.""" 374 375 jobs = 0 376 377 while True: 378 # Start up as many available tasks as we're 379 # allowed to. 380 while jobs < self.maxjobs: 381 task = self.taskmaster.next_task() 382 if task is None: 383 break 384 385 try: 386 # prepare task for execution 387 task.prepare() 388 except: 389 task.exception_set() 390 task.failed() 391 task.postprocess() 392 else: 393 if task.needs_execute(): 394 # dispatch task 395 self.tp.put(task) 396 jobs = jobs + 1 397 else: 398 task.executed() 399 task.postprocess() 400 401 if not task and not jobs: break 402 403 # Let any/all completed tasks finish up before we go 404 # back and put the next batch of tasks on the queue. 405 while True: 406 task, ok = self.tp.get() 407 jobs = jobs - 1 408 409 if ok: 410 task.executed() 411 else: 412 if self.interrupted(): 413 try: 414 raise SCons.Errors.BuildError( 415 task.targets[0], errstr=interrupt_msg) 416 except: 417 task.exception_set() 418 419 # Let the failed() callback function arrange 420 # for the build to stop if that's appropriate. 421 task.failed() 422 423 task.postprocess() 424 425 if self.tp.resultsQueue.empty(): 426 break 427 428 self.tp.cleanup() 429 self.taskmaster.cleanup()
430 431 # Local Variables: 432 # tab-width:4 433 # indent-tabs-mode:nil 434 # End: 435 # vim: set expandtab tabstop=4 shiftwidth=4: 436