aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin')
-rw-r--r--mediagoblin/auth/views.py18
-rw-r--r--mediagoblin/db/mixin.py57
-rw-r--r--mediagoblin/db/models.py20
-rw-r--r--mediagoblin/db/util.py2
-rw-r--r--mediagoblin/edit/views.py34
-rw-r--r--mediagoblin/media_types/ascii/processing.py8
-rw-r--r--mediagoblin/media_types/audio/processing.py8
-rw-r--r--mediagoblin/media_types/image/processing.py10
-rw-r--r--mediagoblin/media_types/stl/model_loader.py3
-rw-r--r--mediagoblin/media_types/stl/processing.py8
-rw-r--r--mediagoblin/media_types/video/transcoders.py348
-rw-r--r--mediagoblin/plugins/api/views.py8
-rw-r--r--mediagoblin/plugins/oauth/views.py8
-rw-r--r--mediagoblin/plugins/piwigo/forms.py (renamed from mediagoblin/tests/test_tests.py)28
-rw-r--r--mediagoblin/plugins/piwigo/views.py17
-rw-r--r--mediagoblin/plugins/raven/README.rst2
-rw-r--r--mediagoblin/processing/__init__.py19
-rw-r--r--mediagoblin/storage/__init__.py16
-rw-r--r--mediagoblin/storage/filestorage.py26
-rw-r--r--mediagoblin/submit/lib.py11
-rw-r--r--mediagoblin/submit/views.py20
-rw-r--r--mediagoblin/tests/conftest.py15
-rw-r--r--mediagoblin/tests/pytest.ini2
-rw-r--r--mediagoblin/tests/test_api.py26
-rw-r--r--mediagoblin/tests/test_auth.py46
-rw-r--r--mediagoblin/tests/test_collections.py11
-rw-r--r--mediagoblin/tests/test_csrf_middleware.py20
-rw-r--r--mediagoblin/tests/test_edit.py60
-rw-r--r--mediagoblin/tests/test_globals.py10
-rw-r--r--mediagoblin/tests/test_http_callback.py27
-rw-r--r--mediagoblin/tests/test_messages.py5
-rw-r--r--mediagoblin/tests/test_misc.py26
-rw-r--r--mediagoblin/tests/test_modelmethods.py98
-rw-r--r--mediagoblin/tests/test_oauth.py61
-rw-r--r--mediagoblin/tests/test_paste.ini2
-rw-r--r--mediagoblin/tests/test_pluginapi.py21
-rw-r--r--mediagoblin/tests/test_storage.py46
-rw-r--r--mediagoblin/tests/test_submission.py98
-rw-r--r--mediagoblin/tests/test_tags.py4
-rw-r--r--mediagoblin/tests/test_workbench.py13
-rw-r--r--mediagoblin/tests/tools.py65
-rw-r--r--mediagoblin/user_pages/views.py10
42 files changed, 615 insertions, 722 deletions
diff --git a/mediagoblin/auth/views.py b/mediagoblin/auth/views.py
index d8ad7b51..354b48c1 100644
--- a/mediagoblin/auth/views.py
+++ b/mediagoblin/auth/views.py
@@ -78,7 +78,7 @@ def register(request):
user.username = register_form.data['username']
user.email = register_form.data['email']
user.pw_hash = auth_lib.bcrypt_gen_password_hash(
- request.form['password'])
+ register_form.password.data)
user.verification_key = unicode(uuid.uuid4())
user.save()
@@ -116,7 +116,7 @@ def login(request):
if login_form.validate():
user = User.query.filter_by(username=login_form.data['username']).first()
- if user and user.check_login(request.form['password']):
+ if user and user.check_login(login_form.password.data):
# set up login in session
request.session['user_id'] = unicode(user.id)
request.session.save()
@@ -196,7 +196,7 @@ def resend_activation(request):
request,
messages.ERROR,
_('You must be logged in so we know who to send the email to!'))
-
+
return redirect(request, 'mediagoblin.auth.login')
if request.user.email_verified:
@@ -204,12 +204,12 @@ def resend_activation(request):
request,
messages.ERROR,
_("You've already verified your email address!"))
-
+
return redirect(request, "mediagoblin.user_pages.user_home", user=request.user['username'])
request.user.verification_key = unicode(uuid.uuid4())
request.user.save()
-
+
email_debug_message(request)
send_verification_email(request.user, request)
@@ -241,11 +241,11 @@ def forgot_password(request):
# has been sanitized. Store if a user was found by email. We should
# not reveal if the operation was successful then as we don't want to
# leak if an email address exists in the system.
- found_by_email = '@' in request.form['username']
+ found_by_email = '@' in fp_form.username.data
if found_by_email:
user = User.query.filter_by(
- email = request.form['username']).first()
+ email = fp_form.username.data).first()
# Don't reveal success in case the lookup happened by email address.
success_message=_("If that email address (case sensitive!) is "
"registered an email has been sent with instructions "
@@ -253,7 +253,7 @@ def forgot_password(request):
else: # found by username
user = User.query.filter_by(
- username = request.form['username']).first()
+ username = fp_form.username.data).first()
if user is None:
messages.add_message(request,
@@ -317,7 +317,7 @@ def verify_forgot_password(request):
if request.method == 'POST' and cp_form.validate():
user.pw_hash = auth_lib.bcrypt_gen_password_hash(
- request.form['password'])
+ cp_form.password.data)
user.fp_verification_key = None
user.fp_token_expire = None
user.save()
diff --git a/mediagoblin/db/mixin.py b/mediagoblin/db/mixin.py
index fdf61e8d..0dc3bc85 100644
--- a/mediagoblin/db/mixin.py
+++ b/mediagoblin/db/mixin.py
@@ -52,10 +52,18 @@ class UserMixin(object):
return cleaned_markdown_conversion(self.bio)
-class MediaEntryMixin(object):
+class GenerateSlugMixin(object):
+ """
+ Mixin to add a generate_slug method to objects.
+
+ Depends on:
+ - self.slug
+ - self.title
+ - self.check_slug_used(new_slug)
+ """
def generate_slug(self):
"""
- Generate a unique slug for this MediaEntry.
+ Generate a unique slug for this object.
This one does not *force* slugs, but usually it will probably result
in a niceish one.
@@ -76,19 +84,15 @@ class MediaEntryMixin(object):
generated bits until it's unique. That'll be a little bit of junk,
but at least it has the basis of a nice slug.
"""
- # import this here due to a cyclic import issue
- # (db.models -> db.mixin -> db.util -> db.models)
- from mediagoblin.db.util import check_media_slug_used
-
#Is already a slug assigned? Check if it is valid
if self.slug:
self.slug = slugify(self.slug)
-
+
# otherwise, try to use the title.
elif self.title:
# assign slug based on title
self.slug = slugify(self.title)
-
+
# We don't want any empty string slugs
if self.slug == u"":
self.slug = None
@@ -98,26 +102,34 @@ class MediaEntryMixin(object):
# so just return... we're not going to force one.
if not self.slug:
return # giving up!
-
+
# Otherwise, let's see if this is unique.
- if check_media_slug_used(self.uploader, self.slug, self.id):
+ if self.check_slug_used(self.slug):
# It looks like it's being used... lame.
-
+
# Can we just append the object's id to the end?
if self.id:
slug_with_id = u"%s-%s" % (self.slug, self.id)
- if not check_media_slug_used(self.uploader,
- slug_with_id, self.id):
+ if not self.check_slug_used(slug_with_id):
self.slug = slug_with_id
return # success!
-
+
# okay, still no success;
# let's whack junk on there till it's unique.
self.slug += '-' + uuid.uuid4().hex[:4]
# keep going if necessary!
- while check_media_slug_used(self.uploader, self.slug, self.id):
+ while self.check_slug_used(self.slug):
self.slug += uuid.uuid4().hex[:4]
+
+class MediaEntryMixin(GenerateSlugMixin):
+ def check_slug_used(self, slug):
+ # import this here due to a cyclic import issue
+ # (db.models -> db.mixin -> db.util -> db.models)
+ from mediagoblin.db.util import check_media_slug_used
+
+ return check_media_slug_used(self.uploader, slug, self.id)
+
@property
def description_html(self):
"""
@@ -238,22 +250,13 @@ class MediaCommentMixin(object):
return cleaned_markdown_conversion(self.content)
-class CollectionMixin(object):
- def generate_slug(self):
+class CollectionMixin(GenerateSlugMixin):
+ def check_slug_used(self, slug):
# import this here due to a cyclic import issue
# (db.models -> db.mixin -> db.util -> db.models)
from mediagoblin.db.util import check_collection_slug_used
- self.slug = slugify(self.title)
-
- duplicate = check_collection_slug_used(mg_globals.database,
- self.creator, self.slug, self.id)
-
- if duplicate:
- if self.id is not None:
- self.slug = u"%s-%s" % (self.id, self.slug)
- else:
- self.slug = None
+ return check_collection_slug_used(self.creator, slug, self.id)
@property
def description_html(self):
diff --git a/mediagoblin/db/models.py b/mediagoblin/db/models.py
index 2f58503f..fcfd0f61 100644
--- a/mediagoblin/db/models.py
+++ b/mediagoblin/db/models.py
@@ -172,8 +172,7 @@ class MediaEntry(Base, MediaEntryMixin):
order_col = MediaComment.created
if not ascending:
order_col = desc(order_col)
- return MediaComment.query.filter_by(
- media_entry=self.id).order_by(order_col)
+ return self.all_comments.order_by(order_col)
def url_to_prev(self, urlgen):
"""get the next 'newer' entry by this user"""
@@ -238,9 +237,7 @@ class MediaEntry(Base, MediaEntryMixin):
:param del_orphan_tags: True/false if we delete unused Tags too
:param commit: True/False if this should end the db transaction"""
# User's CollectionItems are automatically deleted via "cascade".
- # Delete all the associated comments
- for comment in self.get_comments():
- comment.delete(commit=False)
+ # Comments on this Media are deleted by cascade, hopefully.
# Delete all related files/attachments
try:
@@ -385,13 +382,22 @@ class MediaComment(Base, MediaCommentMixin):
content = Column(UnicodeText, nullable=False)
# Cascade: Comments are owned by their creator. So do the full thing.
- # lazy=dynamic: People might post a *lot* of comments, so make
- # the "posted_comments" a query-like thing.
+ # lazy=dynamic: People might post a *lot* of comments,
+ # so make the "posted_comments" a query-like thing.
get_author = relationship(User,
backref=backref("posted_comments",
lazy="dynamic",
cascade="all, delete-orphan"))
+ # Cascade: Comments are somewhat owned by their MediaEntry.
+ # So do the full thing.
+ # lazy=dynamic: MediaEntries might have many comments,
+ # so make the "all_comments" a query-like thing.
+ get_media_entry = relationship(MediaEntry,
+ backref=backref("all_comments",
+ lazy="dynamic",
+ cascade="all, delete-orphan"))
+
class Collection(Base, CollectionMixin):
"""An 'album' or 'set' of media by a user.
diff --git a/mediagoblin/db/util.py b/mediagoblin/db/util.py
index 529ef8b9..6ffec44d 100644
--- a/mediagoblin/db/util.py
+++ b/mediagoblin/db/util.py
@@ -59,7 +59,7 @@ def clean_orphan_tags(commit=True):
Session.commit()
-def check_collection_slug_used(dummy_db, creator_id, slug, ignore_c_id):
+def check_collection_slug_used(creator_id, slug, ignore_c_id):
filt = (Collection.creator == creator_id) \
& (Collection.slug == slug)
if ignore_c_id is not None:
diff --git a/mediagoblin/edit/views.py b/mediagoblin/edit/views.py
index cdb5c713..34b7aaca 100644
--- a/mediagoblin/edit/views.py
+++ b/mediagoblin/edit/views.py
@@ -26,7 +26,7 @@ from mediagoblin.auth import lib as auth_lib
from mediagoblin.edit import forms
from mediagoblin.edit.lib import may_edit_media
from mediagoblin.decorators import (require_active_login, active_user_from_url,
- get_media_entry_by_id,
+ get_media_entry_by_id,
user_may_alter_collection, get_user_collection)
from mediagoblin.tools.response import render_to_response, redirect
from mediagoblin.tools.translate import pass_to_ugettext as _
@@ -58,19 +58,19 @@ def edit_media(request, media):
if request.method == 'POST' and form.validate():
# Make sure there isn't already a MediaEntry with such a slug
# and userid.
- slug = slugify(request.form['slug'])
+ slug = slugify(form.slug.data)
slug_used = check_media_slug_used(media.uploader, slug, media.id)
if slug_used:
form.slug.errors.append(
_(u'An entry with that slug already exists for this user.'))
else:
- media.title = request.form['title']
- media.description = request.form.get('description')
+ media.title = form.title.data
+ media.description = form.description.data
media.tags = convert_to_tag_list_of_dicts(
- request.form.get('tags'))
+ form.tags.data)
- media.license = unicode(request.form.get('license', '')) or None
+ media.license = unicode(form.license.data) or None
media.slug = slug
media.save()
@@ -142,7 +142,7 @@ def edit_attachments(request, media):
request.files['attachment_file'].stream.close()
media.attachment_files.append(dict(
- name=request.form['attachment_name'] \
+ name=form.attachment_name.data \
or request.files['attachment_file'].filename,
filepath=attachment_public_filepath,
created=datetime.utcnow(),
@@ -153,7 +153,7 @@ def edit_attachments(request, media):
messages.add_message(
request, messages.SUCCESS,
_("You added the attachment %s!") \
- % (request.form['attachment_name']
+ % (form.attachment_name.data
or request.files['attachment_file'].filename))
return redirect(request,
@@ -194,8 +194,8 @@ def edit_profile(request, url_user=None):
bio=user.bio)
if request.method == 'POST' and form.validate():
- user.url = unicode(request.form['url'])
- user.bio = unicode(request.form['bio'])
+ user.url = unicode(form.url.data)
+ user.bio = unicode(form.bio.data)
user.save()
@@ -308,26 +308,26 @@ def edit_collection(request, collection):
if request.method == 'POST' and form.validate():
# Make sure there isn't already a Collection with such a slug
# and userid.
- slug_used = check_collection_slug_used(request.db, collection.creator,
- request.form['slug'], collection.id)
+ slug_used = check_collection_slug_used(collection.creator,
+ form.slug.data, collection.id)
# Make sure there isn't already a Collection with this title
existing_collection = request.db.Collection.find_one({
'creator': request.user.id,
- 'title':request.form['title']})
+ 'title':form.title.data})
if existing_collection and existing_collection.id != collection.id:
messages.add_message(
request, messages.ERROR,
_('You already have a collection called "%s"!') % \
- request.form['title'])
+ form.title.data)
elif slug_used:
form.slug.errors.append(
_(u'A collection with that slug already exists for this user.'))
else:
- collection.title = unicode(request.form['title'])
- collection.description = unicode(request.form.get('description'))
- collection.slug = unicode(request.form['slug'])
+ collection.title = unicode(form.title.data)
+ collection.description = unicode(form.description.data)
+ collection.slug = unicode(form.slug.data)
collection.save()
diff --git a/mediagoblin/media_types/ascii/processing.py b/mediagoblin/media_types/ascii/processing.py
index 382cd015..309aab0a 100644
--- a/mediagoblin/media_types/ascii/processing.py
+++ b/mediagoblin/media_types/ascii/processing.py
@@ -127,8 +127,14 @@ def process_ascii(proc_state):
'ascii',
'xmlcharrefreplace'))
- mgg.queue_store.delete_file(queued_filepath)
+ # Remove queued media file from storage and database.
+ # queued_filepath is in the task_id directory which should
+ # be removed too, but fail if the directory is not empty to be on
+ # the super-safe side.
+ mgg.queue_store.delete_file(queued_filepath) # rm file
+ mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir
entry.queued_media_file = []
+
media_files_dict = entry.setdefault('media_files', {})
media_files_dict['thumb'] = thumb_filepath
media_files_dict['unicode'] = unicode_filepath
diff --git a/mediagoblin/media_types/audio/processing.py b/mediagoblin/media_types/audio/processing.py
index 5dffcaf9..101b83e5 100644
--- a/mediagoblin/media_types/audio/processing.py
+++ b/mediagoblin/media_types/audio/processing.py
@@ -147,4 +147,10 @@ def process_audio(proc_state):
else:
entry.media_files['thumb'] = ['fake', 'thumb', 'path.jpg']
- mgg.queue_store.delete_file(queued_filepath)
+ # Remove queued media file from storage and database.
+ # queued_filepath is in the task_id directory which should
+ # be removed too, but fail if the directory is not empty to be on
+ # the super-safe side.
+ mgg.queue_store.delete_file(queued_filepath) # rm file
+ mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir
+ entry.queued_media_file = []
diff --git a/mediagoblin/media_types/image/processing.py b/mediagoblin/media_types/image/processing.py
index ca88d3f4..e951ef29 100644
--- a/mediagoblin/media_types/image/processing.py
+++ b/mediagoblin/media_types/image/processing.py
@@ -122,6 +122,7 @@ def process_image(proc_state):
exif_tags, conversions_subdir,
(mgg.global_config['media:thumb']['max_width'],
mgg.global_config['media:thumb']['max_height']))
+ entry.media_files[u'thumb'] = thumb_filepath
# If the size of the original file exceeds the specified size of a `medium`
# file, a `.medium.jpg` files is created and later associated with the media
@@ -137,8 +138,7 @@ def process_image(proc_state):
exif_tags, conversions_subdir,
(mgg.global_config['media:medium']['max_width'],
mgg.global_config['media:medium']['max_height']))
- else:
- medium_filepath = None
+ entry.media_files[u'medium'] = medium_filepath
# Copy our queued local workbench to its final destination
proc_state.copy_original(name_builder.fill('{basename}{ext}'))
@@ -146,12 +146,6 @@ def process_image(proc_state):
# Remove queued media file from storage and database
proc_state.delete_queue_file()
- # Insert media file information into database
- media_files_dict = entry.setdefault('media_files', {})
- media_files_dict[u'thumb'] = thumb_filepath
- if medium_filepath:
- media_files_dict[u'medium'] = medium_filepath
-
# Insert exif data into database
exif_all = clean_exif(exif_tags)
diff --git a/mediagoblin/media_types/stl/model_loader.py b/mediagoblin/media_types/stl/model_loader.py
index 60fa4851..88f19314 100644
--- a/mediagoblin/media_types/stl/model_loader.py
+++ b/mediagoblin/media_types/stl/model_loader.py
@@ -80,6 +80,7 @@ class ObjModel(ThreeDee):
def load(self, fileob):
for line in fileob:
+ line = line.strip()
if line[0] == "v":
self.verts.append(self.__vector(line))
@@ -121,6 +122,8 @@ def auto_detect(fileob, hint):
pass
except ValueError:
pass
+ except IndexError:
+ pass
try:
# It is pretty important that the binary stl model loader
# is tried second, because its possible for it to parse
diff --git a/mediagoblin/media_types/stl/processing.py b/mediagoblin/media_types/stl/processing.py
index 77744ac5..e41df395 100644
--- a/mediagoblin/media_types/stl/processing.py
+++ b/mediagoblin/media_types/stl/processing.py
@@ -165,8 +165,12 @@ def process_stl(proc_state):
with open(queued_filename, 'rb') as queued_file:
model_file.write(queued_file.read())
- # Remove queued media file from storage and database
- mgg.queue_store.delete_file(queued_filepath)
+ # Remove queued media file from storage and database.
+ # queued_filepath is in the task_id directory which should
+ # be removed too, but fail if the directory is not empty to be on
+ # the super-safe side.
+ mgg.queue_store.delete_file(queued_filepath) # rm file
+ mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir
entry.queued_media_file = []
# Insert media file information into database
diff --git a/mediagoblin/media_types/video/transcoders.py b/mediagoblin/media_types/video/transcoders.py
index d8290d41..58b2c0d4 100644
--- a/mediagoblin/media_types/video/transcoders.py
+++ b/mediagoblin/media_types/video/transcoders.py
@@ -53,283 +53,6 @@ def pixbuf_to_pilbuf(buf):
return data
-class VideoThumbnailer:
- # Declaration of thumbnailer states
- STATE_NULL = 0
- STATE_HALTING = 1
- STATE_PROCESSING = 2
-
- # The current thumbnailer state
-
- def __init__(self, source_path, dest_path):
- '''
- Set up playbin pipeline in order to get video properties.
-
- Initializes and runs the gobject.MainLoop()
-
- Abstract
- - Set up a playbin with a fake audio sink and video sink. Load the video
- into the playbin
- - Initialize
- '''
- # This will contain the thumbnailing pipeline
- self.state = self.STATE_NULL
- self.thumbnail_pipeline = None
- self.buffer_probes = {}
- self.errors = []
-
- self.source_path = source_path
- self.dest_path = dest_path
-
- self.loop = gobject.MainLoop()
-
- # Set up the playbin. It will be used to discover certain
- # properties of the input file
- self.playbin = gst.element_factory_make('playbin')
-
- self.videosink = gst.element_factory_make('fakesink', 'videosink')
- self.playbin.set_property('video-sink', self.videosink)
-
- self.audiosink = gst.element_factory_make('fakesink', 'audiosink')
- self.playbin.set_property('audio-sink', self.audiosink)
-
- self.bus = self.playbin.get_bus()
- self.bus.add_signal_watch()
- self.watch_id = self.bus.connect('message', self._on_bus_message)
-
- self.playbin.set_property('uri', 'file:{0}'.format(
- urllib.pathname2url(self.source_path)))
-
- self.playbin.set_state(gst.STATE_PAUSED)
-
- self.run()
-
- def run(self):
- self.loop.run()
-
- def _on_bus_message(self, bus, message):
- _log.debug(' thumbnail playbin: {0}'.format(message))
-
- if message.type == gst.MESSAGE_ERROR:
- _log.error('thumbnail playbin: {0}'.format(message))
- gobject.idle_add(self._on_bus_error)
-
- elif message.type == gst.MESSAGE_STATE_CHANGED:
- # The pipeline state has changed
- # Parse state changing data
- _prev, state, _pending = message.parse_state_changed()
-
- _log.debug('State changed: {0}'.format(state))
-
- if state == gst.STATE_PAUSED:
- if message.src == self.playbin:
- gobject.idle_add(self._on_bus_paused)
-
- def _on_bus_paused(self):
- '''
- Set up thumbnailing pipeline
- '''
- current_video = self.playbin.get_property('current-video')
-
- if current_video == 0:
- _log.debug('Found current video from playbin')
- else:
- _log.error('Could not get any current video from playbin!')
-
- self.duration = self._get_duration(self.playbin)
- _log.info('Video length: {0}'.format(self.duration / gst.SECOND))
-
- _log.info('Setting up thumbnailing pipeline')
- self.thumbnail_pipeline = gst.parse_launch(
- 'filesrc location="{0}" ! decodebin ! '
- 'ffmpegcolorspace ! videoscale ! '
- 'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1,width=180 ! '
- 'fakesink signal-handoffs=True'.format(self.source_path))
-
- self.thumbnail_bus = self.thumbnail_pipeline.get_bus()
- self.thumbnail_bus.add_signal_watch()
- self.thumbnail_watch_id = self.thumbnail_bus.connect(
- 'message', self._on_thumbnail_bus_message)
-
- self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
-
- #gobject.timeout_add(3000, self._on_timeout)
-
- return False
-
- def _on_thumbnail_bus_message(self, bus, message):
- _log.debug('thumbnail: {0}'.format(message))
-
- if message.type == gst.MESSAGE_ERROR:
- _log.error(message)
- gobject.idle_add(self._on_bus_error)
-
- if message.type == gst.MESSAGE_STATE_CHANGED:
- _log.debug('State changed')
- _prev, state, _pending = message.parse_state_changed()
-
- if (state == gst.STATE_PAUSED and
- not self.state == self.STATE_PROCESSING and
- message.src == self.thumbnail_pipeline):
- _log.info('Pipeline paused, processing')
- self.state = self.STATE_PROCESSING
-
- for sink in self.thumbnail_pipeline.sinks():
- name = sink.get_name()
- factoryname = sink.get_factory().get_name()
-
- if factoryname == 'fakesink':
- sinkpad = sink.get_pad('sink')
-
- self.buffer_probes[name] = sinkpad.add_buffer_probe(
- self.buffer_probe_handler, name)
-
- _log.info('Added buffer probe')
-
- break
-
- # Apply the wadsworth constant, fallback to 1 second
- # TODO: Will break if video is shorter than 1 sec
- seek_amount = max(self.duration / 100 * 30, 1 * gst.SECOND)
-
- _log.debug('seek amount: {0}'.format(seek_amount))
-
- seek_result = self.thumbnail_pipeline.seek(
- 1.0,
- gst.FORMAT_TIME,
- gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
- gst.SEEK_TYPE_SET,
- seek_amount,
- gst.SEEK_TYPE_NONE,
- 0)
-
- if not seek_result:
- self.errors.append('COULD_NOT_SEEK')
- _log.error('Couldn\'t seek! result: {0}'.format(
- seek_result))
- _log.info(message)
- self.shutdown()
- else:
- _log.debug('Seek successful')
- self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
- else:
- _log.debug('Won\'t seek: \t{0}\n\t{1}'.format(
- self.state,
- message.src))
-
- def buffer_probe_handler_real(self, pad, buff, name):
- '''
- Capture buffers as gdk_pixbufs when told to.
- '''
- _log.info('Capturing frame')
- try:
- caps = buff.caps
- if caps is None:
- _log.error('No caps passed to buffer probe handler!')
- self.shutdown()
- return False
-
- _log.debug('caps: {0}'.format(caps))
-
- filters = caps[0]
- width = filters["width"]
- height = filters["height"]
-
- im = Image.new('RGB', (width, height))
-
- data = pixbuf_to_pilbuf(buff.data)
-
- im.putdata(data)
-
- im.save(self.dest_path)
-
- _log.info('Saved thumbnail')
-
- self.shutdown()
-
- except gst.QueryError as e:
- _log.error('QueryError: {0}'.format(e))
-
- return False
-
- def buffer_probe_handler(self, pad, buff, name):
- '''
- Proxy function for buffer_probe_handler_real
- '''
- _log.debug('Attaching real buffer handler to gobject idle event')
- gobject.idle_add(
- lambda: self.buffer_probe_handler_real(pad, buff, name))
-
- return True
-
- def _get_duration(self, pipeline, retries=0):
- '''
- Get the duration of a pipeline.
-
- Retries 5 times.
- '''
- if retries == 5:
- return 0
-
- try:
- return pipeline.query_duration(gst.FORMAT_TIME)[0]
- except gst.QueryError:
- return self._get_duration(pipeline, retries + 1)
-
- def _on_timeout(self):
- _log.error('Timeout in thumbnailer!')
- self.shutdown()
-
- def _on_bus_error(self, *args):
- _log.error('AHAHAHA! Error! args: {0}'.format(args))
-
- def shutdown(self):
- '''
- Tell gobject to call __halt when the mainloop is idle.
- '''
- _log.info('Shutting down')
- self.__halt()
-
- def __halt(self):
- '''
- Halt all pipelines and shut down the main loop
- '''
- _log.info('Halting...')
- self.state = self.STATE_HALTING
-
- self.__disconnect()
-
- gobject.idle_add(self.__halt_final)
-
- def __disconnect(self):
- _log.debug('Disconnecting...')
- if not self.playbin is None:
- self.playbin.set_state(gst.STATE_NULL)
- for sink in self.playbin.sinks():
- name = sink.get_name()
- factoryname = sink.get_factory().get_name()
-
- _log.debug('Disconnecting {0}'.format(name))
-
- if factoryname == "fakesink":
- pad = sink.get_pad("sink")
- pad.remove_buffer_probe(self.buffer_probes[name])
- del self.buffer_probes[name]
-
- self.playbin = None
-
- if self.bus is not None:
- self.bus.disconnect(self.watch_id)
- self.bus = None
-
- def __halt_final(self):
- _log.info('Done')
- if self.errors:
- _log.error(','.join(self.errors))
-
- self.loop.quit()
-
-
class VideoThumbnailerMarkII(object):
'''
Creates a thumbnail from a video file. Rewrite of VideoThumbnailer.
@@ -398,8 +121,8 @@ class VideoThumbnailerMarkII(object):
self.run()
except Exception as exc:
_log.critical(
- 'Exception "{0}" caught, disconnecting and re-raising'\
- .format(exc))
+ 'Exception "{0}" caught, shutting down mainloop and re-raising'\
+ .format(exc))
self.disconnect()
raise
@@ -410,7 +133,8 @@ class VideoThumbnailerMarkII(object):
self.mainloop.run()
def on_playbin_message(self, message_bus, message):
- _log.debug('playbin message: {0}'.format(message))
+ # Silenced to prevent clobbering of output
+ #_log.debug('playbin message: {0}'.format(message))
if message.type == gst.MESSAGE_ERROR:
_log.error('playbin error: {0}'.format(message))
@@ -433,17 +157,20 @@ pending: {2}'.format(
def on_playbin_paused(self):
if self.has_reached_playbin_pause:
- _log.warn('Has already reached logic for playbin pause. Aborting \
+ _log.warn('Has already reached on_playbin_paused. Aborting \
without doing anything this time.')
return False
self.has_reached_playbin_pause = True
+ # XXX: Why is this even needed at this point?
current_video = self.playbin.get_property('current-video')
if not current_video:
- _log.critical('thumbnail could not get any video data \
+ _log.critical('Could not get any video data \
from playbin')
+ else:
+ _log.info('Got video data from playbin')
self.duration = self.get_duration(self.playbin)
self.permission_to_take_picture = True
@@ -474,7 +201,8 @@ from playbin')
return False
def on_thumbnail_message(self, message_bus, message):
- _log.debug('thumbnail message: {0}'.format(message))
+ # This is silenced to prevent clobbering of the terminal window
+ #_log.debug('thumbnail message: {0}'.format(message))
if message.type == gst.MESSAGE_ERROR:
_log.error('thumbnail error: {0}'.format(message.parse_error()))
@@ -490,29 +218,10 @@ pending: {2}'.format(
cur_state,
pending_state))
- if cur_state == gst.STATE_PAUSED and\
- not self.state == self.STATE_PROCESSING_THUMBNAIL:
- self.state = self.STATE_PROCESSING_THUMBNAIL
-
+ if cur_state == gst.STATE_PAUSED and \
+ not self.state == self.STATE_PROCESSING_THUMBNAIL:
# Find the fakesink sink pad and attach the on_buffer_probe
# handler to it.
- for sink in self.thumbnail_pipeline.sinks():
- sink_name = sink.get_name()
- sink_factory_name = sink.get_factory().get_name()
-
- if sink_factory_name == 'fakesink':
- sink_pad = sink.get_pad('sink')
-
- self.buffer_probes[sink_name] = sink_pad\
- .add_buffer_probe(
- self.on_pad_buffer_probe,
- sink_name)
-
- _log.info('Attached buffer probes: {0}'.format(
- self.buffer_probes))
-
- break
-
seek_amount = self.position_callback(self.duration, gst)
seek_result = self.thumbnail_pipeline.seek(
@@ -525,10 +234,30 @@ pending: {2}'.format(
0)
if not seek_result:
- _log.critical('Could not seek.')
+ _log.info('Could not seek.')
+ else:
+ _log.info('Seek successful, attaching buffer probe')
+ self.state = self.STATE_PROCESSING_THUMBNAIL
+ for sink in self.thumbnail_pipeline.sinks():
+ sink_name = sink.get_name()
+ sink_factory_name = sink.get_factory().get_name()
+
+ if sink_factory_name == 'fakesink':
+ sink_pad = sink.get_pad('sink')
+
+ self.buffer_probes[sink_name] = sink_pad\
+ .add_buffer_probe(
+ self.on_pad_buffer_probe,
+ sink_name)
+
+ _log.info('Attached buffer probes: {0}'.format(
+ self.buffer_probes))
+
+ break
+
elif self.state == self.STATE_PROCESSING_THUMBNAIL:
- _log.debug('Already processing thumbnail')
+ _log.info('Already processing thumbnail')
def on_pad_buffer_probe(self, *args):
_log.debug('buffer probe handler: {0}'.format(args))
@@ -649,7 +378,7 @@ pending: {2}'.format(
return self.get_duration(pipeline, attempt + 1)
-class VideoTranscoder:
+class VideoTranscoder(object):
'''
Video transcoder
@@ -1011,6 +740,10 @@ if __name__ == '__main__':
action='store_true',
help='Dear program, please be quiet unless *error*')
+ parser.add_option('-w', '--width',
+ type=int,
+ default=180)
+
(options, args) = parser.parse_args()
if options.verbose:
@@ -1030,6 +763,7 @@ if __name__ == '__main__':
transcoder = VideoTranscoder()
if options.action == 'thumbnail':
+ args.append(options.width)
VideoThumbnailerMarkII(*args)
elif options.action == 'video':
def cb(data):
diff --git a/mediagoblin/plugins/api/views.py b/mediagoblin/plugins/api/views.py
index 2055a663..fde76fe4 100644
--- a/mediagoblin/plugins/api/views.py
+++ b/mediagoblin/plugins/api/views.py
@@ -18,7 +18,6 @@ import json
import logging
from os.path import splitext
-from werkzeug.datastructures import FileStorage
from werkzeug.exceptions import BadRequest, Forbidden
from werkzeug.wrappers import Response
@@ -27,7 +26,8 @@ from mediagoblin.meddleware.csrf import csrf_exempt
from mediagoblin.media_types import sniff_media
from mediagoblin.plugins.api.tools import api_auth, get_entry_serializable, \
json_response
-from mediagoblin.submit.lib import prepare_queue_task, run_process_media
+from mediagoblin.submit.lib import check_file_field, prepare_queue_task, \
+ run_process_media
_log = logging.getLogger(__name__)
@@ -45,9 +45,7 @@ def post_entry(request):
_log.debug('Must POST against post_entry')
raise BadRequest()
- if not 'file' in request.files \
- or not isinstance(request.files['file'], FileStorage) \
- or not request.files['file'].stream:
+ if not check_file_field(request, 'file'):
_log.debug('File field not found')
raise BadRequest()
diff --git a/mediagoblin/plugins/oauth/views.py b/mediagoblin/plugins/oauth/views.py
index c7b2a332..ea45c209 100644
--- a/mediagoblin/plugins/oauth/views.py
+++ b/mediagoblin/plugins/oauth/views.py
@@ -45,11 +45,11 @@ def register_client(request):
if request.method == 'POST' and form.validate():
client = OAuthClient()
- client.name = unicode(request.form['name'])
- client.description = unicode(request.form['description'])
- client.type = unicode(request.form['type'])
+ client.name = unicode(form.name.data)
+ client.description = unicode(form.description.data)
+ client.type = unicode(form.type.data)
client.owner_id = request.user.id
- client.redirect_uri = unicode(request.form['redirect_uri'])
+ client.redirect_uri = unicode(form.redirect_uri.data)
client.generate_identifier()
client.generate_secret()
diff --git a/mediagoblin/tests/test_tests.py b/mediagoblin/plugins/piwigo/forms.py
index d539f1e0..5bb12e62 100644
--- a/mediagoblin/tests/test_tests.py
+++ b/mediagoblin/plugins/piwigo/forms.py
@@ -1,5 +1,5 @@
# GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@@ -14,23 +14,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from mediagoblin import mg_globals
-from mediagoblin.tests.tools import get_app, fixture_add_user
-from mediagoblin.db.models import User
+import wtforms
-def test_get_app_wipes_db():
- """
- Make sure we get a fresh database on every wipe :)
- """
- get_app(dump_old_app=True)
- assert User.query.count() == 0
- fixture_add_user()
- assert User.query.count() == 1
-
- get_app(dump_old_app=False)
- assert User.query.count() == 1
-
- get_app(dump_old_app=True)
- assert User.query.count() == 0
+class AddSimpleForm(wtforms.Form):
+ image = wtforms.FileField()
+ name = wtforms.TextField(
+ validators=[wtforms.validators.Length(min=0, max=500)])
+ comment = wtforms.TextField()
+ # tags = wtforms.FieldList(wtforms.TextField())
+ category = wtforms.IntegerField()
+ level = wtforms.IntegerField()
diff --git a/mediagoblin/plugins/piwigo/views.py b/mediagoblin/plugins/piwigo/views.py
index e9ce6206..3dee09cd 100644
--- a/mediagoblin/plugins/piwigo/views.py
+++ b/mediagoblin/plugins/piwigo/views.py
@@ -24,6 +24,7 @@ from mediagoblin import mg_globals
from mediagoblin.meddleware.csrf import csrf_exempt
from mediagoblin.tools.response import render_404
from .tools import CmdTable, PwgNamedArray, response_xml
+from .forms import AddSimpleForm
_log = logging.getLogger(__name__)
@@ -45,7 +46,7 @@ def pwg_logout(request):
@CmdTable("pwg.getVersion")
def pwg_getversion(request):
- return "piwigo 2.5.0 (MediaGoblin)"
+ return "2.5.0 (MediaGoblin)"
@CmdTable("pwg.session.getStatus")
@@ -80,6 +81,20 @@ def pwg_images_exist(request):
return {}
+@CmdTable("pwg.images.addSimple", True)
+def pwg_images_addSimple(request):
+ form = AddSimpleForm(request.form)
+ if not form.validate():
+ _log.error("addSimple: form failed")
+ raise BadRequest()
+ dump = []
+ for f in form:
+ dump.append("%s=%r" % (f.name, f.data))
+ _log.info("addimple: %r %s %r", request.form, " ".join(dump), request.files)
+
+ return {'image_id': 123456, 'url': ''}
+
+
md5sum_matcher = re.compile(r"^[0-9a-fA-F]{32}$")
def fetch_md5(request, parm_name, optional_parm=False):
diff --git a/mediagoblin/plugins/raven/README.rst b/mediagoblin/plugins/raven/README.rst
index de5fd20d..4006060d 100644
--- a/mediagoblin/plugins/raven/README.rst
+++ b/mediagoblin/plugins/raven/README.rst
@@ -4,6 +4,8 @@
.. _raven-setup:
+Warning: this plugin is somewhat experimental.
+
Set up the raven plugin
=======================
diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py
index f9445e28..a1fd3fb7 100644
--- a/mediagoblin/processing/__init__.py
+++ b/mediagoblin/processing/__init__.py
@@ -97,14 +97,27 @@ class ProcessingState(object):
return queued_filename
def copy_original(self, target_name, keyname=u"original"):
+ self.store_public(keyname, self.get_queued_filename(), target_name)
+
+ def store_public(self, keyname, local_file, target_name=None):
+ if target_name is None:
+ target_name = os.path.basename(local_file)
target_filepath = create_pub_filepath(self.entry, target_name)
- mgg.public_store.copy_local_to_storage(self.get_queued_filename(),
- target_filepath)
+ if keyname in self.entry.media_files:
+ _log.warn("store_public: keyname %r already used for file %r, "
+ "replacing with %r", keyname,
+ self.entry.media_files[keyname], target_filepath)
+ mgg.public_store.copy_local_to_storage(local_file, target_filepath)
self.entry.media_files[keyname] = target_filepath
def delete_queue_file(self):
+ # Remove queued media file from storage and database.
+ # queued_filepath is in the task_id directory which should
+ # be removed too, but fail if the directory is not empty to be on
+ # the super-safe side.
queued_filepath = self.entry.queued_media_file
- mgg.queue_store.delete_file(queued_filepath)
+ mgg.queue_store.delete_file(queued_filepath) # rm file
+ mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir
self.entry.queued_media_file = []
diff --git a/mediagoblin/storage/__init__.py b/mediagoblin/storage/__init__.py
index 5c1d7d36..bbe134a7 100644
--- a/mediagoblin/storage/__init__.py
+++ b/mediagoblin/storage/__init__.py
@@ -101,10 +101,20 @@ class StorageInterface(object):
def delete_file(self, filepath):
"""
- Delete or dereference the file at filepath.
+ Delete or dereference the file (not directory) at filepath.
+ """
+ # Subclasses should override this method.
+ self.__raise_not_implemented()
+
+ def delete_dir(self, dirpath, recursive=False):
+ """Delete the directory at dirpath
+
+ :param recursive: Usually, a directory must not contain any
+ files for the delete to succeed. If True, containing files
+ and subdirectories within dirpath will be recursively
+ deleted.
- This might need to delete directories, buckets, whatever, for
- cleanliness. (Be sure to avoid race conditions on that though)
+ :returns: True in case of success, False otherwise.
"""
# Subclasses should override this method.
self.__raise_not_implemented()
diff --git a/mediagoblin/storage/filestorage.py b/mediagoblin/storage/filestorage.py
index ef786b61..3d6e0753 100644
--- a/mediagoblin/storage/filestorage.py
+++ b/mediagoblin/storage/filestorage.py
@@ -62,10 +62,32 @@ class BasicFileStorage(StorageInterface):
return open(self._resolve_filepath(filepath), mode)
def delete_file(self, filepath):
- # TODO: Also delete unused directories if empty (safely, with
- # checks to avoid race conditions).
+ """Delete file at filepath
+
+ Raises OSError in case filepath is a directory."""
+ #TODO: log error
os.remove(self._resolve_filepath(filepath))
+ def delete_dir(self, dirpath, recursive=False):
+ """returns True on succes, False on failure"""
+
+ dirpath = self._resolve_filepath(dirpath)
+
+ # Shortcut the default and simple case of nonempty=F, recursive=F
+ if recursive:
+ try:
+ shutil.rmtree(dirpath)
+ except OSError as e:
+ #TODO: log something here
+ return False
+ else: # recursively delete everything
+ try:
+ os.rmdir(dirpath)
+ except OSError as e:
+ #TODO: log something here
+ return False
+ return True
+
def file_url(self, filepath):
if not self.base_url:
raise NoWebServing(
diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py
index 679fc543..a5483471 100644
--- a/mediagoblin/submit/lib.py
+++ b/mediagoblin/submit/lib.py
@@ -17,6 +17,7 @@
import logging
import uuid
from werkzeug.utils import secure_filename
+from werkzeug.datastructures import FileStorage
from mediagoblin.processing import mark_entry_failed
from mediagoblin.processing.task import process_media
@@ -25,6 +26,16 @@ from mediagoblin.processing.task import process_media
_log = logging.getLogger(__name__)
+def check_file_field(request, field_name):
+ """Check if a file field meets minimal criteria"""
+ retval = (field_name in request.files
+ and isinstance(request.files[field_name], FileStorage)
+ and request.files[field_name].stream)
+ if not retval:
+ _log.debug("Form did not contain proper file field %s", field_name)
+ return retval
+
+
def prepare_queue_task(app, entry, filename):
"""
Prepare a MediaEntry for the processing queue and get a queue file
diff --git a/mediagoblin/submit/views.py b/mediagoblin/submit/views.py
index def7e839..9d31c844 100644
--- a/mediagoblin/submit/views.py
+++ b/mediagoblin/submit/views.py
@@ -22,7 +22,6 @@ import logging
_log = logging.getLogger(__name__)
-from werkzeug.datastructures import FileStorage
from mediagoblin.tools.text import convert_to_tag_list_of_dicts
from mediagoblin.tools.translate import pass_to_ugettext as _
@@ -32,7 +31,8 @@ from mediagoblin.submit import forms as submit_forms
from mediagoblin.messages import add_message, SUCCESS
from mediagoblin.media_types import sniff_media, \
InvalidFileType, FileTypeNotSupported
-from mediagoblin.submit.lib import run_process_media, prepare_queue_task
+from mediagoblin.submit.lib import check_file_field, prepare_queue_task, \
+ run_process_media
@require_active_login
@@ -44,9 +44,7 @@ def submit_start(request):
license=request.user.license_preference)
if request.method == 'POST' and submit_form.validate():
- if not ('file' in request.files
- and isinstance(request.files['file'], FileStorage)
- and request.files['file'].stream):
+ if not check_file_field(request, 'file'):
submit_form.file.errors.append(
_(u'You must provide a file.'))
else:
@@ -62,18 +60,18 @@ def submit_start(request):
entry = request.db.MediaEntry()
entry.media_type = unicode(media_type)
entry.title = (
- unicode(request.form['title'])
+ unicode(submit_form.title.data)
or unicode(splitext(filename)[0]))
- entry.description = unicode(request.form.get('description'))
+ entry.description = unicode(submit_form.description.data)
- entry.license = unicode(request.form.get('license', "")) or None
+ entry.license = unicode(submit_form.license.data) or None
entry.uploader = request.user.id
# Process the user's folksonomy "tags"
entry.tags = convert_to_tag_list_of_dicts(
- request.form.get('tags'))
+ submit_form.tags.data)
# Generate a slug from the title
entry.generate_slug()
@@ -127,8 +125,8 @@ def add_collection(request, media=None):
try:
collection = request.db.Collection()
- collection.title = unicode(request.form['title'])
- collection.description = unicode(request.form.get('description'))
+ collection.title = unicode(submit_form.title.data)
+ collection.description = unicode(submit_form.description.data)
collection.creator = request.user.id
collection.generate_slug()
diff --git a/mediagoblin/tests/conftest.py b/mediagoblin/tests/conftest.py
new file mode 100644
index 00000000..25f495d6
--- /dev/null
+++ b/mediagoblin/tests/conftest.py
@@ -0,0 +1,15 @@
+from mediagoblin.tests import tools
+
+import pytest
+
+@pytest.fixture()
+def test_app(request):
+ """
+ py.test fixture to pass sandboxed mediagoblin applications into tests that
+ want them.
+
+ You could make a local version of this method for your own tests
+ to override the paste and config files being used by passing them
+ in differently to get_app.
+ """
+ return tools.get_app(request)
diff --git a/mediagoblin/tests/pytest.ini b/mediagoblin/tests/pytest.ini
new file mode 100644
index 00000000..d4aa2d69
--- /dev/null
+++ b/mediagoblin/tests/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+usefixtures = tmpdir \ No newline at end of file
diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py
index 82b1c1b4..cff25776 100644
--- a/mediagoblin/tests/test_api.py
+++ b/mediagoblin/tests/test_api.py
@@ -20,9 +20,11 @@ import base64
from pkg_resources import resource_filename
+import pytest
+
from mediagoblin import mg_globals
from mediagoblin.tools import template, pluginapi
-from mediagoblin.tests.tools import get_app, fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
_log = logging.getLogger(__name__)
@@ -43,15 +45,14 @@ BIG_BLUE = resource('bigblue.png')
class TestAPI(object):
- def setUp(self):
- self.app = get_app(dump_old_app=False)
+ def setup(self):
self.db = mg_globals.database
self.user_password = u'4cc355_70k3N'
self.user = fixture_add_user(u'joapi', self.user_password)
- def login(self):
- self.app.post(
+ def login(self, test_app):
+ test_app.post(
'/auth/login/', {
'username': self.user.username,
'password': self.user_password})
@@ -65,14 +66,14 @@ class TestAPI(object):
self.user.username,
self.user_password])))}
- def do_post(self, data, **kwargs):
+ def do_post(self, data, test_app, **kwargs):
url = kwargs.pop('url', '/api/submit')
do_follow = kwargs.pop('do_follow', False)
if not 'headers' in kwargs.keys():
kwargs['headers'] = self.http_auth_headers()
- response = self.app.post(url, data, **kwargs)
+ response = test_app.post(url, data, **kwargs)
if do_follow:
response.follow()
@@ -82,21 +83,22 @@ class TestAPI(object):
def upload_data(self, filename):
return {'upload_files': [('file', filename)]}
- def test_1_test_test_view(self):
- self.login()
+ def test_1_test_test_view(self, test_app):
+ self.login(test_app)
- response = self.app.get(
+ response = test_app.get(
'/api/test',
headers=self.http_auth_headers())
assert response.body == \
'{"username": "joapi", "email": "joapi@example.com"}'
- def test_2_test_submission(self):
- self.login()
+ def test_2_test_submission(self, test_app):
+ self.login(test_app)
response = self.do_post(
{'title': 'Great JPG!'},
+ test_app,
**self.upload_data(GOOD_JPG))
assert response.status_int == 200
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py
index a59a319c..755727f9 100644
--- a/mediagoblin/tests/test_auth.py
+++ b/mediagoblin/tests/test_auth.py
@@ -17,12 +17,10 @@
import urlparse
import datetime
-from nose.tools import assert_equal
-
from mediagoblin import mg_globals
from mediagoblin.auth import lib as auth_lib
from mediagoblin.db.models import User
-from mediagoblin.tests.tools import setup_fresh_app, get_app, fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.tools import template, mail
@@ -65,7 +63,6 @@ def test_bcrypt_gen_password_hash():
'notthepassword', hashed_pw, '3><7R45417')
-@setup_fresh_app
def test_register_views(test_app):
"""
Massive test function that all our registration-related views all work.
@@ -102,8 +99,8 @@ def test_register_views(test_app):
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
form = context['register_form']
- assert_equal (form.username.errors, [u'Field must be between 3 and 30 characters long.'])
- assert_equal (form.password.errors, [u'Field must be between 5 and 1024 characters long.'])
+ assert form.username.errors == [u'Field must be between 3 and 30 characters long.']
+ assert form.password.errors == [u'Field must be between 5 and 1024 characters long.']
## bad form
template.clear_test_template_context()
@@ -114,11 +111,11 @@ def test_register_views(test_app):
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
form = context['register_form']
- assert_equal (form.username.errors, [u'This field does not take email addresses.'])
- assert_equal (form.email.errors, [u'This field requires an email address.'])
+ assert form.username.errors == [u'This field does not take email addresses.']
+ assert form.email.errors == [u'This field requires an email address.']
## At this point there should be no users in the database ;)
- assert_equal(User.query.count(), 0)
+ assert User.query.count() == 0
# Successful register
# -------------------
@@ -131,9 +128,7 @@ def test_register_views(test_app):
response.follow()
## Did we redirect to the proper page? Use the right template?
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/happygirl/')
+ assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/'
assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure user is in place
@@ -224,9 +219,7 @@ def test_register_views(test_app):
response.follow()
## Did we redirect to the proper page? Use the right template?
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/auth/login/')
+ assert urlparse.urlsplit(response.location)[2] == '/auth/login/'
assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure link to change password is sent by email
@@ -257,7 +250,7 @@ def test_register_views(test_app):
response = test_app.get(
"/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode(
new_user.id), status=404)
- assert_equal(response.status.split()[0], u'404') # status="404 NOT FOUND"
+ assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
## Try using an expired token to change password, shouldn't work
template.clear_test_template_context()
@@ -266,7 +259,7 @@ def test_register_views(test_app):
new_user.fp_token_expire = datetime.datetime.now()
new_user.save()
response = test_app.get("%s?%s" % (path, get_params), status=404)
- assert_equal(response.status.split()[0], u'404') # status="404 NOT FOUND"
+ assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
new_user.fp_token_expire = real_token_expiration
new_user.save()
@@ -294,17 +287,14 @@ def test_register_views(test_app):
# User should be redirected
response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/')
+ assert urlparse.urlsplit(response.location)[2] == '/'
assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
-def test_authentication_views():
+def test_authentication_views(test_app):
"""
Test logging in and logging out
"""
- test_app = get_app(dump_old_app=False)
# Make a new user
test_user = fixture_add_user(active_user=False)
@@ -372,9 +362,7 @@ def test_authentication_views():
# User should be redirected
response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/')
+ assert urlparse.urlsplit(response.location)[2] == '/'
assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
# Make sure user is in the session
@@ -389,9 +377,7 @@ def test_authentication_views():
# Should be redirected to index page
response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/')
+ assert urlparse.urlsplit(response.location)[2] == '/'
assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
# Make sure the user is not in the session
@@ -407,6 +393,4 @@ def test_authentication_views():
'username': u'chris',
'password': 'toast',
'next' : '/u/chris/'})
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
+ assert urlparse.urlsplit(response.location)[2] == '/u/chris/'
diff --git a/mediagoblin/tests/test_collections.py b/mediagoblin/tests/test_collections.py
index b19f6362..87782f30 100644
--- a/mediagoblin/tests/test_collections.py
+++ b/mediagoblin/tests/test_collections.py
@@ -14,17 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from mediagoblin.tests.tools import fixture_add_collection, fixture_add_user, \
- get_app
+from mediagoblin.tests.tools import fixture_add_collection, fixture_add_user
from mediagoblin.db.models import Collection, User
-from mediagoblin.db.base import Session
-from nose.tools import assert_equal
-def test_user_deletes_collection():
+def test_user_deletes_collection(test_app):
# Setup db.
- get_app(dump_old_app=False)
-
user = fixture_add_user()
coll = fixture_add_collection(user=user)
# Reload into session:
@@ -34,4 +29,4 @@ def test_user_deletes_collection():
user.delete()
cnt2 = Collection.query.count()
- assert_equal(cnt1, cnt2 + 1)
+ assert cnt1 == cnt2 + 1
diff --git a/mediagoblin/tests/test_csrf_middleware.py b/mediagoblin/tests/test_csrf_middleware.py
index e720264c..a272caf6 100644
--- a/mediagoblin/tests/test_csrf_middleware.py
+++ b/mediagoblin/tests/test_csrf_middleware.py
@@ -14,12 +14,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from mediagoblin.tests.tools import get_app
from mediagoblin import mg_globals
-def test_csrf_cookie_set():
- test_app = get_app(dump_old_app=False)
+def test_csrf_cookie_set(test_app):
cookie_name = mg_globals.app_config['csrf_cookie_name']
# get login page
@@ -33,11 +31,14 @@ def test_csrf_cookie_set():
assert response.headers.get('Vary', False) == 'Cookie'
-def test_csrf_token_must_match():
- # We need a fresh app for this test on webtest < 1.3.6.
- # We do not understand why, but it fixes the tests.
- # If we require webtest >= 1.3.6, we can switch to a non fresh app here.
- test_app = get_app(dump_old_app=True)
+# We need a fresh app for this test on webtest < 1.3.6.
+# We do not understand why, but it fixes the tests.
+# If we require webtest >= 1.3.6, we can switch to a non fresh app here.
+#
+# ... this comment might be irrelevant post-pytest-fixtures, but I'm not
+# removing it yet in case we move to module-level tests :)
+# -- cwebber
+def test_csrf_token_must_match(test_app):
# construct a request with no cookie or form token
assert test_app.post('/auth/login/',
@@ -67,8 +68,7 @@ def test_csrf_token_must_match():
extra_environ={'gmg.verify_csrf': True}).\
status_int == 200
-def test_csrf_exempt():
- test_app = get_app(dump_old_app=False)
+def test_csrf_exempt(test_app):
# monkey with the views to decorate a known endpoint
import mediagoblin.auth.views
from mediagoblin.meddleware.csrf import csrf_exempt
diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py
index 7db6eaea..cda2607f 100644
--- a/mediagoblin/tests/test_edit.py
+++ b/mediagoblin/tests/test_edit.py
@@ -14,35 +14,35 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from nose.tools import assert_equal
+import pytest
from mediagoblin import mg_globals
from mediagoblin.db.models import User
-from mediagoblin.tests.tools import get_app, fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.tools import template
from mediagoblin.auth.lib import bcrypt_check_password
class TestUserEdit(object):
- def setUp(self):
- self.app = get_app(dump_old_app=False)
+ def setup(self):
# set up new user
self.user_password = u'toast'
self.user = fixture_add_user(password = self.user_password)
- self.login()
- def login(self):
- self.app.post(
+ def login(self, test_app):
+ test_app.post(
'/auth/login/', {
'username': self.user.username,
'password': self.user_password})
- def test_user_deletion(self):
+ def test_user_deletion(self, test_app):
"""Delete user via web interface"""
+ self.login(test_app)
+
# Make sure user exists
assert User.query.filter_by(username=u'chris').first()
- res = self.app.post('/edit/account/delete/', {'confirmed': 'y'})
+ res = test_app.post('/edit/account/delete/', {'confirmed': 'y'})
# Make sure user has been deleted
assert User.query.filter_by(username=u'chris').first() == None
@@ -52,14 +52,16 @@ class TestUserEdit(object):
#Restore user at end of test
self.user = fixture_add_user(password = self.user_password)
- self.login()
+ self.login(test_app)
- def test_change_password(self):
+ def test_change_password(self, test_app):
"""Test changing password correctly and incorrectly"""
+ self.login(test_app)
+
# test that the password can be changed
# template.clear_test_template_context()
- res = self.app.post(
+ res = test_app.post(
'/edit/account/', {
'old_password': 'toast',
'new_password': '123456',
@@ -67,7 +69,7 @@ class TestUserEdit(object):
})
# Check for redirect on success
- assert_equal(res.status_int, 302)
+ assert res.status_int == 302
# test_user has to be fetched again in order to have the current values
test_user = User.query.filter_by(username=u'chris').first()
assert bcrypt_check_password('123456', test_user.pw_hash)
@@ -76,7 +78,7 @@ class TestUserEdit(object):
# test that the password cannot be changed if the given
# old_password is wrong template.clear_test_template_context()
- self.app.post(
+ test_app.post(
'/edit/account/', {
'old_password': 'toast',
'new_password': '098765',
@@ -86,50 +88,54 @@ class TestUserEdit(object):
assert not bcrypt_check_password('098765', test_user.pw_hash)
-
- def test_change_bio_url(self):
+ def test_change_bio_url(self, test_app):
"""Test changing bio and URL"""
+ self.login(test_app)
+
# Test if legacy profile editing URL redirects correctly
- res = self.app.post(
+ res = test_app.post(
'/edit/profile/', {
'bio': u'I love toast!',
'url': u'http://dustycloud.org/'}, expect_errors=True)
# Should redirect to /u/chris/edit/
- assert_equal (res.status_int, 302)
+ assert res.status_int == 302
assert res.headers['Location'].endswith("/u/chris/edit/")
- res = self.app.post(
+ res = test_app.post(
'/u/chris/edit/', {
'bio': u'I love toast!',
'url': u'http://dustycloud.org/'})
test_user = User.query.filter_by(username=u'chris').first()
- assert_equal(test_user.bio, u'I love toast!')
- assert_equal(test_user.url, u'http://dustycloud.org/')
+ assert test_user.bio == u'I love toast!'
+ assert test_user.url == u'http://dustycloud.org/'
# change a different user than the logged in (should fail with 403)
fixture_add_user(username=u"foo")
- res = self.app.post(
+ res = test_app.post(
'/u/foo/edit/', {
'bio': u'I love toast!',
'url': u'http://dustycloud.org/'}, expect_errors=True)
- assert_equal(res.status_int, 403)
+ assert res.status_int == 403
# test changing the bio and the URL inproperly
too_long_bio = 150 * 'T' + 150 * 'o' + 150 * 'a' + 150 * 's' + 150* 't'
- self.app.post(
+ test_app.post(
'/u/chris/edit/', {
# more than 500 characters
'bio': too_long_bio,
'url': 'this-is-no-url'})
# Check form errors
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/edit/edit_profile.html']
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/edit_profile.html']
form = context['form']
- assert_equal(form.bio.errors, [u'Field must be between 0 and 500 characters long.'])
- assert_equal(form.url.errors, [u'This address contains errors'])
+ assert form.bio.errors == [
+ u'Field must be between 0 and 500 characters long.']
+ assert form.url.errors == [
+ u'This address contains errors']
# test changing the url inproperly
diff --git a/mediagoblin/tests/test_globals.py b/mediagoblin/tests/test_globals.py
index 303f89e2..fe3088f8 100644
--- a/mediagoblin/tests/test_globals.py
+++ b/mediagoblin/tests/test_globals.py
@@ -14,16 +14,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from nose.tools import assert_raises
+import pytest
from mediagoblin import mg_globals
class TestGlobals(object):
- def setUp(self):
+ def setup(self):
self.old_database = mg_globals.database
- def tearDown(self):
+ def teardown(self):
mg_globals.database = self.old_database
def test_setup_globals(self):
@@ -36,7 +36,7 @@ class TestGlobals(object):
assert mg_globals.public_store == 'my favorite public_store!'
assert mg_globals.queue_store == 'my favorite queue_store!'
- assert_raises(
+ pytest.raises(
AssertionError,
mg_globals.setup_globals,
- no_such_global_foo = "Dummy")
+ no_such_global_foo="Dummy")
diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py
index 8bee7045..e2c85d0d 100644
--- a/mediagoblin/tests/test_http_callback.py
+++ b/mediagoblin/tests/test_http_callback.py
@@ -20,28 +20,27 @@ from urlparse import urlparse, parse_qs
from mediagoblin import mg_globals
from mediagoblin.tools import processing
-from mediagoblin.tests.tools import get_app, fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.tests.test_submission import GOOD_PNG
from mediagoblin.tests import test_oauth as oauth
class TestHTTPCallback(object):
- def setUp(self):
- self.app = get_app(dump_old_app=False)
+ def _setup(self, test_app):
self.db = mg_globals.database
self.user_password = u'secret'
self.user = fixture_add_user(u'call_back', self.user_password)
- self.login()
+ self.login(test_app)
- def login(self):
- self.app.post('/auth/login/', {
+ def login(self, testapp):
+ testapp.post('/auth/login/', {
'username': self.user.username,
'password': self.user_password})
- def get_access_token(self, client_id, client_secret, code):
- response = self.app.get('/oauth/access_token', {
+ def get_access_token(self, testapp, client_id, client_secret, code):
+ response = testapp.get('/oauth/access_token', {
'code': code,
'client_id': client_id,
'client_secret': client_secret})
@@ -50,13 +49,15 @@ class TestHTTPCallback(object):
return response_data['access_token']
- def test_callback(self):
+ def test_callback(self, test_app):
''' Test processing HTTP callback '''
+ self._setup(test_app)
self.oauth = oauth.TestOAuth()
- self.oauth.setUp()
+ self.oauth._setup(test_app)
- redirect, client_id = self.oauth.test_4_authorize_confidential_client()
+ redirect, client_id = self.oauth.test_4_authorize_confidential_client(
+ test_app)
code = parse_qs(urlparse(redirect.location).query)['code'][0]
@@ -65,11 +66,11 @@ class TestHTTPCallback(object):
client_secret = client.secret
- access_token = self.get_access_token(client_id, client_secret, code)
+ access_token = self.get_access_token(test_app, client_id, client_secret, code)
callback_url = 'https://foo.example?secrettestmediagoblinparam'
- res = self.app.post('/api/submit?client_id={0}&access_token={1}\
+ res = test_app.post('/api/submit?client_id={0}&access_token={1}\
&client_secret={2}'.format(
client_id,
access_token,
diff --git a/mediagoblin/tests/test_messages.py b/mediagoblin/tests/test_messages.py
index 4c0f3e2e..3ac917b0 100644
--- a/mediagoblin/tests/test_messages.py
+++ b/mediagoblin/tests/test_messages.py
@@ -15,18 +15,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mediagoblin.messages import fetch_messages, add_message
-from mediagoblin.tests.tools import get_app
from mediagoblin.tools import template
-
-def test_messages():
+def test_messages(test_app):
"""
Added messages should show up in the request.session,
fetched messages should be the same as the added ones,
and fetching should clear the message list.
"""
- test_app = get_app(dump_old_app=False)
# Aquire a request object
test_app.get('/')
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py
index 776affc6..755d863f 100644
--- a/mediagoblin/tests/test_misc.py
+++ b/mediagoblin/tests/test_misc.py
@@ -14,21 +14,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from nose.tools import assert_equal
-
from mediagoblin.db.base import Session
from mediagoblin.db.models import User, MediaEntry, MediaComment
-from mediagoblin.tests.tools import get_app, \
- fixture_add_user, fixture_media_entry
+from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
-def test_404_for_non_existent():
- test_app = get_app(dump_old_app=False)
+def test_404_for_non_existent(test_app):
res = test_app.get('/does-not-exist/', expect_errors=True)
- assert_equal(res.status_int, 404)
+ assert res.status_int == 404
-def test_user_deletes_other_comments():
+def test_user_deletes_other_comments(test_app):
user_a = fixture_add_user(u"chris_a")
user_b = fixture_add_user(u"chris_b")
@@ -60,11 +56,11 @@ def test_user_deletes_other_comments():
cmt_cnt2 = MediaComment.query.count()
# One user deleted
- assert_equal(usr_cnt2, usr_cnt1 - 1)
+ assert usr_cnt2 == usr_cnt1 - 1
# One media gone
- assert_equal(med_cnt2, med_cnt1 - 1)
+ assert med_cnt2 == med_cnt1 - 1
# Three of four comments gone.
- assert_equal(cmt_cnt2, cmt_cnt1 - 3)
+ assert cmt_cnt2 == cmt_cnt1 - 3
User.query.get(user_b.id).delete()
@@ -73,14 +69,14 @@ def test_user_deletes_other_comments():
cmt_cnt2 = MediaComment.query.count()
# All users gone
- assert_equal(usr_cnt2, usr_cnt1 - 2)
+ assert usr_cnt2 == usr_cnt1 - 2
# All media gone
- assert_equal(med_cnt2, med_cnt1 - 2)
+ assert med_cnt2 == med_cnt1 - 2
# All comments gone
- assert_equal(cmt_cnt2, cmt_cnt1 - 4)
+ assert cmt_cnt2 == cmt_cnt1 - 4
-def test_media_deletes_broken_attachment():
+def test_media_deletes_broken_attachment(test_app):
user_a = fixture_add_user(u"chris_a")
media = fixture_media_entry(uploader=user_a.id, save=False)
diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py
index 7719bd97..427aa47c 100644
--- a/mediagoblin/tests/test_modelmethods.py
+++ b/mediagoblin/tests/test_modelmethods.py
@@ -17,13 +17,10 @@
# Maybe not every model needs a test, but some models have special
# methods, and so it makes sense to test them here.
-from nose.tools import assert_equal
-
from mediagoblin.db.base import Session
from mediagoblin.db.models import MediaEntry
-from mediagoblin.tests.tools import get_app, \
- fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
import mock
@@ -35,8 +32,7 @@ UUID_MOCK = mock.Mock(return_value=FakeUUID())
class TestMediaEntrySlugs(object):
- def setUp(self):
- self.test_app = get_app(dump_old_app=True)
+ def _setup(self):
self.chris_user = fixture_add_user(u'chris')
self.emily_user = fixture_add_user(u'emily')
self.existing_entry = self._insert_media_entry_fixture(
@@ -57,56 +53,78 @@ class TestMediaEntrySlugs(object):
return entry
- def test_unique_slug_from_title(self):
+ def test_unique_slug_from_title(self, test_app):
+ self._setup()
+
entry = self._insert_media_entry_fixture(u"Totally unique slug!", save=False)
entry.generate_slug()
assert entry.slug == u'totally-unique-slug'
- def test_old_good_unique_slug(self):
+ def test_old_good_unique_slug(self, test_app):
+ self._setup()
+
entry = self._insert_media_entry_fixture(
u"A title here", u"a-different-slug-there", save=False)
entry.generate_slug()
assert entry.slug == u"a-different-slug-there"
- def test_old_weird_slug(self):
+ def test_old_weird_slug(self, test_app):
+ self._setup()
+
entry = self._insert_media_entry_fixture(
slug=u"wowee!!!!!", save=False)
entry.generate_slug()
assert entry.slug == u"wowee"
- def test_existing_slug_use_id(self):
- entry = self._insert_media_entry_fixture(
- u"Beware, I exist!!", this_id=9000, save=False)
- entry.generate_slug()
- assert entry.slug == u"beware-i-exist-9000"
-
- @mock.patch('uuid.uuid4', UUID_MOCK)
- def test_existing_slug_cant_use_id(self):
- # This one grabs the nine thousand slug
- self._insert_media_entry_fixture(
- slug=u"beware-i-exist-9000")
+ def test_existing_slug_use_id(self, test_app):
+ self._setup()
entry = self._insert_media_entry_fixture(
u"Beware, I exist!!", this_id=9000, save=False)
entry.generate_slug()
- assert entry.slug == u"beware-i-exist-test"
-
- @mock.patch('uuid.uuid4', UUID_MOCK)
- def test_existing_slug_cant_use_id_extra_junk(self):
- # This one grabs the nine thousand slug
- self._insert_media_entry_fixture(
- slug=u"beware-i-exist-9000")
-
- # This one grabs makes sure the annoyance doesn't stop
- self._insert_media_entry_fixture(
- slug=u"beware-i-exist-test")
-
- entry = self._insert_media_entry_fixture(
- u"Beware, I exist!!", this_id=9000, save=False)
- entry.generate_slug()
- assert entry.slug == u"beware-i-exist-testtest"
+ assert entry.slug == u"beware-i-exist-9000"
- def test_garbage_slug(self):
+ def test_existing_slug_cant_use_id(self, test_app):
+ self._setup()
+
+ # Getting tired of dealing with test_app and this mock.patch
+ # thing conflicting, getting lazy.
+ @mock.patch('uuid.uuid4', UUID_MOCK)
+ def _real_test():
+ # This one grabs the nine thousand slug
+ self._insert_media_entry_fixture(
+ slug=u"beware-i-exist-9000")
+
+ entry = self._insert_media_entry_fixture(
+ u"Beware, I exist!!", this_id=9000, save=False)
+ entry.generate_slug()
+ assert entry.slug == u"beware-i-exist-test"
+
+ _real_test()
+
+ def test_existing_slug_cant_use_id_extra_junk(self, test_app):
+ self._setup()
+
+ # Getting tired of dealing with test_app and this mock.patch
+ # thing conflicting, getting lazy.
+ @mock.patch('uuid.uuid4', UUID_MOCK)
+ def _real_test():
+ # This one grabs the nine thousand slug
+ self._insert_media_entry_fixture(
+ slug=u"beware-i-exist-9000")
+
+ # This one grabs makes sure the annoyance doesn't stop
+ self._insert_media_entry_fixture(
+ slug=u"beware-i-exist-test")
+
+ entry = self._insert_media_entry_fixture(
+ u"Beware, I exist!!", this_id=9000, save=False)
+ entry.generate_slug()
+ assert entry.slug == u"beware-i-exist-testtest"
+
+ _real_test()
+
+ def test_garbage_slug(self, test_app):
"""
Titles that sound totally like Q*Bert shouldn't have slugs at
all. We'll just reference them by id.
@@ -126,13 +144,15 @@ class TestMediaEntrySlugs(object):
| |#| |#| |#| |#|
\|/ \|/ \|/ \|/
"""
+ self._setup()
+
qbert_entry = self._insert_media_entry_fixture(
u"@!#?@!", save=False)
qbert_entry.generate_slug()
assert qbert_entry.slug is None
-def test_media_data_init():
+def test_media_data_init(test_app):
Session.rollback()
Session.remove()
media = MediaEntry()
@@ -144,4 +164,4 @@ def test_media_data_init():
for obj in Session():
obj_in_session += 1
print repr(obj)
- assert_equal(obj_in_session, 0)
+ assert obj_in_session == 0
diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py
index 94ba5dab..901556fe 100644
--- a/mediagoblin/tests/test_oauth.py
+++ b/mediagoblin/tests/test_oauth.py
@@ -21,15 +21,14 @@ from urlparse import parse_qs, urlparse
from mediagoblin import mg_globals
from mediagoblin.tools import template, pluginapi
-from mediagoblin.tests.tools import get_app, fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
_log = logging.getLogger(__name__)
class TestOAuth(object):
- def setUp(self):
- self.app = get_app()
+ def _setup(self, test_app):
self.db = mg_globals.database
self.pman = pluginapi.PluginManager()
@@ -37,17 +36,17 @@ class TestOAuth(object):
self.user_password = u'4cc355_70k3N'
self.user = fixture_add_user(u'joauth', self.user_password)
- self.login()
+ self.login(test_app)
- def login(self):
- self.app.post(
+ def login(self, test_app):
+ test_app.post(
'/auth/login/', {
'username': self.user.username,
'password': self.user_password})
- def register_client(self, name, client_type, description=None,
+ def register_client(self, test_app, name, client_type, description=None,
redirect_uri=''):
- return self.app.post(
+ return test_app.post(
'/oauth/client/register', {
'name': name,
'description': description,
@@ -57,9 +56,11 @@ class TestOAuth(object):
def get_context(self, template_name):
return template.TEMPLATE_TEST_CONTEXT[template_name]
- def test_1_public_client_registration_without_redirect_uri(self):
+ def test_1_public_client_registration_without_redirect_uri(self, test_app):
''' Test 'public' OAuth client registration without any redirect uri '''
- response = self.register_client(u'OMGOMGOMG', 'public',
+ self._setup(test_app)
+
+ response = self.register_client(test_app, u'OMGOMGOMG', 'public',
'OMGOMG Apache License v2')
ctx = self.get_context('oauth/client/register.html')
@@ -75,10 +76,10 @@ class TestOAuth(object):
# Should not pass through
assert not client
- def test_2_successful_public_client_registration(self):
+ def test_2_successful_public_client_registration(self, test_app):
''' Successfully register a public client '''
- self.login()
- self.register_client(u'OMGOMG', 'public', 'OMG!',
+ self._setup(test_app)
+ self.register_client(test_app, u'OMGOMG', 'public', 'OMG!',
'http://foo.example')
client = self.db.OAuthClient.query.filter(
@@ -87,9 +88,12 @@ class TestOAuth(object):
# Client should have been registered
assert client
- def test_3_successful_confidential_client_reg(self):
+ def test_3_successful_confidential_client_reg(self, test_app):
''' Register a confidential OAuth client '''
- response = self.register_client(u'GMOGMO', 'confidential', 'NO GMO!')
+ self._setup(test_app)
+
+ response = self.register_client(
+ test_app, u'GMOGMO', 'confidential', 'NO GMO!')
assert response.status_int == 302
@@ -101,15 +105,16 @@ class TestOAuth(object):
return client
- def test_4_authorize_confidential_client(self):
+ def test_4_authorize_confidential_client(self, test_app):
''' Authorize a confidential client as a logged in user '''
+ self._setup(test_app)
- client = self.test_3_successful_confidential_client_reg()
+ client = self.test_3_successful_confidential_client_reg(test_app)
client_identifier = client.identifier
redirect_uri = 'https://foo.example'
- response = self.app.get('/oauth/authorize', {
+ response = test_app.get('/oauth/authorize', {
'client_id': client.identifier,
'scope': 'admin',
'redirect_uri': redirect_uri})
@@ -122,7 +127,7 @@ class TestOAuth(object):
form = ctx['form']
# Short for client authorization post reponse
- capr = self.app.post(
+ capr = test_app.post(
'/oauth/client/authorize', {
'client_id': form.client_id.data,
'allow': 'Allow',
@@ -139,16 +144,19 @@ class TestOAuth(object):
def get_code_from_redirect_uri(self, uri):
return parse_qs(urlparse(uri).query)['code'][0]
- def test_token_endpoint_successful_confidential_request(self):
+ def test_token_endpoint_successful_confidential_request(self, test_app):
''' Successful request against token endpoint '''
- code_redirect, client_id = self.test_4_authorize_confidential_client()
+ self._setup(test_app)
+
+ code_redirect, client_id = self.test_4_authorize_confidential_client(
+ test_app)
code = self.get_code_from_redirect_uri(code_redirect.location)
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.identifier == unicode(client_id)).first()
- token_res = self.app.get('/oauth/access_token?client_id={0}&\
+ token_res = test_app.get('/oauth/access_token?client_id={0}&\
code={1}&client_secret={2}'.format(client_id, code, client.secret))
assert token_res.status_int == 200
@@ -162,16 +170,19 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret))
assert type(token_data['expires_in']) == int
assert token_data['expires_in'] > 0
- def test_token_endpont_missing_id_confidential_request(self):
+ def test_token_endpont_missing_id_confidential_request(self, test_app):
''' Unsuccessful request against token endpoint, missing client_id '''
- code_redirect, client_id = self.test_4_authorize_confidential_client()
+ self._setup(test_app)
+
+ code_redirect, client_id = self.test_4_authorize_confidential_client(
+ test_app)
code = self.get_code_from_redirect_uri(code_redirect.location)
client = self.db.OAuthClient.query.filter(
self.db.OAuthClient.identifier == unicode(client_id)).first()
- token_res = self.app.get('/oauth/access_token?\
+ token_res = test_app.get('/oauth/access_token?\
code={0}&client_secret={1}'.format(code, client.secret))
assert token_res.status_int == 200
diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini
index d7c18642..875b4f65 100644
--- a/mediagoblin/tests/test_paste.ini
+++ b/mediagoblin/tests/test_paste.ini
@@ -10,7 +10,7 @@ use = egg:Paste#urlmap
[app:mediagoblin]
use = egg:mediagoblin#app
filter-with = beaker
-config = %(here)s/test_mgoblin_app.ini
+config = %(here)s/mediagoblin.ini
[app:publicstore_serve]
use = egg:Paste#static
diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py
index 315a95da..245c396d 100644
--- a/mediagoblin/tests/test_pluginapi.py
+++ b/mediagoblin/tests/test_pluginapi.py
@@ -19,7 +19,6 @@ from configobj import ConfigObj
from mediagoblin import mg_globals
from mediagoblin.init.plugins import setup_plugins
from mediagoblin.tools import pluginapi
-from nose.tools import eq_
def with_cleanup(*modules_to_delete):
@@ -97,7 +96,7 @@ def test_no_plugins():
setup_plugins()
# Make sure we didn't load anything.
- eq_(len(pman.plugins), 0)
+ assert len(pman.plugins) == 0
@with_cleanup('mediagoblin.plugins.sampleplugin')
@@ -117,14 +116,14 @@ def test_one_plugin():
setup_plugins()
# Make sure we only found one plugin
- eq_(len(pman.plugins), 1)
+ assert len(pman.plugins) == 1
# Make sure the plugin is the one we think it is.
- eq_(pman.plugins[0], 'mediagoblin.plugins.sampleplugin')
+ assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin'
# Make sure there was one hook registered
- eq_(len(pman.hooks), 1)
+ assert len(pman.hooks) == 1
# Make sure _setup_plugin_called was called once
import mediagoblin.plugins.sampleplugin
- eq_(mediagoblin.plugins.sampleplugin._setup_plugin_called, 1)
+ assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1
@with_cleanup('mediagoblin.plugins.sampleplugin')
@@ -145,14 +144,14 @@ def test_same_plugin_twice():
setup_plugins()
# Make sure we only found one plugin
- eq_(len(pman.plugins), 1)
+ assert len(pman.plugins) == 1
# Make sure the plugin is the one we think it is.
- eq_(pman.plugins[0], 'mediagoblin.plugins.sampleplugin')
+ assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin'
# Make sure there was one hook registered
- eq_(len(pman.hooks), 1)
+ assert len(pman.hooks) == 1
# Make sure _setup_plugin_called was called once
import mediagoblin.plugins.sampleplugin
- eq_(mediagoblin.plugins.sampleplugin._setup_plugin_called, 1)
+ assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1
@with_cleanup()
@@ -172,4 +171,4 @@ def test_disabled_plugin():
setup_plugins()
# Make sure we didn't load the plugin
- eq_(len(pman.plugins), 0)
+ assert len(pman.plugins) == 0
diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py
index 61326ae9..749f7b07 100644
--- a/mediagoblin/tests/test_storage.py
+++ b/mediagoblin/tests/test_storage.py
@@ -18,7 +18,7 @@
import os
import tempfile
-from nose.tools import assert_raises, assert_equal, assert_true
+import pytest
from werkzeug.utils import secure_filename
from mediagoblin import storage
@@ -41,10 +41,8 @@ def test_clean_listy_filepath():
assert storage.clean_listy_filepath(
['../../../etc/', 'passwd']) == expected
- assert_raises(
- storage.InvalidFilepath,
- storage.clean_listy_filepath,
- ['../../', 'linooks.jpg'])
+ with pytest.raises(storage.InvalidFilepath):
+ storage.clean_listy_filepath(['../../', 'linooks.jpg'])
class FakeStorageSystem():
@@ -78,10 +76,10 @@ def test_storage_system_from_config():
'garbage_arg': 'garbage_arg',
'storage_class':
'mediagoblin.tests.test_storage:FakeStorageSystem'})
- assert_equal(this_storage.foobie, 'eiboof')
- assert_equal(this_storage.blech, 'hcelb')
- assert_equal(unicode(this_storage.__class__),
- u'mediagoblin.tests.test_storage.FakeStorageSystem')
+ assert this_storage.foobie == 'eiboof'
+ assert this_storage.blech == 'hcelb'
+ assert unicode(this_storage.__class__) == \
+ u'mediagoblin.tests.test_storage.FakeStorageSystem'
##########################
@@ -89,7 +87,7 @@ def test_storage_system_from_config():
##########################
def get_tmp_filestorage(mount_url=None, fake_remote=False):
- tmpdir = tempfile.mkdtemp()
+ tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
if fake_remote:
this_storage = FakeRemoteStorage(tmpdir, mount_url)
else:
@@ -108,11 +106,13 @@ def test_basic_storage__resolve_filepath():
assert result == os.path.join(
tmpdir, 'etc/passwd')
- assert_raises(
+ pytest.raises(
storage.InvalidFilepath,
this_storage._resolve_filepath,
['../../', 'etc', 'passwd'])
+ os.rmdir(tmpdir)
+
def test_basic_storage_file_exists():
tmpdir, this_storage = get_tmp_filestorage()
@@ -126,6 +126,8 @@ def test_basic_storage_file_exists():
assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
+ this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
+
def test_basic_storage_get_unique_filepath():
tmpdir, this_storage = get_tmp_filestorage()
@@ -146,6 +148,8 @@ def test_basic_storage_get_unique_filepath():
assert len(new_filename) > len('filename.txt')
assert new_filename == secure_filename(new_filename)
+ os.remove(filename)
+
def test_basic_storage_get_file():
tmpdir, this_storage = get_tmp_filestorage()
@@ -182,6 +186,10 @@ def test_basic_storage_get_file():
with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
assert testyfile.read() == 'testy file! so testy.'
+ this_storage.delete_file(filepath)
+ this_storage.delete_file(new_filepath)
+ this_storage.delete_file(['testydir', 'testyfile.txt'])
+
def test_basic_storage_delete_file():
tmpdir, this_storage = get_tmp_filestorage()
@@ -205,10 +213,11 @@ def test_basic_storage_delete_file():
def test_basic_storage_url_for_file():
# Not supplying a base_url should actually just bork.
tmpdir, this_storage = get_tmp_filestorage()
- assert_raises(
+ pytest.raises(
storage.NoWebServing,
this_storage.file_url,
['dir1', 'dir2', 'filename.txt'])
+ os.rmdir(tmpdir)
# base_url without domain
tmpdir, this_storage = get_tmp_filestorage('/media/')
@@ -216,6 +225,7 @@ def test_basic_storage_url_for_file():
['dir1', 'dir2', 'filename.txt'])
expected = '/media/dir1/dir2/filename.txt'
assert result == expected
+ os.rmdir(tmpdir)
# base_url with domain
tmpdir, this_storage = get_tmp_filestorage(
@@ -224,6 +234,7 @@ def test_basic_storage_url_for_file():
['dir1', 'dir2', 'filename.txt'])
expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
assert result == expected
+ os.rmdir(tmpdir)
def test_basic_storage_get_local_path():
@@ -237,10 +248,13 @@ def test_basic_storage_get_local_path():
assert result == expected
+ os.rmdir(tmpdir)
+
def test_basic_storage_is_local():
tmpdir, this_storage = get_tmp_filestorage()
assert this_storage.local_storage is True
+ os.rmdir(tmpdir)
def test_basic_storage_copy_locally():
@@ -255,9 +269,13 @@ def test_basic_storage_copy_locally():
new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
this_storage.copy_locally(filepath, new_file_dest)
+ this_storage.delete_file(filepath)
assert file(new_file_dest).read() == 'Testing this file'
+ os.remove(new_file_dest)
+ os.rmdir(dest_tmpdir)
+
def _test_copy_local_to_storage_works(tmpdir, this_storage):
local_filename = tempfile.mktemp()
@@ -267,10 +285,14 @@ def _test_copy_local_to_storage_works(tmpdir, this_storage):
this_storage.copy_local_to_storage(
local_filename, ['dir1', 'dir2', 'copiedto.txt'])
+ os.remove(local_filename)
+
assert file(
os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
'r').read() == 'haha'
+ this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
+
def test_basic_storage_copy_local_to_storage():
tmpdir, this_storage = get_tmp_filestorage()
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
index fc3d8c83..ac714252 100644
--- a/mediagoblin/tests/test_submission.py
+++ b/mediagoblin/tests/test_submission.py
@@ -21,11 +21,9 @@ sys.setdefaultencoding('utf-8')
import urlparse
import os
-from nose.tools import assert_equal, assert_true
from pkg_resources import resource_filename
-from mediagoblin.tests.tools import get_app, \
- fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
from mediagoblin import mg_globals
from mediagoblin.db.models import MediaEntry
from mediagoblin.tools import template
@@ -51,8 +49,8 @@ REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request']
class TestSubmission:
- def setUp(self):
- self.test_app = get_app(dump_old_app=False)
+ def _setup(self, test_app):
+ self.test_app = test_app
# TODO: Possibly abstract into a decorator like:
# @as_authenticated_user('chris')
@@ -88,27 +86,29 @@ class TestSubmission:
def check_comments(self, request, media_id, count):
comments = request.db.MediaComment.find({'media_entry': media_id})
- assert_equal(count, len(list(comments)))
+ assert count == len(list(comments))
+
+ def test_missing_fields(self, test_app):
+ self._setup(test_app)
- def test_missing_fields(self):
# Test blank form
# ---------------
response, form = self.do_post({}, *FORM_CONTEXT)
- assert_equal(form.file.errors, [u'You must provide a file.'])
+ assert form.file.errors == [u'You must provide a file.']
# Test blank file
# ---------------
response, form = self.do_post({'title': u'test title'}, *FORM_CONTEXT)
- assert_equal(form.file.errors, [u'You must provide a file.'])
+ assert form.file.errors == [u'You must provide a file.']
def check_url(self, response, path):
- assert_equal(urlparse.urlsplit(response.location)[2], path)
+ assert urlparse.urlsplit(response.location)[2] == path
def check_normal_upload(self, title, filename):
response, context = self.do_post({'title': title}, do_follow=True,
**self.upload_data(filename))
self.check_url(response, '/u/{0}/'.format(self.test_user.username))
- assert_true('mediagoblin/user_pages/user.html' in context)
+ assert 'mediagoblin/user_pages/user.html' in context
# Make sure the media view is at least reachable, logged in...
url = '/u/{0}/m/{1}/'.format(self.test_user.username,
title.lower().replace(' ', '-'))
@@ -117,21 +117,25 @@ class TestSubmission:
self.logout()
self.test_app.get(url)
- def test_normal_jpg(self):
+ def test_normal_jpg(self, test_app):
+ self._setup(test_app)
self.check_normal_upload(u'Normal upload 1', GOOD_JPG)
- def test_normal_png(self):
+ def test_normal_png(self, test_app):
+ self._setup(test_app)
self.check_normal_upload(u'Normal upload 2', GOOD_PNG)
def check_media(self, request, find_data, count=None):
media = MediaEntry.find(find_data)
if count is not None:
- assert_equal(media.count(), count)
+ assert media.count() == count
if count == 0:
return
return media[0]
- def test_tags(self):
+ def test_tags(self, test_app):
+ self._setup(test_app)
+
# Good tag string
# --------
response, request = self.do_post({'title': u'Balanced Goblin 2',
@@ -151,12 +155,14 @@ class TestSubmission:
'tags': BAD_TAG_STRING},
*FORM_CONTEXT,
**self.upload_data(GOOD_JPG))
- assert_equal(form.tags.errors, [
+ assert form.tags.errors == [
u'Tags must be shorter than 50 characters. ' \
'Tags that are too long: ' \
- 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'])
+ 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']
+
+ def test_delete(self, test_app):
+ self._setup(test_app)
- def test_delete(self):
response, request = self.do_post({'title': u'Balanced Goblin'},
*REQUEST_CONTEXT, do_follow=True,
**self.upload_data(GOOD_JPG))
@@ -173,7 +179,7 @@ class TestSubmission:
'slug': u"Balanced=Goblin",
'tags': u''})
media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
- assert_equal(media.slug, u"balanced-goblin")
+ assert media.slug == u"balanced-goblin"
# Add a comment, so we can test for its deletion later.
self.check_comments(request, media_id, 0)
@@ -201,33 +207,39 @@ class TestSubmission:
self.check_media(request, {'id': media_id}, 0)
self.check_comments(request, media_id, 0)
- def test_evil_file(self):
+ def test_evil_file(self, test_app):
+ self._setup(test_app)
+
# Test non-suppoerted file with non-supported extension
# -----------------------------------------------------
response, form = self.do_post({'title': u'Malicious Upload 1'},
*FORM_CONTEXT,
**self.upload_data(EVIL_FILE))
- assert_equal(len(form.file.errors), 1)
+ assert len(form.file.errors) == 1
assert 'Sorry, I don\'t support that file type :(' == \
str(form.file.errors[0])
- def test_get_media_manager(self):
+ def test_get_media_manager(self, test_app):
"""Test if the get_media_manger function returns sensible things
"""
+ self._setup(test_app)
+
response, request = self.do_post({'title': u'Balanced Goblin'},
*REQUEST_CONTEXT, do_follow=True,
**self.upload_data(GOOD_JPG))
media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
- assert_equal(media.media_type, u'mediagoblin.media_types.image')
- assert_equal(media.media_manager, img_MEDIA_MANAGER)
+ assert media.media_type == u'mediagoblin.media_types.image'
+ assert media.media_manager == img_MEDIA_MANAGER
- def test_sniffing(self):
+ def test_sniffing(self, test_app):
'''
Test sniffing mechanism to assert that regular uploads work as intended
'''
+ self._setup(test_app)
+
template.clear_test_template_context()
response = self.test_app.post(
'/submit/', {
@@ -254,25 +266,36 @@ class TestSubmission:
**self.upload_data(filename))
self.check_url(response, '/u/{0}/'.format(self.test_user.username))
entry = mg_globals.database.MediaEntry.find_one({'title': title})
- assert_equal(entry.state, 'failed')
- assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail')
+ assert entry.state == 'failed'
+ assert entry.fail_error == u'mediagoblin.processing:BadMediaFail'
+
+ def test_evil_jpg(self, test_app):
+ self._setup(test_app)
- def test_evil_jpg(self):
# Test non-supported file with .jpg extension
# -------------------------------------------
self.check_false_image(u'Malicious Upload 2', EVIL_JPG)
- def test_evil_png(self):
+ def test_evil_png(self, test_app):
+ self._setup(test_app)
+
# Test non-supported file with .png extension
# -------------------------------------------
self.check_false_image(u'Malicious Upload 3', EVIL_PNG)
- def test_media_data(self):
+ def test_media_data(self, test_app):
+ self._setup(test_app)
+
self.check_normal_upload(u"With GPS data", GPS_JPG)
media = self.check_media(None, {"title": u"With GPS data"}, 1)
- assert_equal(media.media_data.gps_latitude, 59.336666666666666)
+ assert media.media_data.gps_latitude == 59.336666666666666
+
+ def test_processing(self, test_app):
+ self._setup(test_app)
+
+ public_store_dir = mg_globals.global_config[
+ 'storage:publicstore']['base_dir']
- def test_processing(self):
data = {'title': u'Big Blue'}
response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True,
**self.upload_data(BIG_BLUE))
@@ -282,12 +305,11 @@ class TestSubmission:
('medium', 'bigblue.medium.png'),
('thumb', 'bigblue.thumbnail.png')):
# Does the processed image have a good filename?
- filename = resource_filename(
- 'mediagoblin.tests',
- os.path.join('test_user_dev/media/public',
- *media.media_files.get(key, [])))
- assert_true(filename.endswith('_' + basename))
+ filename = os.path.join(
+ public_store_dir,
+ *media.media_files.get(key, []))
+ assert filename.endswith('_' + basename)
# Is it smaller than the last processed image we looked at?
size = os.stat(filename).st_size
- assert_true(last_size > size)
+ assert last_size > size
last_size = size
diff --git a/mediagoblin/tests/test_tags.py b/mediagoblin/tests/test_tags.py
index ccb93085..e25cc283 100644
--- a/mediagoblin/tests/test_tags.py
+++ b/mediagoblin/tests/test_tags.py
@@ -14,17 +14,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from mediagoblin.tests.tools import get_app
from mediagoblin.tools import text
-def test_list_of_dicts_conversion():
+def test_list_of_dicts_conversion(test_app):
"""
When the user adds tags to a media entry, the string from the form is
converted into a list of tags, where each tag is stored in the database
as a dict. Each tag dict should contain the tag's name and slug. Another
function performs the reverse operation when populating a form to edit tags.
"""
- test_app = get_app(dump_old_app=False)
# Leading, trailing, and internal whitespace should be removed and slugified
assert text.convert_to_tag_list_of_dicts('sleep , 6 AM, chainsaw! ') == [
{'name': u'sleep', 'slug': u'sleep'},
diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py
index 636c8689..9cd49671 100644
--- a/mediagoblin/tests/test_workbench.py
+++ b/mediagoblin/tests/test_workbench.py
@@ -25,9 +25,14 @@ from mediagoblin.tests.test_storage import get_tmp_filestorage
class TestWorkbench(object):
- def setUp(self):
+ def setup(self):
+ self.workbench_base = tempfile.mkdtemp(prefix='gmg_workbench_testing')
self.workbench_manager = workbench.WorkbenchManager(
- os.path.join(tempfile.gettempdir(), u'mgoblin_workbench_testing'))
+ self.workbench_base)
+
+ def teardown(self):
+ # If the workbench is empty, this should work.
+ os.rmdir(self.workbench_base)
def test_create_workbench(self):
workbench = self.workbench_manager.create()
@@ -70,6 +75,7 @@ class TestWorkbench(object):
filename = this_workbench.localized_file(this_storage, filepath)
assert filename == os.path.join(
tmpdir, 'dir1/dir2/ourfile.txt')
+ this_storage.delete_file(filepath)
# with a fake remote file storage
tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
@@ -95,6 +101,9 @@ class TestWorkbench(object):
assert filename == os.path.join(
this_workbench.dir, 'thisfile.text')
+ this_storage.delete_file(filepath)
+ this_workbench.destroy()
+
def test_workbench_decorator(self):
"""Test @get_workbench decorator and automatic cleanup"""
# The decorator needs mg_globals.workbench_manager
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
index 1d8e6e96..a0498a6e 100644
--- a/mediagoblin/tests/tools.py
+++ b/mediagoblin/tests/tools.py
@@ -43,14 +43,14 @@ TEST_APP_CONFIG = pkg_resources.resource_filename(
'mediagoblin.tests', 'test_mgoblin_app.ini')
TEST_USER_DEV = pkg_resources.resource_filename(
'mediagoblin.tests', 'test_user_dev')
-MGOBLIN_APP = None
+
USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
BAD_CELERY_MESSAGE = """\
-Sorry, you *absolutely* must run nosetests with the
+Sorry, you *absolutely* must run tests with the
mediagoblin.init.celery.from_tests module. Like so:
-$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
+$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/py.test"""
class BadCeleryEnviron(Exception): pass
@@ -101,7 +101,30 @@ def suicide_if_bad_celery_environ():
raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
-def get_app(dump_old_app=True):
+def get_app(request, paste_config=None, mgoblin_config=None):
+ """Create a MediaGoblin app for testing.
+
+ Args:
+ - request: Not an http request, but a pytest fixture request. We
+ use this to make temporary directories that pytest
+ automatically cleans up as needed.
+ - paste_config: particular paste config used by this application.
+ - mgoblin_config: particular mediagoblin config used by this
+ application.
+ """
+ paste_config = paste_config or TEST_SERVER_CONFIG
+ mgoblin_config = mgoblin_config or TEST_APP_CONFIG
+
+ # This is the directory we're copying the paste/mgoblin config stuff into
+ run_dir = request.config._tmpdirhandler.mktemp(
+ 'mgoblin_app', numbered=True)
+ user_dev_dir = run_dir.mkdir('test_user_dev').strpath
+
+ new_paste_config = run_dir.join('paste.ini').strpath
+ new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
+ shutil.copyfile(paste_config, new_paste_config)
+ shutil.copyfile(mgoblin_config, new_mgoblin_config)
+
suicide_if_bad_celery_environ()
# Make sure we've turned on testing
@@ -110,26 +133,16 @@ def get_app(dump_old_app=True):
# Leave this imported as it sets up celery.
from mediagoblin.init.celery import from_tests
- global MGOBLIN_APP
-
- # Just return the old app if that exists and it's okay to set up
- # and return
- if MGOBLIN_APP and not dump_old_app:
- return MGOBLIN_APP
-
Session.rollback()
Session.remove()
- # Remove and reinstall user_dev directories
- if os.path.exists(TEST_USER_DEV):
- shutil.rmtree(TEST_USER_DEV)
-
+ # install user_dev directories
for directory in USER_DEV_DIRECTORIES_TO_SETUP:
- full_dir = os.path.join(TEST_USER_DEV, directory)
+ full_dir = os.path.join(user_dev_dir, directory)
os.makedirs(full_dir)
# Get app config
- global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
+ global_config, validation_result = read_mediagoblin_config(new_mgoblin_config)
app_config = global_config['mediagoblin']
# Run database setup/migrations
@@ -137,7 +150,7 @@ def get_app(dump_old_app=True):
# setup app and return
test_app = loadapp(
- 'config:' + TEST_SERVER_CONFIG)
+ 'config:' + new_paste_config)
# Re-setup celery
setup_celery_app(app_config, global_config)
@@ -149,26 +162,10 @@ def get_app(dump_old_app=True):
mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
app = TestApp(test_app)
- MGOBLIN_APP = app
return app
-def setup_fresh_app(func):
- """
- Decorator to setup a fresh test application for this function.
-
- Cleans out test buckets and passes in a new, fresh test_app.
- """
- @wraps(func)
- def wrapper(*args, **kwargs):
- test_app = get_app()
- testing.clear_test_buckets()
- return func(test_app, *args, **kwargs)
-
- return wrapper
-
-
def install_fixtures_simple(db, fixtures):
"""
Very simply install fixtures in the database
diff --git a/mediagoblin/user_pages/views.py b/mediagoblin/user_pages/views.py
index c611daa1..61c23f16 100644
--- a/mediagoblin/user_pages/views.py
+++ b/mediagoblin/user_pages/views.py
@@ -204,11 +204,11 @@ def media_collect(request, media):
# If we are here, method=POST and the form is valid, submit things.
# If the user is adding a new collection, use that:
- if request.form['collection_title']:
+ if form.collection_title.data:
# Make sure this user isn't duplicating an existing collection
existing_collection = Collection.query.filter_by(
creator=request.user.id,
- title=request.form['collection_title']).first()
+ title=form.collection_title.data).first()
if existing_collection:
messages.add_message(request, messages.ERROR,
_('You already have a collection called "%s"!')
@@ -218,8 +218,8 @@ def media_collect(request, media):
media=media.slug_or_id)
collection = Collection()
- collection.title = request.form['collection_title']
- collection.description = request.form.get('collection_description')
+ collection.title = form.collection_title.data
+ collection.description = form.collection_description.data
collection.creator = request.user.id
collection.generate_slug()
collection.save()
@@ -251,7 +251,7 @@ def media_collect(request, media):
collection_item = request.db.CollectionItem()
collection_item.collection = collection.id
collection_item.media_entry = media.id
- collection_item.note = request.form['note']
+ collection_item.note = form.note.data
collection_item.save()
collection.items = collection.items + 1