#!/usr/bin/env python

# Improved build dispatcher.  Invoked on the server side from
# dopackages.
#
# We try to build leaf packages (those which can be built immediately
# without requiring additional dependencies to be built) in the order
# such that the ones required by the longest dependency chains are
# built first.
#
# This has the effect of favouring deep parts of the package tree and
# evening out the depth over time, hopefully avoiding the situation
# where the entire cluster waits for a deep part of the tree to build
# on a small number of machines.
#
# We can dynamically respond to changes in build machine
# availability, since the queue manager will block jobs that cannot
# be immediately satisfied and will unblock us when a job slot
# becomes available.
#
# When a package build fails, it is requeued with a lower priority
# such that it will be rebuilt as soon as no "phase 1" packages are
# available to build.  This prevents the cluster from sitting idle
# until the last phase 1 package builds.
#
# Other advantages are that this system is easily customizable, and
# in the future it will let us customize things like the policy for
# matching jobs to machines.  For example, we could avoid dispatching
# multiple openoffice builds to the same system.
#
# TODO:
# * Combine build prep stages?
#   - initial check for file up-to-date
# * Check mtime for package staleness (cf. make)
# * Option to skip phase 2

from qmanagerclient import *

from freebsd_config import *

import os, string, sys, threading, time, subprocess
#import random
from itertools import chain
#import gc
from stat import *

from Queue import Queue
from heapq import *

CONFIG_DIR = "/var/portbuild"
CONFIG_SUBDIR = "conf"
CONFIG_FILENAME = "server.conf"

config = getConfig(CONFIG_DIR, CONFIG_SUBDIR, CONFIG_FILENAME)
QMANAGER_PRIORITY_PACKAGES = string.split(
    config.get('QMANAGER_PRIORITY_PACKAGES'))
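
# QMANAGER_PRIORITY_PACKAGES is a whitespace-separated list of port
# name prefixes; depth_recursive() artificially boosts the depth of
# matching ports so that they are dispatched early.  An illustrative
# server.conf entry (assumed syntax) might be:
#   QMANAGER_PRIORITY_PACKAGES=openoffice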

categories = {}
ports = {}

# When a build fails we requeue it with a lower priority such that it
# will never preempt a phase 1 build but will build when spare
# capacity is available.
PHASE2_BASE_PRIO = 1000

# Process success quickly so other jobs are started
SUCCESS_PRIO = -1000

# Failure should be a less common event :)
FAILURE_PRIO = -900

# Port status codes
PENDING = 1     # Yet to build
PHASE2 = 2      # Failed once

class PriorityQueue(Queue):
    """Variant of Queue that retrieves open entries in priority order
    (lowest first).

    Entries are typically tuples of the form: (priority number, data).

    This class can be found at: Python-2.6a3/Lib/Queue.py
    """
    maxsize = 0

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item, heappush=heappush):
        heappush(self.queue, item)

    def _get(self, heappop=heappop):
        return heappop(self.queue)
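
# The dispatcher queues jobs as (priority, port) tuples; since the
# heap pops the smallest entry first, priorities are negated depths,
# e.g.:
#   queue.put((-pkg.depth, pkg))    # deepest chains come out first
#   (prio, job) = queue.get()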

class Index(object):

    def __init__(self, indexfile):
        self.indexfile = indexfile

    def parse(self, targets=None):
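        # Each INDEX line consists of 13 "|"-separated fields, in the
        # order unpacked below:
        #   name|path|prefix|comment|descr|maintainer|categories|
        #   build-deps|run-deps|www|extract-deps|patch-deps|fetch-deps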
        print "[MASTER] Read index"
        f = file(self.indexfile)
        index = f.readlines()
        f.close()
        f = None
        del f

        lines = []
        print "[MASTER] Phase 1"
        for i in index:
            (name, path, prefix, comment, descr, maintainer, categories,
             bdep, rdep, www, edep, pdep, fdep) = i.rstrip().split("|")

            if targets is None or name in targets:
                lines.append((name, bdep, rdep, edep, pdep, fdep))

            Port(name, path, "", "", "", "",
                 categories, "")
        index = None
        del index

        print "[MASTER] Phase 2"
        for (name, bdep, rdep, edep, pdep, fdep) in lines:
            ports[name].setdeps(bdep, rdep, edep, pdep, fdep)

        lines = None
        del lines
        print "[MASTER] Done"

def depthindex(targets):
    """ Initial population of depth tree """

    for i in targets:
        i.depth_recursive()

class Port(object):

    # __slots__ must be declared at class level to take effect; the
    # list also names "id", "status" and "done", which __init__
    # assigns below.
    __slots__ = ["name", "path", "prefix", "comment", "descr",
                 "maintainer", "www", "bdep", "rdep", "edep", "pdep",
                 "fdep", "alldep", "parents", "depth", "categories",
                 "id", "status", "done"]

    def __init__(self, name, path, prefix, comment, descr, maintainer,
                 cats, www):

        self.name = name
        self.path = path
        self.prefix = prefix
        self.comment = comment
        self.descr = descr
        self.maintainer = maintainer
        self.www = www

        # Populated later
        self.bdep = []
        self.rdep = []
        self.edep = []
        self.pdep = []
        self.fdep = []

        self.alldep = []
        self.parents = []
        self.id = None # XXX

        self.status = PENDING

        # Whether the package build has completed and is hanging around
        # to resolve dependencies for others XXX use status
        self.done = False

        # Depth is the maximum length of the dependency chain of this port
        self.depth = None

        self.categories = []
        scats = cats.split()
        if len(scats) != len(set(scats)):
            print "[MASTER] Warning: port %s includes duplicated categories: %s" % (name, cats)

        for c in set(scats):
            try:
                cat = categories[c]
            except KeyError:
                cat = Category(c)

            self.categories.append(cat)
            cat.add(self)

        ports[name] = self

    def remove(self):
        """ Clean ourselves up but don't touch references in other objects;
        they still need to know about us as dependencies etc """

        self.fdep = None
        self.edep = None
        self.pdep = None
        self.bdep = None
        self.rdep = None
        self.alldep = None
        self.parents = None

        for cat in self.categories:
            cat.remove(self)

        ports[self.name] = None
        del ports[self.name]
        del self

    def destroy(self):
        """ Remove a package and all references to it """

        for pkg in self.alldep:
            if pkg.parents is not None:
                # Already removed but not destroyed
                try:
                    pkg.parents.remove(self)
                except ValueError:
                    continue

        for pkg in self.parents:
            try:
                pkg.fdep.remove(self)
            except ValueError:
                pass
            try:
                pkg.edep.remove(self)
            except ValueError:
                pass
            try:
                pkg.pdep.remove(self)
            except ValueError:
                pass
            try:
                pkg.bdep.remove(self)
            except ValueError:
                pass
            try:
                pkg.rdep.remove(self)
            except ValueError:
                pass
            pkg.alldep.remove(self)

        sys.exc_clear()

        self.remove()

    def setdeps(self, bdep, rdep, edep, pdep, fdep):
        self.fdep = [ports[p] for p in fdep.split()]
        self.edep = [ports[p] for p in edep.split()]
        self.pdep = [ports[p] for p in pdep.split()]
        self.bdep = [ports[p] for p in bdep.split()]
        self.rdep = [ports[p] for p in rdep.split()]

        self.alldep = list(set(chain(self.fdep, self.edep, self.pdep,
                                     self.bdep, self.rdep)))

        for p in self.alldep:
            p.parents.append(self)

    def depth_recursive(self):
        """
        Recursively populate the depth tree up from a given package
        through its parents (the ports that depend on it), assuming
        empty values on entries not yet visited
        """

        if self.depth is None:
            if len(self.parents) > 0:
                max = 0
                for i in self.parents:
                    w = i.depth_recursive()
                    if w > max:
                        max = w
                self.depth = max + 1
            else:
                self.depth = 1
            for port in QMANAGER_PRIORITY_PACKAGES:
                if self.name.startswith(port):
                    # Artificial boost to try and get it building earlier
                    self.depth = 100
        return self.depth
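
    # Example of the resulting metric: a port with no parents (nothing
    # depends on it) has depth 1; each step down a dependency chain
    # adds 1, so the bottom of an N-long chain has depth N.  Since
    # leaves are queued at priority -depth, the ports that unblock the
    # longest chains are built first.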

    def destroy_recursive(self):
        """ Remove a port and everything that depends on it """

        parents = set([self])

        while len(parents) > 0:
            pkg = parents.pop()
            assert pkg.depth is not None
            parents.update(pkg.parents)
            pkg.destroy()

    def success(self):
        """ Build succeeded and possibly uncovered some new leaves """

        parents = self.parents[:]
        self.done = True
        self.remove()

        newleafs = [p for p in parents if all(c.done for c in p.alldep)]
        return newleafs
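
    # A port is queued for building only when every entry in its
    # alldep list has done == True; success() returns the parents that
    # have just become buildable so that main() can queue them.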

    def failure(self):
        """ Build failed """

        self.destroy_recursive()

    def packagename(self, arch, branch, buildid):
        """ Return the path where a package may be found """

        return "/var/portbuild/%s/%s/builds/%s/packages/All/%s.tbz" \
            % (arch, branch, buildid, self.name)

    def is_stale(self, arch, branch, buildid):
        """ Does a package need to be (re)built?

        Returns False if the package exists and has a newer mtime
        than all of its dependencies; True otherwise.
        """

        my_pkgname = self.packagename(arch, branch, buildid)
        pkg_exists = os.path.exists(my_pkgname)

        if pkg_exists:
            my_mtime = os.stat(my_pkgname)[ST_MTIME]

        dep_packages = [pkg.packagename(arch, branch, buildid)
                        for pkg in self.alldep]
        deps_exist = all(os.path.exists(pkg) for pkg in dep_packages)
        return not (pkg_exists and deps_exist and
                    all(os.stat(pkg)[ST_MTIME] <= my_mtime
                        for pkg in dep_packages))

class Category(object):
    def __init__(self, name):
        self.name = name
        self.ports = {}
        categories[name] = self

    def add(self, port):
        self.ports[port] = port

    def remove(self, port):
        self.ports[port] = None
        del self.ports[port]

def gettargets(targets):
    """ Split command line arguments into a list of packages to build.
    Returns a set or iterable of all ports that will be built,
    including dependencies """

    plist = set()
    if len(targets) == 0:
        targets = ["all"]
    for i in targets:
        if i == "all":
            return ports.itervalues()

        if i.endswith("-all"):
            cat = i.rpartition("-")[0]
            plist.update(p.name for p in categories[cat].ports)
        else:
            # Strip a ".tbz" suffix if present.  Note that
            # i.rstrip(".tbz") would instead strip any trailing 't',
            # 'b', 'z' or '.' characters, mangling legitimate names.
            name = i[:-len(".tbz")] if i.endswith(".tbz") else i
            if name in ports:
                plist.update([ports[name].name])
            else:
                raise KeyError, i

    # Compute transitive closure of all dependencies
    pleft = plist.copy()
    while len(pleft) > 0:
        pkg = pleft.pop()
        new = [p.name for p in ports[pkg].alldep if p.name not in plist]
        plist.update(new)
        pleft.update(new)

    for p in set(ports.keys()).difference(plist):
        ports[p].destroy()

    return [ports[p] for p in plist]

class worker(threading.Thread):

    # Protects threads
    lock = threading.Lock()

    # Running threads, used for collecting status
    threads = {}

    def __init__(self, mach, job, arch, branch, buildid, queue):
        threading.Thread.__init__(self)
        self.machine = mach
        self.job = job
        self.arch = arch
        self.branch = branch
        self.buildid = buildid
        self.queue = queue

        self.setDaemon(True)

    def run(self):
        pkg = self.job

        print "[MASTER] Running job %s" % (pkg.name),
        if pkg.status == PHASE2:
            print " (phase 2)"
        else:
            print
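
        # The environment below hands the dependency lists to the
        # child build script: FD/ED/PD/BD/RD are the fetch, extract,
        # patch, build and run dependencies as space-separated package
        # file names, presumably consumed by the pdispatch/portbuild
        # scripts invoked here.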
        try:
            runenv = {'HOME': "/root",
                      'PATH': '/sbin:/bin:/usr/sbin:/usr/bin:/usr/games:/usr/local/sbin:/usr/local/bin:/var/portbuild/scripts',
                      'FD': " ".join(["%s.tbz" % p.name for p in pkg.fdep]),
                      'ED': " ".join(["%s.tbz" % p.name for p in pkg.edep]),
                      'PD': " ".join(["%s.tbz" % p.name for p in pkg.pdep]),
                      'BD': " ".join(["%s.tbz" % p.name for p in pkg.bdep]),
                      'RD': " ".join(["%s.tbz" % p.name for p in pkg.rdep])}
            for var in ["NOCLEAN", "NO_RESTRICTED", "NOPLISTCHECK",
                        "NO_DISTFILES", "FETCH_ORIGINAL", "TRYBROKEN"]:
                if var in os.environ:
                    runenv[var] = os.environ.get(var)
            build = subprocess.Popen(
                ["/bin/sh", "/var/portbuild/scripts/pdispatch",
                 self.arch, self.branch, self.buildid, self.machine,
                 "/var/portbuild/scripts/portbuild", "%s.tbz" % pkg.name,
                 pkg.path],
                env=runenv,
                stderr=subprocess.STDOUT, stdout=subprocess.PIPE, bufsize=0)
        except OSError, e:
            print >>sys.stderr, "[%s:%s]: Execution failed: %s" % \
                (pkg.id, pkg.name, e)
            # Popen failed, so there is no child to read from; record
            # a hard failure instead of crashing on build.stdout below.
            build = None

        if build is not None:
            while True:
                try:
                    line = build.stdout.readline()
                except:
                    print "[%s:%s]: Failed reading from build script" % \
                        (pkg.id, pkg.name)
                    break
                if line == "":
                    break
                print "[%s:%s] %s" % (pkg.id, pkg.name, line.rstrip())

            retcode = build.wait()
        else:
            retcode = 1

        # time.sleep(random.randint(0, 60))
        #
        # r = random.random()
        # if r < 0.1:
        #     retcode = 1
        # elif r < 0.15:
        #     retcode = 254
        # else:
        #     retcode = 0

        conn = QManagerClientConn(stderr=sys.stderr)
        timeout = 1
        try:
            (code, vars) = conn.command("release", {'id': pkg.id})
        except RequestError, e:
            print "[MASTER] Error releasing job %s (%s): %s" % \
                (pkg.name, pkg.id, e.value)

        if retcode == 254:
            # Requeue soft failure at original priority
            # XXX exponential backoff?
            time.sleep(60)
            # print "Requeueing %s" % pkg.id
            self.queue.put((-pkg.depth, pkg))
        elif retcode == 253:
            # A machine was being set up; retry immediately
            self.queue.put((-pkg.depth, pkg))
        elif retcode == 0:
            self.queue.put((SUCCESS_PRIO, pkg))
        else:
            self.queue.put((FAILURE_PRIO, pkg))

        # Clean up
        worker.lock.acquire()
        worker.threads[self] = None
        del worker.threads[self]
        worker.lock.release()

    @staticmethod
    def dispatch(mach, job, arch, branch, buildid, queue):
        wrk = worker(mach, job, arch, branch, buildid, queue)

        worker.lock.acquire()
        worker.threads[wrk] = wrk
        worker.lock.release()

        wrk.start()
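
# Exit-code protocol between pdispatch and worker.run() above:
#   0   - build succeeded
#   254 - soft failure; requeue at the original priority after a delay
#   253 - machine was being set up; retry immediately
#   any other value is treated as a build failure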

def main(arch, branch, buildid, args):
    global index

    basedir = "/var/portbuild/" + arch + "/" + branch + "/builds/" + buildid
    portsdir = basedir + "/ports"

    indexfile = portsdir + "/INDEX-" + branch

    print "[MASTER] parseindex..."
    index = Index(indexfile)
    index.parse()
    print "[MASTER] length = %s" % len(ports)

    print "[MASTER] Finding targets..."
    targets = gettargets(args)

    print "[MASTER] Calculating depth..."
    depthindex(targets)

    print "[MASTER] Pruning duds..."
    dudsfile = basedir + "/duds"
    for line in file(dudsfile):
        try:
            dud = ports[line.rstrip()]
        except KeyError:
            continue
        print "[MASTER] Skipping %s (duds)" % dud.name
        dud.destroy_recursive()

    queue = PriorityQueue()
    # XXX can do this while parsing the index if we prune targets/duds
    # first
    for pkg in ports.itervalues():
        if len(pkg.alldep) == 0:
            queue.put((-pkg.depth, pkg))

    # XXX check osversion, pool
    mdl = ["arch = %s" % arch]
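
    # The mdl list holds the constraints passed to the queue manager's
    # "acquire" command; for now it only requires a machine of the
    # matching architecture (the XXX above notes that osversion and
    # pool should eventually be checked too).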

    # Main work loop
    while len(ports) > 0:
        print "[MASTER] Ports remaining=%s, Queue length=%s" % \
            (len(ports), queue.qsize())

        if len(ports) < 10:
            print "[MASTER] Remaining ports: %s" % ports.keys()

        (prio, job) = queue.get()
        if prio == SUCCESS_PRIO:
            print "[MASTER] Job %s succeeded" % job.name
            for new in job.success():
                queue.put((-new.depth, new))
            continue
        elif prio == FAILURE_PRIO:
            if job.status == PHASE2:
                print "[MASTER] Job %s failed" % job.name
                job.failure()
                continue
            else:
                # Requeue at low priority
                print "[MASTER] Job %s failed (requeued for phase 2)" % job.name
                job.status = PHASE2
                queue.put((PHASE2_BASE_PRIO - job.depth, job))
                continue
        elif job.status == PHASE2:
            depth = -(prio - PHASE2_BASE_PRIO)
        else:
            depth = -prio

        print "[MASTER] Working on job %s, depth %d" % (job.name, depth)
        if job.is_stale(arch, branch, buildid):
            conn = QManagerClientConn(stderr=sys.stderr)
            (code, vars) = conn.command("acquire",
                                        {"name": job.name,
                                         "type": "%s/%s/%s package" % \
                                             (arch, branch, buildid),
                                         "priority": 10, "mdl": mdl})

            if code[0] == "2":
                machine = vars['machine']
                job.id = vars['id']
                # print "Got ID %s" % job.id

                worker.dispatch(machine, job, arch, branch, buildid, queue)
            else:
                print "[MASTER] Error acquiring job %s: %s" % (job.name, code)
        else:
            print "[MASTER] Skipping %s since it already exists" % job.name
            for new in job.success():
                queue.put((-new.depth, new))

    print "[MASTER] Waiting for threads"
    threads = worker.threads.copy()

    for t in threads:
        print "[MASTER] Outstanding thread: %s" % t.job.name

    for t in threads:
        print "[MASTER] Waiting for thread %s" % t.job.name
        t.join()

    print "[MASTER] Finished"


if __name__ == "__main__":
    # from guppy import hpy; h = hpy()

    main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4:])