# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import urllib
import urllib2

from celery import registry, task

from mediagoblin import mg_globals as mgg
from mediagoblin.tools.processing import json_processing_callback
from mediagoblin.processing import get_entry_and_manager
from . import mark_entry_failed, BaseProcessingFail

_log = logging.getLogger(__name__)
logging.basicConfig()
_log.setLevel(logging.DEBUG)
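
# ``push_urls`` is read from the app config as a list of PuSH hub URLs to
# notify. A rough sketch of the matching mediagoblin.ini entry (the hub
# address is a placeholder, not a real endpoint):
#
#     [mediagoblin]
#     push_urls = http://hub.example.org/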


@task.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask notifying the PuSH servers of new content.

    Retries 3 times, every 2 minutes, before failing (when run in a
    separate process).
    """
    if not mgg.app_config["push_urls"]:
        return  # Nothing to do

    _log.debug('Notifying PuSH servers for feed {0}'.format(feed_url))
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    hubdata = urllib.urlencode(hubparameters)
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}
    for huburl in mgg.app_config["push_urls"]:
        hubrequest = urllib2.Request(huburl, hubdata, hubheaders)
        try:
            urllib2.urlopen(hubrequest)
        except (urllib2.HTTPError, urllib2.URLError) as exc:
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed; failure is no tragedy here, probably.
                _log.warn('Failed to notify PuSH server for feed {0}. '
                          'Giving up.'.format(feed_url))
                return False
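
# For reference, each hub notification above amounts to this plain HTTP
# request (a sketch; the hub host and feed URL are placeholders):
#
#     POST / HTTP/1.1
#     Host: hub.example.org
#     Content-type: application/x-www-form-urlencoded
#     Connection: close
#
#     hub.mode=publish&hub.url=http%3A%2F%2Fexample.org%2Ffeed.atom
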
################################
# Media processing initial steps
################################

class ProcessMedia(task.Task):
    """
    Pass this entry off for processing.
    """
    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processing function
        (for now just process_image...)

        :param media_id: The id of the MediaEntry to process.
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        :param reprocess_action: The processing action to run (e.g. the
            initial run, or a later reprocessing run).
        :param reprocess_info: A dict containing all of the necessary
            reprocessing info for the media_type.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            entry.state = u'processing'
            entry.save()

            _log.debug('Processing {0}'.format(entry))

            with processor_class(manager, entry) as processor:
                processor.process(**reprocess_info)

            # We set the state to processed and save the entry here so
            # there's no need to save at the end of the processing stage.
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as an async subtask
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            _log.error(
                'Entry {0} failed to process due to an import error: '
                '{1}'.format(entry.title, exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            _log.error('An unhandled exception was raised while '
                       'processing {0}'.format(entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)


# Register the task
process_media = registry.tasks[ProcessMedia.name]
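
# A minimal sketch of dispatching the task asynchronously from submission
# code (illustrative only: `entry` stands in for a saved MediaEntry and
# `feed_url` for its feed's URL, neither defined in this module):
#
#     process_media.apply_async(
#         [entry.id, feed_url, u'initial', None], {})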