summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Styk <mastyk@redhat.com>2019-08-07 12:19:35 +0200
committerMartin Styk <mastyk@redhat.com>2019-08-08 05:06:36 +0000
commitca432fcb853b793fa83c440e26fd5a26a02cb210 (patch)
treeb0c40c26073f11939d664cefd3c873dbdb04e6cc
parente696e4a5c5e16742e70db54323b8f17caa8b4ded (diff)
Enable AMQ messaging in Beaker
Change-Id: I7048a66f46fac4363eae4a48fd23891932f7a853 Signed-off-by: Martin Styk <mastyk@redhat.com>
-rw-r--r--Server/bkr/server/messaging.py329
-rw-r--r--Server/bkr/server/model/scheduler.py1367
-rw-r--r--Server/dev.cfg9
-rw-r--r--Server/server.cfg8
-rw-r--r--beaker.spec4
5 files changed, 1075 insertions, 642 deletions
diff --git a/Server/bkr/server/messaging.py b/Server/bkr/server/messaging.py
new file mode 100644
index 0000000..54aaa13
--- /dev/null
+++ b/Server/bkr/server/messaging.py
@@ -0,0 +1,329 @@
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+"""
+Sending messages to the AMQ
+"""
+
+import json
+import logging
+import random
+
+from proton import Message, SSLDomain
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+# XXX replace turbogears with beaker prefixed flask when migration is done
+from turbogears.config import get
+
+log = logging.getLogger(__name__)
+
+# Taken from rhmsg
+class TimeoutHandler(MessagingHandler):
+ def __init__(self, url, conf, msgs, *args, **kws):
+ super(TimeoutHandler, self).__init__(*args, **kws)
+ self.url = url
+ self.conf = conf
+ self.msgs = msgs
+ self.pending = {}
+
+ def on_start(self, event):
+ log.debug('Container starting')
+ event.container.connected = False
+ event.container.error_msgs = []
+ if 'cert' in self.conf and 'key' in self.conf and 'cacert' in self.conf:
+ ssl = SSLDomain(SSLDomain.MODE_CLIENT)
+ ssl.set_credentials(self.conf['cert'], self.conf['key'], None)
+ ssl.set_trusted_ca_db(self.conf['cacert'])
+ ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
+ else:
+ ssl = None
+ log.debug('connecting to %s', self.url)
+ event.container.connect(url=self.url, reconnect=False, ssl_domain=ssl)
+ connect_timeout = self.conf['connect_timeout']
+ self.connect_task = event.container.schedule(connect_timeout, self)
+ send_timeout = self.conf['send_timeout']
+ self.timeout_task = event.container.schedule(send_timeout, self)
+
+ def on_timer_task(self, event):
+ if not event.container.connected:
+ log.error('not connected, stopping container')
+ if self.timeout_task:
+ self.timeout_task.cancel()
+ self.timeout_task = None
+ event.container.stop()
+ else:
+ # This should only run when called from the timeout task
+ log.error('send timeout expired with %s messages unsent, stopping container',
+ len(self.msgs))
+ event.container.stop()
+
+ def on_connection_opened(self, event):
+ event.container.connected = True
+ self.connect_task.cancel()
+ self.connect_task = None
+ log.debug('connection to %s opened successfully', event.connection.hostname)
+ self.send_msgs(event)
+
+ def on_connection_closed(self, event):
+ log.debug('disconnected from %s', event.connection.hostname)
+
+ def send_msgs(self, event):
+ sender = event.container.create_sender(event.connection, target=self.conf['address'])
+ for msg in self.msgs:
+ delivery = sender.send(msg)
+ log.debug('sent msg: %s', msg.properties)
+ self.pending[delivery] = msg
+ sender.close()
+
+ def update_pending(self, event):
+ msg = self.pending[event.delivery]
+ del self.pending[event.delivery]
+ log.debug('removed message from self.pending: %s', msg.properties)
+ if not self.pending:
+ if self.msgs:
+ log.error('%s messages unsent (rejected or released)', len(self.msgs))
+ else:
+ log.debug('all messages sent successfully')
+ if self.timeout_task:
+ log.debug('canceling timeout task')
+ self.timeout_task.cancel()
+ self.timeout_task = None
+ log.debug('closing connection to %s', event.connection.hostname)
+ event.connection.close()
+
+ def on_settled(self, event):
+ msg = self.pending[event.delivery]
+ self.msgs.remove(msg)
+ log.debug('removed message from self.msgs: %s', msg.properties)
+ self.update_pending(event)
+
+ def on_rejected(self, event):
+ msg = self.pending[event.delivery]
+ log.error('message was rejected: %s', msg.properties)
+ self.update_pending(event)
+
+ def on_released(self, event):
+ msg = self.pending[event.delivery]
+ log.error('message was released: %s', msg.properties)
+ self.update_pending(event)
+
+ def on_transport_tail_closed(self, event):
+ if self.connect_task:
+ log.debug('canceling connect timer')
+ self.connect_task.cancel()
+ self.connect_task = None
+ if self.timeout_task:
+ log.debug('canceling send timer')
+ self.timeout_task.cancel()
+ self.timeout_task = None
+
+ def handle_error(self, objtype, event, level=logging.ERROR):
+ endpoint = getattr(event, objtype, None)
+ condition = (getattr(endpoint, 'remote_condition', None)
+ or getattr(endpoint, 'condition', None))
+ if condition:
+ name = condition.name
+ desc = condition.description
+ log.log(level, '%s error: %s: %s', objtype, name, desc)
+ else:
+ name = '{0} error'.format(objtype)
+ desc = 'unspecified'
+ log.log(level, 'unspecified %s error', objtype)
+ event.container.error_msgs.append((self.url, name, desc))
+
+ def on_connection_error(self, event):
+ self.handle_error('connection', event)
+
+ def on_session_error(self, event):
+ self.handle_error('session', event)
+
+ def on_link_error(self, event):
+ self.handle_error('link', event)
+ log.error('closing connection to: %s', event.connection.hostname)
+ event.connection.close()
+
+ def on_transport_error(self, event):
+ """
+ Implement this handler with the same logic as the default handler in
+ MessagingHandler, but log to our logger at INFO level, instead of the
+ root logger with WARNING level.
+ """
+ self.handle_error('transport', event, level=logging.INFO)
+ if (event.transport
+ and event.transport.condition
+ and event.transport.condition.name in self.fatal_conditions):
+ log.error('closing connection to: %s', event.connection.hostname)
+ event.connection.close()
+
+
+class AMQProducer(object):
+
+ def __init__(self, host=None, port=None,
+ urls=None,
+ certificate=None, private_key=None,
+ trusted_certificates=None,
+ topic=None,
+ timeout=60):
+ if isinstance(urls, (list, tuple)):
+ pass
+ elif urls:
+ urls = [urls]
+ elif host:
+ urls = ['amqps://{0}:{1}'.format(host, port or 5671)]
+ else:
+ raise RuntimeError('either host or urls must be specified')
+ self.urls = urls
+ self.conf = {
+ 'cert': certificate,
+ 'key': private_key,
+ 'cacert': trusted_certificates,
+ 'connect_timeout': timeout,
+ 'send_timeout': timeout
+ }
+ if topic:
+ self.through_topic(topic)
+ else:
+ self.address = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+ def through_topic(self, address):
+ self.address = self.build_address('topic', address)
+
+ @staticmethod
+ def build_address(channel, address):
+ return '{0}://{1}'.format(channel, address)
+
+ def _send_all(self, messages):
+ messages = list(messages)
+ errors = []
+ for url in sorted(self.urls, key=lambda k: random.random()):
+ container = Container(TimeoutHandler(url, self.conf, messages))
+ container.run()
+ errors.extend(container.error_msgs)
+ if not messages:
+ break
+ else:
+ error_strs = ['{0}: {1}: {2}'.format(*e) for e in errors]
+ raise RuntimeError('could not send {0} message{1} to any destinations, '
+ 'errors:\n{2}'.format(len(messages),
+ 's' if len(messages) != 1 else '',
+ '\n'.join(error_strs)))
+
+ def send(self, *messages):
+ """
+ Send a list of messages.
+
+ Each argument is a proton.Message.
+ """
+ assert self.address, 'Must call through_queue or through_topic in advance.'
+
+ self.conf['address'] = self.address
+ self._send_all(messages)
+
+ def _build_msg(self, props, body, attrs=None):
+ """
+ Build and return a proton.Message.
+
+ Arguments:
+ props (dict): Message properties
+ body (object): Message body
+ attrs (dict): Attributes to set on the message.
+ """
+ msg = Message(properties=props, body=body)
+ if attrs:
+ for name, value in attrs.items():
+ setattr(msg, name, value)
+ return msg
+
+ def send_msg(self, props, body, **kws):
+ """
+ Send a single message.
+
+ Arguments:
+ props (dict): Message properties
+ body (str): Message body. Should be utf-8 encoded text.
+
+ Any keyword arguments will be treated as attributes to set on the
+ underlying Message.
+ """
+ msg = self._build_msg(props, body, kws)
+ self.send(msg)
+
+ def send_msgs(self, messages):
+ """
+ Send a list of messages.
+
+ Arguments:
+ messages (list): A list of 2-element lists/tuples.
+ tuple[0]: A dict of message headers.
+ tuple[1]: Message body. Should be utf-8 encoded text.
+
+ If the tuple has a third element, it is treated as a dict containing
+ attributes to be set on the underlying Message.
+ """
+ msgs = []
+ for message in messages:
+ msgs.append(self._build_msg(*message))
+ self.send(*msgs)
+
+
+class BeakerMessenger(object):
+ __instance = None
+
+ def __new__(cls, *args, **kwargs):
+ if not cls.__instance:
+ cls.__instance = BeakerMessenger.__BeakerMessenger()
+ return cls.__instance
+
+ class __BeakerMessenger:
+ def __init__(self):
+ url = get('amq.url')
+ cert = get('amq.cert')
+ key = get('amq.key')
+ cacerts = get('amq.cacerts')
+ topic = get('amq.topic_prefix')
+ self.producer = AMQProducer(urls=url,
+ certificate=cert,
+ private_key=key,
+ trusted_certificates=cacerts,
+ topic=topic)
+
+ def send(self, header, body):
+ try:
+ self.producer.send_msg(header, body)
+ except RuntimeError as e:
+ log.exception(e)
+
+
+def _messenger_enabled():
+ if _messenger_enabled.res is False:
+ _messenger_enabled.res = bool(
+ get('amq.url')
+ and get('amq.cert')
+ and get('amq.key')
+ and get('amq.cacerts')
+ )
+ return _messenger_enabled.res
+
+
+_messenger_enabled.res = False
+
+
+def send_scheduler_update(obj):
+ if not _messenger_enabled():
+ return
+
+ data = obj.minimal_json_content()
+ _send_payload(obj.task_info(), data)
+
+
+def _send_payload(header, body):
+ bkr_msg = BeakerMessenger()
+ bkr_msg.send(header, json.dumps(body, default=str)) # pylint: disable=no-member
diff --git a/Server/bkr/server/model/scheduler.py b/Server/bkr/server/model/scheduler.py
index 8b4f76b..d39329a 100644
--- a/Server/bkr/server/model/scheduler.py
+++ b/Server/bkr/server/model/scheduler.py
@@ -1,79 +1,81 @@
-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
-import os.path
+import crypt
+import decimal
import logging
-from datetime import datetime, timedelta
-from copy import copy
-from itertools import chain
+import numbers
+import os.path
+import random
import re
import shutil
import string
-import random
-import crypt
+import urllib
+import urlparse
+import uuid
import xml.dom.minidom
from collections import defaultdict
-import uuid
-import urlparse
-import urllib
+from datetime import datetime, timedelta
+from itertools import chain
+
import netaddr
-import numbers
-import decimal
from kid import Element
+from lxml import etree
+from lxml.builder import E
from sqlalchemy import (Table, Column, ForeignKey, UniqueConstraint, Index,
- Integer, Unicode, DateTime, Boolean, UnicodeText, String, Numeric,
- event)
-from sqlalchemy.sql import select, union, and_, or_, not_, func, literal, exists, delete
+ Integer, Unicode, DateTime, Boolean, UnicodeText, String, Numeric,
+ event)
from sqlalchemy.exc import InvalidRequestError
-from sqlalchemy.orm import (mapper, relationship, object_mapper,
- dynamic_loader, validates, synonym, contains_eager, aliased)
-from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.ext.associationproxy import association_proxy
+from sqlalchemy.orm import (relationship, object_mapper,
+ dynamic_loader, validates, synonym, contains_eager, aliased)
+from sqlalchemy.orm.exc import NoResultFound
+from sqlalchemy.sql import select, union, and_, or_, not_, func, literal, exists, delete
from turbogears import url
from turbogears.config import get
from turbogears.database import session
-from lxml import etree
-from lxml.builder import E
+
from bkr.common.helpers import makedirs_ignore, total_seconds
from bkr.server import identity, metrics, mail
-from bkr.server.bexceptions import BX, BeakerException, StaleTaskStatusException, DatabaseLookupError
+from bkr.server.bexceptions import BX, BeakerException, StaleTaskStatusException
from bkr.server.helpers import make_link, make_fake_link
from bkr.server.hybrid import hybrid_method, hybrid_property
from bkr.server.installopts import InstallOptions
+from bkr.server.messaging import send_scheduler_update
from bkr.server.util import absolute_url
-from .types import (UUID, MACAddress, IPAddress, TaskResult, TaskStatus, TaskPriority,
- ResourceType, RecipeVirtStatus, mac_unix_padded_dialect, SystemStatus,
- RecipeReservationCondition, SystemSchedulerStatus, ImageType)
-from .base import DeclarativeMappedObject
from .activity import Activity, ActivityMixin
+from .base import DeclarativeMappedObject
+from .distrolibrary import (OSMajor, OSVersion, Distro, DistroTree,
+ LabControllerDistroTree, install_options_for_distro, KernelType)
from .identity import User, Group
+from .installation import Installation
+from .inventory import System, Reservation
from .lab import LabController
-from .distrolibrary import (OSMajor, OSVersion, Distro, DistroTree,
- LabControllerDistroTree, install_options_for_distro, KernelType)
from .tasklibrary import Task, TaskPackage
-from .inventory import System, SystemActivity, Reservation
-from .installation import Installation
+from .types import (UUID, MACAddress, IPAddress, TaskResult, TaskStatus, TaskPriority,
+ ResourceType, RecipeVirtStatus, mac_unix_padded_dialect, SystemStatus,
+ RecipeReservationCondition, SystemSchedulerStatus, ImageType)
log = logging.getLogger(__name__)
xmldoc = xml.dom.minidom.Document()
+
def node(element, value):
node = etree.Element(element)
node.text = value
return node
-class RecipeActivity(Activity):
+class RecipeActivity(Activity):
__tablename__ = 'recipe_activity'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('activity.id',
- name='recipe_activity_id_fk'), primary_key=True)
+ name='recipe_activity_id_fk'), primary_key=True)
recipe_id = Column(Integer, ForeignKey('recipe.id',
- name='recipe_activity_recipe_id_fk'), nullable=False)
+ name='recipe_activity_recipe_id_fk'), nullable=False)
object_id = synonym('recipe_id')
object = relationship('Recipe', back_populates='activity')
__mapper_args__ = {'polymorphic_identity': u'recipe_activity'}
@@ -96,7 +98,6 @@ class RecipeActivity(Activity):
class RecipeSetActivity(Activity):
-
__tablename__ = 'recipeset_activity'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('activity.id'), primary_key=True)
@@ -120,58 +121,64 @@ class RecipeSetActivity(Activity):
}
return result
-machine_guest_map =Table('machine_guest_map', DeclarativeMappedObject.metadata,
- Column('machine_recipe_id', Integer,
- ForeignKey('machine_recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- Column('guest_recipe_id', Integer,
- ForeignKey('recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- mysql_engine='InnoDB',
-)
-system_recipe_map = Table('system_recipe_map', DeclarativeMappedObject.metadata,
- Column('system_id', Integer,
- ForeignKey('system.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- Column('recipe_id', Integer,
- ForeignKey('recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- mysql_engine='InnoDB',
-)
-
-system_hardware_scan_recipe_map = Table('system_hardware_scan_recipe_map', DeclarativeMappedObject.metadata,
- Column('system_id', Integer,
- ForeignKey('system.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- Column('recipe_id', Integer,
- ForeignKey('recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- mysql_engine='InnoDB',
-)
+machine_guest_map = Table('machine_guest_map', DeclarativeMappedObject.metadata,
+ Column('machine_recipe_id', Integer,
+ ForeignKey('machine_recipe.id', onupdate='CASCADE',
+ ondelete='CASCADE'),
+ primary_key=True),
+ Column('guest_recipe_id', Integer,
+ ForeignKey('recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
+ primary_key=True),
+ mysql_engine='InnoDB',
+ )
+system_recipe_map = Table('system_recipe_map', DeclarativeMappedObject.metadata,
+ Column('system_id', Integer,
+ ForeignKey('system.id', onupdate='CASCADE', ondelete='CASCADE'),
+ primary_key=True),
+ Column('recipe_id', Integer,
+ ForeignKey('recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
+ primary_key=True),
+ mysql_engine='InnoDB',
+ )
+
+system_hardware_scan_recipe_map = Table('system_hardware_scan_recipe_map',
+ DeclarativeMappedObject.metadata,
+ Column('system_id', Integer,
+ ForeignKey('system.id', onupdate='CASCADE',
+ ondelete='CASCADE'),
+ primary_key=True),
+ Column('recipe_id', Integer,
+ ForeignKey('recipe.id', onupdate='CASCADE',
+ ondelete='CASCADE'),
+ primary_key=True),
+ mysql_engine='InnoDB',
+ )
recipe_tag_map = Table('recipe_tag_map', DeclarativeMappedObject.metadata,
- Column('tag_id', Integer,
- ForeignKey('recipe_tag.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- Column('recipe_id', Integer,
- ForeignKey('recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
- primary_key=True),
- mysql_engine='InnoDB',
-)
+ Column('tag_id', Integer,
+ ForeignKey('recipe_tag.id', onupdate='CASCADE', ondelete='CASCADE'),
+ primary_key=True),
+ Column('recipe_id', Integer,
+ ForeignKey('recipe.id', onupdate='CASCADE', ondelete='CASCADE'),
+ primary_key=True),
+ mysql_engine='InnoDB',
+ )
task_packages_custom_map = Table('task_packages_custom_map', DeclarativeMappedObject.metadata,
- Column('recipe_id', Integer, ForeignKey('recipe.id', onupdate='CASCADE',
- ondelete='CASCADE'), primary_key=True),
- Column('package_id', Integer, ForeignKey('task_package.id',
- onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
- mysql_engine='InnoDB',
-)
+ Column('recipe_id', Integer,
+ ForeignKey('recipe.id', onupdate='CASCADE',
+ ondelete='CASCADE'), primary_key=True),
+ Column('package_id', Integer, ForeignKey('task_package.id',
+ onupdate='CASCADE',
+ ondelete='CASCADE'),
+ primary_key=True),
+ mysql_engine='InnoDB',
+ )
class JobActivity(Activity):
-
__tablename__ = 'job_activity'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('activity.id'), primary_key=True)
@@ -216,11 +223,11 @@ class Watchdog(DeclarativeMappedObject):
Returns a list of all watchdog entries that are either active
or expired for this lab controller.
- A recipe is only returned as "expired" if all the recipes in the recipe
- set have expired. Similarly, a recipe is returned as "active" so long
- as any recipe in the recipe set is still active. Some tasks rely on
- this behaviour. In particular, the host recipe in virt testing will
- finish while its guests are still running, but we want to keep
+ A recipe is only returned as "expired" if all the recipes in the recipe
+ set have expired. Similarly, a recipe is returned as "active" so long
+ as any recipe in the recipe set is still active. Some tasks rely on
+ this behaviour. In particular, the host recipe in virt testing will
+ finish while its guests are still running, but we want to keep
monitoring the host's console log in case of a panic.
XXX Note that queries returned by this method do not function correctly
@@ -230,20 +237,21 @@ class Watchdog(DeclarativeMappedObject):
in 0.8.3 (and perhaps versions in between, although currently untested).
"""
any_recipe_has_active_watchdog = exists(select([1],
- from_obj=Recipe.__table__.join(Watchdog.__table__))\
- .where(Watchdog.kill_time > datetime.utcnow())\
- .where(Recipe.recipe_set_id == RecipeSet.id)\
- .correlate(RecipeSet.__table__))
- watchdog_query = cls.query.join(Watchdog.recipe).join(Recipe.recipeset)\
- .filter(Watchdog.kill_time != None)
+ from_obj=Recipe.__table__.join(
+ Watchdog.__table__)) \
+ .where(Watchdog.kill_time > datetime.utcnow()) \
+ .where(Recipe.recipe_set_id == RecipeSet.id) \
+ .correlate(RecipeSet.__table__))
+ watchdog_query = cls.query.join(Watchdog.recipe).join(Recipe.recipeset) \
+ .filter(Watchdog.kill_time != None)
if labcontroller is not None:
watchdog_query = watchdog_query.filter(
- RecipeSet.lab_controller==labcontroller)
+ RecipeSet.lab_controller == labcontroller)
if status == 'active':
watchdog_query = watchdog_query.filter(any_recipe_has_active_watchdog)
elif status == 'expired':
- watchdog_query = watchdog_query.join(RecipeSet.job)\
- .filter(not_(Job.is_dirty))\
+ watchdog_query = watchdog_query.join(RecipeSet.job) \
+ .filter(not_(Job.is_dirty)) \
.filter(not_(any_recipe_has_active_watchdog))
else:
return None
@@ -251,7 +259,7 @@ class Watchdog(DeclarativeMappedObject):
def __repr__(self):
return '%s(id=%r, recipe_id=%r, kill_time=%r)' % (self.__class__.__name__,
- self.id, self.recipe_id, self.kill_time)
+ self.id, self.recipe_id, self.kill_time)
class Log(DeclarativeMappedObject):
@@ -259,18 +267,18 @@ class Log(DeclarativeMappedObject):
# Log tables all have the following fields:
# path
- # Subdirectory of this log, relative to the root of the recipe/RT/RTR.
- # Probably won't have an initial or trailing slash, but I wouldn't bet on
- # it. ;-) Notably, the value '/' is used (rather than the empty string)
+ # Subdirectory of this log, relative to the root of the recipe/RT/RTR.
+ # Probably won't have an initial or trailing slash, but I wouldn't bet on
+ # it. ;-) Notably, the value '/' is used (rather than the empty string)
# to represent no subdirectory.
# filename
# Filename of this log.
# server
- # Absolute URL to the directory where the log is stored. Path and
+ # Absolute URL to the directory where the log is stored. Path and
# filename are relative to this.
# Always NULL if log transferring is not enabled (CACHE=False).
# basepath
- # Absolute filesystem path to the directory where the log is stored on
+ # Absolute filesystem path to the directory where the log is stored on
# the remote system. XXX we shouldn't need to store this!
# Always NULL if log transferring is not enabled (CACHE=False).
@@ -286,10 +294,10 @@ class Log(DeclarativeMappedObject):
@staticmethod
def _normalized_path(path):
"""
- We need to normalize the `path` attribute *before* storing it, so that
- we don't end up with duplicate rows that point to equivalent filesystem
+ We need to normalize the `path` attribute *before* storing it, so that
+ we don't end up with duplicate rows that point to equivalent filesystem
paths (bug 865265).
- Also by convention we use '/' rather than empty string to mean "no
+ Also by convention we use '/' rather than empty string to mean "no
subdirectory". It's all a bit weird...
"""
return re.sub(r'/+', '/', path or u'') or u'/'
@@ -309,8 +317,8 @@ class Log(DeclarativeMappedObject):
def __repr__(self):
return '%s(path=%r, filename=%r, server=%r, basepath=%r)' % (
- self.__class__.__name__, self.path, self.filename,
- self.server, self.basepath)
+ self.__class__.__name__, self.path, self.filename,
+ self.server, self.basepath)
def result(self):
return self.parent.result
@@ -330,19 +338,19 @@ class Log(DeclarativeMappedObject):
@property
def full_path(self):
"""
- Returns an absolute URL to the log if it's stored remotely, or an
+ Returns an absolute URL to the log if it's stored remotely, or an
absolute filesystem path if it's stored locally.
"""
if self.server:
return self.absolute_url
else:
return os.path.join(self.parent.logspath, self.parent.filepath,
- self.combined_path)
+ self.combined_path)
@property
def absolute_url(self):
if self.server:
- # self.server points at a directory so it should end in
+ # self.server points at a directory so it should end in
# a trailing slash, but older versions of the code didn't do that
url = self.server
if not url.endswith('/'):
@@ -350,7 +358,7 @@ class Log(DeclarativeMappedObject):
return '%s%s' % (url, self.combined_path)
else:
return urlparse.urljoin(absolute_url('/'),
- os.path.join('/logs', self.parent.filepath, self.combined_path))
+ os.path.join('/logs', self.parent.filepath, self.combined_path))
@property
def link(self):
@@ -362,30 +370,30 @@ class Log(DeclarativeMappedObject):
def dict(self):
""" Return a dict describing this log
"""
- return dict( server = self.server,
- path = self.path,
- filename = self.filename,
- tid = '%s:%s' % (self.type, self.id),
- filepath = self.parent.filepath,
- basepath = self.basepath,
- url = absolute_url(self.href),
- )
+ return dict(server=self.server,
+ path=self.path,
+ filename=self.filename,
+ tid='%s:%s' % (self.type, self.id),
+ filepath=self.parent.filepath,
+ basepath=self.basepath,
+ url=absolute_url(self.href),
+ )
def to_xml(self):
return E.log(name=self.combined_path, href=absolute_url(self.href))
@classmethod
- def by_id(cls,id):
+ def by_id(cls, id):
return cls.query.filter_by(id=id).one()
def __cmp__(self, other):
""" Used to compare logs that are already stored. Log(path,filename) in Recipe.logs == True
"""
- if hasattr(other,'path'):
+ if hasattr(other, 'path'):
path = other.path
- if hasattr(other,'filename'):
+ if hasattr(other, 'filename'):
filename = other.filename
- if "%s/%s" % (self.path,self.filename) == "%s/%s" % (path,filename):
+ if "%s/%s" % (self.path, self.filename) == "%s/%s" % (path, filename):
return 0
else:
return 1
@@ -398,6 +406,7 @@ class Log(DeclarativeMappedObject):
'href': url(self.href),
}
+
class LogRecipe(Log):
type = 'R'
@@ -411,6 +420,7 @@ class LogRecipe(Log):
def href(self):
return '/recipes/%s/logs/%s' % (self.parent.id, self.combined_path)
+
class LogRecipeTask(Log):
type = 'T'
@@ -418,13 +428,14 @@ class LogRecipeTask(Log):
__table_args__ = {'mysql_engine': 'InnoDB'}
# common column definitions are inherited from Log
recipe_task_id = Column(Integer, ForeignKey('recipe_task.id'),
- nullable=False)
+ nullable=False)
parent = relationship('RecipeTask', back_populates='logs')
@property
def href(self):
return '/recipes/%s/tasks/%s/logs/%s' % (self.parent.recipe.id,
- self.parent.id, self.combined_path)
+ self.parent.id, self.combined_path)
+
class LogRecipeTaskResult(Log):
type = 'E'
@@ -433,23 +444,24 @@ class LogRecipeTaskResult(Log):
__table_args__ = {'mysql_engine': 'InnoDB'}
# common column definitions are inherited from Log
recipe_task_result_id = Column(Integer,
- ForeignKey('recipe_task_result.id'), nullable=False)
+ ForeignKey('recipe_task_result.id'), nullable=False)
parent = relationship('RecipeTaskResult', back_populates='logs')
@property
def href(self):
return '/recipes/%s/tasks/%s/results/%s/logs/%s' % (
- self.parent.recipetask.recipe.id, self.parent.recipetask.id,
- self.parent.id, self.combined_path)
+ self.parent.recipetask.recipe.id, self.parent.recipetask.id,
+ self.parent.id, self.combined_path)
+
class TaskBase(DeclarativeMappedObject):
__abstract__ = True
- t_id_types = dict(T = 'RecipeTask',
- TR = 'RecipeTaskResult',
- R = 'Recipe',
- RS = 'RecipeSet',
- J = 'Job')
+ t_id_types = dict(T='RecipeTask',
+ TR='RecipeTaskResult',
+ R='Recipe',
+ RS='RecipeSet',
+ J='Job')
# Defined on subclasses
id = 0
@@ -472,7 +484,7 @@ class TaskBase(DeclarativeMappedObject):
Return an TaskBase object by it's shorthand i.e 'J:xx, RS:xx'
"""
# Keep Client/doc/bkr.rst in sync with this
- task_type,id = t_id.split(":")
+ task_type, id = t_id.split(":")
try:
class_str = cls.t_id_types[task_type]
except KeyError:
@@ -481,7 +493,7 @@ class TaskBase(DeclarativeMappedObject):
class_ref = globals()[class_str]
try:
obj_ref = class_ref.by_id(id)
- except InvalidRequestError, e:
+ except InvalidRequestError:
raise BeakerException(_('%s is not a valid %s id' % (id, class_str)))
return obj_ref
@@ -491,25 +503,25 @@ class TaskBase(DeclarativeMappedObject):
_change_status will update the status if needed
Returns True when status is changed
"""
- current_status = self.status #pylint: disable=E0203
+ current_status = self.status # pylint: disable=E0203
if current_status != new_status:
# Sanity check to make sure the status never goes backwards.
- if isinstance(self, (Recipe, RecipeTask)) and \
- ((new_status.queued and not current_status.queued) or \
- (not new_status.finished and current_status.finished)):
+ if (isinstance(self, (Recipe, RecipeTask))
+ and ((new_status.queued and not current_status.queued)
+ or (not new_status.finished and current_status.finished))):
raise ValueError('Invalid state transition for %s: %s -> %s'
- % (self.t_id, current_status, new_status))
- # Use a conditional UPDATE to make sure we are really working from
+ % (self.t_id, current_status, new_status))
+ # Use a conditional UPDATE to make sure we are really working from
# the latest database state.
- # The .base_mapper bit here is so we can get from MachineRecipe to
- # Recipe, which is needed due to the limitations of .update()
- if session.query(object_mapper(self).base_mapper)\
- .filter_by(id=self.id, status=current_status)\
+ # The .base_mapper bit here is so we can get from MachineRecipe to
+ # Recipe, which is needed due to the limitations of .update()
+ if session.query(object_mapper(self).base_mapper) \
+ .filter_by(id=self.id, status=current_status) \
.update({'status': new_status}, synchronize_session=False) \
!= 1:
raise StaleTaskStatusException(
- 'Status for %s updated in another transaction'
- % self.t_id)
+ 'Status for %s updated in another transaction'
+ % self.t_id)
# update the ORM session state as well
self.status = new_status
return True
@@ -524,7 +536,7 @@ class TaskBase(DeclarativeMappedObject):
return self.status.finished
@is_finished.expression
- def is_finished(cls): #pylint: disable=E0213
+ def is_finished(cls): # pylint: disable=E0213
return cls.status.in_([status for status in TaskStatus if status.finished])
def is_queued(self):
@@ -541,8 +553,9 @@ class TaskBase(DeclarativeMappedObject):
return (self.result in [TaskResult.warn,
TaskResult.fail,
TaskResult.panic])
+
@is_failed.expression
- def is_failed(cls): #pylint: disable=E0213
+ def is_failed(cls): # pylint: disable=E0213
"""
Return SQL expression that is true if the task has failed
"""
@@ -557,7 +570,7 @@ class TaskBase(DeclarativeMappedObject):
return all([task.status == TaskStatus.aborted for task in self.tasks])
@is_suspiciously_aborted.expression
- def is_suspiciously_aborted(cls): # pylint: disable=E0213
+ def is_suspiciously_aborted(cls): # pylint: disable=E0213
"""Returns an SQL expression evaluating to TRUE if the recipe abort was
suspicious. Note: There is no 'ALL' operator in SQL to get rid of the
double negation."""
@@ -611,11 +624,11 @@ class TaskBase(DeclarativeMappedObject):
container.append(div)
return container
-
def t_id(self):
for t, class_ in self.t_id_types.iteritems():
if self.__class__.__name__ == class_:
return '%s:%s' % (t, self.id)
+
t_id = property(t_id)
@@ -630,24 +643,24 @@ class Job(TaskBase, ActivityMixin):
is_dirty = Column(Boolean, nullable=False, index=True)
owner_id = Column(Integer, ForeignKey('tg_user.user_id'), index=True)
owner = relationship(User, back_populates='jobs',
- primaryjoin=owner_id == User.user_id)
+ primaryjoin=owner_id == User.user_id)
submitter_id = Column(Integer,
- ForeignKey('tg_user.user_id', name='job_submitter_id_fk'))
+ ForeignKey('tg_user.user_id', name='job_submitter_id_fk'))
submitter = relationship(User, primaryjoin=submitter_id == User.user_id)
group_id = Column(Integer,
- ForeignKey('tg_group.group_id', name='job_group_id_fk'))
+ ForeignKey('tg_group.group_id', name='job_group_id_fk'))
group = relationship(Group, back_populates='jobs')
whiteboard = Column(Unicode(2000))
extra_xml = Column(UnicodeText, nullable=True)
retention_tag_id = Column(Integer, ForeignKey('retention_tag.id'),
- nullable=False)
+ nullable=False)
retention_tag = relationship('RetentionTag', back_populates='jobs')
product_id = Column(Integer, ForeignKey('product.id'), nullable=True)
product = relationship('Product', back_populates='jobs')
result = Column(TaskResult.db_type(), nullable=False,
- default=TaskResult.new, index=True)
+ default=TaskResult.new, index=True)
status = Column(TaskStatus.db_type(), nullable=False,
- default=TaskStatus.new, index=True)
+ default=TaskStatus.new, index=True)
purged = Column(DateTime, default=None, index=True)
deleted = Column(DateTime, default=None, index=True)
# Total tasks
@@ -664,15 +677,15 @@ class Job(TaskBase, ActivityMixin):
ktasks = Column(Integer, default=0)
recipesets = relationship('RecipeSet', back_populates='job')
_job_ccs = relationship('JobCc', back_populates='job',
- cascade='all, delete-orphan')
+ cascade='all, delete-orphan')
activity = relationship(JobActivity, back_populates='object',
- cascade='all, delete-orphan',
- order_by=[JobActivity.__table__.c.id.desc()])
+ cascade='all, delete-orphan',
+ order_by=[JobActivity.__table__.c.id.desc()])
activity_type = JobActivity
def __init__(self, ttasks=0, owner=None, whiteboard=None,
- retention_tag=None, product=None, group=None, submitter=None):
+ retention_tag=None, product=None, group=None, submitter=None):
super(Job, self).__init__()
self.ttasks = ttasks
self.owner = owner
@@ -686,17 +699,40 @@ class Job(TaskBase, ActivityMixin):
self.product = product
self.is_dirty = False
- stop_types = ['abort','cancel']
+ stop_types = ['abort', 'cancel']
max_by_whiteboard = 20
def __json__(self):
return self.to_json()
def to_json(self, include_recipesets=True):
- data = {
+ data = self.minimal_json_content()
+ if identity.current.user:
+ u = identity.current.user
+ data['can_change_retention_tag'] = self.can_change_retention_tag(u)
+ if data['can_change_retention_tag']:
+ data['possible_retention_tags'] = RetentionTag.query.all()
+ data['can_change_product'] = self.can_change_product(u)
+ if data['can_change_product']:
+ data['possible_products'] = Product.query.all()
+ data['can_cancel'] = self.can_cancel(u)
+ data['can_delete'] = self.can_delete(u)
+ data['can_edit'] = self.can_edit(u)
+ else:
+ data['can_change_retention_tag'] = False
+ data['can_change_product'] = False
+ data['can_cancel'] = False
+ data['can_delete'] = False
+ data['can_edit'] = False
+ if include_recipesets:
+ data['recipesets'] = self.recipesets
+ return data
+
+ def minimal_json_content(self):
+ return {
'id': self.id,
't_id': self.t_id,
- 'submitter': self.submitter or self.owner, # submitter may be NULL prior to Beaker 0.14
+ 'submitter': self.submitter or self.owner, # submitter may be NULL prior to Beaker 0.14
'owner': self.owner,
'group': self.group,
'status': self.status,
@@ -717,43 +753,23 @@ class Job(TaskBase, ActivityMixin):
'recipe_count': self.recipe_count,
'clone_href': self.clone_link(),
}
- if identity.current.user:
- u = identity.current.user
- data['can_change_retention_tag'] = self.can_change_retention_tag(u)
- if data['can_change_retention_tag']:
- data['possible_retention_tags'] = RetentionTag.query.all()
- data['can_change_product'] = self.can_change_product(u)
- if data['can_change_product']:
- data['possible_products'] = Product.query.all()
- data['can_cancel'] = self.can_cancel(u)
- data['can_delete'] = self.can_delete(u)
- data['can_edit'] = self.can_edit(u)
- else:
- data['can_change_retention_tag'] = False
- data['can_change_product'] = False
- data['can_cancel'] = False
- data['can_delete'] = False
- data['can_edit'] = False
- if include_recipesets:
- data['recipesets'] = self.recipesets
- return data
@classmethod
def mine(cls, owner):
"""
Returns a query of all jobs which are owned by the given user.
"""
- return cls.query.filter(or_(Job.owner==owner, Job.submitter==owner))
+ return cls.query.filter(or_(Job.owner == owner, Job.submitter == owner))
@classmethod
def my_groups(cls, owner):
"""
- ... as in, "my groups' jobs". Returns a query of all jobs which were
+ ... as in, "my groups' jobs". Returns a query of all jobs which were
submitted for any of the given user's groups.
"""
if owner.groups:
- return cls.query.outerjoin(Job.group)\
- .filter(Group.group_id.in_([g.group_id for g in owner.groups]))
+ return cls.query.outerjoin(Job.group) \
+ .filter(Group.group_id.in_([g.group_id for g in owner.groups]))
else:
return cls.query.filter(literal(False))
@@ -762,8 +778,8 @@ class Job(TaskBase, ActivityMixin):
"""
Returns True if all recipes in this job finished more than *n* days ago.
"""
- # Note that Recipe.finish_time may be null even for a finished recipe,
- # that indicates it was cancelled/aborted before it ever started. In
+ # Note that Recipe.finish_time may be null even for a finished recipe,
+ # that indicates it was cancelled/aborted before it ever started. In
# that case we look at RecipeSet.queue_time instead.
if not self.is_finished():
return False
@@ -776,15 +792,16 @@ class Job(TaskBase, ActivityMixin):
return True
@completed_n_days_ago.expression
- def completed_n_days_ago(cls, n): #pylint: disable=E0213
- # We let cutoff be computed on the SQL side, in case n is also a SQL
+ def completed_n_days_ago(cls, n): # pylint: disable=E0213
+ # We let cutoff be computed on the SQL side, in case n is also a SQL
# expression and not a constant.
cutoff = func.adddate(func.utc_timestamp(), -n)
return and_(
cls.is_finished(),
not_(exists([1], from_obj=Recipe.__table__.join(RecipeSet.__table__),
- whereclause=and_(RecipeSet.job_id == Job.id,
- func.coalesce(Recipe.finish_time, RecipeSet.queue_time) >= cutoff)))
+ whereclause=and_(RecipeSet.job_id == Job.id,
+ func.coalesce(Recipe.finish_time,
+ RecipeSet.queue_time) >= cutoff)))
)
@hybrid_property
@@ -803,16 +820,16 @@ class Job(TaskBase, ActivityMixin):
return False
@is_expired.expression
- def is_expired(cls): #pylint: disable=E0213
- # We *could* rely on the caller to join Job.retention_tag and then use
- # RetentionTag.expire_in_days directly here, instead of a having this
- # correlated subquery. We have used that approach elsewhere in other
- # hybrids. It produces faster queries too. *BUT* if we did that here,
- # and then the caller accidentally forgot to do the join, this clause
- # would silently result in EVERY JOB being expired which would be
+ def is_expired(cls): # pylint: disable=E0213
+ # We *could* rely on the caller to join Job.retention_tag and then use
+ # RetentionTag.expire_in_days directly here, instead of a having this
+ # correlated subquery. We have used that approach elsewhere in other
+ # hybrids. It produces faster queries too. *BUT* if we did that here,
+ # and then the caller accidentally forgot to do the join, this clause
+ # would silently result in EVERY JOB being expired which would be
# a huge disaster.
- expire_in_days_subquery = select([RetentionTag.expire_in_days])\
- .where(RetentionTag.jobs).correlate(Job).label('expire_in_days')
+ expire_in_days_subquery = select([RetentionTag.expire_in_days]) \
+ .where(RetentionTag.jobs).correlate(Job).label('expire_in_days')
return and_(
Job.deleted == None,
expire_in_days_subquery > 0,
@@ -823,7 +840,9 @@ class Job(TaskBase, ActivityMixin):
def has_family(cls, family, query=None, **kw):
if query is None:
query = cls.query
- query = query.join(cls.recipesets, RecipeSet.recipes, Recipe.distro_tree, DistroTree.distro, Distro.osversion, OSVersion.osmajor).filter(OSMajor.osmajor == family).reset_joinpoint()
+ query = query.join(cls.recipesets, RecipeSet.recipes, Recipe.distro_tree, DistroTree.distro,
+ Distro.osversion, OSVersion.osmajor).filter(
+ OSMajor.osmajor == family).reset_joinpoint()
return query
@classmethod
@@ -839,7 +858,7 @@ class Job(TaskBase, ActivityMixin):
@classmethod
def by_product(cls, product, query=None):
if query is None:
- query=cls.query
+ query = cls.query
if isinstance(product, basestring):
product = [product]
product_query = cls.product_id.in_([Product.by_name(p).id for p in product])
@@ -849,7 +868,7 @@ class Job(TaskBase, ActivityMixin):
@classmethod
def by_owner(cls, owner, query=None):
if query is None:
- query=cls.query
+ query = cls.query
if isinstance(owner, basestring):
owner = [owner]
# by_user_name() returns None for non-existent users
@@ -863,7 +882,7 @@ class Job(TaskBase, ActivityMixin):
@classmethod
def by_groups(cls, groups, query=None):
if query is None:
- query=cls.query
+ query = cls.query
if isinstance(groups, basestring):
groups = [groups]
groups_query = cls.group_id.in_([Group.by_name(g).id for g in groups])
@@ -895,7 +914,8 @@ class Job(TaskBase, ActivityMixin):
if kw.get('whiteboard'):
job.whiteboard = kw.get('whiteboard')
if job.owner.rootpw_expired:
- raise BX(_(u"Your root password has expired, please change or clear it in order to submit jobs."))
+ raise BX(_(
+ u"Your root password has expired, please change or clear it in order to submit jobs."))
for distro_tree in distro_trees:
recipeSet = RecipeSet(ttasks=2)
@@ -915,7 +935,7 @@ class Job(TaskBase, ActivityMixin):
raise BX(_(u'You do not have access to reserve %s' % system))
if not system.in_lab_with_distro_tree(distro_tree):
raise BX(_(u'%s is not available on %s'
- % (distro_tree, system.lab_controller)))
+ % (distro_tree, system.lab_controller)))
if not system.compatible_with_distro_tree(
arch=distro_tree.arch,
osmajor=distro_tree.distro.osversion.osmajor.osmajor,
@@ -928,24 +948,24 @@ class Job(TaskBase, ActivityMixin):
lab_controller = kw.get('lab')
if not distro_tree.url_in_lab(lab_controller):
raise BX(_(u'%s is not available on %s'
- % (distro_tree, lab_controller)))
+ % (distro_tree, lab_controller)))
if not MachineRecipe.hypothetical_candidate_systems(job.owner,
- distro_tree=distro_tree,
- lab_controller=lab_controller).count():
+ distro_tree=distro_tree,
+ lab_controller=lab_controller).count():
raise BX(_(u'No available systems compatible with %s on %s'
- % (distro_tree, lab_controller)))
+ % (distro_tree, lab_controller)))
recipe.host_requires = (u'<hostRequires>'
- u'<labcontroller op="=" value="%s" />'
- u'<system_type op="=" value="Machine" />'
- u'</hostRequires>'
- % lab_controller.fqdn)
+ u'<labcontroller op="=" value="%s" />'
+ u'<system_type op="=" value="Machine" />'
+ u'</hostRequires>'
+ % lab_controller.fqdn)
recipeSet.lab_controller = lab_controller
else:
if not MachineRecipe.hypothetical_candidate_systems(job.owner,
- distro_tree=distro_tree).count():
+ distro_tree=distro_tree).count():
raise BX(_(u'No available systems compatible with %s'
% distro_tree))
- pass # leave hostrequires completely unset
+ pass # leave hostrequires completely unset
if kw.get('ks_meta'):
recipe.ks_meta = kw.get('ks_meta')
if kw.get('koptions'):
@@ -955,15 +975,15 @@ class Job(TaskBase, ActivityMixin):
# Eventually we will want the option to add more tasks.
# Add Install task
recipe.tasks.append(RecipeTask.from_task(
- Task.by_name(u'/distribution/check-install')))
+ Task.by_name(u'/distribution/check-install')))
# Add Reserve task
reserveTask = RecipeTask.from_task(
- Task.by_name(u'/distribution/reservesys'))
+ Task.by_name(u'/distribution/reservesys'))
if kw.get('reservetime'):
- reserveTask.params.append(RecipeTaskParam( name = 'RESERVETIME',
- value = kw.get('reservetime')
- )
- )
+ reserveTask.params.append(RecipeTaskParam(name='RESERVETIME',
+ value=kw.get('reservetime')
+ )
+ )
recipe.tasks.append(reserveTask)
recipeSet.recipes.append(recipe)
job.recipesets.append(recipeSet)
@@ -985,7 +1005,7 @@ class Job(TaskBase, ActivityMixin):
if kw.get('whiteboard'):
job.whiteboard = kw.get('whiteboard')
if job.owner.rootpw_expired:
- raise ValueError(u'Your root password has expired,'
+ raise ValueError(u'Your root password has expired,'
'please change or clear it in order to submit jobs.')
recipeSet = RecipeSet(ttasks=2)
recipe = MachineRecipe(ttasks=2)
@@ -1029,7 +1049,7 @@ class Job(TaskBase, ActivityMixin):
@classmethod
def find_jobs(cls, query=None, tag=None, complete_days=None, family=None,
- product=None, owner=None, **kw):
+ product=None, owner=None, **kw):
"""Return a filtered job query
Does what it says. Also helps searching for expired jobs
@@ -1048,7 +1068,7 @@ class Job(TaskBase, ActivityMixin):
log.exception(err_msg)
raise BX(err_msg)
- query =cls.has_family(family, query)
+ query = cls.has_family(family, query)
if tag:
if len(tag) == 1:
tag = tag[0]
@@ -1063,7 +1083,7 @@ class Job(TaskBase, ActivityMixin):
if len(product) == 1:
product = product[0]
try:
- query = cls.by_product(product,query)
+ query = cls.by_product(product, query)
except NoResultFound:
err_msg = _('Product is invalid: %s') % product
log.exception(err_msg)
@@ -1078,7 +1098,7 @@ class Job(TaskBase, ActivityMixin):
return query
@classmethod
- def cancel_jobs_by_user(cls, user, msg = None):
+ def cancel_jobs_by_user(cls, user, msg=None):
jobs = Job.query.filter(and_(Job.owner == user,
Job.status.in_([s for s in TaskStatus if not s.finished])))
for job in jobs:
@@ -1111,12 +1131,12 @@ class Job(TaskBase, ActivityMixin):
@property
def all_activity(self):
"""
- A list of all activity records from this job and its recipe sets,
+ A list of all activity records from this job and its recipe sets,
combined in the proper order.
"""
return sorted(
- sum((rs.activity for rs in self.recipesets), self.activity),
- key=lambda a: a.created, reverse=True)
+ sum((rs.activity for rs in self.recipesets), self.activity),
+ key=lambda a: a.created, reverse=True)
def clone_link(self):
""" return link to clone this job
@@ -1133,7 +1153,7 @@ class Job(TaskBase, ActivityMixin):
"""Returns a relative URL for job's page."""
return urllib.quote(u'/jobs/%s' % self.id)
- def is_owner(self,user):
+ def is_owner(self, user):
if self.owner == user:
return True
return False
@@ -1145,13 +1165,13 @@ class Job(TaskBase, ActivityMixin):
return False
@is_deleted.expression
- def is_deleted(cls): #pylint: disable=E0213
+ def is_deleted(cls): # pylint: disable=E0213
return cls.deleted != None
def priority_settings(self, prefix, colspan='1'):
span = Element('span')
title = Element('td')
- title.attrib['class']='title'
+ title.attrib['class'] = 'title'
title.text = "Set all RecipeSet priorities"
content = Element('td')
content.attrib['colspan'] = colspan
@@ -1164,10 +1184,10 @@ class Job(TaskBase, ActivityMixin):
span.append(content)
return span
- def retention_settings(self,prefix,colspan='1'):
+ def retention_settings(self, prefix, colspan='1'):
span = Element('span')
title = Element('td')
- title.attrib['class']='title'
+ title.attrib['class'] = 'title'
title.text = "Set all RecipeSet tags"
content = Element('td')
content.attrib['colspan'] = colspan
@@ -1230,15 +1250,15 @@ class Job(TaskBase, ActivityMixin):
Method for exporting job status for TaskWatcher
"""
return dict(
- id = "J:%s" % self.id,
- worker = None,
- state_label = "%s" % self.status,
- state = self.status.value,
- method = "%s" % self.whiteboard,
- result = "%s" % self.result,
- is_finished = self.is_finished(),
- is_failed = self.is_failed(),
- )
+ id="J:%s" % self.id,
+ worker=None,
+ state_label="%s" % self.status,
+ state=self.status.value,
+ method="%s" % self.whiteboard,
+ result="%s" % self.result,
+ is_finished=self.is_finished(),
+ is_failed=self.is_failed(),
+ )
def all_recipes(self):
"""
@@ -1247,6 +1267,7 @@ class Job(TaskBase, ActivityMixin):
for recipeset in self.recipesets:
for recipe in recipeset.recipes:
yield recipe
+
all_recipes = property(all_recipes)
@property
@@ -1255,15 +1276,16 @@ class Job(TaskBase, ActivityMixin):
def update_status(self):
if not self.is_dirty:
- # This error should be impossible to trigger in beakerd's
+ # This error should be impossible to trigger in beakerd's
# update_dirty_jobs thread.
- # If you are seeing this in a test case, it means something
- # *before* this point has failed to mark this job as dirty even
- # though the tests were expecting it to be dirty. That's a real bug
+ # If you are seeing this in a test case, it means something
+ # *before* this point has failed to mark this job as dirty even
+ # though the tests were expecting it to be dirty. That's a real bug
# which needs fixing in whatever happened before this point.
# For example: https://bugzilla.redhat.com/show_bug.cgi?id=991245#c15
raise RuntimeError('Invoked update_status on '
- 'job %s which was not dirty' % self.id)
+ 'job %s which was not dirty' % self.id)
+
self._update_status()
self._mark_clean()
@@ -1299,13 +1321,15 @@ class Job(TaskBase, ActivityMixin):
max_result = recipeset.result
status_changed = self._change_status(min_status)
self.result = max_result
- if status_changed and self.is_finished():
- # Send email notification
- mail.job_notify(self)
+ if status_changed:
+ send_scheduler_update(self)
+ if self.is_finished():
+ # Send email notification
+ mail.job_notify(self)
- #def t_id(self):
+ # def t_id(self):
# return "J:%s" % self.id
- #t_id = property(t_id)
+ # t_id = property(t_id)
@property
def link(self):
@@ -1353,7 +1377,7 @@ class Job(TaskBase, ActivityMixin):
"""Returns True iff the given user can comment on this job."""
if user is None:
return False
- return True # anyone can comment on any job
+ return True # anyone can comment on any job
def can_waive(self, user=None):
"""Returns True iff the given user can waive recipe sets in this job."""
@@ -1371,16 +1395,16 @@ class Job(TaskBase, ActivityMixin):
if self.group in user.groups:
return True
return self.is_owner(user) or user.is_admin() or \
- self.submitter == user
+ self.submitter == user
cc = association_proxy('_job_ccs', 'email_address')
-class JobCc(DeclarativeMappedObject):
+class JobCc(DeclarativeMappedObject):
__tablename__ = 'job_cc'
__table_args__ = {'mysql_engine': 'InnoDB'}
job_id = Column(Integer, ForeignKey('job.id', ondelete='CASCADE',
- onupdate='CASCADE'), primary_key=True)
+ onupdate='CASCADE'), primary_key=True)
job = relationship(Job, back_populates='_job_ccs')
email_address = Column(Unicode(255), primary_key=True, index=True)
@@ -1390,7 +1414,6 @@ class JobCc(DeclarativeMappedObject):
class Product(DeclarativeMappedObject):
-
__tablename__ = 'product'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, autoincrement=True, primary_key=True)
@@ -1431,14 +1454,14 @@ class Product(DeclarativeMappedObject):
except NoResultFound:
raise ValueError('No such product %r' % name)
-class BeakerTag(DeclarativeMappedObject):
+class BeakerTag(DeclarativeMappedObject):
__tablename__ = 'beaker_tag'
__table_args__ = (
UniqueConstraint('tag', 'type'),
{'mysql_engine': 'InnoDB'}
)
- id = Column(Integer, primary_key=True, nullable = False)
+ id = Column(Integer, primary_key=True, nullable=False)
tag = Column(Unicode(20), nullable=False)
type = Column(Unicode(40), nullable=False)
__mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': u'tag'}
@@ -1448,11 +1471,11 @@ class BeakerTag(DeclarativeMappedObject):
@classmethod
def by_id(cls, id, *args, **kw):
- return cls.query.filter(cls.id==id).one()
+ return cls.query.filter(cls.id == id).one()
@classmethod
def by_tag(cls, tag, *args, **kw):
- return cls.query.filter(cls.tag==tag).one()
+ return cls.query.filter(cls.tag == tag).one()
@classmethod
def get_all(cls, *args, **kw):
@@ -1460,11 +1483,10 @@ class BeakerTag(DeclarativeMappedObject):
class RetentionTag(BeakerTag):
-
__tablename__ = 'retention_tag'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('beaker_tag.id', onupdate='CASCADE',
- ondelete='CASCADE'), primary_key=True)
+ ondelete='CASCADE'), primary_key=True)
is_default = Column('default_', Boolean(create_constraint=False))
expire_in_days = Column(Integer, default=0, nullable=False)
needs_product = Column(Boolean, default=False, nullable=False)
@@ -1476,7 +1498,7 @@ class RetentionTag(BeakerTag):
self.set_default_val(is_default)
@classmethod
- def by_name(cls,tag):
+ def by_name(cls, tag):
try:
return cls.query.filter_by(tag=tag).one()
except NoResultFound:
@@ -1500,13 +1522,15 @@ class RetentionTag(BeakerTag):
try:
current_default = self.get_default()
current_default.is_default = False
- except InvalidRequestError, e: pass
+ except InvalidRequestError:
+ pass
self.is_default = is_default
- default = property(get_default_val,set_default_val)
+
+ default = property(get_default_val, set_default_val)
@classmethod
def get_default(cls, *args, **kw):
- return cls.query.filter(cls.is_default==True).one()
+ return cls.query.filter(cls.is_default == True).one()
@classmethod
def list_by_requires_product(cls, requires=True, *args, **kw):
@@ -1526,8 +1550,8 @@ class RetentionTag(BeakerTag):
def __repr__(self):
return '%s(tag=%r, needs_product=%r, expire_in_days=%r)' % (
- self.__class__.__name__, self.tag, self.needs_product,
- self.expire_in_days)
+ self.__class__.__name__, self.tag, self.needs_product,
+ self.expire_in_days)
def __unicode__(self):
return self.tag
@@ -1541,6 +1565,7 @@ class RetentionTag(BeakerTag):
'needs_product': self.needs_product,
}
+
class RecipeSet(TaskBase, ActivityMixin):
"""
A Collection of Recipes that must be executed at the same time.
@@ -1552,12 +1577,12 @@ class RecipeSet(TaskBase, ActivityMixin):
job_id = Column(Integer, ForeignKey('job.id'), nullable=False)
job = relationship(Job, back_populates='recipesets')
priority = Column(TaskPriority.db_type(), nullable=False,
- default=TaskPriority.default_priority(), index=True)
+ default=TaskPriority.default_priority(), index=True)
queue_time = Column(DateTime, nullable=False, default=datetime.utcnow)
result = Column(TaskResult.db_type(), nullable=False,
- default=TaskResult.new, index=True)
+ default=TaskResult.new, index=True)
status = Column(TaskStatus.db_type(), nullable=False,
- default=TaskStatus.new, index=True)
+ default=TaskStatus.new, index=True)
lab_controller_id = Column(Integer, ForeignKey('lab_controller.id'))
lab_controller = relationship(LabController)
# Total tasks
@@ -1574,13 +1599,14 @@ class RecipeSet(TaskBase, ActivityMixin):
ktasks = Column(Integer, default=0)
recipes = relationship('Recipe', back_populates='recipeset')
activity = relationship(RecipeSetActivity, back_populates='object',
- order_by=[RecipeSetActivity.created.desc(), RecipeSetActivity.id.desc()])
+ order_by=[RecipeSetActivity.created.desc(),
+ RecipeSetActivity.id.desc()])
waived = Column(Boolean, nullable=False, default=False)
comments = relationship('RecipeSetComment', cascade='all, delete-orphan')
activity_type = RecipeSetActivity
- stop_types = ['abort','cancel']
+ stop_types = ['abort', 'cancel']
def __init__(self, ttasks=0, priority=None):
super(RecipeSet, self).__init__()
@@ -1591,24 +1617,12 @@ class RecipeSet(TaskBase, ActivityMixin):
return self.to_json()
def to_json(self, include_job=False, include_recipes=True):
- data = {
- 'id': self.id,
- 't_id': self.t_id,
- 'status': self.status,
- 'is_finished': self.is_finished(),
- 'ntasks': self.ntasks,
- 'ptasks': self.ptasks,
- 'wtasks': self.wtasks,
- 'ftasks': self.ftasks,
- 'ktasks': self.ktasks,
- 'ttasks': self.ttasks,
- 'priority': self.priority,
+ data = self.minimal_json_content()
+ data.update({
'possible_priorities': list(TaskPriority),
- 'waived': self.waived,
- 'comments': self.comments,
- 'clone_href': self.clone_link(),
- 'queue_time': self.queue_time,
- }
+ 'comments': self.comments
+ })
+
if identity.current.user:
u = identity.current.user
data['can_change_priority'] = self.can_change_priority(u)
@@ -1626,9 +1640,27 @@ class RecipeSet(TaskBase, ActivityMixin):
data['job'] = self.job.to_json(include_recipesets=False)
if include_recipes:
data['machine_recipes'] = [recipe.to_json(include_results=False)
- for recipe in self.machine_recipes]
+ for recipe in self.machine_recipes]
return data
+ def minimal_json_content(self):
+ return {
+ 'id': self.id,
+ 't_id': self.t_id,
+ 'status': self.status,
+ 'is_finished': self.is_finished(),
+ 'ntasks': self.ntasks,
+ 'ptasks': self.ptasks,
+ 'wtasks': self.wtasks,
+ 'ftasks': self.ftasks,
+ 'ktasks': self.ktasks,
+ 'ttasks': self.ttasks,
+ 'priority': self.priority,
+ 'waived': self.waived,
+ 'clone_href': self.clone_link(),
+ 'queue_time': self.queue_time,
+ }
+
def all_logs(self, load_parent=True):
"""
Returns an iterator of all logs in this recipeset.
@@ -1665,7 +1697,7 @@ class RecipeSet(TaskBase, ActivityMixin):
def can_change_priority(self, user):
"""
- Is the given user permitted to change the priority of this recipe
+ Is the given user permitted to change the priority of this recipe
set?
See also #allowed_priorities
"""
@@ -1673,6 +1705,7 @@ class RecipeSet(TaskBase, ActivityMixin):
def owner(self):
return self.job.owner
+
owner = property(owner)
def to_xml(self, clone=False, include_enclosing_job=False, **kwargs):
@@ -1699,7 +1732,7 @@ class RecipeSet(TaskBase, ActivityMixin):
comments = etree.Element('comments')
for c in self.comments:
comments.append(E.comment(c.comment, user=c.user.user_name,
- created=c.created.strftime('%Y-%m-%d %H:%M:%S')))
+ created=c.created.strftime('%Y-%m-%d %H:%M:%S')))
recipeSet.append(comments)
if include_enclosing_job:
@@ -1719,7 +1752,7 @@ class RecipeSet(TaskBase, ActivityMixin):
r.purge()
@classmethod
- def allowed_priorities_initial(cls,user):
+ def allowed_priorities_initial(cls, user):
if not user:
return
if user.is_admin() or user.has_permission('change_prio'):
@@ -1735,22 +1768,22 @@ class RecipeSet(TaskBase, ActivityMixin):
if type(tag) is list:
tag_query = cls.retention_tag_id.in_([RetentionTag.by_tag(unicode(t)).id for t in tag])
else:
- tag_query = cls.retention_tag==RetentionTag.by_tag(unicode(tag))
+ tag_query = cls.retention_tag == RetentionTag.by_tag(unicode(tag))
return query.filter(tag_query)
@classmethod
def by_datestamp(cls, datestamp, query=None):
if not query:
- query=cls.query
+ query = cls.query
return query.filter(RecipeSet.queue_time <= datestamp)
@classmethod
- def by_id(cls,id):
+ def by_id(cls, id):
return cls.query.filter_by(id=id).one()
@classmethod
- def by_job_id(cls,job_id):
+ def by_job_id(cls, job_id):
queri = RecipeSet.query.outerjoin('job').filter(Job.id == job_id)
return queri
@@ -1767,11 +1800,13 @@ class RecipeSet(TaskBase, ActivityMixin):
# MySQL did not pick the recipe.status index. bz1573081
all_recipes = aliased(Recipe, name='all_recipes')
matching_recipes = aliased(Recipe, name='matching_recipes')
- return RecipeSet.query\
- .join(all_recipes)\
- .join(matching_recipes, and_(matching_recipes.recipeset, matching_recipes.status == status))\
- .group_by(RecipeSet.id)\
- .having(func.count(all_recipes.id.distinct()) == func.count(matching_recipes.id.distinct()))
+ return RecipeSet.query \
+ .join(all_recipes) \
+ .join(matching_recipes,
+ and_(matching_recipes.recipeset, matching_recipes.status == status)) \
+ .group_by(RecipeSet.id) \
+ .having(
+ func.count(all_recipes.id.distinct()) == func.count(matching_recipes.id.distinct()))
def cancel(self, msg=None):
"""
@@ -1815,9 +1850,12 @@ class RecipeSet(TaskBase, ActivityMixin):
min_status = recipe.status
if recipe.result.severity > max_result.severity:
max_result = recipe.result
- self._change_status(min_status)
+ status_changed = self._change_status(min_status)
self.result = max_result
+ if status_changed:
+ send_scheduler_update(self)
+
# Return systems if recipeSet finished
if self.is_finished():
for recipe in self.recipes:
@@ -1826,38 +1864,39 @@ class RecipeSet(TaskBase, ActivityMixin):
def machine_recipes_orderby(self, labcontroller):
query = select([Recipe.id,
func.count(System.id).label('count')],
- from_obj=[Recipe.__table__,
- system_recipe_map,
- System.__table__,
- RecipeSet.__table__,
- LabController.__table__],
- whereclause="recipe.id = system_recipe_map.recipe_id \
+ from_obj=[Recipe.__table__,
+ system_recipe_map,
+ System.__table__,
+ RecipeSet.__table__,
+ LabController.__table__],
+ whereclause="recipe.id = system_recipe_map.recipe_id \
AND system.id = system_recipe_map.system_id \
AND system.lab_controller_id = lab_controller.id \
AND recipe_set.id = recipe.recipe_set_id \
AND recipe_set.id = %s \
AND lab_controller.id = %s" % (self.id,
- labcontroller.id),
- group_by=[Recipe.id],
- order_by='count')
- return map(lambda x: MachineRecipe.query.filter_by(id=x[0]).first(), session.connection(RecipeSet).execute(query).fetchall())
+ labcontroller.id),
+ group_by=[Recipe.id],
+ order_by='count')
+ return map(lambda x: MachineRecipe.query.filter_by(id=x[0]).first(),
+ session.connection(RecipeSet).execute(query).fetchall())
def task_info(self):
"""
Method for exporting RecipeSet status for TaskWatcher
"""
return dict(
- id = "RS:%s" % self.id,
- worker = None,
- state_label = "%s" % self.status,
- state = self.status.value,
- method = None,
- result = "%s" % self.result,
- is_finished = self.is_finished(),
- is_failed = self.is_failed(),
- )
-
- def allowed_priorities(self,user):
+ id="RS:%s" % self.id,
+ worker=None,
+ state_label="%s" % self.status,
+ state=self.status.value,
+ method=None,
+ result="%s" % self.result,
+ is_finished=self.is_finished(),
+ is_failed=self.is_failed(),
+ )
+
+ def allowed_priorities(self, user):
if not user:
return []
if user.is_admin() or user.has_permission('change_prio'):
@@ -1891,7 +1930,7 @@ class RecipeReservationRequest(DeclarativeMappedObject):
recipe_id = Column(Integer, ForeignKey('recipe.id'), nullable=False)
duration = Column(Integer, default=86400, nullable=False)
when = Column(RecipeReservationCondition.db_type(), nullable=False,
- default=RecipeReservationCondition.always)
+ default=RecipeReservationCondition.always)
def __json__(self):
return {
@@ -1930,11 +1969,11 @@ class Recipe(TaskBase, ActivityMixin):
distro_tree_id = Column(Integer, ForeignKey('distro_tree.id'))
distro_tree = relationship(DistroTree, back_populates='recipes')
installation = relationship(Installation, uselist=False,
- back_populates='recipe')
+ back_populates='recipe')
result = Column(TaskResult.db_type(), nullable=False,
- default=TaskResult.new, index=True)
+ default=TaskResult.new, index=True)
status = Column(TaskStatus.db_type(), nullable=False,
- default=TaskStatus.new, index=True)
+ default=TaskStatus.new, index=True)
reservation_request = relationship(RecipeReservationRequest, uselist=False)
start_time = Column(DateTime, index=True)
finish_time = Column(DateTime)
@@ -1968,29 +2007,29 @@ class Recipe(TaskBase, ActivityMixin):
autopick_random = Column(Boolean, nullable=False, default=False)
log_server = Column(Unicode(255), index=True)
virt_status = Column(RecipeVirtStatus.db_type(), index=True,
- nullable=False, default=RecipeVirtStatus.possible)
+ nullable=False, default=RecipeVirtStatus.possible)
__mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': u'recipe'}
resource = relationship('RecipeResource', uselist=False, back_populates='recipe')
watchdog = relationship(Watchdog, uselist=False,
- cascade='all, delete, delete-orphan')
+ cascade='all, delete, delete-orphan')
systems = relationship(System, secondary=system_recipe_map,
- back_populates='queued_recipes')
+ back_populates='queued_recipes')
dyn_systems = dynamic_loader(System, secondary=system_recipe_map)
tasks = relationship('RecipeTask', back_populates='recipe')
dyn_tasks = relationship('RecipeTask', lazy='dynamic')
tags = relationship('RecipeTag', secondary=recipe_tag_map,
- back_populates='recipes')
+ back_populates='recipes')
repos = relationship('RecipeRepo')
rpms = relationship('RecipeRpm', back_populates='recipe')
logs = relationship(LogRecipe, back_populates='parent', cascade='all, delete-orphan')
custom_packages = relationship(TaskPackage,
- secondary=task_packages_custom_map)
+ secondary=task_packages_custom_map)
ks_appends = relationship('RecipeKSAppend')
- stop_types = ['abort','cancel']
+ stop_types = ['abort', 'cancel']
activity = relationship(RecipeActivity, back_populates='object',
- cascade='all, delete-orphan',
- order_by=[RecipeActivity.created.desc(), RecipeActivity.id.desc()])
+ cascade='all, delete-orphan',
+ order_by=[RecipeActivity.created.desc(), RecipeActivity.id.desc()])
activity_type = RecipeActivity
def __init__(self, ttasks=0):
@@ -2018,7 +2057,7 @@ class Recipe(TaskBase, ActivityMixin):
def repopath(self):
return get('basepath.repos')
- def is_owner(self,user):
+ def is_owner(self, user):
return self.recipeset.job.owner == user
@property
@@ -2036,20 +2075,22 @@ class Recipe(TaskBase, ActivityMixin):
def link(self):
""" Return a link to this recipe. """
return make_link(url='/recipes/%s' % self.id, text=self.t_id,
- elem_class='recipe-id')
+ elem_class='recipe-id')
def filepath(self):
"""
Return file path for this recipe
"""
- job = self.recipeset.job
+ job = self.recipeset.job
return "%s/%02d/%s/%s/%s" % (self.recipeset.queue_time.year,
- self.recipeset.queue_time.month,
- job.id // Log.MAX_ENTRIES_PER_DIRECTORY, job.id, self.id)
+ self.recipeset.queue_time.month,
+ job.id // Log.MAX_ENTRIES_PER_DIRECTORY, job.id, self.id)
+
filepath = property(filepath)
def owner(self):
return self.recipeset.job.owner
+
owner = property(owner)
def purge(self):
@@ -2057,9 +2098,9 @@ class Recipe(TaskBase, ActivityMixin):
How we delete a Recipe.
"""
self.logs = []
- # Hacking up the associated Installation like this is a little dodgy,
- # the idea is just to save on disk space in the kickstart table. In
- # theory no user will ever end up looking at the Installation now that
+ # Hacking up the associated Installation like this is a little dodgy,
+ # the idea is just to save on disk space in the kickstart table. In
+ # theory no user will ever end up looking at the Installation now that
# the recipe is deleted anyway...
if self.installation and self.installation.rendered_kickstart:
session.delete(self.installation.rendered_kickstart)
@@ -2074,17 +2115,17 @@ class Recipe(TaskBase, ActivityMixin):
# so we just fake it with the prefixes parameter:
# http://stackoverflow.com/a/34854513/120202
query = delete(RecipeTaskResult.__table__.join(RecipeTask),
- prefixes=[RecipeTaskResult.__table__.name])\
- .where(RecipeTask.recipe_id == self.id)
+ prefixes=[RecipeTaskResult.__table__.name]) \
+ .where(RecipeTask.recipe_id == self.id)
session.connection(RecipeTaskResult).execute(query)
def task_repo(self):
- return ('beaker-tasks',absolute_url('/repos/%s' % self.id,
- scheme='http',
- labdomain=True,
- webpath=False,
- )
- )
+ return ('beaker-tasks', absolute_url('/repos/%s' % self.id,
+ scheme='http',
+ labdomain=True,
+ webpath=False,
+ )
+ )
def harness_repo(self):
"""
@@ -2115,7 +2156,7 @@ class Recipe(TaskBase, ActivityMixin):
return InstallOptions(ks_meta, {}, {})
def to_xml(self, clone=False, include_enclosing_job=True,
- include_logs=True, **kwargs):
+ include_logs=True, **kwargs):
recipe = etree.Element(self.xml_element_name)
if not clone:
recipe.set("id", "%s" % self.id)
@@ -2134,7 +2175,8 @@ class Recipe(TaskBase, ActivityMixin):
recipe.set('kickstart_url', self.installation.rendered_kickstart.link)
recipe.set("ks_meta", "%s" % self.ks_meta and self.ks_meta or '')
recipe.set("kernel_options", "%s" % self.kernel_options and self.kernel_options or '')
- recipe.set("kernel_options_post", "%s" % self.kernel_options_post and self.kernel_options_post or '')
+ recipe.set("kernel_options_post",
+ "%s" % self.kernel_options_post and self.kernel_options_post or '')
if self.start_time and not clone:
recipe.set('start_time', unicode(self.start_time))
if self.finish_time and not clone:
@@ -2168,13 +2210,16 @@ class Recipe(TaskBase, ActivityMixin):
if self.installation:
if self.installation.install_started:
installation.set('install_started',
- self.installation.install_started.strftime('%Y-%m-%d %H:%M:%S'))
+ self.installation.install_started.strftime(
+ '%Y-%m-%d %H:%M:%S'))
if self.installation.install_finished:
installation.set('install_finished',
- self.installation.install_finished.strftime('%Y-%m-%d %H:%M:%S'))
+ self.installation.install_finished.strftime(
+ '%Y-%m-%d %H:%M:%S'))
if self.installation.postinstall_finished:
installation.set('postinstall_finished',
- self.installation.postinstall_finished.strftime('%Y-%m-%d %H:%M:%S'))
+ self.installation.postinstall_finished.strftime(
+ '%Y-%m-%d %H:%M:%S'))
recipe.append(installation)
packages = etree.Element("packages")
if self.custom_packages:
@@ -2237,6 +2282,7 @@ class Recipe(TaskBase, ActivityMixin):
return self.finish_time - self.start_time
except TypeError:
return None
+
duration = property(_get_duration)
@property
@@ -2313,10 +2359,11 @@ class Recipe(TaskBase, ActivityMixin):
for partition in self._parse_partitions():
if partition.get('fs'):
partitions.append('%s:%s:%s:%s' % (partition['name'],
- partition['type'], partition['size'], partition['fs']))
+ partition['type'], partition['size'],
+ partition['fs']))
else:
partitions.append('%s:%s:%s' % (partition['name'],
- partition['type'], partition['size']))
+ partition['type'], partition['size']))
return ';'.join(partitions)
partitionsKSMeta = property(_partitionsKSMeta)
@@ -2351,7 +2398,7 @@ class Recipe(TaskBase, ActivityMixin):
# falls back to being queued on a regular system
makedirs_ignore(snapshot_repo, 0755)
Task.make_snapshot_repo(snapshot_repo)
- # Record task versions as they existed at this point in time, since we
+ # Record task versions as they existed at this point in time, since we
# just created the task library snapshot for this recipe.
for recipetask in self.tasks:
if recipetask.task:
@@ -2368,7 +2415,7 @@ class Recipe(TaskBase, ActivityMixin):
shutil.rmtree(directory)
except OSError:
if os.path.isdir(directory):
- #something else must have gone wrong
+ # something else must have gone wrong
raise
def schedule(self):
@@ -2397,6 +2444,7 @@ class Recipe(TaskBase, ActivityMixin):
delta = self.watchdog.kill_time - datetime.utcnow().replace(microsecond=0)
duration = delta
return duration
+
time_remaining = property(_time_remaining)
def return_reservation_link(self):
@@ -2412,7 +2460,7 @@ class Recipe(TaskBase, ActivityMixin):
Method to abort/cancel all unfinished tasks in this recipe.
"""
# This ensures that the recipe does not stay Reserved when it is already
- # Reserved and the watchdog expires (for abort) or, is cancelled
+ # Reserved and the watchdog expires (for abort) or, is cancelled
# (user initiated)
if self.watchdog:
self.extend(0)
@@ -2489,10 +2537,10 @@ class Recipe(TaskBase, ActivityMixin):
# Record the start of this Recipe.
if not self.start_time \
- and self.status in [TaskStatus.installing, TaskStatus.running]:
+ and self.status in [TaskStatus.installing, TaskStatus.running]:
if self.installation.rebooted:
self.start_time = self.installation.rebooted
- elif self.installation.install_started: # in case of manual reboot
+ elif self.installation.install_started: # in case of manual reboot
self.start_time = self.installation.install_started
elif self.first_task.start_time:
self.start_time = self.first_task.start_time
@@ -2503,6 +2551,9 @@ class Recipe(TaskBase, ActivityMixin):
# Record the completion of this Recipe.
self.finish_time = datetime.utcnow()
+ if status_changed:
+ send_scheduler_update(self)
+
if status_changed and self.is_finished():
metrics.increment('counters.recipes_%s' % self.status.name)
if self.is_suspiciously_aborted and \
@@ -2511,18 +2562,18 @@ class Recipe(TaskBase, ActivityMixin):
self.resource.system.suspicious_abort()
if self.is_finished():
- # If we have any guests which haven't started, kill them now
+ # If we have any guests which haven't started, kill them now
# because there is no way they can ever start.
for guest in getattr(self, 'guests', []):
if (not guest.is_finished() and
guest.watchdog and not guest.watchdog.kill_time):
guest.abort(msg=u'Aborted: host %s finished but guest never started'
- % self.t_id)
+ % self.t_id)
def _fix_zombie_tasks(self):
- # It's not possible to get into this state in recent version of Beaker,
- # but very old recipes may be finished while still having tasks that
- # are running. We don't want to restart the recipe though, so we need
+ # It's not possible to get into this state in recent version of Beaker,
+ # but very old recipes may be finished while still having tasks that
+ # are running. We don't want to restart the recipe though, so we need
# to kill the zombie tasks.
log.debug('Fixing zombie tasks in %s', self.t_id)
assert self.is_finished()
@@ -2549,19 +2600,19 @@ class Recipe(TaskBase, ActivityMixin):
def reduced_install_options(self):
sources = []
sources.append(install_options_for_distro(
- self.installation.osmajor,
- self.installation.osminor,
- self.installation.variant,
- self.installation.arch))
+ self.installation.osmajor,
+ self.installation.osminor,
+ self.installation.variant,
+ self.installation.arch))
if self.distro_tree:
sources.append(self.distro_tree.install_options())
if self.resource:
sources.extend(self.resource.install_options(arch=self.installation.arch,
- osmajor=self.installation.osmajor,
- osminor=self.installation.osminor))
+ osmajor=self.installation.osmajor,
+ osminor=self.installation.osminor))
sources.append(self.generated_install_options())
sources.append(InstallOptions.from_strings(self.ks_meta,
- self.kernel_options, self.kernel_options_post))
+ self.kernel_options, self.kernel_options_post))
return InstallOptions.reduce(sources)
def provision(self):
@@ -2569,7 +2620,8 @@ class Recipe(TaskBase, ActivityMixin):
install_options = self.reduced_install_options()
if self.distro_tree:
self.installation.tree_url = self.distro_tree.url_in_lab(
- lab_controller=self.recipeset.lab_controller, scheme=install_options.ks_meta.get('method', None))
+ lab_controller=self.recipeset.lab_controller,
+ scheme=install_options.ks_meta.get('method', None))
by_kernel = ImageType.kernel
by_initrd = ImageType.initrd
if getattr(self.resource, 'system', None) and self.resource.system.kernel_type:
@@ -2585,8 +2637,8 @@ class Recipe(TaskBase, ActivityMixin):
by_initrd, kernel_type).path
if 'no_default_harness_repo' not in install_options.ks_meta \
- and not self.harness_repo():
- raise ValueError('Failed to find repo for harness')
+ and not self.harness_repo():
+ raise ValueError('Failed to find repo for harness')
if 'ks' in install_options.kernel_options:
# Use it as is
rendered_kickstart = None
@@ -2600,7 +2652,7 @@ class Recipe(TaskBase, ActivityMixin):
if line.find('%packages') == 0:
nopackages = False
break
- beforepackages = self.kickstart[:packages_slot-1]
+ beforepackages = self.kickstart[:packages_slot - 1]
afterpackages = self.kickstart[packages_slot:]
# if no %packages section then add it
if nopackages:
@@ -2640,21 +2692,21 @@ class Recipe(TaskBase, ActivityMixin):
{%% snippet 'postinstall_done' %%}
"""
kickstart = kicktemplate % dict(
- beforepackages = beforepackages,
- afterpackages = afterpackages)
+ beforepackages=beforepackages,
+ afterpackages=afterpackages)
rendered_kickstart = generate_kickstart(install_options=install_options,
- distro_tree=self.distro_tree,
- system=getattr(self.resource, 'system', None),
- user=self.recipeset.job.owner,
- recipe=self, kickstart=kickstart)
+ distro_tree=self.distro_tree,
+ system=getattr(self.resource, 'system', None),
+ user=self.recipeset.job.owner,
+ recipe=self, kickstart=kickstart)
install_options.kernel_options['ks'] = rendered_kickstart.link
else:
ks_appends = [ks_append.ks_append for ks_append in self.ks_appends]
rendered_kickstart = generate_kickstart(install_options=install_options,
- distro_tree=self.distro_tree,
- system=getattr(self.resource, 'system', None),
- user=self.recipeset.job.owner,
- recipe=self, ks_appends=ks_appends)
+ distro_tree=self.distro_tree,
+ system=getattr(self.resource, 'system', None),
+ user=self.recipeset.job.owner,
+ recipe=self, ks_appends=ks_appends)
install_options.kernel_options['ks'] = rendered_kickstart.link
self.installation.kernel_options = install_options.kernel_options_str
@@ -2662,13 +2714,13 @@ class Recipe(TaskBase, ActivityMixin):
if isinstance(self.resource, SystemResource):
self.installation.system = self.resource.system
self.resource.system.configure_netboot(installation=self.installation,
- service=u'Scheduler')
+ service=u'Scheduler')
self.resource.system.action_power(action=u'reboot',
- installation=self.installation, service=u'Scheduler')
+ installation=self.installation, service=u'Scheduler')
self.resource.system.record_activity(user=self.recipeset.job.owner,
- service=u'Scheduler', action=u'Provision',
- field=u'Distro Tree', old=u'',
- new=unicode(self.distro_tree))
+ service=u'Scheduler', action=u'Provision',
+ field=u'Distro Tree', old=u'',
+ new=unicode(self.distro_tree))
elif isinstance(self.resource, VirtResource):
# Delayed import to avoid circular dependency
from bkr.server import dynamic_virt
@@ -2678,8 +2730,8 @@ class Recipe(TaskBase, ActivityMixin):
self.initial_watchdog()
def cleanup(self):
- # Note that this may be called *many* times for a recipe, even when it
- # has already been cleaned up, so we have to handle that gracefully
+ # Note that this may be called *many* times for a recipe, even when it
+ # has already been cleaned up, so we have to handle that gracefully
# (and cheaply!)
self.destroyRepo()
if self.resource:
@@ -2693,8 +2745,8 @@ class Recipe(TaskBase, ActivityMixin):
# self.systems = []
# but that triggers a SELECT of all the rows
# and then a DELETE of each one indvidually. This is faster.
- query = delete(system_recipe_map)\
- .where(system_recipe_map.c.recipe_id == self.id)
+ query = delete(system_recipe_map) \
+ .where(system_recipe_map.c.recipe_id == self.id)
session.connection(self.__class__).execute(query)
session.expire(self, ['systems'])
@@ -2707,15 +2759,15 @@ class Recipe(TaskBase, ActivityMixin):
else:
worker = None
return dict(
- id = "R:%s" % self.id,
- worker = worker,
- state_label = "%s" % self.status,
- state = self.status.value,
- method = "%s" % self.whiteboard,
- result = "%s" % self.result,
- is_finished = self.is_finished(),
- is_failed = self.is_failed(),
- )
+ id="R:%s" % self.id,
+ worker=worker,
+ state_label="%s" % self.status,
+ state=self.status.value,
+ method="%s" % self.whiteboard,
+ result="%s" % self.result,
+ is_finished=self.is_finished(),
+ is_failed=self.is_failed(),
+ )
def extend(self, kill_time):
"""
@@ -2727,9 +2779,9 @@ class Recipe(TaskBase, ActivityMixin):
raise TypeError('Pass number of seconds to extend the watchdog by')
if kill_time:
self.watchdog.kill_time = datetime.utcnow() + timedelta(
- seconds=kill_time)
+ seconds=kill_time)
else:
- # kill_time of zero is a special case, it means someone wants to
+ # kill_time of zero is a special case, it means someone wants to
# end the recipe right now.
self.watchdog.kill_time = datetime.utcnow()
# We need to mark the job as dirty so that update_status will take
@@ -2764,22 +2816,22 @@ class Recipe(TaskBase, ActivityMixin):
"""
# Get all the logs from log_* tables directly to avoid doing N database
# queries for N results on large recipes.
- recipe_logs = LogRecipe.query\
- .filter(LogRecipe.recipe_id == self.id)
- recipe_task_logs = LogRecipeTask.query\
- .join(LogRecipeTask.parent)\
- .join(RecipeTask.recipe)\
- .filter(Recipe.id == self.id)
+ recipe_logs = LogRecipe.query \
+ .filter(LogRecipe.recipe_id == self.id)
+ recipe_task_logs = LogRecipeTask.query \
+ .join(LogRecipeTask.parent) \
+ .join(RecipeTask.recipe) \
+ .filter(Recipe.id == self.id)
if load_parent:
recipe_task_logs = recipe_task_logs.options(contains_eager(LogRecipeTask.parent))
- recipe_task_result_logs = LogRecipeTaskResult.query\
- .join(LogRecipeTaskResult.parent)\
- .join(RecipeTaskResult.recipetask)\
- .join(RecipeTask.recipe)\
- .filter(Recipe.id == self.id)
+ recipe_task_result_logs = LogRecipeTaskResult.query \
+ .join(LogRecipeTaskResult.parent) \
+ .join(RecipeTaskResult.recipetask) \
+ .join(RecipeTask.recipe) \
+ .filter(Recipe.id == self.id)
if load_parent:
- recipe_task_result_logs = recipe_task_result_logs\
- .options(contains_eager(LogRecipeTaskResult.parent))
+ recipe_task_result_logs = recipe_task_result_logs \
+ .options(contains_eager(LogRecipeTaskResult.parent))
return chain(recipe_logs, recipe_task_logs, recipe_task_result_logs)
@classmethod
@@ -2788,11 +2840,11 @@ class Recipe(TaskBase, ActivityMixin):
A class method that can be used to search for Jobs that belong to a user
"""
return cls.query.filter(Recipe.recipeset.has(
- RecipeSet.job.has(Job.owner == owner)))
+ RecipeSet.job.has(Job.owner == owner)))
def peer_roles(self):
"""
- Returns dict of (role -> recipes) for all "peer" recipes (recipes in
+ Returns dict of (role -> recipes) for all "peer" recipes (recipes in
the same recipe set as this recipe, *including this recipe*).
"""
result = {}
@@ -2806,18 +2858,18 @@ class Recipe(TaskBase, ActivityMixin):
@property
def position_in_job(self):
"""
- Returns an ordinal indicating the position of this recipe in the job.
+ Returns an ordinal indicating the position of this recipe in the job.
The first recipe is 1, the second recipe is 2, etc.
"""
- # Using a db query for this in order to avoid loading all recipes in
+ # Using a db query for this in order to avoid loading all recipes in
# the job.
- # Note that recipe sets/recipes have no explicitly persisted ordering,
- # it's implicit based on insertion order which is then reflected in the
- # id ordering. So the ordinal position of this recipe is really the
+ # Note that recipe sets/recipes have no explicitly persisted ordering,
+ # it's implicit based on insertion order which is then reflected in the
+ # id ordering. So the ordinal position of this recipe is really the
# number of recipes in the job with a lower id than this one, plus 1.
- recipes_before_self = Recipe.query.join(RecipeSet)\
- .filter(RecipeSet.job == self.recipeset.job)\
- .filter(Recipe.id < self.id)
+ recipes_before_self = Recipe.query.join(RecipeSet) \
+ .filter(RecipeSet.job == self.recipeset.job) \
+ .filter(Recipe.id < self.id)
return recipes_before_self.with_entities(func.count(Recipe.id)).scalar() + 1
@property
@@ -2836,7 +2888,8 @@ class Recipe(TaskBase, ActivityMixin):
def can_update_reservation_request(self, user=None):
"""Returns True iff the given user can update the reservation request"""
return self.can_edit(user) and self.status not in (TaskStatus.completed,
- TaskStatus.cancelled, TaskStatus.aborted, TaskStatus.reserved)
+ TaskStatus.cancelled, TaskStatus.aborted,
+ TaskStatus.reserved)
def can_comment(self, user):
"""Returns True iff the given user can comment on this recipe."""
@@ -2848,7 +2901,7 @@ class Recipe(TaskBase, ActivityMixin):
# Delayed import to avoid circular dependency
from bkr.server.model import RecipeReviewedState
rrs = session.object_session(self).query(RecipeReviewedState).filter_by(
- recipe_id=self.id, user_id=user.user_id).first()
+ recipe_id=self.id, user_id=user.user_id).first()
if rrs is not None:
return rrs.reviewed
return False
@@ -2871,26 +2924,10 @@ class Recipe(TaskBase, ActivityMixin):
return self.to_json()
def to_json(self, include_recipeset=False, include_tasks=True, include_results=True):
- data = {
- 'id': self.id,
- 't_id': self.t_id,
- 'status': self.status,
- 'is_finished': self.is_finished(),
- 'is_deleted': self.is_deleted,
- 'result': self.result,
- 'whiteboard': self.whiteboard,
+ data = self.minimal_json_content()
+ data.update({
'distro_tree': self.distro_tree,
- 'role': self.role,
- 'resource': self.resource,
'installation': self.installation,
- 'ntasks': self.ntasks,
- 'ptasks': self.ptasks,
- 'wtasks': self.wtasks,
- 'ftasks': self.ftasks,
- 'ktasks': self.ktasks,
- 'ttasks': self.ttasks,
- 'start_time': self.start_time,
- 'finish_time': self.finish_time,
'ks_meta': self.ks_meta,
'kernel_options': self.kernel_options,
'kernel_options_post': self.kernel_options_post,
@@ -2905,13 +2942,13 @@ class Recipe(TaskBase, ActivityMixin):
# for backwards compatibility only:
'recipe_id': self.id,
'job_id': self.recipeset.job.t_id,
- }
+ })
# watchdog may not have kill time.
if self.watchdog and self.watchdog.kill_time:
data['time_remaining_seconds'] = int(total_seconds(self.time_remaining))
else:
data['time_remaining_seconds'] = None
- if self.reservation_request:
+ if self.reservation_request:
data['reservation_request'] = self.reservation_request.__json__()
else:
data['reservation_request'] = RecipeReservationRequest.empty_json()
@@ -2936,12 +2973,33 @@ class Recipe(TaskBase, ActivityMixin):
data['reviewed'] = None
if include_recipeset:
data['recipeset'] = self.recipeset.to_json(
- include_job=True, include_recipes=False)
+ include_job=True, include_recipes=False)
if include_tasks:
data['tasks'] = [task.to_json(include_results=include_results)
- for task in self.tasks]
+ for task in self.tasks]
return data
+ def minimal_json_content(self):
+ return {
+ 'id': self.id,
+ 't_id': self.t_id,
+ 'status': self.status,
+ 'is_finished': self.is_finished(),
+ 'is_deleted': self.is_deleted,
+ 'result': self.result,
+ 'whiteboard': self.whiteboard,
+ 'role': self.role,
+ 'resource': self.resource,
+ 'ntasks': self.ntasks,
+ 'ptasks': self.ptasks,
+ 'wtasks': self.wtasks,
+ 'ftasks': self.ftasks,
+ 'ktasks': self.ktasks,
+ 'ttasks': self.ttasks,
+ 'start_time': self.start_time,
+ 'finish_time': self.finish_time,
+ }
+
def _roles_to_xml(recipe):
for key, recipes in sorted(recipe.peer_roles().iteritems()):
@@ -2952,23 +3010,24 @@ def _roles_to_xml(recipe):
system = etree.Element("system")
system.set("value", "%s" % r.resource.fqdn)
role.append(system)
- yield(role)
+ yield (role)
-# The recipe status will change Waiting <-> Installing based on the timestamps
-# recorded on the installation. So we need to ensure that the job is marked
+# The recipe status will change Waiting <-> Installing based on the timestamps
+# recorded on the installation. So we need to ensure that the job is marked
# dirty whenever a timestamp is recorded, so that beakerd will call update_status.
def _mark_installation_recipe_dirty(installation, value, oldvalue, initiator):
if installation.recipe:
installation.recipe.recipeset.job._mark_dirty()
-event.listen(Installation.rebooted, 'set', _mark_installation_recipe_dirty)
-event.listen(Installation.install_started, 'set', _mark_installation_recipe_dirty)
-event.listen(Installation.install_finished, 'set', _mark_installation_recipe_dirty)
+
+
+event.listen(Installation.rebooted, 'set', _mark_installation_recipe_dirty)
+event.listen(Installation.install_started, 'set', _mark_installation_recipe_dirty)
+event.listen(Installation.install_finished, 'set', _mark_installation_recipe_dirty)
event.listen(Installation.postinstall_finished, 'set', _mark_installation_recipe_dirty)
class GuestRecipe(Recipe):
-
__tablename__ = 'guest_recipe'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('recipe.id'), primary_key=True)
@@ -2976,7 +3035,7 @@ class GuestRecipe(Recipe):
guestargs = Column(UnicodeText)
__mapper_args__ = {'polymorphic_identity': u'guest_recipe'}
hostrecipe = relationship('MachineRecipe', secondary=machine_guest_map,
- uselist=False, back_populates='guests')
+ uselist=False, back_populates='guests')
xml_element_name = 'guestrecipe'
systemtype = 'Virtual'
@@ -2987,12 +3046,13 @@ class GuestRecipe(Recipe):
data['guestargs'] = self.guestargs
if include_hostrecipe:
data['hostrecipe'] = self.hostrecipe.to_json(include_guests=False,
- include_tasks=False, include_recipeset=False)
+ include_tasks=False,
+ include_recipeset=False)
return data
def to_xml(self, clone=False, **kwargs):
node = super(GuestRecipe, self).to_xml(clone=clone, **kwargs)
- # Note that node may be either a job element or a guestrecipe element,
+ # Note that node may be either a job element or a guestrecipe element,
# depending on the value of include_enclosing_job kwarg.
if node.tag != self.xml_element_name:
recipe = node.find('.//' + self.xml_element_name)
@@ -3007,16 +3067,16 @@ class GuestRecipe(Recipe):
recipe.set('location', self.installation.tree_url)
elif self.distro_tree and self.recipeset.lab_controller:
# We are producing XML for an old recipe provisioned prior to Beaker 25.
- # We used to call reduced_install_options() to find the value
- # of the method= ksmeta var and use that to pick the right
- # distro tree URL. However that code path no longer works for
- # old recipes. This is a second-best attempt. Really, people
- # should not expect the location="" attribute to be reliable
+ # We used to call reduced_install_options() to find the value
+ # of the method= ksmeta var and use that to pick the right
+ # distro tree URL. However that code path no longer works for
+ # old recipes. This is a second-best attempt. Really, people
+ # should not expect the location="" attribute to be reliable
# after the guest recipe has completed.
installopts = InstallOptions.from_strings(self.ks_meta, '', '')
method = installopts.ks_meta.get('method', None)
location = self.distro_tree.url_in_lab(
- self.recipeset.lab_controller, scheme=method)
+ self.recipeset.lab_controller, scheme=method)
if location:
recipe.set("location", location)
if self.distro_tree and self.recipeset.lab_controller and not clone:
@@ -3050,10 +3110,12 @@ class GuestRecipe(Recipe):
def t_id(self):
return 'R:%s' % self.id
+
t_id = property(t_id)
distro_requires = property(_get_distro_requires, _set_distro_requires)
+
class MachineRecipe(Recipe):
"""
Optionally can contain guest recipes which are just other recipes
@@ -3065,7 +3127,7 @@ class MachineRecipe(Recipe):
id = Column(Integer, ForeignKey('recipe.id'), primary_key=True)
__mapper_args__ = {'polymorphic_identity': u'machine_recipe'}
guests = relationship(GuestRecipe, secondary=machine_guest_map,
- back_populates='hostrecipe')
+ back_populates='hostrecipe')
xml_element_name = 'recipe'
systemtype = 'Machine'
@@ -3074,17 +3136,18 @@ class MachineRecipe(Recipe):
data = super(MachineRecipe, self).to_json(**kwargs)
if include_guests:
data['guest_recipes'] = [g.to_json(include_hostrecipe=False,
- include_recipeset=False,
- include_tasks=kwargs.get('include_tasks', True))
- for g in self.guests]
+ include_recipeset=False,
+ include_tasks=kwargs.get('include_tasks', True))
+ for g in self.guests]
return data
def to_xml(self, clone=False, include_enclosing_job=True, **kwargs):
recipe = super(MachineRecipe, self).to_xml(clone=clone,
- include_enclosing_job=include_enclosing_job, **kwargs)
+ include_enclosing_job=include_enclosing_job,
+ **kwargs)
# insert <guestrecipe>s at the top above other child elements
recipe[0:0] = [guest.to_xml(clone, include_enclosing_job=False, **kwargs)
- for guest in self.guests]
+ for guest in self.guests]
return recipe
def check_virtualisability(self):
@@ -3140,8 +3203,8 @@ class MachineRecipe(Recipe):
recipes = cls.query
active_statuses = [s for s in TaskStatus if not s.finished]
query = (recipes.filter(cls.status.in_(active_statuses))
- .group_by(cls.status)
- .values(cls.status, func.count(cls.id)))
+ .group_by(cls.status)
+ .values(cls.status, func.count(cls.id)))
result = dict((status.name, 0) for status in active_statuses)
result.update((status.name, count) for status, count in query)
return result
@@ -3158,8 +3221,10 @@ class MachineRecipe(Recipe):
func.count(cls.id))
.filter(cls.status.in_(active_statuses))
.group_by(grouping, cls.status))
+
def init_group_stats():
return dict((status.name, 0) for status in active_statuses)
+
result = defaultdict(init_group_stats)
for group, status, count in query:
result[group][status.name] = count
@@ -3173,6 +3238,7 @@ class MachineRecipe(Recipe):
def t_id(self):
return 'R:%s' % self.id
+
t_id = property(t_id)
distro_requires = property(_get_distro_requires, _set_distro_requires)
@@ -3188,13 +3254,13 @@ class MachineRecipe(Recipe):
host_filter = XmlHost.from_string(self.host_requires)
if not host_filter.force:
systems = host_filter.apply_filter(systems). \
- filter(System.status == SystemStatus.automated)
+ filter(System.status == SystemStatus.automated)
systems = systems.filter(System.compatible_with_distro_tree(arch=self.installation.arch,
osmajor=self.installation.osmajor,
osminor=self.installation.osminor))
else:
systems = systems.filter(System.fqdn == host_filter.force). \
- filter(System.status != SystemStatus.removed)
+ filter(System.status != SystemStatus.removed)
# If it's a user-supplied distro, assume we can use any lab.
if self.distro_tree and only_in_lab:
systems = systems.filter(System.in_lab_with_distro_tree(self.distro_tree))
@@ -3202,10 +3268,10 @@ class MachineRecipe(Recipe):
return systems
@classmethod
- def hypothetical_candidate_systems(cls, user, distro_tree=None, lab_controller=None,
+ def hypothetical_candidate_systems(cls, user, distro_tree=None, lab_controller=None,
force=False):
"""
- If a recipe were constructed according to the given arguments, what
+ If a recipe were constructed according to the given arguments, what
would its candidate systems be?
"""
systems = System.all(user)
@@ -3232,33 +3298,35 @@ class MachineRecipe(Recipe):
def matching_systems(self):
"""
Returns a query of systems which are free to run this recipe *right now*.
- Note that this returns a filtered-down version of the candidate system
- list (recipe.systems) which is populated by beakerd from the
+ Note that this returns a filtered-down version of the candidate system
+ list (recipe.systems) which is populated by beakerd from the
.candidate_systems() query above.
"""
# The system must be in a lab where this recipe's distro tree is available.
if self.distro_tree:
- eligible_labcontrollers = set(lca.lab_controller for lca in self.distro_tree.lab_controller_assocs)
+ eligible_labcontrollers = set(
+ lca.lab_controller for lca in self.distro_tree.lab_controller_assocs)
else:
eligible_labcontrollers = set(LabController.query.filter_by(removed=None).all())
# This recipe's guest recipes' distro trees must also be in the lab.
for guestrecipe in self.guests:
if guestrecipe.distro_tree:
eligible_labcontrollers.intersection_update(lca.lab_controller
- for lca in guestrecipe.distro_tree.lab_controller_assocs)
+ for lca in
+ guestrecipe.distro_tree.lab_controller_assocs)
# Another recipe in the set might have locked us to a specific lab.
if self.recipeset.lab_controller:
eligible_labcontrollers.intersection_update([self.recipeset.lab_controller])
if not eligible_labcontrollers:
return None
- return self.dyn_systems\
- .join(System.lab_controller)\
- .outerjoin(System.cpu)\
- .filter(System.user == None)\
- .filter(System.scheduler_status == SystemSchedulerStatus.idle)\
- .filter(System.lab_controller_id.in_([lc.id for lc in eligible_labcontrollers]))\
- .filter(LabController.disabled == False)\
- .filter(or_(System.loaned == None, System.loaned == self.recipeset.job.owner))
+ return self.dyn_systems \
+ .join(System.lab_controller) \
+ .outerjoin(System.cpu) \
+ .filter(System.user == None) \
+ .filter(System.scheduler_status == SystemSchedulerStatus.idle) \
+ .filter(System.lab_controller_id.in_([lc.id for lc in eligible_labcontrollers])) \
+ .filter(LabController.disabled == False) \
+ .filter(or_(System.loaned == None, System.loaned == self.recipeset.job.owner))
@classmethod
def runnable_on_system(cls, system):
@@ -3266,41 +3334,43 @@ class MachineRecipe(Recipe):
Like .matching_systems() but from the other direction.
Returns a query of Recipes which are ready to be run on the given system.
"""
- recipes = system.dyn_queued_recipes\
- .join(Recipe.recipeset)\
- .join(RecipeSet.job)\
- .filter(not_(Job.is_deleted))\
- .filter(Recipe.status == TaskStatus.queued)
+ recipes = system.dyn_queued_recipes \
+ .join(Recipe.recipeset) \
+ .join(RecipeSet.job) \
+ .filter(not_(Job.is_deleted)) \
+ .filter(Recipe.status == TaskStatus.queued)
# The recipe set might be locked to a specific lab by an earlier recipe in the set.
recipes = recipes.filter(or_(
- RecipeSet.lab_controller == None,
- RecipeSet.lab_controller == system.lab_controller))
+ RecipeSet.lab_controller == None,
+ RecipeSet.lab_controller == system.lab_controller))
# The recipe's distro tree must be available in the same lab as the system.
recipes = recipes.filter(or_(
- Recipe.distro_tree_id == None,
- LabControllerDistroTree.query
- .filter(LabControllerDistroTree.lab_controller == system.lab_controller)
- .filter(LabControllerDistroTree.distro_tree_id == Recipe.distro_tree_id)
- .exists().correlate(Recipe)))
+ Recipe.distro_tree_id == None,
+ LabControllerDistroTree.query
+ .filter(LabControllerDistroTree.lab_controller == system.lab_controller)
+ .filter(LabControllerDistroTree.distro_tree_id == Recipe.distro_tree_id)
+ .exists().correlate(Recipe)))
# All of the recipe's guest recipe's distros must also be available in the lab.
# We have to use the outer-join-not-NULL trick because we want
# *all* guests, not *any* guest.
guestrecipe = aliased(Recipe, name='guestrecipe')
recipes = recipes.filter(not_(exists([1],
- from_obj=machine_guest_map
- .join(Recipe.__table__.alias('guestrecipe'))
- .join(DistroTree.__table__)
- .outerjoin(LabControllerDistroTree.__table__,
- and_(LabControllerDistroTree.distro_tree_id == DistroTree.id,
- LabControllerDistroTree.lab_controller == system.lab_controller)))
- .where(machine_guest_map.c.machine_recipe_id == Recipe.id)
- .where(LabControllerDistroTree.id == None)
- .correlate(Recipe)))
+ from_obj=machine_guest_map
+ .join(Recipe.__table__.alias('guestrecipe'))
+ .join(DistroTree.__table__)
+ .outerjoin(LabControllerDistroTree.__table__,
+ and_(
+ LabControllerDistroTree.distro_tree_id == DistroTree.id,
+ LabControllerDistroTree.lab_controller == system.lab_controller)))
+ .where(machine_guest_map.c.machine_recipe_id == Recipe.id)
+ .where(LabControllerDistroTree.id == None)
+ .correlate(Recipe)))
# If the system is loaned, it can only run recipes belonging to the loanee.
if system.loaned:
recipes = recipes.filter(Job.owner == system.loaned)
return recipes
+
class RecipeTag(DeclarativeMappedObject):
"""
Each recipe can be tagged with information that identifies what is being
@@ -3341,11 +3411,11 @@ class RecipeTask(TaskBase):
comments = relationship('RecipeTaskComment', back_populates='recipetask')
params = relationship('RecipeTaskParam')
logs = relationship(LogRecipeTask, back_populates='parent',
- cascade='all, delete-orphan')
+ cascade='all, delete-orphan')
watchdog = relationship(Watchdog, uselist=False)
- result_types = ['pass_','warn','fail','panic', 'result_none', 'skip']
- stop_types = ['stop','abort','cancel']
+ result_types = ['pass_', 'warn', 'fail', 'panic', 'result_none', 'skip']
+ stop_types = ['stop', 'abort', 'cancel']
def record_activity(self, **kwds):
"""
@@ -3363,7 +3433,7 @@ class RecipeTask(TaskBase):
@classmethod
def from_fetch_url(cls, url, subdir=None, name=None):
"""
- Constructs an external RecipeTask for the given fetch URL. If name is
+ Constructs an external RecipeTask for the given fetch URL. If name is
not given it defaults to the fetch URL combined with the subdir (if any).
"""
if name is None:
@@ -3389,25 +3459,16 @@ class RecipeTask(TaskBase):
return self.to_json()
def to_json(self, include_results=True):
- data = {
- 'id': self.id,
- 'name': self.name,
- 'version': self.version,
- 'status': unicode(self.status),
- 'recipe_id': self.recipe_id,
- 't_id': self.t_id,
+ data = self.minimal_json_content()
+ data.update({
'distro_tree': self.recipe.distro_tree,
'fetch_url': self.fetch_url,
'fetch_subdir': self.fetch_subdir,
'role': self.role,
- 'start_time': self.start_time,
- 'finish_time': self.finish_time,
- 'result': self.result,
'params': self.params,
- 'is_finished': self.is_finished(),
'logs': self.logs,
'comments': self.comments,
- }
+ })
if self.task:
data['task'] = {
'id': self.task.id,
@@ -3424,16 +3485,31 @@ class RecipeTask(TaskBase):
data['results'] = self.results
return data
+ def minimal_json_content(self):
+ return {
+ 'id': self.id,
+ 'name': self.name,
+ 'version': self.version,
+ 'status': unicode(self.status),
+ 'recipe_id': self.recipe_id,
+ 't_id': self.t_id,
+ 'is_finished': self.is_finished(),
+ 'start_time': self.start_time,
+ 'finish_time': self.finish_time,
+ 'result': self.result,
+ }
+
def filepath(self):
"""
Return file path for this task
"""
- job = self.recipe.recipeset.job
+ job = self.recipe.recipeset.job
recipe = self.recipe
return "%s/%02d/%s/%s/%s/%s" % (recipe.recipeset.queue_time.year,
- recipe.recipeset.queue_time.month,
- job.id // Log.MAX_ENTRIES_PER_DIRECTORY, job.id,
- recipe.id, self.id)
+ recipe.recipeset.queue_time.month,
+ job.id // Log.MAX_ENTRIES_PER_DIRECTORY, job.id,
+ recipe.id, self.id)
+
filepath = property(filepath)
def to_xml(self, clone=False, include_logs=True, **kwargs):
@@ -3489,30 +3565,31 @@ class RecipeTask(TaskBase):
def _get_duration(self):
duration = None
if self.finish_time and self.start_time:
- duration = self.finish_time - self.start_time
+ duration = self.finish_time - self.start_time
elif self.start_time and self.watchdog and self.watchdog.kill_time:
delta = self.watchdog.kill_time - datetime.utcnow().replace(microsecond=0)
duration = 'Time Remaining %s' % delta
return duration
+
duration = property(_get_duration)
def link_id(self):
""" Return a link to this Executed Recipe->Task
"""
- return make_link(url = '/recipes/%s#task%s' % (self.recipe.id, self.id),
- text = 'T:%s' % self.id)
+ return make_link(url='/recipes/%s#task%s' % (self.recipe.id, self.id),
+ text='T:%s' % self.id)
link_id = property(link_id)
@property
def name_markup(self):
"""
- Returns HTML markup (in the form of a kid.Element) displaying the name.
+ Returns HTML markup (in the form of a kid.Element) displaying the name.
The name is linked to the task library when applicable.
"""
if self.task:
- return make_link(url = '/tasks/%s' % self.task.id,
- text = self.name)
+ return make_link(url='/tasks/%s' % self.task.id,
+ text=self.name)
else:
span = Element('span')
span.text = self.name
@@ -3522,15 +3599,15 @@ class RecipeTask(TaskBase):
"""
Returns an iterator all logs in this task.
"""
- recipe_task_logs = LogRecipeTask.query\
- .filter(LogRecipeTask.recipe_task_id == self.id)
- recipe_task_result_logs = LogRecipeTaskResult.query\
- .join(LogRecipeTaskResult.parent)\
- .join(RecipeTaskResult.recipetask)\
- .filter(RecipeTask.id == self.id)
+ recipe_task_logs = LogRecipeTask.query \
+ .filter(LogRecipeTask.recipe_task_id == self.id)
+ recipe_task_result_logs = LogRecipeTaskResult.query \
+ .join(LogRecipeTaskResult.parent) \
+ .join(RecipeTaskResult.recipetask) \
+ .filter(RecipeTask.id == self.id)
if load_parent:
- recipe_task_result_logs = recipe_task_result_logs\
- .options(contains_eager(LogRecipeTaskResult.parent))
+ recipe_task_result_logs = recipe_task_result_logs \
+ .options(contains_eager(LogRecipeTaskResult.parent))
return chain(recipe_task_logs, recipe_task_result_logs)
@property
@@ -3541,14 +3618,16 @@ class RecipeTask(TaskBase):
"""
Update number of passes, failures, warns, panics..
"""
- # The self.result == TaskResult.new condition is just an optimisation
+ # The self.result == TaskResult.new condition is just an optimisation
# to avoid constantly recomputing the result after the task is finished
+
if self.is_finished() and self.result == TaskResult.new:
max_result = TaskResult.min()
for result in self.results:
if result.result.severity > max_result.severity:
max_result = result.result
self.result = max_result
+ send_scheduler_update(self)
def start(self, watchdog_override=None):
"""
@@ -3571,7 +3650,7 @@ class RecipeTask(TaskBase):
task_time = self.task.avg_time if self.task else 0
# add in 30 minutes at a minimum
self.recipe.watchdog.kill_time = (datetime.utcnow() +
- timedelta(seconds=(task_time + 1800)))
+ timedelta(seconds=(task_time + 1800)))
self.recipe.recipeset.job._mark_dirty()
return True
@@ -3605,6 +3684,7 @@ class RecipeTask(TaskBase):
def owner(self):
return self.recipe.recipeset.job.owner
+
owner = property(owner)
def cancel(self, msg=None):
@@ -3630,10 +3710,10 @@ class RecipeTask(TaskBase):
self.finish_time = datetime.utcnow()
self._change_status(status)
self.results.append(RecipeTaskResult(
- path=u'/',
- result=TaskResult.warn,
- score=0,
- log=msg))
+ path=u'/',
+ result=TaskResult.warn,
+ score=0,
+ log=msg))
def pass_(self, path, score, summary):
"""
@@ -3685,10 +3765,10 @@ class RecipeTask(TaskBase):
score = round(decimal.Decimal(number_match.group()))
score = min(score, RecipeTaskResult.max_score)
recipeTaskResult = RecipeTaskResult(recipetask=self,
- path=path,
- result=result,
- score=score,
- log=summary)
+ path=path,
+ result=result,
+ score=score,
+ log=summary)
# Flush the result to the DB so we can return the id.
session.add(recipeTaskResult)
session.flush()
@@ -3707,15 +3787,15 @@ class RecipeTask(TaskBase):
else:
worker = None
return dict(
- id = "T:%s" % self.id,
- worker = worker,
- state_label = "%s" % self.status,
- state = self.status.value,
- method = "%s" % self.name,
- result = "%s" % self.result,
- is_finished = self.is_finished(),
- is_failed = self.is_failed(),
- )
+ id="T:%s" % self.id,
+ worker=worker,
+ state_label="%s" % self.status,
+ state=self.status.value,
+ method="%s" % self.name,
+ result="%s" % self.result,
+ is_finished=self.is_finished(),
+ is_failed=self.is_failed(),
+ )
def no_value(self):
return None
@@ -3724,9 +3804,9 @@ class RecipeTask(TaskBase):
def peer_roles(self):
"""
- Returns dict of (role -> recipetasks) for all "peer" RecipeTasks,
- *including this RecipeTask*. A peer RecipeTask is one which appears at
- the same position in another recipe from the same recipe set as this
+ Returns dict of (role -> recipetasks) for all "peer" RecipeTasks,
+ *including this RecipeTask*. A peer RecipeTask is one which appears at
+ the same position in another recipe from the same recipe set as this
recipe.
"""
result = {}
@@ -3750,6 +3830,7 @@ class RecipeTask(TaskBase):
"""Returns True iff the given user can comment on this recipe task."""
return self.recipe.can_comment(user)
+
Index('ix_recipe_task_name_version', RecipeTask.name, RecipeTask.version)
@@ -3858,7 +3939,7 @@ class RecipeTaskRpm(DeclarativeMappedObject):
__tablename__ = 'recipe_task_rpm'
__table_args__ = {'mysql_engine': 'InnoDB'}
recipe_task_id = Column(Integer, ForeignKey('recipe_task.id'),
- primary_key=True)
+ primary_key=True)
package = Column(Unicode(255))
version = Column(Unicode(255))
release = Column(Unicode(255))
@@ -3883,14 +3964,14 @@ class RecipeTaskResult(TaskBase):
log = Column(UnicodeText)
start_time = Column(DateTime, default=datetime.utcnow)
logs = relationship(LogRecipeTaskResult, back_populates='parent',
- cascade='all, delete-orphan')
+ cascade='all, delete-orphan')
comments = relationship('RecipeTaskResultComment', back_populates='recipetaskresult')
#: Maximum allowable value for the score column.
max_score = 10 ** score.type.precision - 1
def __init__(self, recipetask=None, path=None, result=None,
- score=None, log=None):
+ score=None, log=None):
super(RecipeTaskResult, self).__init__()
self.recipetask = recipetask
self.path = path
@@ -3902,13 +3983,14 @@ class RecipeTaskResult(TaskBase):
"""
Return file path for this result
"""
- job = self.recipetask.recipe.recipeset.job
+ job = self.recipetask.recipe.recipeset.job
recipe = self.recipetask.recipe
- task_id = self.recipetask.id
+ task_id = self.recipetask.id
return "%s/%02d/%s/%s/%s/%s/%s" % (recipe.recipeset.queue_time.year,
- recipe.recipeset.queue_time.month,
- job.id // Log.MAX_ENTRIES_PER_DIRECTORY, job.id,
- recipe.id, task_id, self.id)
+ recipe.recipeset.queue_time.month,
+ job.id // Log.MAX_ENTRIES_PER_DIRECTORY, job.id,
+ recipe.id, task_id, self.id)
+
filepath = property(filepath)
def to_xml(self, include_logs=True, **kwargs):
@@ -3940,28 +4022,29 @@ class RecipeTaskResult(TaskBase):
Method for exporting RecipeTaskResult status for TaskWatcher
"""
return dict(
- id = "TR:%s" % self.id,
- worker = dict(name = "%s" % None),
- state_label = "%s" % self.result,
- state = self.result.value,
- method = "%s" % self.path,
- result = "%s" % self.result,
- is_finished = True,
- is_failed = False
- )
+ id="TR:%s" % self.id,
+ worker=dict(name="%s" % None),
+ state_label="%s" % self.result,
+ state=self.result.value,
+ method="%s" % self.path,
+ result="%s" % self.result,
+ is_finished=True,
+ is_failed=False
+ )
def t_id(self):
return "TR:%s" % self.id
+
t_id = property(t_id)
@property
def duration(self):
"""
- Task results happen at a fixed point in time so they don't really have
- a duration. This property is really the time since the previous result,
- or else the time since the start of the task if this is the first
+ Task results happen at a fixed point in time so they don't really have
+ a duration. This property is really the time since the previous result,
+ or else the time since the start of the task if this is the first
result.
- In a sense this is the duration of the stuff that happened in order to
+ In a sense this is the duration of the stuff that happened in order to
produce this result.
The duration can be None if the task was never started.
"""
@@ -3993,7 +4076,7 @@ class RecipeTaskResult(TaskBase):
@property
def display_label(self):
"""
- Human-friendly label for the result, when shown alongside its parent
+ Human-friendly label for the result, when shown alongside its parent
task. The conventions here are basically a historical RHTSism.
"""
if not self.path or self.path == '/':
@@ -4032,9 +4115,9 @@ class RecipeResource(DeclarativeMappedObject):
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, autoincrement=True, primary_key=True)
recipe_id = Column(Integer, ForeignKey('recipe.id',
- name='recipe_resource_recipe_id_fk',
- onupdate='CASCADE', ondelete='CASCADE'),
- nullable=False, unique=True)
+ name='recipe_resource_recipe_id_fk',
+ onupdate='CASCADE', ondelete='CASCADE'),
+ nullable=False, unique=True)
recipe = relationship(Recipe, back_populates='resource')
type = Column(ResourceType.db_type(), nullable=False)
fqdn = Column(Unicode(255), default=None, index=True)
@@ -4057,28 +4140,29 @@ class RecipeResource(DeclarativeMappedObject):
base_addr = netaddr.EUI(get('beaker.base_mac_addr', '52:54:00:00:00:00'))
session.flush()
# This subquery gives all MAC addresses in use right now
- guest_mac_query = session.query(GuestResource.mac_address.label('mac_address'))\
- .filter(GuestResource.mac_address != None)\
- .join(RecipeResource.recipe).join(Recipe.recipeset)\
- .filter(not_(RecipeSet.status.in_([s for s in TaskStatus if s.finished])))
+ guest_mac_query = session.query(GuestResource.mac_address.label('mac_address')) \
+ .filter(GuestResource.mac_address != None) \
+ .join(RecipeResource.recipe).join(Recipe.recipeset) \
+ .filter(not_(RecipeSet.status.in_([s for s in TaskStatus if s.finished])))
# This trickery finds "gaps" of unused MAC addresses by filtering for MAC
# addresses where address + 1 is not in use.
# We union with base address - 1 to find any gap at the start.
# Note that this relies on the MACAddress type being represented as
# BIGINT in the database, which lets us do arithmetic on it.
left_side = union(guest_mac_query,
- select([int(base_addr) - 1])).alias('left_side')
+ select([int(base_addr) - 1])).alias('left_side')
right_side = guest_mac_query.subquery()
free_addr = session.scalar(select([left_side.c.mac_address + 1],
- from_obj=left_side.outerjoin(right_side,
- onclause=left_side.c.mac_address + 1 == right_side.c.mac_address))\
- .where(right_side.c.mac_address == None)\
- .where(left_side.c.mac_address + 1 >= int(base_addr))\
- .order_by(left_side.c.mac_address).limit(1))
+ from_obj=left_side.outerjoin(right_side,
+ onclause=left_side.c.mac_address + 1 == right_side.c.mac_address)) \
+ .where(right_side.c.mac_address == None) \
+ .where(left_side.c.mac_address + 1 >= int(base_addr)) \
+ .order_by(left_side.c.mac_address).limit(1))
# The type of (left_side.c.mac_address + 1) comes out as Integer
# instead of MACAddress, I think it's a sqlalchemy bug :-(
return netaddr.EUI(free_addr, dialect=mac_unix_padded_dialect)
+
class SystemResource(RecipeResource):
"""
For a recipe which is running on a Beaker system.
@@ -4087,12 +4171,12 @@ class SystemResource(RecipeResource):
__tablename__ = 'system_resource'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('recipe_resource.id',
- name='system_resource_id_fk'), primary_key=True)
+ name='system_resource_id_fk'), primary_key=True)
system_id = Column(Integer, ForeignKey('system.id',
- name='system_resource_system_id_fk'), nullable=False)
+ name='system_resource_system_id_fk'), nullable=False)
system = relationship(System)
reservation_id = Column(Integer, ForeignKey('reservation.id',
- name='system_resource_reservation_id_fk'))
+ name='system_resource_reservation_id_fk'))
reservation = relationship(Reservation)
__mapper_args__ = {'polymorphic_identity': ResourceType.system}
@@ -4103,8 +4187,8 @@ class SystemResource(RecipeResource):
def __repr__(self):
return '%s(fqdn=%r, system=%r, reservation=%r)' % (
- self.__class__.__name__, self.fqdn, self.system,
- self.reservation)
+ self.__class__.__name__, self.fqdn, self.system,
+ self.reservation)
def __json__(self):
data = super(SystemResource, self).__json__()
@@ -4128,21 +4212,21 @@ class SystemResource(RecipeResource):
def allocate(self):
log.debug('Reserving system %s for recipe %s', self.system, self.recipe.id)
self.reservation = self.system.reserve_for_recipe(
- service=u'Scheduler',
- user=self.recipe.recipeset.job.owner)
+ service=u'Scheduler',
+ user=self.recipe.recipeset.job.owner)
def release(self):
- # Note that this may be called *many* times for a recipe, even when it
- # has already been cleaned up, so we have to handle that gracefully
+ # Note that this may be called *many* times for a recipe, even when it
+ # has already been cleaned up, so we have to handle that gracefully
# (and cheaply!)
# system_resource rows for very old recipes may have no reservation
if not self.reservation or self.reservation.finish_time:
return
log.debug('Releasing system %s for recipe %s',
- self.system, self.recipe.id)
+ self.system, self.recipe.id)
self.system.unreserve(service=u'Scheduler',
- reservation=self.reservation,
- user=self.recipe.recipeset.job.owner)
+ reservation=self.reservation,
+ user=self.recipe.recipeset.job.owner)
class VirtResource(RecipeResource):
@@ -4153,9 +4237,9 @@ class VirtResource(RecipeResource):
__tablename__ = 'virt_resource'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('recipe_resource.id',
- name='virt_resource_id_fk'), primary_key=True)
- # OpenStack treats these ids as opaque strings, but we rely on them being
- # 128-bit numbers because we use the SMBIOS UUID field in our iPXE hackery.
+ name='virt_resource_id_fk'), primary_key=True)
+ # OpenStack treats these ids as opaque strings, but we rely on them being
+ # 128-bit numbers because we use the SMBIOS UUID field in our iPXE hackery.
# So we store it as an actual UUID, not an opaque string.
instance_id = Column(UUID, nullable=False)
network_id = Column(UUID, nullable=True)
@@ -4165,7 +4249,7 @@ class VirtResource(RecipeResource):
instance_created = Column(DateTime, nullable=True, default=None)
instance_deleted = Column(DateTime, nullable=True, default=None)
lab_controller_id = Column(Integer, ForeignKey('lab_controller.id',
- name='virt_resource_lab_controller_id_fk'))
+ name='virt_resource_lab_controller_id_fk'))
lab_controller = relationship(LabController)
__mapper_args__ = {'polymorphic_identity': ResourceType.virt}
@@ -4176,7 +4260,7 @@ class VirtResource(RecipeResource):
return cls.query.filter(cls.instance_id == instance_id).one()
def __init__(self, instance_id, network_id, subnet_id, router_id, floating_ip,
- lab_controller):
+ lab_controller):
super(VirtResource, self).__init__()
if isinstance(instance_id, basestring):
instance_id = uuid.UUID(instance_id)
@@ -4219,8 +4303,8 @@ class VirtResource(RecipeResource):
def __repr__(self):
return '%s(fqdn=%r, instance_id=%r, lab_controller=%r)' % (
- self.__class__.__name__, self.fqdn, self.instance_id,
- self.lab_controller)
+ self.__class__.__name__, self.fqdn, self.instance_id,
+ self.lab_controller)
def __json__(self):
data = super(VirtResource, self).__json__()
@@ -4258,20 +4342,21 @@ class VirtResource(RecipeResource):
if self.recipe.is_finished():
return None
return urlparse.urljoin(get('openstack.dashboard_url'),
- 'project/instances/%s/' % self.instance_id)
+ 'project/instances/%s/' % self.instance_id)
def install_options(self, arch, osmajor, osminor):
- yield InstallOptions.from_strings('hwclock_is_utc', u'console=tty0 console=ttyS0,115200n8', '')
+ yield InstallOptions.from_strings('hwclock_is_utc', u'console=tty0 console=ttyS0,115200n8',
+ '')
def release(self):
- # Note that this may be called *many* times for a recipe, even when it
- # has already been cleaned up, so we have to handle that gracefully
+ # Note that this may be called *many* times for a recipe, even when it
+ # has already been cleaned up, so we have to handle that gracefully
# (and cheaply!)
if self.instance_deleted:
return
try:
log.debug('Releasing vm %s for recipe %s',
- self.instance_id, self.recipe.id)
+ self.instance_id, self.recipe.id)
# Delayed import to avoid circular dependency
from bkr.server import dynamic_virt
manager = dynamic_virt.VirtManager(self.recipe.recipeset.job.owner)
@@ -4279,30 +4364,30 @@ class VirtResource(RecipeResource):
self.instance_deleted = datetime.utcnow()
except Exception:
log.exception('Failed to destroy vm %s, leaked!',
- self.instance_id)
+ self.instance_id)
# suppress exception, nothing more we can do now
class GuestResource(RecipeResource):
"""
- For a GuestRecipe which is running on a guest associated with a parent
+ For a GuestRecipe which is running on a guest associated with a parent
MachineRecipe.
"""
__tablename__ = 'guest_resource'
__table_args__ = {'mysql_engine': 'InnoDB'}
id = Column(Integer, ForeignKey('recipe_resource.id',
- name='guest_resource_id_fk'), primary_key=True)
+ name='guest_resource_id_fk'), primary_key=True)
mac_address = Column(MACAddress(), index=True, default=None)
__mapper_args__ = {'polymorphic_identity': ResourceType.guest}
def __repr__(self):
return '%s(fqdn=%r, mac_address=%r)' % (self.__class__.__name__,
- self.fqdn, self.mac_address)
+ self.fqdn, self.mac_address)
@property
def link(self):
- return self.fqdn # just text, not a link
+ return self.fqdn # just text, not a link
def install_options(self, arch, osmajor, osminor):
ks_meta = {
@@ -4315,7 +4400,7 @@ class GuestResource(RecipeResource):
log.debug('Allocated MAC address %s for recipe %s', self.mac_address, self.recipe.id)
def release(self):
- # Note that this may be called *many* times for a recipe, even when it
- # has already been cleaned up, so we have to handle that gracefully
+ # Note that this may be called *many* times for a recipe, even when it
+ # has already been cleaned up, so we have to handle that gracefully
# (and cheaply!)
pass
diff --git a/Server/dev.cfg b/Server/dev.cfg
index f42e0fd..3589b14 100644
--- a/Server/dev.cfg
+++ b/Server/dev.cfg
@@ -1,7 +1,7 @@
[global]
# This is where all of your settings go for your development environment
# Settings that are the same for both development and production
-# (such as template engine, encodings, etc.) all go in
+# (such as template engine, encodings, etc.) all go in
# beaker/server/config/app.cfg
# DATABASE
@@ -63,3 +63,10 @@ basepath.logs = './test-server-joblogs'
basepath.harness = './test-harnessdir'
assets.debug = True
assets.auto_build = True
+
+# AMQ messaging
+#amq.url = 'amqps://example.domain.com'
+#amq.cert = '/etc/beaker/msg-beaker.crt'
+#amq.key = '/etc/beaker/msg-beaker.key'
+#amq.cacerts = '/etc/pki/tls/certs/ca-bundle.crt'
+#amq.topic_prefix = 'VirtualTopic.eng.beaker' \ No newline at end of file
diff --git a/Server/server.cfg b/Server/server.cfg
index 3c02e9c..d999049 100644
--- a/Server/server.cfg
+++ b/Server/server.cfg
@@ -176,3 +176,11 @@ sqlalchemy.pool_recycle = 3600
# If you have special systems which do not support any of RHEL, CentOS, or Fedora
# then you may need to extend the default list.
#beaker.inventory_osmajors = ['RedHatEnterpriseLinux7', ...]
+
+# AMQ messaging
+# If amq attributes are set, Beaker will send updates via AMQ messages
+#amq.url = amqps://broker01.example.com
+#amq.cert = /etc/beaker/cert.pem
+#amq.key = /etc/beaker/key.pem
+#amq.cacerts = /etc/pki/tls/certs/ca-bundle.crt
+#amq.topic_prefix = VirtualTopic.eng.beaker \ No newline at end of file
diff --git a/beaker.spec b/beaker.spec
index 0d96a6d..21e4206 100644
--- a/beaker.spec
+++ b/beaker.spec
@@ -202,6 +202,7 @@ BuildRequires: python2-passlib
BuildRequires: python2-alembic
BuildRequires: python2-daemon
BuildRequires: python2-futures
+BuildRequires: python2-qpid-proton >= 0.13.0
Requires: TurboGears
Requires: python2-turbojson
Requires: python2-sqlalchemy
@@ -226,6 +227,7 @@ Requires: python2-webassets
Requires: python2-passlib
Requires: python2-alembic
Requires: python2-futures
+Requires: python2-qpid-proton >= 0.13.0
%else # old style Python package names
BuildRequires: python-requests
BuildRequires: TurboGears >= 1.1.3
@@ -247,6 +249,7 @@ BuildRequires: python-passlib
BuildRequires: python-alembic
BuildRequires: python-daemon
BuildRequires: python-futures
+BuildRequires: python-qpid-proton >= 0.13.0
Requires: TurboGears >= 1.1.3
Requires: python-turbojson
Requires: python-sqlalchemy >= 0.9
@@ -271,6 +274,7 @@ Requires: python-webassets
Requires: python-passlib
Requires: python-alembic
Requires: python-futures
+Requires: python-qpid-proton >= 0.13.0
%endif
%if %{with systemd}
BuildRequires: systemd