1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import datetime
import logging
from celery import Celery
from kombu import Exchange, Queue
from mediagoblin.tools.pluginapi import hook_runall
_log = logging.getLogger(__name__)
MANDATORY_CELERY_IMPORTS = [
'mediagoblin.processing.task',
'mediagoblin.notifications.task',
'mediagoblin.submit.task',
'mediagoblin.media_types.video.processing',
]
DEFAULT_SETTINGS_MODULE = 'mediagoblin.init.celery.dummy_settings_module'
def get_celery_settings_dict(app_config, global_config,
force_celery_always_eager=False):
"""
Get a celery settings dictionary from reading the config
"""
if 'celery' in global_config:
celery_conf = global_config['celery']
else:
celery_conf = {}
# Add x-max-priority to config
celery_conf['CELERY_QUEUES'] = (
Queue('default', Exchange('default'), routing_key='default',
queue_arguments={'x-max-priority': 10}),
)
celery_settings = {}
# Add all celery settings from config
for key, value in celery_conf.items():
celery_settings[key] = value
# TODO: use default result stuff here if it exists
# add mandatory celery imports
celery_imports = celery_settings.setdefault('CELERY_IMPORTS', [])
celery_imports.extend(MANDATORY_CELERY_IMPORTS)
if force_celery_always_eager:
celery_settings['CELERY_ALWAYS_EAGER'] = True
celery_settings['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True
# Garbage collection periodic task
frequency = app_config.get('garbage_collection', 60)
if frequency:
frequency = int(frequency)
celery_settings['CELERYBEAT_SCHEDULE'] = {
'garbage-collection': {
'task': 'mediagoblin.submit.task.collect_garbage',
'schedule': datetime.timedelta(minutes=frequency),
}
}
return celery_settings
def setup_celery_app(app_config, global_config,
settings_module=DEFAULT_SETTINGS_MODULE,
force_celery_always_eager=False):
"""
Setup celery without using terrible setup-celery-module hacks.
"""
celery_settings = get_celery_settings_dict(
app_config, global_config, force_celery_always_eager)
celery_app = Celery()
celery_app.config_from_object(celery_settings)
hook_runall('celery_setup', celery_app)
def setup_celery_from_config(app_config, global_config,
settings_module=DEFAULT_SETTINGS_MODULE,
force_celery_always_eager=False,
set_environ=True):
"""
Take a mediagoblin app config and try to set up a celery settings
module from this.
Args:
- app_config: the application config section
- global_config: the entire ConfigObj loaded config, all sections
- settings_module: the module to populate, as a string
- force_celery_always_eager: whether or not to force celery into
always eager mode; good for development and small installs
- set_environ: if set, this will CELERY_CONFIG_MODULE to the
settings_module
"""
celery_settings = get_celery_settings_dict(
app_config, global_config, force_celery_always_eager)
__import__(settings_module)
this_module = sys.modules[settings_module]
for key, value in celery_settings.items():
setattr(this_module, key, value)
if set_environ:
os.environ['CELERY_CONFIG_MODULE'] = settings_module
# Replace the default celery.current_app.conf if celery has already been
# initiated
from celery import current_app
_log.info('Setting celery configuration from object "{}"'.format(
settings_module))
current_app.config_from_object(this_module)
_log.debug('Celery broker host: {}'.format(current_app.conf['BROKER_HOST']))
|