aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/media_tools.py61
-rw-r--r--mediagoblin/tests/test_api.py164
-rw-r--r--mediagoblin/tests/test_audio.py105
-rw-r--r--mediagoblin/tests/test_auth.py28
-rw-r--r--mediagoblin/tests/test_basic_auth.py3
-rw-r--r--mediagoblin/tests/test_edit.py17
-rw-r--r--mediagoblin/tests/test_exif.py22
-rw-r--r--mediagoblin/tests/test_http_callback.py83
-rw-r--r--mediagoblin/tests/test_ldap.py12
-rw-r--r--mediagoblin/tests/test_legacy_api.py11
-rw-r--r--mediagoblin/tests/test_metadata.py4
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini3
-rw-r--r--mediagoblin/tests/test_modelmethods.py67
-rw-r--r--mediagoblin/tests/test_notifications.py6
-rw-r--r--mediagoblin/tests/test_oauth1.py15
-rw-r--r--mediagoblin/tests/test_oauth2.py223
-rw-r--r--mediagoblin/tests/test_openid.py10
-rw-r--r--mediagoblin/tests/test_paste.ini34
-rw-r--r--mediagoblin/tests/test_pdf.py29
-rw-r--r--mediagoblin/tests/test_persona.py12
-rw-r--r--mediagoblin/tests/test_piwigo.py17
-rw-r--r--mediagoblin/tests/test_pluginapi.py15
-rw-r--r--mediagoblin/tests/test_privileges.py42
-rw-r--r--mediagoblin/tests/test_reporting.py5
-rw-r--r--mediagoblin/tests/test_sql_migrations.py10
-rw-r--r--mediagoblin/tests/test_storage.py20
-rw-r--r--mediagoblin/tests/test_submission.py37
l---------[-rw-r--r--]mediagoblin/tests/test_submission/good.pdfbin194007 -> 44 bytes
-rw-r--r--mediagoblin/tests/test_timesince.py2
-rw-r--r--mediagoblin/tests/test_util.py15
-rw-r--r--mediagoblin/tests/test_video.py132
-rw-r--r--mediagoblin/tests/test_workbench.py2
-rw-r--r--mediagoblin/tests/tools.py32
33 files changed, 751 insertions, 487 deletions
diff --git a/mediagoblin/tests/media_tools.py b/mediagoblin/tests/media_tools.py
new file mode 100644
index 00000000..8d58c024
--- /dev/null
+++ b/mediagoblin/tests/media_tools.py
@@ -0,0 +1,61 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from contextlib import contextmanager
+import tempfile
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+Gst.init(None)
+
+@contextmanager
+def create_av(make_video=False, make_audio=False):
+ 'creates audio/video in `path`, throws AssertionError on any error'
+ media = tempfile.NamedTemporaryFile(suffix='.ogg')
+ pipeline = Gst.Pipeline()
+ mux = Gst.ElementFactory.make('oggmux', 'mux')
+ pipeline.add(mux)
+ if make_video:
+ video_src = Gst.ElementFactory.make('videotestsrc', 'video_src')
+ video_src.set_property('num-buffers', 20)
+ video_enc = Gst.ElementFactory.make('theoraenc', 'video_enc')
+ pipeline.add(video_src)
+ pipeline.add(video_enc)
+ assert video_src.link(video_enc)
+ assert video_enc.link(mux)
+ if make_audio:
+ audio_src = Gst.ElementFactory.make('audiotestsrc', 'audio_src')
+ audio_src.set_property('num-buffers', 20)
+ audio_enc = Gst.ElementFactory.make('vorbisenc', 'audio_enc')
+ pipeline.add(audio_src)
+ pipeline.add(audio_enc)
+ assert audio_src.link(audio_enc)
+ assert audio_enc.link(mux)
+ sink = Gst.ElementFactory.make('filesink', 'sink')
+ sink.set_property('location', media.name)
+ pipeline.add(sink)
+ mux.link(sink)
+ pipeline.set_state(Gst.State.PLAYING)
+ state = pipeline.get_state(Gst.SECOND)
+ assert state[0] == Gst.StateChangeReturn.SUCCESS
+ bus = pipeline.get_bus()
+ message = bus.timed_pop_filtered(
+ Gst.SECOND, # one second should be more than enough for 50-buf vid
+ Gst.MessageType.ERROR | Gst.MessageType.EOS)
+ assert message.type == Gst.MessageType.EOS
+ pipeline.set_state(Gst.State.NULL)
+ yield media.name
diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py
index 93e82f18..83003875 100644
--- a/mediagoblin/tests/test_api.py
+++ b/mediagoblin/tests/test_api.py
@@ -15,7 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
-import mock
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
import pytest
from webtest import AppError
@@ -23,6 +26,7 @@ from webtest import AppError
from .resources import GOOD_JPG
from mediagoblin import mg_globals
from mediagoblin.db.models import User, MediaEntry, MediaComment
+from mediagoblin.tools.routing import extract_url_arguments
from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.moderation.tools import take_away_privileges
@@ -55,7 +59,7 @@ class TestAPI(object):
headers=headers
)
- return response, json.loads(response.body)
+ return response, json.loads(response.body.decode())
def _upload_image(self, test_app, image):
""" Uploads and image to MediaGoblin via pump.io API """
@@ -72,7 +76,7 @@ class TestAPI(object):
data,
headers=headers
)
- image = json.loads(response.body)
+ image = json.loads(response.body.decode())
return response, image
@@ -142,7 +146,7 @@ class TestAPI(object):
headers=headers
)
- assert "403 FORBIDDEN" in excinfo.value.message
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
def test_unable_to_post_feed_as_someone_else(self, test_app):
""" Tests that can't post an image to someone else's feed """
@@ -165,7 +169,7 @@ class TestAPI(object):
headers=headers
)
- assert "403 FORBIDDEN" in excinfo.value.message
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
def test_only_able_to_update_own_image(self, test_app):
""" Test's that the uploader is the only person who can update an image """
@@ -184,7 +188,8 @@ class TestAPI(object):
# Lets change the image uploader to be self.other_user, this is easier
# than uploading the image as someone else as the way self.mocked_oauth_required
# and self._upload_image.
- media = MediaEntry.query.filter_by(id=data["object"]["id"]).first()
+ id = int(data["object"]["id"].split("/")[-2])
+ media = MediaEntry.query.filter_by(id=id).first()
media.uploader = self.other_user.id
media.save()
@@ -197,7 +202,7 @@ class TestAPI(object):
headers=headers
)
- assert "403 FORBIDDEN" in excinfo.value.message
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
def test_upload_image_with_filename(self, test_app):
""" Tests that you can upload an image with filename and description """
@@ -224,16 +229,17 @@ class TestAPI(object):
headers={"Content-Type": "application/json"}
)
- image = json.loads(response.body)["object"]
+ image = json.loads(response.body.decode())["object"]
# Check everything has been set on the media correctly
- media = MediaEntry.query.filter_by(id=image["id"]).first()
+ id = int(image["id"].split("/")[-2])
+ media = MediaEntry.query.filter_by(id=id).first()
assert media.title == title
assert media.description == description
assert media.license == license
# Check we're being given back everything we should on an update
- assert image["id"] == media.id
+ assert int(image["id"].split("/")[-2]) == media.id
assert image["displayName"] == title
assert image["content"] == description
assert image["license"] == license
@@ -260,7 +266,7 @@ class TestAPI(object):
)
# Assert that we've got a 403
- assert "403 FORBIDDEN" in excinfo.value.message
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
def test_object_endpoint(self, test_app):
""" Tests that object can be looked up at endpoint """
@@ -281,11 +287,11 @@ class TestAPI(object):
with self.mock_oauth():
request = test_app.get(object_uri)
- image = json.loads(request.body)
- entry = MediaEntry.query.filter_by(id=image["id"]).first()
+ image = json.loads(request.body.decode())
+ entry_id = int(image["id"].split("/")[-2])
+ entry = MediaEntry.query.filter_by(id=entry_id).first()
assert request.status_code == 200
- assert entry.id == image["id"]
assert "image" in image
assert "fullImage" in image
@@ -313,7 +319,8 @@ class TestAPI(object):
assert response.status_code == 200
# Find the objects in the database
- media = MediaEntry.query.filter_by(id=data["object"]["id"]).first()
+ media_id = int(data["object"]["id"].split("/")[-2])
+ media = MediaEntry.query.filter_by(id=media_id).first()
comment = media.get_comments()[0]
# Tests that it matches in the database
@@ -321,7 +328,6 @@ class TestAPI(object):
assert comment.content == content
# Test that the response is what we should be given
- assert comment.id == comment_data["object"]["id"]
assert comment.content == comment_data["object"]["content"]
def test_unable_to_post_comment_as_someone_else(self, test_app):
@@ -351,7 +357,7 @@ class TestAPI(object):
headers=headers
)
- assert "403 FORBIDDEN" in excinfo.value.message
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
def test_unable_to_update_someone_elses_comment(self, test_app):
""" Test that you're able to update someoen elses comment. """
@@ -376,7 +382,7 @@ class TestAPI(object):
response, comment_data = self._activity_to_feed(test_app, activity)
# change who uploaded the comment as it's easier than changing
- comment_id = comment_data["object"]["id"]
+ comment_id = int(comment_data["object"]["id"].split("/")[-2])
comment = MediaComment.query.filter_by(id=comment_id).first()
comment.author = self.other_user.id
comment.save()
@@ -396,14 +402,14 @@ class TestAPI(object):
headers=headers
)
- assert "403 FORBIDDEN" in excinfo.value.message
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
def test_profile(self, test_app):
""" Tests profile endpoint """
uri = "/api/user/{0}/profile".format(self.user.username)
with self.mock_oauth():
response = test_app.get(uri)
- profile = json.loads(response.body)
+ profile = json.loads(response.body.decode())
assert response.status_code == 200
@@ -417,7 +423,7 @@ class TestAPI(object):
uri = "/api/user/{0}/".format(self.user.username)
with self.mock_oauth():
response = test_app.get(uri)
- user = json.loads(response.body)
+ user = json.loads(response.body.decode())
assert response.status_code == 200
@@ -433,7 +439,7 @@ class TestAPI(object):
with pytest.raises(AppError) as excinfo:
response = test_app.get("/api/whoami")
- assert "401 UNAUTHORIZED" in excinfo.value.message
+ assert "401 UNAUTHORIZED" in excinfo.value.args[0]
def test_read_feed(self, test_app):
""" Test able to read objects from the feed """
@@ -443,7 +449,7 @@ class TestAPI(object):
uri = "/api/user/{0}/feed".format(self.active_user.username)
with self.mock_oauth():
response = test_app.get(uri)
- feed = json.loads(response.body)
+ feed = json.loads(response.body.decode())
assert response.status_code == 200
@@ -468,9 +474,9 @@ class TestAPI(object):
with pytest.raises(AppError) as excinfo:
self._post_image_to_feed(test_app, data)
- assert "403 FORBIDDEN" in excinfo.value.message
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
- def test_object_endpoint(self, test_app):
+ def test_object_endpoint_requestable(self, test_app):
""" Test that object endpoint can be requested """
response, data = self._upload_image(test_app, GOOD_JPG)
response, data = self._post_image_to_feed(test_app, data)
@@ -478,7 +484,7 @@ class TestAPI(object):
with self.mock_oauth():
response = test_app.get(data["object"]["links"]["self"]["href"])
- data = json.loads(response.body)
+ data = json.loads(response.body.decode())
assert response.status_code == 200
@@ -486,3 +492,109 @@ class TestAPI(object):
assert "url" in data
assert "links" in data
assert data["objectType"] == "image"
+
+ def test_delete_media_by_activity(self, test_app):
+ """ Test that an image can be deleted by a delete activity to feed """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+ object_id = data["object"]["id"]
+
+ activity = {
+ "verb": "delete",
+ "object": {
+ "id": object_id,
+ "objectType": "image",
+ }
+ }
+
+ response = self._activity_to_feed(test_app, activity)[1]
+
+ # Check the media is no longer in the database
+ media_id = int(object_id.split("/")[-2])
+ media = MediaEntry.query.filter_by(id=media_id).first()
+
+ assert media is None
+
+ # Check we've been given the full delete activity back
+ assert "id" in response
+ assert response["verb"] == "delete"
+ assert "object" in response
+ assert response["object"]["id"] == object_id
+ assert response["object"]["objectType"] == "image"
+
+ def test_delete_comment_by_activity(self, test_app):
+ """ Test that a comment is deleted by a delete activity to feed """
+ # First upload an image to comment against
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ # Post a comment to delete
+ activity = {
+ "verb": "post",
+ "object": {
+ "objectType": "comment",
+ "content": "This is a comment.",
+ "inReplyTo": data["object"],
+ }
+ }
+
+ comment = self._activity_to_feed(test_app, activity)[1]
+
+ # Now delete the image
+ activity = {
+ "verb": "delete",
+ "object": {
+ "id": comment["object"]["id"],
+ "objectType": "comment",
+ }
+ }
+
+ delete = self._activity_to_feed(test_app, activity)[1]
+
+ # Verify the comment no longer exists
+ comment_id = int(comment["object"]["id"].split("/")[-2])
+ assert MediaComment.query.filter_by(id=comment_id).first() is None
+
+ # Check we've got a delete activity back
+ assert "id" in delete
+ assert delete["verb"] == "delete"
+ assert "object" in delete
+ assert delete["object"]["id"] == comment["object"]["id"]
+ assert delete["object"]["objectType"] == "comment"
+
+ def test_edit_comment(self, test_app):
+ """ Test that someone can update their own comment """
+ # First upload an image to comment against
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ # Post a comment to edit
+ activity = {
+ "verb": "post",
+ "object": {
+ "objectType": "comment",
+ "content": "This is a comment",
+ "inReplyTo": data["object"],
+ }
+ }
+
+ comment = self._activity_to_feed(test_app, activity)[1]
+
+ # Now create an update activity to change the content
+ activity = {
+ "verb": "update",
+ "object": {
+ "id": comment["object"]["id"],
+ "content": "This is my fancy new content string!",
+ "objectType": "comment",
+ },
+ }
+
+ comment = self._activity_to_feed(test_app, activity)[1]
+
+ # Verify the comment reflects the changes
+ comment_id = int(comment["object"]["id"].split("/")[-2])
+ model = MediaComment.query.filter_by(id=comment_id).first()
+
+ assert model.content == activity["object"]["content"]
+
diff --git a/mediagoblin/tests/test_audio.py b/mediagoblin/tests/test_audio.py
new file mode 100644
index 00000000..9826ceb1
--- /dev/null
+++ b/mediagoblin/tests/test_audio.py
@@ -0,0 +1,105 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import tempfile
+import shutil
+import os
+import pytest
+from contextlib import contextmanager
+import logging
+import imghdr
+
+#os.environ['GST_DEBUG'] = '4,python:4'
+
+pytest.importorskip("gi.repository.Gst")
+pytest.importorskip("scikits.audiolab")
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+Gst.init(None)
+
+from mediagoblin.media_types.audio.transcoders import (AudioTranscoder,
+ AudioThumbnailer)
+from mediagoblin.media_types.tools import discover
+
+
+@contextmanager
+def create_audio():
+ audio = tempfile.NamedTemporaryFile()
+ src = Gst.ElementFactory.make('audiotestsrc', None)
+ src.set_property('num-buffers', 50)
+ enc = Gst.ElementFactory.make('flacenc', None)
+ dst = Gst.ElementFactory.make('filesink', None)
+ dst.set_property('location', audio.name)
+ pipeline = Gst.Pipeline()
+ pipeline.add(src)
+ pipeline.add(enc)
+ pipeline.add(dst)
+ src.link(enc)
+ enc.link(dst)
+ pipeline.set_state(Gst.State.PLAYING)
+ state = pipeline.get_state(3 * Gst.SECOND)
+ assert state[0] == Gst.StateChangeReturn.SUCCESS
+ bus = pipeline.get_bus()
+ bus.timed_pop_filtered(
+ 3 * Gst.SECOND,
+ Gst.MessageType.ERROR | Gst.MessageType.EOS)
+ pipeline.set_state(Gst.State.NULL)
+ yield (audio.name)
+
+
+@contextmanager
+def create_data_for_test():
+ with create_audio() as audio_name:
+ second_file = tempfile.NamedTemporaryFile()
+ yield (audio_name, second_file.name)
+
+
+def test_transcoder():
+ '''
+ Tests AudioTransocder's transcode method
+ '''
+ transcoder = AudioTranscoder()
+ with create_data_for_test() as (audio_name, result_name):
+ transcoder.transcode(audio_name, result_name, quality=0.3,
+ progress_callback=None)
+ info = discover(result_name)
+ assert len(info.get_audio_streams()) == 1
+ transcoder.transcode(audio_name, result_name, quality=0.3,
+ mux_name='oggmux', progress_callback=None)
+ info = discover(result_name)
+ assert len(info.get_audio_streams()) == 1
+
+
+def test_thumbnails():
+ '''Test thumbnails generation.
+
+ The code below heavily repeats
+ audio.processing.CommonAudioProcessor.create_spectrogram
+ 1. Create test audio
+ 2. Convert it to OGG source for spectogram using transcoder
+ 3. Create spectogram in jpg
+
+ '''
+ thumbnailer = AudioThumbnailer()
+ transcoder = AudioTranscoder()
+ with create_data_for_test() as (audio_name, new_name):
+ transcoder.transcode(audio_name, new_name, mux_name='oggmux')
+ thumbnail = tempfile.NamedTemporaryFile(suffix='.jpg')
+ # fft_size below is copypasted from config_spec.ini
+ thumbnailer.spectrogram(new_name, thumbnail.name, width=100,
+ fft_size=4096)
+ assert imghdr.what(thumbnail.name) == 'jpeg'
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py
index 1bbc3d01..5ce67688 100644
--- a/mediagoblin/tests/test_auth.py
+++ b/mediagoblin/tests/test_auth.py
@@ -14,10 +14,14 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
+
import pkg_resources
import pytest
+import six
+
+import six.moves.urllib.parse as urlparse
+
from mediagoblin import mg_globals
from mediagoblin.db.models import User
from mediagoblin.tests.tools import get_app, fixture_add_user
@@ -107,7 +111,7 @@ def test_register_views(test_app):
## Make sure user is logged in
request = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/user_pages/user_nonactive.html']['request']
- assert request.session['user_id'] == unicode(new_user.id)
+ assert request.session['user_id'] == six.text_type(new_user.id)
## Make sure we get email confirmation, and try verifying
assert len(mail.EMAIL_TEST_INBOX) == 1
@@ -115,7 +119,7 @@ def test_register_views(test_app):
assert message['To'] == 'angrygrrl@example.org'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/auth/verification_email.txt']
- assert email_context['verification_url'] in message.get_payload(decode=True)
+ assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True)
path = urlparse.urlsplit(email_context['verification_url'])[2]
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
@@ -186,7 +190,7 @@ def test_register_views(test_app):
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/plugins/basic_auth/fp_verification_email.txt']
#TODO - change the name of verification_url to something forgot-password-ish
- assert email_context['verification_url'] in message.get_payload(decode=True)
+ assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True)
path = urlparse.urlsplit(email_context['verification_url'])[2]
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
@@ -229,7 +233,6 @@ def test_register_views(test_app):
assert urlparse.urlsplit(response.location)[2] == '/'
assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
-
def test_authentication_views(test_app):
"""
Test logging in and logging out
@@ -305,7 +308,7 @@ def test_authentication_views(test_app):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
# Successful logout
# -----------------
@@ -332,6 +335,19 @@ def test_authentication_views(test_app):
'next' : '/u/chris/'})
assert urlparse.urlsplit(response.location)[2] == '/u/chris/'
+ ## Verify that username is lowercased on login attempt
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'ANDREW',
+ 'password': 'fuselage'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ form = context['login_form']
+
+ # Username should no longer be uppercased; it should be lowercased
+ assert not form.username.data == u'ANDREW'
+ assert form.username.data == u'andrew'
+
@pytest.fixture()
def authentication_disabled_app(request):
return get_app(
diff --git a/mediagoblin/tests/test_basic_auth.py b/mediagoblin/tests/test_basic_auth.py
index 828f0515..e7157bee 100644
--- a/mediagoblin/tests/test_basic_auth.py
+++ b/mediagoblin/tests/test_basic_auth.py
@@ -13,7 +13,8 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
+
+import six.moves.urllib.parse as urlparse
from mediagoblin.db.models import User
from mediagoblin.plugins.basic_auth import tools as auth_tools
diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py
index dc9c422f..384929cb 100644
--- a/mediagoblin/tests/test_edit.py
+++ b/mediagoblin/tests/test_edit.py
@@ -14,7 +14,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse, os, pytest
+import six
+import six.moves.urllib.parse as urlparse
+import pytest
from mediagoblin import mg_globals
from mediagoblin.db.models import User, MediaEntry
@@ -142,8 +144,7 @@ class TestUserEdit(object):
assert message['To'] == 'new@example.com'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/edit/verification.txt']
- assert email_context['verification_url'] in \
- message.get_payload(decode=True)
+ assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True)
path = urlparse.urlsplit(email_context['verification_url'])[2]
assert path == u'/edit/verify_email/'
@@ -250,5 +251,11 @@ class TestMetaDataEdit:
old_metadata = new_metadata
new_metadata = media_entry.media_metadata
assert new_metadata == old_metadata
- assert ("u&#39;On the worst day&#39; is not a &#39;date-time&#39;" in
- response.body)
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/metadata.html']
+ if six.PY2:
+ expected = "u'On the worst day' is not a 'date-time'"
+ else:
+ expected = "'On the worst day' is not a 'date-time'"
+ assert context['form'].errors[
+ 'media_metadata'][0]['identifier'][0] == expected
diff --git a/mediagoblin/tests/test_exif.py b/mediagoblin/tests/test_exif.py
index af301818..e3869ec8 100644
--- a/mediagoblin/tests/test_exif.py
+++ b/mediagoblin/tests/test_exif.py
@@ -20,6 +20,8 @@ try:
except ImportError:
import Image
+from collections import OrderedDict
+
from mediagoblin.tools.exif import exif_fix_image_orientation, \
extract_exif, clean_exif, get_gps_data, get_useful
from .resources import GOOD_JPG, EMPTY_JPG, BAD_JPG, GPS_JPG
@@ -39,31 +41,32 @@ def test_exif_extraction():
gps = get_gps_data(result)
# Do we have the result?
- assert len(result) == 55
+ assert len(result) >= 50
# Do we have clean data?
- assert len(clean) == 53
+ assert len(clean) >= 50
# GPS data?
assert gps == {}
# Do we have the "useful" tags?
- assert useful == {'EXIF CVAPattern': {'field_length': 8,
+
+ expected = OrderedDict({'EXIF CVAPattern': {'field_length': 8,
'field_offset': 26224,
'field_type': 7,
- 'printable': u'[0, 2, 0, 2, 1, 2, 0, 1]',
+ 'printable': '[0, 2, 0, 2, 1, 2, 0, 1]',
'tag': 41730,
'values': [0, 2, 0, 2, 1, 2, 0, 1]},
'EXIF ColorSpace': {'field_length': 2,
'field_offset': 476,
'field_type': 3,
- 'printable': u'sRGB',
+ 'printable': 'sRGB',
'tag': 40961,
'values': [1]},
'EXIF ComponentsConfiguration': {'field_length': 4,
'field_offset': 308,
'field_type': 7,
- 'printable': u'YCbCr',
+ 'printable': 'YCbCr',
'tag': 37121,
'values': [1, 2, 3, 0]},
'EXIF CompressedBitsPerPixel': {'field_length': 8,
@@ -365,7 +368,10 @@ def test_exif_extraction():
'field_type': 5,
'printable': u'300',
'tag': 283,
- 'values': [[300, 1]]}}
+ 'values': [[300, 1]]}})
+
+ for key in expected.keys():
+ assert useful[key] == expected[key]
def test_exif_image_orientation():
@@ -379,7 +385,7 @@ def test_exif_image_orientation():
result)
# Are the dimensions correct?
- assert image.size == (428, 640)
+ assert image.size in ((428, 640), (640, 428))
# If this pixel looks right, the rest of the image probably will too.
assert_in(image.getdata()[10000],
diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py
deleted file mode 100644
index 64b7ee8f..00000000
--- a/mediagoblin/tests/test_http_callback.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import json
-
-import pytest
-from urlparse import urlparse, parse_qs
-
-from mediagoblin import mg_globals
-from mediagoblin.tools import processing
-from mediagoblin.tests.tools import fixture_add_user
-from mediagoblin.tests.test_submission import GOOD_PNG
-from mediagoblin.tests import test_oauth2 as oauth
-
-
-class TestHTTPCallback(object):
- @pytest.fixture(autouse=True)
- def setup(self, test_app):
- self.test_app = test_app
-
- self.db = mg_globals.database
-
- self.user_password = u'secret'
- self.user = fixture_add_user(u'call_back', self.user_password)
-
- self.login()
-
- def login(self):
- self.test_app.post('/auth/login/', {
- 'username': self.user.username,
- 'password': self.user_password})
-
- def get_access_token(self, client_id, client_secret, code):
- response = self.test_app.get('/oauth-2/access_token', {
- 'code': code,
- 'client_id': client_id,
- 'client_secret': client_secret})
-
- response_data = json.loads(response.body)
-
- return response_data['access_token']
-
- def test_callback(self):
- ''' Test processing HTTP callback '''
- self.oauth = oauth.TestOAuth()
- self.oauth.setup(self.test_app)
-
- redirect, client_id = self.oauth.test_4_authorize_confidential_client()
-
- code = parse_qs(urlparse(redirect.location).query)['code'][0]
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == unicode(client_id)).first()
-
- client_secret = client.secret
-
- access_token = self.get_access_token(client_id, client_secret, code)
-
- callback_url = 'https://foo.example?secrettestmediagoblinparam'
-
- self.test_app.post('/api/submit?client_id={0}&access_token={1}\
-&client_secret={2}'.format(
- client_id,
- access_token,
- client_secret), {
- 'title': 'Test',
- 'callback_url': callback_url},
- upload_files=[('file', GOOD_PNG)])
-
- assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed'
diff --git a/mediagoblin/tests/test_ldap.py b/mediagoblin/tests/test_ldap.py
index 7e20d059..f251d150 100644
--- a/mediagoblin/tests/test_ldap.py
+++ b/mediagoblin/tests/test_ldap.py
@@ -13,10 +13,16 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
+
import pkg_resources
import pytest
-import mock
+import six
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
+
+import six.moves.urllib.parse as urlparse
from mediagoblin import mg_globals
from mediagoblin.db.base import Session
@@ -126,6 +132,6 @@ def test_ldap_plugin(ldap_plugin_app):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
_test_authentication()
diff --git a/mediagoblin/tests/test_legacy_api.py b/mediagoblin/tests/test_legacy_api.py
index 4e0cbd8f..b3b2fcec 100644
--- a/mediagoblin/tests/test_legacy_api.py
+++ b/mediagoblin/tests/test_legacy_api.py
@@ -17,6 +17,7 @@
import logging
import base64
+import json
import pytest
@@ -48,10 +49,10 @@ class TestAPI(object):
return template.TEMPLATE_TEST_CONTEXT[template_name]
def http_auth_headers(self):
- return {'Authorization': 'Basic {0}'.format(
- base64.b64encode(':'.join([
+ return {'Authorization': ('Basic {0}'.format(
+ base64.b64encode((':'.join([
self.user.username,
- self.user_password])))}
+ self.user_password])).encode('ascii')).decode()))}
def do_post(self, data, test_app, **kwargs):
url = kwargs.pop('url', '/api/submit')
@@ -77,8 +78,8 @@ class TestAPI(object):
'/api/test',
headers=self.http_auth_headers())
- assert response.body == \
- '{"username": "joapi", "email": "joapi@example.com"}'
+ assert json.loads(response.body.decode()) == {
+ "username": "joapi", "email": "joapi@example.com"}
def test_2_test_submission(self, test_app):
self.login(test_app)
diff --git a/mediagoblin/tests/test_metadata.py b/mediagoblin/tests/test_metadata.py
index b4ea646e..a10e00ec 100644
--- a/mediagoblin/tests/test_metadata.py
+++ b/mediagoblin/tests/test_metadata.py
@@ -56,7 +56,7 @@ class TestMetadataFunctionality:
jsonld_fail_1 = None
try:
jsonld_fail_1 = compact_and_validate(metadata_fail_1)
- except ValidationError, e:
+ except ValidationError as e:
assert e.message == "'All Rights Reserved.' is not a 'uri'"
assert jsonld_fail_1 == None
#,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,
@@ -72,7 +72,7 @@ class TestMetadataFunctionality:
jsonld_fail_2 = None
try:
jsonld_fail_2 = compact_and_validate(metadata_fail_2)
- except ValidationError, e:
+ except ValidationError as e:
assert e.message == "'The other day' is not a 'date-time'"
assert jsonld_fail_2 == None
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
index 4cd3d9b6..6ac64321 100644
--- a/mediagoblin/tests/test_mgoblin_app.ini
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -31,10 +31,11 @@ BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
[plugins]
[[mediagoblin.plugins.api]]
-[[mediagoblin.plugins.oauth]]
[[mediagoblin.plugins.httpapiauth]]
[[mediagoblin.plugins.piwigo]]
[[mediagoblin.plugins.basic_auth]]
[[mediagoblin.plugins.openid]]
[[mediagoblin.media_types.image]]
+[[mediagoblin.media_types.video]]
+[[mediagoblin.media_types.audio]]
[[mediagoblin.media_types.pdf]]
diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py
index 32d5dce0..1ab0c476 100644
--- a/mediagoblin/tests/test_modelmethods.py
+++ b/mediagoblin/tests/test_modelmethods.py
@@ -17,13 +17,20 @@
# Maybe not every model needs a test, but some models have special
# methods, and so it makes sense to test them here.
+from __future__ import print_function
+
from mediagoblin.db.base import Session
-from mediagoblin.db.models import MediaEntry, User, Privilege
+from mediagoblin.db.models import MediaEntry, User, Privilege, Activity, \
+ Generator
from mediagoblin.tests import MGClientTestCase
-from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry, \
+ fixture_add_activity
-import mock
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
import pytest
@@ -202,7 +209,7 @@ def test_media_data_init(test_app):
obj_in_session = 0
for obj in Session():
obj_in_session += 1
- print repr(obj)
+ print(repr(obj))
assert obj_in_session == 0
@@ -225,3 +232,55 @@ class TestUserUrlForSelf(MGClientTestCase):
self.user(u'lindsay').url_for_self(fake_urlgen())
assert excinfo.errisinstance(TypeError)
assert 'object is not callable' in str(excinfo)
+
+class TestActivitySetGet(object):
+ """ Test methods on the Activity and ActivityIntermediator models """
+
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.app = test_app
+ self.user = fixture_add_user()
+ self.obj = fixture_media_entry()
+ self.target = fixture_media_entry()
+
+ def test_set_activity_object(self):
+ """ Activity.set_object should produce ActivityIntermediator """
+ # The fixture will set self.obj as the object on the activity.
+ activity = fixture_add_activity(self.obj, actor=self.user)
+
+ # Assert the media has been associated with an AI
+ assert self.obj.activity is not None
+
+ # Assert the AI on the media and object are the same
+ assert activity.object == self.obj.activity
+
+ def test_activity_set_target(self):
+ """ Activity.set_target should produce ActivityIntermediator """
+ # This should set everything needed on the target
+ activity = fixture_add_activity(self.obj, actor=self.user)
+ activity.set_target(self.target)
+
+ # Assert the media has been associated with the AI
+ assert self.target.activity is not None
+
+ # assert the AI on the media and target are the same
+ assert activity.target == self.target.activity
+
+ def test_get_activity_object(self):
+ """ Activity.get_object should return a set object """
+ activity = fixture_add_activity(self.obj, actor=self.user)
+
+ print("self.obj.activity = {0}".format(self.obj.activity))
+
+ # check we now can get the object
+ assert activity.get_object is not None
+ assert activity.get_object.id == self.obj.id
+
+ def test_get_activity_target(self):
+ """ Activity.set_target should return a set target """
+ activity = fixture_add_activity(self.obj, actor=self.user)
+ activity.set_target(self.target)
+
+ # check we can get the target
+ assert activity.get_target is not None
+ assert activity.get_target.id == self.target.id
diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py
index 3bf36f5f..385da569 100644
--- a/mediagoblin/tests/test_notifications.py
+++ b/mediagoblin/tests/test_notifications.py
@@ -16,7 +16,7 @@
import pytest
-import urlparse
+import six.moves.urllib.parse as urlparse
from mediagoblin.tools import template, mail
@@ -135,13 +135,13 @@ otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyI
self.logout()
self.login('otherperson', 'nosreprehto')
- self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id))
+ self.test_app.get(media_uri_slug + 'c/{0}/'.format(comment_id))
notification = Notification.query.filter_by(id=notification_id).first()
assert notification.seen == True
- self.test_app.get(media_uri_slug + '/notifications/silence/')
+ self.test_app.get(media_uri_slug + 'notifications/silence/')
subscription = CommentSubscription.query.filter_by(id=subscription_id)\
.first()
diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py
index 568036e5..e41a68c7 100644
--- a/mediagoblin/tests/test_oauth1.py
+++ b/mediagoblin/tests/test_oauth1.py
@@ -14,10 +14,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import cgi
-
import pytest
-from urlparse import parse_qs, urlparse
+
+from six.moves.urllib.parse import parse_qs, urlparse
from oauthlib.oauth1 import Client
@@ -73,7 +72,7 @@ class TestOAuth(object):
"application_name": "Testificate MD",
"application_type": "web",
"contacts": "someone@someplace.com tuteo@tsengeo.lu",
- "logo_url": "http://ayrel.com/utral.png",
+ "logo_uri": "http://ayrel.com/utral.png",
"redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
}
@@ -86,7 +85,7 @@ class TestOAuth(object):
assert client.secret == client_info["client_secret"]
assert client.application_type == query["application_type"]
assert client.redirect_uri == query["redirect_uris"].split()
- assert client.logo_url == query["logo_url"]
+ assert client.logo_url == query["logo_uri"]
assert client.contacts == query["contacts"].split()
@@ -103,7 +102,7 @@ class TestOAuth(object):
"type": "client_update",
"application_name": "neytiri",
"contacts": "someone@someplace.com abc@cba.com",
- "logo_url": "http://place.com/picture.png",
+ "logo_uri": "http://place.com/picture.png",
"application_type": "web",
"redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
}
@@ -118,7 +117,7 @@ class TestOAuth(object):
assert client.application_type == update_query["application_type"]
assert client.application_name == update_query["application_name"]
assert client.contacts == update_query["contacts"].split()
- assert client.logo_url == update_query["logo_url"]
+ assert client.logo_url == update_query["logo_uri"]
assert client.redirect_uri == update_query["redirect_uris"].split()
def to_authorize_headers(self, data):
@@ -146,7 +145,7 @@ class TestOAuth(object):
headers["Content-Type"] = self.MIME_FORM
response = self.test_app.post(endpoint, headers=headers)
- response = cgi.parse_qs(response.body)
+ response = parse_qs(response.body.decode())
# each element is a list, reduce it to a string
for key, value in response.items():
diff --git a/mediagoblin/tests/test_oauth2.py b/mediagoblin/tests/test_oauth2.py
deleted file mode 100644
index 957f4e65..00000000
--- a/mediagoblin/tests/test_oauth2.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import json
-import logging
-
-import pytest
-from urlparse import parse_qs, urlparse
-
-from mediagoblin import mg_globals
-from mediagoblin.tools import template, pluginapi
-from mediagoblin.tests.tools import fixture_add_user
-
-
-_log = logging.getLogger(__name__)
-
-
-class TestOAuth(object):
- @pytest.fixture(autouse=True)
- def setup(self, test_app):
- self.test_app = test_app
-
- self.db = mg_globals.database
-
- self.pman = pluginapi.PluginManager()
-
- self.user_password = u'4cc355_70k3N'
- self.user = fixture_add_user(u'joauth', self.user_password,
- privileges=[u'active'])
-
- self.login()
-
- def login(self):
- self.test_app.post(
- '/auth/login/', {
- 'username': self.user.username,
- 'password': self.user_password})
-
- def register_client(self, name, client_type, description=None,
- redirect_uri=''):
- return self.test_app.post(
- '/oauth-2/client/register', {
- 'name': name,
- 'description': description,
- 'type': client_type,
- 'redirect_uri': redirect_uri})
-
- def get_context(self, template_name):
- return template.TEMPLATE_TEST_CONTEXT[template_name]
-
- def test_1_public_client_registration_without_redirect_uri(self):
- ''' Test 'public' OAuth client registration without any redirect uri '''
- response = self.register_client(
- u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')
-
- ctx = self.get_context('oauth/client/register.html')
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.name == u'OMGOMGOMG').first()
-
- assert response.status_int == 200
-
- # Should display an error
- assert len(ctx['form'].redirect_uri.errors)
-
- # Should not pass through
- assert not client
-
- def test_2_successful_public_client_registration(self):
- ''' Successfully register a public client '''
- uri = 'http://foo.example'
- self.register_client(
- u'OMGOMG', 'public', 'OMG!', uri)
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.name == u'OMGOMG').first()
-
- # redirect_uri should be set
- assert client.redirect_uri == uri
-
- # Client should have been registered
- assert client
-
- def test_3_successful_confidential_client_reg(self):
- ''' Register a confidential OAuth client '''
- response = self.register_client(
- u'GMOGMO', 'confidential', 'NO GMO!')
-
- assert response.status_int == 302
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.name == u'GMOGMO').first()
-
- # Client should have been registered
- assert client
-
- return client
-
- def test_4_authorize_confidential_client(self):
- ''' Authorize a confidential client as a logged in user '''
- client = self.test_3_successful_confidential_client_reg()
-
- client_identifier = client.identifier
-
- redirect_uri = 'https://foo.example'
- response = self.test_app.get('/oauth-2/authorize', {
- 'client_id': client.identifier,
- 'scope': 'all',
- 'redirect_uri': redirect_uri})
-
- # User-agent should NOT be redirected
- assert response.status_int == 200
-
- ctx = self.get_context('oauth/authorize.html')
-
- form = ctx['form']
-
- # Short for client authorization post reponse
- capr = self.test_app.post(
- '/oauth-2/client/authorize', {
- 'client_id': form.client_id.data,
- 'allow': 'Allow',
- 'next': form.next.data})
-
- assert capr.status_int == 302
-
- authorization_response = capr.follow()
-
- assert authorization_response.location.startswith(redirect_uri)
-
- return authorization_response, client_identifier
-
- def get_code_from_redirect_uri(self, uri):
- ''' Get the value of ?code= from an URI '''
- return parse_qs(urlparse(uri).query)['code'][0]
-
- def test_token_endpoint_successful_confidential_request(self):
- ''' Successful request against token endpoint '''
- code_redirect, client_id = self.test_4_authorize_confidential_client()
-
- code = self.get_code_from_redirect_uri(code_redirect.location)
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == unicode(client_id)).first()
-
- token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\
-code={1}&client_secret={2}'.format(client_id, code, client.secret))
-
- assert token_res.status_int == 200
-
- token_data = json.loads(token_res.body)
-
- assert not 'error' in token_data
- assert 'access_token' in token_data
- assert 'token_type' in token_data
- assert 'expires_in' in token_data
- assert type(token_data['expires_in']) == int
- assert token_data['expires_in'] > 0
-
- # There should be a refresh token provided in the token data
- assert len(token_data['refresh_token'])
-
- return client_id, token_data
-
- def test_token_endpont_missing_id_confidential_request(self):
- ''' Unsuccessful request against token endpoint, missing client_id '''
- code_redirect, client_id = self.test_4_authorize_confidential_client()
-
- code = self.get_code_from_redirect_uri(code_redirect.location)
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == unicode(client_id)).first()
-
- token_res = self.test_app.get('/oauth-2/access_token?\
-code={0}&client_secret={1}'.format(code, client.secret))
-
- assert token_res.status_int == 200
-
- token_data = json.loads(token_res.body)
-
- assert 'error' in token_data
- assert not 'access_token' in token_data
- assert token_data['error'] == 'invalid_request'
- assert len(token_data['error_description'])
-
- def test_refresh_token(self):
- ''' Try to get a new access token using the refresh token '''
- # Get an access token and a refresh token
- client_id, token_data =\
- self.test_token_endpoint_successful_confidential_request()
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == client_id).first()
-
- token_res = self.test_app.get('/oauth-2/access_token',
- {'refresh_token': token_data['refresh_token'],
- 'client_id': client_id,
- 'client_secret': client.secret
- })
-
- assert token_res.status_int == 200
-
- new_token_data = json.loads(token_res.body)
-
- assert not 'error' in new_token_data
- assert 'access_token' in new_token_data
- assert 'token_type' in new_token_data
- assert 'expires_in' in new_token_data
- assert type(new_token_data['expires_in']) == int
- assert new_token_data['expires_in'] > 0
diff --git a/mediagoblin/tests/test_openid.py b/mediagoblin/tests/test_openid.py
index 0424fdda..a3ab176a 100644
--- a/mediagoblin/tests/test_openid.py
+++ b/mediagoblin/tests/test_openid.py
@@ -14,10 +14,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
import pkg_resources
import pytest
-import mock
+import six
+import six.moves.urllib.parse as urlparse
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
openid_consumer = pytest.importorskip(
"openid.consumer.consumer")
@@ -206,7 +210,7 @@ class TestOpenIDPlugin(object):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
_test_new_user()
diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini
index a9595432..8d75c3cb 100644
--- a/mediagoblin/tests/test_paste.ini
+++ b/mediagoblin/tests/test_paste.ini
@@ -1,40 +1,18 @@
[DEFAULT]
debug = true
-[composite:main]
-use = egg:Paste#urlmap
-/ = mediagoblin
-/mgoblin_media/ = publicstore_serve
-/test_static/ = mediagoblin_static
-/theme_static/ = theme_static
-/plugin_static/ = plugin_static
-
-[app:mediagoblin]
+[app:main]
use = egg:mediagoblin#app
config = %(here)s/mediagoblin.ini
-
-[app:publicstore_serve]
-use = egg:Paste#static
-document_root = %(here)s/user_dev/media/public
-
-[app:mediagoblin_static]
-use = egg:Paste#static
-document_root = %(here)s/mediagoblin/static/
-
-[app:theme_static]
-use = egg:Paste#static
-document_root = %(here)s/user_dev/theme_static/
-cache_max_age = 86400
-
-[app:plugin_static]
-use = egg:Paste#static
-document_root = %(here)s/user_dev/plugin_static/
-cache_max_age = 86400
+/mgoblin_media = %(here)s/user_dev/media/public
+/test_static = %(here)s/mediagoblin/static
+/theme_static = %(here)s/user_dev/theme_static
+/plugin_static = %(here)s/user_dev/plugin_static
[celery]
CELERY_ALWAYS_EAGER = true
[server:main]
-use = egg:Paste#http
+use = egg:gunicorn
host = 127.0.0.1
port = 6543
diff --git a/mediagoblin/tests/test_pdf.py b/mediagoblin/tests/test_pdf.py
index b4d1940a..7107dc9a 100644
--- a/mediagoblin/tests/test_pdf.py
+++ b/mediagoblin/tests/test_pdf.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import collections
import tempfile
import shutil
import os
@@ -21,19 +22,25 @@ import pytest
from mediagoblin.media_types.pdf.processing import (
pdf_info, check_prerequisites, create_pdf_thumb)
-from .resources import GOOD_PDF as GOOD
+from .resources import GOOD_PDF
-@pytest.mark.skipif("not check_prerequisites()")
+@pytest.mark.skipif("not os.path.exists(GOOD_PDF) or not check_prerequisites()")
def test_pdf():
- good_dict = {'pdf_version_major': 1, 'pdf_title': '',
- 'pdf_page_size_width': 612, 'pdf_author': '',
- 'pdf_keywords': '', 'pdf_pages': 10,
- 'pdf_producer': 'dvips + GNU Ghostscript 7.05',
- 'pdf_version_minor': 3,
- 'pdf_creator': 'LaTeX with hyperref package',
- 'pdf_page_size_height': 792}
- assert pdf_info(GOOD) == good_dict
+ expected_dict = {'pdf_author': -1,
+ 'pdf_creator': -1,
+ 'pdf_keywords': -1,
+ 'pdf_page_size_height': -1,
+ 'pdf_page_size_width': -1,
+ 'pdf_pages': -1,
+ 'pdf_producer': -1,
+ 'pdf_title': -1,
+ 'pdf_version_major': 1,
+ 'pdf_version_minor': -1}
+ good_info = pdf_info(GOOD_PDF)
+ for k, v in expected_dict.items():
+ assert(k in good_info)
+ assert(v == -1 or v == good_info[k])
temp_dir = tempfile.mkdtemp()
- create_pdf_thumb(GOOD, os.path.join(temp_dir, 'good_256_256.png'), 256, 256)
+ create_pdf_thumb(GOOD_PDF, os.path.join(temp_dir, 'good_256_256.png'), 256, 256)
shutil.rmtree(temp_dir)
diff --git a/mediagoblin/tests/test_persona.py b/mediagoblin/tests/test_persona.py
index a1cd30eb..a8466b8a 100644
--- a/mediagoblin/tests/test_persona.py
+++ b/mediagoblin/tests/test_persona.py
@@ -13,10 +13,16 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
+
import pkg_resources
import pytest
-import mock
+import six
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
+
+import six.moves.urllib.parse as urlparse
pytest.importorskip("requests")
@@ -140,7 +146,7 @@ class TestPersonaPlugin(object):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
_test_registration()
diff --git a/mediagoblin/tests/test_piwigo.py b/mediagoblin/tests/test_piwigo.py
index 16ad0111..33aea580 100644
--- a/mediagoblin/tests/test_piwigo.py
+++ b/mediagoblin/tests/test_piwigo.py
@@ -44,28 +44,23 @@ class Test_PWG(object):
def test_session(self):
resp = self.do_post("pwg.session.login",
{"username": u"nouser", "password": "wrong"})
- assert resp.body == XML_PREFIX \
- + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>').encode('ascii')
resp = self.do_post("pwg.session.login",
{"username": self.username, "password": "wrong"})
- assert resp.body == XML_PREFIX \
- + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>').encode('ascii')
resp = self.do_get("pwg.session.getStatus")
- assert resp.body == XML_PREFIX \
- + '<rsp stat="ok"><username>guest</username></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>guest</username></rsp>').encode('ascii')
resp = self.do_post("pwg.session.login",
{"username": self.username, "password": self.password})
- assert resp.body == XML_PREFIX + '<rsp stat="ok">1</rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok">1</rsp>').encode('ascii')
resp = self.do_get("pwg.session.getStatus")
- assert resp.body == XML_PREFIX \
- + '<rsp stat="ok"><username>chris</username></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>chris</username></rsp>').encode('ascii')
self.do_get("pwg.session.logout")
resp = self.do_get("pwg.session.getStatus")
- assert resp.body == XML_PREFIX \
- + '<rsp stat="ok"><username>guest</username></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>guest</username></rsp>').encode('ascii')
diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py
index eae0ce15..2fd6df39 100644
--- a/mediagoblin/tests/test_pluginapi.py
+++ b/mediagoblin/tests/test_pluginapi.py
@@ -224,7 +224,7 @@ def test_hook_handle():
assert pluginapi.hook_handle(
"nothing_handling", call_log, unhandled_okay=True) is None
assert call_log == []
-
+
# Multiple provided, go with the first!
call_log = []
assert pluginapi.hook_handle(
@@ -348,7 +348,7 @@ def test_modify_context(context_modified_app):
"""
# Specific thing passed into a page
result = context_modified_app.get("/modify_context/specific/")
- assert result.body.strip() == """Specific page!
+ assert result.body.strip() == b"""Specific page!
specific thing: in yer specificpage
global thing: globally appended!
@@ -357,7 +357,7 @@ doubleme: happyhappy"""
# General test, should have global context variable only
result = context_modified_app.get("/modify_context/")
- assert result.body.strip() == """General page!
+ assert result.body.strip() == b"""General page!
global thing: globally appended!
lol: cats
@@ -421,7 +421,7 @@ def test_plugin_assetlink(static_plugin_app):
junk_file_path = os.path.join(
linked_assets_dir.rstrip(os.path.sep),
'junk.txt')
- with file(junk_file_path, 'w') as junk_file:
+ with open(junk_file_path, 'w') as junk_file:
junk_file.write('barf')
os.unlink(plugin_link_dir)
@@ -440,14 +440,14 @@ to:
# link dir exists, but is a non-symlink
os.unlink(plugin_link_dir)
- with file(plugin_link_dir, 'w') as clobber_file:
+ with open(plugin_link_dir, 'w') as clobber_file:
clobber_file.write('clobbered!')
result = run_assetlink().collection[0]
assert result == 'Could not link "staticstuff": %s exists and is not a symlink\n' % (
plugin_link_dir)
- with file(plugin_link_dir, 'r') as clobber_file:
+ with open(plugin_link_dir, 'r') as clobber_file:
assert clobber_file.read() == 'clobbered!'
@@ -456,11 +456,10 @@ def test_plugin_staticdirect(static_plugin_app):
Test that the staticdirect utilities pull up the right things
"""
result = json.loads(
- static_plugin_app.get('/staticstuff/').body)
+ static_plugin_app.get('/staticstuff/').body.decode())
assert len(result) == 2
assert result['mgoblin_bunny_pic'] == '/test_static/images/bunny_pic.png'
assert result['plugin_bunny_css'] == \
'/plugin_static/staticstuff/css/bunnify.css'
-
diff --git a/mediagoblin/tests/test_privileges.py b/mediagoblin/tests/test_privileges.py
index 05829b34..8ea3d754 100644
--- a/mediagoblin/tests/test_privileges.py
+++ b/mediagoblin/tests/test_privileges.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
import pytest
from datetime import date, timedelta
from webtest import AppError
@@ -79,7 +80,7 @@ class TestPrivilegeFunctionality:
response = self.test_app.get('/')
assert response.status == "200 OK"
- assert "You are Banned" in response.body
+ assert b"You are Banned" in response.body
# Then test what happens when that ban has an expiration date which
# hasn't happened yet
#----------------------------------------------------------------------
@@ -92,7 +93,7 @@ class TestPrivilegeFunctionality:
response = self.test_app.get('/')
assert response.status == "200 OK"
- assert "You are Banned" in response.body
+ assert b"You are Banned" in response.body
# Then test what happens when that ban has an expiration date which
# has already happened
@@ -107,7 +108,7 @@ class TestPrivilegeFunctionality:
response = self.test_app.get('/')
assert response.status == "302 FOUND"
- assert not "You are Banned" in response.body
+ assert not b"You are Banned" in response.body
def testVariousPrivileges(self):
# The various actions that require privileges (ex. reporting,
@@ -127,14 +128,16 @@ class TestPrivilegeFunctionality:
#----------------------------------------------------------------------
with pytest.raises(AppError) as excinfo:
response = self.test_app.get('/submit/')
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.do_post({'upload_files':[('file',GOOD_JPG)],
'title':u'Normal Upload 1'},
url='/submit/')
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
# Test that a user cannot comment without the commenter privilege
#----------------------------------------------------------------------
@@ -149,50 +152,58 @@ class TestPrivilegeFunctionality:
media_uri_slug = '/u/{0}/m/{1}/'.format(self.admin_user.username,
media_entry.slug)
response = self.test_app.get(media_uri_slug)
- assert not "Add a comment" in response.body
+ assert not b"Add a comment" in response.body
self.query_for_users()
with pytest.raises(AppError) as excinfo:
response = self.test_app.post(
media_uri_id + 'comment/add/',
{'comment_content': u'Test comment #42'})
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
# Test that a user cannot report without the reporter privilege
#----------------------------------------------------------------------
with pytest.raises(AppError) as excinfo:
response = self.test_app.get(media_uri_slug+"report/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.do_post(
{'report_reason':u'Testing Reports #1',
'reporter_id':u'3'},
url=(media_uri_slug+"report/"))
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
# Test that a user cannot access the moderation pages w/o moderator
# or admin privileges
#----------------------------------------------------------------------
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/users/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/reports/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/media/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/users/1/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/reports/1/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
self.query_for_users()
@@ -202,4 +213,5 @@ class TestPrivilegeFunctionality:
'targeted_user':self.admin_user.id},
url='/mod/reports/1/')
self.query_for_users()
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
diff --git a/mediagoblin/tests/test_reporting.py b/mediagoblin/tests/test_reporting.py
index a154a061..6a9fe205 100644
--- a/mediagoblin/tests/test_reporting.py
+++ b/mediagoblin/tests/test_reporting.py
@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
+import six
from mediagoblin.tools import template
from mediagoblin.tests.tools import (fixture_add_user, fixture_media_entry,
@@ -75,7 +76,7 @@ class TestReportFiling:
response, context = self.do_post(
{'report_reason':u'Testing Media Report',
- 'reporter_id':unicode(allie_id)},url= media_uri_slug + "report/")
+ 'reporter_id':six.text_type(allie_id)},url= media_uri_slug + "report/")
assert response.status == "302 FOUND"
@@ -110,7 +111,7 @@ class TestReportFiling:
response, context = self.do_post({
'report_reason':u'Testing Comment Report',
- 'reporter_id':unicode(allie_id)},url= comment_uri_slug + "report/")
+ 'reporter_id':six.text_type(allie_id)},url= comment_uri_slug + "report/")
assert response.status == "302 FOUND"
diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py
index 3d67fdf6..7e0569ad 100644
--- a/mediagoblin/tests/test_sql_migrations.py
+++ b/mediagoblin/tests/test_sql_migrations.py
@@ -14,6 +14,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
+import pytest
+
+pytestmark = pytest.mark.skipif(six.PY3, reason='needs sqlalchemy.migrate')
+
import copy
from sqlalchemy import (
@@ -23,7 +28,8 @@ from sqlalchemy import (
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import select, insert
-from migrate import changeset
+if six.PY2:
+ from migrate import changeset
from mediagoblin.db.base import GMGTableBase
from mediagoblin.db.migration_tools import MigrationManager, RegisterMigration
@@ -190,7 +196,7 @@ def level_exits_new_table(db_conn):
for level in result:
- for exit_name, to_level in level['exits'].iteritems():
+ for exit_name, to_level in six.iteritems(level['exits']):
# Insert the level exit
db_conn.execute(
level_exits.insert().values(
diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py
index f6f1d18f..5cb1672b 100644
--- a/mediagoblin/tests/test_storage.py
+++ b/mediagoblin/tests/test_storage.py
@@ -19,6 +19,8 @@ import os
import tempfile
import pytest
+import six
+
from werkzeug.utils import secure_filename
from mediagoblin import storage
@@ -45,7 +47,7 @@ def test_clean_listy_filepath():
storage.clean_listy_filepath(['../../', 'linooks.jpg'])
-class FakeStorageSystem():
+class FakeStorageSystem(object):
def __init__(self, foobie, blech, **kwargs):
self.foobie = foobie
self.blech = blech
@@ -78,8 +80,8 @@ def test_storage_system_from_config():
'mediagoblin.tests.test_storage:FakeStorageSystem'})
assert this_storage.foobie == 'eiboof'
assert this_storage.blech == 'hcelb'
- assert unicode(this_storage.__class__) == \
- u'mediagoblin.tests.test_storage.FakeStorageSystem'
+ assert six.text_type(this_storage.__class__) == \
+ u"<class 'mediagoblin.tests.test_storage.FakeStorageSystem'>"
##########################
@@ -172,7 +174,7 @@ def test_basic_storage_get_file():
with this_storage.get_file(filepath, 'r') as our_file:
assert our_file.read() == 'First file'
assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
- with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
+ with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
assert our_file.read() == 'First file'
# Write to the same path but try to get a unique file.
@@ -184,13 +186,13 @@ def test_basic_storage_get_file():
with this_storage.get_file(new_filepath, 'r') as our_file:
assert our_file.read() == 'Second file'
assert os.path.exists(os.path.join(tmpdir, *new_filepath))
- with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
+ with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
assert our_file.read() == 'Second file'
# Read from an existing file
manually_written_file = os.makedirs(
os.path.join(tmpdir, 'testydir'))
- with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
+ with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
testyfile.write('testy file! so testy.')
with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
@@ -286,7 +288,7 @@ def test_basic_storage_copy_locally():
this_storage.copy_locally(filepath, new_file_dest)
this_storage.delete_file(filepath)
- assert file(new_file_dest).read() == 'Testing this file'
+ assert open(new_file_dest).read() == 'Testing this file'
os.remove(new_file_dest)
os.rmdir(dest_tmpdir)
@@ -295,7 +297,7 @@ def test_basic_storage_copy_locally():
def _test_copy_local_to_storage_works(tmpdir, this_storage):
local_filename = tempfile.mktemp()
- with file(local_filename, 'w') as tmpfile:
+ with open(local_filename, 'w') as tmpfile:
tmpfile.write('haha')
this_storage.copy_local_to_storage(
@@ -303,7 +305,7 @@ def _test_copy_local_to_storage_works(tmpdir, this_storage):
os.remove(local_filename)
- assert file(
+ assert open(
os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
'r').read() == 'haha'
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
index b5b13ed3..65c4b3a3 100644
--- a/mediagoblin/tests/test_submission.py
+++ b/mediagoblin/tests/test_submission.py
@@ -14,15 +14,26 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import sys
-reload(sys)
-sys.setdefaultencoding('utf-8')
+import six
+
+if six.PY2: # this hack only work in Python 2
+ import sys
+ reload(sys)
+ sys.setdefaultencoding('utf-8')
-import urlparse
import os
import pytest
+import six.moves.urllib.parse as urlparse
+
+# this gst initialization stuff is really required here
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+Gst.init(None)
+
from mediagoblin.tests.tools import fixture_add_user
+from .media_tools import create_av
from mediagoblin import mg_globals
from mediagoblin.db.models import MediaEntry, User
from mediagoblin.db.base import Session
@@ -34,7 +45,7 @@ from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG
GOOD_TAG_STRING = u'yin,yang'
-BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26)
+BAD_TAG_STRING = six.text_type('rage,' + 'f' * 26 + 'u' * 26)
FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form']
REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request']
@@ -145,7 +156,7 @@ class TestSubmission:
def test_normal_png(self):
self.check_normal_upload(u'Normal upload 2', GOOD_PNG)
- @pytest.mark.skipif("not pdf_check_prerequisites()")
+ @pytest.mark.skipif("not os.path.exists(GOOD_PDF) or not pdf_check_prerequisites()")
def test_normal_pdf(self):
response, context = self.do_post({'title': u'Normal upload 3 (pdf)'},
do_follow=True,
@@ -359,7 +370,19 @@ class TestSubmission:
def test_media_data(self):
self.check_normal_upload(u"With GPS data", GPS_JPG)
media = self.check_media(None, {"title": u"With GPS data"}, 1)
- assert media.media_data.gps_latitude == 59.336666666666666
+ assert media.get_location.position["latitude"] == 59.336666666666666
+
+ def test_audio(self):
+ with create_av(make_audio=True) as path:
+ self.check_normal_upload('Audio', path)
+
+ def test_video(self):
+ with create_av(make_video=True) as path:
+ self.check_normal_upload('Video', path)
+
+ def test_audio_and_video(self):
+ with create_av(make_audio=True, make_video=True) as path:
+ self.check_normal_upload('Audio and Video', path)
def test_processing(self):
public_store_dir = mg_globals.global_config[
diff --git a/mediagoblin/tests/test_submission/good.pdf b/mediagoblin/tests/test_submission/good.pdf
index ab5db006..d7029f36 100644..120000
--- a/mediagoblin/tests/test_submission/good.pdf
+++ b/mediagoblin/tests/test_submission/good.pdf
Binary files differ
diff --git a/mediagoblin/tests/test_timesince.py b/mediagoblin/tests/test_timesince.py
index 6579eb09..99ae31da 100644
--- a/mediagoblin/tests/test_timesince.py
+++ b/mediagoblin/tests/test_timesince.py
@@ -16,7 +16,7 @@
from datetime import datetime, timedelta
-from mediagoblin.tools.timesince import is_aware, timesince
+from mediagoblin.tools.timesince import timesince
def test_timesince():
diff --git a/mediagoblin/tests/test_util.py b/mediagoblin/tests/test_util.py
index 36563e75..8193233f 100644
--- a/mediagoblin/tests/test_util.py
+++ b/mediagoblin/tests/test_util.py
@@ -14,12 +14,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import mock
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
import email
import pytest
import smtplib
import pkg_resources
+import six
+
from mediagoblin.tests.tools import get_app
from mediagoblin.tools import common, url, translate, mail, text, testing
@@ -57,7 +62,7 @@ I hope you like unit tests JUST AS MUCH AS I DO!""")
assert message['From'] == "sender@mediagoblin.example.org"
assert message['To'] == "amanda@example.org, akila@example.org"
assert message['Subject'] == "Testing is so much fun!"
- assert message.get_payload(decode=True) == """HAYYY GUYS!
+ assert message.get_payload(decode=True) == b"""HAYYY GUYS!
I hope you like unit tests JUST AS MUCH AS I DO!"""
@@ -70,7 +75,7 @@ I hope you like unit tests JUST AS MUCH AS I DO!"""
assert mbox_message['From'] == "sender@mediagoblin.example.org"
assert mbox_message['To'] == "amanda@example.org, akila@example.org"
assert mbox_message['Subject'] == "Testing is so much fun!"
- assert mbox_message.get_payload(decode=True) == """HAYYY GUYS!
+ assert mbox_message.get_payload(decode=True) == b"""HAYYY GUYS!
I hope you like unit tests JUST AS MUCH AS I DO!"""
@@ -144,13 +149,13 @@ def test_gettext_lazy_proxy():
orig = u"Password"
set_thread_locale("es")
- p1 = unicode(proxy)
+ p1 = six.text_type(proxy)
p1_should = pass_to_ugettext(orig)
assert p1_should != orig, "Test useless, string not translated"
assert p1 == p1_should
set_thread_locale("sv")
- p2 = unicode(proxy)
+ p2 = six.text_type(proxy)
p2_should = pass_to_ugettext(orig)
assert p2_should != orig, "Test broken, string not translated"
assert p2 == p2_should
diff --git a/mediagoblin/tests/test_video.py b/mediagoblin/tests/test_video.py
new file mode 100644
index 00000000..79244515
--- /dev/null
+++ b/mediagoblin/tests/test_video.py
@@ -0,0 +1,132 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import tempfile
+import os
+from contextlib import contextmanager
+import imghdr
+
+#os.environ['GST_DEBUG'] = '4,python:4'
+import pytest
+pytest.importorskip("gi.repository.Gst")
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+Gst.init(None)
+
+from mediagoblin.media_types.video.transcoders import (capture_thumb,
+ VideoTranscoder)
+from mediagoblin.media_types.tools import discover
+
+@contextmanager
+def create_data(suffix=None, make_audio=False):
+ video = tempfile.NamedTemporaryFile()
+ src = Gst.ElementFactory.make('videotestsrc', None)
+ src.set_property('num-buffers', 10)
+ videorate = Gst.ElementFactory.make('videorate', None)
+ enc = Gst.ElementFactory.make('theoraenc', None)
+ mux = Gst.ElementFactory.make('oggmux', None)
+ dst = Gst.ElementFactory.make('filesink', None)
+ dst.set_property('location', video.name)
+ pipeline = Gst.Pipeline()
+ pipeline.add(src)
+ pipeline.add(videorate)
+ pipeline.add(enc)
+ pipeline.add(mux)
+ pipeline.add(dst)
+ src.link(videorate)
+ videorate.link(enc)
+ enc.link(mux)
+ mux.link(dst)
+ if make_audio:
+ audio_src = Gst.ElementFactory.make('audiotestsrc', None)
+ audio_src.set_property('num-buffers', 10)
+ audiorate = Gst.ElementFactory.make('audiorate', None)
+ audio_enc = Gst.ElementFactory.make('vorbisenc', None)
+ pipeline.add(audio_src)
+ pipeline.add(audio_enc)
+ pipeline.add(audiorate)
+ audio_src.link(audiorate)
+ audiorate.link(audio_enc)
+ audio_enc.link(mux)
+ pipeline.set_state(Gst.State.PLAYING)
+ state = pipeline.get_state(3 * Gst.SECOND)
+ assert state[0] == Gst.StateChangeReturn.SUCCESS
+ bus = pipeline.get_bus()
+ message = bus.timed_pop_filtered(
+ 3 * Gst.SECOND,
+ Gst.MessageType.ERROR | Gst.MessageType.EOS)
+ pipeline.set_state(Gst.State.NULL)
+ if suffix:
+ result = tempfile.NamedTemporaryFile(suffix=suffix)
+ else:
+ result = tempfile.NamedTemporaryFile()
+ yield (video.name, result.name)
+
+
+#TODO: this should be skipped if video plugin is not enabled
+def test_thumbnails():
+ '''
+ Test thumbnails generation.
+ 1. Create a video (+audio) from gst's videotestsrc
+ 2. Capture thumbnail
+ 3. Everything should get removed because of temp files usage
+ '''
+ #data create_data() as (video_name, thumbnail_name):
+ test_formats = [('.png', 'png'), ('.jpg', 'jpeg'), ('.gif', 'gif')]
+ for suffix, format in test_formats:
+ with create_data(suffix) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=40)
+ # check result file format
+ assert imghdr.what(thumbnail_name) == format
+ # TODO: check height and width
+ # FIXME: it doesn't work with small width, say, 10px. This should be
+ # fixed somehow
+ suffix, format = test_formats[0]
+ with create_data(suffix, True) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=40)
+ assert imghdr.what(thumbnail_name) == format
+ with create_data(suffix, True) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=10) # smaller width
+ assert imghdr.what(thumbnail_name) == format
+ with create_data(suffix, True) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=100) # bigger width
+ assert imghdr.what(thumbnail_name) == format
+
+
+def test_transcoder():
+ # test without audio
+ with create_data() as (video_name, result_name):
+ transcoder = VideoTranscoder()
+ transcoder.transcode(
+ video_name, result_name,
+ vp8_quality=8,
+ vp8_threads=0, # autodetect
+ vorbis_quality=0.3,
+ dimensions=(640, 640))
+ assert len(discover(result_name).get_video_streams()) == 1
+ # test with audio
+ with create_data(make_audio=True) as (video_name, result_name):
+ transcoder = VideoTranscoder()
+ transcoder.transcode(
+ video_name, result_name,
+ vp8_quality=8,
+ vp8_threads=0, # autodetect
+ vorbis_quality=0.3,
+ dimensions=(640, 640))
+ assert len(discover(result_name).get_video_streams()) == 1
+ assert len(discover(result_name).get_audio_streams()) == 1
diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py
index 6695618b..f3ff57ed 100644
--- a/mediagoblin/tests/test_workbench.py
+++ b/mediagoblin/tests/test_workbench.py
@@ -50,7 +50,7 @@ class TestWorkbench(object):
# kill a workbench
this_workbench = self.workbench_manager.create()
tmpfile_name = this_workbench.joinpath('temp.txt')
- tmpfile = file(tmpfile_name, 'w')
+ tmpfile = open(tmpfile_name, 'w')
with tmpfile:
tmpfile.write('lollerskates')
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
index 34392bf1..dec95e83 100644
--- a/mediagoblin/tests/tools.py
+++ b/mediagoblin/tests/tools.py
@@ -19,6 +19,7 @@ import os
import pkg_resources
import shutil
+import six
from paste.deploy import loadapp
from webtest import TestApp
@@ -26,7 +27,7 @@ from webtest import TestApp
from mediagoblin import mg_globals
from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
CommentSubscription, CommentNotification, Privilege, CommentReport, Client, \
- RequestToken, AccessToken
+ RequestToken, AccessToken, Activity, Generator
from mediagoblin.tools import testing
from mediagoblin.init.config import read_mediagoblin_config
from mediagoblin.db.base import Session
@@ -144,7 +145,7 @@ def install_fixtures_simple(db, fixtures):
"""
Very simply install fixtures in the database
"""
- for collection_name, collection_fixtures in fixtures.iteritems():
+ for collection_name, collection_fixtures in six.iteritems(fixtures):
collection = db[collection_name]
for fixture in collection_fixtures:
collection.insert(fixture)
@@ -164,7 +165,7 @@ def assert_db_meets_expected(db, expected):
{'id': 'foo',
'some_field': 'some_value'},]}
"""
- for collection_name, collection_data in expected.iteritems():
+ for collection_name, collection_data in six.iteritems(expected):
collection = db[collection_name]
for expected_document in collection_data:
document = collection.query.filter_by(id=expected_document['id']).first()
@@ -345,3 +346,28 @@ def fixture_add_comment_report(comment=None, reported_user=None,
Session.expunge(comment_report)
return comment_report
+
+def fixture_add_activity(obj, verb="post", target=None, generator=None, actor=None):
+ if generator is None:
+ generator = Generator(
+ name="GNU MediaGoblin",
+ object_type="service"
+ )
+ generator.save()
+
+ if actor is None:
+ actor = fixture_add_user()
+
+ activity = Activity(
+ verb=verb,
+ actor=actor.id,
+ generator=generator.id,
+ )
+
+ activity.set_object(obj)
+
+ if target is not None:
+ activity.set_target(target)
+
+ activity.save()
+ return activity \ No newline at end of file