aboutsummaryrefslogtreecommitdiffstats
path: root/mediagoblin/tests
diff options
context:
space:
mode:
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r--mediagoblin/tests/fake_carrot_conf_bad.ini2
-rw-r--r--mediagoblin/tests/fake_carrot_conf_good.ini2
-rw-r--r--mediagoblin/tests/fake_celery_conf.ini14
-rw-r--r--mediagoblin/tests/fake_celery_conf_mgdb.ini14
-rw-r--r--mediagoblin/tests/fake_config_spec.ini2
-rw-r--r--mediagoblin/tests/test_celery_setup.py45
-rw-r--r--mediagoblin/tests/test_config.py8
-rw-r--r--mediagoblin/tests/test_mgoblin_app.ini2
-rw-r--r--mediagoblin/tests/test_paste.ini2
-rw-r--r--mediagoblin/tests/test_processing.py20
-rw-r--r--mediagoblin/tests/test_sql_migrations.py884
-rw-r--r--mediagoblin/tests/test_submission.py335
-rw-r--r--mediagoblin/tests/test_submission/bigblue.pngbin0 -> 3142 bytes
13 files changed, 1074 insertions, 256 deletions
diff --git a/mediagoblin/tests/fake_carrot_conf_bad.ini b/mediagoblin/tests/fake_carrot_conf_bad.ini
index 0c79b354..9d8cf518 100644
--- a/mediagoblin/tests/fake_carrot_conf_bad.ini
+++ b/mediagoblin/tests/fake_carrot_conf_bad.ini
@@ -11,4 +11,4 @@ encouragement_phrase = 586956856856 # shouldn't throw error
blah_blah = "blah!" # shouldn't throw error either
[celery]
-eat_celery_with_carrots = pants # yeah that's def an error right there.
+EAT_CELERY_WITH_CARROTS = pants # yeah that's def an error right there.
diff --git a/mediagoblin/tests/fake_carrot_conf_good.ini b/mediagoblin/tests/fake_carrot_conf_good.ini
index fed14d07..1377907b 100644
--- a/mediagoblin/tests/fake_carrot_conf_good.ini
+++ b/mediagoblin/tests/fake_carrot_conf_good.ini
@@ -10,4 +10,4 @@ encouragement_phrase = "I'd love it if you eat your carrots!"
blah_blah = "blah!"
[celery]
-eat_celery_with_carrots = False
+EAT_CELERY_WITH_CARROTS = False
diff --git a/mediagoblin/tests/fake_celery_conf.ini b/mediagoblin/tests/fake_celery_conf.ini
index 3e52ac3a..67b0cba6 100644
--- a/mediagoblin/tests/fake_celery_conf.ini
+++ b/mediagoblin/tests/fake_celery_conf.ini
@@ -1,9 +1,9 @@
-['mediagoblin']
+[mediagoblin]
# I got nothin' in this file!
-['celery']
-some_variable = floop
-mail_port = 2000
-celeryd_eta_scheduler_precision = 1.3
-celery_result_persistent = true
-celery_imports = foo.bar.baz, this.is.an.import
+[celery]
+SOME_VARIABLE = floop
+MAIL_PORT = 2000
+CELERYD_ETA_SCHEDULER_PRECISION = 1.3
+CELERY_RESULT_PERSISTENT = true
+CELERY_IMPORTS = foo.bar.baz, this.is.an.import
diff --git a/mediagoblin/tests/fake_celery_conf_mgdb.ini b/mediagoblin/tests/fake_celery_conf_mgdb.ini
deleted file mode 100644
index 52671c14..00000000
--- a/mediagoblin/tests/fake_celery_conf_mgdb.ini
+++ /dev/null
@@ -1,14 +0,0 @@
-['mediagoblin']
-db_host = mongodb.example.org
-db_port = 8080
-db_name = captain_lollerskates
-
-['something']
-or = other
-
-['celery']
-some_variable = poolf
-mail_port = 2020
-celeryd_eta_scheduler_precision = 3.1
-celery_result_persistent = false
-celery_imports = baz.bar.foo, import.is.a.this
diff --git a/mediagoblin/tests/fake_config_spec.ini b/mediagoblin/tests/fake_config_spec.ini
index 9421ce36..43f2e236 100644
--- a/mediagoblin/tests/fake_config_spec.ini
+++ b/mediagoblin/tests/fake_config_spec.ini
@@ -7,4 +7,4 @@ num_carrots = integer(default=1)
encouragement_phrase = string()
[celery]
-eat_celery_with_carrots = boolean(default=True) \ No newline at end of file
+EAT_CELERY_WITH_CARROTS = boolean(default=True) \ No newline at end of file
diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py
index c9c77821..5530c6f2 100644
--- a/mediagoblin/tests/test_celery_setup.py
+++ b/mediagoblin/tests/test_celery_setup.py
@@ -22,8 +22,6 @@ from mediagoblin.init.config import read_mediagoblin_config
TEST_CELERY_CONF_NOSPECIALDB = pkg_resources.resource_filename(
'mediagoblin.tests', 'fake_celery_conf.ini')
-TEST_CELERY_CONF_MGSPECIALDB = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'fake_celery_conf_mgdb.ini')
def test_setup_celery_from_config():
@@ -50,36 +48,13 @@ def test_setup_celery_from_config():
assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
assert fake_celery_module.CELERY_IMPORTS == [
- 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing']
- assert fake_celery_module.CELERY_MONGODB_BACKEND_SETTINGS == {
- 'database': 'mediagoblin'}
- assert fake_celery_module.CELERY_RESULT_BACKEND == 'mongodb'
- assert fake_celery_module.BROKER_BACKEND == 'mongodb'
-
- _wipe_testmodule_clean(fake_celery_module)
-
- global_config, validation_result = read_mediagoblin_config(
- TEST_CELERY_CONF_MGSPECIALDB)
- app_config = global_config['mediagoblin']
-
- celery_setup.setup_celery_from_config(
- app_config, global_config,
- 'mediagoblin.tests.fake_celery_module', set_environ=False)
-
- from mediagoblin.tests import fake_celery_module
- assert fake_celery_module.SOME_VARIABLE == 'poolf'
- assert fake_celery_module.MAIL_PORT == 2020
- assert isinstance(fake_celery_module.MAIL_PORT, int)
- assert fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION == 3.1
- assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
- assert fake_celery_module.CELERY_RESULT_PERSISTENT is False
- assert fake_celery_module.CELERY_IMPORTS == [
- 'baz.bar.foo', 'import.is.a.this', 'mediagoblin.processing']
- assert fake_celery_module.CELERY_MONGODB_BACKEND_SETTINGS == {
- 'database': 'captain_lollerskates',
- 'host': 'mongodb.example.org',
- 'port': 8080}
- assert fake_celery_module.CELERY_RESULT_BACKEND == 'mongodb'
- assert fake_celery_module.BROKER_BACKEND == 'mongodb'
- assert fake_celery_module.BROKER_HOST == 'mongodb.example.org'
- assert fake_celery_module.BROKER_PORT == 8080
+ 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task']
+ assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
+ assert fake_celery_module.CELERY_RESULT_DBURI == (
+ 'sqlite:///' +
+ pkg_resources.resource_filename('mediagoblin.tests', 'celery.db'))
+
+ assert fake_celery_module.BROKER_TRANSPORT == 'sqlalchemy'
+ assert fake_celery_module.BROKER_HOST == (
+ 'sqlite:///' +
+ pkg_resources.resource_filename('mediagoblin.tests', 'kombu.db'))
diff --git a/mediagoblin/tests/test_config.py b/mediagoblin/tests/test_config.py
index c596f6a6..7d8c65c1 100644
--- a/mediagoblin/tests/test_config.py
+++ b/mediagoblin/tests/test_config.py
@@ -37,7 +37,7 @@ def test_read_mediagoblin_config():
assert this_conf['carrotapp']['carrotcake'] == False
assert this_conf['carrotapp']['num_carrots'] == 1
assert not this_conf['carrotapp'].has_key('encouragement_phrase')
- assert this_conf['celery']['eat_celery_with_carrots'] == True
+ assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == True
# A good file
this_conf, validation_results = config.read_mediagoblin_config(
@@ -48,7 +48,7 @@ def test_read_mediagoblin_config():
assert this_conf['carrotapp']['encouragement_phrase'] == \
"I'd love it if you eat your carrots!"
assert this_conf['carrotapp']['blah_blah'] == "blah!"
- assert this_conf['celery']['eat_celery_with_carrots'] == False
+ assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == False
# A bad file
this_conf, validation_results = config.read_mediagoblin_config(
@@ -61,7 +61,7 @@ def test_read_mediagoblin_config():
assert this_conf['carrotapp']['encouragement_phrase'] == \
"586956856856"
assert this_conf['carrotapp']['blah_blah'] == "blah!"
- assert this_conf['celery']['eat_celery_with_carrots'] == "pants"
+ assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == "pants"
def test_generate_validation_report():
@@ -89,7 +89,7 @@ There were validation problems loading this config file:
expected_warnings = [
'carrotapp:carrotcake = the value "slobber" is of the wrong type.',
'carrotapp:num_carrots = the value "GROSS" is of the wrong type.',
- 'celery:eat_celery_with_carrots = the value "pants" is of the wrong type.']
+ 'celery:EAT_CELERY_WITH_CARROTS = the value "pants" is of the wrong type.']
warnings = report.splitlines()[2:]
assert len(warnings) == 3
diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini
index c91ed92b..01bf0972 100644
--- a/mediagoblin/tests/test_mgoblin_app.ini
+++ b/mediagoblin/tests/test_mgoblin_app.ini
@@ -26,4 +26,4 @@ data_dir = %(here)s/test_user_dev/beaker/cache/data
lock_dir = %(here)s/test_user_dev/beaker/cache/lock
[celery]
-celery_always_eager = true
+CELERY_ALWAYS_EAGER = true
diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini
index bd57994b..d7c18642 100644
--- a/mediagoblin/tests/test_paste.ini
+++ b/mediagoblin/tests/test_paste.ini
@@ -29,7 +29,7 @@ beaker.session.data_dir = %(here)s/test_user_dev/beaker/sessions/data
beaker.session.lock_dir = %(here)s/test_user_dev/beaker/sessions/lock
[celery]
-celery_always_eager = true
+CELERY_ALWAYS_EAGER = true
[server:main]
use = egg:Paste#http
diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py
new file mode 100644
index 00000000..417f91f3
--- /dev/null
+++ b/mediagoblin/tests/test_processing.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+
+from nose.tools import assert_equal, assert_true, assert_false
+
+from mediagoblin import processing
+
+class TestProcessing(object):
+ def run_fill(self, input, format, output=None):
+ builder = processing.FilenameBuilder(input)
+ result = builder.fill(format)
+ if output is None:
+ return result
+ assert_equal(output, result)
+
+ def test_easy_filename_fill(self):
+ self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt')
+
+ def test_long_filename_fill(self):
+ self.run_fill('{0}.png'.format('A' * 300), 'image-{basename}{ext}',
+ 'image-{0}.png'.format('A' * 245))
diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py
new file mode 100644
index 00000000..507a7725
--- /dev/null
+++ b/mediagoblin/tests/test_sql_migrations.py
@@ -0,0 +1,884 @@
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2012, 2012 MediaGoblin contributors. See AUTHORS.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import copy
+
+from sqlalchemy import (
+ Table, Column, MetaData, Index,
+ Integer, Float, Unicode, UnicodeText, DateTime, Boolean,
+ ForeignKey, UniqueConstraint, PickleType, VARCHAR)
+from sqlalchemy.orm import sessionmaker, relationship
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.sql import select, insert
+from migrate import changeset
+
+from mediagoblin.db.sql.base import GMGTableBase
+from mediagoblin.db.sql.util import MigrationManager, RegisterMigration
+
+
+# This one will get filled with local migrations
+FULL_MIGRATIONS = {}
+
+
+#######################################################
+# Migration set 1: Define initial models, no migrations
+#######################################################
+
+Base1 = declarative_base(cls=GMGTableBase)
+
+class Creature1(Base1):
+ __tablename__ = "creature"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode, unique=True, nullable=False, index=True)
+ num_legs = Column(Integer, nullable=False)
+ is_demon = Column(Boolean)
+
+class Level1(Base1):
+ __tablename__ = "level"
+
+ id = Column(Unicode, primary_key=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+ exits = Column(PickleType)
+
+SET1_MODELS = [Creature1, Level1]
+
+SET1_MIGRATIONS = {}
+
+#######################################################
+# Migration set 2: A few migrations and new model
+#######################################################
+
+Base2 = declarative_base(cls=GMGTableBase)
+
+class Creature2(Base2):
+ __tablename__ = "creature"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode, unique=True, nullable=False, index=True)
+ num_legs = Column(Integer, nullable=False)
+ magical_powers = relationship("CreaturePower2")
+
+class CreaturePower2(Base2):
+ __tablename__ = "creature_power"
+
+ id = Column(Integer, primary_key=True)
+ creature = Column(
+ Integer, ForeignKey('creature.id'), nullable=False)
+ name = Column(Unicode)
+ description = Column(Unicode)
+ hitpower = Column(Integer, nullable=False)
+
+class Level2(Base2):
+ __tablename__ = "level"
+
+ id = Column(Unicode, primary_key=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+
+class LevelExit2(Base2):
+ __tablename__ = "level_exit"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode)
+ from_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False)
+ to_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False)
+
+SET2_MODELS = [Creature2, CreaturePower2, Level2, LevelExit2]
+
+
+@RegisterMigration(1, FULL_MIGRATIONS)
+def creature_remove_is_demon(db_conn):
+ """
+ Remove the is_demon field from the creature model. We don't need
+ it!
+ """
+ # :( Commented out 'cuz of:
+ # http://code.google.com/p/sqlalchemy-migrate/issues/detail?id=143&thanks=143&ts=1327882242
+
+ # metadata = MetaData(bind=db_conn.bind)
+ # creature_table = Table(
+ # 'creature', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # creature_table.drop_column('is_demon')
+ pass
+
+
+@RegisterMigration(2, FULL_MIGRATIONS)
+def creature_powers_new_table(db_conn):
+ """
+ Add a new table for creature powers. Nothing needs to go in it
+ yet though as there wasn't anything that previously held this
+ information
+ """
+ metadata = MetaData(bind=db_conn.bind)
+
+ # We have to access the creature table so sqlalchemy can make the
+ # foreign key relationship
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+
+ creature_powers = Table(
+ 'creature_power', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('creature',
+ Integer, ForeignKey('creature.id'), nullable=False),
+ Column('name', Unicode),
+ Column('description', Unicode),
+ Column('hitpower', Integer, nullable=False))
+ metadata.create_all(db_conn.bind)
+
+
+@RegisterMigration(3, FULL_MIGRATIONS)
+def level_exits_new_table(db_conn):
+ """
+ Make a new table for level exits and move the previously pickled
+ stuff over to here (then drop the old unneeded table)
+ """
+ # First, create the table
+ # -----------------------
+ metadata = MetaData(bind=db_conn.bind)
+
+ # Minimal representation of level table.
+ # Not auto-introspecting here because of pickle table. I'm not
+ # sure sqlalchemy can auto-introspect pickle columns.
+ levels = Table(
+ 'level', metadata,
+ Column('id', Unicode, primary_key=True),
+ Column('name', Unicode),
+ Column('description', Unicode),
+ Column('exits', PickleType))
+
+ level_exits = Table(
+ 'level_exit', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', Unicode),
+ Column('from_level',
+ Unicode, ForeignKey('level.id'), nullable=False),
+ Column('to_level',
+ Unicode, ForeignKey('level.id'), nullable=False))
+ metadata.create_all(db_conn.bind)
+
+ # And now, convert all the old exit pickles to new level exits
+ # ------------------------------------------------------------
+
+ # query over and insert
+ result = db_conn.execute(
+ select([levels], levels.c.exits!=None))
+
+ for level in result:
+
+ for exit_name, to_level in level['exits'].iteritems():
+ # Insert the level exit
+ db_conn.execute(
+ level_exits.insert().values(
+ name=exit_name,
+ from_level=level.id,
+ to_level=to_level))
+
+ # Finally, drop the old level exits pickle table
+ # ----------------------------------------------
+ levels.drop_column('exits')
+
+
+# A hack! At this point we freeze-fame and get just a partial list of
+# migrations
+
+SET2_MIGRATIONS = copy.copy(FULL_MIGRATIONS)
+
+#######################################################
+# Migration set 3: Final migrations
+#######################################################
+
+Base3 = declarative_base(cls=GMGTableBase)
+
+class Creature3(Base3):
+ __tablename__ = "creature"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode, unique=True, nullable=False, index=True)
+ num_limbs= Column(Integer, nullable=False)
+ magical_powers = relationship("CreaturePower3")
+
+class CreaturePower3(Base3):
+ __tablename__ = "creature_power"
+
+ id = Column(Integer, primary_key=True)
+ creature = Column(
+ Integer, ForeignKey('creature.id'), nullable=False, index=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+ hitpower = Column(Float, nullable=False)
+
+class Level3(Base3):
+ __tablename__ = "level"
+
+ id = Column(Unicode, primary_key=True)
+ name = Column(Unicode)
+ description = Column(Unicode)
+
+class LevelExit3(Base3):
+ __tablename__ = "level_exit"
+
+ id = Column(Integer, primary_key=True)
+ name = Column(Unicode)
+ from_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False, index=True)
+ to_level = Column(
+ Unicode, ForeignKey('level.id'), nullable=False, index=True)
+
+
+SET3_MODELS = [Creature3, CreaturePower3, Level3, LevelExit3]
+SET3_MIGRATIONS = FULL_MIGRATIONS
+
+
+@RegisterMigration(4, FULL_MIGRATIONS)
+def creature_num_legs_to_num_limbs(db_conn):
+ """
+ Turns out we're tracking all sorts of limbs, not "legs"
+ specifically. Humans would be 4 here, for instance. So we
+ renamed the column.
+ """
+ metadata = MetaData(bind=db_conn.bind)
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+ creature_table.c.num_legs.alter(name=u"num_limbs")
+
+
+@RegisterMigration(5, FULL_MIGRATIONS)
+def level_exit_index_from_and_to_level(db_conn):
+ """
+ Index the from and to levels of the level exit table.
+ """
+ metadata = MetaData(bind=db_conn.bind)
+ level_exit = Table(
+ 'level_exit', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+ Index('ix_level_exit_from_level',
+ level_exit.c.from_level).create(db_conn.bind)
+ Index('ix_level_exit_to_level',
+ level_exit.c.to_level).create(db_conn.bind)
+
+
+@RegisterMigration(6, FULL_MIGRATIONS)
+def creature_power_index_creature(db_conn):
+ """
+ Index our foreign key relationship to the creatures
+ """
+ metadata = MetaData(bind=db_conn.bind)
+ creature_power = Table(
+ 'creature_power', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+ Index('ix_creature_power_creature',
+ creature_power.c.creature).create(db_conn.bind)
+
+
+@RegisterMigration(7, FULL_MIGRATIONS)
+def creature_power_hitpower_to_float(db_conn):
+ """
+ Convert hitpower column on creature power table from integer to
+ float.
+
+ Turns out we want super precise values of how much hitpower there
+ really is.
+ """
+ metadata = MetaData(bind=db_conn.bind)
+
+ # We have to access the creature table so sqlalchemy can make the
+ # foreign key relationship
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=db_conn.bind)
+
+ creature_power = Table(
+ 'creature_power', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('creature', Integer,
+ ForeignKey('creature.id'), nullable=False,
+ index=True),
+ Column('name', Unicode),
+ Column('description', Unicode),
+ Column('hitpower', Integer, nullable=False))
+
+ creature_power.c.hitpower.alter(type=Float)
+
+
+def _insert_migration1_objects(session):
+ """
+ Test objects to insert for the first set of things
+ """
+ # Insert creatures
+ session.add_all(
+ [Creature1(name=u'centipede',
+ num_legs=100,
+ is_demon=False),
+ Creature1(name=u'wolf',
+ num_legs=4,
+ is_demon=False),
+ # don't ask me what a wizardsnake is.
+ Creature1(name=u'wizardsnake',
+ num_legs=0,
+ is_demon=True)])
+
+ # Insert levels
+ session.add_all(
+ [Level1(id=u'necroplex',
+ name=u'The Necroplex',
+ description=u'A complex full of pure deathzone.',
+ exits={
+ 'deathwell': 'evilstorm',
+ 'portal': 'central_park'}),
+ Level1(id=u'evilstorm',
+ name=u'Evil Storm',
+ description=u'A storm full of pure evil.',
+ exits={}), # you can't escape the evilstorm
+ Level1(id=u'central_park',
+ name=u'Central Park, NY, NY',
+ description=u"New York's friendly Central Park.",
+ exits={
+ 'portal': 'necroplex'})])
+
+ session.commit()
+
+
+def _insert_migration2_objects(session):
+ """
+ Test objects to insert for the second set of things
+ """
+ # Insert creatures
+ session.add_all(
+ [Creature2(
+ name=u'centipede',
+ num_legs=100),
+ Creature2(
+ name=u'wolf',
+ num_legs=4,
+ magical_powers = [
+ CreaturePower2(
+ name=u"ice breath",
+ description=u"A blast of icy breath!",
+ hitpower=20),
+ CreaturePower2(
+ name=u"death stare",
+ description=u"A frightening stare, for sure!",
+ hitpower=45)]),
+ Creature2(
+ name=u'wizardsnake',
+ num_legs=0,
+ magical_powers=[
+ CreaturePower2(
+ name=u'death_rattle',
+ description=u'A rattle... of DEATH!',
+ hitpower=1000),
+ CreaturePower2(
+ name=u'sneaky_stare',
+ description=u"The sneakiest stare you've ever seen!",
+ hitpower=300),
+ CreaturePower2(
+ name=u'slithery_smoke',
+ description=u"A blast of slithery, slithery smoke.",
+ hitpower=10),
+ CreaturePower2(
+ name=u'treacherous_tremors',
+ description=u"The ground shakes beneath footed animals!",
+ hitpower=0)])])
+
+ # Insert levels
+ session.add_all(
+ [Level2(id=u'necroplex',
+ name=u'The Necroplex',
+ description=u'A complex full of pure deathzone.'),
+ Level2(id=u'evilstorm',
+ name=u'Evil Storm',
+ description=u'A storm full of pure evil.',
+ exits=[]), # you can't escape the evilstorm
+ Level2(id=u'central_park',
+ name=u'Central Park, NY, NY',
+ description=u"New York's friendly Central Park.")])
+
+ # necroplex exits
+ session.add_all(
+ [LevelExit2(name=u'deathwell',
+ from_level=u'necroplex',
+ to_level=u'evilstorm'),
+ LevelExit2(name=u'portal',
+ from_level=u'necroplex',
+ to_level=u'central_park')])
+
+ # there are no evilstorm exits because there is no exit from the
+ # evilstorm
+
+ # central park exits
+ session.add_all(
+ [LevelExit2(name=u'portal',
+ from_level=u'central_park',
+ to_level=u'necroplex')])
+
+ session.commit()
+
+
+def _insert_migration3_objects(session):
+ """
+ Test objects to insert for the third set of things
+ """
+ # Insert creatures
+ session.add_all(
+ [Creature3(
+ name=u'centipede',
+ num_limbs=100),
+ Creature3(
+ name=u'wolf',
+ num_limbs=4,
+ magical_powers = [
+ CreaturePower3(
+ name=u"ice breath",
+ description=u"A blast of icy breath!",
+ hitpower=20.0),
+ CreaturePower3(
+ name=u"death stare",
+ description=u"A frightening stare, for sure!",
+ hitpower=45.0)]),
+ Creature3(
+ name=u'wizardsnake',
+ num_limbs=0,
+ magical_powers=[
+ CreaturePower3(
+ name=u'death_rattle',
+ description=u'A rattle... of DEATH!',
+ hitpower=1000.0),
+ CreaturePower3(
+ name=u'sneaky_stare',
+ description=u"The sneakiest stare you've ever seen!",
+ hitpower=300.0),
+ CreaturePower3(
+ name=u'slithery_smoke',
+ description=u"A blast of slithery, slithery smoke.",
+ hitpower=10.0),
+ CreaturePower3(
+ name=u'treacherous_tremors',
+ description=u"The ground shakes beneath footed animals!",
+ hitpower=0.0)])],
+ # annnnnd one more to test a floating point hitpower
+ Creature3(
+ name=u'deity',
+ numb_limbs=30,
+ magical_powers=[
+ CreaturePower3(
+ name=u'smite',
+ description=u'Smitten by holy wrath!',
+ hitpower=9999.9)]))
+
+ # Insert levels
+ session.add_all(
+ [Level3(id=u'necroplex',
+ name=u'The Necroplex',
+ description=u'A complex full of pure deathzone.'),
+ Level3(id=u'evilstorm',
+ name=u'Evil Storm',
+ description=u'A storm full of pure evil.',
+ exits=[]), # you can't escape the evilstorm
+ Level3(id=u'central_park',
+ name=u'Central Park, NY, NY',
+ description=u"New York's friendly Central Park.")])
+
+ # necroplex exits
+ session.add_all(
+ [LevelExit3(name=u'deathwell',
+ from_level=u'necroplex',
+ to_level=u'evilstorm'),
+ LevelExit3(name=u'portal',
+ from_level=u'necroplex',
+ to_level=u'central_park')])
+
+ # there are no evilstorm exits because there is no exit from the
+ # evilstorm
+
+ # central park exits
+ session.add_all(
+ [LevelExit3(name=u'portal',
+ from_level=u'central_park',
+ to_level=u'necroplex')])
+
+ session.commit()
+
+
+class CollectingPrinter(object):
+ def __init__(self):
+ self.collection = []
+
+ def __call__(self, string):
+ self.collection.append(string)
+
+ @property
+ def combined_string(self):
+ return u''.join(self.collection)
+
+
+def create_test_engine():
+ from sqlalchemy import create_engine
+ engine = create_engine('sqlite:///:memory:', echo=False)
+ Session = sessionmaker(bind=engine)
+ return engine, Session
+
+
+def assert_col_type(column, this_class):
+ assert isinstance(column.type, this_class)
+
+
+def _get_level3_exits(session, level):
+ return dict(
+ [(level_exit.name, level_exit.to_level)
+ for level_exit in
+ session.query(LevelExit3).filter_by(from_level=level.id)])
+
+
+def test_set1_to_set3():
+ # Create / connect to database
+ # ----------------------------
+
+ engine, Session = create_test_engine()
+
+ # Create tables by migrating on empty initial set
+ # -----------------------------------------------
+
+ printer = CollectingPrinter()
+ migration_manager = MigrationManager(
+ '__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
+ printer)
+
+ # Check latest migration and database current migration
+ assert migration_manager.latest_migration == 0
+ assert migration_manager.database_current_migration == None
+
+ result = migration_manager.init_or_migrate()
+
+ # Make sure output was "inited"
+ assert result == u'inited'
+ # Check output
+ assert printer.combined_string == (
+ "-> Initializing main mediagoblin tables... done.\n")
+ # Check version in database
+ assert migration_manager.latest_migration == 0
+ assert migration_manager.database_current_migration == 0
+
+ # Install the initial set
+ # -----------------------
+
+ _insert_migration1_objects(Session())
+
+ # Try to "re-migrate" with same manager settings... nothing should happen
+ migration_manager = MigrationManager(
+ '__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
+ printer)
+ assert migration_manager.init_or_migrate() == None
+
+ # Check version in database
+ assert migration_manager.latest_migration == 0
+ assert migration_manager.database_current_migration == 0
+
+ # Sanity check a few things in the database...
+ metadata = MetaData(bind=engine)
+
+ # Check the structure of the creature table
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(creature_table.c.keys()) == set(
+ ['id', 'name', 'num_legs', 'is_demon'])
+ assert_col_type(creature_table.c.id, Integer)
+ assert_col_type(creature_table.c.name, VARCHAR)
+ assert creature_table.c.name.nullable is False
+ #assert creature_table.c.name.index is True
+ #assert creature_table.c.name.unique is True
+ assert_col_type(creature_table.c.num_legs, Integer)
+ assert creature_table.c.num_legs.nullable is False
+ assert_col_type(creature_table.c.is_demon, Boolean)
+
+ # Check the structure of the level table
+ level_table = Table(
+ 'level', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(level_table.c.keys()) == set(
+ ['id', 'name', 'description', 'exits'])
+ assert_col_type(level_table.c.id, VARCHAR)
+ assert level_table.c.id.primary_key is True
+ assert_col_type(level_table.c.name, VARCHAR)
+ assert_col_type(level_table.c.description, VARCHAR)
+ # Skipping exits... Not sure if we can detect pickletype, not a
+ # big deal regardless.
+
+ # Now check to see if stuff seems to be in there.
+ session = Session()
+
+ creature = session.query(Creature1).filter_by(
+ name=u'centipede').one()
+ assert creature.num_legs == 100
+ assert creature.is_demon == False
+
+ creature = session.query(Creature1).filter_by(
+ name=u'wolf').one()
+ assert creature.num_legs == 4
+ assert creature.is_demon == False
+
+ creature = session.query(Creature1).filter_by(
+ name=u'wizardsnake').one()
+ assert creature.num_legs == 0
+ assert creature.is_demon == True
+
+ level = session.query(Level1).filter_by(
+ id=u'necroplex').one()
+ assert level.name == u'The Necroplex'
+ assert level.description == u'A complex full of pure deathzone.'
+ assert level.exits == {
+ 'deathwell': 'evilstorm',
+ 'portal': 'central_park'}
+
+ level = session.query(Level1).filter_by(
+ id=u'evilstorm').one()
+ assert level.name == u'Evil Storm'
+ assert level.description == u'A storm full of pure evil.'
+ assert level.exits == {} # You still can't escape the evilstorm!
+
+ level = session.query(Level1).filter_by(
+ id=u'central_park').one()
+ assert level.name == u'Central Park, NY, NY'
+ assert level.description == u"New York's friendly Central Park."
+ assert level.exits == {
+ 'portal': 'necroplex'}
+
+ # Create new migration manager, but make sure the db migration
+ # isn't said to be updated yet
+ printer = CollectingPrinter()
+ migration_manager = MigrationManager(
+ '__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ printer)
+
+ assert migration_manager.latest_migration == 7
+ assert migration_manager.database_current_migration == 0
+
+ # Migrate
+ result = migration_manager.init_or_migrate()
+
+ # Make sure result was "migrated"
+ assert result == u'migrated'
+
+ # TODO: Check output to user
+ assert printer.combined_string == """\
+-> Updating main mediagoblin tables:
+ + Running migration 1, "creature_remove_is_demon"... done.
+ + Running migration 2, "creature_powers_new_table"... done.
+ + Running migration 3, "level_exits_new_table"... done.
+ + Running migration 4, "creature_num_legs_to_num_limbs"... done.
+ + Running migration 5, "level_exit_index_from_and_to_level"... done.
+ + Running migration 6, "creature_power_index_creature"... done.
+ + Running migration 7, "creature_power_hitpower_to_float"... done.
+"""
+
+ # Make sure version matches expected
+ migration_manager = MigrationManager(
+ '__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ printer)
+ assert migration_manager.latest_migration == 7
+ assert migration_manager.database_current_migration == 7
+
+ # Check all things in database match expected
+
+ # Check the creature table
+ metadata = MetaData(bind=engine)
+ creature_table = Table(
+ 'creature', metadata,
+ autoload=True, autoload_with=engine)
+ # assert set(creature_table.c.keys()) == set(
+ # ['id', 'name', 'num_limbs'])
+ assert set(creature_table.c.keys()) == set(
+ [u'id', 'name', u'num_limbs', u'is_demon'])
+ assert_col_type(creature_table.c.id, Integer)
+ assert_col_type(creature_table.c.name, VARCHAR)
+ assert creature_table.c.name.nullable is False
+ #assert creature_table.c.name.index is True
+ #assert creature_table.c.name.unique is True
+ assert_col_type(creature_table.c.num_limbs, Integer)
+ assert creature_table.c.num_limbs.nullable is False
+
+ # Check the CreaturePower table
+ creature_power_table = Table(
+ 'creature_power', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(creature_power_table.c.keys()) == set(
+ ['id', 'creature', 'name', 'description', 'hitpower'])
+ assert_col_type(creature_power_table.c.id, Integer)
+ assert_col_type(creature_power_table.c.creature, Integer)
+ assert creature_power_table.c.creature.nullable is False
+ assert_col_type(creature_power_table.c.name, VARCHAR)
+ assert_col_type(creature_power_table.c.description, VARCHAR)
+ assert_col_type(creature_power_table.c.hitpower, Float)
+ assert creature_power_table.c.hitpower.nullable is False
+
+ # Check the structure of the level table
+ level_table = Table(
+ 'level', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(level_table.c.keys()) == set(
+ ['id', 'name', 'description'])
+ assert_col_type(level_table.c.id, VARCHAR)
+ assert level_table.c.id.primary_key is True
+ assert_col_type(level_table.c.name, VARCHAR)
+ assert_col_type(level_table.c.description, VARCHAR)
+
+ # Check the structure of the level_exits table
+ level_exit_table = Table(
+ 'level_exit', metadata,
+ autoload=True, autoload_with=engine)
+ assert set(level_exit_table.c.keys()) == set(
+ ['id', 'name', 'from_level', 'to_level'])
+ assert_col_type(level_exit_table.c.id, Integer)
+ assert_col_type(level_exit_table.c.name, VARCHAR)
+ assert_col_type(level_exit_table.c.from_level, VARCHAR)
+ assert level_exit_table.c.from_level.nullable is False
+ #assert level_exit_table.c.from_level.index is True
+ assert_col_type(level_exit_table.c.to_level, VARCHAR)
+ assert level_exit_table.c.to_level.nullable is False
+ #assert level_exit_table.c.to_level.index is True
+
+ # Now check to see if stuff seems to be in there.
+ session = Session()
+ creature = session.query(Creature3).filter_by(
+ name=u'centipede').one()
+ assert creature.num_limbs == 100.0
+ assert creature.magical_powers == []
+
+ creature = session.query(Creature3).filter_by(
+ name=u'wolf').one()
+ assert creature.num_limbs == 4.0
+ assert creature.magical_powers == []
+
+ creature = session.query(Creature3).filter_by(
+ name=u'wizardsnake').one()
+ assert creature.num_limbs == 0.0
+ assert creature.magical_powers == []
+
+ level = session.query(Level3).filter_by(
+ id=u'necroplex').one()
+ assert level.name == u'The Necroplex'
+ assert level.description == u'A complex full of pure deathzone.'
+ level_exits = _get_level3_exits(session, level)
+ assert level_exits == {
+ u'deathwell': u'evilstorm',
+ u'portal': u'central_park'}
+
+ level = session.query(Level3).filter_by(
+ id=u'evilstorm').one()
+ assert level.name == u'Evil Storm'
+ assert level.description == u'A storm full of pure evil.'
+ level_exits = _get_level3_exits(session, level)
+ assert level_exits == {} # You still can't escape the evilstorm!
+
+ level = session.query(Level3).filter_by(
+ id=u'central_park').one()
+ assert level.name == u'Central Park, NY, NY'
+ assert level.description == u"New York's friendly Central Park."
+ level_exits = _get_level3_exits(session, level)
+ assert level_exits == {
+ 'portal': 'necroplex'}
+
+
+#def test_set2_to_set3():
+ # Create / connect to database
+ # Create tables by migrating on empty initial set
+
+ # Install the initial set
+ # Check version in database
+ # Sanity check a few things in the database
+
+ # Migrate
+ # Make sure version matches expected
+ # Check all things in database match expected
+ # pass
+
+
+#def test_set1_to_set2_to_set3():
+ # Create / connect to database
+ # Create tables by migrating on empty initial set
+
+ # Install the initial set
+ # Check version in database
+ # Sanity check a few things in the database
+
+ # Migrate
+ # Make sure version matches expected
+ # Check all things in database match expected
+
+ # Migrate again
+ # Make sure version matches expected again
+ # Check all things in database match expected again
+
+ ##### Set2
+ # creature_table = Table(
+ # 'creature', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(creature_table.c.keys()) == set(
+ # ['id', 'name', 'num_legs'])
+ # assert_col_type(creature_table.c.id, Integer)
+ # assert_col_type(creature_table.c.name, VARCHAR)
+ # assert creature_table.c.name.nullable is False
+ # assert creature_table.c.name.index is True
+ # assert creature_table.c.name.unique is True
+ # assert_col_type(creature_table.c.num_legs, Integer)
+ # assert creature_table.c.num_legs.nullable is False
+
+ # # Check the CreaturePower table
+ # creature_power_table = Table(
+ # 'creature_power', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(creature_power_table.c.keys()) == set(
+ # ['id', 'creature', 'name', 'description', 'hitpower'])
+ # assert_col_type(creature_power_table.c.id, Integer)
+ # assert_col_type(creature_power_table.c.creature, Integer)
+ # assert creature_power_table.c.creature.nullable is False
+ # assert_col_type(creature_power_table.c.name, VARCHAR)
+ # assert_col_type(creature_power_table.c.description, VARCHAR)
+ # assert_col_type(creature_power_table.c.hitpower, Integer)
+ # assert creature_power_table.c.hitpower.nullable is False
+
+ # # Check the structure of the level table
+ # level_table = Table(
+ # 'level', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(level_table.c.keys()) == set(
+ # ['id', 'name', 'description'])
+ # assert_col_type(level_table.c.id, VARCHAR)
+ # assert level_table.c.id.primary_key is True
+ # assert_col_type(level_table.c.name, VARCHAR)
+ # assert_col_type(level_table.c.description, VARCHAR)
+
+ # # Check the structure of the level_exits table
+ # level_exit_table = Table(
+ # 'level_exit', metadata,
+ # autoload=True, autoload_with=db_conn.bind)
+ # assert set(level_exit_table.c.keys()) == set(
+ # ['id', 'name', 'from_level', 'to_level'])
+ # assert_col_type(level_exit_table.c.id, Integer)
+ # assert_col_type(level_exit_table.c.name, VARCHAR)
+ # assert_col_type(level_exit_table.c.from_level, VARCHAR)
+ # assert level_exit_table.c.from_level.nullable is False
+ # assert_col_type(level_exit_table.c.to_level, VARCHAR)
+
+ # pass
diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py
index 702741b4..8bf7d13c 100644
--- a/mediagoblin/tests/test_submission.py
+++ b/mediagoblin/tests/test_submission.py
@@ -15,30 +15,35 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urlparse
-import pkg_resources
+import os
import re
from nose.tools import assert_equal, assert_true, assert_false
+from pkg_resources import resource_filename
-from mediagoblin.tests.tools import setup_fresh_app, get_test_app, \
+from mediagoblin.tests.tools import get_test_app, \
fixture_add_user
from mediagoblin import mg_globals
-from mediagoblin.tools import template, common
-
-GOOD_JPG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/good.jpg')
-GOOD_PNG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/good.png')
-EVIL_FILE = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/evil')
-EVIL_JPG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/evil.jpg')
-EVIL_PNG = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_submission/evil.png')
+from mediagoblin.tools import template
+
+
+def resource(filename):
+ return resource_filename('mediagoblin.tests', 'test_submission/' + filename)
+
+
+GOOD_JPG = resource('good.jpg')
+GOOD_PNG = resource('good.png')
+EVIL_FILE = resource('evil')
+EVIL_JPG = resource('evil.jpg')
+EVIL_PNG = resource('evil.png')
+BIG_BLUE = resource('bigblue.png')
GOOD_TAG_STRING = 'yin,yang'
BAD_TAG_STRING = 'rage,' + 'f' * 26 + 'u' * 26
+FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form']
+REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request']
+
class TestSubmission:
def setUp(self):
@@ -61,163 +66,130 @@ class TestSubmission:
def logout(self):
self.test_app.get('/auth/logout/')
+ def do_post(self, data, *context_keys, **kwargs):
+ url = kwargs.pop('url', '/submit/')
+ do_follow = kwargs.pop('do_follow', False)
+ template.clear_test_template_context()
+ response = self.test_app.post(url, data, **kwargs)
+ if do_follow:
+ response.follow()
+ context_data = template.TEMPLATE_TEST_CONTEXT
+ for key in context_keys:
+ context_data = context_data[key]
+ return response, context_data
+
+ def upload_data(self, filename):
+ return {'upload_files': [('file', filename)]}
+
+ def check_comments(self, request, media, count):
+ comments = request.db.MediaComment.find({'media_entry': media._id})
+ assert_equal(count, len(list(comments)))
+
def test_missing_fields(self):
# Test blank form
# ---------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {})
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert form.file.errors == [u'You must provide a file.']
+ response, form = self.do_post({}, *FORM_CONTEXT)
+ assert_equal(form.file.errors, [u'You must provide a file.'])
# Test blank file
# ---------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'test title'})
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert form.file.errors == [u'You must provide a file.']
-
-
- def test_normal_uploads(self):
- # Test JPG
- # --------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Normal upload 1'
- }, upload_files=[(
- 'file', GOOD_JPG)])
+ response, form = self.do_post({'title': 'test title'}, *FORM_CONTEXT)
+ assert_equal(form.file.errors, [u'You must provide a file.'])
- # User should be redirected
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
- assert template.TEMPLATE_TEST_CONTEXT.has_key(
- 'mediagoblin/user_pages/user.html')
+ def check_url(self, response, path):
+ assert_equal(urlparse.urlsplit(response.location)[2], path)
+ def check_normal_upload(self, title, filename):
+ response, context = self.do_post({'title': title}, do_follow=True,
+ **self.upload_data(filename))
+ self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ assert_true('mediagoblin/user_pages/user.html' in context)
# Make sure the media view is at least reachable, logged in...
- self.test_app.get('/u/chris/m/normal-upload-1/')
+ url = '/u/{0}/m/{1}/'.format(self.test_user.username,
+ title.lower().replace(' ', '-'))
+ self.test_app.get(url)
# ... and logged out too.
self.logout()
- self.test_app.get('/u/chris/m/normal-upload-1/')
- # Log back in for the remaining tests.
- self.login()
+ self.test_app.get(url)
- # Test PNG
- # --------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Normal upload 2'
- }, upload_files=[(
- 'file', GOOD_PNG)])
+ def test_normal_jpg(self):
+ self.check_normal_upload('Normal upload 1', GOOD_JPG)
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
- assert template.TEMPLATE_TEST_CONTEXT.has_key(
- 'mediagoblin/user_pages/user.html')
+ def test_normal_png(self):
+ self.check_normal_upload('Normal upload 2', GOOD_PNG)
+
+ def check_media(self, request, find_data, count=None):
+ media = request.db.MediaEntry.find(find_data)
+ if count is not None:
+ assert_equal(media.count(), count)
+ if count == 0:
+ return
+ return media[0]
def test_tags(self):
# Good tag string
# --------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Balanced Goblin',
- 'tags': GOOD_TAG_STRING
- }, upload_files=[(
- 'file', GOOD_JPG)])
-
- # New media entry with correct tags should be created
- response.follow()
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html']
- request = context['request']
- media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
- assert_equal(media['tags'],
+ response, request = self.do_post({'title': 'Balanced Goblin',
+ 'tags': GOOD_TAG_STRING},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
+ assert_equal(media.tags,
[{'name': u'yin', 'slug': u'yin'},
- {'name': u'yang', 'slug': u'yang'}])
+ {'name': u'yang', 'slug': u'yang'}])
# Test tags that are too long
# ---------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Balanced Goblin',
- 'tags': BAD_TAG_STRING
- }, upload_files=[(
- 'file', GOOD_JPG)])
-
- # Too long error should be raised
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert form.tags.errors == [
- u'Tags must be shorter than 50 characters. Tags that are too long'\
- ': ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']
+ response, form = self.do_post({'title': 'Balanced Goblin',
+ 'tags': BAD_TAG_STRING},
+ *FORM_CONTEXT,
+ **self.upload_data(GOOD_JPG))
+ assert_equal(form.tags.errors, [
+ u'Tags must be shorter than 50 characters. ' \
+ 'Tags that are too long: ' \
+ 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'])
def test_delete(self):
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Balanced Goblin',
- }, upload_files=[(
- 'file', GOOD_JPG)])
-
- # Post image
- response.follow()
-
- request = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']['request']
-
- media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
-
- # Does media entry exist?
- assert_true(media)
+ response, request = self.do_post({'title': 'Balanced Goblin'},
+ *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(GOOD_JPG))
+ media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
+
+ # Add a comment, so we can test for its deletion later.
+ self.check_comments(request, media, 0)
+ comment_url = request.urlgen(
+ 'mediagoblin.user_pages.media_post_comment',
+ user=self.test_user.username, media=media._id)
+ response = self.do_post({'comment_content': 'i love this test'},
+ url=comment_url, do_follow=True)[0]
+ self.check_comments(request, media, 1)
# Do not confirm deletion
# ---------------------------------------------------
- response = self.test_app.post(
- request.urlgen('mediagoblin.user_pages.media_confirm_delete',
- # No work: user=media.uploader().username,
- user=self.test_user.username,
- media=media._id),
- # no value means no confirm
- {})
-
- response.follow()
-
- request = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']['request']
-
- media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
-
- # Does media entry still exist?
- assert_true(media)
+ delete_url = request.urlgen(
+ 'mediagoblin.user_pages.media_confirm_delete',
+ user=self.test_user.username, media=media._id)
+ # Empty data means don't confirm
+ response = self.do_post({}, do_follow=True, url=delete_url)[0]
+ media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
# Confirm deletion
# ---------------------------------------------------
- response = self.test_app.post(
- request.urlgen('mediagoblin.user_pages.media_confirm_delete',
- # No work: user=media.uploader().username,
- user=self.test_user.username,
- media=media._id),
- {'confirm': 'y'})
-
- response.follow()
-
- request = template.TEMPLATE_TEST_CONTEXT[
- 'mediagoblin/user_pages/user.html']['request']
+ response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT,
+ do_follow=True, url=delete_url)
+ self.check_media(request, {'_id': media._id}, 0)
+ self.check_comments(request, media, 0)
- # Does media entry still exist?
- assert_false(
- request.db.MediaEntry.find(
- {'_id': media._id}).count())
+ def test_evil_file(self):
+ # Test non-suppoerted file with non-supported extension
+ # -----------------------------------------------------
+ response, form = self.do_post({'title': 'Malicious Upload 1'},
+ *FORM_CONTEXT,
+ **self.upload_data(EVIL_FILE))
+ assert_equal(len(form.file.errors), 1)
+ assert_true(re.match(
+ r'^Could not extract any file extension from ".*?"$',
+ str(form.file.errors[0])))
def test_sniffing(self):
'''
@@ -241,62 +213,43 @@ class TestSubmission:
assert media.media_type == 'mediagoblin.media_types.image'
- def test_malicious_uploads(self):
- # Test non-suppoerted file with non-supported extension
- # -----------------------------------------------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Malicious Upload 1'
- }, upload_files=[(
- 'file', EVIL_FILE)])
-
- context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
- form = context['submit_form']
- assert 'Sorry, I don\'t support that file type :(' == \
- str(form.file.errors[0])
- assert len(form.file.errors) == 1
-
+ def check_false_image(self, title, filename):
# NOTE: The following 2 tests will ultimately fail, but they
# *will* pass the initial form submission step. Instead,
# they'll be caught as failures during the processing step.
+ response, context = self.do_post({'title': title}, do_follow=True,
+ **self.upload_data(filename))
+ self.check_url(response, '/u/{0}/'.format(self.test_user.username))
+ entry = mg_globals.database.MediaEntry.find_one({'title': title})
+ assert_equal(entry.state, 'failed')
+ assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail')
+ def test_evil_jpg(self):
# Test non-supported file with .jpg extension
# -------------------------------------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Malicious Upload 2'
- }, upload_files=[(
- 'file', EVIL_JPG)])
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
-
- entry = mg_globals.database.MediaEntry.find_one(
- {'title': 'Malicious Upload 2'})
- assert_equal(entry.state, 'failed')
- assert_equal(
- entry['fail_error'],
- u'mediagoblin.processing:BadMediaFail')
+ self.check_false_image('Malicious Upload 2', EVIL_JPG)
+ def test_evil_png(self):
# Test non-supported file with .png extension
# -------------------------------------------
- template.clear_test_template_context()
- response = self.test_app.post(
- '/submit/', {
- 'title': 'Malicious Upload 3'
- }, upload_files=[(
- 'file', EVIL_PNG)])
- response.follow()
- assert_equal(
- urlparse.urlsplit(response.location)[2],
- '/u/chris/')
-
- entry = mg_globals.database.MediaEntry.find_one(
- {'title': 'Malicious Upload 3'})
- assert_equal(entry.state, 'failed')
- assert_equal(
- entry['fail_error'],
- u'mediagoblin.processing:BadMediaFail')
+ self.check_false_image('Malicious Upload 3', EVIL_PNG)
+
+ def test_processing(self):
+ data = {'title': 'Big Blue'}
+ response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True,
+ **self.upload_data(BIG_BLUE))
+ media = self.check_media(request, data, 1)
+ last_size = 1024 ** 3 # Needs to be larger than bigblue.png
+ for key, basename in (('original', 'bigblue.png'),
+ ('medium', 'bigblue.medium.png'),
+ ('thumb', 'bigblue.thumbnail.png')):
+ # Does the processed image have a good filename?
+ filename = resource_filename(
+ 'mediagoblin.tests',
+ os.path.join('test_user_dev/media/public',
+ *media['media_files'].get(key, [])))
+ assert_true(filename.endswith('_' + basename))
+ # Is it smaller than the last processed image we looked at?
+ size = os.stat(filename).st_size
+ assert_true(last_size > size)
+ last_size = size
diff --git a/mediagoblin/tests/test_submission/bigblue.png b/mediagoblin/tests/test_submission/bigblue.png
new file mode 100644
index 00000000..2b2c2a44
--- /dev/null
+++ b/mediagoblin/tests/test_submission/bigblue.png
Binary files differ