diff options
author | James Taylor <user234683@users.noreply.github.com> | 2019-02-16 23:41:52 -0800 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2019-02-16 23:41:52 -0800 |
commit | 3905e7e64059b45479894ba1fdfb0ef9cef64475 (patch) | |
tree | 4c5dbbfd204d0351cac8412cc87a65fea49c1a52 /python/atoma/json_feed.py | |
parent | 24642455d0dc5841ddec99f456598c4f763c1e8a (diff) | |
download | yt-local-3905e7e64059b45479894ba1fdfb0ef9cef64475.tar.lz yt-local-3905e7e64059b45479894ba1fdfb0ef9cef64475.tar.xz yt-local-3905e7e64059b45479894ba1fdfb0ef9cef64475.zip |
basic subscriptions system
Diffstat (limited to 'python/atoma/json_feed.py')
-rw-r--r-- | python/atoma/json_feed.py | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/python/atoma/json_feed.py b/python/atoma/json_feed.py new file mode 100644 index 0000000..410ff4a --- /dev/null +++ b/python/atoma/json_feed.py @@ -0,0 +1,223 @@ +from datetime import datetime, timedelta +import json +from typing import Optional, List + +import attr + +from .exceptions import FeedParseError, FeedJSONError +from .utils import try_parse_date + + +@attr.s +class JSONFeedAuthor: + + name: Optional[str] = attr.ib() + url: Optional[str] = attr.ib() + avatar: Optional[str] = attr.ib() + + +@attr.s +class JSONFeedAttachment: + + url: str = attr.ib() + mime_type: str = attr.ib() + title: Optional[str] = attr.ib() + size_in_bytes: Optional[int] = attr.ib() + duration: Optional[timedelta] = attr.ib() + + +@attr.s +class JSONFeedItem: + + id_: str = attr.ib() + url: Optional[str] = attr.ib() + external_url: Optional[str] = attr.ib() + title: Optional[str] = attr.ib() + content_html: Optional[str] = attr.ib() + content_text: Optional[str] = attr.ib() + summary: Optional[str] = attr.ib() + image: Optional[str] = attr.ib() + banner_image: Optional[str] = attr.ib() + date_published: Optional[datetime] = attr.ib() + date_modified: Optional[datetime] = attr.ib() + author: Optional[JSONFeedAuthor] = attr.ib() + + tags: List[str] = attr.ib() + attachments: List[JSONFeedAttachment] = attr.ib() + + +@attr.s +class JSONFeed: + + version: str = attr.ib() + title: str = attr.ib() + home_page_url: Optional[str] = attr.ib() + feed_url: Optional[str] = attr.ib() + description: Optional[str] = attr.ib() + user_comment: Optional[str] = attr.ib() + next_url: Optional[str] = attr.ib() + icon: Optional[str] = attr.ib() + favicon: Optional[str] = attr.ib() + author: Optional[JSONFeedAuthor] = attr.ib() + expired: bool = attr.ib() + + items: List[JSONFeedItem] = attr.ib() + + +def _get_items(root: dict) -> List[JSONFeedItem]: + rv = [] + items = root.get('items', []) + if not items: + return rv + + for item in items: + rv.append(_get_item(item)) + + return rv + + +def _get_item(item_dict: dict) -> JSONFeedItem: + return JSONFeedItem( + id_=_get_text(item_dict, 'id', optional=False), + url=_get_text(item_dict, 'url'), + external_url=_get_text(item_dict, 'external_url'), + title=_get_text(item_dict, 'title'), + content_html=_get_text(item_dict, 'content_html'), + content_text=_get_text(item_dict, 'content_text'), + summary=_get_text(item_dict, 'summary'), + image=_get_text(item_dict, 'image'), + banner_image=_get_text(item_dict, 'banner_image'), + date_published=_get_datetime(item_dict, 'date_published'), + date_modified=_get_datetime(item_dict, 'date_modified'), + author=_get_author(item_dict), + tags=_get_tags(item_dict, 'tags'), + attachments=_get_attachments(item_dict, 'attachments') + ) + + +def _get_attachments(root, name) -> List[JSONFeedAttachment]: + rv = list() + for attachment_dict in root.get(name, []): + rv.append(JSONFeedAttachment( + _get_text(attachment_dict, 'url', optional=False), + _get_text(attachment_dict, 'mime_type', optional=False), + _get_text(attachment_dict, 'title'), + _get_int(attachment_dict, 'size_in_bytes'), + _get_duration(attachment_dict, 'duration_in_seconds') + )) + return rv + + +def _get_tags(root, name) -> List[str]: + tags = root.get(name, []) + return [tag for tag in tags if isinstance(tag, str)] + + +def _get_datetime(root: dict, name, optional: bool=True) -> Optional[datetime]: + text = _get_text(root, name, optional) + if text is None: + return None + + return try_parse_date(text) + + +def _get_expired(root: dict) -> bool: + if root.get('expired') is True: + return True + + return False + + +def _get_author(root: dict) -> Optional[JSONFeedAuthor]: + author_dict = root.get('author') + if not author_dict: + return None + + rv = JSONFeedAuthor( + name=_get_text(author_dict, 'name'), + url=_get_text(author_dict, 'url'), + avatar=_get_text(author_dict, 'avatar'), + ) + if rv.name is None and rv.url is None and rv.avatar is None: + return None + + return rv + + +def _get_int(root: dict, name: str, optional: bool=True) -> Optional[int]: + rv = root.get(name) + if not optional and rv is None: + raise FeedParseError('Could not parse feed: "{}" int is required but ' + 'is empty'.format(name)) + + if optional and rv is None: + return None + + if not isinstance(rv, int): + raise FeedParseError('Could not parse feed: "{}" is not an int' + .format(name)) + + return rv + + +def _get_duration(root: dict, name: str, + optional: bool=True) -> Optional[timedelta]: + duration = _get_int(root, name, optional) + if duration is None: + return None + + return timedelta(seconds=duration) + + +def _get_text(root: dict, name: str, optional: bool=True) -> Optional[str]: + rv = root.get(name) + if not optional and rv is None: + raise FeedParseError('Could not parse feed: "{}" text is required but ' + 'is empty'.format(name)) + + if optional and rv is None: + return None + + if not isinstance(rv, str): + raise FeedParseError('Could not parse feed: "{}" is not a string' + .format(name)) + + return rv + + +def parse_json_feed(root: dict) -> JSONFeed: + return JSONFeed( + version=_get_text(root, 'version', optional=False), + title=_get_text(root, 'title', optional=False), + home_page_url=_get_text(root, 'home_page_url'), + feed_url=_get_text(root, 'feed_url'), + description=_get_text(root, 'description'), + user_comment=_get_text(root, 'user_comment'), + next_url=_get_text(root, 'next_url'), + icon=_get_text(root, 'icon'), + favicon=_get_text(root, 'favicon'), + author=_get_author(root), + expired=_get_expired(root), + items=_get_items(root) + ) + + +def parse_json_feed_file(filename: str) -> JSONFeed: + """Parse a JSON feed from a local json file.""" + with open(filename) as f: + try: + root = json.load(f) + except json.decoder.JSONDecodeError: + raise FeedJSONError('Not a valid JSON document') + + return parse_json_feed(root) + + +def parse_json_feed_bytes(data: bytes) -> JSONFeed: + """Parse a JSON feed from a byte-string containing JSON data.""" + try: + root = json.loads(data) + except json.decoder.JSONDecodeError: + raise FeedJSONError('Not a valid JSON document') + + return parse_json_feed(root) |