3 # Thomas Nagy, 2015 (ita)
6 Execute commands through pre-forked servers. This tool creates as many servers as build threads.
7 On a benchmark executed on Linux Kubuntu 14, 8 virtual cores and SSD drive::
9 ./genbench.py /tmp/build 200 100 15 5
11 # no prefork: 2m7.179s
17 # optional, will spawn 40 servers early
25 The servers and the build process are using a shared nonce to prevent undesirable external connections.
28 import os, re, socket, threading, sys, subprocess, time, atexit, traceback, random, signal
32 import socketserver as SocketServer
34 from queue import Queue
36 from Queue import Queue
40 import pickle as cPickle
# Build a fixed-size request/response header: comma-joined params padded
# to HEADER_SIZE, with an optional authentication cookie appended at the end.
49 def make_header(params, cookie=''):
50 header = ','.join(params)
51 header = header.ljust(HEADER_SIZE - len(cookie))
52 assert(len(header) == HEADER_SIZE - len(cookie))
53 header = header + cookie
# Python 3: sockets carry bytes, so encode the text header
54 if sys.hexversion > 0x3000000:
55 header = header.encode('iso8859-1')
# Compare two strings without an early exit, accumulating XOR differences
# so the comparison time does not leak where the strings diverge.
# NOTE(review): 'sum' shadows the builtin; its initialization and the
# final return are outside this view.
58 def safe_compare(x, y):
60 for (a, b) in zip(x, y):
61 sum |= ord(a) ^ ord(b)
# Whitelist of characters allowed in a query header (command names, sizes)
64 re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')
# Per-connection request handler for the pre-forked server: reads framed
# commands from the build process, executes them through subprocess, and
# writes back pickled (out, err, exc) results.
65 class req(SocketServer.StreamRequestHandler):
# Main loop: keep processing commands until the peer disconnects or errors
68 while self.process_command():
70 except KeyboardInterrupt:
72 except Exception as e:
# Send one framed response: fixed-size header followed by the pickled payload
75 def send_response(self, ret, out, err, exc):
77 data = (out, err, exc)
78 data = cPickle.dumps(data, -1)
# Header carries the return code and the payload length
82 params = [RES, str(ret), str(len(data))]
84 # no need for the cookie in the response
85 self.wfile.write(make_header(params))
87 self.wfile.write(data)
# Read one fixed-size header, authenticate it, validate it, and dispatch
90 def process_command(self):
91 query = self.rfile.read(HEADER_SIZE)
95 assert(len(query) == HEADER_SIZE)
96 if sys.hexversion > 0x3000000:
97 query = query.decode('iso8859-1')
# Reject clients that do not present the shared key (the build nonce)
101 if not safe_compare(key, SHARED_KEY):
102 print('%r %r' % (key, SHARED_KEY))
103 self.send_response(-1, '', '', 'Invalid key given!')
# Refuse headers containing characters outside the whitelist
108 if not re_valid_query.match(query):
109 self.send_response(-1, '', '', 'Invalid query %r' % query)
110 raise ValueError('Invalid query %r' % query)
112 query = query.strip().split(',')
115 self.run_command(query[1:])
116 elif query[0] == BYE:
117 raise ValueError('Exit')
119 raise ValueError('Invalid query %r' % query)
# Read the pickled keyword arguments and execute the subprocess command.
# NOTE(review): cPickle.loads on socket data is only acceptable because
# the connection was authenticated with the shared key above.
122 def run_command(self, query):
125 data = self.rfile.read(size)
126 assert(len(data) == size)
127 kw = cPickle.loads(data)
130 ret = out = err = exc = None
# Capture output only when the caller asked for stdout/stderr
136 if kw['stdout'] or kw['stderr']:
137 p = subprocess.Popen(cmd, **kw)
138 (out, err) = p.communicate()
141 ret = subprocess.Popen(cmd, **kw).wait()
142 except KeyboardInterrupt:
144 except Exception as e:
# Report the failure to the client instead of crashing the server
146 exc = str(e) + traceback.format_exc()
148 self.send_response(ret, out, err, exc)
# Server-process entry point: bind a TCP server on conn and serve requests
# until interrupted; a watchdog kills the process if the parent dies.
150 def create_server(conn, cls):
151 # child processes do not need the key, so we remove it from the OS environment
153 SHARED_KEY = os.environ['SHARED_KEY']
154 os.environ['SHARED_KEY'] = ''
# Watchdog: PREFORKPID is the parent build pid; the reaper (partly outside
# this view) SIGKILLs this process when that parent goes away
156 ppid = int(os.environ['PREFORKPID'])
168 os.kill(os.getpid(), signal.SIGKILL)
169 t = threading.Thread(target=reap)
# Port 0 lets the OS pick a free port; print it so the parent can connect
173 server = SocketServer.TCPServer(conn, req)
174 print(server.server_address[1])
176 #server.timeout = 6000 # seconds
# Disable Nagle's algorithm: requests/responses are small and latency-bound
177 server.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
179 server.serve_forever(poll_interval=0.001)
180 except KeyboardInterrupt:
# When executed directly, this very file acts as a server process
183 if __name__ == '__main__':
184 conn = ("127.0.0.1", 0)
185 #print("listening - %r %r\n" % conn)
186 create_server(conn, req)
189 from waflib import Logs, Utils, Runner, Errors, Options
# Replacement for Runner.Parallel.init_task_pool: build the consumer pool
# and tag each consumer thread with an index so exec_command can pick its
# dedicated server connection.
191 def init_task_pool(self):
192 # lazy creation, and set a common pool for all task consumers
193 pool = self.pool = []
194 for i in range(self.numjobs):
195 consumer = Runner.get_pool()
196 pool.append(consumer)
198 self.ready = Queue(0)
200 consumer.ready = self.ready
# Mark the consumer thread with its index (read back in exec_command)
202 threading.current_thread().idx = consumer.idx
203 except Exception as e:
# Install the replacement on the Parallel class
208 Runner.Parallel.init_task_pool = init_task_pool
# Spawn one server process by re-executing this very file; its stdout is
# piped so the parent can read back the port number it binds to.
210 def make_server(bld, idx):
211 cmd = [sys.executable, os.path.abspath(__file__)]
212 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
# Open a TCP connection to a previously spawned server process
# (the port is taken from srv; the assignment is outside this view).
215 def make_conn(bld, srv):
217 conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Disable Nagle's algorithm for low-latency small messages
218 conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
219 conn.connect(('127.0.0.1', port))
# Exit-time cleanup (body partly outside this view): close connections
# and tear down the spawned server processes tracked in SERVERS/CONNS.
226 global SERVERS, CONNS
239 atexit.register(close_all)
# Write all of data to conn, looping until every byte has been sent
# (the counter init/advance lines are outside this view).
241 def put_data(conn, data):
243 while cnt < len(data):
244 sent = conn.send(data[cnt:])
# send() indicating no progress means the peer closed the connection
246 raise RuntimeError('connection ended')
# Read exactly siz bytes from conn, accumulating chunks in buf
# (the loop and buffer initialization are outside this view).
249 def read_data(conn, siz):
253 data = conn.recv(min(siz - cnt, 1024))
# An empty recv() before siz bytes arrived means the peer disconnected
255 raise RuntimeError('connection ended %r %r' % (cnt, siz))
# Python 3: join the collected byte chunks into a single bytes object
258 if sys.hexversion > 0x3000000:
259 ret = ''.encode('iso8859-1').join(buf)
# Replacement exec_command: serialize the command and its keyword
# arguments, send them to this consumer thread's dedicated server, and
# decode the pickled (out, err, exc) response it sends back.
264 def exec_command(self, cmd, **kw):
# Fall back to the original implementation when output is redirected
# somewhere we cannot forward over the wire
266 if kw['stdout'] not in (None, subprocess.PIPE):
267 return self.exec_command_old(cmd, **kw)
269 if kw['stderr'] not in (None, subprocess.PIPE):
270 return self.exec_command_old(cmd, **kw)
272 kw['shell'] = isinstance(cmd, str)
273 Logs.debug('runner: %r' % cmd)
274 Logs.debug('runner_env: kw=%s' % kw)
277 self.logger.info(cmd)
# Default to capturing both streams so the server can relay them
279 if 'stdout' not in kw:
280 kw['stdout'] = subprocess.PIPE
281 if 'stderr' not in kw:
282 kw['stderr'] = subprocess.PIPE
284 if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
285 raise Errors.WafError("Program %s not found!" % cmd[0])
# Each consumer thread owns one connection, selected by its thread idx
# (set in init_task_pool)
287 idx = threading.current_thread().idx
291 #print("sub %r %r" % (idx, cmd))
292 #print("write to %r %r" % (idx, cmd))
# Frame the request: header (authenticated by the shared key) + pickled kwargs
294 data = cPickle.dumps(kw, -1)
295 params = [REQ, str(len(data))]
296 header = make_header(params, self.SHARED_KEY)
300 put_data(conn, header + data)
301 #put_data(conn, data)
303 #print("running %r %r" % (idx, cmd))
304 #print("read from %r %r" % (idx, cmd))
# Read the fixed-size response header, then the pickled payload
306 data = read_data(conn, HEADER_SIZE)
307 if sys.hexversion > 0x3000000:
308 data = data.decode('iso8859-1')
310 #print("received %r" % data)
311 lst = data.split(',')
317 data = read_data(conn, dlen)
318 (out, err, exc) = cPickle.loads(data)
# A remote exception is surfaced as a build error on this side
320 raise Errors.WafError('Execution failure: %s' % exc)
# Decode and forward captured stdout/stderr to the logger / console
323 if not isinstance(out, str):
324 out = out.decode(sys.stdout.encoding or 'iso8859-1')
326 self.logger.debug('out: %s' % out)
328 Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
330 if not isinstance(err, str):
331 err = err.decode(sys.stdout.encoding or 'iso8859-1')
333 self.logger.error('err: %s' % err)
335 Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})
# Shared-key setup (enclosing function header outside this view): reuse
# the key exported by a parent build, otherwise generate a fresh random
# one with SystemRandom and export it for the forked server processes.
341 key = ctx.SHARED_KEY = os.environ['SHARED_KEY']
343 key = "".join([chr(random.SystemRandom().randint(40, 126)) for x in range(20)])
344 os.environ['SHARED_KEY'] = ctx.SHARED_KEY = key
# PREFORKPID lets each server watch for the death of this build process
346 os.environ['PREFORKPID'] = str(os.getpid())
# Ensure maxval server processes exist (SERVERS) and that a connection
# (CONNS) is open to each of them.
349 def init_servers(ctx, maxval):
350 while len(SERVERS) < maxval:
352 srv = make_server(ctx, i)
354 while len(CONNS) < maxval:
358 # postpone the connection
# The child prints the port it bound on its stdout; read it back here
359 srv.port = int(srv.stdout.readline())
364 conn = make_conn(ctx, srv)
369 raise ValueError('Could not start the server!')
# A server process that already exited is not usable
370 if srv.poll() is not None:
# NOTE(review): typo in the message below ("it it not" should read "it is not")
371 Logs.warn('Looks like it it not our server process - concurrent builds are unsupported at this stage')
372 raise ValueError('Could not start the server')
# CPU-pinning fragment (enclosing function header outside this view):
# unless smp/--pin-process is set, pin the process to CPU 0 using the
# platform tool (cpuset on FreeBSD, taskset on Linux).
376 if not getattr(Options.options, 'smp', getattr(self, 'smp', None)):
378 if Utils.unversioned_sys_platform() in ('freebsd',):
380 cmd = ['cpuset', '-l', '0', '-p', str(pid)]
381 elif Utils.unversioned_sys_platform() in ('linux',):
383 cmd = ['taskset', '-pc', '0', str(pid)]
385 self.cmd_and_log(cmd, quiet=0)
# Options-phase fragment (function header outside this view): spawn 40
# servers early and expose the --pin-process flag.
389 init_servers(opt, 40)
390 opt.add_option('--pin-process', action='store_true', dest='smp', default=False)
# Build-phase fragment (function header outside this view): no servers
# needed for 'clean'; otherwise ensure one server per job and monkey-patch
# exec_command so task commands are routed through the pre-forked servers.
393 if bld.cmd == 'clean':
397 init_servers(bld, bld.jobs)
# Keep the original implementation reachable as exec_command_old
400 bld.__class__.exec_command_old = bld.__class__.exec_command
401 bld.__class__.exec_command = exec_command