diff options
Diffstat (limited to 'mediagoblin/tests')
33 files changed, 751 insertions, 487 deletions
diff --git a/mediagoblin/tests/media_tools.py b/mediagoblin/tests/media_tools.py new file mode 100644 index 00000000..8d58c024 --- /dev/null +++ b/mediagoblin/tests/media_tools.py @@ -0,0 +1,61 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from contextlib import contextmanager +import tempfile + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst +Gst.init(None) + +@contextmanager +def create_av(make_video=False, make_audio=False): + 'creates audio/video in `path`, throws AssertionError on any error' + media = tempfile.NamedTemporaryFile(suffix='.ogg') + pipeline = Gst.Pipeline() + mux = Gst.ElementFactory.make('oggmux', 'mux') + pipeline.add(mux) + if make_video: + video_src = Gst.ElementFactory.make('videotestsrc', 'video_src') + video_src.set_property('num-buffers', 20) + video_enc = Gst.ElementFactory.make('theoraenc', 'video_enc') + pipeline.add(video_src) + pipeline.add(video_enc) + assert video_src.link(video_enc) + assert video_enc.link(mux) + if make_audio: + audio_src = Gst.ElementFactory.make('audiotestsrc', 'audio_src') + audio_src.set_property('num-buffers', 20) + audio_enc = Gst.ElementFactory.make('vorbisenc', 'audio_enc') + pipeline.add(audio_src) + pipeline.add(audio_enc) + assert audio_src.link(audio_enc) + assert audio_enc.link(mux) + sink = Gst.ElementFactory.make('filesink', 'sink') + sink.set_property('location', media.name) + pipeline.add(sink) + mux.link(sink) + pipeline.set_state(Gst.State.PLAYING) + state = pipeline.get_state(Gst.SECOND) + assert state[0] == Gst.StateChangeReturn.SUCCESS + bus = pipeline.get_bus() + message = bus.timed_pop_filtered( + Gst.SECOND, # one second should be more than enough for 50-buf vid + Gst.MessageType.ERROR | Gst.MessageType.EOS) + assert message.type == Gst.MessageType.EOS + pipeline.set_state(Gst.State.NULL) + yield media.name diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py index 93e82f18..83003875 100644 --- a/mediagoblin/tests/test_api.py +++ b/mediagoblin/tests/test_api.py @@ -15,7 +15,10 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import json -import mock +try: + import mock +except ImportError: + import unittest.mock as mock import pytest from webtest import AppError @@ -23,6 +26,7 @@ from webtest import AppError from .resources import GOOD_JPG from mediagoblin import mg_globals from mediagoblin.db.models import User, MediaEntry, MediaComment +from mediagoblin.tools.routing import extract_url_arguments from mediagoblin.tests.tools import fixture_add_user from mediagoblin.moderation.tools import take_away_privileges @@ -55,7 +59,7 @@ class TestAPI(object): headers=headers ) - return response, json.loads(response.body) + return response, json.loads(response.body.decode()) def _upload_image(self, test_app, image): """ Uploads and image to MediaGoblin via pump.io API """ @@ -72,7 +76,7 @@ class TestAPI(object): data, headers=headers ) - image = json.loads(response.body) + image = json.loads(response.body.decode()) return response, image @@ -142,7 +146,7 @@ class TestAPI(object): headers=headers ) - assert "403 FORBIDDEN" in excinfo.value.message + assert "403 FORBIDDEN" in excinfo.value.args[0] def test_unable_to_post_feed_as_someone_else(self, test_app): """ Tests that can't post an image to someone else's feed """ @@ -165,7 +169,7 @@ class TestAPI(object): headers=headers ) - assert "403 FORBIDDEN" in excinfo.value.message + assert "403 FORBIDDEN" in excinfo.value.args[0] def test_only_able_to_update_own_image(self, test_app): """ Test's that the uploader is the only person who can update an image """ @@ -184,7 +188,8 @@ class TestAPI(object): # Lets change the image uploader to be self.other_user, this is easier # than uploading the image as someone else as the way self.mocked_oauth_required # and self._upload_image. - media = MediaEntry.query.filter_by(id=data["object"]["id"]).first() + id = int(data["object"]["id"].split("/")[-2]) + media = MediaEntry.query.filter_by(id=id).first() media.uploader = self.other_user.id media.save() @@ -197,7 +202,7 @@ class TestAPI(object): headers=headers ) - assert "403 FORBIDDEN" in excinfo.value.message + assert "403 FORBIDDEN" in excinfo.value.args[0] def test_upload_image_with_filename(self, test_app): """ Tests that you can upload an image with filename and description """ @@ -224,16 +229,17 @@ class TestAPI(object): headers={"Content-Type": "application/json"} ) - image = json.loads(response.body)["object"] + image = json.loads(response.body.decode())["object"] # Check everything has been set on the media correctly - media = MediaEntry.query.filter_by(id=image["id"]).first() + id = int(image["id"].split("/")[-2]) + media = MediaEntry.query.filter_by(id=id).first() assert media.title == title assert media.description == description assert media.license == license # Check we're being given back everything we should on an update - assert image["id"] == media.id + assert int(image["id"].split("/")[-2]) == media.id assert image["displayName"] == title assert image["content"] == description assert image["license"] == license @@ -260,7 +266,7 @@ class TestAPI(object): ) # Assert that we've got a 403 - assert "403 FORBIDDEN" in excinfo.value.message + assert "403 FORBIDDEN" in excinfo.value.args[0] def test_object_endpoint(self, test_app): """ Tests that object can be looked up at endpoint """ @@ -281,11 +287,11 @@ class TestAPI(object): with self.mock_oauth(): request = test_app.get(object_uri) - image = json.loads(request.body) - entry = MediaEntry.query.filter_by(id=image["id"]).first() + image = json.loads(request.body.decode()) + entry_id = int(image["id"].split("/")[-2]) + entry = MediaEntry.query.filter_by(id=entry_id).first() assert request.status_code == 200 - assert entry.id == image["id"] assert "image" in image assert "fullImage" in image @@ -313,7 +319,8 @@ class TestAPI(object): assert response.status_code == 200 # Find the objects in the database - media = MediaEntry.query.filter_by(id=data["object"]["id"]).first() + media_id = int(data["object"]["id"].split("/")[-2]) + media = MediaEntry.query.filter_by(id=media_id).first() comment = media.get_comments()[0] # Tests that it matches in the database @@ -321,7 +328,6 @@ class TestAPI(object): assert comment.content == content # Test that the response is what we should be given - assert comment.id == comment_data["object"]["id"] assert comment.content == comment_data["object"]["content"] def test_unable_to_post_comment_as_someone_else(self, test_app): @@ -351,7 +357,7 @@ class TestAPI(object): headers=headers ) - assert "403 FORBIDDEN" in excinfo.value.message + assert "403 FORBIDDEN" in excinfo.value.args[0] def test_unable_to_update_someone_elses_comment(self, test_app): """ Test that you're able to update someoen elses comment. """ @@ -376,7 +382,7 @@ class TestAPI(object): response, comment_data = self._activity_to_feed(test_app, activity) # change who uploaded the comment as it's easier than changing - comment_id = comment_data["object"]["id"] + comment_id = int(comment_data["object"]["id"].split("/")[-2]) comment = MediaComment.query.filter_by(id=comment_id).first() comment.author = self.other_user.id comment.save() @@ -396,14 +402,14 @@ class TestAPI(object): headers=headers ) - assert "403 FORBIDDEN" in excinfo.value.message + assert "403 FORBIDDEN" in excinfo.value.args[0] def test_profile(self, test_app): """ Tests profile endpoint """ uri = "/api/user/{0}/profile".format(self.user.username) with self.mock_oauth(): response = test_app.get(uri) - profile = json.loads(response.body) + profile = json.loads(response.body.decode()) assert response.status_code == 200 @@ -417,7 +423,7 @@ class TestAPI(object): uri = "/api/user/{0}/".format(self.user.username) with self.mock_oauth(): response = test_app.get(uri) - user = json.loads(response.body) + user = json.loads(response.body.decode()) assert response.status_code == 200 @@ -433,7 +439,7 @@ class TestAPI(object): with pytest.raises(AppError) as excinfo: response = test_app.get("/api/whoami") - assert "401 UNAUTHORIZED" in excinfo.value.message + assert "401 UNAUTHORIZED" in excinfo.value.args[0] def test_read_feed(self, test_app): """ Test able to read objects from the feed """ @@ -443,7 +449,7 @@ class TestAPI(object): uri = "/api/user/{0}/feed".format(self.active_user.username) with self.mock_oauth(): response = test_app.get(uri) - feed = json.loads(response.body) + feed = json.loads(response.body.decode()) assert response.status_code == 200 @@ -468,9 +474,9 @@ class TestAPI(object): with pytest.raises(AppError) as excinfo: self._post_image_to_feed(test_app, data) - assert "403 FORBIDDEN" in excinfo.value.message + assert "403 FORBIDDEN" in excinfo.value.args[0] - def test_object_endpoint(self, test_app): + def test_object_endpoint_requestable(self, test_app): """ Test that object endpoint can be requested """ response, data = self._upload_image(test_app, GOOD_JPG) response, data = self._post_image_to_feed(test_app, data) @@ -478,7 +484,7 @@ class TestAPI(object): with self.mock_oauth(): response = test_app.get(data["object"]["links"]["self"]["href"]) - data = json.loads(response.body) + data = json.loads(response.body.decode()) assert response.status_code == 200 @@ -486,3 +492,109 @@ class TestAPI(object): assert "url" in data assert "links" in data assert data["objectType"] == "image" + + def test_delete_media_by_activity(self, test_app): + """ Test that an image can be deleted by a delete activity to feed """ + response, data = self._upload_image(test_app, GOOD_JPG) + response, data = self._post_image_to_feed(test_app, data) + object_id = data["object"]["id"] + + activity = { + "verb": "delete", + "object": { + "id": object_id, + "objectType": "image", + } + } + + response = self._activity_to_feed(test_app, activity)[1] + + # Check the media is no longer in the database + media_id = int(object_id.split("/")[-2]) + media = MediaEntry.query.filter_by(id=media_id).first() + + assert media is None + + # Check we've been given the full delete activity back + assert "id" in response + assert response["verb"] == "delete" + assert "object" in response + assert response["object"]["id"] == object_id + assert response["object"]["objectType"] == "image" + + def test_delete_comment_by_activity(self, test_app): + """ Test that a comment is deleted by a delete activity to feed """ + # First upload an image to comment against + response, data = self._upload_image(test_app, GOOD_JPG) + response, data = self._post_image_to_feed(test_app, data) + + # Post a comment to delete + activity = { + "verb": "post", + "object": { + "objectType": "comment", + "content": "This is a comment.", + "inReplyTo": data["object"], + } + } + + comment = self._activity_to_feed(test_app, activity)[1] + + # Now delete the image + activity = { + "verb": "delete", + "object": { + "id": comment["object"]["id"], + "objectType": "comment", + } + } + + delete = self._activity_to_feed(test_app, activity)[1] + + # Verify the comment no longer exists + comment_id = int(comment["object"]["id"].split("/")[-2]) + assert MediaComment.query.filter_by(id=comment_id).first() is None + + # Check we've got a delete activity back + assert "id" in delete + assert delete["verb"] == "delete" + assert "object" in delete + assert delete["object"]["id"] == comment["object"]["id"] + assert delete["object"]["objectType"] == "comment" + + def test_edit_comment(self, test_app): + """ Test that someone can update their own comment """ + # First upload an image to comment against + response, data = self._upload_image(test_app, GOOD_JPG) + response, data = self._post_image_to_feed(test_app, data) + + # Post a comment to edit + activity = { + "verb": "post", + "object": { + "objectType": "comment", + "content": "This is a comment", + "inReplyTo": data["object"], + } + } + + comment = self._activity_to_feed(test_app, activity)[1] + + # Now create an update activity to change the content + activity = { + "verb": "update", + "object": { + "id": comment["object"]["id"], + "content": "This is my fancy new content string!", + "objectType": "comment", + }, + } + + comment = self._activity_to_feed(test_app, activity)[1] + + # Verify the comment reflects the changes + comment_id = int(comment["object"]["id"].split("/")[-2]) + model = MediaComment.query.filter_by(id=comment_id).first() + + assert model.content == activity["object"]["content"] + diff --git a/mediagoblin/tests/test_audio.py b/mediagoblin/tests/test_audio.py new file mode 100644 index 00000000..9826ceb1 --- /dev/null +++ b/mediagoblin/tests/test_audio.py @@ -0,0 +1,105 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import tempfile +import shutil +import os +import pytest +from contextlib import contextmanager +import logging +import imghdr + +#os.environ['GST_DEBUG'] = '4,python:4' + +pytest.importorskip("gi.repository.Gst") +pytest.importorskip("scikits.audiolab") +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst +Gst.init(None) + +from mediagoblin.media_types.audio.transcoders import (AudioTranscoder, + AudioThumbnailer) +from mediagoblin.media_types.tools import discover + + +@contextmanager +def create_audio(): + audio = tempfile.NamedTemporaryFile() + src = Gst.ElementFactory.make('audiotestsrc', None) + src.set_property('num-buffers', 50) + enc = Gst.ElementFactory.make('flacenc', None) + dst = Gst.ElementFactory.make('filesink', None) + dst.set_property('location', audio.name) + pipeline = Gst.Pipeline() + pipeline.add(src) + pipeline.add(enc) + pipeline.add(dst) + src.link(enc) + enc.link(dst) + pipeline.set_state(Gst.State.PLAYING) + state = pipeline.get_state(3 * Gst.SECOND) + assert state[0] == Gst.StateChangeReturn.SUCCESS + bus = pipeline.get_bus() + bus.timed_pop_filtered( + 3 * Gst.SECOND, + Gst.MessageType.ERROR | Gst.MessageType.EOS) + pipeline.set_state(Gst.State.NULL) + yield (audio.name) + + +@contextmanager +def create_data_for_test(): + with create_audio() as audio_name: + second_file = tempfile.NamedTemporaryFile() + yield (audio_name, second_file.name) + + +def test_transcoder(): + ''' + Tests AudioTransocder's transcode method + ''' + transcoder = AudioTranscoder() + with create_data_for_test() as (audio_name, result_name): + transcoder.transcode(audio_name, result_name, quality=0.3, + progress_callback=None) + info = discover(result_name) + assert len(info.get_audio_streams()) == 1 + transcoder.transcode(audio_name, result_name, quality=0.3, + mux_name='oggmux', progress_callback=None) + info = discover(result_name) + assert len(info.get_audio_streams()) == 1 + + +def test_thumbnails(): + '''Test thumbnails generation. + + The code below heavily repeats + audio.processing.CommonAudioProcessor.create_spectrogram + 1. Create test audio + 2. Convert it to OGG source for spectogram using transcoder + 3. Create spectogram in jpg + + ''' + thumbnailer = AudioThumbnailer() + transcoder = AudioTranscoder() + with create_data_for_test() as (audio_name, new_name): + transcoder.transcode(audio_name, new_name, mux_name='oggmux') + thumbnail = tempfile.NamedTemporaryFile(suffix='.jpg') + # fft_size below is copypasted from config_spec.ini + thumbnailer.spectrogram(new_name, thumbnail.name, width=100, + fft_size=4096) + assert imghdr.what(thumbnail.name) == 'jpeg' diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py index 1bbc3d01..5ce67688 100644 --- a/mediagoblin/tests/test_auth.py +++ b/mediagoblin/tests/test_auth.py @@ -14,10 +14,14 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import urlparse + import pkg_resources import pytest +import six + +import six.moves.urllib.parse as urlparse + from mediagoblin import mg_globals from mediagoblin.db.models import User from mediagoblin.tests.tools import get_app, fixture_add_user @@ -107,7 +111,7 @@ def test_register_views(test_app): ## Make sure user is logged in request = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/user_pages/user_nonactive.html']['request'] - assert request.session['user_id'] == unicode(new_user.id) + assert request.session['user_id'] == six.text_type(new_user.id) ## Make sure we get email confirmation, and try verifying assert len(mail.EMAIL_TEST_INBOX) == 1 @@ -115,7 +119,7 @@ def test_register_views(test_app): assert message['To'] == 'angrygrrl@example.org' email_context = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/auth/verification_email.txt'] - assert email_context['verification_url'] in message.get_payload(decode=True) + assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True) path = urlparse.urlsplit(email_context['verification_url'])[2] get_params = urlparse.urlsplit(email_context['verification_url'])[3] @@ -186,7 +190,7 @@ def test_register_views(test_app): email_context = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/plugins/basic_auth/fp_verification_email.txt'] #TODO - change the name of verification_url to something forgot-password-ish - assert email_context['verification_url'] in message.get_payload(decode=True) + assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True) path = urlparse.urlsplit(email_context['verification_url'])[2] get_params = urlparse.urlsplit(email_context['verification_url'])[3] @@ -229,7 +233,6 @@ def test_register_views(test_app): assert urlparse.urlsplit(response.location)[2] == '/' assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT - def test_authentication_views(test_app): """ Test logging in and logging out @@ -305,7 +308,7 @@ def test_authentication_views(test_app): # Make sure user is in the session context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] session = context['request'].session - assert session['user_id'] == unicode(test_user.id) + assert session['user_id'] == six.text_type(test_user.id) # Successful logout # ----------------- @@ -332,6 +335,19 @@ def test_authentication_views(test_app): 'next' : '/u/chris/'}) assert urlparse.urlsplit(response.location)[2] == '/u/chris/' + ## Verify that username is lowercased on login attempt + template.clear_test_template_context() + response = test_app.post( + '/auth/login/', { + 'username': u'ANDREW', + 'password': 'fuselage'}) + context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html'] + form = context['login_form'] + + # Username should no longer be uppercased; it should be lowercased + assert not form.username.data == u'ANDREW' + assert form.username.data == u'andrew' + @pytest.fixture() def authentication_disabled_app(request): return get_app( diff --git a/mediagoblin/tests/test_basic_auth.py b/mediagoblin/tests/test_basic_auth.py index 828f0515..e7157bee 100644 --- a/mediagoblin/tests/test_basic_auth.py +++ b/mediagoblin/tests/test_basic_auth.py @@ -13,7 +13,8 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import urlparse + +import six.moves.urllib.parse as urlparse from mediagoblin.db.models import User from mediagoblin.plugins.basic_auth import tools as auth_tools diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py index dc9c422f..384929cb 100644 --- a/mediagoblin/tests/test_edit.py +++ b/mediagoblin/tests/test_edit.py @@ -14,7 +14,9 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import urlparse, os, pytest +import six +import six.moves.urllib.parse as urlparse +import pytest from mediagoblin import mg_globals from mediagoblin.db.models import User, MediaEntry @@ -142,8 +144,7 @@ class TestUserEdit(object): assert message['To'] == 'new@example.com' email_context = template.TEMPLATE_TEST_CONTEXT[ 'mediagoblin/edit/verification.txt'] - assert email_context['verification_url'] in \ - message.get_payload(decode=True) + assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True) path = urlparse.urlsplit(email_context['verification_url'])[2] assert path == u'/edit/verify_email/' @@ -250,5 +251,11 @@ class TestMetaDataEdit: old_metadata = new_metadata new_metadata = media_entry.media_metadata assert new_metadata == old_metadata - assert ("u'On the worst day' is not a 'date-time'" in - response.body) + context = template.TEMPLATE_TEST_CONTEXT[ + 'mediagoblin/edit/metadata.html'] + if six.PY2: + expected = "u'On the worst day' is not a 'date-time'" + else: + expected = "'On the worst day' is not a 'date-time'" + assert context['form'].errors[ + 'media_metadata'][0]['identifier'][0] == expected diff --git a/mediagoblin/tests/test_exif.py b/mediagoblin/tests/test_exif.py index af301818..e3869ec8 100644 --- a/mediagoblin/tests/test_exif.py +++ b/mediagoblin/tests/test_exif.py @@ -20,6 +20,8 @@ try: except ImportError: import Image +from collections import OrderedDict + from mediagoblin.tools.exif import exif_fix_image_orientation, \ extract_exif, clean_exif, get_gps_data, get_useful from .resources import GOOD_JPG, EMPTY_JPG, BAD_JPG, GPS_JPG @@ -39,31 +41,32 @@ def test_exif_extraction(): gps = get_gps_data(result) # Do we have the result? - assert len(result) == 55 + assert len(result) >= 50 # Do we have clean data? - assert len(clean) == 53 + assert len(clean) >= 50 # GPS data? assert gps == {} # Do we have the "useful" tags? - assert useful == {'EXIF CVAPattern': {'field_length': 8, + + expected = OrderedDict({'EXIF CVAPattern': {'field_length': 8, 'field_offset': 26224, 'field_type': 7, - 'printable': u'[0, 2, 0, 2, 1, 2, 0, 1]', + 'printable': '[0, 2, 0, 2, 1, 2, 0, 1]', 'tag': 41730, 'values': [0, 2, 0, 2, 1, 2, 0, 1]}, 'EXIF ColorSpace': {'field_length': 2, 'field_offset': 476, 'field_type': 3, - 'printable': u'sRGB', + 'printable': 'sRGB', 'tag': 40961, 'values': [1]}, 'EXIF ComponentsConfiguration': {'field_length': 4, 'field_offset': 308, 'field_type': 7, - 'printable': u'YCbCr', + 'printable': 'YCbCr', 'tag': 37121, 'values': [1, 2, 3, 0]}, 'EXIF CompressedBitsPerPixel': {'field_length': 8, @@ -365,7 +368,10 @@ def test_exif_extraction(): 'field_type': 5, 'printable': u'300', 'tag': 283, - 'values': [[300, 1]]}} + 'values': [[300, 1]]}}) + + for key in expected.keys(): + assert useful[key] == expected[key] def test_exif_image_orientation(): @@ -379,7 +385,7 @@ def test_exif_image_orientation(): result) # Are the dimensions correct? - assert image.size == (428, 640) + assert image.size in ((428, 640), (640, 428)) # If this pixel looks right, the rest of the image probably will too. assert_in(image.getdata()[10000], diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py deleted file mode 100644 index 64b7ee8f..00000000 --- a/mediagoblin/tests/test_http_callback.py +++ /dev/null @@ -1,83 +0,0 @@ -# GNU MediaGoblin -- federated, autonomous media hosting -# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import json - -import pytest -from urlparse import urlparse, parse_qs - -from mediagoblin import mg_globals -from mediagoblin.tools import processing -from mediagoblin.tests.tools import fixture_add_user -from mediagoblin.tests.test_submission import GOOD_PNG -from mediagoblin.tests import test_oauth2 as oauth - - -class TestHTTPCallback(object): - @pytest.fixture(autouse=True) - def setup(self, test_app): - self.test_app = test_app - - self.db = mg_globals.database - - self.user_password = u'secret' - self.user = fixture_add_user(u'call_back', self.user_password) - - self.login() - - def login(self): - self.test_app.post('/auth/login/', { - 'username': self.user.username, - 'password': self.user_password}) - - def get_access_token(self, client_id, client_secret, code): - response = self.test_app.get('/oauth-2/access_token', { - 'code': code, - 'client_id': client_id, - 'client_secret': client_secret}) - - response_data = json.loads(response.body) - - return response_data['access_token'] - - def test_callback(self): - ''' Test processing HTTP callback ''' - self.oauth = oauth.TestOAuth() - self.oauth.setup(self.test_app) - - redirect, client_id = self.oauth.test_4_authorize_confidential_client() - - code = parse_qs(urlparse(redirect.location).query)['code'][0] - - client = self.db.OAuthClient.query.filter( - self.db.OAuthClient.identifier == unicode(client_id)).first() - - client_secret = client.secret - - access_token = self.get_access_token(client_id, client_secret, code) - - callback_url = 'https://foo.example?secrettestmediagoblinparam' - - self.test_app.post('/api/submit?client_id={0}&access_token={1}\ -&client_secret={2}'.format( - client_id, - access_token, - client_secret), { - 'title': 'Test', - 'callback_url': callback_url}, - upload_files=[('file', GOOD_PNG)]) - - assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed' diff --git a/mediagoblin/tests/test_ldap.py b/mediagoblin/tests/test_ldap.py index 7e20d059..f251d150 100644 --- a/mediagoblin/tests/test_ldap.py +++ b/mediagoblin/tests/test_ldap.py @@ -13,10 +13,16 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import urlparse + import pkg_resources import pytest -import mock +import six +try: + import mock +except ImportError: + import unittest.mock as mock + +import six.moves.urllib.parse as urlparse from mediagoblin import mg_globals from mediagoblin.db.base import Session @@ -126,6 +132,6 @@ def test_ldap_plugin(ldap_plugin_app): # Make sure user is in the session context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] session = context['request'].session - assert session['user_id'] == unicode(test_user.id) + assert session['user_id'] == six.text_type(test_user.id) _test_authentication() diff --git a/mediagoblin/tests/test_legacy_api.py b/mediagoblin/tests/test_legacy_api.py index 4e0cbd8f..b3b2fcec 100644 --- a/mediagoblin/tests/test_legacy_api.py +++ b/mediagoblin/tests/test_legacy_api.py @@ -17,6 +17,7 @@ import logging import base64 +import json import pytest @@ -48,10 +49,10 @@ class TestAPI(object): return template.TEMPLATE_TEST_CONTEXT[template_name] def http_auth_headers(self): - return {'Authorization': 'Basic {0}'.format( - base64.b64encode(':'.join([ + return {'Authorization': ('Basic {0}'.format( + base64.b64encode((':'.join([ self.user.username, - self.user_password])))} + self.user_password])).encode('ascii')).decode()))} def do_post(self, data, test_app, **kwargs): url = kwargs.pop('url', '/api/submit') @@ -77,8 +78,8 @@ class TestAPI(object): '/api/test', headers=self.http_auth_headers()) - assert response.body == \ - '{"username": "joapi", "email": "joapi@example.com"}' + assert json.loads(response.body.decode()) == { + "username": "joapi", "email": "joapi@example.com"} def test_2_test_submission(self, test_app): self.login(test_app) diff --git a/mediagoblin/tests/test_metadata.py b/mediagoblin/tests/test_metadata.py index b4ea646e..a10e00ec 100644 --- a/mediagoblin/tests/test_metadata.py +++ b/mediagoblin/tests/test_metadata.py @@ -56,7 +56,7 @@ class TestMetadataFunctionality: jsonld_fail_1 = None try: jsonld_fail_1 = compact_and_validate(metadata_fail_1) - except ValidationError, e: + except ValidationError as e: assert e.message == "'All Rights Reserved.' is not a 'uri'" assert jsonld_fail_1 == None #,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,., @@ -72,7 +72,7 @@ class TestMetadataFunctionality: jsonld_fail_2 = None try: jsonld_fail_2 = compact_and_validate(metadata_fail_2) - except ValidationError, e: + except ValidationError as e: assert e.message == "'The other day' is not a 'date-time'" assert jsonld_fail_2 == None diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini index 4cd3d9b6..6ac64321 100644 --- a/mediagoblin/tests/test_mgoblin_app.ini +++ b/mediagoblin/tests/test_mgoblin_app.ini @@ -31,10 +31,11 @@ BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db" [plugins] [[mediagoblin.plugins.api]] -[[mediagoblin.plugins.oauth]] [[mediagoblin.plugins.httpapiauth]] [[mediagoblin.plugins.piwigo]] [[mediagoblin.plugins.basic_auth]] [[mediagoblin.plugins.openid]] [[mediagoblin.media_types.image]] +[[mediagoblin.media_types.video]] +[[mediagoblin.media_types.audio]] [[mediagoblin.media_types.pdf]] diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py index 32d5dce0..1ab0c476 100644 --- a/mediagoblin/tests/test_modelmethods.py +++ b/mediagoblin/tests/test_modelmethods.py @@ -17,13 +17,20 @@ # Maybe not every model needs a test, but some models have special # methods, and so it makes sense to test them here. +from __future__ import print_function + from mediagoblin.db.base import Session -from mediagoblin.db.models import MediaEntry, User, Privilege +from mediagoblin.db.models import MediaEntry, User, Privilege, Activity, \ + Generator from mediagoblin.tests import MGClientTestCase -from mediagoblin.tests.tools import fixture_add_user +from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry, \ + fixture_add_activity -import mock +try: + import mock +except ImportError: + import unittest.mock as mock import pytest @@ -202,7 +209,7 @@ def test_media_data_init(test_app): obj_in_session = 0 for obj in Session(): obj_in_session += 1 - print repr(obj) + print(repr(obj)) assert obj_in_session == 0 @@ -225,3 +232,55 @@ class TestUserUrlForSelf(MGClientTestCase): self.user(u'lindsay').url_for_self(fake_urlgen()) assert excinfo.errisinstance(TypeError) assert 'object is not callable' in str(excinfo) + +class TestActivitySetGet(object): + """ Test methods on the Activity and ActivityIntermediator models """ + + @pytest.fixture(autouse=True) + def setup(self, test_app): + self.app = test_app + self.user = fixture_add_user() + self.obj = fixture_media_entry() + self.target = fixture_media_entry() + + def test_set_activity_object(self): + """ Activity.set_object should produce ActivityIntermediator """ + # The fixture will set self.obj as the object on the activity. + activity = fixture_add_activity(self.obj, actor=self.user) + + # Assert the media has been associated with an AI + assert self.obj.activity is not None + + # Assert the AI on the media and object are the same + assert activity.object == self.obj.activity + + def test_activity_set_target(self): + """ Activity.set_target should produce ActivityIntermediator """ + # This should set everything needed on the target + activity = fixture_add_activity(self.obj, actor=self.user) + activity.set_target(self.target) + + # Assert the media has been associated with the AI + assert self.target.activity is not None + + # assert the AI on the media and target are the same + assert activity.target == self.target.activity + + def test_get_activity_object(self): + """ Activity.get_object should return a set object """ + activity = fixture_add_activity(self.obj, actor=self.user) + + print("self.obj.activity = {0}".format(self.obj.activity)) + + # check we now can get the object + assert activity.get_object is not None + assert activity.get_object.id == self.obj.id + + def test_get_activity_target(self): + """ Activity.set_target should return a set target """ + activity = fixture_add_activity(self.obj, actor=self.user) + activity.set_target(self.target) + + # check we can get the target + assert activity.get_target is not None + assert activity.get_target.id == self.target.id diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py index 3bf36f5f..385da569 100644 --- a/mediagoblin/tests/test_notifications.py +++ b/mediagoblin/tests/test_notifications.py @@ -16,7 +16,7 @@ import pytest -import urlparse +import six.moves.urllib.parse as urlparse from mediagoblin.tools import template, mail @@ -135,13 +135,13 @@ otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyI self.logout() self.login('otherperson', 'nosreprehto') - self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id)) + self.test_app.get(media_uri_slug + 'c/{0}/'.format(comment_id)) notification = Notification.query.filter_by(id=notification_id).first() assert notification.seen == True - self.test_app.get(media_uri_slug + '/notifications/silence/') + self.test_app.get(media_uri_slug + 'notifications/silence/') subscription = CommentSubscription.query.filter_by(id=subscription_id)\ .first() diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py index 568036e5..e41a68c7 100644 --- a/mediagoblin/tests/test_oauth1.py +++ b/mediagoblin/tests/test_oauth1.py @@ -14,10 +14,9 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import cgi - import pytest -from urlparse import parse_qs, urlparse + +from six.moves.urllib.parse import parse_qs, urlparse from oauthlib.oauth1 import Client @@ -73,7 +72,7 @@ class TestOAuth(object): "application_name": "Testificate MD", "application_type": "web", "contacts": "someone@someplace.com tuteo@tsengeo.lu", - "logo_url": "http://ayrel.com/utral.png", + "logo_uri": "http://ayrel.com/utral.png", "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu", } @@ -86,7 +85,7 @@ class TestOAuth(object): assert client.secret == client_info["client_secret"] assert client.application_type == query["application_type"] assert client.redirect_uri == query["redirect_uris"].split() - assert client.logo_url == query["logo_url"] + assert client.logo_url == query["logo_uri"] assert client.contacts == query["contacts"].split() @@ -103,7 +102,7 @@ class TestOAuth(object): "type": "client_update", "application_name": "neytiri", "contacts": "someone@someplace.com abc@cba.com", - "logo_url": "http://place.com/picture.png", + "logo_uri": "http://place.com/picture.png", "application_type": "web", "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/", } @@ -118,7 +117,7 @@ class TestOAuth(object): assert client.application_type == update_query["application_type"] assert client.application_name == update_query["application_name"] assert client.contacts == update_query["contacts"].split() - assert client.logo_url == update_query["logo_url"] + assert client.logo_url == update_query["logo_uri"] assert client.redirect_uri == update_query["redirect_uris"].split() def to_authorize_headers(self, data): @@ -146,7 +145,7 @@ class TestOAuth(object): headers["Content-Type"] = self.MIME_FORM response = self.test_app.post(endpoint, headers=headers) - response = cgi.parse_qs(response.body) + response = parse_qs(response.body.decode()) # each element is a list, reduce it to a string for key, value in response.items(): diff --git a/mediagoblin/tests/test_oauth2.py b/mediagoblin/tests/test_oauth2.py deleted file mode 100644 index 957f4e65..00000000 --- a/mediagoblin/tests/test_oauth2.py +++ /dev/null @@ -1,223 +0,0 @@ -# GNU MediaGoblin -- federated, autonomous media hosting -# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import json -import logging - -import pytest -from urlparse import parse_qs, urlparse - -from mediagoblin import mg_globals -from mediagoblin.tools import template, pluginapi -from mediagoblin.tests.tools import fixture_add_user - - -_log = logging.getLogger(__name__) - - -class TestOAuth(object): - @pytest.fixture(autouse=True) - def setup(self, test_app): - self.test_app = test_app - - self.db = mg_globals.database - - self.pman = pluginapi.PluginManager() - - self.user_password = u'4cc355_70k3N' - self.user = fixture_add_user(u'joauth', self.user_password, - privileges=[u'active']) - - self.login() - - def login(self): - self.test_app.post( - '/auth/login/', { - 'username': self.user.username, - 'password': self.user_password}) - - def register_client(self, name, client_type, description=None, - redirect_uri=''): - return self.test_app.post( - '/oauth-2/client/register', { - 'name': name, - 'description': description, - 'type': client_type, - 'redirect_uri': redirect_uri}) - - def get_context(self, template_name): - return template.TEMPLATE_TEST_CONTEXT[template_name] - - def test_1_public_client_registration_without_redirect_uri(self): - ''' Test 'public' OAuth client registration without any redirect uri ''' - response = self.register_client( - u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2') - - ctx = self.get_context('oauth/client/register.html') - - client = self.db.OAuthClient.query.filter( - self.db.OAuthClient.name == u'OMGOMGOMG').first() - - assert response.status_int == 200 - - # Should display an error - assert len(ctx['form'].redirect_uri.errors) - - # Should not pass through - assert not client - - def test_2_successful_public_client_registration(self): - ''' Successfully register a public client ''' - uri = 'http://foo.example' - self.register_client( - u'OMGOMG', 'public', 'OMG!', uri) - - client = self.db.OAuthClient.query.filter( - self.db.OAuthClient.name == u'OMGOMG').first() - - # redirect_uri should be set - assert client.redirect_uri == uri - - # Client should have been registered - assert client - - def test_3_successful_confidential_client_reg(self): - ''' Register a confidential OAuth client ''' - response = self.register_client( - u'GMOGMO', 'confidential', 'NO GMO!') - - assert response.status_int == 302 - - client = self.db.OAuthClient.query.filter( - self.db.OAuthClient.name == u'GMOGMO').first() - - # Client should have been registered - assert client - - return client - - def test_4_authorize_confidential_client(self): - ''' Authorize a confidential client as a logged in user ''' - client = self.test_3_successful_confidential_client_reg() - - client_identifier = client.identifier - - redirect_uri = 'https://foo.example' - response = self.test_app.get('/oauth-2/authorize', { - 'client_id': client.identifier, - 'scope': 'all', - 'redirect_uri': redirect_uri}) - - # User-agent should NOT be redirected - assert response.status_int == 200 - - ctx = self.get_context('oauth/authorize.html') - - form = ctx['form'] - - # Short for client authorization post reponse - capr = self.test_app.post( - '/oauth-2/client/authorize', { - 'client_id': form.client_id.data, - 'allow': 'Allow', - 'next': form.next.data}) - - assert capr.status_int == 302 - - authorization_response = capr.follow() - - assert authorization_response.location.startswith(redirect_uri) - - return authorization_response, client_identifier - - def get_code_from_redirect_uri(self, uri): - ''' Get the value of ?code= from an URI ''' - return parse_qs(urlparse(uri).query)['code'][0] - - def test_token_endpoint_successful_confidential_request(self): - ''' Successful request against token endpoint ''' - code_redirect, client_id = self.test_4_authorize_confidential_client() - - code = self.get_code_from_redirect_uri(code_redirect.location) - - client = self.db.OAuthClient.query.filter( - self.db.OAuthClient.identifier == unicode(client_id)).first() - - token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\ -code={1}&client_secret={2}'.format(client_id, code, client.secret)) - - assert token_res.status_int == 200 - - token_data = json.loads(token_res.body) - - assert not 'error' in token_data - assert 'access_token' in token_data - assert 'token_type' in token_data - assert 'expires_in' in token_data - assert type(token_data['expires_in']) == int - assert token_data['expires_in'] > 0 - - # There should be a refresh token provided in the token data - assert len(token_data['refresh_token']) - - return client_id, token_data - - def test_token_endpont_missing_id_confidential_request(self): - ''' Unsuccessful request against token endpoint, missing client_id ''' - code_redirect, client_id = self.test_4_authorize_confidential_client() - - code = self.get_code_from_redirect_uri(code_redirect.location) - - client = self.db.OAuthClient.query.filter( - self.db.OAuthClient.identifier == unicode(client_id)).first() - - token_res = self.test_app.get('/oauth-2/access_token?\ -code={0}&client_secret={1}'.format(code, client.secret)) - - assert token_res.status_int == 200 - - token_data = json.loads(token_res.body) - - assert 'error' in token_data - assert not 'access_token' in token_data - assert token_data['error'] == 'invalid_request' - assert len(token_data['error_description']) - - def test_refresh_token(self): - ''' Try to get a new access token using the refresh token ''' - # Get an access token and a refresh token - client_id, token_data =\ - self.test_token_endpoint_successful_confidential_request() - - client = self.db.OAuthClient.query.filter( - self.db.OAuthClient.identifier == client_id).first() - - token_res = self.test_app.get('/oauth-2/access_token', - {'refresh_token': token_data['refresh_token'], - 'client_id': client_id, - 'client_secret': client.secret - }) - - assert token_res.status_int == 200 - - new_token_data = json.loads(token_res.body) - - assert not 'error' in new_token_data - assert 'access_token' in new_token_data - assert 'token_type' in new_token_data - assert 'expires_in' in new_token_data - assert type(new_token_data['expires_in']) == int - assert new_token_data['expires_in'] > 0 diff --git a/mediagoblin/tests/test_openid.py b/mediagoblin/tests/test_openid.py index 0424fdda..a3ab176a 100644 --- a/mediagoblin/tests/test_openid.py +++ b/mediagoblin/tests/test_openid.py @@ -14,10 +14,14 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import urlparse import pkg_resources import pytest -import mock +import six +import six.moves.urllib.parse as urlparse +try: + import mock +except ImportError: + import unittest.mock as mock openid_consumer = pytest.importorskip( "openid.consumer.consumer") @@ -206,7 +210,7 @@ class TestOpenIDPlugin(object): # Make sure user is in the session context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] session = context['request'].session - assert session['user_id'] == unicode(test_user.id) + assert session['user_id'] == six.text_type(test_user.id) _test_new_user() diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini index a9595432..8d75c3cb 100644 --- a/mediagoblin/tests/test_paste.ini +++ b/mediagoblin/tests/test_paste.ini @@ -1,40 +1,18 @@ [DEFAULT] debug = true -[composite:main] -use = egg:Paste#urlmap -/ = mediagoblin -/mgoblin_media/ = publicstore_serve -/test_static/ = mediagoblin_static -/theme_static/ = theme_static -/plugin_static/ = plugin_static - -[app:mediagoblin] +[app:main] use = egg:mediagoblin#app config = %(here)s/mediagoblin.ini - -[app:publicstore_serve] -use = egg:Paste#static -document_root = %(here)s/user_dev/media/public - -[app:mediagoblin_static] -use = egg:Paste#static -document_root = %(here)s/mediagoblin/static/ - -[app:theme_static] -use = egg:Paste#static -document_root = %(here)s/user_dev/theme_static/ -cache_max_age = 86400 - -[app:plugin_static] -use = egg:Paste#static -document_root = %(here)s/user_dev/plugin_static/ -cache_max_age = 86400 +/mgoblin_media = %(here)s/user_dev/media/public +/test_static = %(here)s/mediagoblin/static +/theme_static = %(here)s/user_dev/theme_static +/plugin_static = %(here)s/user_dev/plugin_static [celery] CELERY_ALWAYS_EAGER = true [server:main] -use = egg:Paste#http +use = egg:gunicorn host = 127.0.0.1 port = 6543 diff --git a/mediagoblin/tests/test_pdf.py b/mediagoblin/tests/test_pdf.py index b4d1940a..7107dc9a 100644 --- a/mediagoblin/tests/test_pdf.py +++ b/mediagoblin/tests/test_pdf.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import collections import tempfile import shutil import os @@ -21,19 +22,25 @@ import pytest from mediagoblin.media_types.pdf.processing import ( pdf_info, check_prerequisites, create_pdf_thumb) -from .resources import GOOD_PDF as GOOD +from .resources import GOOD_PDF -@pytest.mark.skipif("not check_prerequisites()") +@pytest.mark.skipif("not os.path.exists(GOOD_PDF) or not check_prerequisites()") def test_pdf(): - good_dict = {'pdf_version_major': 1, 'pdf_title': '', - 'pdf_page_size_width': 612, 'pdf_author': '', - 'pdf_keywords': '', 'pdf_pages': 10, - 'pdf_producer': 'dvips + GNU Ghostscript 7.05', - 'pdf_version_minor': 3, - 'pdf_creator': 'LaTeX with hyperref package', - 'pdf_page_size_height': 792} - assert pdf_info(GOOD) == good_dict + expected_dict = {'pdf_author': -1, + 'pdf_creator': -1, + 'pdf_keywords': -1, + 'pdf_page_size_height': -1, + 'pdf_page_size_width': -1, + 'pdf_pages': -1, + 'pdf_producer': -1, + 'pdf_title': -1, + 'pdf_version_major': 1, + 'pdf_version_minor': -1} + good_info = pdf_info(GOOD_PDF) + for k, v in expected_dict.items(): + assert(k in good_info) + assert(v == -1 or v == good_info[k]) temp_dir = tempfile.mkdtemp() - create_pdf_thumb(GOOD, os.path.join(temp_dir, 'good_256_256.png'), 256, 256) + create_pdf_thumb(GOOD_PDF, os.path.join(temp_dir, 'good_256_256.png'), 256, 256) shutil.rmtree(temp_dir) diff --git a/mediagoblin/tests/test_persona.py b/mediagoblin/tests/test_persona.py index a1cd30eb..a8466b8a 100644 --- a/mediagoblin/tests/test_persona.py +++ b/mediagoblin/tests/test_persona.py @@ -13,10 +13,16 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import urlparse + import pkg_resources import pytest -import mock +import six +try: + import mock +except ImportError: + import unittest.mock as mock + +import six.moves.urllib.parse as urlparse pytest.importorskip("requests") @@ -140,7 +146,7 @@ class TestPersonaPlugin(object): # Make sure user is in the session context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html'] session = context['request'].session - assert session['user_id'] == unicode(test_user.id) + assert session['user_id'] == six.text_type(test_user.id) _test_registration() diff --git a/mediagoblin/tests/test_piwigo.py b/mediagoblin/tests/test_piwigo.py index 16ad0111..33aea580 100644 --- a/mediagoblin/tests/test_piwigo.py +++ b/mediagoblin/tests/test_piwigo.py @@ -44,28 +44,23 @@ class Test_PWG(object): def test_session(self): resp = self.do_post("pwg.session.login", {"username": u"nouser", "password": "wrong"}) - assert resp.body == XML_PREFIX \ - + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>' + assert resp.body == (XML_PREFIX + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>').encode('ascii') resp = self.do_post("pwg.session.login", {"username": self.username, "password": "wrong"}) - assert resp.body == XML_PREFIX \ - + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>' + assert resp.body == (XML_PREFIX + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>').encode('ascii') resp = self.do_get("pwg.session.getStatus") - assert resp.body == XML_PREFIX \ - + '<rsp stat="ok"><username>guest</username></rsp>' + assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>guest</username></rsp>').encode('ascii') resp = self.do_post("pwg.session.login", {"username": self.username, "password": self.password}) - assert resp.body == XML_PREFIX + '<rsp stat="ok">1</rsp>' + assert resp.body == (XML_PREFIX + '<rsp stat="ok">1</rsp>').encode('ascii') resp = self.do_get("pwg.session.getStatus") - assert resp.body == XML_PREFIX \ - + '<rsp stat="ok"><username>chris</username></rsp>' + assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>chris</username></rsp>').encode('ascii') self.do_get("pwg.session.logout") resp = self.do_get("pwg.session.getStatus") - assert resp.body == XML_PREFIX \ - + '<rsp stat="ok"><username>guest</username></rsp>' + assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>guest</username></rsp>').encode('ascii') diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py index eae0ce15..2fd6df39 100644 --- a/mediagoblin/tests/test_pluginapi.py +++ b/mediagoblin/tests/test_pluginapi.py @@ -224,7 +224,7 @@ def test_hook_handle(): assert pluginapi.hook_handle( "nothing_handling", call_log, unhandled_okay=True) is None assert call_log == [] - + # Multiple provided, go with the first! call_log = [] assert pluginapi.hook_handle( @@ -348,7 +348,7 @@ def test_modify_context(context_modified_app): """ # Specific thing passed into a page result = context_modified_app.get("/modify_context/specific/") - assert result.body.strip() == """Specific page! + assert result.body.strip() == b"""Specific page! specific thing: in yer specificpage global thing: globally appended! @@ -357,7 +357,7 @@ doubleme: happyhappy""" # General test, should have global context variable only result = context_modified_app.get("/modify_context/") - assert result.body.strip() == """General page! + assert result.body.strip() == b"""General page! global thing: globally appended! lol: cats @@ -421,7 +421,7 @@ def test_plugin_assetlink(static_plugin_app): junk_file_path = os.path.join( linked_assets_dir.rstrip(os.path.sep), 'junk.txt') - with file(junk_file_path, 'w') as junk_file: + with open(junk_file_path, 'w') as junk_file: junk_file.write('barf') os.unlink(plugin_link_dir) @@ -440,14 +440,14 @@ to: # link dir exists, but is a non-symlink os.unlink(plugin_link_dir) - with file(plugin_link_dir, 'w') as clobber_file: + with open(plugin_link_dir, 'w') as clobber_file: clobber_file.write('clobbered!') result = run_assetlink().collection[0] assert result == 'Could not link "staticstuff": %s exists and is not a symlink\n' % ( plugin_link_dir) - with file(plugin_link_dir, 'r') as clobber_file: + with open(plugin_link_dir, 'r') as clobber_file: assert clobber_file.read() == 'clobbered!' @@ -456,11 +456,10 @@ def test_plugin_staticdirect(static_plugin_app): Test that the staticdirect utilities pull up the right things """ result = json.loads( - static_plugin_app.get('/staticstuff/').body) + static_plugin_app.get('/staticstuff/').body.decode()) assert len(result) == 2 assert result['mgoblin_bunny_pic'] == '/test_static/images/bunny_pic.png' assert result['plugin_bunny_css'] == \ '/plugin_static/staticstuff/css/bunnify.css' - diff --git a/mediagoblin/tests/test_privileges.py b/mediagoblin/tests/test_privileges.py index 05829b34..8ea3d754 100644 --- a/mediagoblin/tests/test_privileges.py +++ b/mediagoblin/tests/test_privileges.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import six import pytest from datetime import date, timedelta from webtest import AppError @@ -79,7 +80,7 @@ class TestPrivilegeFunctionality: response = self.test_app.get('/') assert response.status == "200 OK" - assert "You are Banned" in response.body + assert b"You are Banned" in response.body # Then test what happens when that ban has an expiration date which # hasn't happened yet #---------------------------------------------------------------------- @@ -92,7 +93,7 @@ class TestPrivilegeFunctionality: response = self.test_app.get('/') assert response.status == "200 OK" - assert "You are Banned" in response.body + assert b"You are Banned" in response.body # Then test what happens when that ban has an expiration date which # has already happened @@ -107,7 +108,7 @@ class TestPrivilegeFunctionality: response = self.test_app.get('/') assert response.status == "302 FOUND" - assert not "You are Banned" in response.body + assert not b"You are Banned" in response.body def testVariousPrivileges(self): # The various actions that require privileges (ex. reporting, @@ -127,14 +128,16 @@ class TestPrivilegeFunctionality: #---------------------------------------------------------------------- with pytest.raises(AppError) as excinfo: response = self.test_app.get('/submit/') - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo with pytest.raises(AppError) as excinfo: response = self.do_post({'upload_files':[('file',GOOD_JPG)], 'title':u'Normal Upload 1'}, url='/submit/') - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo # Test that a user cannot comment without the commenter privilege #---------------------------------------------------------------------- @@ -149,50 +152,58 @@ class TestPrivilegeFunctionality: media_uri_slug = '/u/{0}/m/{1}/'.format(self.admin_user.username, media_entry.slug) response = self.test_app.get(media_uri_slug) - assert not "Add a comment" in response.body + assert not b"Add a comment" in response.body self.query_for_users() with pytest.raises(AppError) as excinfo: response = self.test_app.post( media_uri_id + 'comment/add/', {'comment_content': u'Test comment #42'}) - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo # Test that a user cannot report without the reporter privilege #---------------------------------------------------------------------- with pytest.raises(AppError) as excinfo: response = self.test_app.get(media_uri_slug+"report/") - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo with pytest.raises(AppError) as excinfo: response = self.do_post( {'report_reason':u'Testing Reports #1', 'reporter_id':u'3'}, url=(media_uri_slug+"report/")) - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo # Test that a user cannot access the moderation pages w/o moderator # or admin privileges #---------------------------------------------------------------------- with pytest.raises(AppError) as excinfo: response = self.test_app.get("/mod/users/") - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo with pytest.raises(AppError) as excinfo: response = self.test_app.get("/mod/reports/") - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo with pytest.raises(AppError) as excinfo: response = self.test_app.get("/mod/media/") - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo with pytest.raises(AppError) as excinfo: response = self.test_app.get("/mod/users/1/") - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo with pytest.raises(AppError) as excinfo: response = self.test_app.get("/mod/reports/1/") - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo self.query_for_users() @@ -202,4 +213,5 @@ class TestPrivilegeFunctionality: 'targeted_user':self.admin_user.id}, url='/mod/reports/1/') self.query_for_users() - assert 'Bad response: 403 FORBIDDEN' in str(excinfo) + excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii') + assert b'Bad response: 403 FORBIDDEN' in excinfo diff --git a/mediagoblin/tests/test_reporting.py b/mediagoblin/tests/test_reporting.py index a154a061..6a9fe205 100644 --- a/mediagoblin/tests/test_reporting.py +++ b/mediagoblin/tests/test_reporting.py @@ -15,6 +15,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import pytest +import six from mediagoblin.tools import template from mediagoblin.tests.tools import (fixture_add_user, fixture_media_entry, @@ -75,7 +76,7 @@ class TestReportFiling: response, context = self.do_post( {'report_reason':u'Testing Media Report', - 'reporter_id':unicode(allie_id)},url= media_uri_slug + "report/") + 'reporter_id':six.text_type(allie_id)},url= media_uri_slug + "report/") assert response.status == "302 FOUND" @@ -110,7 +111,7 @@ class TestReportFiling: response, context = self.do_post({ 'report_reason':u'Testing Comment Report', - 'reporter_id':unicode(allie_id)},url= comment_uri_slug + "report/") + 'reporter_id':six.text_type(allie_id)},url= comment_uri_slug + "report/") assert response.status == "302 FOUND" diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py index 3d67fdf6..7e0569ad 100644 --- a/mediagoblin/tests/test_sql_migrations.py +++ b/mediagoblin/tests/test_sql_migrations.py @@ -14,6 +14,11 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import six +import pytest + +pytestmark = pytest.mark.skipif(six.PY3, reason='needs sqlalchemy.migrate') + import copy from sqlalchemy import ( @@ -23,7 +28,8 @@ from sqlalchemy import ( from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql import select, insert -from migrate import changeset +if six.PY2: + from migrate import changeset from mediagoblin.db.base import GMGTableBase from mediagoblin.db.migration_tools import MigrationManager, RegisterMigration @@ -190,7 +196,7 @@ def level_exits_new_table(db_conn): for level in result: - for exit_name, to_level in level['exits'].iteritems(): + for exit_name, to_level in six.iteritems(level['exits']): # Insert the level exit db_conn.execute( level_exits.insert().values( diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py index f6f1d18f..5cb1672b 100644 --- a/mediagoblin/tests/test_storage.py +++ b/mediagoblin/tests/test_storage.py @@ -19,6 +19,8 @@ import os import tempfile import pytest +import six + from werkzeug.utils import secure_filename from mediagoblin import storage @@ -45,7 +47,7 @@ def test_clean_listy_filepath(): storage.clean_listy_filepath(['../../', 'linooks.jpg']) -class FakeStorageSystem(): +class FakeStorageSystem(object): def __init__(self, foobie, blech, **kwargs): self.foobie = foobie self.blech = blech @@ -78,8 +80,8 @@ def test_storage_system_from_config(): 'mediagoblin.tests.test_storage:FakeStorageSystem'}) assert this_storage.foobie == 'eiboof' assert this_storage.blech == 'hcelb' - assert unicode(this_storage.__class__) == \ - u'mediagoblin.tests.test_storage.FakeStorageSystem' + assert six.text_type(this_storage.__class__) == \ + u"<class 'mediagoblin.tests.test_storage.FakeStorageSystem'>" ########################## @@ -172,7 +174,7 @@ def test_basic_storage_get_file(): with this_storage.get_file(filepath, 'r') as our_file: assert our_file.read() == 'First file' assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) - with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: + with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: assert our_file.read() == 'First file' # Write to the same path but try to get a unique file. @@ -184,13 +186,13 @@ def test_basic_storage_get_file(): with this_storage.get_file(new_filepath, 'r') as our_file: assert our_file.read() == 'Second file' assert os.path.exists(os.path.join(tmpdir, *new_filepath)) - with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file: + with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file: assert our_file.read() == 'Second file' # Read from an existing file manually_written_file = os.makedirs( os.path.join(tmpdir, 'testydir')) - with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: + with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: testyfile.write('testy file! so testy.') with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: @@ -286,7 +288,7 @@ def test_basic_storage_copy_locally(): this_storage.copy_locally(filepath, new_file_dest) this_storage.delete_file(filepath) - assert file(new_file_dest).read() == 'Testing this file' + assert open(new_file_dest).read() == 'Testing this file' os.remove(new_file_dest) os.rmdir(dest_tmpdir) @@ -295,7 +297,7 @@ def test_basic_storage_copy_locally(): def _test_copy_local_to_storage_works(tmpdir, this_storage): local_filename = tempfile.mktemp() - with file(local_filename, 'w') as tmpfile: + with open(local_filename, 'w') as tmpfile: tmpfile.write('haha') this_storage.copy_local_to_storage( @@ -303,7 +305,7 @@ def _test_copy_local_to_storage_works(tmpdir, this_storage): os.remove(local_filename) - assert file( + assert open( os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), 'r').read() == 'haha' diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py index b5b13ed3..65c4b3a3 100644 --- a/mediagoblin/tests/test_submission.py +++ b/mediagoblin/tests/test_submission.py @@ -14,15 +14,26 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import sys -reload(sys) -sys.setdefaultencoding('utf-8') +import six + +if six.PY2: # this hack only work in Python 2 + import sys + reload(sys) + sys.setdefaultencoding('utf-8') -import urlparse import os import pytest +import six.moves.urllib.parse as urlparse + +# this gst initialization stuff is really required here +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst +Gst.init(None) + from mediagoblin.tests.tools import fixture_add_user +from .media_tools import create_av from mediagoblin import mg_globals from mediagoblin.db.models import MediaEntry, User from mediagoblin.db.base import Session @@ -34,7 +45,7 @@ from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \ BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG GOOD_TAG_STRING = u'yin,yang' -BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26) +BAD_TAG_STRING = six.text_type('rage,' + 'f' * 26 + 'u' * 26) FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form'] REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request'] @@ -145,7 +156,7 @@ class TestSubmission: def test_normal_png(self): self.check_normal_upload(u'Normal upload 2', GOOD_PNG) - @pytest.mark.skipif("not pdf_check_prerequisites()") + @pytest.mark.skipif("not os.path.exists(GOOD_PDF) or not pdf_check_prerequisites()") def test_normal_pdf(self): response, context = self.do_post({'title': u'Normal upload 3 (pdf)'}, do_follow=True, @@ -359,7 +370,19 @@ class TestSubmission: def test_media_data(self): self.check_normal_upload(u"With GPS data", GPS_JPG) media = self.check_media(None, {"title": u"With GPS data"}, 1) - assert media.media_data.gps_latitude == 59.336666666666666 + assert media.get_location.position["latitude"] == 59.336666666666666 + + def test_audio(self): + with create_av(make_audio=True) as path: + self.check_normal_upload('Audio', path) + + def test_video(self): + with create_av(make_video=True) as path: + self.check_normal_upload('Video', path) + + def test_audio_and_video(self): + with create_av(make_audio=True, make_video=True) as path: + self.check_normal_upload('Audio and Video', path) def test_processing(self): public_store_dir = mg_globals.global_config[ diff --git a/mediagoblin/tests/test_submission/good.pdf b/mediagoblin/tests/test_submission/good.pdf Binary files differindex ab5db006..d7029f36 100644..120000 --- a/mediagoblin/tests/test_submission/good.pdf +++ b/mediagoblin/tests/test_submission/good.pdf diff --git a/mediagoblin/tests/test_timesince.py b/mediagoblin/tests/test_timesince.py index 6579eb09..99ae31da 100644 --- a/mediagoblin/tests/test_timesince.py +++ b/mediagoblin/tests/test_timesince.py @@ -16,7 +16,7 @@ from datetime import datetime, timedelta -from mediagoblin.tools.timesince import is_aware, timesince +from mediagoblin.tools.timesince import timesince def test_timesince(): diff --git a/mediagoblin/tests/test_util.py b/mediagoblin/tests/test_util.py index 36563e75..8193233f 100644 --- a/mediagoblin/tests/test_util.py +++ b/mediagoblin/tests/test_util.py @@ -14,12 +14,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import mock +try: + import mock +except ImportError: + import unittest.mock as mock import email import pytest import smtplib import pkg_resources +import six + from mediagoblin.tests.tools import get_app from mediagoblin.tools import common, url, translate, mail, text, testing @@ -57,7 +62,7 @@ I hope you like unit tests JUST AS MUCH AS I DO!""") assert message['From'] == "sender@mediagoblin.example.org" assert message['To'] == "amanda@example.org, akila@example.org" assert message['Subject'] == "Testing is so much fun!" - assert message.get_payload(decode=True) == """HAYYY GUYS! + assert message.get_payload(decode=True) == b"""HAYYY GUYS! I hope you like unit tests JUST AS MUCH AS I DO!""" @@ -70,7 +75,7 @@ I hope you like unit tests JUST AS MUCH AS I DO!""" assert mbox_message['From'] == "sender@mediagoblin.example.org" assert mbox_message['To'] == "amanda@example.org, akila@example.org" assert mbox_message['Subject'] == "Testing is so much fun!" - assert mbox_message.get_payload(decode=True) == """HAYYY GUYS! + assert mbox_message.get_payload(decode=True) == b"""HAYYY GUYS! I hope you like unit tests JUST AS MUCH AS I DO!""" @@ -144,13 +149,13 @@ def test_gettext_lazy_proxy(): orig = u"Password" set_thread_locale("es") - p1 = unicode(proxy) + p1 = six.text_type(proxy) p1_should = pass_to_ugettext(orig) assert p1_should != orig, "Test useless, string not translated" assert p1 == p1_should set_thread_locale("sv") - p2 = unicode(proxy) + p2 = six.text_type(proxy) p2_should = pass_to_ugettext(orig) assert p2_should != orig, "Test broken, string not translated" assert p2 == p2_should diff --git a/mediagoblin/tests/test_video.py b/mediagoblin/tests/test_video.py new file mode 100644 index 00000000..79244515 --- /dev/null +++ b/mediagoblin/tests/test_video.py @@ -0,0 +1,132 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import tempfile +import os +from contextlib import contextmanager +import imghdr + +#os.environ['GST_DEBUG'] = '4,python:4' +import pytest +pytest.importorskip("gi.repository.Gst") + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst +Gst.init(None) + +from mediagoblin.media_types.video.transcoders import (capture_thumb, + VideoTranscoder) +from mediagoblin.media_types.tools import discover + +@contextmanager +def create_data(suffix=None, make_audio=False): + video = tempfile.NamedTemporaryFile() + src = Gst.ElementFactory.make('videotestsrc', None) + src.set_property('num-buffers', 10) + videorate = Gst.ElementFactory.make('videorate', None) + enc = Gst.ElementFactory.make('theoraenc', None) + mux = Gst.ElementFactory.make('oggmux', None) + dst = Gst.ElementFactory.make('filesink', None) + dst.set_property('location', video.name) + pipeline = Gst.Pipeline() + pipeline.add(src) + pipeline.add(videorate) + pipeline.add(enc) + pipeline.add(mux) + pipeline.add(dst) + src.link(videorate) + videorate.link(enc) + enc.link(mux) + mux.link(dst) + if make_audio: + audio_src = Gst.ElementFactory.make('audiotestsrc', None) + audio_src.set_property('num-buffers', 10) + audiorate = Gst.ElementFactory.make('audiorate', None) + audio_enc = Gst.ElementFactory.make('vorbisenc', None) + pipeline.add(audio_src) + pipeline.add(audio_enc) + pipeline.add(audiorate) + audio_src.link(audiorate) + audiorate.link(audio_enc) + audio_enc.link(mux) + pipeline.set_state(Gst.State.PLAYING) + state = pipeline.get_state(3 * Gst.SECOND) + assert state[0] == Gst.StateChangeReturn.SUCCESS + bus = pipeline.get_bus() + message = bus.timed_pop_filtered( + 3 * Gst.SECOND, + Gst.MessageType.ERROR | Gst.MessageType.EOS) + pipeline.set_state(Gst.State.NULL) + if suffix: + result = tempfile.NamedTemporaryFile(suffix=suffix) + else: + result = tempfile.NamedTemporaryFile() + yield (video.name, result.name) + + +#TODO: this should be skipped if video plugin is not enabled +def test_thumbnails(): + ''' + Test thumbnails generation. + 1. Create a video (+audio) from gst's videotestsrc + 2. Capture thumbnail + 3. Everything should get removed because of temp files usage + ''' + #data create_data() as (video_name, thumbnail_name): + test_formats = [('.png', 'png'), ('.jpg', 'jpeg'), ('.gif', 'gif')] + for suffix, format in test_formats: + with create_data(suffix) as (video_name, thumbnail_name): + capture_thumb(video_name, thumbnail_name, width=40) + # check result file format + assert imghdr.what(thumbnail_name) == format + # TODO: check height and width + # FIXME: it doesn't work with small width, say, 10px. This should be + # fixed somehow + suffix, format = test_formats[0] + with create_data(suffix, True) as (video_name, thumbnail_name): + capture_thumb(video_name, thumbnail_name, width=40) + assert imghdr.what(thumbnail_name) == format + with create_data(suffix, True) as (video_name, thumbnail_name): + capture_thumb(video_name, thumbnail_name, width=10) # smaller width + assert imghdr.what(thumbnail_name) == format + with create_data(suffix, True) as (video_name, thumbnail_name): + capture_thumb(video_name, thumbnail_name, width=100) # bigger width + assert imghdr.what(thumbnail_name) == format + + +def test_transcoder(): + # test without audio + with create_data() as (video_name, result_name): + transcoder = VideoTranscoder() + transcoder.transcode( + video_name, result_name, + vp8_quality=8, + vp8_threads=0, # autodetect + vorbis_quality=0.3, + dimensions=(640, 640)) + assert len(discover(result_name).get_video_streams()) == 1 + # test with audio + with create_data(make_audio=True) as (video_name, result_name): + transcoder = VideoTranscoder() + transcoder.transcode( + video_name, result_name, + vp8_quality=8, + vp8_threads=0, # autodetect + vorbis_quality=0.3, + dimensions=(640, 640)) + assert len(discover(result_name).get_video_streams()) == 1 + assert len(discover(result_name).get_audio_streams()) == 1 diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py index 6695618b..f3ff57ed 100644 --- a/mediagoblin/tests/test_workbench.py +++ b/mediagoblin/tests/test_workbench.py @@ -50,7 +50,7 @@ class TestWorkbench(object): # kill a workbench this_workbench = self.workbench_manager.create() tmpfile_name = this_workbench.joinpath('temp.txt') - tmpfile = file(tmpfile_name, 'w') + tmpfile = open(tmpfile_name, 'w') with tmpfile: tmpfile.write('lollerskates') diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 34392bf1..dec95e83 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -19,6 +19,7 @@ import os import pkg_resources import shutil +import six from paste.deploy import loadapp from webtest import TestApp @@ -26,7 +27,7 @@ from webtest import TestApp from mediagoblin import mg_globals from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \ CommentSubscription, CommentNotification, Privilege, CommentReport, Client, \ - RequestToken, AccessToken + RequestToken, AccessToken, Activity, Generator from mediagoblin.tools import testing from mediagoblin.init.config import read_mediagoblin_config from mediagoblin.db.base import Session @@ -144,7 +145,7 @@ def install_fixtures_simple(db, fixtures): """ Very simply install fixtures in the database """ - for collection_name, collection_fixtures in fixtures.iteritems(): + for collection_name, collection_fixtures in six.iteritems(fixtures): collection = db[collection_name] for fixture in collection_fixtures: collection.insert(fixture) @@ -164,7 +165,7 @@ def assert_db_meets_expected(db, expected): {'id': 'foo', 'some_field': 'some_value'},]} """ - for collection_name, collection_data in expected.iteritems(): + for collection_name, collection_data in six.iteritems(expected): collection = db[collection_name] for expected_document in collection_data: document = collection.query.filter_by(id=expected_document['id']).first() @@ -345,3 +346,28 @@ def fixture_add_comment_report(comment=None, reported_user=None, Session.expunge(comment_report) return comment_report + +def fixture_add_activity(obj, verb="post", target=None, generator=None, actor=None): + if generator is None: + generator = Generator( + name="GNU MediaGoblin", + object_type="service" + ) + generator.save() + + if actor is None: + actor = fixture_add_user() + + activity = Activity( + verb=verb, + actor=actor.id, + generator=generator.id, + ) + + activity.set_object(obj) + + if target is not None: + activity.set_target(target) + + activity.save() + return activity
\ No newline at end of file |