aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/__init__.py4
-rw-r--r--mediagoblin/tests/media_tools.py61
-rw-r--r--mediagoblin/tests/starttls_config.ini4
-rw-r--r--mediagoblin/tests/test_api.py678
-rw-r--r--mediagoblin/tests/test_audio.py105
-rw-r--r--mediagoblin/tests/test_auth.py119
-rw-r--r--mediagoblin/tests/test_basic_auth.py9
-rw-r--r--mediagoblin/tests/test_celery_setup.py3
-rw-r--r--mediagoblin/tests/test_csrf_middleware.py4
-rw-r--r--mediagoblin/tests/test_edit.py40
-rw-r--r--mediagoblin/tests/test_exif.py28
-rw-r--r--mediagoblin/tests/test_http_callback.py83
-rw-r--r--mediagoblin/tests/test_ldap.py18
-rw-r--r--mediagoblin/tests/test_legacy_api.py94
-rw-r--r--mediagoblin/tests/test_metadata.py4
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini6
-rw-r--r--mediagoblin/tests/test_mgoblin_app_audio.ini20
-rw-r--r--mediagoblin/tests/test_mgoblin_app_audio_video.ini21
-rw-r--r--mediagoblin/tests/test_mgoblin_app_pdf.ini20
-rw-r--r--mediagoblin/tests/test_mgoblin_app_video.ini20
-rw-r--r--mediagoblin/tests/test_misc.py96
-rw-r--r--mediagoblin/tests/test_modelmethods.py42
-rw-r--r--mediagoblin/tests/test_moderation.py53
-rw-r--r--mediagoblin/tests/test_notifications.py38
-rw-r--r--mediagoblin/tests/test_oauth1.py24
-rw-r--r--mediagoblin/tests/test_oauth2.py223
-rw-r--r--mediagoblin/tests/test_openid.py17
-rw-r--r--mediagoblin/tests/test_paste.ini34
-rw-r--r--mediagoblin/tests/test_pdf.py29
-rw-r--r--mediagoblin/tests/test_persona.py24
-rw-r--r--mediagoblin/tests/test_piwigo.py17
-rw-r--r--mediagoblin/tests/test_pluginapi.py15
-rw-r--r--mediagoblin/tests/test_privileges.py50
-rw-r--r--mediagoblin/tests/test_reporting.py42
-rw-r--r--mediagoblin/tests/test_response.py65
-rw-r--r--mediagoblin/tests/test_sql_migrations.py46
-rw-r--r--mediagoblin/tests/test_storage.py34
-rw-r--r--mediagoblin/tests/test_submission.py271
l---------[-rw-r--r--]mediagoblin/tests/test_submission/good.pdfbin194007 -> 44 bytes
-rw-r--r--mediagoblin/tests/test_tags.py4
-rw-r--r--mediagoblin/tests/test_timesince.py2
-rw-r--r--mediagoblin/tests/test_tools.py118
-rw-r--r--mediagoblin/tests/test_util.py69
-rw-r--r--mediagoblin/tests/test_video.py132
-rw-r--r--mediagoblin/tests/test_workbench.py6
-rw-r--r--mediagoblin/tests/tools.py113
46 files changed, 2179 insertions, 726 deletions
diff --git a/mediagoblin/tests/__init__.py b/mediagoblin/tests/__init__.py
index fbf3fc6c..651dac24 100644
--- a/mediagoblin/tests/__init__.py
+++ b/mediagoblin/tests/__init__.py
@@ -16,7 +16,7 @@
import pytest
-from mediagoblin.db.models import User
+from mediagoblin.db.models import User, LocalUser
from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.tools import template
@@ -44,7 +44,7 @@ class MGClientTestCase:
fixture_add_user(username, **options)
def user(self, username):
- return User.query.filter(User.username == username).first()
+ return LocalUser.query.filter(LocalUser.username==username).first()
def _do_request(self, url, *context_keys, **kwargs):
template.clear_test_template_context()
diff --git a/mediagoblin/tests/media_tools.py b/mediagoblin/tests/media_tools.py
new file mode 100644
index 00000000..8d58c024
--- /dev/null
+++ b/mediagoblin/tests/media_tools.py
@@ -0,0 +1,61 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from contextlib import contextmanager
+import tempfile
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+Gst.init(None)
+
+@contextmanager
+def create_av(make_video=False, make_audio=False):
+ 'creates audio/video in `path`, throws AssertionError on any error'
+ media = tempfile.NamedTemporaryFile(suffix='.ogg')
+ pipeline = Gst.Pipeline()
+ mux = Gst.ElementFactory.make('oggmux', 'mux')
+ pipeline.add(mux)
+ if make_video:
+ video_src = Gst.ElementFactory.make('videotestsrc', 'video_src')
+ video_src.set_property('num-buffers', 20)
+ video_enc = Gst.ElementFactory.make('theoraenc', 'video_enc')
+ pipeline.add(video_src)
+ pipeline.add(video_enc)
+ assert video_src.link(video_enc)
+ assert video_enc.link(mux)
+ if make_audio:
+ audio_src = Gst.ElementFactory.make('audiotestsrc', 'audio_src')
+ audio_src.set_property('num-buffers', 20)
+ audio_enc = Gst.ElementFactory.make('vorbisenc', 'audio_enc')
+ pipeline.add(audio_src)
+ pipeline.add(audio_enc)
+ assert audio_src.link(audio_enc)
+ assert audio_enc.link(mux)
+ sink = Gst.ElementFactory.make('filesink', 'sink')
+ sink.set_property('location', media.name)
+ pipeline.add(sink)
+ mux.link(sink)
+ pipeline.set_state(Gst.State.PLAYING)
+ state = pipeline.get_state(Gst.SECOND)
+ assert state[0] == Gst.StateChangeReturn.SUCCESS
+ bus = pipeline.get_bus()
+ message = bus.timed_pop_filtered(
+ Gst.SECOND, # one second should be more than enough for 50-buf vid
+ Gst.MessageType.ERROR | Gst.MessageType.EOS)
+ assert message.type == Gst.MessageType.EOS
+ pipeline.set_state(Gst.State.NULL)
+ yield media.name
diff --git a/mediagoblin/tests/starttls_config.ini b/mediagoblin/tests/starttls_config.ini
new file mode 100644
index 00000000..1e290202
--- /dev/null
+++ b/mediagoblin/tests/starttls_config.ini
@@ -0,0 +1,4 @@
+[mediagoblin]
+email_debug_mode = false
+email_smtp_force_starttls = true
+email_smtp_host = someplace.com
diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py
index 4e0cbd8f..90873cb9 100644
--- a/mediagoblin/tests/test_api.py
+++ b/mediagoblin/tests/test_api.py
@@ -13,81 +13,655 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
-
-import logging
-import base64
-
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
import pytest
+from webtest import AppError
+
+from .resources import GOOD_JPG
from mediagoblin import mg_globals
-from mediagoblin.tools import template, pluginapi
+from mediagoblin.db.models import User, Activity, MediaEntry, TextComment
+from mediagoblin.tools.routing import extract_url_arguments
from mediagoblin.tests.tools import fixture_add_user
-from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
- BIG_BLUE
+from mediagoblin.moderation.tools import take_away_privileges
+
+class TestAPI(object):
+ """ Test mediagoblin's pump.io complient APIs """
+ @pytest.fixture(autouse=True)
+ def setup(self, test_app):
+ self.test_app = test_app
+ self.db = mg_globals.database
-_log = logging.getLogger(__name__)
+ self.user = fixture_add_user(privileges=[u'active', u'uploader', u'commenter'])
+ self.other_user = fixture_add_user(
+ username="otheruser",
+ privileges=[u'active', u'uploader', u'commenter']
+ )
+ self.active_user = self.user
+ def _activity_to_feed(self, test_app, activity, headers=None):
+ """ Posts an activity to the user's feed """
+ if headers:
+ headers.setdefault("Content-Type", "application/json")
+ else:
+ headers = {"Content-Type": "application/json"}
-class TestAPI(object):
- def setup(self):
- self.db = mg_globals.database
+ with self.mock_oauth():
+ response = test_app.post(
+ "/api/user/{0}/feed".format(self.active_user.username),
+ json.dumps(activity),
+ headers=headers
+ )
+
+ return response, json.loads(response.body.decode())
+
+ def _upload_image(self, test_app, image):
+ """ Uploads and image to MediaGoblin via pump.io API """
+ data = open(image, "rb").read()
+ headers = {
+ "Content-Type": "image/jpeg",
+ "Content-Length": str(len(data))
+ }
+
+
+ with self.mock_oauth():
+ response = test_app.post(
+ "/api/user/{0}/uploads".format(self.active_user.username),
+ data,
+ headers=headers
+ )
+ image = json.loads(response.body.decode())
+
+ return response, image
+
+ def _post_image_to_feed(self, test_app, image):
+ """ Posts an already uploaded image to feed """
+ activity = {
+ "verb": "post",
+ "object": image,
+ }
+
+ return self._activity_to_feed(test_app, activity)
+
+ def mocked_oauth_required(self, *args, **kwargs):
+ """ Mocks mediagoblin.decorator.oauth_required to always validate """
+
+ def fake_controller(controller, request, *args, **kwargs):
+ request.user = User.query.filter_by(id=self.active_user.id).first()
+ return controller(request, *args, **kwargs)
+
+ def oauth_required(c):
+ return lambda *args, **kwargs: fake_controller(c, *args, **kwargs)
+
+ return oauth_required
+
+ def mock_oauth(self):
+ """ Returns a mock.patch for the oauth_required decorator """
+ return mock.patch(
+ target="mediagoblin.decorators.oauth_required",
+ new_callable=self.mocked_oauth_required
+ )
+
+ def test_can_post_image(self, test_app):
+ """ Tests that an image can be posted to the API """
+ # First request we need to do is to upload the image
+ response, image = self._upload_image(test_app, GOOD_JPG)
+
+ # I should have got certain things back
+ assert response.status_code == 200
+
+ assert "id" in image
+ assert "fullImage" in image
+ assert "url" in image["fullImage"]
+ assert "url" in image
+ assert "author" in image
+ assert "published" in image
+ assert "updated" in image
+ assert image["objectType"] == "image"
+
+ # Check that we got the response we're expecting
+ response, _ = self._post_image_to_feed(test_app, image)
+ assert response.status_code == 200
+
+ def test_unable_to_upload_as_someone_else(self, test_app):
+ """ Test that can't upload as someoen else """
+ data = open(GOOD_JPG, "rb").read()
+ headers = {
+ "Content-Type": "image/jpeg",
+ "Content-Length": str(len(data))
+ }
+
+ with self.mock_oauth():
+ # Will be self.user trying to upload as self.other_user
+ with pytest.raises(AppError) as excinfo:
+ test_app.post(
+ "/api/user/{0}/uploads".format(self.other_user.username),
+ data,
+ headers=headers
+ )
+
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
+
+ def test_unable_to_post_feed_as_someone_else(self, test_app):
+ """ Tests that can't post an image to someone else's feed """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+
+ activity = {
+ "verb": "post",
+ "object": data
+ }
+
+ headers = {
+ "Content-Type": "application/json",
+ }
+
+ with self.mock_oauth():
+ with pytest.raises(AppError) as excinfo:
+ test_app.post(
+ "/api/user/{0}/feed".format(self.other_user.username),
+ json.dumps(activity),
+ headers=headers
+ )
+
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
+
+ def test_only_able_to_update_own_image(self, test_app):
+ """ Test's that the uploader is the only person who can update an image """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ activity = {
+ "verb": "update",
+ "object": data["object"],
+ }
+
+ headers = {
+ "Content-Type": "application/json",
+ }
+
+ # Lets change the image uploader to be self.other_user, this is easier
+ # than uploading the image as someone else as the way self.mocked_oauth_required
+ # and self._upload_image.
+ media = MediaEntry.query.filter_by(public_id=data["object"]["id"]).first()
+ media.actor = self.other_user.id
+ media.save()
+
+ # Now lets try and edit the image as self.user, this should produce a 403 error.
+ with self.mock_oauth():
+ with pytest.raises(AppError) as excinfo:
+ test_app.post(
+ "/api/user/{0}/feed".format(self.user.username),
+ json.dumps(activity),
+ headers=headers
+ )
+
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
+
+ def test_upload_image_with_filename(self, test_app):
+ """ Tests that you can upload an image with filename and description """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ image = data["object"]
+
+ # Now we need to add a title and description
+ title = "My image ^_^"
+ description = "This is my super awesome image :D"
+ license = "CC-BY-SA"
+
+ image["displayName"] = title
+ image["content"] = description
+ image["license"] = license
+
+ activity = {"verb": "update", "object": image}
+
+ with self.mock_oauth():
+ response = test_app.post(
+ "/api/user/{0}/feed".format(self.user.username),
+ json.dumps(activity),
+ headers={"Content-Type": "application/json"}
+ )
+
+ image = json.loads(response.body.decode())["object"]
+
+ # Check everything has been set on the media correctly
+ media = MediaEntry.query.filter_by(public_id=image["id"]).first()
+ assert media.title == title
+ assert media.description == description
+ assert media.license == license
+
+ # Check we're being given back everything we should on an update
+ assert image["id"] == media.public_id
+ assert image["displayName"] == title
+ assert image["content"] == description
+ assert image["license"] == license
+
+
+ def test_only_uploaders_post_image(self, test_app):
+ """ Test that only uploaders can upload images """
+ # Remove uploader permissions from user
+ take_away_privileges(self.user.username, u"uploader")
+
+ # Now try and upload a image
+ data = open(GOOD_JPG, "rb").read()
+ headers = {
+ "Content-Type": "image/jpeg",
+ "Content-Length": str(len(data)),
+ }
+
+ with self.mock_oauth():
+ with pytest.raises(AppError) as excinfo:
+ test_app.post(
+ "/api/user/{0}/uploads".format(self.user.username),
+ data,
+ headers=headers
+ )
+
+ # Assert that we've got a 403
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
+
+ def test_object_endpoint(self, test_app):
+ """ Tests that object can be looked up at endpoint """
+ # Post an image
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ # Now lookup image to check that endpoint works.
+ image = data["object"]
+
+ assert "links" in image
+ assert "self" in image["links"]
+
+ # Get URI and strip testing host off
+ object_uri = image["links"]["self"]["href"]
+ object_uri = object_uri.replace("http://localhost:80", "")
+
+ with self.mock_oauth():
+ request = test_app.get(object_uri)
+
+ image = json.loads(request.body.decode())
+ entry = MediaEntry.query.filter_by(public_id=image["id"]).first()
+
+ assert request.status_code == 200
+
+ assert "image" in image
+ assert "fullImage" in image
+ assert "pump_io" in image
+ assert "links" in image
+
+ def test_post_comment(self, test_app):
+ """ Tests that I can post an comment media """
+ # Upload some media to comment on
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ content = "Hai this is a comment on this lovely picture ^_^"
+
+ activity = {
+ "verb": "post",
+ "object": {
+ "objectType": "comment",
+ "content": content,
+ "inReplyTo": data["object"],
+ }
+ }
+
+ response, comment_data = self._activity_to_feed(test_app, activity)
+ assert response.status_code == 200
+
+ # Find the objects in the database
+ media = MediaEntry.query.filter_by(public_id=data["object"]["id"]).first()
+ comment = media.get_comments()[0].comment()
+
+ # Tests that it matches in the database
+ assert comment.actor == self.user.id
+ assert comment.content == content
+
+ # Test that the response is what we should be given
+ assert comment.content == comment_data["object"]["content"]
+
+ def test_unable_to_post_comment_as_someone_else(self, test_app):
+ """ Tests that you're unable to post a comment as someone else. """
+ # Upload some media to comment on
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ activity = {
+ "verb": "post",
+ "object": {
+ "objectType": "comment",
+ "content": "comment commenty comment ^_^",
+ "inReplyTo": data["object"],
+ }
+ }
+
+ headers = {
+ "Content-Type": "application/json",
+ }
+
+ with self.mock_oauth():
+ with pytest.raises(AppError) as excinfo:
+ test_app.post(
+ "/api/user/{0}/feed".format(self.other_user.username),
+ json.dumps(activity),
+ headers=headers
+ )
+
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
+
+ def test_unable_to_update_someone_elses_comment(self, test_app):
+ """ Test that you're able to update someoen elses comment. """
+ # Upload some media to comment on
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ activity = {
+ "verb": "post",
+ "object": {
+ "objectType": "comment",
+ "content": "comment commenty comment ^_^",
+ "inReplyTo": data["object"],
+ }
+ }
+
+ headers = {
+ "Content-Type": "application/json",
+ }
+
+ # Post the comment.
+ response, comment_data = self._activity_to_feed(test_app, activity)
+
+ # change who uploaded the comment as it's easier than changing
+ comment = TextComment.query.filter_by(public_id=comment_data["object"]["id"]).first()
+ comment.actor = self.other_user.id
+ comment.save()
+
+ # Update the comment as someone else.
+ comment_data["object"]["content"] = "Yep"
+ activity = {
+ "verb": "update",
+ "object": comment_data["object"]
+ }
+
+ with self.mock_oauth():
+ with pytest.raises(AppError) as excinfo:
+ test_app.post(
+ "/api/user/{0}/feed".format(self.user.username),
+ json.dumps(activity),
+ headers=headers
+ )
+
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
+
+ def test_profile(self, test_app):
+ """ Tests profile endpoint """
+ uri = "/api/user/{0}/profile".format(self.user.username)
+ with self.mock_oauth():
+ response = test_app.get(uri)
+ profile = json.loads(response.body.decode())
+
+ assert response.status_code == 200
+
+ assert profile["preferredUsername"] == self.user.username
+ assert profile["objectType"] == "person"
+
+ assert "links" in profile
+
+ def test_user(self, test_app):
+ """ Test the user endpoint """
+ uri = "/api/user/{0}/".format(self.user.username)
+ with self.mock_oauth():
+ response = test_app.get(uri)
+ user = json.loads(response.body.decode())
+
+ assert response.status_code == 200
+
+ assert user["nickname"] == self.user.username
+ assert user["updated"] == self.user.created.isoformat()
+ assert user["published"] == self.user.created.isoformat()
+
+ # Test profile exists but self.test_profile will test the value
+ assert "profile" in response
+
+ def test_whoami_without_login(self, test_app):
+ """ Test that whoami endpoint returns error when not logged in """
+ with pytest.raises(AppError) as excinfo:
+ response = test_app.get("/api/whoami")
+
+ assert "401 UNAUTHORIZED" in excinfo.value.args[0]
+
+ def test_read_feed(self, test_app):
+ """ Test able to read objects from the feed """
+ response, image_data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, image_data)
+
+ uri = "/api/user/{0}/feed".format(self.active_user.username)
+ with self.mock_oauth():
+ response = test_app.get(uri)
+ feed = json.loads(response.body.decode())
+
+ assert response.status_code == 200
+
+ # Check it has the attributes it should
+ assert "displayName" in feed
+ assert "objectTypes" in feed
+ assert "url" in feed
+ assert "links" in feed
+ assert "author" in feed
+ assert "items" in feed
+
+ # Check that image i uploaded is there
+ assert feed["items"][0]["verb"] == "post"
+ assert feed["items"][0]["id"] == data["id"]
+ assert feed["items"][0]["object"]["objectType"] == "image"
+ assert feed["items"][0]["object"]["id"] == data["object"]["id"]
+
+ default_limit = 20
+ items_count = default_limit * 2
+ for i in range(items_count):
+ response, image_data = self._upload_image(test_app, GOOD_JPG)
+ self._post_image_to_feed(test_app, image_data)
+ items_count += 1 # because there already is one
+
+ #
+ # default returns default_limit items
+ #
+ with self.mock_oauth():
+ response = test_app.get(uri)
+ feed = json.loads(response.body.decode())
+ assert len(feed["items"]) == default_limit
+
+ #
+ # silentely ignore count and offset that that are
+ # not a number
+ #
+ with self.mock_oauth():
+ response = test_app.get(uri + "?count=BAD&offset=WORSE")
+ feed = json.loads(response.body.decode())
+ assert len(feed["items"]) == default_limit
+
+ #
+ # if offset is less than default_limit items
+ # from the end of the feed, return less than
+ # default_limit
+ #
+ with self.mock_oauth():
+ near_the_end = items_count - default_limit / 2
+ response = test_app.get(uri + "?offset=%d" % near_the_end)
+ feed = json.loads(response.body.decode())
+ assert len(feed["items"]) < default_limit
+
+ #
+ # count=5 returns 5 items
+ #
+ with self.mock_oauth():
+ response = test_app.get(uri + "?count=5")
+ feed = json.loads(response.body.decode())
+ assert len(feed["items"]) == 5
+
+ def test_read_another_feed(self, test_app):
+ """ Test able to read objects from someone else's feed """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+
+ # Change the active user to someone else.
+ self.active_user = self.other_user
+
+ # Fetch the feed
+ url = "/api/user/{0}/feed".format(self.user.username)
+ with self.mock_oauth():
+ response = test_app.get(url)
+ feed = json.loads(response.body.decode())
+
+ assert response.status_code == 200
+
+ # Check it has the attributes it ought to.
+ assert "displayName" in feed
+ assert "objectTypes" in feed
+ assert "url" in feed
+ assert "links" in feed
+ assert "author" in feed
+ assert "items" in feed
+
+ # Assert the uploaded image is there
+ assert feed["items"][0]["verb"] == "post"
+ assert feed["items"][0]["id"] == data["id"]
+ assert feed["items"][0]["object"]["objectType"] == "image"
+ assert feed["items"][0]["object"]["id"] == data["object"]["id"]
+
+ def test_cant_post_to_someone_elses_feed(self, test_app):
+ """ Test that can't post to someone elses feed """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ self.active_user = self.other_user
+
+ with self.mock_oauth():
+ with pytest.raises(AppError) as excinfo:
+ self._post_image_to_feed(test_app, data)
+
+ assert "403 FORBIDDEN" in excinfo.value.args[0]
+
+ def test_object_endpoint_requestable(self, test_app):
+ """ Test that object endpoint can be requested """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+ object_id = data["object"]["id"]
+
+ with self.mock_oauth():
+ response = test_app.get(data["object"]["links"]["self"]["href"])
+ data = json.loads(response.body.decode())
+
+ assert response.status_code == 200
+
+ assert object_id == data["id"]
+ assert "url" in data
+ assert "links" in data
+ assert data["objectType"] == "image"
+
+ def test_delete_media_by_activity(self, test_app):
+ """ Test that an image can be deleted by a delete activity to feed """
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
+ object_id = data["object"]["id"]
+
+ activity = {
+ "verb": "delete",
+ "object": {
+ "id": object_id,
+ "objectType": "image",
+ }
+ }
+
+ response = self._activity_to_feed(test_app, activity)[1]
- self.user_password = u'4cc355_70k3N'
- self.user = fixture_add_user(u'joapi', self.user_password,
- privileges=[u'active',u'uploader'])
+ # Check the media is no longer in the database
+ media = MediaEntry.query.filter_by(public_id=object_id).first()
- def login(self, test_app):
- test_app.post(
- '/auth/login/', {
- 'username': self.user.username,
- 'password': self.user_password})
+ assert media is None
- def get_context(self, template_name):
- return template.TEMPLATE_TEST_CONTEXT[template_name]
+ # Check we've been given the full delete activity back
+ assert "id" in response
+ assert response["verb"] == "delete"
+ assert "object" in response
+ assert response["object"]["id"] == object_id
+ assert response["object"]["objectType"] == "image"
- def http_auth_headers(self):
- return {'Authorization': 'Basic {0}'.format(
- base64.b64encode(':'.join([
- self.user.username,
- self.user_password])))}
+ def test_delete_comment_by_activity(self, test_app):
+ """ Test that a comment is deleted by a delete activity to feed """
+ # First upload an image to comment against
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
- def do_post(self, data, test_app, **kwargs):
- url = kwargs.pop('url', '/api/submit')
- do_follow = kwargs.pop('do_follow', False)
+ # Post a comment to delete
+ activity = {
+ "verb": "post",
+ "object": {
+ "objectType": "comment",
+ "content": "This is a comment.",
+ "inReplyTo": data["object"],
+ }
+ }
- if not 'headers' in kwargs.keys():
- kwargs['headers'] = self.http_auth_headers()
+ comment = self._activity_to_feed(test_app, activity)[1]
- response = test_app.post(url, data, **kwargs)
+ # Now delete the image
+ activity = {
+ "verb": "delete",
+ "object": {
+ "id": comment["object"]["id"],
+ "objectType": "comment",
+ }
+ }
- if do_follow:
- response.follow()
+ delete = self._activity_to_feed(test_app, activity)[1]
- return response
+ # Verify the comment no longer exists
+ assert TextComment.query.filter_by(public_id=comment["object"]["id"]).first() is None
+ comment_id = comment["object"]["id"]
- def upload_data(self, filename):
- return {'upload_files': [('file', filename)]}
+ # Check we've got a delete activity back
+ assert "id" in delete
+ assert delete["verb"] == "delete"
+ assert "object" in delete
+ assert delete["object"]["id"] == comment["object"]["id"]
+ assert delete["object"]["objectType"] == "comment"
- def test_1_test_test_view(self, test_app):
- self.login(test_app)
+ def test_edit_comment(self, test_app):
+ """ Test that someone can update their own comment """
+ # First upload an image to comment against
+ response, data = self._upload_image(test_app, GOOD_JPG)
+ response, data = self._post_image_to_feed(test_app, data)
- response = test_app.get(
- '/api/test',
- headers=self.http_auth_headers())
+ # Post a comment to edit
+ activity = {
+ "verb": "post",
+ "object": {
+ "objectType": "comment",
+ "content": "This is a comment",
+ "inReplyTo": data["object"],
+ }
+ }
- assert response.body == \
- '{"username": "joapi", "email": "joapi@example.com"}'
+ comment = self._activity_to_feed(test_app, activity)[1]
- def test_2_test_submission(self, test_app):
- self.login(test_app)
+ # Now create an update activity to change the content
+ activity = {
+ "verb": "update",
+ "object": {
+ "id": comment["object"]["id"],
+ "content": "This is my fancy new content string!",
+ "objectType": "comment",
+ },
+ }
- response = self.do_post(
- {'title': 'Great JPG!'},
- test_app,
- **self.upload_data(GOOD_JPG))
+ comment = self._activity_to_feed(test_app, activity)[1]
- assert response.status_int == 200
+ # Verify the comment reflects the changes
+ model = TextComment.query.filter_by(public_id=comment["object"]["id"]).first()
- assert self.db.MediaEntry.query.filter_by(title=u'Great JPG!').first()
+ assert model.content == activity["object"]["content"]
diff --git a/mediagoblin/tests/test_audio.py b/mediagoblin/tests/test_audio.py
new file mode 100644
index 00000000..9826ceb1
--- /dev/null
+++ b/mediagoblin/tests/test_audio.py
@@ -0,0 +1,105 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import tempfile
+import shutil
+import os
+import pytest
+from contextlib import contextmanager
+import logging
+import imghdr
+
+#os.environ['GST_DEBUG'] = '4,python:4'
+
+pytest.importorskip("gi.repository.Gst")
+pytest.importorskip("scikits.audiolab")
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+Gst.init(None)
+
+from mediagoblin.media_types.audio.transcoders import (AudioTranscoder,
+ AudioThumbnailer)
+from mediagoblin.media_types.tools import discover
+
+
+@contextmanager
+def create_audio():
+ audio = tempfile.NamedTemporaryFile()
+ src = Gst.ElementFactory.make('audiotestsrc', None)
+ src.set_property('num-buffers', 50)
+ enc = Gst.ElementFactory.make('flacenc', None)
+ dst = Gst.ElementFactory.make('filesink', None)
+ dst.set_property('location', audio.name)
+ pipeline = Gst.Pipeline()
+ pipeline.add(src)
+ pipeline.add(enc)
+ pipeline.add(dst)
+ src.link(enc)
+ enc.link(dst)
+ pipeline.set_state(Gst.State.PLAYING)
+ state = pipeline.get_state(3 * Gst.SECOND)
+ assert state[0] == Gst.StateChangeReturn.SUCCESS
+ bus = pipeline.get_bus()
+ bus.timed_pop_filtered(
+ 3 * Gst.SECOND,
+ Gst.MessageType.ERROR | Gst.MessageType.EOS)
+ pipeline.set_state(Gst.State.NULL)
+ yield (audio.name)
+
+
+@contextmanager
+def create_data_for_test():
+ with create_audio() as audio_name:
+ second_file = tempfile.NamedTemporaryFile()
+ yield (audio_name, second_file.name)
+
+
+def test_transcoder():
+ '''
+ Tests AudioTransocder's transcode method
+ '''
+ transcoder = AudioTranscoder()
+ with create_data_for_test() as (audio_name, result_name):
+ transcoder.transcode(audio_name, result_name, quality=0.3,
+ progress_callback=None)
+ info = discover(result_name)
+ assert len(info.get_audio_streams()) == 1
+ transcoder.transcode(audio_name, result_name, quality=0.3,
+ mux_name='oggmux', progress_callback=None)
+ info = discover(result_name)
+ assert len(info.get_audio_streams()) == 1
+
+
+def test_thumbnails():
+ '''Test thumbnails generation.
+
+ The code below heavily repeats
+ audio.processing.CommonAudioProcessor.create_spectrogram
+ 1. Create test audio
+ 2. Convert it to OGG source for spectogram using transcoder
+ 3. Create spectogram in jpg
+
+ '''
+ thumbnailer = AudioThumbnailer()
+ transcoder = AudioTranscoder()
+ with create_data_for_test() as (audio_name, new_name):
+ transcoder.transcode(audio_name, new_name, mux_name='oggmux')
+ thumbnail = tempfile.NamedTemporaryFile(suffix='.jpg')
+ # fft_size below is copypasted from config_spec.ini
+ thumbnailer.spectrogram(new_name, thumbnail.name, width=100,
+ fft_size=4096)
+ assert imghdr.what(thumbnail.name) == 'jpeg'
diff --git a/mediagoblin/tests/test_auth.py b/mediagoblin/tests/test_auth.py
index 1bbc3d01..618d02b6 100644
--- a/mediagoblin/tests/test_auth.py
+++ b/mediagoblin/tests/test_auth.py
@@ -1,4 +1,3 @@
-
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
@@ -14,12 +13,16 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
+
import pkg_resources
import pytest
+import six
+
+import six.moves.urllib.parse as urlparse
+
from mediagoblin import mg_globals
-from mediagoblin.db.models import User
+from mediagoblin.db.models import User, LocalUser
from mediagoblin.tests.tools import get_app, fixture_add_user
from mediagoblin.tools import template, mail
from mediagoblin.auth import tools as auth_tools
@@ -76,9 +79,31 @@ def test_register_views(test_app):
assert form.username.errors == [u'This field does not take email addresses.']
assert form.email.errors == [u'This field requires an email address.']
+ ## invalid characters
+ template.clear_test_template_context()
+ test_app.post(
+ '/auth/register/', {
+ 'username': 'ampersand&invalid',
+ 'email': 'easter@egg.com'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
+ form = context['register_form']
+
+ assert form.username.errors == [u'Invalid input.']
+
## At this point there should be no users in the database ;)
assert User.query.count() == 0
+ ## mixture of characters from all valid ranges
+ template.clear_test_template_context()
+ test_app.post(
+ '/auth/register/', {
+ 'username': 'Jean-Louis1_Le-Chat',
+ 'password': 'iamsohappy',
+ 'email': 'easter@egg.com'})
+
+ ## At this point there should on user in the database
+ assert User.query.count() == 1
+
# Successful register
# -------------------
template.clear_test_template_context()
@@ -94,8 +119,9 @@ def test_register_views(test_app):
assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure user is in place
- new_user = mg_globals.database.User.query.filter_by(
- username=u'angrygirl').first()
+ new_user = mg_globals.database.LocalUser.query.filter(
+ LocalUser.username==u'angrygirl'
+ ).first()
assert new_user
## Make sure that the proper privileges are granted on registration
@@ -107,15 +133,15 @@ def test_register_views(test_app):
## Make sure user is logged in
request = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/user_pages/user_nonactive.html']['request']
- assert request.session['user_id'] == unicode(new_user.id)
+ assert request.session['user_id'] == six.text_type(new_user.id)
## Make sure we get email confirmation, and try verifying
- assert len(mail.EMAIL_TEST_INBOX) == 1
+ assert len(mail.EMAIL_TEST_INBOX) == 2
message = mail.EMAIL_TEST_INBOX.pop()
assert message['To'] == 'angrygrrl@example.org'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/auth/verification_email.txt']
- assert email_context['verification_url'] in message.get_payload(decode=True)
+ assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True)
path = urlparse.urlsplit(email_context['verification_url'])[2]
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
@@ -133,8 +159,9 @@ def test_register_views(test_app):
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.query.filter_by(
- username=u'angrygirl').first()
+ new_user = mg_globals.database.LocalUser.query.filter(
+ LocalUser.username==u'angrygirl'
+ ).first()
assert new_user
## Verify the email activation works
@@ -145,8 +172,9 @@ def test_register_views(test_app):
'mediagoblin/user_pages/user.html']
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.query.filter_by(
- username=u'angrygirl').first()
+ new_user = mg_globals.database.LocalUser.query.filter(
+ LocalUser.username==u'angrygirl'
+ ).first()
assert new_user
# Uniqueness checks
@@ -180,13 +208,13 @@ def test_register_views(test_app):
assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure link to change password is sent by email
- assert len(mail.EMAIL_TEST_INBOX) == 1
+ assert len(mail.EMAIL_TEST_INBOX) == 2
message = mail.EMAIL_TEST_INBOX.pop()
assert message['To'] == 'angrygrrl@example.org'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/plugins/basic_auth/fp_verification_email.txt']
#TODO - change the name of verification_url to something forgot-password-ish
- assert email_context['verification_url'] in message.get_payload(decode=True)
+ assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True)
path = urlparse.urlsplit(email_context['verification_url'])[2]
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
@@ -229,7 +257,6 @@ def test_register_views(test_app):
assert urlparse.urlsplit(response.location)[2] == '/'
assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
-
def test_authentication_views(test_app):
"""
Test logging in and logging out
@@ -305,7 +332,7 @@ def test_authentication_views(test_app):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
# Successful logout
# -----------------
@@ -332,6 +359,66 @@ def test_authentication_views(test_app):
'next' : '/u/chris/'})
assert urlparse.urlsplit(response.location)[2] == '/u/chris/'
+ ## Verify that username is lowercased on login attempt
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'ANDREW',
+ 'password': 'fuselage'})
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/login.html']
+ form = context['login_form']
+
+ # Username should no longer be uppercased; it should be lowercased
+ assert not form.username.data == u'ANDREW'
+ assert form.username.data == u'andrew'
+
+ # Successful login with short user
+ # --------------------------------
+ short_user = fixture_add_user(username=u'me', password=u'sho')
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'me',
+ 'password': 'sho'})
+
+ # User should be redirected
+ response.follow()
+
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure user is in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert session['user_id'] == six.text_type(short_user.id)
+
+ # Must logout
+ template.clear_test_template_context()
+ response = test_app.get('/auth/logout/')
+
+ # Successful login with long user
+ # ----------------
+ long_user = fixture_add_user(
+ username=u'realllylonguser@reallylongdomain.com.co', password=u'sho')
+ template.clear_test_template_context()
+ response = test_app.post(
+ '/auth/login/', {
+ 'username': u'realllylonguser@reallylongdomain.com.co',
+ 'password': 'sho'})
+
+ # User should be redirected
+ response.follow()
+ assert urlparse.urlsplit(response.location)[2] == '/'
+ assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
+
+ # Make sure user is in the session
+ context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
+ session = context['request'].session
+ assert session['user_id'] == six.text_type(long_user.id)
+
+ template.clear_test_template_context()
+ response = test_app.get('/auth/logout/')
+
@pytest.fixture()
def authentication_disabled_app(request):
return get_app(
diff --git a/mediagoblin/tests/test_basic_auth.py b/mediagoblin/tests/test_basic_auth.py
index 828f0515..3a42e407 100644
--- a/mediagoblin/tests/test_basic_auth.py
+++ b/mediagoblin/tests/test_basic_auth.py
@@ -13,9 +13,10 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
-from mediagoblin.db.models import User
+import six.moves.urllib.parse as urlparse
+
+from mediagoblin.db.models import User, LocalUser
from mediagoblin.plugins.basic_auth import tools as auth_tools
from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.tools import template
@@ -87,7 +88,7 @@ def test_change_password(test_app):
assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
# test_user has to be fetched again in order to have the current values
- test_user = User.query.filter_by(username=u'chris').first()
+ test_user = LocalUser.query.filter(LocalUser.username==u'chris').first()
assert auth_tools.bcrypt_check_password('123456', test_user.pw_hash)
# test that the password cannot be changed if the given
@@ -99,5 +100,5 @@ def test_change_password(test_app):
'new_password': '098765',
})
- test_user = User.query.filter_by(username=u'chris').first()
+ test_user = LocalUser.query.filter(LocalUser.username==u'chris').first()
assert not auth_tools.bcrypt_check_password('098765', test_user.pw_hash)
diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py
index d60293f9..df0d04b0 100644
--- a/mediagoblin/tests/test_celery_setup.py
+++ b/mediagoblin/tests/test_celery_setup.py
@@ -48,7 +48,8 @@ def test_setup_celery_from_config():
assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
assert fake_celery_module.CELERY_IMPORTS == [
- 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', 'mediagoblin.notifications.task']
+ 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', \
+ 'mediagoblin.notifications.task', 'mediagoblin.submit.task']
assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
assert fake_celery_module.CELERY_RESULT_DBURI == (
'sqlite:///' +
diff --git a/mediagoblin/tests/test_csrf_middleware.py b/mediagoblin/tests/test_csrf_middleware.py
index a272caf6..4452112b 100644
--- a/mediagoblin/tests/test_csrf_middleware.py
+++ b/mediagoblin/tests/test_csrf_middleware.py
@@ -25,7 +25,7 @@ def test_csrf_cookie_set(test_app):
# assert that the mediagoblin nonce cookie has been set
assert 'Set-Cookie' in response.headers
- assert cookie_name in response.cookies_set
+ assert cookie_name in test_app.cookies
# assert that we're also sending a vary header
assert response.headers.get('Vary', False) == 'Cookie'
@@ -34,7 +34,7 @@ def test_csrf_cookie_set(test_app):
# We need a fresh app for this test on webtest < 1.3.6.
# We do not understand why, but it fixes the tests.
# If we require webtest >= 1.3.6, we can switch to a non fresh app here.
-#
+#
# ... this comment might be irrelevant post-pytest-fixtures, but I'm not
# removing it yet in case we move to module-level tests :)
# -- cwebber
diff --git a/mediagoblin/tests/test_edit.py b/mediagoblin/tests/test_edit.py
index dc9c422f..632c8e3c 100644
--- a/mediagoblin/tests/test_edit.py
+++ b/mediagoblin/tests/test_edit.py
@@ -14,10 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse, os, pytest
+import six
+import six.moves.urllib.parse as urlparse
+import pytest
from mediagoblin import mg_globals
-from mediagoblin.db.models import User, MediaEntry
+from mediagoblin.db.models import User, LocalUser, MediaEntry
from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
from mediagoblin import auth
from mediagoblin.tools import template, mail
@@ -42,12 +44,12 @@ class TestUserEdit(object):
self.login(test_app)
# Make sure user exists
- assert User.query.filter_by(username=u'chris').first()
+ assert LocalUser.query.filter(LocalUser.username==u'chris').first()
res = test_app.post('/edit/account/delete/', {'confirmed': 'y'})
# Make sure user has been deleted
- assert User.query.filter_by(username=u'chris').first() == None
+ assert LocalUser.query.filter(LocalUser.username==u'chris').first() == None
#TODO: make sure all corresponding items comments etc have been
# deleted too. Perhaps in submission test?
@@ -77,7 +79,7 @@ class TestUserEdit(object):
'bio': u'I love toast!',
'url': u'http://dustycloud.org/'})
- test_user = User.query.filter_by(username=u'chris').first()
+ test_user = LocalUser.query.filter(LocalUser.username==u'chris').first()
assert test_user.bio == u'I love toast!'
assert test_user.url == u'http://dustycloud.org/'
@@ -142,8 +144,7 @@ class TestUserEdit(object):
assert message['To'] == 'new@example.com'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/edit/verification.txt']
- assert email_context['verification_url'] in \
- message.get_payload(decode=True)
+ assert email_context['verification_url'].encode('ascii') in message.get_payload(decode=True)
path = urlparse.urlsplit(email_context['verification_url'])[2]
assert path == u'/edit/verify_email/'
@@ -158,9 +159,10 @@ class TestUserEdit(object):
assert urlparse.urlsplit(res.location)[2] == '/'
# Email shouldn't be saved
- email_in_db = mg_globals.database.User.query.filter_by(
- email='new@example.com').first()
- email = User.query.filter_by(username='chris').first().email
+ email_in_db = mg_globals.database.LocalUser.query.filter(
+ LocalUser.email=='new@example.com'
+ ).first()
+ email = LocalUser.query.filter(LocalUser.username=='chris').first().email
assert email_in_db is None
assert email == 'chris@example.com'
@@ -171,7 +173,7 @@ class TestUserEdit(object):
res.follow()
# New email saved?
- email = User.query.filter_by(username='chris').first().email
+ email = LocalUser.query.filter(LocalUser.username=='chris').first().email
assert email == 'new@example.com'
# test changing the url inproperly
@@ -180,8 +182,10 @@ class TestMetaDataEdit:
def setup(self, test_app):
# set up new user
self.user_password = u'toast'
- self.user = fixture_add_user(password = self.user_password,
- privileges=[u'active',u'admin'])
+ self.user = fixture_add_user(
+ password = self.user_password,
+ privileges=[u'active',u'admin']
+ )
self.test_app = test_app
def login(self, test_app):
@@ -250,5 +254,11 @@ class TestMetaDataEdit:
old_metadata = new_metadata
new_metadata = media_entry.media_metadata
assert new_metadata == old_metadata
- assert ("u&#39;On the worst day&#39; is not a &#39;date-time&#39;" in
- response.body)
+ context = template.TEMPLATE_TEST_CONTEXT[
+ 'mediagoblin/edit/metadata.html']
+ if six.PY2:
+ expected = "u'On the worst day' is not a 'date-time'"
+ else:
+ expected = "'On the worst day' is not a 'date-time'"
+ assert context['form'].errors[
+ 'media_metadata'][0]['identifier'][0] == expected
diff --git a/mediagoblin/tests/test_exif.py b/mediagoblin/tests/test_exif.py
index af301818..d0495a7a 100644
--- a/mediagoblin/tests/test_exif.py
+++ b/mediagoblin/tests/test_exif.py
@@ -20,6 +20,8 @@ try:
except ImportError:
import Image
+from collections import OrderedDict
+
from mediagoblin.tools.exif import exif_fix_image_orientation, \
extract_exif, clean_exif, get_gps_data, get_useful
from .resources import GOOD_JPG, EMPTY_JPG, BAD_JPG, GPS_JPG
@@ -39,31 +41,32 @@ def test_exif_extraction():
gps = get_gps_data(result)
# Do we have the result?
- assert len(result) == 55
+ assert len(result) >= 50
# Do we have clean data?
- assert len(clean) == 53
+ assert len(clean) >= 50
# GPS data?
assert gps == {}
# Do we have the "useful" tags?
- assert useful == {'EXIF CVAPattern': {'field_length': 8,
+
+ expected = OrderedDict({'EXIF CVAPattern': {'field_length': 8,
'field_offset': 26224,
'field_type': 7,
- 'printable': u'[0, 2, 0, 2, 1, 2, 0, 1]',
+ 'printable': '[0, 2, 0, 2, 1, 2, 0, 1]',
'tag': 41730,
'values': [0, 2, 0, 2, 1, 2, 0, 1]},
'EXIF ColorSpace': {'field_length': 2,
'field_offset': 476,
'field_type': 3,
- 'printable': u'sRGB',
+ 'printable': 'sRGB',
'tag': 40961,
'values': [1]},
'EXIF ComponentsConfiguration': {'field_length': 4,
'field_offset': 308,
'field_type': 7,
- 'printable': u'YCbCr',
+ 'printable': 'YCbCr',
'tag': 37121,
'values': [1, 2, 3, 0]},
'EXIF CompressedBitsPerPixel': {'field_length': 8,
@@ -303,7 +306,7 @@ def test_exif_extraction():
'Image Orientation': {'field_length': 2,
'field_offset': 42,
'field_type': 3,
- 'printable': u'Rotated 90 CCW',
+ 'printable': u'Rotated 90 CW',
'tag': 274,
'values': [6]},
'Image ResolutionUnit': {'field_length': 2,
@@ -365,7 +368,10 @@ def test_exif_extraction():
'field_type': 5,
'printable': u'300',
'tag': 283,
- 'values': [[300, 1]]}}
+ 'values': [[300, 1]]}})
+
+ for key in expected.keys():
+ assert useful[key] == expected[key]
def test_exif_image_orientation():
@@ -379,11 +385,13 @@ def test_exif_image_orientation():
result)
# Are the dimensions correct?
- assert image.size == (428, 640)
+ assert image.size in ((428, 640), (640, 428))
# If this pixel looks right, the rest of the image probably will too.
+ # It seems different values are being seen on different platforms/systems
+ # as of ccca39f1 it seems we're adding to the list those which are seen.
assert_in(image.getdata()[10000],
- ((41, 28, 11), (43, 27, 11))
+ ((37, 23, 14), (41, 28, 11), (43, 27, 11))
)
diff --git a/mediagoblin/tests/test_http_callback.py b/mediagoblin/tests/test_http_callback.py
deleted file mode 100644
index 64b7ee8f..00000000
--- a/mediagoblin/tests/test_http_callback.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import json
-
-import pytest
-from urlparse import urlparse, parse_qs
-
-from mediagoblin import mg_globals
-from mediagoblin.tools import processing
-from mediagoblin.tests.tools import fixture_add_user
-from mediagoblin.tests.test_submission import GOOD_PNG
-from mediagoblin.tests import test_oauth2 as oauth
-
-
-class TestHTTPCallback(object):
- @pytest.fixture(autouse=True)
- def setup(self, test_app):
- self.test_app = test_app
-
- self.db = mg_globals.database
-
- self.user_password = u'secret'
- self.user = fixture_add_user(u'call_back', self.user_password)
-
- self.login()
-
- def login(self):
- self.test_app.post('/auth/login/', {
- 'username': self.user.username,
- 'password': self.user_password})
-
- def get_access_token(self, client_id, client_secret, code):
- response = self.test_app.get('/oauth-2/access_token', {
- 'code': code,
- 'client_id': client_id,
- 'client_secret': client_secret})
-
- response_data = json.loads(response.body)
-
- return response_data['access_token']
-
- def test_callback(self):
- ''' Test processing HTTP callback '''
- self.oauth = oauth.TestOAuth()
- self.oauth.setup(self.test_app)
-
- redirect, client_id = self.oauth.test_4_authorize_confidential_client()
-
- code = parse_qs(urlparse(redirect.location).query)['code'][0]
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == unicode(client_id)).first()
-
- client_secret = client.secret
-
- access_token = self.get_access_token(client_id, client_secret, code)
-
- callback_url = 'https://foo.example?secrettestmediagoblinparam'
-
- self.test_app.post('/api/submit?client_id={0}&access_token={1}\
-&client_secret={2}'.format(
- client_id,
- access_token,
- client_secret), {
- 'title': 'Test',
- 'callback_url': callback_url},
- upload_files=[('file', GOOD_PNG)])
-
- assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed'
diff --git a/mediagoblin/tests/test_ldap.py b/mediagoblin/tests/test_ldap.py
index 7e20d059..6ac0fc46 100644
--- a/mediagoblin/tests/test_ldap.py
+++ b/mediagoblin/tests/test_ldap.py
@@ -13,13 +13,20 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
+
import pkg_resources
import pytest
-import mock
+import six
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
+
+import six.moves.urllib.parse as urlparse
from mediagoblin import mg_globals
from mediagoblin.db.base import Session
+from mediagoblin.db.models import LocalUser
from mediagoblin.tests.tools import get_app
from mediagoblin.tools import template
@@ -108,8 +115,9 @@ def test_ldap_plugin(ldap_plugin_app):
ldap_plugin_app.get('/auth/logout/')
# Get user and detach from session
- test_user = mg_globals.database.User.query.filter_by(
- username=u'chris').first()
+ test_user = mg_globals.database.LocalUser.query.filter(
+ LocalUser.username==u'chris'
+ ).first()
Session.expunge(test_user)
# Log back in
@@ -126,6 +134,6 @@ def test_ldap_plugin(ldap_plugin_app):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
_test_authentication()
diff --git a/mediagoblin/tests/test_legacy_api.py b/mediagoblin/tests/test_legacy_api.py
new file mode 100644
index 00000000..b3b2fcec
--- /dev/null
+++ b/mediagoblin/tests/test_legacy_api.py
@@ -0,0 +1,94 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import logging
+import base64
+import json
+
+import pytest
+
+from mediagoblin import mg_globals
+from mediagoblin.tools import template, pluginapi
+from mediagoblin.tests.tools import fixture_add_user
+from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
+ BIG_BLUE
+
+
+_log = logging.getLogger(__name__)
+
+
+class TestAPI(object):
+ def setup(self):
+ self.db = mg_globals.database
+
+ self.user_password = u'4cc355_70k3N'
+ self.user = fixture_add_user(u'joapi', self.user_password,
+ privileges=[u'active',u'uploader'])
+
+ def login(self, test_app):
+ test_app.post(
+ '/auth/login/', {
+ 'username': self.user.username,
+ 'password': self.user_password})
+
+ def get_context(self, template_name):
+ return template.TEMPLATE_TEST_CONTEXT[template_name]
+
+ def http_auth_headers(self):
+ return {'Authorization': ('Basic {0}'.format(
+ base64.b64encode((':'.join([
+ self.user.username,
+ self.user_password])).encode('ascii')).decode()))}
+
+ def do_post(self, data, test_app, **kwargs):
+ url = kwargs.pop('url', '/api/submit')
+ do_follow = kwargs.pop('do_follow', False)
+
+ if not 'headers' in kwargs.keys():
+ kwargs['headers'] = self.http_auth_headers()
+
+ response = test_app.post(url, data, **kwargs)
+
+ if do_follow:
+ response.follow()
+
+ return response
+
+ def upload_data(self, filename):
+ return {'upload_files': [('file', filename)]}
+
+ def test_1_test_test_view(self, test_app):
+ self.login(test_app)
+
+ response = test_app.get(
+ '/api/test',
+ headers=self.http_auth_headers())
+
+ assert json.loads(response.body.decode()) == {
+ "username": "joapi", "email": "joapi@example.com"}
+
+ def test_2_test_submission(self, test_app):
+ self.login(test_app)
+
+ response = self.do_post(
+ {'title': 'Great JPG!'},
+ test_app,
+ **self.upload_data(GOOD_JPG))
+
+ assert response.status_int == 200
+
+ assert self.db.MediaEntry.query.filter_by(title=u'Great JPG!').first()
diff --git a/mediagoblin/tests/test_metadata.py b/mediagoblin/tests/test_metadata.py
index b4ea646e..a10e00ec 100644
--- a/mediagoblin/tests/test_metadata.py
+++ b/mediagoblin/tests/test_metadata.py
@@ -56,7 +56,7 @@ class TestMetadataFunctionality:
jsonld_fail_1 = None
try:
jsonld_fail_1 = compact_and_validate(metadata_fail_1)
- except ValidationError, e:
+ except ValidationError as e:
assert e.message == "'All Rights Reserved.' is not a 'uri'"
assert jsonld_fail_1 == None
#,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,
@@ -72,7 +72,7 @@ class TestMetadataFunctionality:
jsonld_fail_2 = None
try:
jsonld_fail_2 = compact_and_validate(metadata_fail_2)
- except ValidationError, e:
+ except ValidationError as e:
assert e.message == "'The other day' is not a 'date-time'"
assert jsonld_fail_2 == None
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
index 4cd3d9b6..a873f71f 100644
--- a/mediagoblin/tests/test_mgoblin_app.ini
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -31,10 +31,12 @@ BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
[plugins]
[[mediagoblin.plugins.api]]
-[[mediagoblin.plugins.oauth]]
[[mediagoblin.plugins.httpapiauth]]
[[mediagoblin.plugins.piwigo]]
[[mediagoblin.plugins.basic_auth]]
[[mediagoblin.plugins.openid]]
[[mediagoblin.media_types.image]]
-[[mediagoblin.media_types.pdf]]
+## These ones enabled by specific applications
+# [[mediagoblin.media_types.video]]
+# [[mediagoblin.media_types.audio]]
+# [[mediagoblin.media_types.pdf]]
diff --git a/mediagoblin/tests/test_mgoblin_app_audio.ini b/mediagoblin/tests/test_mgoblin_app_audio.ini
new file mode 100644
index 00000000..e3bdb11a
--- /dev/null
+++ b/mediagoblin/tests/test_mgoblin_app_audio.ini
@@ -0,0 +1,20 @@
+[mediagoblin]
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.basic_auth]]
+[[mediagoblin.media_types.audio]]
diff --git a/mediagoblin/tests/test_mgoblin_app_audio_video.ini b/mediagoblin/tests/test_mgoblin_app_audio_video.ini
new file mode 100644
index 00000000..784015ee
--- /dev/null
+++ b/mediagoblin/tests/test_mgoblin_app_audio_video.ini
@@ -0,0 +1,21 @@
+[mediagoblin]
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.basic_auth]]
+[[mediagoblin.media_types.audio]]
+[[mediagoblin.media_types.video]]
diff --git a/mediagoblin/tests/test_mgoblin_app_pdf.ini b/mediagoblin/tests/test_mgoblin_app_pdf.ini
new file mode 100644
index 00000000..d1d3fcef
--- /dev/null
+++ b/mediagoblin/tests/test_mgoblin_app_pdf.ini
@@ -0,0 +1,20 @@
+[mediagoblin]
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.basic_auth]]
+[[mediagoblin.media_types.pdf]]
diff --git a/mediagoblin/tests/test_mgoblin_app_video.ini b/mediagoblin/tests/test_mgoblin_app_video.ini
new file mode 100644
index 00000000..8b0e16e5
--- /dev/null
+++ b/mediagoblin/tests/test_mgoblin_app_video.ini
@@ -0,0 +1,20 @@
+[mediagoblin]
+#Runs with an in-memory sqlite db for speed.
+sql_engine = "sqlite://"
+run_migrations = true
+
+[storage:publicstore]
+base_dir = %(here)s/user_dev/media/public
+base_url = /mgoblin_media/
+
+[storage:queuestore]
+base_dir = %(here)s/user_dev/media/queue
+
+[celery]
+CELERY_ALWAYS_EAGER = true
+CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
+BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
+
+[plugins]
+[[mediagoblin.plugins.basic_auth]]
+[[mediagoblin.media_types.video]]
diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py
index 43ad0b6d..5500a0d7 100644
--- a/mediagoblin/tests/test_misc.py
+++ b/mediagoblin/tests/test_misc.py
@@ -14,8 +14,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import pytz
+import datetime
+
+from werkzeug.datastructures import FileStorage
+
+from .resources import GOOD_JPG
from mediagoblin.db.base import Session
-from mediagoblin.db.models import User, MediaEntry, MediaComment
+from mediagoblin.media_types import sniff_media
+from mediagoblin.submit.lib import new_upload_entry
+from mediagoblin.submit.task import collect_garbage
+from mediagoblin.db.models import User, MediaEntry, TextComment, Comment
from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
@@ -37,25 +46,31 @@ def test_user_deletes_other_comments(test_app):
Session.flush()
# Create all 4 possible comments:
- for u_id in (user_a.id, user_b.id):
- for m_id in (media_a.id, media_b.id):
- cmt = MediaComment()
- cmt.media_entry = m_id
- cmt.author = u_id
+ for u in (user_a, user_b):
+ for m in (media_a, media_b):
+ cmt = TextComment()
+ cmt.actor = u.id
cmt.content = u"Some Comment"
Session.add(cmt)
+ # think i need this to get the command ID
+ Session.flush()
+
+ link = Comment()
+ link.target = m
+ link.comment = cmt
+ Session.add(link)
Session.flush()
usr_cnt1 = User.query.count()
med_cnt1 = MediaEntry.query.count()
- cmt_cnt1 = MediaComment.query.count()
+ cmt_cnt1 = Comment.query.count()
User.query.get(user_a.id).delete(commit=False)
usr_cnt2 = User.query.count()
med_cnt2 = MediaEntry.query.count()
- cmt_cnt2 = MediaComment.query.count()
+ cmt_cnt2 = Comment.query.count()
# One user deleted
assert usr_cnt2 == usr_cnt1 - 1
@@ -68,7 +83,7 @@ def test_user_deletes_other_comments(test_app):
usr_cnt2 = User.query.count()
med_cnt2 = MediaEntry.query.count()
- cmt_cnt2 = MediaComment.query.count()
+ cmt_cnt2 = Comment.query.count()
# All users gone
assert usr_cnt2 == usr_cnt1 - 2
@@ -91,3 +106,66 @@ def test_media_deletes_broken_attachment(test_app):
MediaEntry.query.get(media.id).delete()
User.query.get(user_a.id).delete()
+
+def test_garbage_collection_task(test_app):
+ """ Test old media entry are removed by GC task """
+ user = fixture_add_user()
+
+ # Create a media entry that's unprocessed and over an hour old.
+ entry_id = 72
+ now = datetime.datetime.now(pytz.UTC)
+ file_data = FileStorage(
+ stream=open(GOOD_JPG, "rb"),
+ filename="mah_test.jpg",
+ content_type="image/jpeg"
+ )
+
+ # Find media manager
+ media_type, media_manager = sniff_media(file_data, "mah_test.jpg")
+ entry = new_upload_entry(user)
+ entry.id = entry_id
+ entry.title = "Mah Image"
+ entry.slug = "slugy-slug-slug"
+ entry.media_type = 'image'
+ entry.created = now - datetime.timedelta(days=2)
+ entry.save()
+
+ # Validate the model exists
+ assert MediaEntry.query.filter_by(id=entry_id).first() is not None
+
+ # Call the garbage collection task
+ collect_garbage()
+
+ # Now validate the image has been deleted
+ assert MediaEntry.query.filter_by(id=entry_id).first() is None
+
+def test_comments_removed_when_graveyarded(test_app):
+ """ Checks comments which are tombstones are removed from collection """
+ user = fixture_add_user()
+ media = fixture_media_entry(
+ uploader=user.id,
+ expunge=False,
+ fake_upload=False
+ )
+
+ # Add the TextComment
+ comment = TextComment()
+ comment.actor = user.id
+ comment.content = u"This is a comment that will be deleted."
+ comment.save()
+
+ # Add a link for the comment
+ link = Comment()
+ link.target = media
+ link.comment = comment
+ link.save()
+
+ # First double check it's there and all is well...
+ assert Comment.query.filter_by(target_id=link.target_id).first() is not None
+
+ # Now delete the comment.
+ comment.delete()
+
+ # Verify this also deleted the Comment link, ergo there is no comment left.
+ assert Comment.query.filter_by(target_id=link.target_id).first() is None
+
diff --git a/mediagoblin/tests/test_modelmethods.py b/mediagoblin/tests/test_modelmethods.py
index ca436c76..4c66e27b 100644
--- a/mediagoblin/tests/test_modelmethods.py
+++ b/mediagoblin/tests/test_modelmethods.py
@@ -17,13 +17,20 @@
# Maybe not every model needs a test, but some models have special
# methods, and so it makes sense to test them here.
+from __future__ import print_function
+
from mediagoblin.db.base import Session
-from mediagoblin.db.models import MediaEntry, User, Privilege
+from mediagoblin.db.models import MediaEntry, User, LocalUser, Privilege, \
+ Activity, Generator
from mediagoblin.tests import MGClientTestCase
-from mediagoblin.tests.tools import fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry, \
+ fixture_add_activity
-import mock
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
import pytest
@@ -49,7 +56,7 @@ class TestMediaEntrySlugs(object):
entry.title = title or u"Some title"
entry.slug = slug
entry.id = this_id
- entry.uploader = uploader or self.chris_user.id
+ entry.actor = uploader or self.chris_user.id
entry.media_type = u'image'
if save:
@@ -161,10 +168,10 @@ class TestUserHasPrivilege:
privileges=[u'admin',u'moderator',u'active'])
fixture_add_user(u'aeva',
privileges=[u'moderator',u'active'])
- self.natalie_user = User.query.filter(
- User.username==u'natalie').first()
- self.aeva_user = User.query.filter(
- User.username==u'aeva').first()
+ self.natalie_user = LocalUser.query.filter(
+ LocalUser.username==u'natalie').first()
+ self.aeva_user = LocalUser.query.filter(
+ LocalUser.username==u'aeva').first()
def test_privilege_added_correctly(self, test_app):
self._setup()
@@ -179,20 +186,17 @@ class TestUserHasPrivilege:
self._setup()
# then test out the user.has_privilege method for one privilege
- assert not self.natalie_user.has_privilege(u'commenter')
- assert self.aeva_user.has_privilege(u'active')
-
+ assert not self.aeva_user.has_privilege(u'admin')
+ assert self.natalie_user.has_privilege(u'active')
- def test_user_has_privileges_multiple(self, test_app):
+ def test_allow_admin(self, test_app):
self._setup()
- # when multiple args are passed to has_privilege, the method returns
- # True if the user has ANY of the privileges
- assert self.natalie_user.has_privilege(u'admin',u'commenter')
- assert self.aeva_user.has_privilege(u'moderator',u'active')
- assert not self.natalie_user.has_privilege(u'commenter',u'uploader')
-
+ # This should work because she is an admin.
+ assert self.natalie_user.has_privilege(u'commenter')
+ # Test that we can look this out ignoring that she's an admin
+ assert not self.natalie_user.has_privilege(u'commenter', allow_admin=False)
def test_media_data_init(test_app):
Session.rollback()
@@ -205,7 +209,7 @@ def test_media_data_init(test_app):
obj_in_session = 0
for obj in Session():
obj_in_session += 1
- print repr(obj)
+ print(repr(obj))
assert obj_in_session == 0
diff --git a/mediagoblin/tests/test_moderation.py b/mediagoblin/tests/test_moderation.py
index e7a0ebef..55bb4c4b 100644
--- a/mediagoblin/tests/test_moderation.py
+++ b/mediagoblin/tests/test_moderation.py
@@ -18,7 +18,8 @@ import pytest
from mediagoblin.tests.tools import (fixture_add_user,
fixture_add_comment_report, fixture_add_comment)
-from mediagoblin.db.models import User, CommentReport, MediaComment, UserBan
+from mediagoblin.db.models import User, LocalUser, Report, TextComment, \
+ UserBan, GenericModelReference
from mediagoblin.tools import template, mail
from webtest import AppError
@@ -47,9 +48,9 @@ class TestModerationViews:
self.query_for_users()
def query_for_users(self):
- self.admin_user = User.query.filter(User.username==u'admin').first()
- self.mod_user = User.query.filter(User.username==u'moderator').first()
- self.user = User.query.filter(User.username==u'regular').first()
+ self.admin_user = LocalUser.query.filter(LocalUser.username==u'admin').first()
+ self.mod_user = LocalUser.query.filter(LocalUser.username==u'moderator').first()
+ self.user = LocalUser.query.filter(LocalUser.username==u'regular').first()
def do_post(self, data, *context_keys, **kwargs):
url = kwargs.pop('url', '/submit/')
@@ -102,15 +103,15 @@ class TestModerationViews:
# to a reported comment
#----------------------------------------------------------------------
fixture_add_comment_report(reported_user=self.user)
- comment_report = CommentReport.query.filter(
- CommentReport.reported_user==self.user).first()
+ comment_report = Report.query.filter(
+ Report.reported_user==self.user).first()
response = self.test_app.get('/mod/reports/{0}/'.format(
comment_report.id))
assert response.status == '200 OK'
self.query_for_users()
- comment_report = CommentReport.query.filter(
- CommentReport.reported_user==self.user).first()
+ comment_report = Report.query.filter(
+ Report.reported_user==self.user).first()
response, context = self.do_post({'action_to_resolve':[u'takeaway'],
'take_away_privileges':[u'commenter'],
@@ -118,15 +119,15 @@ class TestModerationViews:
url='/mod/reports/{0}/'.format(comment_report.id))
self.query_for_users()
- comment_report = CommentReport.query.filter(
- CommentReport.reported_user==self.user).first()
+ comment_report = Report.query.filter(
+ Report.reported_user==self.user).first()
assert response.status == '302 FOUND'
assert not self.user.has_privilege(u'commenter')
assert comment_report.is_archived_report() is True
fixture_add_comment_report(reported_user=self.user)
- comment_report = CommentReport.query.filter(
- CommentReport.reported_user==self.user).first()
+ comment_report = Report.query.filter(
+ Report.reported_user==self.user).first()
# Then, test a moderator sending an email to a user in response to a
# reported comment
@@ -139,8 +140,8 @@ class TestModerationViews:
url='/mod/reports/{0}/'.format(comment_report.id))
self.query_for_users()
- comment_report = CommentReport.query.filter(
- CommentReport.reported_user==self.user).first()
+ comment_report = Report.query.filter(
+ Report.reported_user==self.user).first()
assert response.status == '302 FOUND'
assert mail.EMAIL_TEST_MBOX_INBOX == [{'to': [u'regular@example.com'],
'message': 'Content-Type: text/plain; charset="utf-8"\n\
@@ -157,13 +158,17 @@ VGhpcyBpcyB5b3VyIGxhc3Qgd2FybmluZywgcmVndWxhci4uLi4=\n',
self.query_for_users()
fixture_add_comment(author=self.user.id,
comment=u'Comment will be removed')
- test_comment = MediaComment.query.filter(
- MediaComment.author==self.user.id).first()
+ test_comment = TextComment.query.filter(
+ TextComment.actor==self.user.id).first()
fixture_add_comment_report(comment=test_comment,
reported_user=self.user)
- comment_report = CommentReport.query.filter(
- CommentReport.comment==test_comment).filter(
- CommentReport.resolved==None).first()
+ comment_gmr = GenericModelReference.query.filter_by(
+ obj_pk=test_comment.id,
+ model_type=test_comment.__tablename__
+ ).first()
+ comment_report = Report.query.filter(
+ Report.object_id==comment_gmr.id).filter(
+ Report.resolved==None).first()
response, context = self.do_post(
{'action_to_resolve':[u'userban', u'delete'],
@@ -176,17 +181,17 @@ VGhpcyBpcyB5b3VyIGxhc3Qgd2FybmluZywgcmVndWxhci4uLi4=\n',
test_user_ban = UserBan.query.filter(
UserBan.user_id == self.user.id).first()
assert test_user_ban is not None
- test_comment = MediaComment.query.filter(
- MediaComment.author==self.user.id).first()
+ test_comment = TextComment.query.filter(
+ TextComment.actor==self.user.id).first()
assert test_comment is None
# Then, test what happens when a moderator attempts to punish an admin
# from a reported comment on an admin.
#----------------------------------------------------------------------
fixture_add_comment_report(reported_user=self.admin_user)
- comment_report = CommentReport.query.filter(
- CommentReport.reported_user==self.admin_user).filter(
- CommentReport.resolved==None).first()
+ comment_report = Report.query.filter(
+ Report.reported_user==self.admin_user).filter(
+ Report.resolved==None).first()
response, context = self.do_post({'action_to_resolve':[u'takeaway'],
'take_away_privileges':[u'active'],
diff --git a/mediagoblin/tests/test_notifications.py b/mediagoblin/tests/test_notifications.py
index 3bf36f5f..776bfc71 100644
--- a/mediagoblin/tests/test_notifications.py
+++ b/mediagoblin/tests/test_notifications.py
@@ -16,12 +16,11 @@
import pytest
-import urlparse
+import six.moves.urllib.parse as urlparse
from mediagoblin.tools import template, mail
-from mediagoblin.db.models import Notification, CommentNotification, \
- CommentSubscription
+from mediagoblin.db.models import Notification, CommentSubscription
from mediagoblin.db.base import Session
from mediagoblin.notifications import mark_comment_notification_seen
@@ -109,39 +108,52 @@ class TestNotifications:
notification = notifications[0]
- assert type(notification) == CommentNotification
assert notification.seen == False
assert notification.user_id == user.id
- assert notification.subject.get_author.id == self.test_user.id
- assert notification.subject.content == u'Test comment #42'
+ assert notification.obj().comment().get_actor.id == self.test_user.id
+ assert notification.obj().comment().content == u'Test comment #42'
if wants_email == True:
- assert mail.EMAIL_TEST_MBOX_INBOX == [
- {'from': 'notice@mediagoblin.example.org',
- 'message': 'Content-Type: text/plain; \
+ # Why the `or' here? In Werkzeug 0.11.0 and above
+ # werkzeug stopped showing the port for localhost when
+ # rendering something like this. As long as we're
+ # supporting pre-0.11.0 we'll keep this `or', but maybe
+ # in the future we can kill it.
+ assert (
+ mail.EMAIL_TEST_MBOX_INBOX == [
+ {'from': 'notice@mediagoblin.example.org',
+ 'message': 'Content-Type: text/plain; \
charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: \
base64\nSubject: GNU MediaGoblin - chris commented on your \
post\nFrom: notice@mediagoblin.example.org\nTo: \
otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyIHBvc3QgKGh0dHA6Ly9sb2Nh\nbGhvc3Q6ODAvdS9vdGhlcnBlcnNvbi9tL3NvbWUtdGl0bGUvYy8xLyNjb21tZW50KSBhdCBHTlUg\nTWVkaWFHb2JsaW4KClRlc3QgY29tbWVudCAjNDIKCkdOVSBNZWRpYUdvYmxpbg==\n',
- 'to': [u'otherperson@example.com']}]
+ 'to': [u'otherperson@example.com']}]
+ or mail.EMAIL_TEST_MBOX_INBOX == [
+ {'from': 'notice@mediagoblin.example.org',
+ 'message': 'Content-Type: text/plain; \
+charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: \
+base64\nSubject: GNU MediaGoblin - chris commented on your \
+post\nFrom: notice@mediagoblin.example.org\nTo: \
+otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyIHBvc3QgKGh0dHA6Ly9sb2Nh\nbGhvc3QvdS9vdGhlcnBlcnNvbi9tL3NvbWUtdGl0bGUvYy8xLyNjb21tZW50KSBhdCBHTlUgTWVk\naWFHb2JsaW4KClRlc3QgY29tbWVudCAjNDIKCkdOVSBNZWRpYUdvYmxpbg==\n',
+ 'to': [u'otherperson@example.com']}])
else:
assert mail.EMAIL_TEST_MBOX_INBOX == []
# Save the ids temporarily because of DetachedInstanceError
notification_id = notification.id
- comment_id = notification.subject.id
+ comment_id = notification.obj().id
self.logout()
self.login('otherperson', 'nosreprehto')
- self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id))
+ self.test_app.get(media_uri_slug + 'c/{0}/'.format(comment_id))
notification = Notification.query.filter_by(id=notification_id).first()
assert notification.seen == True
- self.test_app.get(media_uri_slug + '/notifications/silence/')
+ self.test_app.get(media_uri_slug + 'notifications/silence/')
subscription = CommentSubscription.query.filter_by(id=subscription_id)\
.first()
diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py
index 073c2884..e41a68c7 100644
--- a/mediagoblin/tests/test_oauth1.py
+++ b/mediagoblin/tests/test_oauth1.py
@@ -14,10 +14,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import cgi
-
import pytest
-from urlparse import parse_qs, urlparse
+
+from six.moves.urllib.parse import parse_qs, urlparse
from oauthlib.oauth1 import Client
@@ -52,8 +51,8 @@ class TestOAuth(object):
def register_client(self, **kwargs):
""" Regiters a client with the API """
-
- kwargs["type"] = "client_associate"
+
+ kwargs["type"] = "client_associate"
kwargs["application_type"] = kwargs.get("application_type", "native")
return self.test_app.post("/api/client/register", kwargs)
@@ -63,7 +62,7 @@ class TestOAuth(object):
client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
-
+
assert response.status_int == 200
assert client is not None
@@ -73,7 +72,7 @@ class TestOAuth(object):
"application_name": "Testificate MD",
"application_type": "web",
"contacts": "someone@someplace.com tuteo@tsengeo.lu",
- "logo_url": "http://ayrel.com/utral.png",
+ "logo_uri": "http://ayrel.com/utral.png",
"redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
}
@@ -81,12 +80,12 @@ class TestOAuth(object):
client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
-
+
assert client is not None
assert client.secret == client_info["client_secret"]
assert client.application_type == query["application_type"]
assert client.redirect_uri == query["redirect_uris"].split()
- assert client.logo_url == query["logo_url"]
+ assert client.logo_url == query["logo_uri"]
assert client.contacts == query["contacts"].split()
@@ -103,7 +102,7 @@ class TestOAuth(object):
"type": "client_update",
"application_name": "neytiri",
"contacts": "someone@someplace.com abc@cba.com",
- "logo_url": "http://place.com/picture.png",
+ "logo_uri": "http://place.com/picture.png",
"application_type": "web",
"redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
}
@@ -118,7 +117,7 @@ class TestOAuth(object):
assert client.application_type == update_query["application_type"]
assert client.application_name == update_query["application_name"]
assert client.contacts == update_query["contacts"].split()
- assert client.logo_url == update_query["logo_url"]
+ assert client.logo_url == update_query["logo_uri"]
assert client.redirect_uri == update_query["redirect_uris"].split()
def to_authorize_headers(self, data):
@@ -146,7 +145,7 @@ class TestOAuth(object):
headers["Content-Type"] = self.MIME_FORM
response = self.test_app.post(endpoint, headers=headers)
- response = cgi.parse_qs(response.body)
+ response = parse_qs(response.body.decode())
# each element is a list, reduce it to a string
for key, value in response.items():
@@ -163,4 +162,3 @@ class TestOAuth(object):
assert request_token.client == client.id
assert request_token.used == False
assert request_token.callback == request_query["oauth_callback"]
-
diff --git a/mediagoblin/tests/test_oauth2.py b/mediagoblin/tests/test_oauth2.py
deleted file mode 100644
index 957f4e65..00000000
--- a/mediagoblin/tests/test_oauth2.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import json
-import logging
-
-import pytest
-from urlparse import parse_qs, urlparse
-
-from mediagoblin import mg_globals
-from mediagoblin.tools import template, pluginapi
-from mediagoblin.tests.tools import fixture_add_user
-
-
-_log = logging.getLogger(__name__)
-
-
-class TestOAuth(object):
- @pytest.fixture(autouse=True)
- def setup(self, test_app):
- self.test_app = test_app
-
- self.db = mg_globals.database
-
- self.pman = pluginapi.PluginManager()
-
- self.user_password = u'4cc355_70k3N'
- self.user = fixture_add_user(u'joauth', self.user_password,
- privileges=[u'active'])
-
- self.login()
-
- def login(self):
- self.test_app.post(
- '/auth/login/', {
- 'username': self.user.username,
- 'password': self.user_password})
-
- def register_client(self, name, client_type, description=None,
- redirect_uri=''):
- return self.test_app.post(
- '/oauth-2/client/register', {
- 'name': name,
- 'description': description,
- 'type': client_type,
- 'redirect_uri': redirect_uri})
-
- def get_context(self, template_name):
- return template.TEMPLATE_TEST_CONTEXT[template_name]
-
- def test_1_public_client_registration_without_redirect_uri(self):
- ''' Test 'public' OAuth client registration without any redirect uri '''
- response = self.register_client(
- u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')
-
- ctx = self.get_context('oauth/client/register.html')
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.name == u'OMGOMGOMG').first()
-
- assert response.status_int == 200
-
- # Should display an error
- assert len(ctx['form'].redirect_uri.errors)
-
- # Should not pass through
- assert not client
-
- def test_2_successful_public_client_registration(self):
- ''' Successfully register a public client '''
- uri = 'http://foo.example'
- self.register_client(
- u'OMGOMG', 'public', 'OMG!', uri)
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.name == u'OMGOMG').first()
-
- # redirect_uri should be set
- assert client.redirect_uri == uri
-
- # Client should have been registered
- assert client
-
- def test_3_successful_confidential_client_reg(self):
- ''' Register a confidential OAuth client '''
- response = self.register_client(
- u'GMOGMO', 'confidential', 'NO GMO!')
-
- assert response.status_int == 302
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.name == u'GMOGMO').first()
-
- # Client should have been registered
- assert client
-
- return client
-
- def test_4_authorize_confidential_client(self):
- ''' Authorize a confidential client as a logged in user '''
- client = self.test_3_successful_confidential_client_reg()
-
- client_identifier = client.identifier
-
- redirect_uri = 'https://foo.example'
- response = self.test_app.get('/oauth-2/authorize', {
- 'client_id': client.identifier,
- 'scope': 'all',
- 'redirect_uri': redirect_uri})
-
- # User-agent should NOT be redirected
- assert response.status_int == 200
-
- ctx = self.get_context('oauth/authorize.html')
-
- form = ctx['form']
-
- # Short for client authorization post reponse
- capr = self.test_app.post(
- '/oauth-2/client/authorize', {
- 'client_id': form.client_id.data,
- 'allow': 'Allow',
- 'next': form.next.data})
-
- assert capr.status_int == 302
-
- authorization_response = capr.follow()
-
- assert authorization_response.location.startswith(redirect_uri)
-
- return authorization_response, client_identifier
-
- def get_code_from_redirect_uri(self, uri):
- ''' Get the value of ?code= from an URI '''
- return parse_qs(urlparse(uri).query)['code'][0]
-
- def test_token_endpoint_successful_confidential_request(self):
- ''' Successful request against token endpoint '''
- code_redirect, client_id = self.test_4_authorize_confidential_client()
-
- code = self.get_code_from_redirect_uri(code_redirect.location)
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == unicode(client_id)).first()
-
- token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\
-code={1}&client_secret={2}'.format(client_id, code, client.secret))
-
- assert token_res.status_int == 200
-
- token_data = json.loads(token_res.body)
-
- assert not 'error' in token_data
- assert 'access_token' in token_data
- assert 'token_type' in token_data
- assert 'expires_in' in token_data
- assert type(token_data['expires_in']) == int
- assert token_data['expires_in'] > 0
-
- # There should be a refresh token provided in the token data
- assert len(token_data['refresh_token'])
-
- return client_id, token_data
-
- def test_token_endpont_missing_id_confidential_request(self):
- ''' Unsuccessful request against token endpoint, missing client_id '''
- code_redirect, client_id = self.test_4_authorize_confidential_client()
-
- code = self.get_code_from_redirect_uri(code_redirect.location)
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == unicode(client_id)).first()
-
- token_res = self.test_app.get('/oauth-2/access_token?\
-code={0}&client_secret={1}'.format(code, client.secret))
-
- assert token_res.status_int == 200
-
- token_data = json.loads(token_res.body)
-
- assert 'error' in token_data
- assert not 'access_token' in token_data
- assert token_data['error'] == 'invalid_request'
- assert len(token_data['error_description'])
-
- def test_refresh_token(self):
- ''' Try to get a new access token using the refresh token '''
- # Get an access token and a refresh token
- client_id, token_data =\
- self.test_token_endpoint_successful_confidential_request()
-
- client = self.db.OAuthClient.query.filter(
- self.db.OAuthClient.identifier == client_id).first()
-
- token_res = self.test_app.get('/oauth-2/access_token',
- {'refresh_token': token_data['refresh_token'],
- 'client_id': client_id,
- 'client_secret': client.secret
- })
-
- assert token_res.status_int == 200
-
- new_token_data = json.loads(token_res.body)
-
- assert not 'error' in new_token_data
- assert 'access_token' in new_token_data
- assert 'token_type' in new_token_data
- assert 'expires_in' in new_token_data
- assert type(new_token_data['expires_in']) == int
- assert new_token_data['expires_in'] > 0
diff --git a/mediagoblin/tests/test_openid.py b/mediagoblin/tests/test_openid.py
index 0424fdda..71767032 100644
--- a/mediagoblin/tests/test_openid.py
+++ b/mediagoblin/tests/test_openid.py
@@ -14,17 +14,21 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
import pkg_resources
import pytest
-import mock
+import six
+import six.moves.urllib.parse as urlparse
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
openid_consumer = pytest.importorskip(
"openid.consumer.consumer")
from mediagoblin import mg_globals
from mediagoblin.db.base import Session
-from mediagoblin.db.models import User
+from mediagoblin.db.models import User, LocalUser
from mediagoblin.plugins.openid.models import OpenIDUserURL
from mediagoblin.tests.tools import get_app, fixture_add_user
from mediagoblin.tools import template
@@ -188,8 +192,9 @@ class TestOpenIDPlugin(object):
openid_plugin_app.get('/auth/logout')
# Get user and detach from session
- test_user = mg_globals.database.User.query.filter_by(
- username=u'chris').first()
+ test_user = mg_globals.database.LocalUser.query.filter(
+ LocalUser.username==u'chris'
+ ).first()
Session.expunge(test_user)
# Log back in
@@ -206,7 +211,7 @@ class TestOpenIDPlugin(object):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
_test_new_user()
diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini
index a9595432..1c5f09fa 100644
--- a/mediagoblin/tests/test_paste.ini
+++ b/mediagoblin/tests/test_paste.ini
@@ -1,40 +1,18 @@
[DEFAULT]
debug = true
-[composite:main]
-use = egg:Paste#urlmap
-/ = mediagoblin
-/mgoblin_media/ = publicstore_serve
-/test_static/ = mediagoblin_static
-/theme_static/ = theme_static
-/plugin_static/ = plugin_static
-
-[app:mediagoblin]
+[app:main]
use = egg:mediagoblin#app
config = %(here)s/mediagoblin.ini
-
-[app:publicstore_serve]
-use = egg:Paste#static
-document_root = %(here)s/user_dev/media/public
-
-[app:mediagoblin_static]
-use = egg:Paste#static
-document_root = %(here)s/mediagoblin/static/
-
-[app:theme_static]
-use = egg:Paste#static
-document_root = %(here)s/user_dev/theme_static/
-cache_max_age = 86400
-
-[app:plugin_static]
-use = egg:Paste#static
-document_root = %(here)s/user_dev/plugin_static/
-cache_max_age = 86400
+/mgoblin_media = %(here)s/user_dev/media/public
+/test_static = %(here)s/mediagoblin/static
+/theme_static = %(here)s/user_dev/theme_static
+/plugin_static = %(here)s/user_dev/plugin_static
[celery]
CELERY_ALWAYS_EAGER = true
[server:main]
-use = egg:Paste#http
+use = egg:waitress#main
host = 127.0.0.1
port = 6543
diff --git a/mediagoblin/tests/test_pdf.py b/mediagoblin/tests/test_pdf.py
index b4d1940a..7107dc9a 100644
--- a/mediagoblin/tests/test_pdf.py
+++ b/mediagoblin/tests/test_pdf.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import collections
import tempfile
import shutil
import os
@@ -21,19 +22,25 @@ import pytest
from mediagoblin.media_types.pdf.processing import (
pdf_info, check_prerequisites, create_pdf_thumb)
-from .resources import GOOD_PDF as GOOD
+from .resources import GOOD_PDF
-@pytest.mark.skipif("not check_prerequisites()")
+@pytest.mark.skipif("not os.path.exists(GOOD_PDF) or not check_prerequisites()")
def test_pdf():
- good_dict = {'pdf_version_major': 1, 'pdf_title': '',
- 'pdf_page_size_width': 612, 'pdf_author': '',
- 'pdf_keywords': '', 'pdf_pages': 10,
- 'pdf_producer': 'dvips + GNU Ghostscript 7.05',
- 'pdf_version_minor': 3,
- 'pdf_creator': 'LaTeX with hyperref package',
- 'pdf_page_size_height': 792}
- assert pdf_info(GOOD) == good_dict
+ expected_dict = {'pdf_author': -1,
+ 'pdf_creator': -1,
+ 'pdf_keywords': -1,
+ 'pdf_page_size_height': -1,
+ 'pdf_page_size_width': -1,
+ 'pdf_pages': -1,
+ 'pdf_producer': -1,
+ 'pdf_title': -1,
+ 'pdf_version_major': 1,
+ 'pdf_version_minor': -1}
+ good_info = pdf_info(GOOD_PDF)
+ for k, v in expected_dict.items():
+ assert(k in good_info)
+ assert(v == -1 or v == good_info[k])
temp_dir = tempfile.mkdtemp()
- create_pdf_thumb(GOOD, os.path.join(temp_dir, 'good_256_256.png'), 256, 256)
+ create_pdf_thumb(GOOD_PDF, os.path.join(temp_dir, 'good_256_256.png'), 256, 256)
shutil.rmtree(temp_dir)
diff --git a/mediagoblin/tests/test_persona.py b/mediagoblin/tests/test_persona.py
index a1cd30eb..437cb7a1 100644
--- a/mediagoblin/tests/test_persona.py
+++ b/mediagoblin/tests/test_persona.py
@@ -13,16 +13,22 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import urlparse
+
import pkg_resources
import pytest
-import mock
+import six
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
+
+import six.moves.urllib.parse as urlparse
pytest.importorskip("requests")
from mediagoblin import mg_globals
from mediagoblin.db.base import Session
-from mediagoblin.db.models import Privilege
+from mediagoblin.db.models import Privilege, LocalUser
from mediagoblin.tests.tools import get_app
from mediagoblin.tools import template
@@ -111,14 +117,16 @@ class TestPersonaPlugin(object):
persona_plugin_app.get('/auth/logout/')
# Get user and detach from session
- test_user = mg_globals.database.User.query.filter_by(
- username=u'chris').first()
+ test_user = mg_globals.database.LocalUser.query.filter(
+ LocalUser.username==u'chris'
+ ).first()
active_privilege = Privilege.query.filter(
Privilege.privilege_name==u'active').first()
test_user.all_privileges.append(active_privilege)
test_user.save()
- test_user = mg_globals.database.User.query.filter_by(
- username=u'chris').first()
+ test_user = mg_globals.database.LocalUser.query.filter(
+ LocalUser.username==u'chris'
+ ).first()
Session.expunge(test_user)
# Add another user for _test_edit_persona
@@ -140,7 +148,7 @@ class TestPersonaPlugin(object):
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
- assert session['user_id'] == unicode(test_user.id)
+ assert session['user_id'] == six.text_type(test_user.id)
_test_registration()
diff --git a/mediagoblin/tests/test_piwigo.py b/mediagoblin/tests/test_piwigo.py
index 16ad0111..33aea580 100644
--- a/mediagoblin/tests/test_piwigo.py
+++ b/mediagoblin/tests/test_piwigo.py
@@ -44,28 +44,23 @@ class Test_PWG(object):
def test_session(self):
resp = self.do_post("pwg.session.login",
{"username": u"nouser", "password": "wrong"})
- assert resp.body == XML_PREFIX \
- + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>').encode('ascii')
resp = self.do_post("pwg.session.login",
{"username": self.username, "password": "wrong"})
- assert resp.body == XML_PREFIX \
- + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="fail"><err code="999" msg="Invalid username/password"/></rsp>').encode('ascii')
resp = self.do_get("pwg.session.getStatus")
- assert resp.body == XML_PREFIX \
- + '<rsp stat="ok"><username>guest</username></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>guest</username></rsp>').encode('ascii')
resp = self.do_post("pwg.session.login",
{"username": self.username, "password": self.password})
- assert resp.body == XML_PREFIX + '<rsp stat="ok">1</rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok">1</rsp>').encode('ascii')
resp = self.do_get("pwg.session.getStatus")
- assert resp.body == XML_PREFIX \
- + '<rsp stat="ok"><username>chris</username></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>chris</username></rsp>').encode('ascii')
self.do_get("pwg.session.logout")
resp = self.do_get("pwg.session.getStatus")
- assert resp.body == XML_PREFIX \
- + '<rsp stat="ok"><username>guest</username></rsp>'
+ assert resp.body == (XML_PREFIX + '<rsp stat="ok"><username>guest</username></rsp>').encode('ascii')
diff --git a/mediagoblin/tests/test_pluginapi.py b/mediagoblin/tests/test_pluginapi.py
index eae0ce15..2fd6df39 100644
--- a/mediagoblin/tests/test_pluginapi.py
+++ b/mediagoblin/tests/test_pluginapi.py
@@ -224,7 +224,7 @@ def test_hook_handle():
assert pluginapi.hook_handle(
"nothing_handling", call_log, unhandled_okay=True) is None
assert call_log == []
-
+
# Multiple provided, go with the first!
call_log = []
assert pluginapi.hook_handle(
@@ -348,7 +348,7 @@ def test_modify_context(context_modified_app):
"""
# Specific thing passed into a page
result = context_modified_app.get("/modify_context/specific/")
- assert result.body.strip() == """Specific page!
+ assert result.body.strip() == b"""Specific page!
specific thing: in yer specificpage
global thing: globally appended!
@@ -357,7 +357,7 @@ doubleme: happyhappy"""
# General test, should have global context variable only
result = context_modified_app.get("/modify_context/")
- assert result.body.strip() == """General page!
+ assert result.body.strip() == b"""General page!
global thing: globally appended!
lol: cats
@@ -421,7 +421,7 @@ def test_plugin_assetlink(static_plugin_app):
junk_file_path = os.path.join(
linked_assets_dir.rstrip(os.path.sep),
'junk.txt')
- with file(junk_file_path, 'w') as junk_file:
+ with open(junk_file_path, 'w') as junk_file:
junk_file.write('barf')
os.unlink(plugin_link_dir)
@@ -440,14 +440,14 @@ to:
# link dir exists, but is a non-symlink
os.unlink(plugin_link_dir)
- with file(plugin_link_dir, 'w') as clobber_file:
+ with open(plugin_link_dir, 'w') as clobber_file:
clobber_file.write('clobbered!')
result = run_assetlink().collection[0]
assert result == 'Could not link "staticstuff": %s exists and is not a symlink\n' % (
plugin_link_dir)
- with file(plugin_link_dir, 'r') as clobber_file:
+ with open(plugin_link_dir, 'r') as clobber_file:
assert clobber_file.read() == 'clobbered!'
@@ -456,11 +456,10 @@ def test_plugin_staticdirect(static_plugin_app):
Test that the staticdirect utilities pull up the right things
"""
result = json.loads(
- static_plugin_app.get('/staticstuff/').body)
+ static_plugin_app.get('/staticstuff/').body.decode())
assert len(result) == 2
assert result['mgoblin_bunny_pic'] == '/test_static/images/bunny_pic.png'
assert result['plugin_bunny_css'] == \
'/plugin_static/staticstuff/css/bunnify.css'
-
diff --git a/mediagoblin/tests/test_privileges.py b/mediagoblin/tests/test_privileges.py
index 05829b34..2e0b7347 100644
--- a/mediagoblin/tests/test_privileges.py
+++ b/mediagoblin/tests/test_privileges.py
@@ -14,13 +14,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
import pytest
from datetime import date, timedelta
from webtest import AppError
from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
-from mediagoblin.db.models import User, UserBan
+from mediagoblin.db.models import User, LocalUser, UserBan
from mediagoblin.tools import template
from .resources import GOOD_JPG
@@ -63,9 +64,9 @@ class TestPrivilegeFunctionality:
return response, context_data
def query_for_users(self):
- self.admin_user = User.query.filter(User.username==u'alex').first()
- self.mod_user = User.query.filter(User.username==u'meow').first()
- self.user = User.query.filter(User.username==u'natalie').first()
+ self.admin_user = LocalUser.query.filter(LocalUser.username==u'alex').first()
+ self.mod_user = LocalUser.query.filter(LocalUser.username==u'meow').first()
+ self.user = LocalUser.query.filter(LocalUser.username==u'natalie').first()
def testUserBanned(self):
self.login(u'natalie')
@@ -79,7 +80,7 @@ class TestPrivilegeFunctionality:
response = self.test_app.get('/')
assert response.status == "200 OK"
- assert "You are Banned" in response.body
+ assert b"You are Banned" in response.body
# Then test what happens when that ban has an expiration date which
# hasn't happened yet
#----------------------------------------------------------------------
@@ -92,7 +93,7 @@ class TestPrivilegeFunctionality:
response = self.test_app.get('/')
assert response.status == "200 OK"
- assert "You are Banned" in response.body
+ assert b"You are Banned" in response.body
# Then test what happens when that ban has an expiration date which
# has already happened
@@ -107,7 +108,7 @@ class TestPrivilegeFunctionality:
response = self.test_app.get('/')
assert response.status == "302 FOUND"
- assert not "You are Banned" in response.body
+ assert not b"You are Banned" in response.body
def testVariousPrivileges(self):
# The various actions that require privileges (ex. reporting,
@@ -127,14 +128,16 @@ class TestPrivilegeFunctionality:
#----------------------------------------------------------------------
with pytest.raises(AppError) as excinfo:
response = self.test_app.get('/submit/')
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.do_post({'upload_files':[('file',GOOD_JPG)],
'title':u'Normal Upload 1'},
url='/submit/')
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
# Test that a user cannot comment without the commenter privilege
#----------------------------------------------------------------------
@@ -149,50 +152,58 @@ class TestPrivilegeFunctionality:
media_uri_slug = '/u/{0}/m/{1}/'.format(self.admin_user.username,
media_entry.slug)
response = self.test_app.get(media_uri_slug)
- assert not "Add a comment" in response.body
+ assert not b"Add a comment" in response.body
self.query_for_users()
with pytest.raises(AppError) as excinfo:
response = self.test_app.post(
media_uri_id + 'comment/add/',
{'comment_content': u'Test comment #42'})
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
# Test that a user cannot report without the reporter privilege
#----------------------------------------------------------------------
with pytest.raises(AppError) as excinfo:
response = self.test_app.get(media_uri_slug+"report/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.do_post(
{'report_reason':u'Testing Reports #1',
'reporter_id':u'3'},
url=(media_uri_slug+"report/"))
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
# Test that a user cannot access the moderation pages w/o moderator
# or admin privileges
#----------------------------------------------------------------------
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/users/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/reports/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/media/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/users/1/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
with pytest.raises(AppError) as excinfo:
response = self.test_app.get("/mod/reports/1/")
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
self.query_for_users()
@@ -202,4 +213,5 @@ class TestPrivilegeFunctionality:
'targeted_user':self.admin_user.id},
url='/mod/reports/1/')
self.query_for_users()
- assert 'Bad response: 403 FORBIDDEN' in str(excinfo)
+ excinfo = str(excinfo) if six.PY2 else str(excinfo).encode('ascii')
+ assert b'Bad response: 403 FORBIDDEN' in excinfo
diff --git a/mediagoblin/tests/test_reporting.py b/mediagoblin/tests/test_reporting.py
index a154a061..803fc849 100644
--- a/mediagoblin/tests/test_reporting.py
+++ b/mediagoblin/tests/test_reporting.py
@@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
+import six
from mediagoblin.tools import template
from mediagoblin.tests.tools import (fixture_add_user, fixture_media_entry,
fixture_add_comment, fixture_add_comment_report)
-from mediagoblin.db.models import (MediaReport, CommentReport, User,
- MediaComment)
+from mediagoblin.db.models import Report, User, LocalUser, TextComment
class TestReportFiling:
@@ -55,8 +55,8 @@ class TestReportFiling:
return response, context_data
def query_for_users(self):
- return (User.query.filter(User.username==u'allie').first(),
- User.query.filter(User.username==u'natalie').first())
+ return (LocalUser.query.filter(LocalUser.username==u'allie').first(),
+ LocalUser.query.filter(LocalUser.username==u'natalie').first())
def testMediaReports(self):
self.login(u'allie')
@@ -75,11 +75,11 @@ class TestReportFiling:
response, context = self.do_post(
{'report_reason':u'Testing Media Report',
- 'reporter_id':unicode(allie_id)},url= media_uri_slug + "report/")
+ 'reporter_id':six.text_type(allie_id)},url= media_uri_slug + "report/")
assert response.status == "302 FOUND"
- media_report = MediaReport.query.first()
+ media_report = Report.query.first()
allie_user, natalie_user = self.query_for_users()
assert media_report is not None
@@ -87,7 +87,6 @@ class TestReportFiling:
assert media_report.reporter_id == allie_id
assert media_report.reported_user_id == natalie_user.id
assert media_report.created is not None
- assert media_report.discriminator == 'media_report'
def testCommentReports(self):
self.login(u'allie')
@@ -97,9 +96,11 @@ class TestReportFiling:
media_entry = fixture_media_entry(uploader=natalie_user.id,
state=u'processed')
mid = media_entry.id
- fixture_add_comment(media_entry=mid,
- author=natalie_user.id)
- comment = MediaComment.query.first()
+ fixture_add_comment(
+ media_entry=media_entry,
+ author=natalie_user.id
+ )
+ comment = TextComment.query.first()
comment_uri_slug = '/u/{0}/m/{1}/c/{2}/'.format(natalie_user.username,
media_entry.slug,
@@ -110,11 +111,11 @@ class TestReportFiling:
response, context = self.do_post({
'report_reason':u'Testing Comment Report',
- 'reporter_id':unicode(allie_id)},url= comment_uri_slug + "report/")
+ 'reporter_id':six.text_type(allie_id)},url= comment_uri_slug + "report/")
assert response.status == "302 FOUND"
- comment_report = CommentReport.query.first()
+ comment_report = Report.query.first()
allie_user, natalie_user = self.query_for_users()
assert comment_report is not None
@@ -122,7 +123,6 @@ class TestReportFiling:
assert comment_report.reporter_id == allie_id
assert comment_report.reported_user_id == natalie_user.id
assert comment_report.created is not None
- assert comment_report.discriminator == 'comment_report'
def testArchivingReports(self):
self.login(u'natalie')
@@ -131,14 +131,14 @@ class TestReportFiling:
fixture_add_comment(author=allie_user.id,
comment=u'Comment will be removed')
- test_comment = MediaComment.query.filter(
- MediaComment.author==allie_user.id).first()
+ test_comment = TextComment.query.filter(
+ TextComment.actor==allie_user.id).first()
fixture_add_comment_report(comment=test_comment,
reported_user=allie_user,
report_content=u'Testing Archived Reports #1',
reporter=natalie_user)
- comment_report = CommentReport.query.filter(
- CommentReport.reported_user==allie_user).first()
+ comment_report = Report.query.filter(
+ Report.reported_user==allie_user).first()
assert comment_report.report_content == u'Testing Archived Reports #1'
response, context = self.do_post(
@@ -150,10 +150,10 @@ class TestReportFiling:
assert response.status == "302 FOUND"
allie_user, natalie_user = self.query_for_users()
- archived_report = CommentReport.query.filter(
- CommentReport.reported_user==allie_user).first()
+ archived_report = Report.query.filter(
+ Report.reported_user==allie_user).first()
- assert CommentReport.query.count() != 0
+ assert Report.query.count() != 0
assert archived_report is not None
assert archived_report.report_content == u'Testing Archived Reports #1'
assert archived_report.reporter_id == natalie_id
@@ -163,5 +163,3 @@ class TestReportFiling:
assert archived_report.result == u'''This is a test of archiving reports.
natalie banned user allie indefinitely.
natalie deleted the comment.'''
- assert archived_report.discriminator == 'comment_report'
-
diff --git a/mediagoblin/tests/test_response.py b/mediagoblin/tests/test_response.py
new file mode 100644
index 00000000..7f929155
--- /dev/null
+++ b/mediagoblin/tests/test_response.py
@@ -0,0 +1,65 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from __future__ import absolute_import, unicode_literals
+
+from werkzeug.wrappers import Request
+
+from ..tools.response import redirect, redirect_obj
+
+class TestRedirect(object):
+ def test_redirect_respects_location(self):
+ """Test that redirect returns a 302 to location specified."""
+ request = Request({})
+ response = redirect(request, location='/test')
+ assert response.status_code == 302
+ assert response.location == '/test'
+
+ def test_redirect_respects_querystring(self):
+ """Test that redirect includes querystring in returned location."""
+ request = Request({})
+ response = redirect(request, location='', querystring='#baz')
+ assert response.location == '#baz'
+
+ def test_redirect_respects_urlgen_args(self):
+ """Test that redirect returns a 302 to location from urlgen args."""
+
+ # Using a mock urlgen here so we're only testing redirect itself. We
+ # could instantiate a url_map and map_adaptor with WSGI environ as per
+ # app.py, but that would really just be testing Werkzeug.
+ def urlgen(endpoint, **kwargs):
+ return '/test?foo=bar'
+
+ request = Request({})
+ request.urlgen = urlgen
+ response = redirect(request, 'test-endpoint', foo='bar')
+ assert response.status_code == 302
+ assert response.location == '/test?foo=bar'
+
+ def test_redirect_obj_calls_url_for_self(self):
+ """Test that redirect_obj returns a 302 to obj's url_for_self()."""
+
+ # Using a mock obj here so that we're only testing redirect_obj itself,
+ # rather than also testing the url_for_self implementation.
+ class Foo(object):
+ def url_for_self(*args, **kwargs):
+ return '/foo'
+
+ request = Request({})
+ request.urlgen = None
+ response = redirect_obj(request, Foo())
+ assert response.status_code == 302
+ assert response.location == '/foo'
diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py
index 3d67fdf6..97d7da09 100644
--- a/mediagoblin/tests/test_sql_migrations.py
+++ b/mediagoblin/tests/test_sql_migrations.py
@@ -14,6 +14,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
+import pytest
+
+pytest.importorskip("migrate")
+
import copy
from sqlalchemy import (
@@ -23,7 +28,11 @@ from sqlalchemy import (
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import select, insert
-from migrate import changeset
+try:
+ from migrate import changeset
+except ImportError:
+ # We'll be skipping in this case anyway
+ pass
from mediagoblin.db.base import GMGTableBase
from mediagoblin.db.migration_tools import MigrationManager, RegisterMigration
@@ -58,10 +67,6 @@ class Level1(Base1):
SET1_MODELS = [Creature1, Level1]
-FOUNDATIONS = {Creature1:[{'name':u'goblin','num_legs':2,'is_demon':False},
- {'name':u'cerberus','num_legs':4,'is_demon':True}]
- }
-
SET1_MIGRATIONS = {}
#######################################################
@@ -190,7 +195,7 @@ def level_exits_new_table(db_conn):
for level in result:
- for exit_name, to_level in level['exits'].iteritems():
+ for exit_name, to_level in six.iteritems(level['exits']):
# Insert the level exit
db_conn.execute(
level_exits.insert().values(
@@ -575,7 +580,7 @@ def test_set1_to_set3():
printer = CollectingPrinter()
migration_manager = MigrationManager(
- u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS, Session(),
+ u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
printer)
# Check latest migration and database current migration
@@ -588,8 +593,7 @@ def test_set1_to_set3():
assert result == u'inited'
# Check output
assert printer.combined_string == (
- "-> Initializing main mediagoblin tables... done.\n" + \
- " + Laying foundations for Creature1 table\n" )
+ "-> Initializing main mediagoblin tables... done.\n")
# Check version in database
assert migration_manager.latest_migration == 0
assert migration_manager.database_current_migration == 0
@@ -602,7 +606,7 @@ def test_set1_to_set3():
# Try to "re-migrate" with same manager settings... nothing should happen
migration_manager = MigrationManager(
- u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS,
+ u'__main__', SET1_MODELS, SET1_MIGRATIONS,
Session(), printer)
assert migration_manager.init_or_migrate() == None
@@ -644,18 +648,6 @@ def test_set1_to_set3():
# Now check to see if stuff seems to be in there.
session = Session()
- # Check the creation of the foundation rows on the creature table
- creature = session.query(Creature1).filter_by(
- name=u'goblin').one()
- assert creature.num_legs == 2
- assert creature.is_demon == False
-
- creature = session.query(Creature1).filter_by(
- name=u'cerberus').one()
- assert creature.num_legs == 4
- assert creature.is_demon == True
-
-
# Check the creation of the inserted rows on the creature and levels tables
creature = session.query(Creature1).filter_by(
@@ -698,7 +690,7 @@ def test_set1_to_set3():
# isn't said to be updated yet
printer = CollectingPrinter()
migration_manager = MigrationManager(
- u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(),
+ u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
printer)
assert migration_manager.latest_migration == 8
@@ -725,7 +717,7 @@ def test_set1_to_set3():
# Make sure version matches expected
migration_manager = MigrationManager(
- u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(),
+ u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
printer)
assert migration_manager.latest_migration == 8
assert migration_manager.database_current_migration == 8
@@ -793,12 +785,6 @@ def test_set1_to_set3():
session = Session()
- # Start with making sure that the foundations did not run again
- assert session.query(Creature3).filter_by(
- name=u'goblin').count() == 1
- assert session.query(Creature3).filter_by(
- name=u'cerberus').count() == 1
-
# Then make sure the models have been migrated correctly
creature = session.query(Creature3).filter_by(
name=u'centipede').one()
diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py
index f6f1d18f..a4c3e4eb 100644
--- a/mediagoblin/tests/test_storage.py
+++ b/mediagoblin/tests/test_storage.py
@@ -19,6 +19,8 @@ import os
import tempfile
import pytest
+import six
+
from werkzeug.utils import secure_filename
from mediagoblin import storage
@@ -45,7 +47,7 @@ def test_clean_listy_filepath():
storage.clean_listy_filepath(['../../', 'linooks.jpg'])
-class FakeStorageSystem():
+class FakeStorageSystem(object):
def __init__(self, foobie, blech, **kwargs):
self.foobie = foobie
self.blech = blech
@@ -78,8 +80,8 @@ def test_storage_system_from_config():
'mediagoblin.tests.test_storage:FakeStorageSystem'})
assert this_storage.foobie == 'eiboof'
assert this_storage.blech == 'hcelb'
- assert unicode(this_storage.__class__) == \
- u'mediagoblin.tests.test_storage.FakeStorageSystem'
+ assert six.text_type(this_storage.__class__) == \
+ u"<class 'mediagoblin.tests.test_storage.FakeStorageSystem'>"
##########################
@@ -168,11 +170,11 @@ def test_basic_storage_get_file():
filepath = ['dir1', 'dir2', 'ourfile.txt']
with this_storage.get_file(filepath, 'w') as our_file:
- our_file.write('First file')
+ our_file.write(b'First file')
with this_storage.get_file(filepath, 'r') as our_file:
- assert our_file.read() == 'First file'
+ assert our_file.read() == b'First file'
assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
- with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
+ with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
assert our_file.read() == 'First file'
# Write to the same path but try to get a unique file.
@@ -180,21 +182,21 @@ def test_basic_storage_get_file():
assert not os.path.exists(os.path.join(tmpdir, *new_filepath))
with this_storage.get_file(new_filepath, 'w') as our_file:
- our_file.write('Second file')
+ our_file.write(b'Second file')
with this_storage.get_file(new_filepath, 'r') as our_file:
- assert our_file.read() == 'Second file'
+ assert our_file.read() == b'Second file'
assert os.path.exists(os.path.join(tmpdir, *new_filepath))
- with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
+ with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
assert our_file.read() == 'Second file'
# Read from an existing file
manually_written_file = os.makedirs(
os.path.join(tmpdir, 'testydir'))
- with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
+ with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
testyfile.write('testy file! so testy.')
with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
- assert testyfile.read() == 'testy file! so testy.'
+ assert testyfile.read() == b'testy file! so testy.'
this_storage.delete_file(filepath)
this_storage.delete_file(new_filepath)
@@ -210,7 +212,7 @@ def test_basic_storage_delete_file():
filepath = ['dir1', 'dir2', 'ourfile.txt']
with this_storage.get_file(filepath, 'w') as our_file:
- our_file.write('Testing this file')
+ our_file.write(b'Testing this file')
assert os.path.exists(
os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
@@ -279,14 +281,14 @@ def test_basic_storage_copy_locally():
filepath = ['dir1', 'dir2', 'ourfile.txt']
with this_storage.get_file(filepath, 'w') as our_file:
- our_file.write('Testing this file')
+ our_file.write(b'Testing this file')
new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
this_storage.copy_locally(filepath, new_file_dest)
this_storage.delete_file(filepath)
- assert file(new_file_dest).read() == 'Testing this file'
+ assert open(new_file_dest).read() == 'Testing this file'
os.remove(new_file_dest)
os.rmdir(dest_tmpdir)
@@ -295,7 +297,7 @@ def test_basic_storage_copy_locally():
def _test_copy_local_to_storage_works(tmpdir, this_storage):
local_filename = tempfile.mktemp()
- with file(local_filename, 'w') as tmpfile:
+ with open(local_filename, 'w') as tmpfile:
tmpfile.write('haha')
this_storage.copy_local_to_storage(
@@ -303,7 +305,7 @@ def _test_copy_local_to_storage_works(tmpdir, this_storage):
os.remove(local_filename)
- assert file(
+ assert open(
os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
'r').read() == 'haha'
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
index b5b13ed3..f51b132c 100644
--- a/mediagoblin/tests/test_submission.py
+++ b/mediagoblin/tests/test_submission.py
@@ -14,17 +14,46 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import sys
-reload(sys)
-sys.setdefaultencoding('utf-8')
+## Optional audio/video stuff
+
+SKIP_AUDIO = False
+SKIP_VIDEO = False
+
+try:
+ import gi.repository.Gst
+ # this gst initialization stuff is really required here
+ import gi
+ gi.require_version('Gst', '1.0')
+ from gi.repository import Gst
+ Gst.init(None)
+ from .media_tools import create_av
+except ImportError:
+ SKIP_AUDIO = True
+ SKIP_VIDEO = True
+
+try:
+ import scikits.audiolab
+except ImportError:
+ SKIP_AUDIO = True
+
+import six
+
+if six.PY2: # this hack only work in Python 2
+ import sys
+ reload(sys)
+ sys.setdefaultencoding('utf-8')
-import urlparse
import os
import pytest
+import webtest.forms
+import pkg_resources
-from mediagoblin.tests.tools import fixture_add_user
+import six.moves.urllib.parse as urlparse
+
+from mediagoblin.tests.tools import (
+ fixture_add_user, fixture_add_collection, get_app)
from mediagoblin import mg_globals
-from mediagoblin.db.models import MediaEntry, User
+from mediagoblin.db.models import MediaEntry, User, LocalUser, Activity
from mediagoblin.db.base import Session
from mediagoblin.tools import template
from mediagoblin.media_types.image import ImageMediaManager
@@ -34,13 +63,46 @@ from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG
GOOD_TAG_STRING = u'yin,yang'
-BAD_TAG_STRING = unicode('rage,' + 'f' * 26 + 'u' * 26)
+BAD_TAG_STRING = six.text_type('rage,' + 'f' * 26 + 'u' * 26)
FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form']
REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request']
-class TestSubmission:
+@pytest.fixture()
+def audio_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests',
+ 'test_mgoblin_app_audio.ini'))
+
+@pytest.fixture()
+def video_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests',
+ 'test_mgoblin_app_video.ini'))
+
+@pytest.fixture()
+def audio_video_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests',
+ 'test_mgoblin_app_audio_video.ini'))
+
+@pytest.fixture()
+def pdf_plugin_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ 'mediagoblin.tests',
+ 'test_mgoblin_app_pdf.ini'))
+
+
+class BaseTestSubmission:
@pytest.fixture(autouse=True)
def setup(self, test_app):
self.test_app = test_app
@@ -61,7 +123,7 @@ class TestSubmission:
#### totally stupid.
#### Also if we found a way to make this run it should be a
#### property.
- return User.query.filter(User.username==u'chris').first()
+ return LocalUser.query.filter(LocalUser.username==u'chris').first()
def login(self):
self.test_app.post(
@@ -88,19 +150,14 @@ class TestSubmission:
return {'upload_files': [('file', filename)]}
def check_comments(self, request, media_id, count):
- comments = request.db.MediaComment.query.filter_by(media_entry=media_id)
- assert count == len(list(comments))
-
- def test_missing_fields(self):
- # Test blank form
- # ---------------
- response, form = self.do_post({}, *FORM_CONTEXT)
- assert form.file.errors == [u'You must provide a file.']
-
- # Test blank file
- # ---------------
- response, form = self.do_post({'title': u'test title'}, *FORM_CONTEXT)
- assert form.file.errors == [u'You must provide a file.']
+ gmr = request.db.GenericModelReference.query.filter_by(
+ obj_pk=media_id,
+ model_type=request.db.MediaEntry.__tablename__
+ ).first()
+ if gmr is None and count <= 0:
+ return # Yerp it's fine.
+ comments = request.db.Comment.query.filter_by(target_id=gmr.id)
+ assert count == comments.count()
def check_url(self, response, path):
assert urlparse.urlsplit(response.location)[2] == path
@@ -129,6 +186,19 @@ class TestSubmission:
our_user.save()
Session.expunge(our_user)
+
+class TestSubmissionBasics(BaseTestSubmission):
+ def test_missing_fields(self):
+ # Test blank form
+ # ---------------
+ response, form = self.do_post({}, *FORM_CONTEXT)
+ assert form.file.errors == [u'You must provide a file.']
+
+ # Test blank file
+ # ---------------
+ response, form = self.do_post({'title': u'test title'}, *FORM_CONTEXT)
+ assert form.file.errors == [u'You must provide a file.']
+
def test_normal_jpg(self):
# User uploaded should be 0
assert self.our_user().uploaded == 0
@@ -142,17 +212,19 @@ class TestSubmission:
# Reload user
assert self.our_user().uploaded == file_size
+ def test_public_id_populated(self):
+ # Upload the image first.
+ response, request = self.do_post({'title': u'Balanced Goblin'},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': u'Balanced Goblin'}, 1)
+
+ # Now check that the public_id attribute is set.
+ assert media.public_id != None
+
def test_normal_png(self):
self.check_normal_upload(u'Normal upload 2', GOOD_PNG)
- @pytest.mark.skipif("not pdf_check_prerequisites()")
- def test_normal_pdf(self):
- response, context = self.do_post({'title': u'Normal upload 3 (pdf)'},
- do_follow=True,
- **self.upload_data(GOOD_PDF))
- self.check_url(response, '/u/{0}/'.format(self.our_user().username))
- assert 'mediagoblin/user_pages/user.html' in context
-
def test_default_upload_limits(self):
self.user_upload_limits(uploaded=500)
@@ -359,7 +431,7 @@ class TestSubmission:
def test_media_data(self):
self.check_normal_upload(u"With GPS data", GPS_JPG)
media = self.check_media(None, {"title": u"With GPS data"}, 1)
- assert media.media_data.gps_latitude == 59.336666666666666
+ assert media.get_location.position["latitude"] == 59.336666666666666
def test_processing(self):
public_store_dir = mg_globals.global_config[
@@ -382,3 +454,140 @@ class TestSubmission:
size = os.stat(filename).st_size
assert last_size > size
last_size = size
+
+ def test_collection_selection(self):
+ """Test the ability to choose a collection when submitting media
+ """
+ # Collection option should have been removed if the user has no
+ # collections.
+ response = self.test_app.get('/submit/')
+ assert 'collection' not in response.form.fields
+
+ # Test upload of an image when a user has no collections.
+ upload = webtest.forms.Upload(os.path.join(
+ 'mediagoblin', 'static', 'images', 'media_thumbs', 'image.png'))
+ response.form['file'] = upload
+ no_collection_title = 'no collection'
+ response.form['title'] = no_collection_title
+ response.form.submit()
+ assert MediaEntry.query.filter_by(
+ actor=self.our_user().id
+ ).first().title == no_collection_title
+
+ # Collection option should be present if the user has collections. It
+ # shouldn't allow other users' collections to be selected.
+ col = fixture_add_collection(user=self.our_user())
+ user = fixture_add_user(username=u'different')
+ fixture_add_collection(user=user, name=u'different')
+ response = self.test_app.get('/submit/')
+ form = response.form
+ assert 'collection' in form.fields
+ # Option length is 2, because of the default "--Select--" option
+ assert len(form['collection'].options) == 2
+ assert form['collection'].options[1][2] == col.title
+
+ # Test that if we specify a collection then the media entry is added to
+ # the specified collection.
+ form['file'] = upload
+ title = 'new picture'
+ form['title'] = title
+ form['collection'] = form['collection'].options[1][0]
+ form.submit()
+ # The title of the first item in our user's first collection should
+ # match the title of the picture that was just added.
+ col = self.our_user().collections[0]
+ assert col.collection_items[0].get_object().title == title
+
+ # Test that an activity was created when the item was added to the
+ # collection. That should be the last activity.
+ assert Activity.query.order_by(
+ Activity.id.desc()
+ ).first().content == '{0} added new picture to {1}'.format(
+ self.our_user().username, col.title)
+
+ # Test upload succeeds if the user has collection and no collection is
+ # chosen.
+ form['file'] = webtest.forms.Upload(os.path.join(
+ 'mediagoblin', 'static', 'images', 'media_thumbs', 'image.png'))
+ title = 'no collection 2'
+ form['title'] = title
+ form['collection'] = form['collection'].options[0][0]
+ form.submit()
+ # The title of the first item in our user's first collection should
+ # match the title of the picture that was just added.
+ assert MediaEntry.query.filter_by(
+ actor=self.our_user().id
+ ).count() == 3
+
+class TestSubmissionVideo(BaseTestSubmission):
+ @pytest.fixture(autouse=True)
+ def setup(self, video_plugin_app):
+ self.test_app = video_plugin_app
+
+ # TODO: Possibly abstract into a decorator like:
+ # @as_authenticated_user('chris')
+ fixture_add_user(privileges=[u'active',u'uploader', u'commenter'])
+
+ self.login()
+
+ @pytest.mark.skipif(SKIP_VIDEO,
+ reason="Dependencies for video not met")
+ def test_video(self, video_plugin_app):
+ with create_av(make_video=True) as path:
+ self.check_normal_upload('Video', path)
+
+
+class TestSubmissionAudio(BaseTestSubmission):
+ @pytest.fixture(autouse=True)
+ def setup(self, audio_plugin_app):
+ self.test_app = audio_plugin_app
+
+ # TODO: Possibly abstract into a decorator like:
+ # @as_authenticated_user('chris')
+ fixture_add_user(privileges=[u'active',u'uploader', u'commenter'])
+
+ self.login()
+
+ @pytest.mark.skipif(SKIP_AUDIO,
+ reason="Dependencies for audio not met")
+ def test_audio(self, audio_plugin_app):
+ with create_av(make_audio=True) as path:
+ self.check_normal_upload('Audio', path)
+
+
+class TestSubmissionAudioVideo(BaseTestSubmission):
+ @pytest.fixture(autouse=True)
+ def setup(self, audio_video_plugin_app):
+ self.test_app = audio_video_plugin_app
+
+ # TODO: Possibly abstract into a decorator like:
+ # @as_authenticated_user('chris')
+ fixture_add_user(privileges=[u'active',u'uploader', u'commenter'])
+
+ self.login()
+
+ @pytest.mark.skipif(SKIP_AUDIO or SKIP_VIDEO,
+ reason="Dependencies for audio or video not met")
+ def test_audio_and_video(self):
+ with create_av(make_audio=True, make_video=True) as path:
+ self.check_normal_upload('Audio and Video', path)
+
+
+class TestSubmissionPDF(BaseTestSubmission):
+ @pytest.fixture(autouse=True)
+ def setup(self, pdf_plugin_app):
+ self.test_app = pdf_plugin_app
+
+ # TODO: Possibly abstract into a decorator like:
+ # @as_authenticated_user('chris')
+ fixture_add_user(privileges=[u'active',u'uploader', u'commenter'])
+
+ self.login()
+
+ @pytest.mark.skipif("not os.path.exists(GOOD_PDF) or not pdf_check_prerequisites()")
+ def test_normal_pdf(self):
+ response, context = self.do_post({'title': u'Normal upload 3 (pdf)'},
+ do_follow=True,
+ **self.upload_data(GOOD_PDF))
+ self.check_url(response, '/u/{0}/'.format(self.our_user().username))
+ assert 'mediagoblin/user_pages/user.html' in context
diff --git a/mediagoblin/tests/test_submission/good.pdf b/mediagoblin/tests/test_submission/good.pdf
index ab5db006..d7029f36 100644..120000
--- a/mediagoblin/tests/test_submission/good.pdf
+++ b/mediagoblin/tests/test_submission/good.pdf
Binary files differ
diff --git a/mediagoblin/tests/test_tags.py b/mediagoblin/tests/test_tags.py
index e25cc283..8358b052 100644
--- a/mediagoblin/tests/test_tags.py
+++ b/mediagoblin/tests/test_tags.py
@@ -33,6 +33,10 @@ def test_list_of_dicts_conversion(test_app):
assert text.convert_to_tag_list_of_dicts('echo,echo') == [{'name': u'echo',
'slug': u'echo'}]
+ # When checking for duplicates, use the slug, not the tag
+ assert text.convert_to_tag_list_of_dicts('echo,#echo') == [{'name': u'#echo',
+ 'slug': u'echo'}]
+
# Make sure converting the list of dicts to a string works
assert text.media_tags_as_string([{'name': u'yin', 'slug': u'yin'},
{'name': u'yang', 'slug': u'yang'}]) == \
diff --git a/mediagoblin/tests/test_timesince.py b/mediagoblin/tests/test_timesince.py
index 6579eb09..99ae31da 100644
--- a/mediagoblin/tests/test_timesince.py
+++ b/mediagoblin/tests/test_timesince.py
@@ -16,7 +16,7 @@
from datetime import datetime, timedelta
-from mediagoblin.tools.timesince import is_aware, timesince
+from mediagoblin.tools.timesince import timesince
def test_timesince():
diff --git a/mediagoblin/tests/test_tools.py b/mediagoblin/tests/test_tools.py
new file mode 100644
index 00000000..5f916400
--- /dev/null
+++ b/mediagoblin/tests/test_tools.py
@@ -0,0 +1,118 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from __future__ import absolute_import, unicode_literals
+
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
+
+from werkzeug.wrappers import Request
+from werkzeug.test import EnvironBuilder
+
+from mediagoblin.tools.request import decode_request
+from mediagoblin.tools.pagination import Pagination
+
+class TestDecodeRequest(object):
+ """Test the decode_request function."""
+
+ def test_form_type(self):
+ """Try a normal form-urlencoded request."""
+ builder = EnvironBuilder(method='POST', data={'foo': 'bar'})
+ request = Request(builder.get_environ())
+ data = decode_request(request)
+ assert data['foo'] == 'bar'
+
+ def test_json_type(self):
+ """Try a normal JSON request."""
+ builder = EnvironBuilder(
+ method='POST', content_type='application/json',
+ data='{"foo": "bar"}')
+ request = Request(builder.get_environ())
+ data = decode_request(request)
+ assert data['foo'] == 'bar'
+
+ def test_content_type_with_options(self):
+ """Content-Type can also have options."""
+ builder = EnvironBuilder(
+ method='POST',
+ content_type='application/x-www-form-urlencoded; charset=utf-8')
+ request = Request(builder.get_environ())
+ # Must populate form field manually with non-default content-type.
+ request.form = {'foo': 'bar'}
+ data = decode_request(request)
+ assert data['foo'] == 'bar'
+
+ def test_form_type_is_default(self):
+ """Assume form-urlencoded if blank in the request."""
+ builder = EnvironBuilder(method='POST', content_type='')
+ request = Request(builder.get_environ())
+ # Must populate form field manually with non-default content-type.
+ request.form = {'foo': 'bar'}
+ data = decode_request(request)
+ assert data['foo'] == 'bar'
+
+
+class TestPagination(object):
+ def _create_paginator(self, num_items, page, per_page):
+ """Create a Paginator with a mock database cursor."""
+ mock_cursor = mock.MagicMock()
+ mock_cursor.count.return_value = num_items
+ return Pagination(page, mock_cursor, per_page)
+
+ def test_creates_valid_page_url_from_explicit_base_url(self):
+ """Check that test_page_url_explicit runs.
+
+ This is a regression test for a Python 2/3 compatibility fix.
+
+ """
+ paginator = self._create_paginator(num_items=1, page=1, per_page=30)
+ url = paginator.get_page_url_explicit('http://example.com', [], 1)
+ assert url == 'http://example.com?page=1'
+
+ def test_iter_pages_handles_single_page(self):
+ """Check that iter_pages produces the expected result for single page.
+
+ This is a regression test for a Python 2/3 compatibility fix.
+
+ """
+ paginator = self._create_paginator(num_items=1, page=1, per_page=30)
+ assert list(paginator.iter_pages()) == [1]
+
+ def test_zero_items(self):
+ """Check that no items produces no pages."""
+ paginator = self._create_paginator(num_items=0, page=1, per_page=30)
+ assert paginator.total_count == 0
+ assert paginator.pages == 0
+
+ def test_single_item(self):
+ """Check that one item produces one page."""
+ paginator = self._create_paginator(num_items=1, page=1, per_page=30)
+ assert paginator.total_count == 1
+ assert paginator.pages == 1
+
+ def test_full_page(self):
+ """Check that a full page of items produces one page."""
+ paginator = self._create_paginator(num_items=30, page=1, per_page=30)
+ assert paginator.total_count == 30
+ assert paginator.pages == 1
+
+ def test_multiple_pages(self):
+ """Check that more than a full page produces two pages."""
+ paginator = self._create_paginator(num_items=31, page=1, per_page=30)
+ assert paginator.total_count == 31
+ assert paginator.pages == 2
diff --git a/mediagoblin/tests/test_util.py b/mediagoblin/tests/test_util.py
index 9d9b1c16..02976405 100644
--- a/mediagoblin/tests/test_util.py
+++ b/mediagoblin/tests/test_util.py
@@ -14,8 +14,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
import email
+import socket
+import pytest
+import smtplib
+import pkg_resources
+import six
+
+from mediagoblin.tests.tools import get_app
+from mediagoblin import mg_globals
from mediagoblin.tools import common, url, translate, mail, text, testing
testing._activate_testing()
@@ -52,7 +64,7 @@ I hope you like unit tests JUST AS MUCH AS I DO!""")
assert message['From'] == "sender@mediagoblin.example.org"
assert message['To'] == "amanda@example.org, akila@example.org"
assert message['Subject'] == "Testing is so much fun!"
- assert message.get_payload(decode=True) == """HAYYY GUYS!
+ assert message.get_payload(decode=True) == b"""HAYYY GUYS!
I hope you like unit tests JUST AS MUCH AS I DO!"""
@@ -65,10 +77,32 @@ I hope you like unit tests JUST AS MUCH AS I DO!"""
assert mbox_message['From'] == "sender@mediagoblin.example.org"
assert mbox_message['To'] == "amanda@example.org, akila@example.org"
assert mbox_message['Subject'] == "Testing is so much fun!"
- assert mbox_message.get_payload(decode=True) == """HAYYY GUYS!
+ assert mbox_message.get_payload(decode=True) == b"""HAYYY GUYS!
I hope you like unit tests JUST AS MUCH AS I DO!"""
+@pytest.fixture()
+def starttls_enabled_app(request):
+ return get_app(
+ request,
+ mgoblin_config=pkg_resources.resource_filename(
+ "mediagoblin.tests",
+ "starttls_config.ini"
+ )
+ )
+
+def test_email_force_starttls(starttls_enabled_app):
+ common.TESTS_ENABLED = False
+ SMTP = lambda *args, **kwargs: mail.FakeMhost()
+ with mock.patch('smtplib.SMTP', SMTP):
+ with pytest.raises(smtplib.SMTPException):
+ mail.send_email(
+ from_addr="notices@my.test.instance.com",
+ to_addrs="someone@someplace.com",
+ subject="Testing is so much fun!",
+ message_body="Ohai ^_^"
+ )
+
def test_slugify():
assert url.slugify(u'a walk in the park') == u'a-walk-in-the-park'
assert url.slugify(u'A Walk in the Park') == u'a-walk-in-the-park'
@@ -117,13 +151,13 @@ def test_gettext_lazy_proxy():
orig = u"Password"
set_thread_locale("es")
- p1 = unicode(proxy)
+ p1 = six.text_type(proxy)
p1_should = pass_to_ugettext(orig)
assert p1_should != orig, "Test useless, string not translated"
assert p1 == p1_should
set_thread_locale("sv")
- p2 = unicode(proxy)
+ p2 = six.text_type(proxy)
p2_should = pass_to_ugettext(orig)
assert p2_should != orig, "Test broken, string not translated"
assert p2 == p2_should
@@ -149,3 +183,30 @@ def test_html_cleaner():
'<p><a href="javascript:nasty_surprise">innocent link!</a></p>')
assert result == (
'<p><a href="">innocent link!</a></p>')
+
+
+class TestMail(object):
+ """ Test mediagoblin's mail tool """
+ def test_no_mail_server(self):
+ """ Tests that no smtp server is available """
+ with pytest.raises(mail.NoSMTPServerError), mock.patch("smtplib.SMTP") as smtp_mock:
+ smtp_mock.side_effect = socket.error
+ mg_globals.app_config = {
+ "email_debug_mode": False,
+ "email_smtp_use_ssl": False,
+ "email_smtp_host": "127.0.0.1",
+ "email_smtp_port": 0}
+ common.TESTS_ENABLED = False
+ mail.send_email("", "", "", "")
+
+ def test_no_smtp_host(self):
+ """ Empty email_smtp_host """
+ with pytest.raises(mail.NoSMTPServerError), mock.patch("smtplib.SMTP") as smtp_mock:
+ smtp_mock.return_value.connect.side_effect = socket.error
+ mg_globals.app_config = {
+ "email_debug_mode": False,
+ "email_smtp_use_ssl": False,
+ "email_smtp_host": "",
+ "email_smtp_port": 0}
+ common.TESTS_ENABLED = False
+ mail.send_email("", "", "", "")
diff --git a/mediagoblin/tests/test_video.py b/mediagoblin/tests/test_video.py
new file mode 100644
index 00000000..79244515
--- /dev/null
+++ b/mediagoblin/tests/test_video.py
@@ -0,0 +1,132 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import tempfile
+import os
+from contextlib import contextmanager
+import imghdr
+
+#os.environ['GST_DEBUG'] = '4,python:4'
+import pytest
+pytest.importorskip("gi.repository.Gst")
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+Gst.init(None)
+
+from mediagoblin.media_types.video.transcoders import (capture_thumb,
+ VideoTranscoder)
+from mediagoblin.media_types.tools import discover
+
+@contextmanager
+def create_data(suffix=None, make_audio=False):
+ video = tempfile.NamedTemporaryFile()
+ src = Gst.ElementFactory.make('videotestsrc', None)
+ src.set_property('num-buffers', 10)
+ videorate = Gst.ElementFactory.make('videorate', None)
+ enc = Gst.ElementFactory.make('theoraenc', None)
+ mux = Gst.ElementFactory.make('oggmux', None)
+ dst = Gst.ElementFactory.make('filesink', None)
+ dst.set_property('location', video.name)
+ pipeline = Gst.Pipeline()
+ pipeline.add(src)
+ pipeline.add(videorate)
+ pipeline.add(enc)
+ pipeline.add(mux)
+ pipeline.add(dst)
+ src.link(videorate)
+ videorate.link(enc)
+ enc.link(mux)
+ mux.link(dst)
+ if make_audio:
+ audio_src = Gst.ElementFactory.make('audiotestsrc', None)
+ audio_src.set_property('num-buffers', 10)
+ audiorate = Gst.ElementFactory.make('audiorate', None)
+ audio_enc = Gst.ElementFactory.make('vorbisenc', None)
+ pipeline.add(audio_src)
+ pipeline.add(audio_enc)
+ pipeline.add(audiorate)
+ audio_src.link(audiorate)
+ audiorate.link(audio_enc)
+ audio_enc.link(mux)
+ pipeline.set_state(Gst.State.PLAYING)
+ state = pipeline.get_state(3 * Gst.SECOND)
+ assert state[0] == Gst.StateChangeReturn.SUCCESS
+ bus = pipeline.get_bus()
+ message = bus.timed_pop_filtered(
+ 3 * Gst.SECOND,
+ Gst.MessageType.ERROR | Gst.MessageType.EOS)
+ pipeline.set_state(Gst.State.NULL)
+ if suffix:
+ result = tempfile.NamedTemporaryFile(suffix=suffix)
+ else:
+ result = tempfile.NamedTemporaryFile()
+ yield (video.name, result.name)
+
+
+#TODO: this should be skipped if video plugin is not enabled
+def test_thumbnails():
+ '''
+ Test thumbnails generation.
+ 1. Create a video (+audio) from gst's videotestsrc
+ 2. Capture thumbnail
+ 3. Everything should get removed because of temp files usage
+ '''
+ #data create_data() as (video_name, thumbnail_name):
+ test_formats = [('.png', 'png'), ('.jpg', 'jpeg'), ('.gif', 'gif')]
+ for suffix, format in test_formats:
+ with create_data(suffix) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=40)
+ # check result file format
+ assert imghdr.what(thumbnail_name) == format
+ # TODO: check height and width
+ # FIXME: it doesn't work with small width, say, 10px. This should be
+ # fixed somehow
+ suffix, format = test_formats[0]
+ with create_data(suffix, True) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=40)
+ assert imghdr.what(thumbnail_name) == format
+ with create_data(suffix, True) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=10) # smaller width
+ assert imghdr.what(thumbnail_name) == format
+ with create_data(suffix, True) as (video_name, thumbnail_name):
+ capture_thumb(video_name, thumbnail_name, width=100) # bigger width
+ assert imghdr.what(thumbnail_name) == format
+
+
+def test_transcoder():
+ # test without audio
+ with create_data() as (video_name, result_name):
+ transcoder = VideoTranscoder()
+ transcoder.transcode(
+ video_name, result_name,
+ vp8_quality=8,
+ vp8_threads=0, # autodetect
+ vorbis_quality=0.3,
+ dimensions=(640, 640))
+ assert len(discover(result_name).get_video_streams()) == 1
+ # test with audio
+ with create_data(make_audio=True) as (video_name, result_name):
+ transcoder = VideoTranscoder()
+ transcoder.transcode(
+ video_name, result_name,
+ vp8_quality=8,
+ vp8_threads=0, # autodetect
+ vorbis_quality=0.3,
+ dimensions=(640, 640))
+ assert len(discover(result_name).get_video_streams()) == 1
+ assert len(discover(result_name).get_audio_streams()) == 1
diff --git a/mediagoblin/tests/test_workbench.py b/mediagoblin/tests/test_workbench.py
index 6695618b..3202e698 100644
--- a/mediagoblin/tests/test_workbench.py
+++ b/mediagoblin/tests/test_workbench.py
@@ -50,7 +50,7 @@ class TestWorkbench(object):
# kill a workbench
this_workbench = self.workbench_manager.create()
tmpfile_name = this_workbench.joinpath('temp.txt')
- tmpfile = file(tmpfile_name, 'w')
+ tmpfile = open(tmpfile_name, 'w')
with tmpfile:
tmpfile.write('lollerskates')
@@ -69,7 +69,7 @@ class TestWorkbench(object):
filepath = ['dir1', 'dir2', 'ourfile.txt']
with this_storage.get_file(filepath, 'w') as our_file:
- our_file.write('Our file')
+ our_file.write(b'Our file')
# with a local file storage
filename = this_workbench.localized_file(this_storage, filepath)
@@ -83,7 +83,7 @@ class TestWorkbench(object):
# ... write a brand new file, again ;)
with this_storage.get_file(filepath, 'w') as our_file:
- our_file.write('Our file')
+ our_file.write(b'Our file')
filename = this_workbench.localized_file(this_storage, filepath)
assert filename == os.path.join(
diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py
index 060dfda9..39b9ac50 100644
--- a/mediagoblin/tests/tools.py
+++ b/mediagoblin/tests/tools.py
@@ -19,19 +19,22 @@ import os
import pkg_resources
import shutil
+import six
from paste.deploy import loadapp
from webtest import TestApp
from mediagoblin import mg_globals
-from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
- CommentSubscription, CommentNotification, Privilege, CommentReport
+from mediagoblin.db.models import User, LocalUser, MediaEntry, Collection, TextComment, \
+ CommentSubscription, Notification, Privilege, Report, Client, \
+ RequestToken, AccessToken, Activity, Generator, Comment
from mediagoblin.tools import testing
from mediagoblin.init.config import read_mediagoblin_config
from mediagoblin.db.base import Session
from mediagoblin.meddleware import BaseMeddleware
from mediagoblin.auth import gen_password_hash
from mediagoblin.gmg_commands.dbupdate import run_dbupdate
+from mediagoblin.tools.crypto import random_string
from datetime import datetime
@@ -122,6 +125,8 @@ def get_app(request, paste_config=None, mgoblin_config=None):
app_config = global_config['mediagoblin']
# Run database setup/migrations
+ # @@: The *only* test that doesn't pass if we remove this is in
+ # test_persona.py... why?
run_dbupdate(app_config, global_config)
# setup app and return
@@ -142,7 +147,7 @@ def install_fixtures_simple(db, fixtures):
"""
Very simply install fixtures in the database
"""
- for collection_name, collection_fixtures in fixtures.iteritems():
+ for collection_name, collection_fixtures in six.iteritems(fixtures):
collection = db[collection_name]
for fixture in collection_fixtures:
collection.insert(fixture)
@@ -162,7 +167,7 @@ def assert_db_meets_expected(db, expected):
{'id': 'foo',
'some_field': 'some_value'},]}
"""
- for collection_name, collection_data in expected.iteritems():
+ for collection_name, collection_data in six.iteritems(expected):
collection = db[collection_name]
for expected_document in collection_data:
document = collection.query.filter_by(id=expected_document['id']).first()
@@ -173,9 +178,9 @@ def assert_db_meets_expected(db, expected):
def fixture_add_user(username=u'chris', password=u'toast',
privileges=[], wants_comment_notification=True):
# Reuse existing user or create a new one
- test_user = User.query.filter_by(username=username).first()
+ test_user = LocalUser.query.filter(LocalUser.username==username).first()
if test_user is None:
- test_user = User()
+ test_user = LocalUser()
test_user.username = username
test_user.email = username + u'@example.com'
if password is not None:
@@ -187,8 +192,11 @@ def fixture_add_user(username=u'chris', password=u'toast',
test_user.all_privileges.append(query.one())
test_user.save()
- # Reload
- test_user = User.query.filter_by(username=username).first()
+
+ # Reload - The `with_polymorphic` needs to be there to eagerly load
+ # the attributes on the LocalUser as this can't be done post detachment.
+ user_query = LocalUser.query.with_polymorphic(LocalUser)
+ test_user = user_query.filter(LocalUser.username==username).first()
# ... and detach from session:
Session.expunge(test_user)
@@ -198,12 +206,12 @@ def fixture_add_user(username=u'chris', password=u'toast',
def fixture_comment_subscription(entry, notify=True, send_email=None):
if send_email is None:
- uploader = User.query.filter_by(id=entry.uploader).first()
- send_email = uploader.wants_comment_notification
+ actor = LocalUser.query.filter_by(id=entry.actor).first()
+ send_email = actor.wants_comment_notification
cs = CommentSubscription(
media_entry_id=entry.id,
- user_id=entry.uploader,
+ user_id=entry.actor,
notify=notify,
send_email=send_email)
@@ -216,14 +224,16 @@ def fixture_comment_subscription(entry, notify=True, send_email=None):
return cs
-def fixture_add_comment_notification(entry_id, subject_id, user_id,
+def fixture_add_comment_notification(entry, subject, user,
seen=False):
- cn = CommentNotification(user_id=user_id,
- seen=seen,
- subject_id=subject_id)
+ cn = Notification(
+ user_id=user,
+ seen=seen,
+ )
+ cn.obj = subject
cn.save()
- cn = CommentNotification.query.filter_by(id=cn.id).first()
+ cn = Notification.query.filter_by(id=cn.id).first()
Session.expunge(cn)
@@ -248,7 +258,7 @@ def fixture_media_entry(title=u"Some title", slug=None,
entry = MediaEntry()
entry.title = title
entry.slug = slug
- entry.uploader = uploader
+ entry.actor = uploader
entry.media_type = u'image'
entry.state = state
@@ -272,15 +282,21 @@ def fixture_media_entry(title=u"Some title", slug=None,
return entry
-def fixture_add_collection(name=u"My first Collection", user=None):
+def fixture_add_collection(name=u"My first Collection", user=None,
+ collection_type=Collection.USER_DEFINED_TYPE):
if user is None:
user = fixture_add_user()
- coll = Collection.query.filter_by(creator=user.id, title=name).first()
+ coll = Collection.query.filter_by(
+ actor=user.id,
+ title=name,
+ type=collection_type
+ ).first()
if coll is not None:
return coll
coll = Collection()
- coll.creator = user.id
+ coll.actor = user.id
coll.title = name
+ coll.type = collection_type
coll.generate_slug()
coll.save()
@@ -297,22 +313,27 @@ def fixture_add_comment(author=None, media_entry=None, comment=None):
author = fixture_add_user().id
if media_entry is None:
- media_entry = fixture_media_entry().id
+ media_entry = fixture_media_entry()
if comment is None:
comment = \
'Auto-generated test comment by user #{0} on media #{0}'.format(
author, media_entry)
- comment = MediaComment(author=author,
- media_entry=media_entry,
- content=comment)
+ text_comment = TextComment(
+ actor=author,
+ content=comment
+ )
+ text_comment.save()
- comment.save()
+ comment_link = Comment()
+ comment_link.target = media_entry
+ comment_link.comment = text_comment
+ comment_link.save()
- Session.expunge(comment)
+ Session.expunge(comment_link)
- return comment
+ return text_comment
def fixture_add_comment_report(comment=None, reported_user=None,
reporter=None, created=None, report_content=None):
@@ -332,14 +353,40 @@ def fixture_add_comment_report(comment=None, reported_user=None,
report_content = \
'Auto-generated test report'
- comment_report = CommentReport(comment=comment,
- reported_user = reported_user,
- reporter = reporter,
- created = created,
- report_content=report_content)
-
+ comment_report = Report()
+ comment_report.obj = comment
+ comment_report.reported_user = reported_user
+ comment_report.reporter = reporter
+ comment_report.created = created
+ comment_report.report_content = report_content
+ comment_report.obj = comment
comment_report.save()
Session.expunge(comment_report)
return comment_report
+
+def fixture_add_activity(obj, verb="post", target=None, generator=None, actor=None):
+ if generator is None:
+ generator = Generator(
+ name="GNU MediaGoblin",
+ object_type="service"
+ )
+ generator.save()
+
+ if actor is None:
+ actor = fixture_add_user()
+
+ activity = Activity(
+ verb=verb,
+ actor=actor.id,
+ generator=generator.id,
+ )
+
+ activity.set_object(obj)
+
+ if target is not None:
+ activity.set_target(target)
+
+ activity.save()
+ return activity