1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mediagoblin.db.base import Session
from mediagoblin.db.models import (MediaEntry, Tag, MediaTag, Collection, \
User, Privilege, FOUNDATIONS)
##########################
# Random utility functions
##########################
def atomic_update(table, query_dict, update_values):
table.query.filter_by(**query_dict).update(update_values,
synchronize_session=False)
Session.commit()
def check_media_slug_used(uploader_id, slug, ignore_m_id):
query = MediaEntry.query.filter_by(uploader=uploader_id, slug=slug)
if ignore_m_id is not None:
query = query.filter(MediaEntry.id != ignore_m_id)
does_exist = query.first() is not None
return does_exist
def media_entries_for_tag_slug(dummy_db, tag_slug):
return MediaEntry.query \
.join(MediaEntry.tags_helper) \
.join(MediaTag.tag_helper) \
.filter(
(MediaEntry.state == u'processed')
& (Tag.slug == tag_slug))
def clean_orphan_tags(commit=True):
"""Search for unused MediaTags and delete them"""
q1 = Session.query(Tag).outerjoin(MediaTag).filter(MediaTag.id==None)
for t in q1:
Session.delete(t)
# The "let the db do all the work" version:
# q1 = Session.query(Tag.id).outerjoin(MediaTag).filter(MediaTag.id==None)
# q2 = Session.query(Tag).filter(Tag.id.in_(q1))
# q2.delete(synchronize_session = False)
if commit:
Session.commit()
def check_collection_slug_used(creator_id, slug, ignore_c_id):
filt = (Collection.creator == creator_id) \
& (Collection.slug == slug)
if ignore_c_id is not None:
filt = filt & (Collection.id != ignore_c_id)
does_exist = Session.query(Collection.id).filter(filt).first() is not None
return does_exist
def user_privileges_to_dictionary(user_id):
"""
This function accepts a users id and returns a dictionary of True or False
values for each privilege the user does or does not have. This allows for
easier referencing of a user's privileges inside templates.
"""
privilege_dictionary = {}
user = User.query.get(user_id)
users_privileges = [p_item.privilege_name for p_item in user.all_privileges]
for privilege_name in FOUNDATIONS[Privilege]:
privilege_name = privilege_name[0]
if privilege_name in users_privileges:
privilege_dictionary[privilege_name]=True
else:
privilege_dictionary[privilege_name]=False
return privilege_dictionary
if __name__ == '__main__':
from mediagoblin.db.open import setup_connection_and_db_from_config
db = setup_connection_and_db_from_config({'sql_engine':'sqlite:///mediagoblin.db'})
clean_orphan_tags()
|