summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBill Peck <bpeck@redhat.com>2011-02-22 10:50:02 -0500
committerBill Peck <bpeck@redhat.com>2011-03-02 16:17:26 -0500
commit3659ce62e133e5aa431cfefc36b07c2ff528aaef (patch)
treedd907454434081db6425a8b809445135b64883c3
parent5641bb2f3e23ad3602fbacfa75b9a20dcdadb1d6 (diff)
Jobs left in queued state foreverbeaker-distribution-beaker-setup-1_1-7
-rw-r--r--Server/bkr/server/model.py4
-rw-r--r--Server/bkr/server/test/test_deadrecipes.py95
-rwxr-xr-xServer/bkr/server/tools/beakerd.py71
-rwxr-xr-xTasks/distribution/beaker/setup/runtest.sh2
4 files changed, 165 insertions, 7 deletions
diff --git a/Server/bkr/server/model.py b/Server/bkr/server/model.py
index 690ac08..55c10fa 100644
--- a/Server/bkr/server/model.py
+++ b/Server/bkr/server/model.py
@@ -3011,7 +3011,7 @@ class Distro(MappedObject):
distro_requires.appendChild(xmland)
return distro_requires
- def systems_filter(self, user, filter):
+ def systems_filter(self, user, filter, join=['lab_controller']):
"""
Return Systems that match the following filter
<host>
@@ -3034,7 +3034,7 @@ class Distro(MappedObject):
"""
from needpropertyxml import ElementWrapper
import xmltramp
- systems = self.all_systems(user)
+ systems = self.all_systems(user, join)
#FIXME Should validate XML before processing.
queries = []
joins = []
diff --git a/Server/bkr/server/test/test_deadrecipes.py b/Server/bkr/server/test/test_deadrecipes.py
new file mode 100644
index 0000000..d0de8bc
--- /dev/null
+++ b/Server/bkr/server/test/test_deadrecipes.py
@@ -0,0 +1,95 @@
+import unittest
+import xmltramp
+
+from time import sleep
+import time
+from bkr.server.model import TaskStatus, Job, LabControllerDistro, Distro
+import sqlalchemy.orm
+from turbogears.database import session
+from bkr.server.test import data_setup
+from bkr.server.tools import beakerd
+from bkr.server.jobxml import XmlJob
+from bkr.server.jobs import Jobs
+
+import threading
+
+class TestBeakerd(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ # Create two unique labs
+ lab1 = data_setup.create_labcontroller(fqdn='lab_%d' %
+ int(time.time() * 1000))
+ lab2 = data_setup.create_labcontroller(fqdn='lab_%d' %
+ int(time.time() * 1000))
+
+ # Create two distros and only put one in each lab.
+ cls.distro1 = data_setup.create_distro()
+ cls.distro2 = data_setup.create_distro()
+ session.flush()
+ cls.distro1.lab_controller_assocs = [LabControllerDistro(
+ lab_controller=lab2
+ )
+ ]
+ cls.distro2.lab_controller_assocs = [LabControllerDistro(
+ lab_controller=lab1
+ )
+ ]
+
+ # Create a user
+ user = data_setup.create_user()
+
+ # Create two systems but only put them in lab1.
+ system1 = data_setup.create_system(owner=user)
+ system2 = data_setup.create_system(owner=user)
+ system1.lab_controller = lab1
+ system2.lab_controller = lab1
+
+ data_setup.create_task(name=u'/distribution/install')
+
+ # Create two jobs, one requiring distro1 and one requiring distro2
+ job = '''
+ <job>
+ <whiteboard>%s</whiteboard>
+ <recipeSet>
+ <recipe>
+ <distroRequires>
+ <distro_name op="=" value="%s" />
+ </distroRequires>
+ <hostRequires/>
+ <task name="/distribution/install" role="STANDALONE">
+ <params/>
+ </task>
+ </recipe>
+ </recipeSet>
+ </job>
+ '''
+ xmljob1 = XmlJob(xmltramp.parse(job % (cls.distro1.name, cls.distro1.name)))
+ xmljob2 = XmlJob(xmltramp.parse(job % (cls.distro2.name, cls.distro2.name)))
+
+ cls.job1 = Jobs().process_xmljob(xmljob1, user)
+ cls.job2 = Jobs().process_xmljob(xmljob2, user)
+ session.flush()
+
+ def test_01_invalid_system_distro_combo(self):
+ beakerd.new_recipes()
+ self.assertEqual(Job.by_id(self.job1.id).status, TaskStatus.by_name(u'Aborted'))
+ self.assertEqual(Job.by_id(self.job2.id).status, TaskStatus.by_name(u'Processed'))
+
+
+ def test_02_dead_recipes(self):
+ beakerd.new_recipes()
+ beakerd.processed_recipesets()
+ self.assertEqual(Job.by_id(self.job2.id).status, TaskStatus.by_name(u'Queued'))
+ # Remove distro2 from lab1, should cause remaining recipe to abort.
+ distro = Distro.by_id(self.distro2.id)
+ for lab in distro.lab_controllers[:]:
+ distro.lab_controllers.remove(lab)
+ session.flush()
+ beakerd.dead_recipes()
+ self.assertEqual(Job.by_id(self.job2.id).status, TaskStatus.by_name(u'Aborted'))
+
+
+ @classmethod
+ def teardownClass(cls):
+ pass
diff --git a/Server/bkr/server/tools/beakerd.py b/Server/bkr/server/tools/beakerd.py
index 7baa466..ae3a7ec 100755
--- a/Server/bkr/server/tools/beakerd.py
+++ b/Server/bkr/server/tools/beakerd.py
@@ -85,14 +85,36 @@ def new_recipes(*args):
try:
recipe = Recipe.by_id(_recipe.id)
if recipe.distro:
+ recipe.systems = []
+
+ # Do the query twice.
+
+ # First query verifies that the distro
+ # exists in at least one lab that has a macthing system.
systems = recipe.distro.systems_filter(
recipe.recipeset.job.owner,
- recipe.host_requires
+ recipe.host_requires,
+ join=['lab_controller',
+ '_distros',
+ 'distro'],
+ )\
+ .filter(Distro.install_name==
+ recipe.distro.install_name)
+ # Second query picksup all possible systems so that as
+ # distros appear in other labs those systems will be
+ # available.
+ all_systems = recipe.distro.systems_filter(
+ recipe.recipeset.job.owner,
+ recipe.host_requires,
+ join=['lab_controller'],
)
- recipe.systems = []
- for system in systems:
- # Add matched systems to recipe.
- recipe.systems.append(system)
+ # based on above queries, condition on systems but add
+ # all_systems.
+ if systems.count():
+ for system in all_systems:
+ # Add matched systems to recipe.
+ recipe.systems.append(system)
+
# If the recipe only matches one system then bump its priority.
if len(recipe.systems) == 1:
try:
@@ -233,6 +255,44 @@ def processed_recipesets(*args):
log.debug("Exiting processed_recipes routine")
return True
+def dead_recipes(*args):
+ recipes = Recipe.query()\
+ .join('status')\
+ .outerjoin(['systems'])\
+ .outerjoin(['distro',
+ 'lab_controller_assocs',
+ 'lab_controller'])\
+ .filter(
+ or_(
+ and_(Recipe.status==TaskStatus.by_name(u'Queued'),
+ System.id==None,
+ ),
+ and_(Recipe.status==TaskStatus.by_name(u'Queued'),
+ LabController.id==None,
+ ),
+ )
+ )
+
+ log.debug("Entering dead_recipes routine")
+ for _recipe in recipes:
+ session.begin()
+ try:
+ recipe = Recipe.by_id(_recipe.id)
+ if len(recipe.systems) == 0:
+ msg = "R:%s does not match any systems, aborting." % recipe.id
+ log.info(msg)
+ recipe.recipeset.abort(msg)
+ if len(recipe.distro.lab_controller_assocs) == 0:
+ msg = "R:%s does not have a valid distro, aborting." % recipe.id
+ log.info(msg)
+ recipe.recipeset.abort(msg)
+ session.commit()
+ except exceptions.Exception, e:
+ session.rollback()
+ log.exception("Failed to commit due to :%s" % e)
+ session.close()
+ log.debug("Exiting dead_recipes routine")
+
def queued_recipes(*args):
automated = SystemStatus.by_name(u'Automated')
recipes = Recipe.query()\
@@ -477,6 +537,7 @@ def schedule():
log.debug("starting scheduled recipes Thread")
# Run scheduled_recipes in this process
while True:
+ dead_recipes()
queued = queued_recipes()
scheduled = scheduled_recipes()
if not queued and not scheduled:
diff --git a/Tasks/distribution/beaker/setup/runtest.sh b/Tasks/distribution/beaker/setup/runtest.sh
index 0dc76e1..091a654 100755
--- a/Tasks/distribution/beaker/setup/runtest.sh
+++ b/Tasks/distribution/beaker/setup/runtest.sh
@@ -175,6 +175,8 @@ function generate_beaker_cfg()
sqlalchemy.dburi="mysql://beaker:beaker@localhost/beaker"
sqlalchemy.pool_recycle = 3600
+basepath.rpms = '/tmp/beaker-tests-rpms'
+basepath.repos = '/tmp/beaker-tests-repos'
# if you are using a database or table type without transactions
# (MySQL default, for example), you should turn off transactions