Diffstat (limited to 'python/atoma')
-rw-r--r-- | python/atoma/__init__.py   |   12
-rw-r--r-- | python/atoma/atom.py       |  284
-rw-r--r-- | python/atoma/const.py      |    1
-rw-r--r-- | python/atoma/exceptions.py |   14
-rw-r--r-- | python/atoma/json_feed.py  |  223
-rw-r--r-- | python/atoma/opml.py       |  107
-rw-r--r-- | python/atoma/rss.py        |  221
-rw-r--r-- | python/atoma/simple.py     |  224
-rw-r--r-- | python/atoma/utils.py      |   84
9 files changed, 0 insertions, 1170 deletions
diff --git a/python/atoma/__init__.py b/python/atoma/__init__.py deleted file mode 100644 index 0768081..0000000 --- a/python/atoma/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -from .atom import parse_atom_file, parse_atom_bytes -from .rss import parse_rss_file, parse_rss_bytes -from .json_feed import ( - parse_json_feed, parse_json_feed_file, parse_json_feed_bytes -) -from .opml import parse_opml_file, parse_opml_bytes -from .exceptions import ( - FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError -) -from .const import VERSION - -__version__ = VERSION diff --git a/python/atoma/atom.py b/python/atoma/atom.py deleted file mode 100644 index d4e676c..0000000 --- a/python/atoma/atom.py +++ /dev/null @@ -1,284 +0,0 @@ -from datetime import datetime -import enum -from io import BytesIO -from typing import Optional, List -from xml.etree.ElementTree import Element - -import attr - -from .utils import ( - parse_xml, get_child, get_text, get_datetime, FeedParseError, ns -) - - -class AtomTextType(enum.Enum): - text = "text" - html = "html" - xhtml = "xhtml" - - -@attr.s -class AtomTextConstruct: - text_type: str = attr.ib() - lang: Optional[str] = attr.ib() - value: str = attr.ib() - - -@attr.s -class AtomEntry: - title: AtomTextConstruct = attr.ib() - id_: str = attr.ib() - - # Should be mandatory but many feeds use published instead - updated: Optional[datetime] = attr.ib() - - authors: List['AtomPerson'] = attr.ib() - contributors: List['AtomPerson'] = attr.ib() - links: List['AtomLink'] = attr.ib() - categories: List['AtomCategory'] = attr.ib() - published: Optional[datetime] = attr.ib() - rights: Optional[AtomTextConstruct] = attr.ib() - summary: Optional[AtomTextConstruct] = attr.ib() - content: Optional[AtomTextConstruct] = attr.ib() - source: Optional['AtomFeed'] = attr.ib() - - -@attr.s -class AtomFeed: - title: Optional[AtomTextConstruct] = attr.ib() - id_: str = attr.ib() - - # Should be mandatory but many feeds do not include it - updated: Optional[datetime] = attr.ib() - - authors: List['AtomPerson'] = attr.ib() - contributors: List['AtomPerson'] = attr.ib() - links: List['AtomLink'] = attr.ib() - categories: List['AtomCategory'] = attr.ib() - generator: Optional['AtomGenerator'] = attr.ib() - subtitle: Optional[AtomTextConstruct] = attr.ib() - rights: Optional[AtomTextConstruct] = attr.ib() - icon: Optional[str] = attr.ib() - logo: Optional[str] = attr.ib() - - entries: List[AtomEntry] = attr.ib() - - -@attr.s -class AtomPerson: - name: str = attr.ib() - uri: Optional[str] = attr.ib() - email: Optional[str] = attr.ib() - - -@attr.s -class AtomLink: - href: str = attr.ib() - rel: Optional[str] = attr.ib() - type_: Optional[str] = attr.ib() - hreflang: Optional[str] = attr.ib() - title: Optional[str] = attr.ib() - length: Optional[int] = attr.ib() - - -@attr.s -class AtomCategory: - term: str = attr.ib() - scheme: Optional[str] = attr.ib() - label: Optional[str] = attr.ib() - - -@attr.s -class AtomGenerator: - name: str = attr.ib() - uri: Optional[str] = attr.ib() - version: Optional[str] = attr.ib() - - -def _get_generator(element: Element, name, - optional: bool=True) -> Optional[AtomGenerator]: - child = get_child(element, name, optional) - if child is None: - return None - - return AtomGenerator( - child.text.strip(), - child.attrib.get('uri'), - child.attrib.get('version'), - ) - - -def _get_text_construct(element: Element, name, - optional: bool=True) -> Optional[AtomTextConstruct]: - child = get_child(element, name, optional) - if child is None: - return None - - try: 
- text_type = AtomTextType(child.attrib['type']) - except KeyError: - text_type = AtomTextType.text - - try: - lang = child.lang - except AttributeError: - lang = None - - if child.text is None: - if optional: - return None - - raise FeedParseError( - 'Could not parse atom feed: "{}" text is required but is empty' - .format(name) - ) - - return AtomTextConstruct( - text_type, - lang, - child.text.strip() - ) - - -def _get_person(element: Element) -> Optional[AtomPerson]: - try: - return AtomPerson( - get_text(element, 'feed:name', optional=False), - get_text(element, 'feed:uri'), - get_text(element, 'feed:email') - ) - except FeedParseError: - return None - - -def _get_link(element: Element) -> AtomLink: - length = element.attrib.get('length') - length = int(length) if length else None - return AtomLink( - element.attrib['href'], - element.attrib.get('rel'), - element.attrib.get('type'), - element.attrib.get('hreflang'), - element.attrib.get('title'), - length - ) - - -def _get_category(element: Element) -> AtomCategory: - return AtomCategory( - element.attrib['term'], - element.attrib.get('scheme'), - element.attrib.get('label'), - ) - - -def _get_entry(element: Element, - default_authors: List[AtomPerson]) -> AtomEntry: - root = element - - # Mandatory - title = _get_text_construct(root, 'feed:title') - id_ = get_text(root, 'feed:id') - - # Optional - try: - source = _parse_atom(get_child(root, 'feed:source', optional=False), - parse_entries=False) - except FeedParseError: - source = None - source_authors = [] - else: - source_authors = source.authors - - authors = [_get_person(e) - for e in root.findall('feed:author', ns)] or default_authors - authors = [a for a in authors if a is not None] - authors = authors or default_authors or source_authors - - contributors = [_get_person(e) - for e in root.findall('feed:contributor', ns) if e] - contributors = [c for c in contributors if c is not None] - - links = [_get_link(e) for e in root.findall('feed:link', ns)] - categories = [_get_category(e) for e in root.findall('feed:category', ns)] - - updated = get_datetime(root, 'feed:updated') - published = get_datetime(root, 'feed:published') - rights = _get_text_construct(root, 'feed:rights') - summary = _get_text_construct(root, 'feed:summary') - content = _get_text_construct(root, 'feed:content') - - return AtomEntry( - title, - id_, - updated, - authors, - contributors, - links, - categories, - published, - rights, - summary, - content, - source - ) - - -def _parse_atom(root: Element, parse_entries: bool=True) -> AtomFeed: - # Mandatory - id_ = get_text(root, 'feed:id', optional=False) - - # Optional - title = _get_text_construct(root, 'feed:title') - updated = get_datetime(root, 'feed:updated') - authors = [_get_person(e) - for e in root.findall('feed:author', ns) if e] - authors = [a for a in authors if a is not None] - contributors = [_get_person(e) - for e in root.findall('feed:contributor', ns) if e] - contributors = [c for c in contributors if c is not None] - links = [_get_link(e) - for e in root.findall('feed:link', ns)] - categories = [_get_category(e) - for e in root.findall('feed:category', ns)] - - generator = _get_generator(root, 'feed:generator') - subtitle = _get_text_construct(root, 'feed:subtitle') - rights = _get_text_construct(root, 'feed:rights') - icon = get_text(root, 'feed:icon') - logo = get_text(root, 'feed:logo') - - if parse_entries: - entries = [_get_entry(e, authors) - for e in root.findall('feed:entry', ns)] - else: - entries = [] - - atom_feed = AtomFeed( - title, 
- id_, - updated, - authors, - contributors, - links, - categories, - generator, - subtitle, - rights, - icon, - logo, - entries - ) - return atom_feed - - -def parse_atom_file(filename: str) -> AtomFeed: - """Parse an Atom feed from a local XML file.""" - root = parse_xml(filename).getroot() - return _parse_atom(root) - - -def parse_atom_bytes(data: bytes) -> AtomFeed: - """Parse an Atom feed from a byte-string containing XML data.""" - root = parse_xml(BytesIO(data)).getroot() - return _parse_atom(root) diff --git a/python/atoma/const.py b/python/atoma/const.py deleted file mode 100644 index d52d0f6..0000000 --- a/python/atoma/const.py +++ /dev/null @@ -1 +0,0 @@ -VERSION = '0.0.13' diff --git a/python/atoma/exceptions.py b/python/atoma/exceptions.py deleted file mode 100644 index 88170c5..0000000 --- a/python/atoma/exceptions.py +++ /dev/null @@ -1,14 +0,0 @@ -class FeedParseError(Exception): - """Document is an invalid feed.""" - - -class FeedDocumentError(Exception): - """Document is not a supported file.""" - - -class FeedXMLError(FeedDocumentError): - """Document is not valid XML.""" - - -class FeedJSONError(FeedDocumentError): - """Document is not valid JSON.""" diff --git a/python/atoma/json_feed.py b/python/atoma/json_feed.py deleted file mode 100644 index 410ff4a..0000000 --- a/python/atoma/json_feed.py +++ /dev/null @@ -1,223 +0,0 @@ -from datetime import datetime, timedelta -import json -from typing import Optional, List - -import attr - -from .exceptions import FeedParseError, FeedJSONError -from .utils import try_parse_date - - -@attr.s -class JSONFeedAuthor: - - name: Optional[str] = attr.ib() - url: Optional[str] = attr.ib() - avatar: Optional[str] = attr.ib() - - -@attr.s -class JSONFeedAttachment: - - url: str = attr.ib() - mime_type: str = attr.ib() - title: Optional[str] = attr.ib() - size_in_bytes: Optional[int] = attr.ib() - duration: Optional[timedelta] = attr.ib() - - -@attr.s -class JSONFeedItem: - - id_: str = attr.ib() - url: Optional[str] = attr.ib() - external_url: Optional[str] = attr.ib() - title: Optional[str] = attr.ib() - content_html: Optional[str] = attr.ib() - content_text: Optional[str] = attr.ib() - summary: Optional[str] = attr.ib() - image: Optional[str] = attr.ib() - banner_image: Optional[str] = attr.ib() - date_published: Optional[datetime] = attr.ib() - date_modified: Optional[datetime] = attr.ib() - author: Optional[JSONFeedAuthor] = attr.ib() - - tags: List[str] = attr.ib() - attachments: List[JSONFeedAttachment] = attr.ib() - - -@attr.s -class JSONFeed: - - version: str = attr.ib() - title: str = attr.ib() - home_page_url: Optional[str] = attr.ib() - feed_url: Optional[str] = attr.ib() - description: Optional[str] = attr.ib() - user_comment: Optional[str] = attr.ib() - next_url: Optional[str] = attr.ib() - icon: Optional[str] = attr.ib() - favicon: Optional[str] = attr.ib() - author: Optional[JSONFeedAuthor] = attr.ib() - expired: bool = attr.ib() - - items: List[JSONFeedItem] = attr.ib() - - -def _get_items(root: dict) -> List[JSONFeedItem]: - rv = [] - items = root.get('items', []) - if not items: - return rv - - for item in items: - rv.append(_get_item(item)) - - return rv - - -def _get_item(item_dict: dict) -> JSONFeedItem: - return JSONFeedItem( - id_=_get_text(item_dict, 'id', optional=False), - url=_get_text(item_dict, 'url'), - external_url=_get_text(item_dict, 'external_url'), - title=_get_text(item_dict, 'title'), - content_html=_get_text(item_dict, 'content_html'), - content_text=_get_text(item_dict, 'content_text'), - 
summary=_get_text(item_dict, 'summary'), - image=_get_text(item_dict, 'image'), - banner_image=_get_text(item_dict, 'banner_image'), - date_published=_get_datetime(item_dict, 'date_published'), - date_modified=_get_datetime(item_dict, 'date_modified'), - author=_get_author(item_dict), - tags=_get_tags(item_dict, 'tags'), - attachments=_get_attachments(item_dict, 'attachments') - ) - - -def _get_attachments(root, name) -> List[JSONFeedAttachment]: - rv = list() - for attachment_dict in root.get(name, []): - rv.append(JSONFeedAttachment( - _get_text(attachment_dict, 'url', optional=False), - _get_text(attachment_dict, 'mime_type', optional=False), - _get_text(attachment_dict, 'title'), - _get_int(attachment_dict, 'size_in_bytes'), - _get_duration(attachment_dict, 'duration_in_seconds') - )) - return rv - - -def _get_tags(root, name) -> List[str]: - tags = root.get(name, []) - return [tag for tag in tags if isinstance(tag, str)] - - -def _get_datetime(root: dict, name, optional: bool=True) -> Optional[datetime]: - text = _get_text(root, name, optional) - if text is None: - return None - - return try_parse_date(text) - - -def _get_expired(root: dict) -> bool: - if root.get('expired') is True: - return True - - return False - - -def _get_author(root: dict) -> Optional[JSONFeedAuthor]: - author_dict = root.get('author') - if not author_dict: - return None - - rv = JSONFeedAuthor( - name=_get_text(author_dict, 'name'), - url=_get_text(author_dict, 'url'), - avatar=_get_text(author_dict, 'avatar'), - ) - if rv.name is None and rv.url is None and rv.avatar is None: - return None - - return rv - - -def _get_int(root: dict, name: str, optional: bool=True) -> Optional[int]: - rv = root.get(name) - if not optional and rv is None: - raise FeedParseError('Could not parse feed: "{}" int is required but ' - 'is empty'.format(name)) - - if optional and rv is None: - return None - - if not isinstance(rv, int): - raise FeedParseError('Could not parse feed: "{}" is not an int' - .format(name)) - - return rv - - -def _get_duration(root: dict, name: str, - optional: bool=True) -> Optional[timedelta]: - duration = _get_int(root, name, optional) - if duration is None: - return None - - return timedelta(seconds=duration) - - -def _get_text(root: dict, name: str, optional: bool=True) -> Optional[str]: - rv = root.get(name) - if not optional and rv is None: - raise FeedParseError('Could not parse feed: "{}" text is required but ' - 'is empty'.format(name)) - - if optional and rv is None: - return None - - if not isinstance(rv, str): - raise FeedParseError('Could not parse feed: "{}" is not a string' - .format(name)) - - return rv - - -def parse_json_feed(root: dict) -> JSONFeed: - return JSONFeed( - version=_get_text(root, 'version', optional=False), - title=_get_text(root, 'title', optional=False), - home_page_url=_get_text(root, 'home_page_url'), - feed_url=_get_text(root, 'feed_url'), - description=_get_text(root, 'description'), - user_comment=_get_text(root, 'user_comment'), - next_url=_get_text(root, 'next_url'), - icon=_get_text(root, 'icon'), - favicon=_get_text(root, 'favicon'), - author=_get_author(root), - expired=_get_expired(root), - items=_get_items(root) - ) - - -def parse_json_feed_file(filename: str) -> JSONFeed: - """Parse a JSON feed from a local json file.""" - with open(filename) as f: - try: - root = json.load(f) - except json.decoder.JSONDecodeError: - raise FeedJSONError('Not a valid JSON document') - - return parse_json_feed(root) - - -def parse_json_feed_bytes(data: bytes) -> JSONFeed: - 
"""Parse a JSON feed from a byte-string containing JSON data.""" - try: - root = json.loads(data) - except json.decoder.JSONDecodeError: - raise FeedJSONError('Not a valid JSON document') - - return parse_json_feed(root) diff --git a/python/atoma/opml.py b/python/atoma/opml.py deleted file mode 100644 index a73105e..0000000 --- a/python/atoma/opml.py +++ /dev/null @@ -1,107 +0,0 @@ -from datetime import datetime -from io import BytesIO -from typing import Optional, List -from xml.etree.ElementTree import Element - -import attr - -from .utils import parse_xml, get_text, get_int, get_datetime - - -@attr.s -class OPMLOutline: - text: Optional[str] = attr.ib() - type: Optional[str] = attr.ib() - xml_url: Optional[str] = attr.ib() - description: Optional[str] = attr.ib() - html_url: Optional[str] = attr.ib() - language: Optional[str] = attr.ib() - title: Optional[str] = attr.ib() - version: Optional[str] = attr.ib() - - outlines: List['OPMLOutline'] = attr.ib() - - -@attr.s -class OPML: - title: Optional[str] = attr.ib() - owner_name: Optional[str] = attr.ib() - owner_email: Optional[str] = attr.ib() - date_created: Optional[datetime] = attr.ib() - date_modified: Optional[datetime] = attr.ib() - expansion_state: Optional[str] = attr.ib() - - vertical_scroll_state: Optional[int] = attr.ib() - window_top: Optional[int] = attr.ib() - window_left: Optional[int] = attr.ib() - window_bottom: Optional[int] = attr.ib() - window_right: Optional[int] = attr.ib() - - outlines: List[OPMLOutline] = attr.ib() - - -def _get_outlines(element: Element) -> List[OPMLOutline]: - rv = list() - - for outline in element.findall('outline'): - rv.append(OPMLOutline( - outline.attrib.get('text'), - outline.attrib.get('type'), - outline.attrib.get('xmlUrl'), - outline.attrib.get('description'), - outline.attrib.get('htmlUrl'), - outline.attrib.get('language'), - outline.attrib.get('title'), - outline.attrib.get('version'), - _get_outlines(outline) - )) - - return rv - - -def _parse_opml(root: Element) -> OPML: - head = root.find('head') - body = root.find('body') - - return OPML( - get_text(head, 'title'), - get_text(head, 'ownerName'), - get_text(head, 'ownerEmail'), - get_datetime(head, 'dateCreated'), - get_datetime(head, 'dateModified'), - get_text(head, 'expansionState'), - get_int(head, 'vertScrollState'), - get_int(head, 'windowTop'), - get_int(head, 'windowLeft'), - get_int(head, 'windowBottom'), - get_int(head, 'windowRight'), - outlines=_get_outlines(body) - ) - - -def parse_opml_file(filename: str) -> OPML: - """Parse an OPML document from a local XML file.""" - root = parse_xml(filename).getroot() - return _parse_opml(root) - - -def parse_opml_bytes(data: bytes) -> OPML: - """Parse an OPML document from a byte-string containing XML data.""" - root = parse_xml(BytesIO(data)).getroot() - return _parse_opml(root) - - -def get_feed_list(opml_obj: OPML) -> List[str]: - """Walk an OPML document to extract the list of feed it contains.""" - rv = list() - - def collect(obj): - for outline in obj.outlines: - if outline.type == 'rss' and outline.xml_url: - rv.append(outline.xml_url) - - if outline.outlines: - collect(outline) - - collect(opml_obj) - return rv diff --git a/python/atoma/rss.py b/python/atoma/rss.py deleted file mode 100644 index f447a2f..0000000 --- a/python/atoma/rss.py +++ /dev/null @@ -1,221 +0,0 @@ -from datetime import datetime -from io import BytesIO -from typing import Optional, List -from xml.etree.ElementTree import Element - -import attr - -from .utils import ( - parse_xml, get_child, get_text, 
get_int, get_datetime, FeedParseError -) - - -@attr.s -class RSSImage: - url: str = attr.ib() - title: Optional[str] = attr.ib() - link: str = attr.ib() - width: int = attr.ib() - height: int = attr.ib() - description: Optional[str] = attr.ib() - - -@attr.s -class RSSEnclosure: - url: str = attr.ib() - length: Optional[int] = attr.ib() - type: Optional[str] = attr.ib() - - -@attr.s -class RSSSource: - title: str = attr.ib() - url: Optional[str] = attr.ib() - - -@attr.s -class RSSItem: - title: Optional[str] = attr.ib() - link: Optional[str] = attr.ib() - description: Optional[str] = attr.ib() - author: Optional[str] = attr.ib() - categories: List[str] = attr.ib() - comments: Optional[str] = attr.ib() - enclosures: List[RSSEnclosure] = attr.ib() - guid: Optional[str] = attr.ib() - pub_date: Optional[datetime] = attr.ib() - source: Optional[RSSSource] = attr.ib() - - # Extension - content_encoded: Optional[str] = attr.ib() - - -@attr.s -class RSSChannel: - title: Optional[str] = attr.ib() - link: Optional[str] = attr.ib() - description: Optional[str] = attr.ib() - language: Optional[str] = attr.ib() - copyright: Optional[str] = attr.ib() - managing_editor: Optional[str] = attr.ib() - web_master: Optional[str] = attr.ib() - pub_date: Optional[datetime] = attr.ib() - last_build_date: Optional[datetime] = attr.ib() - categories: List[str] = attr.ib() - generator: Optional[str] = attr.ib() - docs: Optional[str] = attr.ib() - ttl: Optional[int] = attr.ib() - image: Optional[RSSImage] = attr.ib() - - items: List[RSSItem] = attr.ib() - - # Extension - content_encoded: Optional[str] = attr.ib() - - -def _get_image(element: Element, name, - optional: bool=True) -> Optional[RSSImage]: - child = get_child(element, name, optional) - if child is None: - return None - - return RSSImage( - get_text(child, 'url', optional=False), - get_text(child, 'title'), - get_text(child, 'link', optional=False), - get_int(child, 'width') or 88, - get_int(child, 'height') or 31, - get_text(child, 'description') - ) - - -def _get_source(element: Element, name, - optional: bool=True) -> Optional[RSSSource]: - child = get_child(element, name, optional) - if child is None: - return None - - return RSSSource( - child.text.strip(), - child.attrib.get('url'), - ) - - -def _get_enclosure(element: Element) -> RSSEnclosure: - length = element.attrib.get('length') - try: - length = int(length) - except (TypeError, ValueError): - length = None - - return RSSEnclosure( - element.attrib['url'], - length, - element.attrib.get('type'), - ) - - -def _get_link(element: Element) -> Optional[str]: - """Attempt to retrieve item link. - - Use the GUID as a fallback if it is a permalink. 
- """ - link = get_text(element, 'link') - if link is not None: - return link - - guid = get_child(element, 'guid') - if guid is not None and guid.attrib.get('isPermaLink') == 'true': - return get_text(element, 'guid') - - return None - - -def _get_item(element: Element) -> RSSItem: - root = element - - title = get_text(root, 'title') - link = _get_link(root) - description = get_text(root, 'description') - author = get_text(root, 'author') - categories = [e.text for e in root.findall('category')] - comments = get_text(root, 'comments') - enclosure = [_get_enclosure(e) for e in root.findall('enclosure')] - guid = get_text(root, 'guid') - pub_date = get_datetime(root, 'pubDate') - source = _get_source(root, 'source') - - content_encoded = get_text(root, 'content:encoded') - - return RSSItem( - title, - link, - description, - author, - categories, - comments, - enclosure, - guid, - pub_date, - source, - content_encoded - ) - - -def _parse_rss(root: Element) -> RSSChannel: - rss_version = root.get('version') - if rss_version != '2.0': - raise FeedParseError('Cannot process RSS feed version "{}"' - .format(rss_version)) - - root = root.find('channel') - - title = get_text(root, 'title') - link = get_text(root, 'link') - description = get_text(root, 'description') - language = get_text(root, 'language') - copyright = get_text(root, 'copyright') - managing_editor = get_text(root, 'managingEditor') - web_master = get_text(root, 'webMaster') - pub_date = get_datetime(root, 'pubDate') - last_build_date = get_datetime(root, 'lastBuildDate') - categories = [e.text for e in root.findall('category')] - generator = get_text(root, 'generator') - docs = get_text(root, 'docs') - ttl = get_int(root, 'ttl') - - image = _get_image(root, 'image') - items = [_get_item(e) for e in root.findall('item')] - - content_encoded = get_text(root, 'content:encoded') - - return RSSChannel( - title, - link, - description, - language, - copyright, - managing_editor, - web_master, - pub_date, - last_build_date, - categories, - generator, - docs, - ttl, - image, - items, - content_encoded - ) - - -def parse_rss_file(filename: str) -> RSSChannel: - """Parse an RSS feed from a local XML file.""" - root = parse_xml(filename).getroot() - return _parse_rss(root) - - -def parse_rss_bytes(data: bytes) -> RSSChannel: - """Parse an RSS feed from a byte-string containing XML data.""" - root = parse_xml(BytesIO(data)).getroot() - return _parse_rss(root) diff --git a/python/atoma/simple.py b/python/atoma/simple.py deleted file mode 100644 index 98bb3e1..0000000 --- a/python/atoma/simple.py +++ /dev/null @@ -1,224 +0,0 @@ -"""Simple API that abstracts away the differences between feed types.""" - -from datetime import datetime, timedelta -import html -import os -from typing import Optional, List, Tuple -import urllib.parse - -import attr - -from . 
import atom, rss, json_feed -from .exceptions import ( - FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError -) - - -@attr.s -class Attachment: - link: str = attr.ib() - mime_type: Optional[str] = attr.ib() - title: Optional[str] = attr.ib() - size_in_bytes: Optional[int] = attr.ib() - duration: Optional[timedelta] = attr.ib() - - -@attr.s -class Article: - id: str = attr.ib() - title: Optional[str] = attr.ib() - link: Optional[str] = attr.ib() - content: str = attr.ib() - published_at: Optional[datetime] = attr.ib() - updated_at: Optional[datetime] = attr.ib() - attachments: List[Attachment] = attr.ib() - - -@attr.s -class Feed: - title: str = attr.ib() - subtitle: Optional[str] = attr.ib() - link: Optional[str] = attr.ib() - updated_at: Optional[datetime] = attr.ib() - articles: List[Article] = attr.ib() - - -def _adapt_atom_feed(atom_feed: atom.AtomFeed) -> Feed: - articles = list() - for entry in atom_feed.entries: - if entry.content is not None: - content = entry.content.value - elif entry.summary is not None: - content = entry.summary.value - else: - content = '' - published_at, updated_at = _get_article_dates(entry.published, - entry.updated) - # Find article link and attachments - article_link = None - attachments = list() - for candidate_link in entry.links: - if candidate_link.rel in ('alternate', None): - article_link = candidate_link.href - elif candidate_link.rel == 'enclosure': - attachments.append(Attachment( - title=_get_attachment_title(candidate_link.title, - candidate_link.href), - link=candidate_link.href, - mime_type=candidate_link.type_, - size_in_bytes=candidate_link.length, - duration=None - )) - - if entry.title is None: - entry_title = None - elif entry.title.text_type in (atom.AtomTextType.html, - atom.AtomTextType.xhtml): - entry_title = html.unescape(entry.title.value).strip() - else: - entry_title = entry.title.value - - articles.append(Article( - entry.id_, - entry_title, - article_link, - content, - published_at, - updated_at, - attachments - )) - - # Find feed link - link = None - for candidate_link in atom_feed.links: - if candidate_link.rel == 'self': - link = candidate_link.href - break - - return Feed( - atom_feed.title.value if atom_feed.title else atom_feed.id_, - atom_feed.subtitle.value if atom_feed.subtitle else None, - link, - atom_feed.updated, - articles - ) - - -def _adapt_rss_channel(rss_channel: rss.RSSChannel) -> Feed: - articles = list() - for item in rss_channel.items: - attachments = [ - Attachment(link=e.url, mime_type=e.type, size_in_bytes=e.length, - title=_get_attachment_title(None, e.url), duration=None) - for e in item.enclosures - ] - articles.append(Article( - item.guid or item.link, - item.title, - item.link, - item.content_encoded or item.description or '', - item.pub_date, - None, - attachments - )) - - if rss_channel.title is None and rss_channel.link is None: - raise FeedParseError('RSS feed does not have a title nor a link') - - return Feed( - rss_channel.title if rss_channel.title else rss_channel.link, - rss_channel.description, - rss_channel.link, - rss_channel.pub_date, - articles - ) - - -def _adapt_json_feed(json_feed: json_feed.JSONFeed) -> Feed: - articles = list() - for item in json_feed.items: - attachments = [ - Attachment(a.url, a.mime_type, - _get_attachment_title(a.title, a.url), - a.size_in_bytes, a.duration) - for a in item.attachments - ] - articles.append(Article( - item.id_, - item.title, - item.url, - item.content_html or item.content_text or '', - item.date_published, - item.date_modified, - 
attachments - )) - - return Feed( - json_feed.title, - json_feed.description, - json_feed.feed_url, - None, - articles - ) - - -def _get_article_dates(published_at: Optional[datetime], - updated_at: Optional[datetime] - ) -> Tuple[Optional[datetime], Optional[datetime]]: - if published_at and updated_at: - return published_at, updated_at - - if updated_at: - return updated_at, None - - if published_at: - return published_at, None - - raise FeedParseError('Article does not have proper dates') - - -def _get_attachment_title(attachment_title: Optional[str], link: str) -> str: - if attachment_title: - return attachment_title - - parsed_link = urllib.parse.urlparse(link) - return os.path.basename(parsed_link.path) - - -def _simple_parse(pairs, content) -> Feed: - is_xml = True - is_json = True - for parser, adapter in pairs: - try: - return adapter(parser(content)) - except FeedXMLError: - is_xml = False - except FeedJSONError: - is_json = False - except FeedParseError: - continue - - if not is_xml and not is_json: - raise FeedDocumentError('File is not a supported feed type') - - raise FeedParseError('File is not a valid supported feed') - - -def simple_parse_file(filename: str) -> Feed: - """Parse an Atom, RSS or JSON feed from a local file.""" - pairs = ( - (rss.parse_rss_file, _adapt_rss_channel), - (atom.parse_atom_file, _adapt_atom_feed), - (json_feed.parse_json_feed_file, _adapt_json_feed) - ) - return _simple_parse(pairs, filename) - - -def simple_parse_bytes(data: bytes) -> Feed: - """Parse an Atom, RSS or JSON feed from a byte-string containing data.""" - pairs = ( - (rss.parse_rss_bytes, _adapt_rss_channel), - (atom.parse_atom_bytes, _adapt_atom_feed), - (json_feed.parse_json_feed_bytes, _adapt_json_feed) - ) - return _simple_parse(pairs, data) diff --git a/python/atoma/utils.py b/python/atoma/utils.py deleted file mode 100644 index 4dc1ab5..0000000 --- a/python/atoma/utils.py +++ /dev/null @@ -1,84 +0,0 @@ -from datetime import datetime, timezone -from xml.etree.ElementTree import Element -from typing import Optional - -import dateutil.parser -from defusedxml.ElementTree import parse as defused_xml_parse, ParseError - -from .exceptions import FeedXMLError, FeedParseError - -ns = { - 'content': 'http://purl.org/rss/1.0/modules/content/', - 'feed': 'http://www.w3.org/2005/Atom' -} - - -def parse_xml(xml_content): - try: - return defused_xml_parse(xml_content) - except ParseError: - raise FeedXMLError('Not a valid XML document') - - -def get_child(element: Element, name, - optional: bool=True) -> Optional[Element]: - child = element.find(name, namespaces=ns) - - if child is None and not optional: - raise FeedParseError( - 'Could not parse feed: "{}" does not have a "{}"' - .format(element.tag, name) - ) - - elif child is None: - return None - - return child - - -def get_text(element: Element, name, optional: bool=True) -> Optional[str]: - child = get_child(element, name, optional) - if child is None: - return None - - if child.text is None: - if optional: - return None - - raise FeedParseError( - 'Could not parse feed: "{}" text is required but is empty' - .format(name) - ) - - return child.text.strip() - - -def get_int(element: Element, name, optional: bool=True) -> Optional[int]: - text = get_text(element, name, optional) - if text is None: - return None - - return int(text) - - -def get_datetime(element: Element, name, - optional: bool=True) -> Optional[datetime]: - text = get_text(element, name, optional) - if text is None: - return None - - return try_parse_date(text) - - -def 
try_parse_date(date_str: str) -> Optional[datetime]: - try: - date = dateutil.parser.parse(date_str, fuzzy=True) - except (ValueError, OverflowError): - return None - - if date.tzinfo is None: - # TZ naive datetime, make it a TZ aware datetime by assuming it - # contains UTC time - date = date.replace(tzinfo=timezone.utc) - - return date |
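For reference, the removed package's public entry points (re-exported in __init__.py above) were thin parse functions over bytes or file paths. The following is a minimal usage sketch, not part of the deleted sources: it assumes the pre-removal atoma package (version 0.0.13 per const.py) is still importable, and the Atom document is a made-up placeholder.

import atoma
from atoma.simple import simple_parse_bytes

# Made-up placeholder feed document; any valid Atom feed works the same way.
ATOM_DOC = b"""<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <id>urn:example:feed</id>
  <title>Example Feed</title>
  <updated>2018-01-02T03:04:05Z</updated>
  <entry>
    <id>urn:example:entry-1</id>
    <title>First entry</title>
    <updated>2018-01-02T03:04:05Z</updated>
    <link rel="alternate" href="https://example.com/entry-1"/>
  </entry>
</feed>"""

# Format-specific parser: returns the AtomFeed/AtomEntry attrs classes defined above.
feed = atoma.parse_atom_bytes(ATOM_DOC)
print(feed.title.value)               # Example Feed
for entry in feed.entries:
    print(entry.id_, entry.updated)   # entry id and timezone-aware datetime

# Format-agnostic facade from simple.py: tries RSS, then Atom, then JSON Feed.
unified = simple_parse_bytes(ATOM_DOC)
print(unified.title, [a.title for a in unified.articles])

parse_rss_bytes, parse_json_feed_bytes and the *_file variants followed the same pattern.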
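The OPML module above also shipped a small helper, get_feed_list, for walking a subscription list. A sketch under the same assumptions (hypothetical OPML document, pre-removal package importable):

from atoma.opml import parse_opml_bytes, get_feed_list

# Made-up placeholder subscription list.
OPML_DOC = b"""<?xml version="1.0" encoding="utf-8"?>
<opml version="2.0">
  <head><title>Subscriptions</title></head>
  <body>
    <outline text="News">
      <outline type="rss" text="Example"
               xmlUrl="https://example.com/feed.xml"/>
    </outline>
  </body>
</opml>"""

subscriptions = parse_opml_bytes(OPML_DOC)
# Recursively collects xmlUrl from outlines of type "rss".
print(get_feed_list(subscriptions))   # ['https://example.com/feed.xml']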