diff options
Diffstat (limited to 'mediagoblin')
42 files changed, 615 insertions, 722 deletions
diff --git a/mediagoblin/auth/views.py b/mediagoblin/auth/views.py index d8ad7b51..354b48c1 100644 --- a/mediagoblin/auth/views.py +++ b/mediagoblin/auth/views.py @@ -78,7 +78,7 @@ def register(request): user.username = register_form.data['username'] user.email = register_form.data['email'] user.pw_hash = auth_lib.bcrypt_gen_password_hash( - request.form['password']) + register_form.password.data) user.verification_key = unicode(uuid.uuid4()) user.save() @@ -116,7 +116,7 @@ def login(request): if login_form.validate(): user = User.query.filter_by(username=login_form.data['username']).first() - if user and user.check_login(request.form['password']): + if user and user.check_login(login_form.password.data): # set up login in session request.session['user_id'] = unicode(user.id) request.session.save() @@ -196,7 +196,7 @@ def resend_activation(request): request, messages.ERROR, _('You must be logged in so we know who to send the email to!')) - + return redirect(request, 'mediagoblin.auth.login') if request.user.email_verified: @@ -204,12 +204,12 @@ def resend_activation(request): request, messages.ERROR, _("You've already verified your email address!")) - + return redirect(request, "mediagoblin.user_pages.user_home", user=request.user['username']) request.user.verification_key = unicode(uuid.uuid4()) request.user.save() - + email_debug_message(request) send_verification_email(request.user, request) @@ -241,11 +241,11 @@ def forgot_password(request): # has been sanitized. Store if a user was found by email. We should # not reveal if the operation was successful then as we don't want to # leak if an email address exists in the system. - found_by_email = '@' in request.form['username'] + found_by_email = '@' in fp_form.username.data if found_by_email: user = User.query.filter_by( - email = request.form['username']).first() + email = fp_form.username.data).first() # Don't reveal success in case the lookup happened by email address. success_message=_("If that email address (case sensitive!) is " "registered an email has been sent with instructions " @@ -253,7 +253,7 @@ def forgot_password(request): else: # found by username user = User.query.filter_by( - username = request.form['username']).first() + username = fp_form.username.data).first() if user is None: messages.add_message(request, @@ -317,7 +317,7 @@ def verify_forgot_password(request): if request.method == 'POST' and cp_form.validate(): user.pw_hash = auth_lib.bcrypt_gen_password_hash( - request.form['password']) + cp_form.password.data) user.fp_verification_key = None user.fp_token_expire = None user.save() diff --git a/mediagoblin/db/mixin.py b/mediagoblin/db/mixin.py index fdf61e8d..0dc3bc85 100644 --- a/mediagoblin/db/mixin.py +++ b/mediagoblin/db/mixin.py @@ -52,10 +52,18 @@ class UserMixin(object): return cleaned_markdown_conversion(self.bio) -class MediaEntryMixin(object): +class GenerateSlugMixin(object): + """ + Mixin to add a generate_slug method to objects. + + Depends on: + - self.slug + - self.title + - self.check_slug_used(new_slug) + """ def generate_slug(self): """ - Generate a unique slug for this MediaEntry. + Generate a unique slug for this object. This one does not *force* slugs, but usually it will probably result in a niceish one. @@ -76,19 +84,15 @@ class MediaEntryMixin(object): generated bits until it's unique. That'll be a little bit of junk, but at least it has the basis of a nice slug. """ - # import this here due to a cyclic import issue - # (db.models -> db.mixin -> db.util -> db.models) - from mediagoblin.db.util import check_media_slug_used - #Is already a slug assigned? Check if it is valid if self.slug: self.slug = slugify(self.slug) - + # otherwise, try to use the title. elif self.title: # assign slug based on title self.slug = slugify(self.title) - + # We don't want any empty string slugs if self.slug == u"": self.slug = None @@ -98,26 +102,34 @@ class MediaEntryMixin(object): # so just return... we're not going to force one. if not self.slug: return # giving up! - + # Otherwise, let's see if this is unique. - if check_media_slug_used(self.uploader, self.slug, self.id): + if self.check_slug_used(self.slug): # It looks like it's being used... lame. - + # Can we just append the object's id to the end? if self.id: slug_with_id = u"%s-%s" % (self.slug, self.id) - if not check_media_slug_used(self.uploader, - slug_with_id, self.id): + if not self.check_slug_used(slug_with_id): self.slug = slug_with_id return # success! - + # okay, still no success; # let's whack junk on there till it's unique. self.slug += '-' + uuid.uuid4().hex[:4] # keep going if necessary! - while check_media_slug_used(self.uploader, self.slug, self.id): + while self.check_slug_used(self.slug): self.slug += uuid.uuid4().hex[:4] + +class MediaEntryMixin(GenerateSlugMixin): + def check_slug_used(self, slug): + # import this here due to a cyclic import issue + # (db.models -> db.mixin -> db.util -> db.models) + from mediagoblin.db.util import check_media_slug_used + + return check_media_slug_used(self.uploader, slug, self.id) + @property def description_html(self): """ @@ -238,22 +250,13 @@ class MediaCommentMixin(object): return cleaned_markdown_conversion(self.content) -class CollectionMixin(object): - def generate_slug(self): +class CollectionMixin(GenerateSlugMixin): + def check_slug_used(self, slug): # import this here due to a cyclic import issue # (db.models -> db.mixin -> db.util -> db.models) from mediagoblin.db.util import check_collection_slug_used - self.slug = slugify(self.title) - - duplicate = check_collection_slug_used(mg_globals.database, - self.creator, self.slug, self.id) - - if duplicate: - if self.id is not None: - self.slug = u"%s-%s" % (self.id, self.slug) - else: - self.slug = None + return check_collection_slug_used(self.creator, slug, self.id) @property def description_html(self): diff --git a/mediagoblin/db/models.py b/mediagoblin/db/models.py index 2f58503f..fcfd0f61 100644 --- a/mediagoblin/db/models.py +++ b/mediagoblin/db/models.py @@ -172,8 +172,7 @@ class MediaEntry(Base, MediaEntryMixin): order_col = MediaComment.created if not ascending: order_col = desc(order_col) - return MediaComment.query.filter_by( - media_entry=self.id).order_by(order_col) + return self.all_comments.order_by(order_col) def url_to_prev(self, urlgen): """get the next 'newer' entry by this user""" @@ -238,9 +237,7 @@ class MediaEntry(Base, MediaEntryMixin): :param del_orphan_tags: True/false if we delete unused Tags too :param commit: True/False if this should end the db transaction""" # User's CollectionItems are automatically deleted via "cascade". - # Delete all the associated comments - for comment in self.get_comments(): - comment.delete(commit=False) + # Comments on this Media are deleted by cascade, hopefully. # Delete all related files/attachments try: @@ -385,13 +382,22 @@ class MediaComment(Base, MediaCommentMixin): content = Column(UnicodeText, nullable=False) # Cascade: Comments are owned by their creator. So do the full thing. - # lazy=dynamic: People might post a *lot* of comments, so make - # the "posted_comments" a query-like thing. + # lazy=dynamic: People might post a *lot* of comments, + # so make the "posted_comments" a query-like thing. get_author = relationship(User, backref=backref("posted_comments", lazy="dynamic", cascade="all, delete-orphan")) + # Cascade: Comments are somewhat owned by their MediaEntry. + # So do the full thing. + # lazy=dynamic: MediaEntries might have many comments, + # so make the "all_comments" a query-like thing. + get_media_entry = relationship(MediaEntry, + backref=backref("all_comments", + lazy="dynamic", + cascade="all, delete-orphan")) + class Collection(Base, CollectionMixin): """An 'album' or 'set' of media by a user. diff --git a/mediagoblin/db/util.py b/mediagoblin/db/util.py index 529ef8b9..6ffec44d 100644 --- a/mediagoblin/db/util.py +++ b/mediagoblin/db/util.py @@ -59,7 +59,7 @@ def clean_orphan_tags(commit=True): Session.commit() -def check_collection_slug_used(dummy_db, creator_id, slug, ignore_c_id): +def check_collection_slug_used(creator_id, slug, ignore_c_id): filt = (Collection.creator == creator_id) \ & (Collection.slug == slug) if ignore_c_id is not None: diff --git a/mediagoblin/edit/views.py b/mediagoblin/edit/views.py index cdb5c713..34b7aaca 100644 --- a/mediagoblin/edit/views.py +++ b/mediagoblin/edit/views.py @@ -26,7 +26,7 @@ from mediagoblin.auth import lib as auth_lib from mediagoblin.edit import forms from mediagoblin.edit.lib import may_edit_media from mediagoblin.decorators import (require_active_login, active_user_from_url, - get_media_entry_by_id, + get_media_entry_by_id, user_may_alter_collection, get_user_collection) from mediagoblin.tools.response import render_to_response, redirect from mediagoblin.tools.translate import pass_to_ugettext as _ @@ -58,19 +58,19 @@ def edit_media(request, media): if request.method == 'POST' and form.validate(): # Make sure there isn't already a MediaEntry with such a slug # and userid. - slug = slugify(request.form['slug']) + slug = slugify(form.slug.data) slug_used = check_media_slug_used(media.uploader, slug, media.id) if slug_used: form.slug.errors.append( _(u'An entry with that slug already exists for this user.')) else: - media.title = request.form['title'] - media.description = request.form.get('description') + media.title = form.title.data + media.description = form.description.data media.tags = convert_to_tag_list_of_dicts( - request.form.get('tags')) + form.tags.data) - media.license = unicode(request.form.get('license', '')) or None + media.license = unicode(form.license.data) or None media.slug = slug media.save() @@ -142,7 +142,7 @@ def edit_attachments(request, media): request.files['attachment_file'].stream.close() media.attachment_files.append(dict( - name=request.form['attachment_name'] \ + name=form.attachment_name.data \ or request.files['attachment_file'].filename, filepath=attachment_public_filepath, created=datetime.utcnow(), @@ -153,7 +153,7 @@ def edit_attachments(request, media): messages.add_message( request, messages.SUCCESS, _("You added the attachment %s!") \ - % (request.form['attachment_name'] + % (form.attachment_name.data or request.files['attachment_file'].filename)) return redirect(request, @@ -194,8 +194,8 @@ def edit_profile(request, url_user=None): bio=user.bio) if request.method == 'POST' and form.validate(): - user.url = unicode(request.form['url']) - user.bio = unicode(request.form['bio']) + user.url = unicode(form.url.data) + user.bio = unicode(form.bio.data) user.save() @@ -308,26 +308,26 @@ def edit_collection(request, collection): if request.method == 'POST' and form.validate(): # Make sure there isn't already a Collection with such a slug # and userid. - slug_used = check_collection_slug_used(request.db, collection.creator, - request.form['slug'], collection.id) + slug_used = check_collection_slug_used(collection.creator, + form.slug.data, collection.id) # Make sure there isn't already a Collection with this title existing_collection = request.db.Collection.find_one({ 'creator': request.user.id, - 'title':request.form['title']}) + 'title':form.title.data}) if existing_collection and existing_collection.id != collection.id: messages.add_message( request, messages.ERROR, _('You already have a collection called "%s"!') % \ - request.form['title']) + form.title.data) elif slug_used: form.slug.errors.append( _(u'A collection with that slug already exists for this user.')) else: - collection.title = unicode(request.form['title']) - collection.description = unicode(request.form.get('description')) - collection.slug = unicode(request.form['slug']) + collection.title = unicode(form.title.data) + collection.description = unicode(form.description.data) + collection.slug = unicode(form.slug.data) collection.save() diff --git a/mediagoblin/media_types/ascii/processing.py b/mediagoblin/media_types/ascii/processing.py index 382cd015..309aab0a 100644 --- a/mediagoblin/media_types/ascii/processing.py +++ b/mediagoblin/media_types/ascii/processing.py @@ -127,8 +127,14 @@ def process_ascii(proc_state): 'ascii', 'xmlcharrefreplace')) - mgg.queue_store.delete_file(queued_filepath) + # Remove queued media file from storage and database. + # queued_filepath is in the task_id directory which should + # be removed too, but fail if the directory is not empty to be on + # the super-safe side. + mgg.queue_store.delete_file(queued_filepath) # rm file + mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir entry.queued_media_file = [] + media_files_dict = entry.setdefault('media_files', {}) media_files_dict['thumb'] = thumb_filepath media_files_dict['unicode'] = unicode_filepath diff --git a/mediagoblin/media_types/audio/processing.py b/mediagoblin/media_types/audio/processing.py index 5dffcaf9..101b83e5 100644 --- a/mediagoblin/media_types/audio/processing.py +++ b/mediagoblin/media_types/audio/processing.py @@ -147,4 +147,10 @@ def process_audio(proc_state): else: entry.media_files['thumb'] = ['fake', 'thumb', 'path.jpg'] - mgg.queue_store.delete_file(queued_filepath) + # Remove queued media file from storage and database. + # queued_filepath is in the task_id directory which should + # be removed too, but fail if the directory is not empty to be on + # the super-safe side. + mgg.queue_store.delete_file(queued_filepath) # rm file + mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir + entry.queued_media_file = [] diff --git a/mediagoblin/media_types/image/processing.py b/mediagoblin/media_types/image/processing.py index ca88d3f4..e951ef29 100644 --- a/mediagoblin/media_types/image/processing.py +++ b/mediagoblin/media_types/image/processing.py @@ -122,6 +122,7 @@ def process_image(proc_state): exif_tags, conversions_subdir, (mgg.global_config['media:thumb']['max_width'], mgg.global_config['media:thumb']['max_height'])) + entry.media_files[u'thumb'] = thumb_filepath # If the size of the original file exceeds the specified size of a `medium` # file, a `.medium.jpg` files is created and later associated with the media @@ -137,8 +138,7 @@ def process_image(proc_state): exif_tags, conversions_subdir, (mgg.global_config['media:medium']['max_width'], mgg.global_config['media:medium']['max_height'])) - else: - medium_filepath = None + entry.media_files[u'medium'] = medium_filepath # Copy our queued local workbench to its final destination proc_state.copy_original(name_builder.fill('{basename}{ext}')) @@ -146,12 +146,6 @@ def process_image(proc_state): # Remove queued media file from storage and database proc_state.delete_queue_file() - # Insert media file information into database - media_files_dict = entry.setdefault('media_files', {}) - media_files_dict[u'thumb'] = thumb_filepath - if medium_filepath: - media_files_dict[u'medium'] = medium_filepath - # Insert exif data into database exif_all = clean_exif(exif_tags) diff --git a/mediagoblin/media_types/stl/model_loader.py b/mediagoblin/media_types/stl/model_loader.py index 60fa4851..88f19314 100644 --- a/mediagoblin/media_types/stl/model_loader.py +++ b/mediagoblin/media_types/stl/model_loader.py @@ -80,6 +80,7 @@ class ObjModel(ThreeDee): def load(self, fileob): for line in fileob: + line = line.strip() if line[0] == "v": self.verts.append(self.__vector(line)) @@ -121,6 +122,8 @@ def auto_detect(fileob, hint): pass except ValueError: pass + except IndexError: + pass try: # It is pretty important that the binary stl model loader # is tried second, because its possible for it to parse diff --git a/mediagoblin/media_types/stl/processing.py b/mediagoblin/media_types/stl/processing.py index 77744ac5..e41df395 100644 --- a/mediagoblin/media_types/stl/processing.py +++ b/mediagoblin/media_types/stl/processing.py @@ -165,8 +165,12 @@ def process_stl(proc_state): with open(queued_filename, 'rb') as queued_file: model_file.write(queued_file.read()) - # Remove queued media file from storage and database - mgg.queue_store.delete_file(queued_filepath) + # Remove queued media file from storage and database. + # queued_filepath is in the task_id directory which should + # be removed too, but fail if the directory is not empty to be on + # the super-safe side. + mgg.queue_store.delete_file(queued_filepath) # rm file + mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir entry.queued_media_file = [] # Insert media file information into database diff --git a/mediagoblin/media_types/video/transcoders.py b/mediagoblin/media_types/video/transcoders.py index d8290d41..58b2c0d4 100644 --- a/mediagoblin/media_types/video/transcoders.py +++ b/mediagoblin/media_types/video/transcoders.py @@ -53,283 +53,6 @@ def pixbuf_to_pilbuf(buf): return data -class VideoThumbnailer: - # Declaration of thumbnailer states - STATE_NULL = 0 - STATE_HALTING = 1 - STATE_PROCESSING = 2 - - # The current thumbnailer state - - def __init__(self, source_path, dest_path): - ''' - Set up playbin pipeline in order to get video properties. - - Initializes and runs the gobject.MainLoop() - - Abstract - - Set up a playbin with a fake audio sink and video sink. Load the video - into the playbin - - Initialize - ''' - # This will contain the thumbnailing pipeline - self.state = self.STATE_NULL - self.thumbnail_pipeline = None - self.buffer_probes = {} - self.errors = [] - - self.source_path = source_path - self.dest_path = dest_path - - self.loop = gobject.MainLoop() - - # Set up the playbin. It will be used to discover certain - # properties of the input file - self.playbin = gst.element_factory_make('playbin') - - self.videosink = gst.element_factory_make('fakesink', 'videosink') - self.playbin.set_property('video-sink', self.videosink) - - self.audiosink = gst.element_factory_make('fakesink', 'audiosink') - self.playbin.set_property('audio-sink', self.audiosink) - - self.bus = self.playbin.get_bus() - self.bus.add_signal_watch() - self.watch_id = self.bus.connect('message', self._on_bus_message) - - self.playbin.set_property('uri', 'file:{0}'.format( - urllib.pathname2url(self.source_path))) - - self.playbin.set_state(gst.STATE_PAUSED) - - self.run() - - def run(self): - self.loop.run() - - def _on_bus_message(self, bus, message): - _log.debug(' thumbnail playbin: {0}'.format(message)) - - if message.type == gst.MESSAGE_ERROR: - _log.error('thumbnail playbin: {0}'.format(message)) - gobject.idle_add(self._on_bus_error) - - elif message.type == gst.MESSAGE_STATE_CHANGED: - # The pipeline state has changed - # Parse state changing data - _prev, state, _pending = message.parse_state_changed() - - _log.debug('State changed: {0}'.format(state)) - - if state == gst.STATE_PAUSED: - if message.src == self.playbin: - gobject.idle_add(self._on_bus_paused) - - def _on_bus_paused(self): - ''' - Set up thumbnailing pipeline - ''' - current_video = self.playbin.get_property('current-video') - - if current_video == 0: - _log.debug('Found current video from playbin') - else: - _log.error('Could not get any current video from playbin!') - - self.duration = self._get_duration(self.playbin) - _log.info('Video length: {0}'.format(self.duration / gst.SECOND)) - - _log.info('Setting up thumbnailing pipeline') - self.thumbnail_pipeline = gst.parse_launch( - 'filesrc location="{0}" ! decodebin ! ' - 'ffmpegcolorspace ! videoscale ! ' - 'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1,width=180 ! ' - 'fakesink signal-handoffs=True'.format(self.source_path)) - - self.thumbnail_bus = self.thumbnail_pipeline.get_bus() - self.thumbnail_bus.add_signal_watch() - self.thumbnail_watch_id = self.thumbnail_bus.connect( - 'message', self._on_thumbnail_bus_message) - - self.thumbnail_pipeline.set_state(gst.STATE_PAUSED) - - #gobject.timeout_add(3000, self._on_timeout) - - return False - - def _on_thumbnail_bus_message(self, bus, message): - _log.debug('thumbnail: {0}'.format(message)) - - if message.type == gst.MESSAGE_ERROR: - _log.error(message) - gobject.idle_add(self._on_bus_error) - - if message.type == gst.MESSAGE_STATE_CHANGED: - _log.debug('State changed') - _prev, state, _pending = message.parse_state_changed() - - if (state == gst.STATE_PAUSED and - not self.state == self.STATE_PROCESSING and - message.src == self.thumbnail_pipeline): - _log.info('Pipeline paused, processing') - self.state = self.STATE_PROCESSING - - for sink in self.thumbnail_pipeline.sinks(): - name = sink.get_name() - factoryname = sink.get_factory().get_name() - - if factoryname == 'fakesink': - sinkpad = sink.get_pad('sink') - - self.buffer_probes[name] = sinkpad.add_buffer_probe( - self.buffer_probe_handler, name) - - _log.info('Added buffer probe') - - break - - # Apply the wadsworth constant, fallback to 1 second - # TODO: Will break if video is shorter than 1 sec - seek_amount = max(self.duration / 100 * 30, 1 * gst.SECOND) - - _log.debug('seek amount: {0}'.format(seek_amount)) - - seek_result = self.thumbnail_pipeline.seek( - 1.0, - gst.FORMAT_TIME, - gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE, - gst.SEEK_TYPE_SET, - seek_amount, - gst.SEEK_TYPE_NONE, - 0) - - if not seek_result: - self.errors.append('COULD_NOT_SEEK') - _log.error('Couldn\'t seek! result: {0}'.format( - seek_result)) - _log.info(message) - self.shutdown() - else: - _log.debug('Seek successful') - self.thumbnail_pipeline.set_state(gst.STATE_PAUSED) - else: - _log.debug('Won\'t seek: \t{0}\n\t{1}'.format( - self.state, - message.src)) - - def buffer_probe_handler_real(self, pad, buff, name): - ''' - Capture buffers as gdk_pixbufs when told to. - ''' - _log.info('Capturing frame') - try: - caps = buff.caps - if caps is None: - _log.error('No caps passed to buffer probe handler!') - self.shutdown() - return False - - _log.debug('caps: {0}'.format(caps)) - - filters = caps[0] - width = filters["width"] - height = filters["height"] - - im = Image.new('RGB', (width, height)) - - data = pixbuf_to_pilbuf(buff.data) - - im.putdata(data) - - im.save(self.dest_path) - - _log.info('Saved thumbnail') - - self.shutdown() - - except gst.QueryError as e: - _log.error('QueryError: {0}'.format(e)) - - return False - - def buffer_probe_handler(self, pad, buff, name): - ''' - Proxy function for buffer_probe_handler_real - ''' - _log.debug('Attaching real buffer handler to gobject idle event') - gobject.idle_add( - lambda: self.buffer_probe_handler_real(pad, buff, name)) - - return True - - def _get_duration(self, pipeline, retries=0): - ''' - Get the duration of a pipeline. - - Retries 5 times. - ''' - if retries == 5: - return 0 - - try: - return pipeline.query_duration(gst.FORMAT_TIME)[0] - except gst.QueryError: - return self._get_duration(pipeline, retries + 1) - - def _on_timeout(self): - _log.error('Timeout in thumbnailer!') - self.shutdown() - - def _on_bus_error(self, *args): - _log.error('AHAHAHA! Error! args: {0}'.format(args)) - - def shutdown(self): - ''' - Tell gobject to call __halt when the mainloop is idle. - ''' - _log.info('Shutting down') - self.__halt() - - def __halt(self): - ''' - Halt all pipelines and shut down the main loop - ''' - _log.info('Halting...') - self.state = self.STATE_HALTING - - self.__disconnect() - - gobject.idle_add(self.__halt_final) - - def __disconnect(self): - _log.debug('Disconnecting...') - if not self.playbin is None: - self.playbin.set_state(gst.STATE_NULL) - for sink in self.playbin.sinks(): - name = sink.get_name() - factoryname = sink.get_factory().get_name() - - _log.debug('Disconnecting {0}'.format(name)) - - if factoryname == "fakesink": - pad = sink.get_pad("sink") - pad.remove_buffer_probe(self.buffer_probes[name]) - del self.buffer_probes[name] - - self.playbin = None - - if self.bus is not None: - self.bus.disconnect(self.watch_id) - self.bus = None - - def __halt_final(self): - _log.info('Done') - if self.errors: - _log.error(','.join(self.errors)) - - self.loop.quit() - - class VideoThumbnailerMarkII(object): ''' Creates a thumbnail from a video file. Rewrite of VideoThumbnailer. @@ -398,8 +121,8 @@ class VideoThumbnailerMarkII(object): self.run() except Exception as exc: _log.critical( - 'Exception "{0}" caught, disconnecting and re-raising'\ - .format(exc)) + 'Exception "{0}" caught, shutting down mainloop and re-raising'\ + .format(exc)) self.disconnect() raise @@ -410,7 +133,8 @@ class VideoThumbnailerMarkII(object): self.mainloop.run() def on_playbin_message(self, message_bus, message): - _log.debug('playbin message: {0}'.format(message)) + # Silenced to prevent clobbering of output + #_log.debug('playbin message: {0}'.format(message)) if message.type == gst.MESSAGE_ERROR: _log.error('playbin error: {0}'.format(message)) @@ -433,17 +157,20 @@ pending: {2}'.format( def on_playbin_paused(self): if self.has_reached_playbin_pause: - _log.warn('Has already reached logic for playbin pause. Aborting \ + _log.warn('Has already reached on_playbin_paused. Aborting \ without doing anything this time.') return False self.has_reached_playbin_pause = True + # XXX: Why is this even needed at this point? current_video = self.playbin.get_property('current-video') if not current_video: - _log.critical('thumbnail could not get any video data \ + _log.critical('Could not get any video data \ from playbin') + else: + _log.info('Got video data from playbin') self.duration = self.get_duration(self.playbin) self.permission_to_take_picture = True @@ -474,7 +201,8 @@ from playbin') return False def on_thumbnail_message(self, message_bus, message): - _log.debug('thumbnail message: {0}'.format(message)) + # This is silenced to prevent clobbering of the terminal window + #_log.debug('thumbnail message: {0}'.format(message)) if message.type == gst.MESSAGE_ERROR: _log.error('thumbnail error: {0}'.format(message.parse_error())) @@ -490,29 +218,10 @@ pending: {2}'.format( cur_state, pending_state)) - if cur_state == gst.STATE_PAUSED and\ - not self.state == self.STATE_PROCESSING_THUMBNAIL: - self.state = self.STATE_PROCESSING_THUMBNAIL - + if cur_state == gst.STATE_PAUSED and \ + not self.state == self.STATE_PROCESSING_THUMBNAIL: # Find the fakesink sink pad and attach the on_buffer_probe # handler to it. - for sink in self.thumbnail_pipeline.sinks(): - sink_name = sink.get_name() - sink_factory_name = sink.get_factory().get_name() - - if sink_factory_name == 'fakesink': - sink_pad = sink.get_pad('sink') - - self.buffer_probes[sink_name] = sink_pad\ - .add_buffer_probe( - self.on_pad_buffer_probe, - sink_name) - - _log.info('Attached buffer probes: {0}'.format( - self.buffer_probes)) - - break - seek_amount = self.position_callback(self.duration, gst) seek_result = self.thumbnail_pipeline.seek( @@ -525,10 +234,30 @@ pending: {2}'.format( 0) if not seek_result: - _log.critical('Could not seek.') + _log.info('Could not seek.') + else: + _log.info('Seek successful, attaching buffer probe') + self.state = self.STATE_PROCESSING_THUMBNAIL + for sink in self.thumbnail_pipeline.sinks(): + sink_name = sink.get_name() + sink_factory_name = sink.get_factory().get_name() + + if sink_factory_name == 'fakesink': + sink_pad = sink.get_pad('sink') + + self.buffer_probes[sink_name] = sink_pad\ + .add_buffer_probe( + self.on_pad_buffer_probe, + sink_name) + + _log.info('Attached buffer probes: {0}'.format( + self.buffer_probes)) + + break + elif self.state == self.STATE_PROCESSING_THUMBNAIL: - _log.debug('Already processing thumbnail') + _log.info('Already processing thumbnail') def on_pad_buffer_probe(self, *args): _log.debug('buffer probe handler: {0}'.format(args)) @@ -649,7 +378,7 @@ pending: {2}'.format( return self.get_duration(pipeline, attempt + 1) -class VideoTranscoder: +class VideoTranscoder(object): ''' Video transcoder @@ -1011,6 +740,10 @@ if __name__ == '__main__': action='store_true', help='Dear program, please be quiet unless *error*') + parser.add_option('-w', '--width', + type=int, + default=180) + (options, args) = parser.parse_args() if options.verbose: @@ -1030,6 +763,7 @@ if __name__ == '__main__': transcoder = VideoTranscoder() if options.action == 'thumbnail': + args.append(options.width) VideoThumbnailerMarkII(*args) elif options.action == 'video': def cb(data): diff --git a/mediagoblin/plugins/api/views.py b/mediagoblin/plugins/api/views.py index 2055a663..fde76fe4 100644 --- a/mediagoblin/plugins/api/views.py +++ b/mediagoblin/plugins/api/views.py @@ -18,7 +18,6 @@ import json import logging from os.path import splitext -from werkzeug.datastructures import FileStorage from werkzeug.exceptions import BadRequest, Forbidden from werkzeug.wrappers import Response @@ -27,7 +26,8 @@ from mediagoblin.meddleware.csrf import csrf_exempt from mediagoblin.media_types import sniff_media from mediagoblin.plugins.api.tools import api_auth, get_entry_serializable, \ json_response -from mediagoblin.submit.lib import prepare_queue_task, run_process_media +from mediagoblin.submit.lib import check_file_field, prepare_queue_task, \ + run_process_media _log = logging.getLogger(__name__) @@ -45,9 +45,7 @@ def post_entry(request): _log.debug('Must POST against post_entry') raise BadRequest() - if not 'file' in request.files \ - or not isinstance(request.files['file'], FileStorage) \ - or not request.files['file'].stream: + if not check_file_field(request, 'file'): _log.debug('File field not found') raise BadRequest() diff --git a/mediagoblin/plugins/oauth/views.py b/mediagoblin/plugins/oauth/views.py index c7b2a332..ea45c209 100644 --- a/mediagoblin/plugins/oauth/views.py +++ b/mediagoblin/plugins/oauth/views.py @@ -45,11 +45,11 @@ def register_client(request): if request.method == 'POST' and form.validate(): client = OAuthClient() - client.name = unicode(request.form['name']) - client.description = unicode(request.form['description']) - client.type = unicode(request.form['type']) + client.name = unicode(form.name.data) + client.description = unicode(form.description.data) + client.type = unicode(form.type.data) client.owner_id = request.user.id - client.redirect_uri = unicode(request.form['redirect_uri']) + client.redirect_uri = unicode(form.redirect_uri.data) client.generate_identifier() client.generate_secret() diff --git a/mediagoblin/tests/test_tests.py b/mediagoblin/plugins/piwigo/forms.py index d539f1e0..5bb12e62 100644 --- a/mediagoblin/tests/test_tests.py +++ b/mediagoblin/plugins/piwigo/forms.py @@ -1,5 +1,5 @@ # GNU MediaGoblin -- federated, autonomous media hosting -# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -14,23 +14,15 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from mediagoblin import mg_globals -from mediagoblin.tests.tools import get_app, fixture_add_user -from mediagoblin.db.models import User +import wtforms -def test_get_app_wipes_db(): - """ - Make sure we get a fresh database on every wipe :) - """ - get_app(dump_old_app=True) - assert User.query.count() == 0 - fixture_add_user() - assert User.query.count() == 1 - - get_app(dump_old_app=False) - assert User.query.count() == 1 - - get_app(dump_old_app=True) - assert User.query.count() == 0 +class AddSimpleForm(wtforms.Form): + image = wtforms.FileField() + name = wtforms.TextField( + validators=[wtforms.validators.Length(min=0, max=500)]) + comment = wtforms.TextField() + # tags = wtforms.FieldList(wtforms.TextField()) + category = wtforms.IntegerField() + level = wtforms.IntegerField() diff --git a/mediagoblin/plugins/piwigo/views.py b/mediagoblin/plugins/piwigo/views.py index e9ce6206..3dee09cd 100644 --- a/mediagoblin/plugins/piwigo/views.py +++ b/mediagoblin/plugins/piwigo/views.py @@ -24,6 +24,7 @@ from mediagoblin import mg_globals from mediagoblin.meddleware.csrf import csrf_exempt from mediagoblin.tools.response import render_404 from .tools import CmdTable, PwgNamedArray, response_xml +from .forms import AddSimpleForm _log = logging.getLogger(__name__) @@ -45,7 +46,7 @@ def pwg_logout(request): @CmdTable("pwg.getVersion") def pwg_getversion(request): - return "piwigo 2.5.0 (MediaGoblin)" + return "2.5.0 (MediaGoblin)" @CmdTable("pwg.session.getStatus") @@ -80,6 +81,20 @@ def pwg_images_exist(request): return {} +@CmdTable("pwg.images.addSimple", True) +def pwg_images_addSimple(request): + form = AddSimpleForm(request.form) + if not form.validate(): + _log.error("addSimple: form failed") + raise BadRequest() + dump = [] + for f in form: + dump.append("%s=%r" % (f.name, f.data)) + _log.info("addimple: %r %s %r", request.form, " ".join(dump), request.files) + + return {'image_id': 123456, 'url': ''} + + md5sum_matcher = re.compile(r"^[0-9a-fA-F]{32}$") def fetch_md5(request, parm_name, optional_parm=False): diff --git a/mediagoblin/plugins/raven/README.rst b/mediagoblin/plugins/raven/README.rst index de5fd20d..4006060d 100644 --- a/mediagoblin/plugins/raven/README.rst +++ b/mediagoblin/plugins/raven/README.rst @@ -4,6 +4,8 @@ .. _raven-setup: +Warning: this plugin is somewhat experimental. + Set up the raven plugin ======================= diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py index f9445e28..a1fd3fb7 100644 --- a/mediagoblin/processing/__init__.py +++ b/mediagoblin/processing/__init__.py @@ -97,14 +97,27 @@ class ProcessingState(object): return queued_filename def copy_original(self, target_name, keyname=u"original"): + self.store_public(keyname, self.get_queued_filename(), target_name) + + def store_public(self, keyname, local_file, target_name=None): + if target_name is None: + target_name = os.path.basename(local_file) target_filepath = create_pub_filepath(self.entry, target_name) - mgg.public_store.copy_local_to_storage(self.get_queued_filename(), - target_filepath) + if keyname in self.entry.media_files: + _log.warn("store_public: keyname %r already used for file %r, " + "replacing with %r", keyname, + self.entry.media_files[keyname], target_filepath) + mgg.public_store.copy_local_to_storage(local_file, target_filepath) self.entry.media_files[keyname] = target_filepath def delete_queue_file(self): + # Remove queued media file from storage and database. + # queued_filepath is in the task_id directory which should + # be removed too, but fail if the directory is not empty to be on + # the super-safe side. queued_filepath = self.entry.queued_media_file - mgg.queue_store.delete_file(queued_filepath) + mgg.queue_store.delete_file(queued_filepath) # rm file + mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir self.entry.queued_media_file = [] diff --git a/mediagoblin/storage/__init__.py b/mediagoblin/storage/__init__.py index 5c1d7d36..bbe134a7 100644 --- a/mediagoblin/storage/__init__.py +++ b/mediagoblin/storage/__init__.py @@ -101,10 +101,20 @@ class StorageInterface(object): def delete_file(self, filepath): """ - Delete or dereference the file at filepath. + Delete or dereference the file (not directory) at filepath. + """ + # Subclasses should override this method. + self.__raise_not_implemented() + + def delete_dir(self, dirpath, recursive=False): + """Delete the directory at dirpath + + :param recursive: Usually, a directory must not contain any + files for the delete to succeed. If True, containing files + and subdirectories within dirpath will be recursively + deleted. - This might need to delete directories, buckets, whatever, for - cleanliness. (Be sure to avoid race conditions on that though) + :returns: True in case of success, False otherwise. """ # Subclasses should override this method. self.__raise_not_implemented() diff --git a/mediagoblin/storage/filestorage.py b/mediagoblin/storage/filestorage.py index ef786b61..3d6e0753 100644 --- a/mediagoblin/storage/filestorage.py +++ b/mediagoblin/storage/filestorage.py @@ -62,10 +62,32 @@ class BasicFileStorage(StorageInterface): return open(self._resolve_filepath(filepath), mode) def delete_file(self, filepath): - # TODO: Also delete unused directories if empty (safely, with - # checks to avoid race conditions). + """Delete file at filepath + + Raises OSError in case filepath is a directory.""" + #TODO: log error os.remove(self._resolve_filepath(filepath)) + def delete_dir(self, dirpath, recursive=False): + """returns True on succes, False on failure""" + + dirpath = self._resolve_filepath(dirpath) + + # Shortcut the default and simple case of nonempty=F, recursive=F + if recursive: + try: + shutil.rmtree(dirpath) + except OSError as e: + #TODO: log something here + return False + else: # recursively delete everything + try: + os.rmdir(dirpath) + except OSError as e: + #TODO: log something here + return False + return True + def file_url(self, filepath): if not self.base_url: raise NoWebServing( diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 679fc543..a5483471 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -17,6 +17,7 @@ import logging import uuid from werkzeug.utils import secure_filename +from werkzeug.datastructures import FileStorage from mediagoblin.processing import mark_entry_failed from mediagoblin.processing.task import process_media @@ -25,6 +26,16 @@ from mediagoblin.processing.task import process_media _log = logging.getLogger(__name__) +def check_file_field(request, field_name): + """Check if a file field meets minimal criteria""" + retval = (field_name in request.files + and isinstance(request.files[field_name], FileStorage) + and request.files[field_name].stream) + if not retval: + _log.debug("Form did not contain proper file field %s", field_name) + return retval + + def prepare_queue_task(app, entry, filename): """ Prepare a MediaEntry for the processing queue and get a queue file diff --git a/mediagoblin/submit/views.py b/mediagoblin/submit/views.py index def7e839..9d31c844 100644 --- a/mediagoblin/submit/views.py +++ b/mediagoblin/submit/views.py @@ -22,7 +22,6 @@ import logging _log = logging.getLogger(__name__) -from werkzeug.datastructures import FileStorage from mediagoblin.tools.text import convert_to_tag_list_of_dicts from mediagoblin.tools.translate import pass_to_ugettext as _ @@ -32,7 +31,8 @@ from mediagoblin.submit import forms as submit_forms from mediagoblin.messages import add_message, SUCCESS from mediagoblin.media_types import sniff_media, \ InvalidFileType, FileTypeNotSupported -from mediagoblin.submit.lib import run_process_media, prepare_queue_task +from mediagoblin.submit.lib import check_file_field, prepare_queue_task, \ + run_process_media @require_active_login @@ -44,9 +44,7 @@ def submit_start(request): license=request.user.license_preference) if request.method == 'POST' and submit_form.validate(): - if not ('file' in request.files - and isinstance(request.files['file'], FileStorage) - and request.files['file'].stream): + if not check_file_field(request, 'file'): submit_form.file.errors.append( _(u'You must provide a file.')) else: @@ -62,18 +60,18 @@ def submit_start(request): entry = request.db.MediaEntry() entry.media_type = unicode(media_type) entry.title = ( - unicode(request.form['title']) + unicode(submit_form.title.data) or unicode(splitext(filename)[0])) - entry.description = unicode(request.form.get('description')) + entry.description = unicode(submit_form.description.data) - entry.license = unicode(request.form.get('license', "")) or None + entry.license = unicode(submit_form.license.data) or None entry.uploader = request.user.id # Process the user's folksonomy "tags" entry.tags = convert_to_tag_list_of_dicts( - request.form.get('tags')) + submit_form.tags.data) # Generate a slug from the title entry.generate_slug() @@ -127,8 +125,8 @@ def add_collection(request, media=None): try: collection = request.db.Collection() - collection.title = unicode(request.form['title']) - collection.description = unicode(request.form.get('description')) + collection.title = unicode(submit_form.title.data) + collection.description = unicode(submit_form.description.data) collection.creator = request.user.id collection.generate_slug() diff --git a/mediagoblin/tests/conftest.py b/mediagoblin/tests/conftest.py new file mode 100644 index 00000000..25f495d6 --- /dev/null +++ b/mediagoblin/tests/conftest.py @@ -0,0 +1,15 @@ +from mediagoblin.tests import tools + +import pytest + +@pytest.fixture() +def test_app(request): + """ + py.test fixture to pass sandboxed mediagoblin applications into tests that + want them. + + You could make a local version of this method for your own tests + to override the paste and config files being used by passing them + in differently to get_app. + """ + return tools.get_app(request) diff --git a/mediagoblin/tests/pytest.ini b/mediagoblin/tests/pytest.ini new file mode 100644 index 00000000..d4aa2d69 --- /dev/null +++ b/mediagoblin/tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +usefixtures = tmpdir
\ No newline at end of file diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py index 82b1c1b4..cff25776 100644 --- a/mediagoblin/tests/test_api.py +++ b/mediagoblin/tests/test_api.py @@ -20,9 +20,11 @@ import base64 from pkg_resources import resource_filename +import pytest + from mediagoblin import mg_globals from mediagoblin.tools import template, pluginapi -from mediagoblin.tests.tools import get_app, fixture_add_user +from mediagoblin.tests.tools import fixture_add_user _log = logging.getLogger(__name__) @@ -43,15 +45,14 @@ BIG_BLUE = resource('bigblue.png') class TestAPI(object): - def setUp(self): - self.app = get_app(dump_old_app=False) + def setup(self): self.db = mg_globals.database self.user_password = u'4cc355_70k3N' self.user = fixture_add_user(u'joapi', self.user_password) - def login(self): - self.app.post( + def login(self, test_app): + test_app.post( '/auth/login/', { 'username': self.user.username, 'password': self.user_password}) @@ -65,14 +66,14 @@ class TestAPI(object): self.user.username, self.user_password])))} - def do_post(self, data, **kwargs): + def do_post(self, data, test_app, **kwargs): url = kwargs.pop('url', '/api/submit') do_follow = kwargs.pop('do_follow', False) if not 'headers' in kwargs.keys(): kwargs['headers'] = self.http_auth_headers() - response = self.app.post(url, data, **kwargs) + response = test_app.post(url, data, **kwargs) if do_follow: response.follow() @@ -82,21 +83,22 @@ class TestAPI(object): def upload_data(self, filename): return {'upload_files': [('file', filename)]} - def test_1_test_test_view(self): - self.login() + def test_1_test_test_view(self, test_app): + self.login(test_app) - response = self.app.get( + response = test_app.get( '/api/test', headers=self.http_auth_headers()) assert response.body == \ '{"username": "joapi", "email": "joapi@example.com"}' - def test_2_test_submission(self): - self.login() + def test_2_test_submission(self, test_app): + self.login(test_app) response = self.do_post( {'title': 'Great JPG!'}, + test_app, **self.upload_data(GOOD_JPG)) assert response.status_int == 200 diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py index a59a319c..755727f9 100644 --- a/mediagoblin/tests/test_auth.py +++ b/mediagoblin/tests/test_auth.py @@ -17,12 +17,10 @@ import urlparse import datetime -from nose.tools import assert_equal - from mediagoblin import mg_globals from mediagoblin.auth import lib as auth_lib from mediagoblin.db.models import User -from mediagoblin.tests.tools import setup_fresh_app, get_app, fixture_add_user +from mediagoblin.tests.tools import fixture_add_user from mediagoblin.tools import template, mail @@ -65,7 +63,6 @@ def test_bcrypt_gen_password_hash(): 'notthepassword', hashed_pw, '3><7R45417') -@setup_fresh_app def test_register_views(test_app): """ Massive test function that all our registration-related views all work. @@ -102,8 +99,8 @@ def test_register_views(test_app): context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] form = context['register_form'] - assert_equal (form.username.errors, [u'Field must be between 3 and 30 characters long.']) - assert_equal (form.password.errors, [u'Field must be between 5 and 1024 characters long.']) + assert form.username.errors == [u'Field must be between 3 and 30 characters long.'] + assert form.password.errors == [u'Field must be between 5 and 1024 characters long.'] ## bad form template.clear_test_template_context() @@ -114,11 +111,11 @@ def test_register_views(test_app): context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html'] form = context['register_form'] - assert_equal (form.username.errors, [u'This field does not take email addresses.']) - assert_equal (form.email.errors, [u'This field requires an email address.']) + assert form.username.errors == [u'This field does not take email addresses.'] + assert form.email.errors == [u'This field requires an email address.'] ## At this point there should be no users in the database ;) - assert_equal(User.query.count(), 0) + assert User.query.count() == 0 # Successful register # ------------------- @@ -131,9 +128,7 @@ def test_register_views(test_app): response.follow() ## Did we redirect to the proper page? Use the right template? - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/happygirl/') + assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/' assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT ## Make sure user is in place @@ -224,9 +219,7 @@ def test_register_views(test_app): response.follow() ## Did we redirect to the proper page? Use the right template? - assert_equal( - urlparse.urlsplit(response.location)[2], - '/auth/login/') + assert urlparse.urlsplit(response.location)[2] == '/auth/login/' assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT ## Make sure link to change password is sent by email @@ -257,7 +250,7 @@ def test_register_views(test_app): response = test_app.get( "/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode( new_user.id), status=404) - assert_equal(response.status.split()[0], u'404') # status="404 NOT FOUND" + assert response.status.split()[0] == u'404' # status="404 NOT FOUND" ## Try using an expired token to change password, shouldn't work template.clear_test_template_context() @@ -266,7 +259,7 @@ def test_register_views(test_app): new_user.fp_token_expire = datetime.datetime.now() new_user.save() response = test_app.get("%s?%s" % (path, get_params), status=404) - assert_equal(response.status.split()[0], u'404') # status="404 NOT FOUND" + assert response.status.split()[0] == u'404' # status="404 NOT FOUND" new_user.fp_token_expire = real_token_expiration new_user.save() @@ -294,17 +287,14 @@ def test_register_views(test_app): # User should be redirected response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/') + assert urlparse.urlsplit(response.location)[2] == '/' assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT -def test_authentication_views(): +def test_authentication_views(test_app): """ Test logging in and logging out """ - test_app = get_app(dump_old_app=False) # Make a new user test_user = fixture_add_user(active_user=False) @@ -372,9 +362,7 @@ def test_authentication_views(): # User should be redirected response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/') + assert urlparse.urlsplit(response.location)[2] == '/' assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT # Make sure user is in the session @@ -389,9 +377,7 @@ def test_authentication_views(): # Should be redirected to index page response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/') + assert urlparse.urlsplit(response.location)[2] == '/' assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT # Make sure the user is not in the session @@ -407,6 +393,4 @@ def test_authentication_views(): 'username': u'chris', 'password': 'toast', 'next' : '/u/chris/'}) - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') + assert urlparse.urlsplit(response.location)[2] == '/u/chris/' diff --git a/mediagoblin/tests/test_collections.py b/mediagoblin/tests/test_collections.py index b19f6362..87782f30 100644 --- a/mediagoblin/tests/test_collections.py +++ b/mediagoblin/tests/test_collections.py @@ -14,17 +14,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from mediagoblin.tests.tools import fixture_add_collection, fixture_add_user, \ - get_app +from mediagoblin.tests.tools import fixture_add_collection, fixture_add_user from mediagoblin.db.models import Collection, User -from mediagoblin.db.base import Session -from nose.tools import assert_equal -def test_user_deletes_collection(): +def test_user_deletes_collection(test_app): # Setup db. - get_app(dump_old_app=False) - user = fixture_add_user() coll = fixture_add_collection(user=user) # Reload into session: @@ -34,4 +29,4 @@ def test_user_deletes_collection(): user.delete() cnt2 = Collection.query.count() - assert_equal(cnt1, cnt2 + 1) + assert cnt1 == cnt2 + 1 diff --git a/mediagoblin/tests/test_csrf_middleware.py b/mediagoblin/tests/test_csrf_middleware.py index e720264c..a272caf6 100644 --- a/mediagoblin/tests/test_csrf_middleware.py +++ b/mediagoblin/tests/test_csrf_middleware.py @@ -14,12 +14,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from mediagoblin.tests.tools import get_app from mediagoblin import mg_globals -def test_csrf_cookie_set(): - test_app = get_app(dump_old_app=False) +def test_csrf_cookie_set(test_app): cookie_name = mg_globals.app_config['csrf_cookie_name'] # get login page @@ -33,11 +31,14 @@ def test_csrf_cookie_set(): assert response.headers.get('Vary', False) == 'Cookie' -def test_csrf_token_must_match(): - # We need a fresh app for this test on webtest < 1.3.6. - # We do not understand why, but it fixes the tests. - # If we require webtest >= 1.3.6, we can switch to a non fresh app here. - test_app = get_app(dump_old_app=True) +# We need a fresh app for this test on webtest < 1.3.6. +# We do not understand why, but it fixes the tests. +# If we require webtest >= 1.3.6, we can switch to a non fresh app here. +# +# ... this comment might be irrelevant post-pytest-fixtures, but I'm not +# removing it yet in case we move to module-level tests :) +# -- cwebber +def test_csrf_token_must_match(test_app): # construct a request with no cookie or form token assert test_app.post('/auth/login/', @@ -67,8 +68,7 @@ def test_csrf_token_must_match(): extra_environ={'gmg.verify_csrf': True}).\ status_int == 200 -def test_csrf_exempt(): - test_app = get_app(dump_old_app=False) +def test_csrf_exempt(test_app): # monkey with the views to decorate a known endpoint import mediagoblin.auth.views from mediagoblin.meddleware.csrf import csrf_exempt diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py index 7db6eaea..cda2607f 100644 --- a/mediagoblin/tests/test_edit.py +++ b/mediagoblin/tests/test_edit.py @@ -14,35 +14,35 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from nose.tools import assert_equal +import pytest from mediagoblin import mg_globals from mediagoblin.db.models import User -from mediagoblin.tests.tools import get_app, fixture_add_user +from mediagoblin.tests.tools import fixture_add_user from mediagoblin.tools import template from mediagoblin.auth.lib import bcrypt_check_password class TestUserEdit(object): - def setUp(self): - self.app = get_app(dump_old_app=False) + def setup(self): # set up new user self.user_password = u'toast' self.user = fixture_add_user(password = self.user_password) - self.login() - def login(self): - self.app.post( + def login(self, test_app): + test_app.post( '/auth/login/', { 'username': self.user.username, 'password': self.user_password}) - def test_user_deletion(self): + def test_user_deletion(self, test_app): """Delete user via web interface""" + self.login(test_app) + # Make sure user exists assert User.query.filter_by(username=u'chris').first() - res = self.app.post('/edit/account/delete/', {'confirmed': 'y'}) + res = test_app.post('/edit/account/delete/', {'confirmed': 'y'}) # Make sure user has been deleted assert User.query.filter_by(username=u'chris').first() == None @@ -52,14 +52,16 @@ class TestUserEdit(object): #Restore user at end of test self.user = fixture_add_user(password = self.user_password) - self.login() + self.login(test_app) - def test_change_password(self): + def test_change_password(self, test_app): """Test changing password correctly and incorrectly""" + self.login(test_app) + # test that the password can be changed # template.clear_test_template_context() - res = self.app.post( + res = test_app.post( '/edit/account/', { 'old_password': 'toast', 'new_password': '123456', @@ -67,7 +69,7 @@ class TestUserEdit(object): }) # Check for redirect on success - assert_equal(res.status_int, 302) + assert res.status_int == 302 # test_user has to be fetched again in order to have the current values test_user = User.query.filter_by(username=u'chris').first() assert bcrypt_check_password('123456', test_user.pw_hash) @@ -76,7 +78,7 @@ class TestUserEdit(object): # test that the password cannot be changed if the given # old_password is wrong template.clear_test_template_context() - self.app.post( + test_app.post( '/edit/account/', { 'old_password': 'toast', 'new_password': '098765', @@ -86,50 +88,54 @@ class TestUserEdit(object): assert not bcrypt_check_password('098765', test_user.pw_hash) - - def test_change_bio_url(self): + def test_change_bio_url(self, test_app): """Test changing bio and URL""" + self.login(test_app) + # Test if legacy profile editing URL redirects correctly - res = self.app.post( + res = test_app.post( '/edit/profile/', { 'bio': u'I love toast!', 'url': u'http://dustycloud.org/'}, expect_errors=True) # Should redirect to /u/chris/edit/ - assert_equal (res.status_int, 302) + assert res.status_int == 302 assert res.headers['Location'].endswith("/u/chris/edit/") - res = self.app.post( + res = test_app.post( '/u/chris/edit/', { 'bio': u'I love toast!', 'url': u'http://dustycloud.org/'}) test_user = User.query.filter_by(username=u'chris').first() - assert_equal(test_user.bio, u'I love toast!') - assert_equal(test_user.url, u'http://dustycloud.org/') + assert test_user.bio == u'I love toast!' + assert test_user.url == u'http://dustycloud.org/' # change a different user than the logged in (should fail with 403) fixture_add_user(username=u"foo") - res = self.app.post( + res = test_app.post( '/u/foo/edit/', { 'bio': u'I love toast!', 'url': u'http://dustycloud.org/'}, expect_errors=True) - assert_equal(res.status_int, 403) + assert res.status_int == 403 # test changing the bio and the URL inproperly too_long_bio = 150 * 'T' + 150 * 'o' + 150 * 'a' + 150 * 's' + 150* 't' - self.app.post( + test_app.post( '/u/chris/edit/', { # more than 500 characters 'bio': too_long_bio, 'url': 'this-is-no-url'}) # Check form errors - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/edit/edit_profile.html'] + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/edit/edit_profile.html'] form = context['form'] - assert_equal(form.bio.errors, [u'Field must be between 0 and 500 characters long.']) - assert_equal(form.url.errors, [u'This address contains errors']) + assert form.bio.errors == [ + u'Field must be between 0 and 500 characters long.'] + assert form.url.errors == [ + u'This address contains errors'] # test changing the url inproperly diff --git a/mediagoblin/tests/test_globals.py b/mediagoblin/tests/test_globals.py index 303f89e2..fe3088f8 100644 --- a/mediagoblin/tests/test_globals.py +++ b/mediagoblin/tests/test_globals.py @@ -14,16 +14,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from nose.tools import assert_raises +import pytest from mediagoblin import mg_globals class TestGlobals(object): - def setUp(self): + def setup(self): self.old_database = mg_globals.database - def tearDown(self): + def teardown(self): mg_globals.database = self.old_database def test_setup_globals(self): @@ -36,7 +36,7 @@ class TestGlobals(object): assert mg_globals.public_store == 'my favorite public_store!' assert mg_globals.queue_store == 'my favorite queue_store!' - assert_raises( + pytest.raises( AssertionError, mg_globals.setup_globals, - no_such_global_foo = "Dummy") + no_such_global_foo="Dummy") diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py index 8bee7045..e2c85d0d 100644 --- a/mediagoblin/tests/test_http_callback.py +++ b/mediagoblin/tests/test_http_callback.py @@ -20,28 +20,27 @@ from urlparse import urlparse, parse_qs from mediagoblin import mg_globals from mediagoblin.tools import processing -from mediagoblin.tests.tools import get_app, fixture_add_user +from mediagoblin.tests.tools import fixture_add_user from mediagoblin.tests.test_submission import GOOD_PNG from mediagoblin.tests import test_oauth as oauth class TestHTTPCallback(object): - def setUp(self): - self.app = get_app(dump_old_app=False) + def _setup(self, test_app): self.db = mg_globals.database self.user_password = u'secret' self.user = fixture_add_user(u'call_back', self.user_password) - self.login() + self.login(test_app) - def login(self): - self.app.post('/auth/login/', { + def login(self, testapp): + testapp.post('/auth/login/', { 'username': self.user.username, 'password': self.user_password}) - def get_access_token(self, client_id, client_secret, code): - response = self.app.get('/oauth/access_token', { + def get_access_token(self, testapp, client_id, client_secret, code): + response = testapp.get('/oauth/access_token', { 'code': code, 'client_id': client_id, 'client_secret': client_secret}) @@ -50,13 +49,15 @@ class TestHTTPCallback(object): return response_data['access_token'] - def test_callback(self): + def test_callback(self, test_app): ''' Test processing HTTP callback ''' + self._setup(test_app) self.oauth = oauth.TestOAuth() - self.oauth.setUp() + self.oauth._setup(test_app) - redirect, client_id = self.oauth.test_4_authorize_confidential_client() + redirect, client_id = self.oauth.test_4_authorize_confidential_client( + test_app) code = parse_qs(urlparse(redirect.location).query)['code'][0] @@ -65,11 +66,11 @@ class TestHTTPCallback(object): client_secret = client.secret - access_token = self.get_access_token(client_id, client_secret, code) + access_token = self.get_access_token(test_app, client_id, client_secret, code) callback_url = 'https://foo.example?secrettestmediagoblinparam' - res = self.app.post('/api/submit?client_id={0}&access_token={1}\ + res = test_app.post('/api/submit?client_id={0}&access_token={1}\ &client_secret={2}'.format( client_id, access_token, diff --git a/mediagoblin/tests/test_messages.py b/mediagoblin/tests/test_messages.py index 4c0f3e2e..3ac917b0 100644 --- a/mediagoblin/tests/test_messages.py +++ b/mediagoblin/tests/test_messages.py @@ -15,18 +15,15 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from mediagoblin.messages import fetch_messages, add_message -from mediagoblin.tests.tools import get_app from mediagoblin.tools import template - -def test_messages(): +def test_messages(test_app): """ Added messages should show up in the request.session, fetched messages should be the same as the added ones, and fetching should clear the message list. """ - test_app = get_app(dump_old_app=False) # Aquire a request object test_app.get('/') context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py index 776affc6..755d863f 100644 --- a/mediagoblin/tests/test_misc.py +++ b/mediagoblin/tests/test_misc.py @@ -14,21 +14,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from nose.tools import assert_equal - from mediagoblin.db.base import Session from mediagoblin.db.models import User, MediaEntry, MediaComment -from mediagoblin.tests.tools import get_app, \ - fixture_add_user, fixture_media_entry +from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry -def test_404_for_non_existent(): - test_app = get_app(dump_old_app=False) +def test_404_for_non_existent(test_app): res = test_app.get('/does-not-exist/', expect_errors=True) - assert_equal(res.status_int, 404) + assert res.status_int == 404 -def test_user_deletes_other_comments(): +def test_user_deletes_other_comments(test_app): user_a = fixture_add_user(u"chris_a") user_b = fixture_add_user(u"chris_b") @@ -60,11 +56,11 @@ def test_user_deletes_other_comments(): cmt_cnt2 = MediaComment.query.count() # One user deleted - assert_equal(usr_cnt2, usr_cnt1 - 1) + assert usr_cnt2 == usr_cnt1 - 1 # One media gone - assert_equal(med_cnt2, med_cnt1 - 1) + assert med_cnt2 == med_cnt1 - 1 # Three of four comments gone. - assert_equal(cmt_cnt2, cmt_cnt1 - 3) + assert cmt_cnt2 == cmt_cnt1 - 3 User.query.get(user_b.id).delete() @@ -73,14 +69,14 @@ def test_user_deletes_other_comments(): cmt_cnt2 = MediaComment.query.count() # All users gone - assert_equal(usr_cnt2, usr_cnt1 - 2) + assert usr_cnt2 == usr_cnt1 - 2 # All media gone - assert_equal(med_cnt2, med_cnt1 - 2) + assert med_cnt2 == med_cnt1 - 2 # All comments gone - assert_equal(cmt_cnt2, cmt_cnt1 - 4) + assert cmt_cnt2 == cmt_cnt1 - 4 -def test_media_deletes_broken_attachment(): +def test_media_deletes_broken_attachment(test_app): user_a = fixture_add_user(u"chris_a") media = fixture_media_entry(uploader=user_a.id, save=False) diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py index 7719bd97..427aa47c 100644 --- a/mediagoblin/tests/test_modelmethods.py +++ b/mediagoblin/tests/test_modelmethods.py @@ -17,13 +17,10 @@ # Maybe not every model needs a test, but some models have special # methods, and so it makes sense to test them here. -from nose.tools import assert_equal - from mediagoblin.db.base import Session from mediagoblin.db.models import MediaEntry -from mediagoblin.tests.tools import get_app, \ - fixture_add_user +from mediagoblin.tests.tools import fixture_add_user import mock @@ -35,8 +32,7 @@ UUID_MOCK = mock.Mock(return_value=FakeUUID()) class TestMediaEntrySlugs(object): - def setUp(self): - self.test_app = get_app(dump_old_app=True) + def _setup(self): self.chris_user = fixture_add_user(u'chris') self.emily_user = fixture_add_user(u'emily') self.existing_entry = self._insert_media_entry_fixture( @@ -57,56 +53,78 @@ class TestMediaEntrySlugs(object): return entry - def test_unique_slug_from_title(self): + def test_unique_slug_from_title(self, test_app): + self._setup() + entry = self._insert_media_entry_fixture(u"Totally unique slug!", save=False) entry.generate_slug() assert entry.slug == u'totally-unique-slug' - def test_old_good_unique_slug(self): + def test_old_good_unique_slug(self, test_app): + self._setup() + entry = self._insert_media_entry_fixture( u"A title here", u"a-different-slug-there", save=False) entry.generate_slug() assert entry.slug == u"a-different-slug-there" - def test_old_weird_slug(self): + def test_old_weird_slug(self, test_app): + self._setup() + entry = self._insert_media_entry_fixture( slug=u"wowee!!!!!", save=False) entry.generate_slug() assert entry.slug == u"wowee" - def test_existing_slug_use_id(self): - entry = self._insert_media_entry_fixture( - u"Beware, I exist!!", this_id=9000, save=False) - entry.generate_slug() - assert entry.slug == u"beware-i-exist-9000" - - @mock.patch('uuid.uuid4', UUID_MOCK) - def test_existing_slug_cant_use_id(self): - # This one grabs the nine thousand slug - self._insert_media_entry_fixture( - slug=u"beware-i-exist-9000") + def test_existing_slug_use_id(self, test_app): + self._setup() entry = self._insert_media_entry_fixture( u"Beware, I exist!!", this_id=9000, save=False) entry.generate_slug() - assert entry.slug == u"beware-i-exist-test" - - @mock.patch('uuid.uuid4', UUID_MOCK) - def test_existing_slug_cant_use_id_extra_junk(self): - # This one grabs the nine thousand slug - self._insert_media_entry_fixture( - slug=u"beware-i-exist-9000") - - # This one grabs makes sure the annoyance doesn't stop - self._insert_media_entry_fixture( - slug=u"beware-i-exist-test") - - entry = self._insert_media_entry_fixture( - u"Beware, I exist!!", this_id=9000, save=False) - entry.generate_slug() - assert entry.slug == u"beware-i-exist-testtest" + assert entry.slug == u"beware-i-exist-9000" - def test_garbage_slug(self): + def test_existing_slug_cant_use_id(self, test_app): + self._setup() + + # Getting tired of dealing with test_app and this mock.patch + # thing conflicting, getting lazy. + @mock.patch('uuid.uuid4', UUID_MOCK) + def _real_test(): + # This one grabs the nine thousand slug + self._insert_media_entry_fixture( + slug=u"beware-i-exist-9000") + + entry = self._insert_media_entry_fixture( + u"Beware, I exist!!", this_id=9000, save=False) + entry.generate_slug() + assert entry.slug == u"beware-i-exist-test" + + _real_test() + + def test_existing_slug_cant_use_id_extra_junk(self, test_app): + self._setup() + + # Getting tired of dealing with test_app and this mock.patch + # thing conflicting, getting lazy. + @mock.patch('uuid.uuid4', UUID_MOCK) + def _real_test(): + # This one grabs the nine thousand slug + self._insert_media_entry_fixture( + slug=u"beware-i-exist-9000") + + # This one grabs makes sure the annoyance doesn't stop + self._insert_media_entry_fixture( + slug=u"beware-i-exist-test") + + entry = self._insert_media_entry_fixture( + u"Beware, I exist!!", this_id=9000, save=False) + entry.generate_slug() + assert entry.slug == u"beware-i-exist-testtest" + + _real_test() + + def test_garbage_slug(self, test_app): """ Titles that sound totally like Q*Bert shouldn't have slugs at all. We'll just reference them by id. @@ -126,13 +144,15 @@ class TestMediaEntrySlugs(object): | |#| |#| |#| |#| \|/ \|/ \|/ \|/ """ + self._setup() + qbert_entry = self._insert_media_entry_fixture( u"@!#?@!", save=False) qbert_entry.generate_slug() assert qbert_entry.slug is None -def test_media_data_init(): +def test_media_data_init(test_app): Session.rollback() Session.remove() media = MediaEntry() @@ -144,4 +164,4 @@ def test_media_data_init(): for obj in Session(): obj_in_session += 1 print repr(obj) - assert_equal(obj_in_session, 0) + assert obj_in_session == 0 diff --git a/mediagoblin/tests/test_oauth.py b/mediagoblin/tests/test_oauth.py index 94ba5dab..901556fe 100644 --- a/mediagoblin/tests/test_oauth.py +++ b/mediagoblin/tests/test_oauth.py @@ -21,15 +21,14 @@ from urlparse import parse_qs, urlparse from mediagoblin import mg_globals from mediagoblin.tools import template, pluginapi -from mediagoblin.tests.tools import get_app, fixture_add_user +from mediagoblin.tests.tools import fixture_add_user _log = logging.getLogger(__name__) class TestOAuth(object): - def setUp(self): - self.app = get_app() + def _setup(self, test_app): self.db = mg_globals.database self.pman = pluginapi.PluginManager() @@ -37,17 +36,17 @@ class TestOAuth(object): self.user_password = u'4cc355_70k3N' self.user = fixture_add_user(u'joauth', self.user_password) - self.login() + self.login(test_app) - def login(self): - self.app.post( + def login(self, test_app): + test_app.post( '/auth/login/', { 'username': self.user.username, 'password': self.user_password}) - def register_client(self, name, client_type, description=None, + def register_client(self, test_app, name, client_type, description=None, redirect_uri=''): - return self.app.post( + return test_app.post( '/oauth/client/register', { 'name': name, 'description': description, @@ -57,9 +56,11 @@ class TestOAuth(object): def get_context(self, template_name): return template.TEMPLATE_TEST_CONTEXT[template_name] - def test_1_public_client_registration_without_redirect_uri(self): + def test_1_public_client_registration_without_redirect_uri(self, test_app): ''' Test 'public' OAuth client registration without any redirect uri ''' - response = self.register_client(u'OMGOMGOMG', 'public', + self._setup(test_app) + + response = self.register_client(test_app, u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2') ctx = self.get_context('oauth/client/register.html') @@ -75,10 +76,10 @@ class TestOAuth(object): # Should not pass through assert not client - def test_2_successful_public_client_registration(self): + def test_2_successful_public_client_registration(self, test_app): ''' Successfully register a public client ''' - self.login() - self.register_client(u'OMGOMG', 'public', 'OMG!', + self._setup(test_app) + self.register_client(test_app, u'OMGOMG', 'public', 'OMG!', 'http://foo.example') client = self.db.OAuthClient.query.filter( @@ -87,9 +88,12 @@ class TestOAuth(object): # Client should have been registered assert client - def test_3_successful_confidential_client_reg(self): + def test_3_successful_confidential_client_reg(self, test_app): ''' Register a confidential OAuth client ''' - response = self.register_client(u'GMOGMO', 'confidential', 'NO GMO!') + self._setup(test_app) + + response = self.register_client( + test_app, u'GMOGMO', 'confidential', 'NO GMO!') assert response.status_int == 302 @@ -101,15 +105,16 @@ class TestOAuth(object): return client - def test_4_authorize_confidential_client(self): + def test_4_authorize_confidential_client(self, test_app): ''' Authorize a confidential client as a logged in user ''' + self._setup(test_app) - client = self.test_3_successful_confidential_client_reg() + client = self.test_3_successful_confidential_client_reg(test_app) client_identifier = client.identifier redirect_uri = 'https://foo.example' - response = self.app.get('/oauth/authorize', { + response = test_app.get('/oauth/authorize', { 'client_id': client.identifier, 'scope': 'admin', 'redirect_uri': redirect_uri}) @@ -122,7 +127,7 @@ class TestOAuth(object): form = ctx['form'] # Short for client authorization post reponse - capr = self.app.post( + capr = test_app.post( '/oauth/client/authorize', { 'client_id': form.client_id.data, 'allow': 'Allow', @@ -139,16 +144,19 @@ class TestOAuth(object): def get_code_from_redirect_uri(self, uri): return parse_qs(urlparse(uri).query)['code'][0] - def test_token_endpoint_successful_confidential_request(self): + def test_token_endpoint_successful_confidential_request(self, test_app): ''' Successful request against token endpoint ''' - code_redirect, client_id = self.test_4_authorize_confidential_client() + self._setup(test_app) + + code_redirect, client_id = self.test_4_authorize_confidential_client( + test_app) code = self.get_code_from_redirect_uri(code_redirect.location) client = self.db.OAuthClient.query.filter( self.db.OAuthClient.identifier == unicode(client_id)).first() - token_res = self.app.get('/oauth/access_token?client_id={0}&\ + token_res = test_app.get('/oauth/access_token?client_id={0}&\ code={1}&client_secret={2}'.format(client_id, code, client.secret)) assert token_res.status_int == 200 @@ -162,16 +170,19 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret)) assert type(token_data['expires_in']) == int assert token_data['expires_in'] > 0 - def test_token_endpont_missing_id_confidential_request(self): + def test_token_endpont_missing_id_confidential_request(self, test_app): ''' Unsuccessful request against token endpoint, missing client_id ''' - code_redirect, client_id = self.test_4_authorize_confidential_client() + self._setup(test_app) + + code_redirect, client_id = self.test_4_authorize_confidential_client( + test_app) code = self.get_code_from_redirect_uri(code_redirect.location) client = self.db.OAuthClient.query.filter( self.db.OAuthClient.identifier == unicode(client_id)).first() - token_res = self.app.get('/oauth/access_token?\ + token_res = test_app.get('/oauth/access_token?\ code={0}&client_secret={1}'.format(code, client.secret)) assert token_res.status_int == 200 diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini index d7c18642..875b4f65 100644 --- a/mediagoblin/tests/test_paste.ini +++ b/mediagoblin/tests/test_paste.ini @@ -10,7 +10,7 @@ use = egg:Paste#urlmap [app:mediagoblin] use = egg:mediagoblin#app filter-with = beaker -config = %(here)s/test_mgoblin_app.ini +config = %(here)s/mediagoblin.ini [app:publicstore_serve] use = egg:Paste#static diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py index 315a95da..245c396d 100644 --- a/mediagoblin/tests/test_pluginapi.py +++ b/mediagoblin/tests/test_pluginapi.py @@ -19,7 +19,6 @@ from configobj import ConfigObj from mediagoblin import mg_globals from mediagoblin.init.plugins import setup_plugins from mediagoblin.tools import pluginapi -from nose.tools import eq_ def with_cleanup(*modules_to_delete): @@ -97,7 +96,7 @@ def test_no_plugins(): setup_plugins() # Make sure we didn't load anything. - eq_(len(pman.plugins), 0) + assert len(pman.plugins) == 0 @with_cleanup('mediagoblin.plugins.sampleplugin') @@ -117,14 +116,14 @@ def test_one_plugin(): setup_plugins() # Make sure we only found one plugin - eq_(len(pman.plugins), 1) + assert len(pman.plugins) == 1 # Make sure the plugin is the one we think it is. - eq_(pman.plugins[0], 'mediagoblin.plugins.sampleplugin') + assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin' # Make sure there was one hook registered - eq_(len(pman.hooks), 1) + assert len(pman.hooks) == 1 # Make sure _setup_plugin_called was called once import mediagoblin.plugins.sampleplugin - eq_(mediagoblin.plugins.sampleplugin._setup_plugin_called, 1) + assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1 @with_cleanup('mediagoblin.plugins.sampleplugin') @@ -145,14 +144,14 @@ def test_same_plugin_twice(): setup_plugins() # Make sure we only found one plugin - eq_(len(pman.plugins), 1) + assert len(pman.plugins) == 1 # Make sure the plugin is the one we think it is. - eq_(pman.plugins[0], 'mediagoblin.plugins.sampleplugin') + assert pman.plugins[0] == 'mediagoblin.plugins.sampleplugin' # Make sure there was one hook registered - eq_(len(pman.hooks), 1) + assert len(pman.hooks) == 1 # Make sure _setup_plugin_called was called once import mediagoblin.plugins.sampleplugin - eq_(mediagoblin.plugins.sampleplugin._setup_plugin_called, 1) + assert mediagoblin.plugins.sampleplugin._setup_plugin_called == 1 @with_cleanup() @@ -172,4 +171,4 @@ def test_disabled_plugin(): setup_plugins() # Make sure we didn't load the plugin - eq_(len(pman.plugins), 0) + assert len(pman.plugins) == 0 diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py index 61326ae9..749f7b07 100644 --- a/mediagoblin/tests/test_storage.py +++ b/mediagoblin/tests/test_storage.py @@ -18,7 +18,7 @@ import os import tempfile -from nose.tools import assert_raises, assert_equal, assert_true +import pytest from werkzeug.utils import secure_filename from mediagoblin import storage @@ -41,10 +41,8 @@ def test_clean_listy_filepath(): assert storage.clean_listy_filepath( ['../../../etc/', 'passwd']) == expected - assert_raises( - storage.InvalidFilepath, - storage.clean_listy_filepath, - ['../../', 'linooks.jpg']) + with pytest.raises(storage.InvalidFilepath): + storage.clean_listy_filepath(['../../', 'linooks.jpg']) class FakeStorageSystem(): @@ -78,10 +76,10 @@ def test_storage_system_from_config(): 'garbage_arg': 'garbage_arg', 'storage_class': 'mediagoblin.tests.test_storage:FakeStorageSystem'}) - assert_equal(this_storage.foobie, 'eiboof') - assert_equal(this_storage.blech, 'hcelb') - assert_equal(unicode(this_storage.__class__), - u'mediagoblin.tests.test_storage.FakeStorageSystem') + assert this_storage.foobie == 'eiboof' + assert this_storage.blech == 'hcelb' + assert unicode(this_storage.__class__) == \ + u'mediagoblin.tests.test_storage.FakeStorageSystem' ########################## @@ -89,7 +87,7 @@ def test_storage_system_from_config(): ########################## def get_tmp_filestorage(mount_url=None, fake_remote=False): - tmpdir = tempfile.mkdtemp() + tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage") if fake_remote: this_storage = FakeRemoteStorage(tmpdir, mount_url) else: @@ -108,11 +106,13 @@ def test_basic_storage__resolve_filepath(): assert result == os.path.join( tmpdir, 'etc/passwd') - assert_raises( + pytest.raises( storage.InvalidFilepath, this_storage._resolve_filepath, ['../../', 'etc', 'passwd']) + os.rmdir(tmpdir) + def test_basic_storage_file_exists(): tmpdir, this_storage = get_tmp_filestorage() @@ -126,6 +126,8 @@ def test_basic_storage_file_exists(): assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol']) assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol']) + this_storage.delete_file(['dir1', 'dir2', 'filename.txt']) + def test_basic_storage_get_unique_filepath(): tmpdir, this_storage = get_tmp_filestorage() @@ -146,6 +148,8 @@ def test_basic_storage_get_unique_filepath(): assert len(new_filename) > len('filename.txt') assert new_filename == secure_filename(new_filename) + os.remove(filename) + def test_basic_storage_get_file(): tmpdir, this_storage = get_tmp_filestorage() @@ -182,6 +186,10 @@ def test_basic_storage_get_file(): with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: assert testyfile.read() == 'testy file! so testy.' + this_storage.delete_file(filepath) + this_storage.delete_file(new_filepath) + this_storage.delete_file(['testydir', 'testyfile.txt']) + def test_basic_storage_delete_file(): tmpdir, this_storage = get_tmp_filestorage() @@ -205,10 +213,11 @@ def test_basic_storage_delete_file(): def test_basic_storage_url_for_file(): # Not supplying a base_url should actually just bork. tmpdir, this_storage = get_tmp_filestorage() - assert_raises( + pytest.raises( storage.NoWebServing, this_storage.file_url, ['dir1', 'dir2', 'filename.txt']) + os.rmdir(tmpdir) # base_url without domain tmpdir, this_storage = get_tmp_filestorage('/media/') @@ -216,6 +225,7 @@ def test_basic_storage_url_for_file(): ['dir1', 'dir2', 'filename.txt']) expected = '/media/dir1/dir2/filename.txt' assert result == expected + os.rmdir(tmpdir) # base_url with domain tmpdir, this_storage = get_tmp_filestorage( @@ -224,6 +234,7 @@ def test_basic_storage_url_for_file(): ['dir1', 'dir2', 'filename.txt']) expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt' assert result == expected + os.rmdir(tmpdir) def test_basic_storage_get_local_path(): @@ -237,10 +248,13 @@ def test_basic_storage_get_local_path(): assert result == expected + os.rmdir(tmpdir) + def test_basic_storage_is_local(): tmpdir, this_storage = get_tmp_filestorage() assert this_storage.local_storage is True + os.rmdir(tmpdir) def test_basic_storage_copy_locally(): @@ -255,9 +269,13 @@ def test_basic_storage_copy_locally(): new_file_dest = os.path.join(dest_tmpdir, 'file2.txt') this_storage.copy_locally(filepath, new_file_dest) + this_storage.delete_file(filepath) assert file(new_file_dest).read() == 'Testing this file' + os.remove(new_file_dest) + os.rmdir(dest_tmpdir) + def _test_copy_local_to_storage_works(tmpdir, this_storage): local_filename = tempfile.mktemp() @@ -267,10 +285,14 @@ def _test_copy_local_to_storage_works(tmpdir, this_storage): this_storage.copy_local_to_storage( local_filename, ['dir1', 'dir2', 'copiedto.txt']) + os.remove(local_filename) + assert file( os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), 'r').read() == 'haha' + this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt']) + def test_basic_storage_copy_local_to_storage(): tmpdir, this_storage = get_tmp_filestorage() diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py index fc3d8c83..ac714252 100644 --- a/mediagoblin/tests/test_submission.py +++ b/mediagoblin/tests/test_submission.py @@ -21,11 +21,9 @@ sys.setdefaultencoding('utf-8') import urlparse import os -from nose.tools import assert_equal, assert_true from pkg_resources import resource_filename -from mediagoblin.tests.tools import get_app, \ - fixture_add_user +from mediagoblin.tests.tools import fixture_add_user from mediagoblin import mg_globals from mediagoblin.db.models import MediaEntry from mediagoblin.tools import template @@ -51,8 +49,8 @@ REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request'] class TestSubmission: - def setUp(self): - self.test_app = get_app(dump_old_app=False) + def _setup(self, test_app): + self.test_app = test_app # TODO: Possibly abstract into a decorator like: # @as_authenticated_user('chris') @@ -88,27 +86,29 @@ class TestSubmission: def check_comments(self, request, media_id, count): comments = request.db.MediaComment.find({'media_entry': media_id}) - assert_equal(count, len(list(comments))) + assert count == len(list(comments)) + + def test_missing_fields(self, test_app): + self._setup(test_app) - def test_missing_fields(self): # Test blank form # --------------- response, form = self.do_post({}, *FORM_CONTEXT) - assert_equal(form.file.errors, [u'You must provide a file.']) + assert form.file.errors == [u'You must provide a file.'] # Test blank file # --------------- response, form = self.do_post({'title': u'test title'}, *FORM_CONTEXT) - assert_equal(form.file.errors, [u'You must provide a file.']) + assert form.file.errors == [u'You must provide a file.'] def check_url(self, response, path): - assert_equal(urlparse.urlsplit(response.location)[2], path) + assert urlparse.urlsplit(response.location)[2] == path def check_normal_upload(self, title, filename): response, context = self.do_post({'title': title}, do_follow=True, **self.upload_data(filename)) self.check_url(response, '/u/{0}/'.format(self.test_user.username)) - assert_true('mediagoblin/user_pages/user.html' in context) + assert 'mediagoblin/user_pages/user.html' in context # Make sure the media view is at least reachable, logged in... url = '/u/{0}/m/{1}/'.format(self.test_user.username, title.lower().replace(' ', '-')) @@ -117,21 +117,25 @@ class TestSubmission: self.logout() self.test_app.get(url) - def test_normal_jpg(self): + def test_normal_jpg(self, test_app): + self._setup(test_app) self.check_normal_upload(u'Normal upload 1', GOOD_JPG) - def test_normal_png(self): + def test_normal_png(self, test_app): + self._setup(test_app) self.check_normal_upload(u'Normal upload 2', GOOD_PNG) def check_media(self, request, find_data, count=None): media = MediaEntry.find(find_data) if count is not None: - assert_equal(media.count(), count) + assert media.count() == count if count == 0: return return media[0] - def test_tags(self): + def test_tags(self, test_app): + self._setup(test_app) + # Good tag string # -------- response, request = self.do_post({'title': u'Balanced Goblin 2', @@ -151,12 +155,14 @@ class TestSubmission: 'tags': BAD_TAG_STRING}, *FORM_CONTEXT, **self.upload_data(GOOD_JPG)) - assert_equal(form.tags.errors, [ + assert form.tags.errors == [ u'Tags must be shorter than 50 characters. ' \ 'Tags that are too long: ' \ - 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']) + 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'] + + def test_delete(self, test_app): + self._setup(test_app) - def test_delete(self): response, request = self.do_post({'title': u'Balanced Goblin'}, *REQUEST_CONTEXT, do_follow=True, **self.upload_data(GOOD_JPG)) @@ -173,7 +179,7 @@ class TestSubmission: 'slug': u"Balanced=Goblin", 'tags': u''}) media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) - assert_equal(media.slug, u"balanced-goblin") + assert media.slug == u"balanced-goblin" # Add a comment, so we can test for its deletion later. self.check_comments(request, media_id, 0) @@ -201,33 +207,39 @@ class TestSubmission: self.check_media(request, {'id': media_id}, 0) self.check_comments(request, media_id, 0) - def test_evil_file(self): + def test_evil_file(self, test_app): + self._setup(test_app) + # Test non-suppoerted file with non-supported extension # ----------------------------------------------------- response, form = self.do_post({'title': u'Malicious Upload 1'}, *FORM_CONTEXT, **self.upload_data(EVIL_FILE)) - assert_equal(len(form.file.errors), 1) + assert len(form.file.errors) == 1 assert 'Sorry, I don\'t support that file type :(' == \ str(form.file.errors[0]) - def test_get_media_manager(self): + def test_get_media_manager(self, test_app): """Test if the get_media_manger function returns sensible things """ + self._setup(test_app) + response, request = self.do_post({'title': u'Balanced Goblin'}, *REQUEST_CONTEXT, do_follow=True, **self.upload_data(GOOD_JPG)) media = self.check_media(request, {'title': u'Balanced Goblin'}, 1) - assert_equal(media.media_type, u'mediagoblin.media_types.image') - assert_equal(media.media_manager, img_MEDIA_MANAGER) + assert media.media_type == u'mediagoblin.media_types.image' + assert media.media_manager == img_MEDIA_MANAGER - def test_sniffing(self): + def test_sniffing(self, test_app): ''' Test sniffing mechanism to assert that regular uploads work as intended ''' + self._setup(test_app) + template.clear_test_template_context() response = self.test_app.post( '/submit/', { @@ -254,25 +266,36 @@ class TestSubmission: **self.upload_data(filename)) self.check_url(response, '/u/{0}/'.format(self.test_user.username)) entry = mg_globals.database.MediaEntry.find_one({'title': title}) - assert_equal(entry.state, 'failed') - assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail') + assert entry.state == 'failed' + assert entry.fail_error == u'mediagoblin.processing:BadMediaFail' + + def test_evil_jpg(self, test_app): + self._setup(test_app) - def test_evil_jpg(self): # Test non-supported file with .jpg extension # ------------------------------------------- self.check_false_image(u'Malicious Upload 2', EVIL_JPG) - def test_evil_png(self): + def test_evil_png(self, test_app): + self._setup(test_app) + # Test non-supported file with .png extension # ------------------------------------------- self.check_false_image(u'Malicious Upload 3', EVIL_PNG) - def test_media_data(self): + def test_media_data(self, test_app): + self._setup(test_app) + self.check_normal_upload(u"With GPS data", GPS_JPG) media = self.check_media(None, {"title": u"With GPS data"}, 1) - assert_equal(media.media_data.gps_latitude, 59.336666666666666) + assert media.media_data.gps_latitude == 59.336666666666666 + + def test_processing(self, test_app): + self._setup(test_app) + + public_store_dir = mg_globals.global_config[ + 'storage:publicstore']['base_dir'] - def test_processing(self): data = {'title': u'Big Blue'} response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True, **self.upload_data(BIG_BLUE)) @@ -282,12 +305,11 @@ class TestSubmission: ('medium', 'bigblue.medium.png'), ('thumb', 'bigblue.thumbnail.png')): # Does the processed image have a good filename? - filename = resource_filename( - 'mediagoblin.tests', - os.path.join('test_user_dev/media/public', - *media.media_files.get(key, []))) - assert_true(filename.endswith('_' + basename)) + filename = os.path.join( + public_store_dir, + *media.media_files.get(key, [])) + assert filename.endswith('_' + basename) # Is it smaller than the last processed image we looked at? size = os.stat(filename).st_size - assert_true(last_size > size) + assert last_size > size last_size = size diff --git a/mediagoblin/tests/test_tags.py b/mediagoblin/tests/test_tags.py index ccb93085..e25cc283 100644 --- a/mediagoblin/tests/test_tags.py +++ b/mediagoblin/tests/test_tags.py @@ -14,17 +14,15 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from mediagoblin.tests.tools import get_app from mediagoblin.tools import text -def test_list_of_dicts_conversion(): +def test_list_of_dicts_conversion(test_app): """ When the user adds tags to a media entry, the string from the form is converted into a list of tags, where each tag is stored in the database as a dict. Each tag dict should contain the tag's name and slug. Another function performs the reverse operation when populating a form to edit tags. """ - test_app = get_app(dump_old_app=False) # Leading, trailing, and internal whitespace should be removed and slugified assert text.convert_to_tag_list_of_dicts('sleep , 6 AM, chainsaw! ') == [ {'name': u'sleep', 'slug': u'sleep'}, diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py index 636c8689..9cd49671 100644 --- a/mediagoblin/tests/test_workbench.py +++ b/mediagoblin/tests/test_workbench.py @@ -25,9 +25,14 @@ from mediagoblin.tests.test_storage import get_tmp_filestorage class TestWorkbench(object): - def setUp(self): + def setup(self): + self.workbench_base = tempfile.mkdtemp(prefix='gmg_workbench_testing') self.workbench_manager = workbench.WorkbenchManager( - os.path.join(tempfile.gettempdir(), u'mgoblin_workbench_testing')) + self.workbench_base) + + def teardown(self): + # If the workbench is empty, this should work. + os.rmdir(self.workbench_base) def test_create_workbench(self): workbench = self.workbench_manager.create() @@ -70,6 +75,7 @@ class TestWorkbench(object): filename = this_workbench.localized_file(this_storage, filepath) assert filename == os.path.join( tmpdir, 'dir1/dir2/ourfile.txt') + this_storage.delete_file(filepath) # with a fake remote file storage tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) @@ -95,6 +101,9 @@ class TestWorkbench(object): assert filename == os.path.join( this_workbench.dir, 'thisfile.text') + this_storage.delete_file(filepath) + this_workbench.destroy() + def test_workbench_decorator(self): """Test @get_workbench decorator and automatic cleanup""" # The decorator needs mg_globals.workbench_manager diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 1d8e6e96..a0498a6e 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -43,14 +43,14 @@ TEST_APP_CONFIG = pkg_resources.resource_filename( 'mediagoblin.tests', 'test_mgoblin_app.ini') TEST_USER_DEV = pkg_resources.resource_filename( 'mediagoblin.tests', 'test_user_dev') -MGOBLIN_APP = None + USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue'] BAD_CELERY_MESSAGE = """\ -Sorry, you *absolutely* must run nosetests with the +Sorry, you *absolutely* must run tests with the mediagoblin.init.celery.from_tests module. Like so: -$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests""" +$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/py.test""" class BadCeleryEnviron(Exception): pass @@ -101,7 +101,30 @@ def suicide_if_bad_celery_environ(): raise BadCeleryEnviron(BAD_CELERY_MESSAGE) -def get_app(dump_old_app=True): +def get_app(request, paste_config=None, mgoblin_config=None): + """Create a MediaGoblin app for testing. + + Args: + - request: Not an http request, but a pytest fixture request. We + use this to make temporary directories that pytest + automatically cleans up as needed. + - paste_config: particular paste config used by this application. + - mgoblin_config: particular mediagoblin config used by this + application. + """ + paste_config = paste_config or TEST_SERVER_CONFIG + mgoblin_config = mgoblin_config or TEST_APP_CONFIG + + # This is the directory we're copying the paste/mgoblin config stuff into + run_dir = request.config._tmpdirhandler.mktemp( + 'mgoblin_app', numbered=True) + user_dev_dir = run_dir.mkdir('test_user_dev').strpath + + new_paste_config = run_dir.join('paste.ini').strpath + new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath + shutil.copyfile(paste_config, new_paste_config) + shutil.copyfile(mgoblin_config, new_mgoblin_config) + suicide_if_bad_celery_environ() # Make sure we've turned on testing @@ -110,26 +133,16 @@ def get_app(dump_old_app=True): # Leave this imported as it sets up celery. from mediagoblin.init.celery import from_tests - global MGOBLIN_APP - - # Just return the old app if that exists and it's okay to set up - # and return - if MGOBLIN_APP and not dump_old_app: - return MGOBLIN_APP - Session.rollback() Session.remove() - # Remove and reinstall user_dev directories - if os.path.exists(TEST_USER_DEV): - shutil.rmtree(TEST_USER_DEV) - + # install user_dev directories for directory in USER_DEV_DIRECTORIES_TO_SETUP: - full_dir = os.path.join(TEST_USER_DEV, directory) + full_dir = os.path.join(user_dev_dir, directory) os.makedirs(full_dir) # Get app config - global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG) + global_config, validation_result = read_mediagoblin_config(new_mgoblin_config) app_config = global_config['mediagoblin'] # Run database setup/migrations @@ -137,7 +150,7 @@ def get_app(dump_old_app=True): # setup app and return test_app = loadapp( - 'config:' + TEST_SERVER_CONFIG) + 'config:' + new_paste_config) # Re-setup celery setup_celery_app(app_config, global_config) @@ -149,26 +162,10 @@ def get_app(dump_old_app=True): mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app)) app = TestApp(test_app) - MGOBLIN_APP = app return app -def setup_fresh_app(func): - """ - Decorator to setup a fresh test application for this function. - - Cleans out test buckets and passes in a new, fresh test_app. - """ - @wraps(func) - def wrapper(*args, **kwargs): - test_app = get_app() - testing.clear_test_buckets() - return func(test_app, *args, **kwargs) - - return wrapper - - def install_fixtures_simple(db, fixtures): """ Very simply install fixtures in the database diff --git a/mediagoblin/user_pages/views.py b/mediagoblin/user_pages/views.py index c611daa1..61c23f16 100644 --- a/mediagoblin/user_pages/views.py +++ b/mediagoblin/user_pages/views.py @@ -204,11 +204,11 @@ def media_collect(request, media): # If we are here, method=POST and the form is valid, submit things. # If the user is adding a new collection, use that: - if request.form['collection_title']: + if form.collection_title.data: # Make sure this user isn't duplicating an existing collection existing_collection = Collection.query.filter_by( creator=request.user.id, - title=request.form['collection_title']).first() + title=form.collection_title.data).first() if existing_collection: messages.add_message(request, messages.ERROR, _('You already have a collection called "%s"!') @@ -218,8 +218,8 @@ def media_collect(request, media): media=media.slug_or_id) collection = Collection() - collection.title = request.form['collection_title'] - collection.description = request.form.get('collection_description') + collection.title = form.collection_title.data + collection.description = form.collection_description.data collection.creator = request.user.id collection.generate_slug() collection.save() @@ -251,7 +251,7 @@ def media_collect(request, media): collection_item = request.db.CollectionItem() collection_item.collection = collection.id collection_item.media_entry = media.id - collection_item.note = request.form['note'] + collection_item.note = form.note.data collection_item.save() collection.items = collection.items + 1 |