diff options
author | James Taylor <user234683@users.noreply.github.com> | 2019-02-16 23:41:52 -0800 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2019-02-16 23:41:52 -0800 |
commit | 3905e7e64059b45479894ba1fdfb0ef9cef64475 (patch) | |
tree | 4c5dbbfd204d0351cac8412cc87a65fea49c1a52 /python/atoma | |
parent | 24642455d0dc5841ddec99f456598c4f763c1e8a (diff) | |
download | yt-local-3905e7e64059b45479894ba1fdfb0ef9cef64475.tar.lz yt-local-3905e7e64059b45479894ba1fdfb0ef9cef64475.tar.xz yt-local-3905e7e64059b45479894ba1fdfb0ef9cef64475.zip |
basic subscriptions system
Diffstat (limited to 'python/atoma')
-rw-r--r-- | python/atoma/__init__.py | 12 | ||||
-rw-r--r-- | python/atoma/atom.py | 284 | ||||
-rw-r--r-- | python/atoma/const.py | 1 | ||||
-rw-r--r-- | python/atoma/exceptions.py | 14 | ||||
-rw-r--r-- | python/atoma/json_feed.py | 223 | ||||
-rw-r--r-- | python/atoma/opml.py | 107 | ||||
-rw-r--r-- | python/atoma/rss.py | 221 | ||||
-rw-r--r-- | python/atoma/simple.py | 224 | ||||
-rw-r--r-- | python/atoma/utils.py | 84 |
9 files changed, 1170 insertions, 0 deletions
diff --git a/python/atoma/__init__.py b/python/atoma/__init__.py new file mode 100644 index 0000000..0768081 --- /dev/null +++ b/python/atoma/__init__.py @@ -0,0 +1,12 @@ +from .atom import parse_atom_file, parse_atom_bytes +from .rss import parse_rss_file, parse_rss_bytes +from .json_feed import ( + parse_json_feed, parse_json_feed_file, parse_json_feed_bytes +) +from .opml import parse_opml_file, parse_opml_bytes +from .exceptions import ( + FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError +) +from .const import VERSION + +__version__ = VERSION diff --git a/python/atoma/atom.py b/python/atoma/atom.py new file mode 100644 index 0000000..d4e676c --- /dev/null +++ b/python/atoma/atom.py @@ -0,0 +1,284 @@ +from datetime import datetime +import enum +from io import BytesIO +from typing import Optional, List +from xml.etree.ElementTree import Element + +import attr + +from .utils import ( + parse_xml, get_child, get_text, get_datetime, FeedParseError, ns +) + + +class AtomTextType(enum.Enum): + text = "text" + html = "html" + xhtml = "xhtml" + + +@attr.s +class AtomTextConstruct: + text_type: str = attr.ib() + lang: Optional[str] = attr.ib() + value: str = attr.ib() + + +@attr.s +class AtomEntry: + title: AtomTextConstruct = attr.ib() + id_: str = attr.ib() + + # Should be mandatory but many feeds use published instead + updated: Optional[datetime] = attr.ib() + + authors: List['AtomPerson'] = attr.ib() + contributors: List['AtomPerson'] = attr.ib() + links: List['AtomLink'] = attr.ib() + categories: List['AtomCategory'] = attr.ib() + published: Optional[datetime] = attr.ib() + rights: Optional[AtomTextConstruct] = attr.ib() + summary: Optional[AtomTextConstruct] = attr.ib() + content: Optional[AtomTextConstruct] = attr.ib() + source: Optional['AtomFeed'] = attr.ib() + + +@attr.s +class AtomFeed: + title: Optional[AtomTextConstruct] = attr.ib() + id_: str = attr.ib() + + # Should be mandatory but many feeds do not include it + updated: Optional[datetime] = attr.ib() + + authors: List['AtomPerson'] = attr.ib() + contributors: List['AtomPerson'] = attr.ib() + links: List['AtomLink'] = attr.ib() + categories: List['AtomCategory'] = attr.ib() + generator: Optional['AtomGenerator'] = attr.ib() + subtitle: Optional[AtomTextConstruct] = attr.ib() + rights: Optional[AtomTextConstruct] = attr.ib() + icon: Optional[str] = attr.ib() + logo: Optional[str] = attr.ib() + + entries: List[AtomEntry] = attr.ib() + + +@attr.s +class AtomPerson: + name: str = attr.ib() + uri: Optional[str] = attr.ib() + email: Optional[str] = attr.ib() + + +@attr.s +class AtomLink: + href: str = attr.ib() + rel: Optional[str] = attr.ib() + type_: Optional[str] = attr.ib() + hreflang: Optional[str] = attr.ib() + title: Optional[str] = attr.ib() + length: Optional[int] = attr.ib() + + +@attr.s +class AtomCategory: + term: str = attr.ib() + scheme: Optional[str] = attr.ib() + label: Optional[str] = attr.ib() + + +@attr.s +class AtomGenerator: + name: str = attr.ib() + uri: Optional[str] = attr.ib() + version: Optional[str] = attr.ib() + + +def _get_generator(element: Element, name, + optional: bool=True) -> Optional[AtomGenerator]: + child = get_child(element, name, optional) + if child is None: + return None + + return AtomGenerator( + child.text.strip(), + child.attrib.get('uri'), + child.attrib.get('version'), + ) + + +def _get_text_construct(element: Element, name, + optional: bool=True) -> Optional[AtomTextConstruct]: + child = get_child(element, name, optional) + if child is None: + return None + + try: + text_type = AtomTextType(child.attrib['type']) + except KeyError: + text_type = AtomTextType.text + + try: + lang = child.lang + except AttributeError: + lang = None + + if child.text is None: + if optional: + return None + + raise FeedParseError( + 'Could not parse atom feed: "{}" text is required but is empty' + .format(name) + ) + + return AtomTextConstruct( + text_type, + lang, + child.text.strip() + ) + + +def _get_person(element: Element) -> Optional[AtomPerson]: + try: + return AtomPerson( + get_text(element, 'feed:name', optional=False), + get_text(element, 'feed:uri'), + get_text(element, 'feed:email') + ) + except FeedParseError: + return None + + +def _get_link(element: Element) -> AtomLink: + length = element.attrib.get('length') + length = int(length) if length else None + return AtomLink( + element.attrib['href'], + element.attrib.get('rel'), + element.attrib.get('type'), + element.attrib.get('hreflang'), + element.attrib.get('title'), + length + ) + + +def _get_category(element: Element) -> AtomCategory: + return AtomCategory( + element.attrib['term'], + element.attrib.get('scheme'), + element.attrib.get('label'), + ) + + +def _get_entry(element: Element, + default_authors: List[AtomPerson]) -> AtomEntry: + root = element + + # Mandatory + title = _get_text_construct(root, 'feed:title') + id_ = get_text(root, 'feed:id') + + # Optional + try: + source = _parse_atom(get_child(root, 'feed:source', optional=False), + parse_entries=False) + except FeedParseError: + source = None + source_authors = [] + else: + source_authors = source.authors + + authors = [_get_person(e) + for e in root.findall('feed:author', ns)] or default_authors + authors = [a for a in authors if a is not None] + authors = authors or default_authors or source_authors + + contributors = [_get_person(e) + for e in root.findall('feed:contributor', ns) if e] + contributors = [c for c in contributors if c is not None] + + links = [_get_link(e) for e in root.findall('feed:link', ns)] + categories = [_get_category(e) for e in root.findall('feed:category', ns)] + + updated = get_datetime(root, 'feed:updated') + published = get_datetime(root, 'feed:published') + rights = _get_text_construct(root, 'feed:rights') + summary = _get_text_construct(root, 'feed:summary') + content = _get_text_construct(root, 'feed:content') + + return AtomEntry( + title, + id_, + updated, + authors, + contributors, + links, + categories, + published, + rights, + summary, + content, + source + ) + + +def _parse_atom(root: Element, parse_entries: bool=True) -> AtomFeed: + # Mandatory + id_ = get_text(root, 'feed:id', optional=False) + + # Optional + title = _get_text_construct(root, 'feed:title') + updated = get_datetime(root, 'feed:updated') + authors = [_get_person(e) + for e in root.findall('feed:author', ns) if e] + authors = [a for a in authors if a is not None] + contributors = [_get_person(e) + for e in root.findall('feed:contributor', ns) if e] + contributors = [c for c in contributors if c is not None] + links = [_get_link(e) + for e in root.findall('feed:link', ns)] + categories = [_get_category(e) + for e in root.findall('feed:category', ns)] + + generator = _get_generator(root, 'feed:generator') + subtitle = _get_text_construct(root, 'feed:subtitle') + rights = _get_text_construct(root, 'feed:rights') + icon = get_text(root, 'feed:icon') + logo = get_text(root, 'feed:logo') + + if parse_entries: + entries = [_get_entry(e, authors) + for e in root.findall('feed:entry', ns)] + else: + entries = [] + + atom_feed = AtomFeed( + title, + id_, + updated, + authors, + contributors, + links, + categories, + generator, + subtitle, + rights, + icon, + logo, + entries + ) + return atom_feed + + +def parse_atom_file(filename: str) -> AtomFeed: + """Parse an Atom feed from a local XML file.""" + root = parse_xml(filename).getroot() + return _parse_atom(root) + + +def parse_atom_bytes(data: bytes) -> AtomFeed: + """Parse an Atom feed from a byte-string containing XML data.""" + root = parse_xml(BytesIO(data)).getroot() + return _parse_atom(root) diff --git a/python/atoma/const.py b/python/atoma/const.py new file mode 100644 index 0000000..d52d0f6 --- /dev/null +++ b/python/atoma/const.py @@ -0,0 +1 @@ +VERSION = '0.0.13' diff --git a/python/atoma/exceptions.py b/python/atoma/exceptions.py new file mode 100644 index 0000000..88170c5 --- /dev/null +++ b/python/atoma/exceptions.py @@ -0,0 +1,14 @@ +class FeedParseError(Exception): + """Document is an invalid feed.""" + + +class FeedDocumentError(Exception): + """Document is not a supported file.""" + + +class FeedXMLError(FeedDocumentError): + """Document is not valid XML.""" + + +class FeedJSONError(FeedDocumentError): + """Document is not valid JSON.""" diff --git a/python/atoma/json_feed.py b/python/atoma/json_feed.py new file mode 100644 index 0000000..410ff4a --- /dev/null +++ b/python/atoma/json_feed.py @@ -0,0 +1,223 @@ +from datetime import datetime, timedelta +import json +from typing import Optional, List + +import attr + +from .exceptions import FeedParseError, FeedJSONError +from .utils import try_parse_date + + +@attr.s +class JSONFeedAuthor: + + name: Optional[str] = attr.ib() + url: Optional[str] = attr.ib() + avatar: Optional[str] = attr.ib() + + +@attr.s +class JSONFeedAttachment: + + url: str = attr.ib() + mime_type: str = attr.ib() + title: Optional[str] = attr.ib() + size_in_bytes: Optional[int] = attr.ib() + duration: Optional[timedelta] = attr.ib() + + +@attr.s +class JSONFeedItem: + + id_: str = attr.ib() + url: Optional[str] = attr.ib() + external_url: Optional[str] = attr.ib() + title: Optional[str] = attr.ib() + content_html: Optional[str] = attr.ib() + content_text: Optional[str] = attr.ib() + summary: Optional[str] = attr.ib() + image: Optional[str] = attr.ib() + banner_image: Optional[str] = attr.ib() + date_published: Optional[datetime] = attr.ib() + date_modified: Optional[datetime] = attr.ib() + author: Optional[JSONFeedAuthor] = attr.ib() + + tags: List[str] = attr.ib() + attachments: List[JSONFeedAttachment] = attr.ib() + + +@attr.s +class JSONFeed: + + version: str = attr.ib() + title: str = attr.ib() + home_page_url: Optional[str] = attr.ib() + feed_url: Optional[str] = attr.ib() + description: Optional[str] = attr.ib() + user_comment: Optional[str] = attr.ib() + next_url: Optional[str] = attr.ib() + icon: Optional[str] = attr.ib() + favicon: Optional[str] = attr.ib() + author: Optional[JSONFeedAuthor] = attr.ib() + expired: bool = attr.ib() + + items: List[JSONFeedItem] = attr.ib() + + +def _get_items(root: dict) -> List[JSONFeedItem]: + rv = [] + items = root.get('items', []) + if not items: + return rv + + for item in items: + rv.append(_get_item(item)) + + return rv + + +def _get_item(item_dict: dict) -> JSONFeedItem: + return JSONFeedItem( + id_=_get_text(item_dict, 'id', optional=False), + url=_get_text(item_dict, 'url'), + external_url=_get_text(item_dict, 'external_url'), + title=_get_text(item_dict, 'title'), + content_html=_get_text(item_dict, 'content_html'), + content_text=_get_text(item_dict, 'content_text'), + summary=_get_text(item_dict, 'summary'), + image=_get_text(item_dict, 'image'), + banner_image=_get_text(item_dict, 'banner_image'), + date_published=_get_datetime(item_dict, 'date_published'), + date_modified=_get_datetime(item_dict, 'date_modified'), + author=_get_author(item_dict), + tags=_get_tags(item_dict, 'tags'), + attachments=_get_attachments(item_dict, 'attachments') + ) + + +def _get_attachments(root, name) -> List[JSONFeedAttachment]: + rv = list() + for attachment_dict in root.get(name, []): + rv.append(JSONFeedAttachment( + _get_text(attachment_dict, 'url', optional=False), + _get_text(attachment_dict, 'mime_type', optional=False), + _get_text(attachment_dict, 'title'), + _get_int(attachment_dict, 'size_in_bytes'), + _get_duration(attachment_dict, 'duration_in_seconds') + )) + return rv + + +def _get_tags(root, name) -> List[str]: + tags = root.get(name, []) + return [tag for tag in tags if isinstance(tag, str)] + + +def _get_datetime(root: dict, name, optional: bool=True) -> Optional[datetime]: + text = _get_text(root, name, optional) + if text is None: + return None + + return try_parse_date(text) + + +def _get_expired(root: dict) -> bool: + if root.get('expired') is True: + return True + + return False + + +def _get_author(root: dict) -> Optional[JSONFeedAuthor]: + author_dict = root.get('author') + if not author_dict: + return None + + rv = JSONFeedAuthor( + name=_get_text(author_dict, 'name'), + url=_get_text(author_dict, 'url'), + avatar=_get_text(author_dict, 'avatar'), + ) + if rv.name is None and rv.url is None and rv.avatar is None: + return None + + return rv + + +def _get_int(root: dict, name: str, optional: bool=True) -> Optional[int]: + rv = root.get(name) + if not optional and rv is None: + raise FeedParseError('Could not parse feed: "{}" int is required but ' + 'is empty'.format(name)) + + if optional and rv is None: + return None + + if not isinstance(rv, int): + raise FeedParseError('Could not parse feed: "{}" is not an int' + .format(name)) + + return rv + + +def _get_duration(root: dict, name: str, + optional: bool=True) -> Optional[timedelta]: + duration = _get_int(root, name, optional) + if duration is None: + return None + + return timedelta(seconds=duration) + + +def _get_text(root: dict, name: str, optional: bool=True) -> Optional[str]: + rv = root.get(name) + if not optional and rv is None: + raise FeedParseError('Could not parse feed: "{}" text is required but ' + 'is empty'.format(name)) + + if optional and rv is None: + return None + + if not isinstance(rv, str): + raise FeedParseError('Could not parse feed: "{}" is not a string' + .format(name)) + + return rv + + +def parse_json_feed(root: dict) -> JSONFeed: + return JSONFeed( + version=_get_text(root, 'version', optional=False), + title=_get_text(root, 'title', optional=False), + home_page_url=_get_text(root, 'home_page_url'), + feed_url=_get_text(root, 'feed_url'), + description=_get_text(root, 'description'), + user_comment=_get_text(root, 'user_comment'), + next_url=_get_text(root, 'next_url'), + icon=_get_text(root, 'icon'), + favicon=_get_text(root, 'favicon'), + author=_get_author(root), + expired=_get_expired(root), + items=_get_items(root) + ) + + +def parse_json_feed_file(filename: str) -> JSONFeed: + """Parse a JSON feed from a local json file.""" + with open(filename) as f: + try: + root = json.load(f) + except json.decoder.JSONDecodeError: + raise FeedJSONError('Not a valid JSON document') + + return parse_json_feed(root) + + +def parse_json_feed_bytes(data: bytes) -> JSONFeed: + """Parse a JSON feed from a byte-string containing JSON data.""" + try: + root = json.loads(data) + except json.decoder.JSONDecodeError: + raise FeedJSONError('Not a valid JSON document') + + return parse_json_feed(root) diff --git a/python/atoma/opml.py b/python/atoma/opml.py new file mode 100644 index 0000000..a73105e --- /dev/null +++ b/python/atoma/opml.py @@ -0,0 +1,107 @@ +from datetime import datetime +from io import BytesIO +from typing import Optional, List +from xml.etree.ElementTree import Element + +import attr + +from .utils import parse_xml, get_text, get_int, get_datetime + + +@attr.s +class OPMLOutline: + text: Optional[str] = attr.ib() + type: Optional[str] = attr.ib() + xml_url: Optional[str] = attr.ib() + description: Optional[str] = attr.ib() + html_url: Optional[str] = attr.ib() + language: Optional[str] = attr.ib() + title: Optional[str] = attr.ib() + version: Optional[str] = attr.ib() + + outlines: List['OPMLOutline'] = attr.ib() + + +@attr.s +class OPML: + title: Optional[str] = attr.ib() + owner_name: Optional[str] = attr.ib() + owner_email: Optional[str] = attr.ib() + date_created: Optional[datetime] = attr.ib() + date_modified: Optional[datetime] = attr.ib() + expansion_state: Optional[str] = attr.ib() + + vertical_scroll_state: Optional[int] = attr.ib() + window_top: Optional[int] = attr.ib() + window_left: Optional[int] = attr.ib() + window_bottom: Optional[int] = attr.ib() + window_right: Optional[int] = attr.ib() + + outlines: List[OPMLOutline] = attr.ib() + + +def _get_outlines(element: Element) -> List[OPMLOutline]: + rv = list() + + for outline in element.findall('outline'): + rv.append(OPMLOutline( + outline.attrib.get('text'), + outline.attrib.get('type'), + outline.attrib.get('xmlUrl'), + outline.attrib.get('description'), + outline.attrib.get('htmlUrl'), + outline.attrib.get('language'), + outline.attrib.get('title'), + outline.attrib.get('version'), + _get_outlines(outline) + )) + + return rv + + +def _parse_opml(root: Element) -> OPML: + head = root.find('head') + body = root.find('body') + + return OPML( + get_text(head, 'title'), + get_text(head, 'ownerName'), + get_text(head, 'ownerEmail'), + get_datetime(head, 'dateCreated'), + get_datetime(head, 'dateModified'), + get_text(head, 'expansionState'), + get_int(head, 'vertScrollState'), + get_int(head, 'windowTop'), + get_int(head, 'windowLeft'), + get_int(head, 'windowBottom'), + get_int(head, 'windowRight'), + outlines=_get_outlines(body) + ) + + +def parse_opml_file(filename: str) -> OPML: + """Parse an OPML document from a local XML file.""" + root = parse_xml(filename).getroot() + return _parse_opml(root) + + +def parse_opml_bytes(data: bytes) -> OPML: + """Parse an OPML document from a byte-string containing XML data.""" + root = parse_xml(BytesIO(data)).getroot() + return _parse_opml(root) + + +def get_feed_list(opml_obj: OPML) -> List[str]: + """Walk an OPML document to extract the list of feed it contains.""" + rv = list() + + def collect(obj): + for outline in obj.outlines: + if outline.type == 'rss' and outline.xml_url: + rv.append(outline.xml_url) + + if outline.outlines: + collect(outline) + + collect(opml_obj) + return rv diff --git a/python/atoma/rss.py b/python/atoma/rss.py new file mode 100644 index 0000000..f447a2f --- /dev/null +++ b/python/atoma/rss.py @@ -0,0 +1,221 @@ +from datetime import datetime +from io import BytesIO +from typing import Optional, List +from xml.etree.ElementTree import Element + +import attr + +from .utils import ( + parse_xml, get_child, get_text, get_int, get_datetime, FeedParseError +) + + +@attr.s +class RSSImage: + url: str = attr.ib() + title: Optional[str] = attr.ib() + link: str = attr.ib() + width: int = attr.ib() + height: int = attr.ib() + description: Optional[str] = attr.ib() + + +@attr.s +class RSSEnclosure: + url: str = attr.ib() + length: Optional[int] = attr.ib() + type: Optional[str] = attr.ib() + + +@attr.s +class RSSSource: + title: str = attr.ib() + url: Optional[str] = attr.ib() + + +@attr.s +class RSSItem: + title: Optional[str] = attr.ib() + link: Optional[str] = attr.ib() + description: Optional[str] = attr.ib() + author: Optional[str] = attr.ib() + categories: List[str] = attr.ib() + comments: Optional[str] = attr.ib() + enclosures: List[RSSEnclosure] = attr.ib() + guid: Optional[str] = attr.ib() + pub_date: Optional[datetime] = attr.ib() + source: Optional[RSSSource] = attr.ib() + + # Extension + content_encoded: Optional[str] = attr.ib() + + +@attr.s +class RSSChannel: + title: Optional[str] = attr.ib() + link: Optional[str] = attr.ib() + description: Optional[str] = attr.ib() + language: Optional[str] = attr.ib() + copyright: Optional[str] = attr.ib() + managing_editor: Optional[str] = attr.ib() + web_master: Optional[str] = attr.ib() + pub_date: Optional[datetime] = attr.ib() + last_build_date: Optional[datetime] = attr.ib() + categories: List[str] = attr.ib() + generator: Optional[str] = attr.ib() + docs: Optional[str] = attr.ib() + ttl: Optional[int] = attr.ib() + image: Optional[RSSImage] = attr.ib() + + items: List[RSSItem] = attr.ib() + + # Extension + content_encoded: Optional[str] = attr.ib() + + +def _get_image(element: Element, name, + optional: bool=True) -> Optional[RSSImage]: + child = get_child(element, name, optional) + if child is None: + return None + + return RSSImage( + get_text(child, 'url', optional=False), + get_text(child, 'title'), + get_text(child, 'link', optional=False), + get_int(child, 'width') or 88, + get_int(child, 'height') or 31, + get_text(child, 'description') + ) + + +def _get_source(element: Element, name, + optional: bool=True) -> Optional[RSSSource]: + child = get_child(element, name, optional) + if child is None: + return None + + return RSSSource( + child.text.strip(), + child.attrib.get('url'), + ) + + +def _get_enclosure(element: Element) -> RSSEnclosure: + length = element.attrib.get('length') + try: + length = int(length) + except (TypeError, ValueError): + length = None + + return RSSEnclosure( + element.attrib['url'], + length, + element.attrib.get('type'), + ) + + +def _get_link(element: Element) -> Optional[str]: + """Attempt to retrieve item link. + + Use the GUID as a fallback if it is a permalink. + """ + link = get_text(element, 'link') + if link is not None: + return link + + guid = get_child(element, 'guid') + if guid is not None and guid.attrib.get('isPermaLink') == 'true': + return get_text(element, 'guid') + + return None + + +def _get_item(element: Element) -> RSSItem: + root = element + + title = get_text(root, 'title') + link = _get_link(root) + description = get_text(root, 'description') + author = get_text(root, 'author') + categories = [e.text for e in root.findall('category')] + comments = get_text(root, 'comments') + enclosure = [_get_enclosure(e) for e in root.findall('enclosure')] + guid = get_text(root, 'guid') + pub_date = get_datetime(root, 'pubDate') + source = _get_source(root, 'source') + + content_encoded = get_text(root, 'content:encoded') + + return RSSItem( + title, + link, + description, + author, + categories, + comments, + enclosure, + guid, + pub_date, + source, + content_encoded + ) + + +def _parse_rss(root: Element) -> RSSChannel: + rss_version = root.get('version') + if rss_version != '2.0': + raise FeedParseError('Cannot process RSS feed version "{}"' + .format(rss_version)) + + root = root.find('channel') + + title = get_text(root, 'title') + link = get_text(root, 'link') + description = get_text(root, 'description') + language = get_text(root, 'language') + copyright = get_text(root, 'copyright') + managing_editor = get_text(root, 'managingEditor') + web_master = get_text(root, 'webMaster') + pub_date = get_datetime(root, 'pubDate') + last_build_date = get_datetime(root, 'lastBuildDate') + categories = [e.text for e in root.findall('category')] + generator = get_text(root, 'generator') + docs = get_text(root, 'docs') + ttl = get_int(root, 'ttl') + + image = _get_image(root, 'image') + items = [_get_item(e) for e in root.findall('item')] + + content_encoded = get_text(root, 'content:encoded') + + return RSSChannel( + title, + link, + description, + language, + copyright, + managing_editor, + web_master, + pub_date, + last_build_date, + categories, + generator, + docs, + ttl, + image, + items, + content_encoded + ) + + +def parse_rss_file(filename: str) -> RSSChannel: + """Parse an RSS feed from a local XML file.""" + root = parse_xml(filename).getroot() + return _parse_rss(root) + + +def parse_rss_bytes(data: bytes) -> RSSChannel: + """Parse an RSS feed from a byte-string containing XML data.""" + root = parse_xml(BytesIO(data)).getroot() + return _parse_rss(root) diff --git a/python/atoma/simple.py b/python/atoma/simple.py new file mode 100644 index 0000000..98bb3e1 --- /dev/null +++ b/python/atoma/simple.py @@ -0,0 +1,224 @@ +"""Simple API that abstracts away the differences between feed types.""" + +from datetime import datetime, timedelta +import html +import os +from typing import Optional, List, Tuple +import urllib.parse + +import attr + +from . import atom, rss, json_feed +from .exceptions import ( + FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError +) + + +@attr.s +class Attachment: + link: str = attr.ib() + mime_type: Optional[str] = attr.ib() + title: Optional[str] = attr.ib() + size_in_bytes: Optional[int] = attr.ib() + duration: Optional[timedelta] = attr.ib() + + +@attr.s +class Article: + id: str = attr.ib() + title: Optional[str] = attr.ib() + link: Optional[str] = attr.ib() + content: str = attr.ib() + published_at: Optional[datetime] = attr.ib() + updated_at: Optional[datetime] = attr.ib() + attachments: List[Attachment] = attr.ib() + + +@attr.s +class Feed: + title: str = attr.ib() + subtitle: Optional[str] = attr.ib() + link: Optional[str] = attr.ib() + updated_at: Optional[datetime] = attr.ib() + articles: List[Article] = attr.ib() + + +def _adapt_atom_feed(atom_feed: atom.AtomFeed) -> Feed: + articles = list() + for entry in atom_feed.entries: + if entry.content is not None: + content = entry.content.value + elif entry.summary is not None: + content = entry.summary.value + else: + content = '' + published_at, updated_at = _get_article_dates(entry.published, + entry.updated) + # Find article link and attachments + article_link = None + attachments = list() + for candidate_link in entry.links: + if candidate_link.rel in ('alternate', None): + article_link = candidate_link.href + elif candidate_link.rel == 'enclosure': + attachments.append(Attachment( + title=_get_attachment_title(candidate_link.title, + candidate_link.href), + link=candidate_link.href, + mime_type=candidate_link.type_, + size_in_bytes=candidate_link.length, + duration=None + )) + + if entry.title is None: + entry_title = None + elif entry.title.text_type in (atom.AtomTextType.html, + atom.AtomTextType.xhtml): + entry_title = html.unescape(entry.title.value).strip() + else: + entry_title = entry.title.value + + articles.append(Article( + entry.id_, + entry_title, + article_link, + content, + published_at, + updated_at, + attachments + )) + + # Find feed link + link = None + for candidate_link in atom_feed.links: + if candidate_link.rel == 'self': + link = candidate_link.href + break + + return Feed( + atom_feed.title.value if atom_feed.title else atom_feed.id_, + atom_feed.subtitle.value if atom_feed.subtitle else None, + link, + atom_feed.updated, + articles + ) + + +def _adapt_rss_channel(rss_channel: rss.RSSChannel) -> Feed: + articles = list() + for item in rss_channel.items: + attachments = [ + Attachment(link=e.url, mime_type=e.type, size_in_bytes=e.length, + title=_get_attachment_title(None, e.url), duration=None) + for e in item.enclosures + ] + articles.append(Article( + item.guid or item.link, + item.title, + item.link, + item.content_encoded or item.description or '', + item.pub_date, + None, + attachments + )) + + if rss_channel.title is None and rss_channel.link is None: + raise FeedParseError('RSS feed does not have a title nor a link') + + return Feed( + rss_channel.title if rss_channel.title else rss_channel.link, + rss_channel.description, + rss_channel.link, + rss_channel.pub_date, + articles + ) + + +def _adapt_json_feed(json_feed: json_feed.JSONFeed) -> Feed: + articles = list() + for item in json_feed.items: + attachments = [ + Attachment(a.url, a.mime_type, + _get_attachment_title(a.title, a.url), + a.size_in_bytes, a.duration) + for a in item.attachments + ] + articles.append(Article( + item.id_, + item.title, + item.url, + item.content_html or item.content_text or '', + item.date_published, + item.date_modified, + attachments + )) + + return Feed( + json_feed.title, + json_feed.description, + json_feed.feed_url, + None, + articles + ) + + +def _get_article_dates(published_at: Optional[datetime], + updated_at: Optional[datetime] + ) -> Tuple[Optional[datetime], Optional[datetime]]: + if published_at and updated_at: + return published_at, updated_at + + if updated_at: + return updated_at, None + + if published_at: + return published_at, None + + raise FeedParseError('Article does not have proper dates') + + +def _get_attachment_title(attachment_title: Optional[str], link: str) -> str: + if attachment_title: + return attachment_title + + parsed_link = urllib.parse.urlparse(link) + return os.path.basename(parsed_link.path) + + +def _simple_parse(pairs, content) -> Feed: + is_xml = True + is_json = True + for parser, adapter in pairs: + try: + return adapter(parser(content)) + except FeedXMLError: + is_xml = False + except FeedJSONError: + is_json = False + except FeedParseError: + continue + + if not is_xml and not is_json: + raise FeedDocumentError('File is not a supported feed type') + + raise FeedParseError('File is not a valid supported feed') + + +def simple_parse_file(filename: str) -> Feed: + """Parse an Atom, RSS or JSON feed from a local file.""" + pairs = ( + (rss.parse_rss_file, _adapt_rss_channel), + (atom.parse_atom_file, _adapt_atom_feed), + (json_feed.parse_json_feed_file, _adapt_json_feed) + ) + return _simple_parse(pairs, filename) + + +def simple_parse_bytes(data: bytes) -> Feed: + """Parse an Atom, RSS or JSON feed from a byte-string containing data.""" + pairs = ( + (rss.parse_rss_bytes, _adapt_rss_channel), + (atom.parse_atom_bytes, _adapt_atom_feed), + (json_feed.parse_json_feed_bytes, _adapt_json_feed) + ) + return _simple_parse(pairs, data) diff --git a/python/atoma/utils.py b/python/atoma/utils.py new file mode 100644 index 0000000..4dc1ab5 --- /dev/null +++ b/python/atoma/utils.py @@ -0,0 +1,84 @@ +from datetime import datetime, timezone +from xml.etree.ElementTree import Element +from typing import Optional + +import dateutil.parser +from defusedxml.ElementTree import parse as defused_xml_parse, ParseError + +from .exceptions import FeedXMLError, FeedParseError + +ns = { + 'content': 'http://purl.org/rss/1.0/modules/content/', + 'feed': 'http://www.w3.org/2005/Atom' +} + + +def parse_xml(xml_content): + try: + return defused_xml_parse(xml_content) + except ParseError: + raise FeedXMLError('Not a valid XML document') + + +def get_child(element: Element, name, + optional: bool=True) -> Optional[Element]: + child = element.find(name, namespaces=ns) + + if child is None and not optional: + raise FeedParseError( + 'Could not parse feed: "{}" does not have a "{}"' + .format(element.tag, name) + ) + + elif child is None: + return None + + return child + + +def get_text(element: Element, name, optional: bool=True) -> Optional[str]: + child = get_child(element, name, optional) + if child is None: + return None + + if child.text is None: + if optional: + return None + + raise FeedParseError( + 'Could not parse feed: "{}" text is required but is empty' + .format(name) + ) + + return child.text.strip() + + +def get_int(element: Element, name, optional: bool=True) -> Optional[int]: + text = get_text(element, name, optional) + if text is None: + return None + + return int(text) + + +def get_datetime(element: Element, name, + optional: bool=True) -> Optional[datetime]: + text = get_text(element, name, optional) + if text is None: + return None + + return try_parse_date(text) + + +def try_parse_date(date_str: str) -> Optional[datetime]: + try: + date = dateutil.parser.parse(date_str, fuzzy=True) + except (ValueError, OverflowError): + return None + + if date.tzinfo is None: + # TZ naive datetime, make it a TZ aware datetime by assuming it + # contains UTC time + date = date.replace(tzinfo=timezone.utc) + + return date |