From a6b215cec9618a927d0e3675cae2b6142c837cba Mon Sep 17 00:00:00 2001 From: volpino Date: Sat, 7 Jul 2012 15:13:59 +0200 Subject: [PATCH] euscanwww: Fixed tasks to work with processing.scan and processing.misc Fixed tasks to work with processing.* changes. Moved periodic tasks config to settings.py using CELERYBEAT_SCHEDULE Signed-off-by: volpino --- euscanwww/README.Celery | 6 +-- euscanwww/djeuscan/processing/__init__.py | 2 - .../djeuscan/processing/misc/regen_rrds.py | 1 + .../processing/misc/update_portage_trees.py | 10 +++-- .../djeuscan/processing/scan/scan_metadata.py | 12 +++--- .../djeuscan/processing/scan/scan_portage.py | 25 +++++++----- .../djeuscan/processing/scan/scan_upstream.py | 8 ++-- euscanwww/djeuscan/tasks.py | 39 ++++++------------- euscanwww/euscanwww/settings.py | 17 ++++++++ euscanwww/scripts/start_dev_services.sh | 3 +- 10 files changed, 65 insertions(+), 58 deletions(-) diff --git a/euscanwww/README.Celery b/euscanwww/README.Celery index 5159e32..f2a5149 100644 --- a/euscanwww/README.Celery +++ b/euscanwww/README.Celery @@ -23,16 +23,12 @@ Running Celery You'll need: * celeryd (celery daemon for running tasks):: - python manage.py celeryd -E -l INFO + python manage.py celeryd -B -E -l INFO * celerycam (for monitoring celery and see the results in the django admin page):: python manage.py celerycam -* celerybeat (for running periodic tasks):: - - python manage.py celerybeat -l INFO - TODO ==== diff --git a/euscanwww/djeuscan/processing/__init__.py b/euscanwww/djeuscan/processing/__init__.py index 97bed46..f1a00c6 100644 --- a/euscanwww/djeuscan/processing/__init__.py +++ b/euscanwww/djeuscan/processing/__init__.py @@ -1,4 +1,3 @@ - class FakeLogger(object): def __getattr__(self, key): return lambda *x, **y: None @@ -29,4 +28,3 @@ def set_verbosity_level(logger, verbosity): logger.setLevel(levels[verbosity]) return logger - diff --git a/euscanwww/djeuscan/processing/misc/regen_rrds.py b/euscanwww/djeuscan/processing/misc/regen_rrds.py index da548a1..f578aa7 100644 --- a/euscanwww/djeuscan/processing/misc/regen_rrds.py +++ b/euscanwww/djeuscan/processing/misc/regen_rrds.py @@ -3,6 +3,7 @@ from djeuscan import charts from djeuscan.processing import FakeLogger + def regen_rrds(logger=None): """ Regenerates the rrd database diff --git a/euscanwww/djeuscan/processing/misc/update_portage_trees.py b/euscanwww/djeuscan/processing/misc/update_portage_trees.py index e8d07c3..46eb9c6 100644 --- a/euscanwww/djeuscan/processing/misc/update_portage_trees.py +++ b/euscanwww/djeuscan/processing/misc/update_portage_trees.py @@ -2,6 +2,7 @@ import os from django.conf import settings + def _launch_command(cmd, logger=None): """ Helper for launching shell commands inside tasks @@ -12,7 +13,7 @@ def _launch_command(cmd, logger=None): fp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - mask = select.EPOLLIN|select.EPOLLHUP|select.EPOLLERR + mask = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR epoll = select.epoll() epoll.register(fp.stdout.fileno(), mask) @@ -36,13 +37,14 @@ def _launch_command(cmd, logger=None): source, out = fp.stderr, error line = source.readline().rstrip('\n') out("%s[%s]: %s" % (cmd[0], fp.pid, line)) - elif event & (select.EPOLLERR|select.EPOLLHUP): + elif event & (select.EPOLLERR | select.EPOLLHUP): exited = True finally: epoll.close() fp.wait() + def emerge_sync(logger): """ Launches an emerge --sync @@ -81,10 +83,12 @@ def layman_sync(logger, cache=True): for overlay in installed_overlays: logger.info('Generating cache for overlay %s...' % overlay) overlay_path = os.path.join(l.config['storage'], overlay) - if not os.path.exists(os.path.join(overlay_path, 'profiles/repo_name')): + repo_path = os.path.join(overlay_path, 'profiles/repo_name') + if not os.path.exists(repo_path): continue _launch_command(cmd + ['--repo', overlay], logger) + def eix_update(logger): """ Launches eix-update diff --git a/euscanwww/djeuscan/processing/scan/scan_metadata.py b/euscanwww/djeuscan/processing/scan/scan_metadata.py index 263f4c0..184408d 100644 --- a/euscanwww/djeuscan/processing/scan/scan_metadata.py +++ b/euscanwww/djeuscan/processing/scan/scan_metadata.py @@ -1,7 +1,4 @@ -import os.path - from gentoolkit.query import Query -from gentoolkit.errors import GentoolkitFatalError from django.db.transaction import commit_on_success from django.core.management.color import color_style @@ -10,6 +7,7 @@ from django.core.exceptions import ValidationError from djeuscan.models import Package, Herd, Maintainer from djeuscan.processing import FakeLogger + class ScanMetadata(object): def __init__(self, logger=None): self.style = color_style() @@ -61,8 +59,10 @@ class ScanMetadata(object): old_herds = set(existing_herds).difference(herds.keys()) existing_maintainers = [m.email for m in obj.maintainers.all()] - new_maintainers = set(maintainers.keys()).difference(existing_maintainers) - old_maintainers = set(existing_maintainers).difference(maintainers.keys()) + new_maintainers = set(maintainers.keys()).\ + difference(existing_maintainers) + old_maintainers = set(existing_maintainers).\ + difference(maintainers.keys()) for herd in obj.herds.all(): if herd.herd in old_herds: @@ -125,6 +125,7 @@ class ScanMetadata(object): ) return maintainer + @commit_on_success def scan_metadata(packages=None, category=None, logger=None): scan_handler = ScanMetadata(logger=logger) @@ -139,4 +140,3 @@ def scan_metadata(packages=None, category=None, logger=None): scan_handler.scan('%s/%s' % (pkg.category, pkg.name), pkg) else: scan_handler.scan(pkg) - diff --git a/euscanwww/djeuscan/processing/scan/scan_portage.py b/euscanwww/djeuscan/processing/scan/scan_portage.py index ac59bc9..760c260 100644 --- a/euscanwww/djeuscan/processing/scan/scan_portage.py +++ b/euscanwww/djeuscan/processing/scan/scan_portage.py @@ -83,7 +83,9 @@ class ScanPortage(object): package['homepage'] = ' '.join(p.homepages) package['description'] = p.description package['versions'] = [] - package['versions'].append((p._cpv, p.slot, p.repository or 'gentoo')) + package['versions'].append( + (p._cpv, p.slot, p.repository or 'gentoo') + ) if package_name: yield package @@ -109,7 +111,7 @@ class ScanPortage(object): self.logger.error(self.style.ERROR(msg)) return - package = {'versions' : []} + package = {'versions': []} category = "" for event, elem in parser: @@ -123,8 +125,11 @@ class ScanPortage(object): package[elem.tag] = elem.text or "" elif elem.tag == "version": # append version data to versions - cpv = "%s/%s-%s" % \ - (package["category"], package["package"], elem.attrib["id"]) + cpv = "%s/%s-%s" % ( + package["category"], + package["package"], + elem.attrib["id"] + ) slot = elem.attrib.get("slot", "0") overlay = elem.attrib.get("repository", "gentoo") package["versions"].append((cpv, slot, overlay)) @@ -133,7 +138,7 @@ class ScanPortage(object): if elem.tag == "package": # clean old data yield package - package = {"versions" : []} + package = {"versions": []} if elem.tag == "category": # clean old data @@ -177,7 +182,9 @@ class ScanPortage(object): for data in self.scan_eix_xml(query, category): #for data in self.scan_gentoopm(query, category): cat, pkg = data['category'], data['package'] - package = self.store_package(cat, pkg, data['homepage'], data['description']) + package = self.store_package( + cat, pkg, data['homepage'], data['description'] + ) packages_alive.add("%s/%s" % (cat, pkg)) for cpv, slot, overlay in data['versions']: self.store_version(package, cpv, slot, overlay) @@ -300,8 +307,9 @@ class ScanPortage(object): @commit_on_success -def scan_portage(packages=None, category=None, no_log=False, purge_packages=False, - purge_versions=False, prefetch=False, logger=None): +def scan_portage(packages=None, category=None, no_log=False, + purge_packages=False, purge_versions=False, prefetch=False, + logger=None): logger = logger or FakeLogger() @@ -335,4 +343,3 @@ def scan_portage(packages=None, category=None, no_log=False, purge_packages=Fals scan_handler.scan(pkg) logger.info('Done.') - diff --git a/euscanwww/djeuscan/processing/scan/scan_upstream.py b/euscanwww/djeuscan/processing/scan/scan_upstream.py index d79a967..e06b5d7 100644 --- a/euscanwww/djeuscan/processing/scan/scan_upstream.py +++ b/euscanwww/djeuscan/processing/scan/scan_upstream.py @@ -69,7 +69,9 @@ class ScanUpstream(object): # Set all versions dead, then set found versions alive and # delete old versions if self.purge_versions: - Version.objects.filter(package=obj, packaged=False).update(alive=False) + Version.objects.filter( + package=obj, packaged=False + ).update(alive=False) return obj @@ -109,7 +111,6 @@ class ScanUpstream(object): package.n_versions += 1 package.save() - def purge_old_versions(self): if not self.purge_versions: return @@ -132,6 +133,7 @@ class ScanUpstream(object): versions.delete() + @commit_on_success def scan_upstream(packages=None, purge_versions=False, logger=None): @@ -144,8 +146,6 @@ def scan_upstream(packages=None, purge_versions=False, if not packages: packages = Package.objects.all() - result = True - for pkg in packages: if isinstance(pkg, Package): scan_handler.scan('%s/%s' % (pkg.category, pkg.name)) diff --git a/euscanwww/djeuscan/tasks.py b/euscanwww/djeuscan/tasks.py index 65269aa..c47fbbf 100644 --- a/euscanwww/djeuscan/tasks.py +++ b/euscanwww/djeuscan/tasks.py @@ -4,19 +4,15 @@ Celery tasks for djeuscan from itertools import islice -from celery.task import task, periodic_task -from celery.task.schedules import crontab +from celery.task import task from celery.task.sets import TaskSet from django.conf import settings from djeuscan.models import Package, RefreshPackageQuery -from djeuscan.processing.regen_rrds import regen_rrds -from djeuscan.processing.update_counters import update_counters -from djeuscan.processing.scan_metadata import scan_metadata -from djeuscan.processing.scan_portage import scan_portage -from djeuscan.processing.scan_upstream import scan_upstream -from djeuscan.processing.update_portage_trees import update_portage_trees +from djeuscan.processing.misc import regen_rrds, update_counters, \ + update_portage_trees +from djeuscan.processing.scan import scan_metadata, scan_portage, scan_upstream class TaskFailedException(Exception): @@ -87,13 +83,10 @@ def _scan_metadata_task(packages): logger.info("Starting metadata scanning subtask for %d packages...", len(packages)) - result = scan_metadata( + scan_metadata( packages=packages, logger=logger, ) - if not result: - raise TaskFailedException - return result @task @@ -126,7 +119,7 @@ def _scan_portage_task(packages, no_log=False, purge_packages=False, else: logger.info("Starting portage scanning for all packages...") - result = scan_portage( + scan_portage( packages=packages, no_log=no_log, purge_packages=purge_packages, @@ -134,9 +127,6 @@ def _scan_portage_task(packages, no_log=False, purge_packages=False, prefetch=prefetch, logger=logger, ) - if not result: - raise TaskFailedException - return result @task @@ -182,8 +172,9 @@ def _scan_upstream_task(packages, purge_versions=False): purge_versions=purge_versions, logger=logger, ) - if not result: - raise TaskFailedException + # TODO: implement some kind of error raising in case of failure + #if not result: + # raise TaskFailedException return result @@ -258,7 +249,9 @@ def scan_package_task(package): _scan_upstream_task([package]) -@periodic_task(run_every=crontab(minute="*/1")) +# Periodic tasks + +@task def consume_refresh_package_request(): """ Satisfies user requests for package refreshing, runs every minute @@ -273,14 +266,6 @@ def consume_refresh_package_request(): return result -@periodic_task(run_every=crontab(hour=03, minute=00, day_of_week=1)) -def update_periodic_task(): - """ - Runs a whole update once a week - """ - update_task() - - admin_tasks = [ regen_rrds_task, update_counters_task, diff --git a/euscanwww/euscanwww/settings.py b/euscanwww/euscanwww/settings.py index 1cb50f3..c37c174 100644 --- a/euscanwww/euscanwww/settings.py +++ b/euscanwww/euscanwww/settings.py @@ -1,6 +1,7 @@ # Django settings for euscanwww project. import os.path +from datetime import timedelta DEBUG = True TEMPLATE_DEBUG = DEBUG @@ -238,6 +239,22 @@ CELERYD_CONCURRENCY = 4 TASKS_CONCURRENTLY = 4 TASKS_SUBTASK_PACKAGES = 32 +CELERYBEAT_SCHEDULE = { + "refresh_package_query": { + "task": "djeuscan.tasks.consume_refresh_package_request", + "schedule": timedelta(minutes=1), + }, + "weekly_update": { + "task": "djeuscan.tasks.update_task", + "schedule": timedelta(days=7), + }, + "daily_update": { + "task": "djeuscan.tasks.update_task", + "schedule": timedelta(days=1), + "kwargs": {"scan_upstream": False} + }, +} + # LDAP authentication # TODO: Test data - change me! AUTH_LDAP_SERVER_URI = "ldap://localhost" diff --git a/euscanwww/scripts/start_dev_services.sh b/euscanwww/scripts/start_dev_services.sh index 74b76ec..2f50909 100755 --- a/euscanwww/scripts/start_dev_services.sh +++ b/euscanwww/scripts/start_dev_services.sh @@ -1,4 +1,3 @@ -python manage.py celeryd -E -l INFO & -python manage.py celerybeat -l INFO & +python manage.py celeryd -B -E -l INFO & python manage.py celerycam & python manage.py runserver