diff options
Diffstat (limited to 'mediagoblin/tests/test_storage.py')
-rw-r--r-- | mediagoblin/tests/test_storage.py | 321 |
1 files changed, 321 insertions, 0 deletions
diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py new file mode 100644 index 00000000..f6f1d18f --- /dev/null +++ b/mediagoblin/tests/test_storage.py @@ -0,0 +1,321 @@ +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import os +import tempfile + +import pytest +from werkzeug.utils import secure_filename + +from mediagoblin import storage + + +################ +# Test utilities +################ + +def test_clean_listy_filepath(): + expected = [u'dir1', u'dir2', u'linooks.jpg'] + assert storage.clean_listy_filepath( + ['dir1', 'dir2', 'linooks.jpg']) == expected + + expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg'] + assert storage.clean_listy_filepath( + ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected + + expected = [u'etc', u'passwd'] + assert storage.clean_listy_filepath( + ['../../../etc/', 'passwd']) == expected + + with pytest.raises(storage.InvalidFilepath): + storage.clean_listy_filepath(['../../', 'linooks.jpg']) + + +class FakeStorageSystem(): + def __init__(self, foobie, blech, **kwargs): + self.foobie = foobie + self.blech = blech + +class FakeRemoteStorage(storage.filestorage.BasicFileStorage): + # Theoretically despite this, all the methods should work but it + # should force copying to the workbench + local_storage = False + + def copy_local_to_storage(self, *args, **kwargs): + return storage.StorageInterface.copy_local_to_storage( + self, *args, **kwargs) + + +def test_storage_system_from_config(): + this_storage = storage.storage_system_from_config( + {'base_url': 'http://example.org/moodia/', + 'base_dir': '/tmp/', + 'garbage_arg': 'garbage_arg', + 'garbage_arg': 'trash'}) + assert this_storage.base_url == 'http://example.org/moodia/' + assert this_storage.base_dir == '/tmp/' + assert this_storage.__class__ is storage.filestorage.BasicFileStorage + + this_storage = storage.storage_system_from_config( + {'foobie': 'eiboof', + 'blech': 'hcelb', + 'garbage_arg': 'garbage_arg', + 'storage_class': + 'mediagoblin.tests.test_storage:FakeStorageSystem'}) + assert this_storage.foobie == 'eiboof' + assert this_storage.blech == 'hcelb' + assert unicode(this_storage.__class__) == \ + u'mediagoblin.tests.test_storage.FakeStorageSystem' + + +########################## +# Basic file storage tests +########################## + +def get_tmp_filestorage(mount_url=None, fake_remote=False): + tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage") + if fake_remote: + this_storage = FakeRemoteStorage(tmpdir, mount_url) + else: + this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url) + return tmpdir, this_storage + + +def cleanup_storage(this_storage, tmpdir, *paths): + for p in paths: + while p: + assert this_storage.delete_dir(p) == True + p.pop(-1) + os.rmdir(tmpdir) + + +def test_basic_storage__resolve_filepath(): + tmpdir, this_storage = get_tmp_filestorage() + + result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg']) + assert result == os.path.join( + tmpdir, 'dir1/dir2/filename.jpg') + + result = this_storage._resolve_filepath(['../../etc/', 'passwd']) + assert result == os.path.join( + tmpdir, 'etc/passwd') + + pytest.raises( + storage.InvalidFilepath, + this_storage._resolve_filepath, + ['../../', 'etc', 'passwd']) + + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_file_exists(): + tmpdir, this_storage = get_tmp_filestorage() + + os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) + filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') + with open(filename, 'w') as ourfile: + ourfile.write("I'm having a lovely day!") + + assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt']) + assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol']) + assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol']) + + this_storage.delete_file(['dir1', 'dir2', 'filename.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_get_unique_filepath(): + tmpdir, this_storage = get_tmp_filestorage() + + # write something that exists + os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) + filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') + with open(filename, 'w') as ourfile: + ourfile.write("I'm having a lovely day!") + + # now we want something new, with the same name! + new_filepath = this_storage.get_unique_filepath( + ['dir1', 'dir2', 'filename.txt']) + assert new_filepath[:-1] == [u'dir1', u'dir2'] + + new_filename = new_filepath[-1] + assert new_filename.endswith('filename.txt') + assert len(new_filename) > len('filename.txt') + assert new_filename == secure_filename(new_filename) + + os.remove(filename) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_get_file(): + tmpdir, this_storage = get_tmp_filestorage() + + # Write a brand new file + filepath = ['dir1', 'dir2', 'ourfile.txt'] + + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('First file') + with this_storage.get_file(filepath, 'r') as our_file: + assert our_file.read() == 'First file' + assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: + assert our_file.read() == 'First file' + + # Write to the same path but try to get a unique file. + new_filepath = this_storage.get_unique_filepath(filepath) + assert not os.path.exists(os.path.join(tmpdir, *new_filepath)) + + with this_storage.get_file(new_filepath, 'w') as our_file: + our_file.write('Second file') + with this_storage.get_file(new_filepath, 'r') as our_file: + assert our_file.read() == 'Second file' + assert os.path.exists(os.path.join(tmpdir, *new_filepath)) + with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file: + assert our_file.read() == 'Second file' + + # Read from an existing file + manually_written_file = os.makedirs( + os.path.join(tmpdir, 'testydir')) + with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: + testyfile.write('testy file! so testy.') + + with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: + assert testyfile.read() == 'testy file! so testy.' + + this_storage.delete_file(filepath) + this_storage.delete_file(new_filepath) + this_storage.delete_file(['testydir', 'testyfile.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir']) + + +def test_basic_storage_delete_file(): + tmpdir, this_storage = get_tmp_filestorage() + + assert not os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + filepath = ['dir1', 'dir2', 'ourfile.txt'] + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Testing this file') + + assert os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + assert this_storage.delete_dir(['dir1', 'dir2']) == False + this_storage.delete_file(filepath) + assert this_storage.delete_dir(['dir1', 'dir2']) == True + + assert not os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + cleanup_storage(this_storage, tmpdir, ['dir1']) + + +def test_basic_storage_url_for_file(): + # Not supplying a base_url should actually just bork. + tmpdir, this_storage = get_tmp_filestorage() + pytest.raises( + storage.NoWebServing, + this_storage.file_url, + ['dir1', 'dir2', 'filename.txt']) + cleanup_storage(this_storage, tmpdir) + + # base_url without domain + tmpdir, this_storage = get_tmp_filestorage('/media/') + result = this_storage.file_url( + ['dir1', 'dir2', 'filename.txt']) + expected = '/media/dir1/dir2/filename.txt' + assert result == expected + cleanup_storage(this_storage, tmpdir) + + # base_url with domain + tmpdir, this_storage = get_tmp_filestorage( + 'http://media.example.org/ourmedia/') + result = this_storage.file_url( + ['dir1', 'dir2', 'filename.txt']) + expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt' + assert result == expected + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_get_local_path(): + tmpdir, this_storage = get_tmp_filestorage() + + result = this_storage.get_local_path( + ['dir1', 'dir2', 'filename.txt']) + + expected = os.path.join( + tmpdir, 'dir1/dir2/filename.txt') + + assert result == expected + + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_is_local(): + tmpdir, this_storage = get_tmp_filestorage() + assert this_storage.local_storage is True + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_copy_locally(): + tmpdir, this_storage = get_tmp_filestorage() + + dest_tmpdir = tempfile.mkdtemp() + + filepath = ['dir1', 'dir2', 'ourfile.txt'] + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Testing this file') + + new_file_dest = os.path.join(dest_tmpdir, 'file2.txt') + + this_storage.copy_locally(filepath, new_file_dest) + this_storage.delete_file(filepath) + + assert file(new_file_dest).read() == 'Testing this file' + + os.remove(new_file_dest) + os.rmdir(dest_tmpdir) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def _test_copy_local_to_storage_works(tmpdir, this_storage): + local_filename = tempfile.mktemp() + with file(local_filename, 'w') as tmpfile: + tmpfile.write('haha') + + this_storage.copy_local_to_storage( + local_filename, ['dir1', 'dir2', 'copiedto.txt']) + + os.remove(local_filename) + + assert file( + os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), + 'r').read() == 'haha' + + this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_copy_local_to_storage(): + tmpdir, this_storage = get_tmp_filestorage() + _test_copy_local_to_storage_works(tmpdir, this_storage) + + +def test_general_storage_copy_local_to_storage(): + tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) + _test_copy_local_to_storage_works(tmpdir, this_storage) |