diff options
Diffstat (limited to 'mediagoblin/tests')
-rw-r--r-- | mediagoblin/tests/fake_carrot_conf_bad.ini | 2 | ||||
-rw-r--r-- | mediagoblin/tests/fake_carrot_conf_good.ini | 2 | ||||
-rw-r--r-- | mediagoblin/tests/fake_celery_conf.ini | 14 | ||||
-rw-r--r-- | mediagoblin/tests/fake_celery_conf_mgdb.ini | 14 | ||||
-rw-r--r-- | mediagoblin/tests/fake_config_spec.ini | 2 | ||||
-rw-r--r-- | mediagoblin/tests/test_celery_setup.py | 45 | ||||
-rw-r--r-- | mediagoblin/tests/test_config.py | 8 | ||||
-rw-r--r-- | mediagoblin/tests/test_mgoblin_app.ini | 2 | ||||
-rw-r--r-- | mediagoblin/tests/test_paste.ini | 2 | ||||
-rw-r--r-- | mediagoblin/tests/test_processing.py | 20 | ||||
-rw-r--r-- | mediagoblin/tests/test_sql_migrations.py | 884 | ||||
-rw-r--r-- | mediagoblin/tests/test_submission.py | 335 | ||||
-rw-r--r-- | mediagoblin/tests/test_submission/bigblue.png | bin | 0 -> 3142 bytes |
13 files changed, 1074 insertions, 256 deletions
diff --git a/mediagoblin/tests/fake_carrot_conf_bad.ini b/mediagoblin/tests/fake_carrot_conf_bad.ini index 0c79b354..9d8cf518 100644 --- a/mediagoblin/tests/fake_carrot_conf_bad.ini +++ b/mediagoblin/tests/fake_carrot_conf_bad.ini @@ -11,4 +11,4 @@ encouragement_phrase = 586956856856 # shouldn't throw error blah_blah = "blah!" # shouldn't throw error either [celery] -eat_celery_with_carrots = pants # yeah that's def an error right there. +EAT_CELERY_WITH_CARROTS = pants # yeah that's def an error right there. diff --git a/mediagoblin/tests/fake_carrot_conf_good.ini b/mediagoblin/tests/fake_carrot_conf_good.ini index fed14d07..1377907b 100644 --- a/mediagoblin/tests/fake_carrot_conf_good.ini +++ b/mediagoblin/tests/fake_carrot_conf_good.ini @@ -10,4 +10,4 @@ encouragement_phrase = "I'd love it if you eat your carrots!" blah_blah = "blah!" [celery] -eat_celery_with_carrots = False +EAT_CELERY_WITH_CARROTS = False diff --git a/mediagoblin/tests/fake_celery_conf.ini b/mediagoblin/tests/fake_celery_conf.ini index 3e52ac3a..67b0cba6 100644 --- a/mediagoblin/tests/fake_celery_conf.ini +++ b/mediagoblin/tests/fake_celery_conf.ini @@ -1,9 +1,9 @@ -['mediagoblin'] +[mediagoblin] # I got nothin' in this file! -['celery'] -some_variable = floop -mail_port = 2000 -celeryd_eta_scheduler_precision = 1.3 -celery_result_persistent = true -celery_imports = foo.bar.baz, this.is.an.import +[celery] +SOME_VARIABLE = floop +MAIL_PORT = 2000 +CELERYD_ETA_SCHEDULER_PRECISION = 1.3 +CELERY_RESULT_PERSISTENT = true +CELERY_IMPORTS = foo.bar.baz, this.is.an.import diff --git a/mediagoblin/tests/fake_celery_conf_mgdb.ini b/mediagoblin/tests/fake_celery_conf_mgdb.ini deleted file mode 100644 index 52671c14..00000000 --- a/mediagoblin/tests/fake_celery_conf_mgdb.ini +++ /dev/null @@ -1,14 +0,0 @@ -['mediagoblin'] -db_host = mongodb.example.org -db_port = 8080 -db_name = captain_lollerskates - -['something'] -or = other - -['celery'] -some_variable = poolf -mail_port = 2020 -celeryd_eta_scheduler_precision = 3.1 -celery_result_persistent = false -celery_imports = baz.bar.foo, import.is.a.this diff --git a/mediagoblin/tests/fake_config_spec.ini b/mediagoblin/tests/fake_config_spec.ini index 9421ce36..43f2e236 100644 --- a/mediagoblin/tests/fake_config_spec.ini +++ b/mediagoblin/tests/fake_config_spec.ini @@ -7,4 +7,4 @@ num_carrots = integer(default=1) encouragement_phrase = string() [celery] -eat_celery_with_carrots = boolean(default=True)
\ No newline at end of file +EAT_CELERY_WITH_CARROTS = boolean(default=True)
\ No newline at end of file diff --git a/mediagoblin/tests/test_celery_setup.py b/mediagoblin/tests/test_celery_setup.py index c9c77821..5530c6f2 100644 --- a/mediagoblin/tests/test_celery_setup.py +++ b/mediagoblin/tests/test_celery_setup.py @@ -22,8 +22,6 @@ from mediagoblin.init.config import read_mediagoblin_config TEST_CELERY_CONF_NOSPECIALDB = pkg_resources.resource_filename( 'mediagoblin.tests', 'fake_celery_conf.ini') -TEST_CELERY_CONF_MGSPECIALDB = pkg_resources.resource_filename( - 'mediagoblin.tests', 'fake_celery_conf_mgdb.ini') def test_setup_celery_from_config(): @@ -50,36 +48,13 @@ def test_setup_celery_from_config(): assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float) assert fake_celery_module.CELERY_RESULT_PERSISTENT is True assert fake_celery_module.CELERY_IMPORTS == [ - 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing'] - assert fake_celery_module.CELERY_MONGODB_BACKEND_SETTINGS == { - 'database': 'mediagoblin'} - assert fake_celery_module.CELERY_RESULT_BACKEND == 'mongodb' - assert fake_celery_module.BROKER_BACKEND == 'mongodb' - - _wipe_testmodule_clean(fake_celery_module) - - global_config, validation_result = read_mediagoblin_config( - TEST_CELERY_CONF_MGSPECIALDB) - app_config = global_config['mediagoblin'] - - celery_setup.setup_celery_from_config( - app_config, global_config, - 'mediagoblin.tests.fake_celery_module', set_environ=False) - - from mediagoblin.tests import fake_celery_module - assert fake_celery_module.SOME_VARIABLE == 'poolf' - assert fake_celery_module.MAIL_PORT == 2020 - assert isinstance(fake_celery_module.MAIL_PORT, int) - assert fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION == 3.1 - assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float) - assert fake_celery_module.CELERY_RESULT_PERSISTENT is False - assert fake_celery_module.CELERY_IMPORTS == [ - 'baz.bar.foo', 'import.is.a.this', 'mediagoblin.processing'] - assert fake_celery_module.CELERY_MONGODB_BACKEND_SETTINGS == { - 'database': 'captain_lollerskates', - 'host': 'mongodb.example.org', - 'port': 8080} - assert fake_celery_module.CELERY_RESULT_BACKEND == 'mongodb' - assert fake_celery_module.BROKER_BACKEND == 'mongodb' - assert fake_celery_module.BROKER_HOST == 'mongodb.example.org' - assert fake_celery_module.BROKER_PORT == 8080 + 'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task'] + assert fake_celery_module.CELERY_RESULT_BACKEND == 'database' + assert fake_celery_module.CELERY_RESULT_DBURI == ( + 'sqlite:///' + + pkg_resources.resource_filename('mediagoblin.tests', 'celery.db')) + + assert fake_celery_module.BROKER_TRANSPORT == 'sqlalchemy' + assert fake_celery_module.BROKER_HOST == ( + 'sqlite:///' + + pkg_resources.resource_filename('mediagoblin.tests', 'kombu.db')) diff --git a/mediagoblin/tests/test_config.py b/mediagoblin/tests/test_config.py index c596f6a6..7d8c65c1 100644 --- a/mediagoblin/tests/test_config.py +++ b/mediagoblin/tests/test_config.py @@ -37,7 +37,7 @@ def test_read_mediagoblin_config(): assert this_conf['carrotapp']['carrotcake'] == False assert this_conf['carrotapp']['num_carrots'] == 1 assert not this_conf['carrotapp'].has_key('encouragement_phrase') - assert this_conf['celery']['eat_celery_with_carrots'] == True + assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == True # A good file this_conf, validation_results = config.read_mediagoblin_config( @@ -48,7 +48,7 @@ def test_read_mediagoblin_config(): assert this_conf['carrotapp']['encouragement_phrase'] == \ "I'd love it if you eat your carrots!" assert this_conf['carrotapp']['blah_blah'] == "blah!" - assert this_conf['celery']['eat_celery_with_carrots'] == False + assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == False # A bad file this_conf, validation_results = config.read_mediagoblin_config( @@ -61,7 +61,7 @@ def test_read_mediagoblin_config(): assert this_conf['carrotapp']['encouragement_phrase'] == \ "586956856856" assert this_conf['carrotapp']['blah_blah'] == "blah!" - assert this_conf['celery']['eat_celery_with_carrots'] == "pants" + assert this_conf['celery']['EAT_CELERY_WITH_CARROTS'] == "pants" def test_generate_validation_report(): @@ -89,7 +89,7 @@ There were validation problems loading this config file: expected_warnings = [ 'carrotapp:carrotcake = the value "slobber" is of the wrong type.', 'carrotapp:num_carrots = the value "GROSS" is of the wrong type.', - 'celery:eat_celery_with_carrots = the value "pants" is of the wrong type.'] + 'celery:EAT_CELERY_WITH_CARROTS = the value "pants" is of the wrong type.'] warnings = report.splitlines()[2:] assert len(warnings) == 3 diff --git a/mediagoblin/tests/test_mgoblin_app.ini b/mediagoblin/tests/test_mgoblin_app.ini index c91ed92b..01bf0972 100644 --- a/mediagoblin/tests/test_mgoblin_app.ini +++ b/mediagoblin/tests/test_mgoblin_app.ini @@ -26,4 +26,4 @@ data_dir = %(here)s/test_user_dev/beaker/cache/data lock_dir = %(here)s/test_user_dev/beaker/cache/lock [celery] -celery_always_eager = true +CELERY_ALWAYS_EAGER = true diff --git a/mediagoblin/tests/test_paste.ini b/mediagoblin/tests/test_paste.ini index bd57994b..d7c18642 100644 --- a/mediagoblin/tests/test_paste.ini +++ b/mediagoblin/tests/test_paste.ini @@ -29,7 +29,7 @@ beaker.session.data_dir = %(here)s/test_user_dev/beaker/sessions/data beaker.session.lock_dir = %(here)s/test_user_dev/beaker/sessions/lock [celery] -celery_always_eager = true +CELERY_ALWAYS_EAGER = true [server:main] use = egg:Paste#http diff --git a/mediagoblin/tests/test_processing.py b/mediagoblin/tests/test_processing.py new file mode 100644 index 00000000..417f91f3 --- /dev/null +++ b/mediagoblin/tests/test_processing.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +from nose.tools import assert_equal, assert_true, assert_false + +from mediagoblin import processing + +class TestProcessing(object): + def run_fill(self, input, format, output=None): + builder = processing.FilenameBuilder(input) + result = builder.fill(format) + if output is None: + return result + assert_equal(output, result) + + def test_easy_filename_fill(self): + self.run_fill('/home/user/foo.TXT', '{basename}bar{ext}', 'foobar.txt') + + def test_long_filename_fill(self): + self.run_fill('{0}.png'.format('A' * 300), 'image-{basename}{ext}', + 'image-{0}.png'.format('A' * 245)) diff --git a/mediagoblin/tests/test_sql_migrations.py b/mediagoblin/tests/test_sql_migrations.py new file mode 100644 index 00000000..507a7725 --- /dev/null +++ b/mediagoblin/tests/test_sql_migrations.py @@ -0,0 +1,884 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2012, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import copy + +from sqlalchemy import ( + Table, Column, MetaData, Index, + Integer, Float, Unicode, UnicodeText, DateTime, Boolean, + ForeignKey, UniqueConstraint, PickleType, VARCHAR) +from sqlalchemy.orm import sessionmaker, relationship +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.sql import select, insert +from migrate import changeset + +from mediagoblin.db.sql.base import GMGTableBase +from mediagoblin.db.sql.util import MigrationManager, RegisterMigration + + +# This one will get filled with local migrations +FULL_MIGRATIONS = {} + + +####################################################### +# Migration set 1: Define initial models, no migrations +####################################################### + +Base1 = declarative_base(cls=GMGTableBase) + +class Creature1(Base1): + __tablename__ = "creature" + + id = Column(Integer, primary_key=True) + name = Column(Unicode, unique=True, nullable=False, index=True) + num_legs = Column(Integer, nullable=False) + is_demon = Column(Boolean) + +class Level1(Base1): + __tablename__ = "level" + + id = Column(Unicode, primary_key=True) + name = Column(Unicode) + description = Column(Unicode) + exits = Column(PickleType) + +SET1_MODELS = [Creature1, Level1] + +SET1_MIGRATIONS = {} + +####################################################### +# Migration set 2: A few migrations and new model +####################################################### + +Base2 = declarative_base(cls=GMGTableBase) + +class Creature2(Base2): + __tablename__ = "creature" + + id = Column(Integer, primary_key=True) + name = Column(Unicode, unique=True, nullable=False, index=True) + num_legs = Column(Integer, nullable=False) + magical_powers = relationship("CreaturePower2") + +class CreaturePower2(Base2): + __tablename__ = "creature_power" + + id = Column(Integer, primary_key=True) + creature = Column( + Integer, ForeignKey('creature.id'), nullable=False) + name = Column(Unicode) + description = Column(Unicode) + hitpower = Column(Integer, nullable=False) + +class Level2(Base2): + __tablename__ = "level" + + id = Column(Unicode, primary_key=True) + name = Column(Unicode) + description = Column(Unicode) + +class LevelExit2(Base2): + __tablename__ = "level_exit" + + id = Column(Integer, primary_key=True) + name = Column(Unicode) + from_level = Column( + Unicode, ForeignKey('level.id'), nullable=False) + to_level = Column( + Unicode, ForeignKey('level.id'), nullable=False) + +SET2_MODELS = [Creature2, CreaturePower2, Level2, LevelExit2] + + +@RegisterMigration(1, FULL_MIGRATIONS) +def creature_remove_is_demon(db_conn): + """ + Remove the is_demon field from the creature model. We don't need + it! + """ + # :( Commented out 'cuz of: + # http://code.google.com/p/sqlalchemy-migrate/issues/detail?id=143&thanks=143&ts=1327882242 + + # metadata = MetaData(bind=db_conn.bind) + # creature_table = Table( + # 'creature', metadata, + # autoload=True, autoload_with=db_conn.bind) + # creature_table.drop_column('is_demon') + pass + + +@RegisterMigration(2, FULL_MIGRATIONS) +def creature_powers_new_table(db_conn): + """ + Add a new table for creature powers. Nothing needs to go in it + yet though as there wasn't anything that previously held this + information + """ + metadata = MetaData(bind=db_conn.bind) + + # We have to access the creature table so sqlalchemy can make the + # foreign key relationship + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=db_conn.bind) + + creature_powers = Table( + 'creature_power', metadata, + Column('id', Integer, primary_key=True), + Column('creature', + Integer, ForeignKey('creature.id'), nullable=False), + Column('name', Unicode), + Column('description', Unicode), + Column('hitpower', Integer, nullable=False)) + metadata.create_all(db_conn.bind) + + +@RegisterMigration(3, FULL_MIGRATIONS) +def level_exits_new_table(db_conn): + """ + Make a new table for level exits and move the previously pickled + stuff over to here (then drop the old unneeded table) + """ + # First, create the table + # ----------------------- + metadata = MetaData(bind=db_conn.bind) + + # Minimal representation of level table. + # Not auto-introspecting here because of pickle table. I'm not + # sure sqlalchemy can auto-introspect pickle columns. + levels = Table( + 'level', metadata, + Column('id', Unicode, primary_key=True), + Column('name', Unicode), + Column('description', Unicode), + Column('exits', PickleType)) + + level_exits = Table( + 'level_exit', metadata, + Column('id', Integer, primary_key=True), + Column('name', Unicode), + Column('from_level', + Unicode, ForeignKey('level.id'), nullable=False), + Column('to_level', + Unicode, ForeignKey('level.id'), nullable=False)) + metadata.create_all(db_conn.bind) + + # And now, convert all the old exit pickles to new level exits + # ------------------------------------------------------------ + + # query over and insert + result = db_conn.execute( + select([levels], levels.c.exits!=None)) + + for level in result: + + for exit_name, to_level in level['exits'].iteritems(): + # Insert the level exit + db_conn.execute( + level_exits.insert().values( + name=exit_name, + from_level=level.id, + to_level=to_level)) + + # Finally, drop the old level exits pickle table + # ---------------------------------------------- + levels.drop_column('exits') + + +# A hack! At this point we freeze-fame and get just a partial list of +# migrations + +SET2_MIGRATIONS = copy.copy(FULL_MIGRATIONS) + +####################################################### +# Migration set 3: Final migrations +####################################################### + +Base3 = declarative_base(cls=GMGTableBase) + +class Creature3(Base3): + __tablename__ = "creature" + + id = Column(Integer, primary_key=True) + name = Column(Unicode, unique=True, nullable=False, index=True) + num_limbs= Column(Integer, nullable=False) + magical_powers = relationship("CreaturePower3") + +class CreaturePower3(Base3): + __tablename__ = "creature_power" + + id = Column(Integer, primary_key=True) + creature = Column( + Integer, ForeignKey('creature.id'), nullable=False, index=True) + name = Column(Unicode) + description = Column(Unicode) + hitpower = Column(Float, nullable=False) + +class Level3(Base3): + __tablename__ = "level" + + id = Column(Unicode, primary_key=True) + name = Column(Unicode) + description = Column(Unicode) + +class LevelExit3(Base3): + __tablename__ = "level_exit" + + id = Column(Integer, primary_key=True) + name = Column(Unicode) + from_level = Column( + Unicode, ForeignKey('level.id'), nullable=False, index=True) + to_level = Column( + Unicode, ForeignKey('level.id'), nullable=False, index=True) + + +SET3_MODELS = [Creature3, CreaturePower3, Level3, LevelExit3] +SET3_MIGRATIONS = FULL_MIGRATIONS + + +@RegisterMigration(4, FULL_MIGRATIONS) +def creature_num_legs_to_num_limbs(db_conn): + """ + Turns out we're tracking all sorts of limbs, not "legs" + specifically. Humans would be 4 here, for instance. So we + renamed the column. + """ + metadata = MetaData(bind=db_conn.bind) + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=db_conn.bind) + creature_table.c.num_legs.alter(name=u"num_limbs") + + +@RegisterMigration(5, FULL_MIGRATIONS) +def level_exit_index_from_and_to_level(db_conn): + """ + Index the from and to levels of the level exit table. + """ + metadata = MetaData(bind=db_conn.bind) + level_exit = Table( + 'level_exit', metadata, + autoload=True, autoload_with=db_conn.bind) + Index('ix_level_exit_from_level', + level_exit.c.from_level).create(db_conn.bind) + Index('ix_level_exit_to_level', + level_exit.c.to_level).create(db_conn.bind) + + +@RegisterMigration(6, FULL_MIGRATIONS) +def creature_power_index_creature(db_conn): + """ + Index our foreign key relationship to the creatures + """ + metadata = MetaData(bind=db_conn.bind) + creature_power = Table( + 'creature_power', metadata, + autoload=True, autoload_with=db_conn.bind) + Index('ix_creature_power_creature', + creature_power.c.creature).create(db_conn.bind) + + +@RegisterMigration(7, FULL_MIGRATIONS) +def creature_power_hitpower_to_float(db_conn): + """ + Convert hitpower column on creature power table from integer to + float. + + Turns out we want super precise values of how much hitpower there + really is. + """ + metadata = MetaData(bind=db_conn.bind) + + # We have to access the creature table so sqlalchemy can make the + # foreign key relationship + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=db_conn.bind) + + creature_power = Table( + 'creature_power', metadata, + Column('id', Integer, primary_key=True), + Column('creature', Integer, + ForeignKey('creature.id'), nullable=False, + index=True), + Column('name', Unicode), + Column('description', Unicode), + Column('hitpower', Integer, nullable=False)) + + creature_power.c.hitpower.alter(type=Float) + + +def _insert_migration1_objects(session): + """ + Test objects to insert for the first set of things + """ + # Insert creatures + session.add_all( + [Creature1(name=u'centipede', + num_legs=100, + is_demon=False), + Creature1(name=u'wolf', + num_legs=4, + is_demon=False), + # don't ask me what a wizardsnake is. + Creature1(name=u'wizardsnake', + num_legs=0, + is_demon=True)]) + + # Insert levels + session.add_all( + [Level1(id=u'necroplex', + name=u'The Necroplex', + description=u'A complex full of pure deathzone.', + exits={ + 'deathwell': 'evilstorm', + 'portal': 'central_park'}), + Level1(id=u'evilstorm', + name=u'Evil Storm', + description=u'A storm full of pure evil.', + exits={}), # you can't escape the evilstorm + Level1(id=u'central_park', + name=u'Central Park, NY, NY', + description=u"New York's friendly Central Park.", + exits={ + 'portal': 'necroplex'})]) + + session.commit() + + +def _insert_migration2_objects(session): + """ + Test objects to insert for the second set of things + """ + # Insert creatures + session.add_all( + [Creature2( + name=u'centipede', + num_legs=100), + Creature2( + name=u'wolf', + num_legs=4, + magical_powers = [ + CreaturePower2( + name=u"ice breath", + description=u"A blast of icy breath!", + hitpower=20), + CreaturePower2( + name=u"death stare", + description=u"A frightening stare, for sure!", + hitpower=45)]), + Creature2( + name=u'wizardsnake', + num_legs=0, + magical_powers=[ + CreaturePower2( + name=u'death_rattle', + description=u'A rattle... of DEATH!', + hitpower=1000), + CreaturePower2( + name=u'sneaky_stare', + description=u"The sneakiest stare you've ever seen!", + hitpower=300), + CreaturePower2( + name=u'slithery_smoke', + description=u"A blast of slithery, slithery smoke.", + hitpower=10), + CreaturePower2( + name=u'treacherous_tremors', + description=u"The ground shakes beneath footed animals!", + hitpower=0)])]) + + # Insert levels + session.add_all( + [Level2(id=u'necroplex', + name=u'The Necroplex', + description=u'A complex full of pure deathzone.'), + Level2(id=u'evilstorm', + name=u'Evil Storm', + description=u'A storm full of pure evil.', + exits=[]), # you can't escape the evilstorm + Level2(id=u'central_park', + name=u'Central Park, NY, NY', + description=u"New York's friendly Central Park.")]) + + # necroplex exits + session.add_all( + [LevelExit2(name=u'deathwell', + from_level=u'necroplex', + to_level=u'evilstorm'), + LevelExit2(name=u'portal', + from_level=u'necroplex', + to_level=u'central_park')]) + + # there are no evilstorm exits because there is no exit from the + # evilstorm + + # central park exits + session.add_all( + [LevelExit2(name=u'portal', + from_level=u'central_park', + to_level=u'necroplex')]) + + session.commit() + + +def _insert_migration3_objects(session): + """ + Test objects to insert for the third set of things + """ + # Insert creatures + session.add_all( + [Creature3( + name=u'centipede', + num_limbs=100), + Creature3( + name=u'wolf', + num_limbs=4, + magical_powers = [ + CreaturePower3( + name=u"ice breath", + description=u"A blast of icy breath!", + hitpower=20.0), + CreaturePower3( + name=u"death stare", + description=u"A frightening stare, for sure!", + hitpower=45.0)]), + Creature3( + name=u'wizardsnake', + num_limbs=0, + magical_powers=[ + CreaturePower3( + name=u'death_rattle', + description=u'A rattle... of DEATH!', + hitpower=1000.0), + CreaturePower3( + name=u'sneaky_stare', + description=u"The sneakiest stare you've ever seen!", + hitpower=300.0), + CreaturePower3( + name=u'slithery_smoke', + description=u"A blast of slithery, slithery smoke.", + hitpower=10.0), + CreaturePower3( + name=u'treacherous_tremors', + description=u"The ground shakes beneath footed animals!", + hitpower=0.0)])], + # annnnnd one more to test a floating point hitpower + Creature3( + name=u'deity', + numb_limbs=30, + magical_powers=[ + CreaturePower3( + name=u'smite', + description=u'Smitten by holy wrath!', + hitpower=9999.9)])) + + # Insert levels + session.add_all( + [Level3(id=u'necroplex', + name=u'The Necroplex', + description=u'A complex full of pure deathzone.'), + Level3(id=u'evilstorm', + name=u'Evil Storm', + description=u'A storm full of pure evil.', + exits=[]), # you can't escape the evilstorm + Level3(id=u'central_park', + name=u'Central Park, NY, NY', + description=u"New York's friendly Central Park.")]) + + # necroplex exits + session.add_all( + [LevelExit3(name=u'deathwell', + from_level=u'necroplex', + to_level=u'evilstorm'), + LevelExit3(name=u'portal', + from_level=u'necroplex', + to_level=u'central_park')]) + + # there are no evilstorm exits because there is no exit from the + # evilstorm + + # central park exits + session.add_all( + [LevelExit3(name=u'portal', + from_level=u'central_park', + to_level=u'necroplex')]) + + session.commit() + + +class CollectingPrinter(object): + def __init__(self): + self.collection = [] + + def __call__(self, string): + self.collection.append(string) + + @property + def combined_string(self): + return u''.join(self.collection) + + +def create_test_engine(): + from sqlalchemy import create_engine + engine = create_engine('sqlite:///:memory:', echo=False) + Session = sessionmaker(bind=engine) + return engine, Session + + +def assert_col_type(column, this_class): + assert isinstance(column.type, this_class) + + +def _get_level3_exits(session, level): + return dict( + [(level_exit.name, level_exit.to_level) + for level_exit in + session.query(LevelExit3).filter_by(from_level=level.id)]) + + +def test_set1_to_set3(): + # Create / connect to database + # ---------------------------- + + engine, Session = create_test_engine() + + # Create tables by migrating on empty initial set + # ----------------------------------------------- + + printer = CollectingPrinter() + migration_manager = MigrationManager( + '__main__', SET1_MODELS, SET1_MIGRATIONS, Session(), + printer) + + # Check latest migration and database current migration + assert migration_manager.latest_migration == 0 + assert migration_manager.database_current_migration == None + + result = migration_manager.init_or_migrate() + + # Make sure output was "inited" + assert result == u'inited' + # Check output + assert printer.combined_string == ( + "-> Initializing main mediagoblin tables... done.\n") + # Check version in database + assert migration_manager.latest_migration == 0 + assert migration_manager.database_current_migration == 0 + + # Install the initial set + # ----------------------- + + _insert_migration1_objects(Session()) + + # Try to "re-migrate" with same manager settings... nothing should happen + migration_manager = MigrationManager( + '__main__', SET1_MODELS, SET1_MIGRATIONS, Session(), + printer) + assert migration_manager.init_or_migrate() == None + + # Check version in database + assert migration_manager.latest_migration == 0 + assert migration_manager.database_current_migration == 0 + + # Sanity check a few things in the database... + metadata = MetaData(bind=engine) + + # Check the structure of the creature table + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=engine) + assert set(creature_table.c.keys()) == set( + ['id', 'name', 'num_legs', 'is_demon']) + assert_col_type(creature_table.c.id, Integer) + assert_col_type(creature_table.c.name, VARCHAR) + assert creature_table.c.name.nullable is False + #assert creature_table.c.name.index is True + #assert creature_table.c.name.unique is True + assert_col_type(creature_table.c.num_legs, Integer) + assert creature_table.c.num_legs.nullable is False + assert_col_type(creature_table.c.is_demon, Boolean) + + # Check the structure of the level table + level_table = Table( + 'level', metadata, + autoload=True, autoload_with=engine) + assert set(level_table.c.keys()) == set( + ['id', 'name', 'description', 'exits']) + assert_col_type(level_table.c.id, VARCHAR) + assert level_table.c.id.primary_key is True + assert_col_type(level_table.c.name, VARCHAR) + assert_col_type(level_table.c.description, VARCHAR) + # Skipping exits... Not sure if we can detect pickletype, not a + # big deal regardless. + + # Now check to see if stuff seems to be in there. + session = Session() + + creature = session.query(Creature1).filter_by( + name=u'centipede').one() + assert creature.num_legs == 100 + assert creature.is_demon == False + + creature = session.query(Creature1).filter_by( + name=u'wolf').one() + assert creature.num_legs == 4 + assert creature.is_demon == False + + creature = session.query(Creature1).filter_by( + name=u'wizardsnake').one() + assert creature.num_legs == 0 + assert creature.is_demon == True + + level = session.query(Level1).filter_by( + id=u'necroplex').one() + assert level.name == u'The Necroplex' + assert level.description == u'A complex full of pure deathzone.' + assert level.exits == { + 'deathwell': 'evilstorm', + 'portal': 'central_park'} + + level = session.query(Level1).filter_by( + id=u'evilstorm').one() + assert level.name == u'Evil Storm' + assert level.description == u'A storm full of pure evil.' + assert level.exits == {} # You still can't escape the evilstorm! + + level = session.query(Level1).filter_by( + id=u'central_park').one() + assert level.name == u'Central Park, NY, NY' + assert level.description == u"New York's friendly Central Park." + assert level.exits == { + 'portal': 'necroplex'} + + # Create new migration manager, but make sure the db migration + # isn't said to be updated yet + printer = CollectingPrinter() + migration_manager = MigrationManager( + '__main__', SET3_MODELS, SET3_MIGRATIONS, Session(), + printer) + + assert migration_manager.latest_migration == 7 + assert migration_manager.database_current_migration == 0 + + # Migrate + result = migration_manager.init_or_migrate() + + # Make sure result was "migrated" + assert result == u'migrated' + + # TODO: Check output to user + assert printer.combined_string == """\ +-> Updating main mediagoblin tables: + + Running migration 1, "creature_remove_is_demon"... done. + + Running migration 2, "creature_powers_new_table"... done. + + Running migration 3, "level_exits_new_table"... done. + + Running migration 4, "creature_num_legs_to_num_limbs"... done. + + Running migration 5, "level_exit_index_from_and_to_level"... done. + + Running migration 6, "creature_power_index_creature"... done. + + Running migration 7, "creature_power_hitpower_to_float"... done. +""" + + # Make sure version matches expected + migration_manager = MigrationManager( + '__main__', SET3_MODELS, SET3_MIGRATIONS, Session(), + printer) + assert migration_manager.latest_migration == 7 + assert migration_manager.database_current_migration == 7 + + # Check all things in database match expected + + # Check the creature table + metadata = MetaData(bind=engine) + creature_table = Table( + 'creature', metadata, + autoload=True, autoload_with=engine) + # assert set(creature_table.c.keys()) == set( + # ['id', 'name', 'num_limbs']) + assert set(creature_table.c.keys()) == set( + [u'id', 'name', u'num_limbs', u'is_demon']) + assert_col_type(creature_table.c.id, Integer) + assert_col_type(creature_table.c.name, VARCHAR) + assert creature_table.c.name.nullable is False + #assert creature_table.c.name.index is True + #assert creature_table.c.name.unique is True + assert_col_type(creature_table.c.num_limbs, Integer) + assert creature_table.c.num_limbs.nullable is False + + # Check the CreaturePower table + creature_power_table = Table( + 'creature_power', metadata, + autoload=True, autoload_with=engine) + assert set(creature_power_table.c.keys()) == set( + ['id', 'creature', 'name', 'description', 'hitpower']) + assert_col_type(creature_power_table.c.id, Integer) + assert_col_type(creature_power_table.c.creature, Integer) + assert creature_power_table.c.creature.nullable is False + assert_col_type(creature_power_table.c.name, VARCHAR) + assert_col_type(creature_power_table.c.description, VARCHAR) + assert_col_type(creature_power_table.c.hitpower, Float) + assert creature_power_table.c.hitpower.nullable is False + + # Check the structure of the level table + level_table = Table( + 'level', metadata, + autoload=True, autoload_with=engine) + assert set(level_table.c.keys()) == set( + ['id', 'name', 'description']) + assert_col_type(level_table.c.id, VARCHAR) + assert level_table.c.id.primary_key is True + assert_col_type(level_table.c.name, VARCHAR) + assert_col_type(level_table.c.description, VARCHAR) + + # Check the structure of the level_exits table + level_exit_table = Table( + 'level_exit', metadata, + autoload=True, autoload_with=engine) + assert set(level_exit_table.c.keys()) == set( + ['id', 'name', 'from_level', 'to_level']) + assert_col_type(level_exit_table.c.id, Integer) + assert_col_type(level_exit_table.c.name, VARCHAR) + assert_col_type(level_exit_table.c.from_level, VARCHAR) + assert level_exit_table.c.from_level.nullable is False + #assert level_exit_table.c.from_level.index is True + assert_col_type(level_exit_table.c.to_level, VARCHAR) + assert level_exit_table.c.to_level.nullable is False + #assert level_exit_table.c.to_level.index is True + + # Now check to see if stuff seems to be in there. + session = Session() + creature = session.query(Creature3).filter_by( + name=u'centipede').one() + assert creature.num_limbs == 100.0 + assert creature.magical_powers == [] + + creature = session.query(Creature3).filter_by( + name=u'wolf').one() + assert creature.num_limbs == 4.0 + assert creature.magical_powers == [] + + creature = session.query(Creature3).filter_by( + name=u'wizardsnake').one() + assert creature.num_limbs == 0.0 + assert creature.magical_powers == [] + + level = session.query(Level3).filter_by( + id=u'necroplex').one() + assert level.name == u'The Necroplex' + assert level.description == u'A complex full of pure deathzone.' + level_exits = _get_level3_exits(session, level) + assert level_exits == { + u'deathwell': u'evilstorm', + u'portal': u'central_park'} + + level = session.query(Level3).filter_by( + id=u'evilstorm').one() + assert level.name == u'Evil Storm' + assert level.description == u'A storm full of pure evil.' + level_exits = _get_level3_exits(session, level) + assert level_exits == {} # You still can't escape the evilstorm! + + level = session.query(Level3).filter_by( + id=u'central_park').one() + assert level.name == u'Central Park, NY, NY' + assert level.description == u"New York's friendly Central Park." + level_exits = _get_level3_exits(session, level) + assert level_exits == { + 'portal': 'necroplex'} + + +#def test_set2_to_set3(): + # Create / connect to database + # Create tables by migrating on empty initial set + + # Install the initial set + # Check version in database + # Sanity check a few things in the database + + # Migrate + # Make sure version matches expected + # Check all things in database match expected + # pass + + +#def test_set1_to_set2_to_set3(): + # Create / connect to database + # Create tables by migrating on empty initial set + + # Install the initial set + # Check version in database + # Sanity check a few things in the database + + # Migrate + # Make sure version matches expected + # Check all things in database match expected + + # Migrate again + # Make sure version matches expected again + # Check all things in database match expected again + + ##### Set2 + # creature_table = Table( + # 'creature', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(creature_table.c.keys()) == set( + # ['id', 'name', 'num_legs']) + # assert_col_type(creature_table.c.id, Integer) + # assert_col_type(creature_table.c.name, VARCHAR) + # assert creature_table.c.name.nullable is False + # assert creature_table.c.name.index is True + # assert creature_table.c.name.unique is True + # assert_col_type(creature_table.c.num_legs, Integer) + # assert creature_table.c.num_legs.nullable is False + + # # Check the CreaturePower table + # creature_power_table = Table( + # 'creature_power', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(creature_power_table.c.keys()) == set( + # ['id', 'creature', 'name', 'description', 'hitpower']) + # assert_col_type(creature_power_table.c.id, Integer) + # assert_col_type(creature_power_table.c.creature, Integer) + # assert creature_power_table.c.creature.nullable is False + # assert_col_type(creature_power_table.c.name, VARCHAR) + # assert_col_type(creature_power_table.c.description, VARCHAR) + # assert_col_type(creature_power_table.c.hitpower, Integer) + # assert creature_power_table.c.hitpower.nullable is False + + # # Check the structure of the level table + # level_table = Table( + # 'level', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(level_table.c.keys()) == set( + # ['id', 'name', 'description']) + # assert_col_type(level_table.c.id, VARCHAR) + # assert level_table.c.id.primary_key is True + # assert_col_type(level_table.c.name, VARCHAR) + # assert_col_type(level_table.c.description, VARCHAR) + + # # Check the structure of the level_exits table + # level_exit_table = Table( + # 'level_exit', metadata, + # autoload=True, autoload_with=db_conn.bind) + # assert set(level_exit_table.c.keys()) == set( + # ['id', 'name', 'from_level', 'to_level']) + # assert_col_type(level_exit_table.c.id, Integer) + # assert_col_type(level_exit_table.c.name, VARCHAR) + # assert_col_type(level_exit_table.c.from_level, VARCHAR) + # assert level_exit_table.c.from_level.nullable is False + # assert_col_type(level_exit_table.c.to_level, VARCHAR) + + # pass diff --git a/mediagoblin/tests/test_submission.py b/mediagoblin/tests/test_submission.py index 702741b4..8bf7d13c 100644 --- a/mediagoblin/tests/test_submission.py +++ b/mediagoblin/tests/test_submission.py @@ -15,30 +15,35 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import urlparse -import pkg_resources +import os import re from nose.tools import assert_equal, assert_true, assert_false +from pkg_resources import resource_filename -from mediagoblin.tests.tools import setup_fresh_app, get_test_app, \ +from mediagoblin.tests.tools import get_test_app, \ fixture_add_user from mediagoblin import mg_globals -from mediagoblin.tools import template, common - -GOOD_JPG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/good.jpg') -GOOD_PNG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/good.png') -EVIL_FILE = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/evil') -EVIL_JPG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/evil.jpg') -EVIL_PNG = pkg_resources.resource_filename( - 'mediagoblin.tests', 'test_submission/evil.png') +from mediagoblin.tools import template + + +def resource(filename): + return resource_filename('mediagoblin.tests', 'test_submission/' + filename) + + +GOOD_JPG = resource('good.jpg') +GOOD_PNG = resource('good.png') +EVIL_FILE = resource('evil') +EVIL_JPG = resource('evil.jpg') +EVIL_PNG = resource('evil.png') +BIG_BLUE = resource('bigblue.png') GOOD_TAG_STRING = 'yin,yang' BAD_TAG_STRING = 'rage,' + 'f' * 26 + 'u' * 26 +FORM_CONTEXT = ['mediagoblin/submit/start.html', 'submit_form'] +REQUEST_CONTEXT = ['mediagoblin/user_pages/user.html', 'request'] + class TestSubmission: def setUp(self): @@ -61,163 +66,130 @@ class TestSubmission: def logout(self): self.test_app.get('/auth/logout/') + def do_post(self, data, *context_keys, **kwargs): + url = kwargs.pop('url', '/submit/') + do_follow = kwargs.pop('do_follow', False) + template.clear_test_template_context() + response = self.test_app.post(url, data, **kwargs) + if do_follow: + response.follow() + context_data = template.TEMPLATE_TEST_CONTEXT + for key in context_keys: + context_data = context_data[key] + return response, context_data + + def upload_data(self, filename): + return {'upload_files': [('file', filename)]} + + def check_comments(self, request, media, count): + comments = request.db.MediaComment.find({'media_entry': media._id}) + assert_equal(count, len(list(comments))) + def test_missing_fields(self): # Test blank form # --------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', {}) - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert form.file.errors == [u'You must provide a file.'] + response, form = self.do_post({}, *FORM_CONTEXT) + assert_equal(form.file.errors, [u'You must provide a file.']) # Test blank file # --------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'test title'}) - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert form.file.errors == [u'You must provide a file.'] - - - def test_normal_uploads(self): - # Test JPG - # -------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Normal upload 1' - }, upload_files=[( - 'file', GOOD_JPG)]) + response, form = self.do_post({'title': 'test title'}, *FORM_CONTEXT) + assert_equal(form.file.errors, [u'You must provide a file.']) - # User should be redirected - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - assert template.TEMPLATE_TEST_CONTEXT.has_key( - 'mediagoblin/user_pages/user.html') + def check_url(self, response, path): + assert_equal(urlparse.urlsplit(response.location)[2], path) + def check_normal_upload(self, title, filename): + response, context = self.do_post({'title': title}, do_follow=True, + **self.upload_data(filename)) + self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + assert_true('mediagoblin/user_pages/user.html' in context) # Make sure the media view is at least reachable, logged in... - self.test_app.get('/u/chris/m/normal-upload-1/') + url = '/u/{0}/m/{1}/'.format(self.test_user.username, + title.lower().replace(' ', '-')) + self.test_app.get(url) # ... and logged out too. self.logout() - self.test_app.get('/u/chris/m/normal-upload-1/') - # Log back in for the remaining tests. - self.login() + self.test_app.get(url) - # Test PNG - # -------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Normal upload 2' - }, upload_files=[( - 'file', GOOD_PNG)]) + def test_normal_jpg(self): + self.check_normal_upload('Normal upload 1', GOOD_JPG) - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - assert template.TEMPLATE_TEST_CONTEXT.has_key( - 'mediagoblin/user_pages/user.html') + def test_normal_png(self): + self.check_normal_upload('Normal upload 2', GOOD_PNG) + + def check_media(self, request, find_data, count=None): + media = request.db.MediaEntry.find(find_data) + if count is not None: + assert_equal(media.count(), count) + if count == 0: + return + return media[0] def test_tags(self): # Good tag string # -------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Balanced Goblin', - 'tags': GOOD_TAG_STRING - }, upload_files=[( - 'file', GOOD_JPG)]) - - # New media entry with correct tags should be created - response.follow() - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/user_pages/user.html'] - request = context['request'] - media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - assert_equal(media['tags'], + response, request = self.do_post({'title': 'Balanced Goblin', + 'tags': GOOD_TAG_STRING}, + *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(GOOD_JPG)) + media = self.check_media(request, {'title': 'Balanced Goblin'}, 1) + assert_equal(media.tags, [{'name': u'yin', 'slug': u'yin'}, - {'name': u'yang', 'slug': u'yang'}]) + {'name': u'yang', 'slug': u'yang'}]) # Test tags that are too long # --------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Balanced Goblin', - 'tags': BAD_TAG_STRING - }, upload_files=[( - 'file', GOOD_JPG)]) - - # Too long error should be raised - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert form.tags.errors == [ - u'Tags must be shorter than 50 characters. Tags that are too long'\ - ': ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'] + response, form = self.do_post({'title': 'Balanced Goblin', + 'tags': BAD_TAG_STRING}, + *FORM_CONTEXT, + **self.upload_data(GOOD_JPG)) + assert_equal(form.tags.errors, [ + u'Tags must be shorter than 50 characters. ' \ + 'Tags that are too long: ' \ + 'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']) def test_delete(self): - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Balanced Goblin', - }, upload_files=[( - 'file', GOOD_JPG)]) - - # Post image - response.follow() - - request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] - - media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - - # Does media entry exist? - assert_true(media) + response, request = self.do_post({'title': 'Balanced Goblin'}, + *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(GOOD_JPG)) + media = self.check_media(request, {'title': 'Balanced Goblin'}, 1) + + # Add a comment, so we can test for its deletion later. + self.check_comments(request, media, 0) + comment_url = request.urlgen( + 'mediagoblin.user_pages.media_post_comment', + user=self.test_user.username, media=media._id) + response = self.do_post({'comment_content': 'i love this test'}, + url=comment_url, do_follow=True)[0] + self.check_comments(request, media, 1) # Do not confirm deletion # --------------------------------------------------- - response = self.test_app.post( - request.urlgen('mediagoblin.user_pages.media_confirm_delete', - # No work: user=media.uploader().username, - user=self.test_user.username, - media=media._id), - # no value means no confirm - {}) - - response.follow() - - request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] - - media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0] - - # Does media entry still exist? - assert_true(media) + delete_url = request.urlgen( + 'mediagoblin.user_pages.media_confirm_delete', + user=self.test_user.username, media=media._id) + # Empty data means don't confirm + response = self.do_post({}, do_follow=True, url=delete_url)[0] + media = self.check_media(request, {'title': 'Balanced Goblin'}, 1) # Confirm deletion # --------------------------------------------------- - response = self.test_app.post( - request.urlgen('mediagoblin.user_pages.media_confirm_delete', - # No work: user=media.uploader().username, - user=self.test_user.username, - media=media._id), - {'confirm': 'y'}) - - response.follow() - - request = template.TEMPLATE_TEST_CONTEXT[ - 'mediagoblin/user_pages/user.html']['request'] + response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT, + do_follow=True, url=delete_url) + self.check_media(request, {'_id': media._id}, 0) + self.check_comments(request, media, 0) - # Does media entry still exist? - assert_false( - request.db.MediaEntry.find( - {'_id': media._id}).count()) + def test_evil_file(self): + # Test non-suppoerted file with non-supported extension + # ----------------------------------------------------- + response, form = self.do_post({'title': 'Malicious Upload 1'}, + *FORM_CONTEXT, + **self.upload_data(EVIL_FILE)) + assert_equal(len(form.file.errors), 1) + assert_true(re.match( + r'^Could not extract any file extension from ".*?"$', + str(form.file.errors[0]))) def test_sniffing(self): ''' @@ -241,62 +213,43 @@ class TestSubmission: assert media.media_type == 'mediagoblin.media_types.image' - def test_malicious_uploads(self): - # Test non-suppoerted file with non-supported extension - # ----------------------------------------------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Malicious Upload 1' - }, upload_files=[( - 'file', EVIL_FILE)]) - - context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html'] - form = context['submit_form'] - assert 'Sorry, I don\'t support that file type :(' == \ - str(form.file.errors[0]) - assert len(form.file.errors) == 1 - + def check_false_image(self, title, filename): # NOTE: The following 2 tests will ultimately fail, but they # *will* pass the initial form submission step. Instead, # they'll be caught as failures during the processing step. + response, context = self.do_post({'title': title}, do_follow=True, + **self.upload_data(filename)) + self.check_url(response, '/u/{0}/'.format(self.test_user.username)) + entry = mg_globals.database.MediaEntry.find_one({'title': title}) + assert_equal(entry.state, 'failed') + assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail') + def test_evil_jpg(self): # Test non-supported file with .jpg extension # ------------------------------------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Malicious Upload 2' - }, upload_files=[( - 'file', EVIL_JPG)]) - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - - entry = mg_globals.database.MediaEntry.find_one( - {'title': 'Malicious Upload 2'}) - assert_equal(entry.state, 'failed') - assert_equal( - entry['fail_error'], - u'mediagoblin.processing:BadMediaFail') + self.check_false_image('Malicious Upload 2', EVIL_JPG) + def test_evil_png(self): # Test non-supported file with .png extension # ------------------------------------------- - template.clear_test_template_context() - response = self.test_app.post( - '/submit/', { - 'title': 'Malicious Upload 3' - }, upload_files=[( - 'file', EVIL_PNG)]) - response.follow() - assert_equal( - urlparse.urlsplit(response.location)[2], - '/u/chris/') - - entry = mg_globals.database.MediaEntry.find_one( - {'title': 'Malicious Upload 3'}) - assert_equal(entry.state, 'failed') - assert_equal( - entry['fail_error'], - u'mediagoblin.processing:BadMediaFail') + self.check_false_image('Malicious Upload 3', EVIL_PNG) + + def test_processing(self): + data = {'title': 'Big Blue'} + response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True, + **self.upload_data(BIG_BLUE)) + media = self.check_media(request, data, 1) + last_size = 1024 ** 3 # Needs to be larger than bigblue.png + for key, basename in (('original', 'bigblue.png'), + ('medium', 'bigblue.medium.png'), + ('thumb', 'bigblue.thumbnail.png')): + # Does the processed image have a good filename? + filename = resource_filename( + 'mediagoblin.tests', + os.path.join('test_user_dev/media/public', + *media['media_files'].get(key, []))) + assert_true(filename.endswith('_' + basename)) + # Is it smaller than the last processed image we looked at? + size = os.stat(filename).st_size + assert_true(last_size > size) + last_size = size diff --git a/mediagoblin/tests/test_submission/bigblue.png b/mediagoblin/tests/test_submission/bigblue.png Binary files differnew file mode 100644 index 00000000..2b2c2a44 --- /dev/null +++ b/mediagoblin/tests/test_submission/bigblue.png |