From be1f0f7d33440d96c2bcc1c7f3dfd5cbb356e54f Mon Sep 17 00:00:00 2001 From: Elrond Date: Mon, 17 Dec 2012 19:42:31 +0100 Subject: upload refactor: push url handling Start to refactor our upload handling in main submit and the api. Start factoring out the handling of PuSH url handling. --- mediagoblin/submit/lib.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 mediagoblin/submit/lib.py (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py new file mode 100644 index 00000000..57069e84 --- /dev/null +++ b/mediagoblin/submit/lib.py @@ -0,0 +1,50 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import urllib +import urllib2 +import logging + +from mediagoblin import mg_globals + +_log = logging.getLogger(__name__) + + +def handle_push_urls(request): + if mg_globals.app_config["push_urls"]: + feed_url = request.urlgen( + 'mediagoblin.user_pages.atom_feed', + qualified=True, + user=request.user.username) + hubparameters = { + 'hub.mode': 'publish', + 'hub.url': feed_url} + hubdata = urllib.urlencode(hubparameters) + hubheaders = { + "Content-type": "application/x-www-form-urlencoded", + "Connection": "close"} + for huburl in mg_globals.app_config["push_urls"]: + hubrequest = urllib2.Request(huburl, hubdata, hubheaders) + try: + hubresponse = urllib2.urlopen(hubrequest) + except urllib2.HTTPError as exc: + # This is not a big issue, the item will be fetched + # by the PuSH server next time we hit it + _log.warning( + "push url %r gave error %r", huburl, exc.code) + except urllib2.URLError as exc: + _log.warning( + "push url %r is unreachable %r", huburl, exc.reason) -- cgit v1.2.3 From 86bb44ef121e64e2a2c7ad175af444000a7ca0c9 Mon Sep 17 00:00:00 2001 From: Elrond Date: Mon, 17 Dec 2012 19:54:26 +0100 Subject: Factor out the actual calling of the processing. Calling the processing task and handling the exceptions is easy, but has a bunch of caveats, so factor it out into an easy callable function. --- mediagoblin/submit/lib.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 57069e84..f174d4ea 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -17,12 +17,36 @@ import urllib import urllib2 import logging +from celery import registry from mediagoblin import mg_globals +from mediagoblin.processing import mark_entry_failed +from mediagoblin.processing.task import ProcessMedia + _log = logging.getLogger(__name__) +def run_process_media(entry): + process_media = registry.tasks[ProcessMedia.name] + try: + process_media.apply_async( + [unicode(entry.id)], {}, + task_id=entry.queued_task_id) + except BaseException as exc: + # The purpose of this section is because when running in "lazy" + # or always-eager-with-exceptions-propagated celery mode that + # the failure handling won't happen on Celery end. Since we + # expect a lot of users to run things in this way we have to + # capture stuff here. + # + # ... not completely the diaper pattern because the + # exception is re-raised :) + mark_entry_failed(entry.id, exc) + # re-raise the exception + raise + + def handle_push_urls(request): if mg_globals.app_config["push_urls"]: feed_url = request.urlgen( -- cgit v1.2.3 From 8eb47d02d922fa90abd56729c6c6898f43cf7413 Mon Sep 17 00:00:00 2001 From: Elrond Date: Mon, 17 Dec 2012 20:05:37 +0100 Subject: Processing: Factor out prepare_entry. prepare_entry handles the task_id setup and generating a queue filename and file. it returns the queue file. --- mediagoblin/submit/lib.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index f174d4ea..6660eb53 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -17,7 +17,9 @@ import urllib import urllib2 import logging +import uuid from celery import registry +from werkzeug.utils import secure_filename from mediagoblin import mg_globals from mediagoblin.processing import mark_entry_failed @@ -27,6 +29,32 @@ from mediagoblin.processing.task import ProcessMedia _log = logging.getLogger(__name__) +def prepare_entry(request, entry, filename): + # We generate this ourselves so we know what the taks id is for + # retrieval later. + + # (If we got it off the task's auto-generation, there'd be + # a risk of a race condition when we'd save after sending + # off the task) + task_id = unicode(uuid.uuid4()) + entry.queued_task_id = task_id + + # Now store generate the queueing related filename + queue_filepath = request.app.queue_store.get_unique_filepath( + ['media_entries', + task_id, + secure_filename(filename)]) + + # queue appropriately + queue_file = request.app.queue_store.get_file( + queue_filepath, 'wb') + + # Add queued filename to the entry + entry.queued_media_file = queue_filepath + + return queue_file + + def run_process_media(entry): process_media = registry.tasks[ProcessMedia.name] try: -- cgit v1.2.3 From b228d89715558281d9573543dd0d6c74836d42ca Mon Sep 17 00:00:00 2001 From: Elrond Date: Wed, 26 Dec 2012 23:40:42 +0100 Subject: prepare_queue_task: Take app not request. First rename prepare_entry to prepare_queue_task, because this is really more like what this thing does. Thanks to Velmont for noting that we do not need a request in here, but an "app" is good enough. Which means, that this stuff can be called from tool scripts too. --- mediagoblin/submit/lib.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 6660eb53..db5dfe53 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -29,7 +29,10 @@ from mediagoblin.processing.task import ProcessMedia _log = logging.getLogger(__name__) -def prepare_entry(request, entry, filename): +def prepare_queue_task(app, entry, filename): + """ + Prepare a MediaEntry for the processing queue and get a queue file + """ # We generate this ourselves so we know what the taks id is for # retrieval later. @@ -40,13 +43,13 @@ def prepare_entry(request, entry, filename): entry.queued_task_id = task_id # Now store generate the queueing related filename - queue_filepath = request.app.queue_store.get_unique_filepath( + queue_filepath = app.queue_store.get_unique_filepath( ['media_entries', task_id, secure_filename(filename)]) # queue appropriately - queue_file = request.app.queue_store.get_file( + queue_file = app.queue_store.get_file( queue_filepath, 'wb') # Add queued filename to the entry -- cgit v1.2.3 From 2cfffd5ed8c054bb60c27ede4e69667f97d12b09 Mon Sep 17 00:00:00 2001 From: Sebastian Spaeth Date: Tue, 15 Jan 2013 14:41:30 +0100 Subject: Make PuSHing the Pubhubsubbub server an async task (#436, #585) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Notifying the PuSH servers had 3 problems.  1) it was done immediately after sending of the processing task to celery. So if celery was run in a separate process we would notify the PuSH servers before the new media was processed/ visible. (#436) 2) Notification code was called in submit/views.py, so submitting via the API never resulted in notifications. (#585) 3) If Notifying the PuSH server failed, we would never retry. The solution was to make the PuSH notification an asynchronous subtask. This way: 1) it will only be called once async processing has finished, 2) it is in the main processing code path, so even API calls will result in notifications, and 3) We retry 3 times in case of failure before giving up. If the server is in a separate process, we will wait 3x 2 minutes before retrying the notification. The only downside is that the celery server needs to have access to the internet to ping the PuSH server. If that is a problem, we need to make the task belong to a special group of celery servers that has access to the internet. As a side effect, I believe I removed the limitation that prevented us from upgrading celery. Signed-off-by: Sebastian Spaeth --- mediagoblin/submit/lib.py | 41 ++++++----------------------------------- 1 file changed, 6 insertions(+), 35 deletions(-) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index db5dfe53..ba07c6fa 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -14,16 +14,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import urllib -import urllib2 import logging import uuid -from celery import registry from werkzeug.utils import secure_filename -from mediagoblin import mg_globals from mediagoblin.processing import mark_entry_failed -from mediagoblin.processing.task import ProcessMedia +from mediagoblin.processing.task import process_media _log = logging.getLogger(__name__) @@ -58,11 +54,13 @@ def prepare_queue_task(app, entry, filename): return queue_file -def run_process_media(entry): - process_media = registry.tasks[ProcessMedia.name] +def run_process_media(entry, request): + feed_url = request.urlgen( + 'mediagoblin.user_pages.atom_feed', + qualified=True, user=request.user.username) try: process_media.apply_async( - [unicode(entry.id)], {}, + [entry.id, feed_url], {}, task_id=entry.queued_task_id) except BaseException as exc: # The purpose of this section is because when running in "lazy" @@ -76,30 +74,3 @@ def run_process_media(entry): mark_entry_failed(entry.id, exc) # re-raise the exception raise - - -def handle_push_urls(request): - if mg_globals.app_config["push_urls"]: - feed_url = request.urlgen( - 'mediagoblin.user_pages.atom_feed', - qualified=True, - user=request.user.username) - hubparameters = { - 'hub.mode': 'publish', - 'hub.url': feed_url} - hubdata = urllib.urlencode(hubparameters) - hubheaders = { - "Content-type": "application/x-www-form-urlencoded", - "Connection": "close"} - for huburl in mg_globals.app_config["push_urls"]: - hubrequest = urllib2.Request(huburl, hubdata, hubheaders) - try: - hubresponse = urllib2.urlopen(hubrequest) - except urllib2.HTTPError as exc: - # This is not a big issue, the item will be fetched - # by the PuSH server next time we hit it - _log.warning( - "push url %r gave error %r", huburl, exc.code) - except urllib2.URLError as exc: - _log.warning( - "push url %r is unreachable %r", huburl, exc.reason) -- cgit v1.2.3 From c7b3d070b65a84e3bfa9d8e3e6f52aac6552910f Mon Sep 17 00:00:00 2001 From: Sebastian Spaeth Date: Tue, 15 Jan 2013 15:03:00 +0100 Subject: Don't pass request into run_process_media People(tm) want to start run_process_media from the CLI and might not have a request object handy. So pass in the feed_url into run_process_media rather than the request object and allow the feed url to be empty (resulting in no PuSH notification at all then). Signed-off-by: Sebastian Spaeth --- mediagoblin/submit/lib.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index ba07c6fa..679fc543 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -54,10 +54,14 @@ def prepare_queue_task(app, entry, filename): return queue_file -def run_process_media(entry, request): - feed_url = request.urlgen( - 'mediagoblin.user_pages.atom_feed', - qualified=True, user=request.user.username) +def run_process_media(entry, feed_url=None): + """Process the media asynchronously + + :param entry: MediaEntry() instance to be processed. + :param feed_url: A string indicating the feed_url that the PuSH servers + should be notified of. This will be sth like: `request.urlgen( + 'mediagoblin.user_pages.atom_feed',qualified=True, + user=request.user.username)`""" try: process_media.apply_async( [entry.id, feed_url], {}, -- cgit v1.2.3 From 2ef2f46e73845dcd55666cad49c5a17908bf5b46 Mon Sep 17 00:00:00 2001 From: Elrond Date: Fri, 22 Mar 2013 15:45:21 +0100 Subject: Refactor file field checking. When uploading, the file field needs some checks, it seems. So refactor them into check_file_field and use around. --- mediagoblin/submit/lib.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 679fc543..a5483471 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -17,6 +17,7 @@ import logging import uuid from werkzeug.utils import secure_filename +from werkzeug.datastructures import FileStorage from mediagoblin.processing import mark_entry_failed from mediagoblin.processing.task import process_media @@ -25,6 +26,16 @@ from mediagoblin.processing.task import process_media _log = logging.getLogger(__name__) +def check_file_field(request, field_name): + """Check if a file field meets minimal criteria""" + retval = (field_name in request.files + and isinstance(request.files[field_name], FileStorage) + and request.files[field_name].stream) + if not retval: + _log.debug("Form did not contain proper file field %s", field_name) + return retval + + def prepare_queue_task(app, entry, filename): """ Prepare a MediaEntry for the processing queue and get a queue file -- cgit v1.2.3 From cec9648c11d851baa8add4f49cdcdbc5416386a9 Mon Sep 17 00:00:00 2001 From: Alon Levy Date: Tue, 23 Apr 2013 09:39:51 +0300 Subject: mediagoblin/submit/lib.py: fix typo Signed-off-by: Alon Levy --- mediagoblin/submit/lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index a5483471..7c3b8ab3 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -40,7 +40,7 @@ def prepare_queue_task(app, entry, filename): """ Prepare a MediaEntry for the processing queue and get a queue file """ - # We generate this ourselves so we know what the taks id is for + # We generate this ourselves so we know what the task id is for # retrieval later. # (If we got it off the task's auto-generation, there'd be -- cgit v1.2.3 From 6c1467d570a4da68ef8b4edac9aecdb9c87a61de Mon Sep 17 00:00:00 2001 From: Elrond Date: Tue, 21 May 2013 00:28:37 +0200 Subject: Refactor submit util new_upload_entry This tool creates an initial media entry for a given user. No magic. It just prefills the license with the user's default license and adds the user as uploader. --- mediagoblin/submit/lib.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'mediagoblin/submit/lib.py') diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 7c3b8ab3..7e85696b 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -19,6 +19,7 @@ import uuid from werkzeug.utils import secure_filename from werkzeug.datastructures import FileStorage +from mediagoblin.db.models import MediaEntry from mediagoblin.processing import mark_entry_failed from mediagoblin.processing.task import process_media @@ -36,6 +37,16 @@ def check_file_field(request, field_name): return retval +def new_upload_entry(user): + """ + Create a new MediaEntry for uploading + """ + entry = MediaEntry() + entry.uploader = user.id + entry.license = user.license_preference + return entry + + def prepare_queue_task(app, entry, filename): """ Prepare a MediaEntry for the processing queue and get a queue file -- cgit v1.2.3