summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBill Peck <bpeck@redhat.com>2011-02-22 10:50:02 -0500
committerBill Peck <bpeck@redhat.com>2011-03-01 14:18:42 -0500
commit948766f832ead337f15e14c2287af3ab199a1451 (patch)
treebd612592ee3d1bbaac749bde78e292e8f84aa1fd
parent5641bb2f3e23ad3602fbacfa75b9a20dcdadb1d6 (diff)
Jobs left in queued state foreverbeaker-distribution-beaker-setup-1_1-6
Still need to add test cases. Bug: 668473 Change-Id: Ib7c92110349c39cd498981cd95035fc1ed2b6cb2
-rw-r--r--Server/bkr/server/model.py4
-rw-r--r--Server/bkr/server/test/test_deadrecipes.py71
-rwxr-xr-xServer/bkr/server/tools/beakerd.py69
3 files changed, 137 insertions, 7 deletions
diff --git a/Server/bkr/server/model.py b/Server/bkr/server/model.py
index 690ac08..55c10fa 100644
--- a/Server/bkr/server/model.py
+++ b/Server/bkr/server/model.py
@@ -3011,7 +3011,7 @@ class Distro(MappedObject):
distro_requires.appendChild(xmland)
return distro_requires
- def systems_filter(self, user, filter):
+ def systems_filter(self, user, filter, join=['lab_controller']):
"""
Return Systems that match the following filter
<host>
@@ -3034,7 +3034,7 @@ class Distro(MappedObject):
"""
from needpropertyxml import ElementWrapper
import xmltramp
- systems = self.all_systems(user)
+ systems = self.all_systems(user, join)
#FIXME Should validate XML before processing.
queries = []
joins = []
diff --git a/Server/bkr/server/test/test_deadrecipes.py b/Server/bkr/server/test/test_deadrecipes.py
new file mode 100644
index 0000000..fd6f6b6
--- /dev/null
+++ b/Server/bkr/server/test/test_deadrecipes.py
@@ -0,0 +1,71 @@
+import unittest
+
+from time import sleep
+import time
+from bkr.server.model import TaskStatus, Job, LabControllerDistro
+import sqlalchemy.orm
+from turbogears.database import session
+from bkr.server.test import data_setup
+from bkr.server.tools import beakerd
+import threading
+
+class TestBeakerd(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ data_setup.create_test_env('min')
+ session.flush()
+
+ def setUp(self):
+ # Create two unique labs
+ lab1 = data_setup.create_labcontroller(fqdn='lab_%d' %
+ int(time.time() * 1000))
+ lab2 = data_setup.create_labcontroller(fqdn='lab_%d' %
+ int(time.time() * 1000))
+
+ # Create two distros and only put one in each lab.
+ self.distro1 = data_setup.create_distro()
+ self.distro2 = data_setup.create_distro()
+ session.flush()
+ self.distro1.lab_controller_assocs = [LabControllerDistro(
+ lab_controller=lab2
+ )
+ ]
+ self.distro2.lab_controller_assocs = [LabControllerDistro(
+ lab_controller=lab1
+ )
+ ]
+
+ # Create two systems but only put them in lab1.
+ system1 = data_setup.create_system()
+ system2 = data_setup.create_system()
+ system1.lab_controller = lab1
+ system2.lab_controller = lab1
+
+ # Create two jobs, one requiring distro1 and one requiring distro2
+ self.job1 = data_setup.create_job(whiteboard=u'job_1',
+ distro=self.distro1)
+ self.job2 = data_setup.create_job(whiteboard=u'job_2',
+ distro=self.distro2)
+ session.flush()
+
+ def test_invalid_system_distro_combo(self):
+ beakerd.new_recipes()
+ self.assertEqual(self.job1.status, TaskStatus.by_name(u'Aborted'))
+ self.assertEqual(self.job2.status, TaskStatus.by_name(u'Processed'))
+
+
+ def test_dead_recipes(self):
+ beakerd.new_recipes()
+ beakerd.processed_recipesets()
+ self.assertEqual(self.job2.status, TaskStatus.by_name(u'Queued'))
+ # Remove distro2 from lab1, should cause remaining recipe to abort.
+ for lab in self.distro2.lab_controllers[:]:
+ self.distro2.lab_controllers.remove(lab)
+ beakerd.dead_recipes()
+ self.assertEqual(self.job2.status, TaskStatus.by_name(u'Aborted'))
+
+
+ @classmethod
+ def teardownClass(cls):
+ pass
diff --git a/Server/bkr/server/tools/beakerd.py b/Server/bkr/server/tools/beakerd.py
index 7baa466..3ce6fe1 100755
--- a/Server/bkr/server/tools/beakerd.py
+++ b/Server/bkr/server/tools/beakerd.py
@@ -85,14 +85,34 @@ def new_recipes(*args):
try:
recipe = Recipe.by_id(_recipe.id)
if recipe.distro:
+ recipe.systems = []
+
+ # Do the query twice.
+
+ # First query verifies that the distro
+ # exists in at least one lab that has a macthing system.
systems = recipe.distro.systems_filter(
recipe.recipeset.job.owner,
- recipe.host_requires
+ recipe.host_requires,
+ join=['lab_controller',
+ '_distros',
+ 'distro'],
)
- recipe.systems = []
- for system in systems:
- # Add matched systems to recipe.
- recipe.systems.append(system)
+ # Second query picksup all possible systems so that as
+ # distros appear in other labs those systems will be
+ # available.
+ all_systems = recipe.distro.systems_filter(
+ recipe.recipeset.job.owner,
+ recipe.host_requires,
+ join=['lab_controller'],
+ )
+ # based on above queries, condition on systems but add
+ # all_systems.
+ if systems:
+ for system in all_systems:
+ # Add matched systems to recipe.
+ recipe.systems.append(system)
+
# If the recipe only matches one system then bump its priority.
if len(recipe.systems) == 1:
try:
@@ -233,6 +253,44 @@ def processed_recipesets(*args):
log.debug("Exiting processed_recipes routine")
return True
+def dead_recipes(*args):
+ recipes = Recipe.query()\
+ .join('status')\
+ .outerjoin(['systems'])\
+ .outerjoin(['distro',
+ 'lab_controller_assocs',
+ 'lab_controller'])\
+ .filter(
+ or_(
+ and_(Recipe.status==TaskStatus.by_name(u'Queued'),
+ System.id==None,
+ ),
+ and_(Recipe.status==TaskStatus.by_name(u'Queued'),
+ LabController.id==None,
+ ),
+ )
+ )
+
+ log.debug("Entering dead_recipes routine")
+ for _recipe in recipes:
+ session.begin()
+ try:
+ recipe = Recipe.by_id(_recipe.id)
+ if len(recipe.systems) == 0:
+ msg = "R:%s does not match any systems, aborting." % recipe.id
+ log.info(msg)
+ recipe.recipeset.abort(msg)
+ if len(recipe.distro.lab_controller_assocs) == 0:
+ msg = "R:%s does not have a valid distro, aborting." % recipe.id
+ log.info(msg)
+ recipe.recipeset.abort(msg)
+ session.commit()
+ except exceptions.Exception, e:
+ session.rollback()
+ log.exception("Failed to commit due to :%s" % e)
+ session.close()
+ log.debug("Exiting dead_recipes routine")
+
def queued_recipes(*args):
automated = SystemStatus.by_name(u'Automated')
recipes = Recipe.query()\
@@ -477,6 +535,7 @@ def schedule():
log.debug("starting scheduled recipes Thread")
# Run scheduled_recipes in this process
while True:
+ dead_recipes()
queued = queued_recipes()
scheduled = scheduled_recipes()
if not queued and not scheduled: