aboutsummaryrefslogtreecommitdiffstats
path: root/python/atoma
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2019-09-06 15:45:01 -0700
committerJames Taylor <user234683@users.noreply.github.com>2019-09-06 15:45:01 -0700
commitac32b24b2a011292b704a3f27e8fd08a7ae9424b (patch)
tree0d6e021519dee62089733e20880c65cdb85d8841 /python/atoma
parent7a93acabb3f5a8dd95ec0d56ae57cc34eb57c1b8 (diff)
parentc393031ac54af959561214c8b1d6b22647a81b89 (diff)
downloadyt-local-ac32b24b2a011292b704a3f27e8fd08a7ae9424b.tar.lz
yt-local-ac32b24b2a011292b704a3f27e8fd08a7ae9424b.tar.xz
yt-local-ac32b24b2a011292b704a3f27e8fd08a7ae9424b.zip
Merge subscriptions into master
Diffstat (limited to 'python/atoma')
-rw-r--r--python/atoma/__init__.py12
-rw-r--r--python/atoma/atom.py284
-rw-r--r--python/atoma/const.py1
-rw-r--r--python/atoma/exceptions.py14
-rw-r--r--python/atoma/json_feed.py223
-rw-r--r--python/atoma/opml.py107
-rw-r--r--python/atoma/rss.py221
-rw-r--r--python/atoma/simple.py224
-rw-r--r--python/atoma/utils.py84
9 files changed, 1170 insertions, 0 deletions
diff --git a/python/atoma/__init__.py b/python/atoma/__init__.py
new file mode 100644
index 0000000..0768081
--- /dev/null
+++ b/python/atoma/__init__.py
@@ -0,0 +1,12 @@
+from .atom import parse_atom_file, parse_atom_bytes
+from .rss import parse_rss_file, parse_rss_bytes
+from .json_feed import (
+ parse_json_feed, parse_json_feed_file, parse_json_feed_bytes
+)
+from .opml import parse_opml_file, parse_opml_bytes
+from .exceptions import (
+ FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError
+)
+from .const import VERSION
+
__version__ = VERSION  # re-export the package version (PEP 396 convention)
diff --git a/python/atoma/atom.py b/python/atoma/atom.py
new file mode 100644
index 0000000..d4e676c
--- /dev/null
+++ b/python/atoma/atom.py
@@ -0,0 +1,284 @@
+from datetime import datetime
+import enum
+from io import BytesIO
+from typing import Optional, List
+from xml.etree.ElementTree import Element
+
+import attr
+
+from .utils import (
+ parse_xml, get_child, get_text, get_datetime, FeedParseError, ns
+)
+
+
class AtomTextType(enum.Enum):
    """Allowed values of the "type" attribute on Atom text constructs
    (RFC 4287 section 3.1.1)."""
    text = "text"
    html = "html"
    xhtml = "xhtml"
+
+
@attr.s
class AtomTextConstruct:
    """An Atom text construct: typed, optionally language-tagged text
    (RFC 4287 section 3.1)."""
    # Holds an AtomTextType member (see _get_text_construct), not a str.
    text_type: AtomTextType = attr.ib()
    lang: Optional[str] = attr.ib()  # value of xml:lang, when present
    value: str = attr.ib()
+
+
@attr.s
class AtomEntry:
    """A single Atom entry (RFC 4287 section 4.1.2)."""
    # Mandatory per the spec, but parsed leniently (_get_entry passes
    # optional=True) so they may be None for malformed feeds.
    title: Optional[AtomTextConstruct] = attr.ib()
    id_: Optional[str] = attr.ib()

    # Should be mandatory but many feeds use published instead
    updated: Optional[datetime] = attr.ib()

    authors: List['AtomPerson'] = attr.ib()
    contributors: List['AtomPerson'] = attr.ib()
    links: List['AtomLink'] = attr.ib()
    categories: List['AtomCategory'] = attr.ib()
    published: Optional[datetime] = attr.ib()
    rights: Optional[AtomTextConstruct] = attr.ib()
    summary: Optional[AtomTextConstruct] = attr.ib()
    content: Optional[AtomTextConstruct] = attr.ib()
    # The originating feed, parsed without its entries (see _get_entry).
    source: Optional['AtomFeed'] = attr.ib()
+
+
@attr.s
class AtomFeed:
    """A parsed Atom feed document (RFC 4287 section 4.1.1)."""
    title: Optional[AtomTextConstruct] = attr.ib()
    id_: str = attr.ib()

    # Should be mandatory but many feeds do not include it
    updated: Optional[datetime] = attr.ib()

    authors: List['AtomPerson'] = attr.ib()
    contributors: List['AtomPerson'] = attr.ib()
    links: List['AtomLink'] = attr.ib()
    categories: List['AtomCategory'] = attr.ib()
    generator: Optional['AtomGenerator'] = attr.ib()
    subtitle: Optional[AtomTextConstruct] = attr.ib()
    rights: Optional[AtomTextConstruct] = attr.ib()
    icon: Optional[str] = attr.ib()
    logo: Optional[str] = attr.ib()

    # Empty when parsed with parse_entries=False (an entry's <source>).
    entries: List[AtomEntry] = attr.ib()
+
+
@attr.s
class AtomPerson:
    """An Atom person construct — author or contributor (RFC 4287 §3.2)."""
    name: str = attr.ib()
    uri: Optional[str] = attr.ib()
    email: Optional[str] = attr.ib()
+
+
@attr.s
class AtomLink:
    """An atom:link element; fields mirror its attributes (RFC 4287 §4.2.7)."""
    href: str = attr.ib()
    rel: Optional[str] = attr.ib()
    type_: Optional[str] = attr.ib()  # MIME type advisory
    hreflang: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    length: Optional[int] = attr.ib()  # advisory size in bytes
+
+
@attr.s
class AtomCategory:
    """An atom:category element (RFC 4287 section 4.2.2)."""
    term: str = attr.ib()
    scheme: Optional[str] = attr.ib()
    label: Optional[str] = attr.ib()
+
+
@attr.s
class AtomGenerator:
    """The atom:generator element: the software that produced the feed."""
    name: str = attr.ib()
    uri: Optional[str] = attr.ib()
    version: Optional[str] = attr.ib()
+
+
def _get_generator(element: Element, name,
                   optional: bool=True) -> Optional[AtomGenerator]:
    """Parse the atom:generator child of *element*.

    Returns None when the child is absent (and *optional* is true).
    """
    child = get_child(element, name, optional)
    if child is None:
        return None

    # Bug fix: an empty <generator uri="..."/> element has child.text set
    # to None, and calling .strip() on it raised AttributeError.
    generator_name = child.text.strip() if child.text else ''

    return AtomGenerator(
        generator_name,
        child.attrib.get('uri'),
        child.attrib.get('version'),
    )
+
+
def _get_text_construct(element: Element, name,
                        optional: bool=True) -> Optional[AtomTextConstruct]:
    """Parse an Atom text construct (title, summary, rights, content, ...).

    Returns None when the child is absent or empty and *optional* is true;
    raises FeedParseError when required text is missing.
    """
    child = get_child(element, name, optional)
    if child is None:
        return None

    # The "type" attribute defaults to "text" (RFC 4287 section 3.1.1).
    # Also treat an unknown type value leniently instead of raising a
    # bare ValueError out of the parser.
    try:
        text_type = AtomTextType(child.attrib['type'])
    except (KeyError, ValueError):
        text_type = AtomTextType.text

    # Bug fix: Element objects have no ".lang" attribute, so the old
    # "child.lang" lookup always raised AttributeError and the language
    # was silently discarded. Read the xml:lang attribute instead.
    lang = child.attrib.get('{http://www.w3.org/XML/1998/namespace}lang')

    if child.text is None:
        if optional:
            return None

        raise FeedParseError(
            'Could not parse atom feed: "{}" text is required but is empty'
            .format(name)
        )

    return AtomTextConstruct(
        text_type,
        lang,
        child.text.strip()
    )
+
+
def _get_person(element: Element) -> Optional[AtomPerson]:
    """Build an AtomPerson from a person construct, or None if it lacks a name."""
    try:
        person_name = get_text(element, 'feed:name', optional=False)
    except FeedParseError:
        # A person construct without a name is unusable; skip it.
        return None

    return AtomPerson(
        person_name,
        get_text(element, 'feed:uri'),
        get_text(element, 'feed:email'),
    )
+
+
def _get_link(element: Element) -> AtomLink:
    """Build an AtomLink from an atom:link element's attributes."""
    attribs = element.attrib
    raw_length = attribs.get('length')
    return AtomLink(
        attribs['href'],
        attribs.get('rel'),
        attribs.get('type'),
        attribs.get('hreflang'),
        attribs.get('title'),
        int(raw_length) if raw_length else None,
    )
+
+
def _get_category(element: Element) -> AtomCategory:
    """Build an AtomCategory from an atom:category element's attributes."""
    read = element.attrib.get
    return AtomCategory(
        element.attrib['term'],  # mandatory attribute
        read('scheme'),
        read('label'),
    )
+
+
def _get_entry(element: Element,
               default_authors: List[AtomPerson]) -> AtomEntry:
    """Parse an atom:entry element into an AtomEntry.

    *default_authors* are the feed-level authors, used as a fallback when
    the entry names no valid author of its own (RFC 4287 section 4.2.1).
    """
    root = element

    # Mandatory per the spec, but parsed leniently because many real-world
    # feeds omit them.
    title = _get_text_construct(root, 'feed:title')
    id_ = get_text(root, 'feed:id')

    # Optional <source>: a stripped-down copy of the originating feed.
    try:
        source = _parse_atom(get_child(root, 'feed:source', optional=False),
                             parse_entries=False)
    except FeedParseError:
        source = None
        source_authors = []
    else:
        source_authors = source.authors

    # Bug fix: no truthiness tests on elements here. An Element with no
    # children is falsy, so the old "if e" filter on contributors silently
    # dropped childless elements (and was inconsistent with the author
    # loop). Filtering for None after _get_person is sufficient.
    authors = [_get_person(e) for e in root.findall('feed:author', ns)]
    authors = [a for a in authors if a is not None]
    authors = authors or default_authors or source_authors

    contributors = [_get_person(e)
                    for e in root.findall('feed:contributor', ns)]
    contributors = [c for c in contributors if c is not None]

    links = [_get_link(e) for e in root.findall('feed:link', ns)]
    categories = [_get_category(e) for e in root.findall('feed:category', ns)]

    updated = get_datetime(root, 'feed:updated')
    published = get_datetime(root, 'feed:published')
    rights = _get_text_construct(root, 'feed:rights')
    summary = _get_text_construct(root, 'feed:summary')
    content = _get_text_construct(root, 'feed:content')

    return AtomEntry(
        title,
        id_,
        updated,
        authors,
        contributors,
        links,
        categories,
        published,
        rights,
        summary,
        content,
        source
    )
+
+
def _parse_atom(root: Element, parse_entries: bool=True) -> AtomFeed:
    """Parse an atom:feed element into an AtomFeed.

    *parse_entries* is disabled when parsing an entry's <source> element,
    which reuses the feed structure but must not recurse into entries.
    """
    # Mandatory
    id_ = get_text(root, 'feed:id', optional=False)

    # Optional
    title = _get_text_construct(root, 'feed:title')
    updated = get_datetime(root, 'feed:updated')

    # Bug fix: the old "if e" filters dropped person elements with no
    # children, because an Element with no children is falsy. The
    # None-filter after _get_person already removes unusable entries.
    authors = [_get_person(e)
               for e in root.findall('feed:author', ns)]
    authors = [a for a in authors if a is not None]
    contributors = [_get_person(e)
                    for e in root.findall('feed:contributor', ns)]
    contributors = [c for c in contributors if c is not None]
    links = [_get_link(e)
             for e in root.findall('feed:link', ns)]
    categories = [_get_category(e)
                  for e in root.findall('feed:category', ns)]

    generator = _get_generator(root, 'feed:generator')
    subtitle = _get_text_construct(root, 'feed:subtitle')
    rights = _get_text_construct(root, 'feed:rights')
    icon = get_text(root, 'feed:icon')
    logo = get_text(root, 'feed:logo')

    if parse_entries:
        # Feed-level authors are the default for entries without authors.
        entries = [_get_entry(e, authors)
                   for e in root.findall('feed:entry', ns)]
    else:
        entries = []

    return AtomFeed(
        title,
        id_,
        updated,
        authors,
        contributors,
        links,
        categories,
        generator,
        subtitle,
        rights,
        icon,
        logo,
        entries
    )
+
+
def parse_atom_file(filename: str) -> AtomFeed:
    """Parse an Atom feed from a local XML file."""
    tree = parse_xml(filename)
    return _parse_atom(tree.getroot())
+
+
def parse_atom_bytes(data: bytes) -> AtomFeed:
    """Parse an Atom feed from a byte-string containing XML data."""
    tree = parse_xml(BytesIO(data))
    return _parse_atom(tree.getroot())
diff --git a/python/atoma/const.py b/python/atoma/const.py
new file mode 100644
index 0000000..d52d0f6
--- /dev/null
+++ b/python/atoma/const.py
@@ -0,0 +1 @@
# Single source of truth for the package version (re-exported as
# atoma.__version__ in __init__.py).
VERSION = '0.0.13'
diff --git a/python/atoma/exceptions.py b/python/atoma/exceptions.py
new file mode 100644
index 0000000..88170c5
--- /dev/null
+++ b/python/atoma/exceptions.py
@@ -0,0 +1,14 @@
class FeedParseError(Exception):
    """Document is an invalid feed.

    Raised when the document was recognized as a feed of some type but
    violates that format's requirements (e.g. a mandatory element is
    missing or empty).
    """
+
+
class FeedDocumentError(Exception):
    """Document is not a supported file.

    Base class for errors where the input is not a feed document at all;
    see FeedXMLError and FeedJSONError.
    """
+
+
class FeedXMLError(FeedDocumentError):
    """Document is not valid XML (raised by utils.parse_xml)."""
+
+
class FeedJSONError(FeedDocumentError):
    """Document is not valid JSON (raised by the json_feed parsers)."""
diff --git a/python/atoma/json_feed.py b/python/atoma/json_feed.py
new file mode 100644
index 0000000..410ff4a
--- /dev/null
+++ b/python/atoma/json_feed.py
@@ -0,0 +1,223 @@
+from datetime import datetime, timedelta
+import json
+from typing import Optional, List
+
+import attr
+
+from .exceptions import FeedParseError, FeedJSONError
+from .utils import try_parse_date
+
+
@attr.s
class JSONFeedAuthor:
    """The "author" object of a JSON feed or item (JSON Feed v1)."""

    name: Optional[str] = attr.ib()
    url: Optional[str] = attr.ib()
    avatar: Optional[str] = attr.ib()  # URL of an image representing the author
+
+
@attr.s
class JSONFeedAttachment:
    """One entry of an item's "attachments" array (JSON Feed v1)."""

    url: str = attr.ib()
    mime_type: str = attr.ib()
    title: Optional[str] = attr.ib()
    size_in_bytes: Optional[int] = attr.ib()
    # Built from the "duration_in_seconds" field (see _get_attachments).
    duration: Optional[timedelta] = attr.ib()
+
+
@attr.s
class JSONFeedItem:
    """One object of the top-level "items" array (JSON Feed v1).

    Only "id" is required by the spec; everything else may be None/empty.
    """

    id_: str = attr.ib()
    url: Optional[str] = attr.ib()
    external_url: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    content_html: Optional[str] = attr.ib()
    content_text: Optional[str] = attr.ib()
    summary: Optional[str] = attr.ib()
    image: Optional[str] = attr.ib()
    banner_image: Optional[str] = attr.ib()
    date_published: Optional[datetime] = attr.ib()
    date_modified: Optional[datetime] = attr.ib()
    author: Optional[JSONFeedAuthor] = attr.ib()

    tags: List[str] = attr.ib()
    attachments: List[JSONFeedAttachment] = attr.ib()
+
+
@attr.s
class JSONFeed:
    """A parsed JSON Feed document (https://jsonfeed.org/version/1)."""

    version: str = attr.ib()
    title: str = attr.ib()
    home_page_url: Optional[str] = attr.ib()
    feed_url: Optional[str] = attr.ib()
    description: Optional[str] = attr.ib()
    user_comment: Optional[str] = attr.ib()
    next_url: Optional[str] = attr.ib()  # pagination: URL of the next page
    icon: Optional[str] = attr.ib()
    favicon: Optional[str] = attr.ib()
    author: Optional[JSONFeedAuthor] = attr.ib()
    expired: bool = attr.ib()  # True only when explicitly set (see _get_expired)

    items: List[JSONFeedItem] = attr.ib()
+
+
def _get_items(root: dict) -> List[JSONFeedItem]:
    """Parse every entry of the top-level "items" array."""
    # "or []" also covers an explicit null value for "items".
    return [_get_item(raw_item) for raw_item in root.get('items') or []]
+
+
def _get_item(item_dict: dict) -> JSONFeedItem:
    """Convert one raw item object into a JSONFeedItem.

    Only "id" is required; all other fields are optional per the spec.
    """
    return JSONFeedItem(
        id_=_get_text(item_dict, 'id', optional=False),
        url=_get_text(item_dict, 'url'),
        external_url=_get_text(item_dict, 'external_url'),
        title=_get_text(item_dict, 'title'),
        content_html=_get_text(item_dict, 'content_html'),
        content_text=_get_text(item_dict, 'content_text'),
        summary=_get_text(item_dict, 'summary'),
        image=_get_text(item_dict, 'image'),
        banner_image=_get_text(item_dict, 'banner_image'),
        date_published=_get_datetime(item_dict, 'date_published'),
        date_modified=_get_datetime(item_dict, 'date_modified'),
        author=_get_author(item_dict),
        tags=_get_tags(item_dict, 'tags'),
        attachments=_get_attachments(item_dict, 'attachments')
    )
+
+
def _get_attachments(root, name) -> List[JSONFeedAttachment]:
    """Parse the attachment objects stored under *name*, if any."""
    return [
        JSONFeedAttachment(
            _get_text(raw, 'url', optional=False),
            _get_text(raw, 'mime_type', optional=False),
            _get_text(raw, 'title'),
            _get_int(raw, 'size_in_bytes'),
            _get_duration(raw, 'duration_in_seconds'),
        )
        for raw in root.get(name, [])
    ]
+
+
+def _get_tags(root, name) -> List[str]:
+ tags = root.get(name, [])
+ return [tag for tag in tags if isinstance(tag, str)]
+
+
def _get_datetime(root: dict, name, optional: bool=True) -> Optional[datetime]:
    """Read the string under *name* and parse it as a datetime, or None."""
    raw = _get_text(root, name, optional)
    return None if raw is None else try_parse_date(raw)
+
+
+def _get_expired(root: dict) -> bool:
+ if root.get('expired') is True:
+ return True
+
+ return False
+
+
def _get_author(root: dict) -> Optional[JSONFeedAuthor]:
    """Parse the "author" object; None when absent or entirely empty."""
    raw_author = root.get('author')
    if not raw_author:
        return None

    author = JSONFeedAuthor(
        name=_get_text(raw_author, 'name'),
        url=_get_text(raw_author, 'url'),
        avatar=_get_text(raw_author, 'avatar'),
    )

    # An author object with no usable field is treated as absent.
    if (author.name, author.url, author.avatar) == (None, None, None):
        return None
    return author
+
+
+def _get_int(root: dict, name: str, optional: bool=True) -> Optional[int]:
+ rv = root.get(name)
+ if not optional and rv is None:
+ raise FeedParseError('Could not parse feed: "{}" int is required but '
+ 'is empty'.format(name))
+
+ if optional and rv is None:
+ return None
+
+ if not isinstance(rv, int):
+ raise FeedParseError('Could not parse feed: "{}" is not an int'
+ .format(name))
+
+ return rv
+
+
def _get_duration(root: dict, name: str,
                  optional: bool=True) -> Optional[timedelta]:
    """Read an integer number of seconds under *name* as a timedelta."""
    seconds = _get_int(root, name, optional)
    return None if seconds is None else timedelta(seconds=seconds)
+
+
+def _get_text(root: dict, name: str, optional: bool=True) -> Optional[str]:
+ rv = root.get(name)
+ if not optional and rv is None:
+ raise FeedParseError('Could not parse feed: "{}" text is required but '
+ 'is empty'.format(name))
+
+ if optional and rv is None:
+ return None
+
+ if not isinstance(rv, str):
+ raise FeedParseError('Could not parse feed: "{}" is not a string'
+ .format(name))
+
+ return rv
+
+
def parse_json_feed(root: dict) -> JSONFeed:
    """Build a JSONFeed from an already-decoded JSON document (a dict).

    "version" and "title" are required; everything else is optional.
    """
    return JSONFeed(
        version=_get_text(root, 'version', optional=False),
        title=_get_text(root, 'title', optional=False),
        home_page_url=_get_text(root, 'home_page_url'),
        feed_url=_get_text(root, 'feed_url'),
        description=_get_text(root, 'description'),
        user_comment=_get_text(root, 'user_comment'),
        next_url=_get_text(root, 'next_url'),
        icon=_get_text(root, 'icon'),
        favicon=_get_text(root, 'favicon'),
        author=_get_author(root),
        expired=_get_expired(root),
        items=_get_items(root)
    )
+
+
def parse_json_feed_file(filename: str) -> JSONFeed:
    """Parse a JSON feed from a local json file."""
    # JSON Feed documents are JSON and therefore UTF-8; do not depend on
    # the platform default encoding.
    with open(filename, encoding='utf-8') as f:
        try:
            root = json.load(f)
        except json.decoder.JSONDecodeError as e:
            # Chain the original error so the failing location is kept.
            raise FeedJSONError('Not a valid JSON document') from e

    return parse_json_feed(root)
+
+
def parse_json_feed_bytes(data: bytes) -> JSONFeed:
    """Parse a JSON feed from a byte-string containing JSON data."""
    try:
        root = json.loads(data)
    except json.decoder.JSONDecodeError as e:
        # Chain the original error so the failing location is kept.
        raise FeedJSONError('Not a valid JSON document') from e

    return parse_json_feed(root)
diff --git a/python/atoma/opml.py b/python/atoma/opml.py
new file mode 100644
index 0000000..a73105e
--- /dev/null
+++ b/python/atoma/opml.py
@@ -0,0 +1,107 @@
+from datetime import datetime
+from io import BytesIO
+from typing import Optional, List
+from xml.etree.ElementTree import Element
+
+import attr
+
+from .utils import parse_xml, get_text, get_int, get_datetime
+
+
@attr.s
class OPMLOutline:
    """One <outline> element; fields mirror its XML attributes."""
    text: Optional[str] = attr.ib()
    type: Optional[str] = attr.ib()  # "rss" marks a feed subscription
    xml_url: Optional[str] = attr.ib()  # feed URL for type="rss" outlines
    description: Optional[str] = attr.ib()
    html_url: Optional[str] = attr.ib()
    language: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    version: Optional[str] = attr.ib()

    # Nested child outlines; OPML outlines form a tree.
    outlines: List['OPMLOutline'] = attr.ib()
+
+
@attr.s
class OPML:
    """A parsed OPML document: <head> metadata plus the outline tree."""
    title: Optional[str] = attr.ib()
    owner_name: Optional[str] = attr.ib()
    owner_email: Optional[str] = attr.ib()
    date_created: Optional[datetime] = attr.ib()
    date_modified: Optional[datetime] = attr.ib()
    expansion_state: Optional[str] = attr.ib()

    # Window/scroll hints from the OPML <head>; rarely used by consumers.
    vertical_scroll_state: Optional[int] = attr.ib()
    window_top: Optional[int] = attr.ib()
    window_left: Optional[int] = attr.ib()
    window_bottom: Optional[int] = attr.ib()
    window_right: Optional[int] = attr.ib()

    # Top-level outlines of the <body>.
    outlines: List[OPMLOutline] = attr.ib()
+
+
def _get_outlines(element: Element) -> List[OPMLOutline]:
    """Recursively convert the <outline> children of *element*."""
    results = []
    for node in element.findall('outline'):
        read = node.attrib.get
        results.append(OPMLOutline(
            read('text'),
            read('type'),
            read('xmlUrl'),
            read('description'),
            read('htmlUrl'),
            read('language'),
            read('title'),
            read('version'),
            _get_outlines(node),  # recurse into nested outlines
        ))
    return results
+
+
def _parse_opml(root: Element) -> OPML:
    """Build an OPML object from the document root element.

    <head> and <body> are treated as optional: a missing section yields
    empty fields instead of crashing with AttributeError on None.
    """
    head = root.find('head')
    if head is None:
        # Substitute an empty element so the get_* helpers below simply
        # return None for every head field.
        head = Element('head')
    body = root.find('body')

    return OPML(
        get_text(head, 'title'),
        get_text(head, 'ownerName'),
        get_text(head, 'ownerEmail'),
        get_datetime(head, 'dateCreated'),
        get_datetime(head, 'dateModified'),
        get_text(head, 'expansionState'),
        get_int(head, 'vertScrollState'),
        get_int(head, 'windowTop'),
        get_int(head, 'windowLeft'),
        get_int(head, 'windowBottom'),
        get_int(head, 'windowRight'),
        outlines=_get_outlines(body) if body is not None else []
    )
+
+
def parse_opml_file(filename: str) -> OPML:
    """Parse an OPML document from a local XML file."""
    tree = parse_xml(filename)
    return _parse_opml(tree.getroot())
+
+
def parse_opml_bytes(data: bytes) -> OPML:
    """Parse an OPML document from a byte-string containing XML data."""
    tree = parse_xml(BytesIO(data))
    return _parse_opml(tree.getroot())
+
+
def get_feed_list(opml_obj: 'OPML') -> List[str]:
    """Walk an OPML document to extract the list of feed it contains."""
    feeds = []

    def walk(node):
        # Depth-first traversal of the outline tree; an outline is a feed
        # subscription when its type is "rss" and it carries an xmlUrl.
        for outline in node.outlines:
            if outline.type == 'rss' and outline.xml_url:
                feeds.append(outline.xml_url)
            if outline.outlines:
                walk(outline)

    walk(opml_obj)
    return feeds
diff --git a/python/atoma/rss.py b/python/atoma/rss.py
new file mode 100644
index 0000000..f447a2f
--- /dev/null
+++ b/python/atoma/rss.py
@@ -0,0 +1,221 @@
+from datetime import datetime
+from io import BytesIO
+from typing import Optional, List
+from xml.etree.ElementTree import Element
+
+import attr
+
+from .utils import (
+ parse_xml, get_child, get_text, get_int, get_datetime, FeedParseError
+)
+
+
@attr.s
class RSSImage:
    """A channel <image> element (RSS 2.0)."""
    url: str = attr.ib()
    title: Optional[str] = attr.ib()
    link: str = attr.ib()
    width: int = attr.ib()  # defaults to 88 when missing (see _get_image)
    height: int = attr.ib()  # defaults to 31 when missing (see _get_image)
    description: Optional[str] = attr.ib()
+
+
@attr.s
class RSSEnclosure:
    """An item <enclosure>: a media file attached to the item."""
    url: str = attr.ib()
    length: Optional[int] = attr.ib()  # size in bytes; None when malformed
    type: Optional[str] = attr.ib()  # MIME type
+
+
@attr.s
class RSSSource:
    """An item <source>: the channel the item was republished from."""
    title: str = attr.ib()
    url: Optional[str] = attr.ib()
+
+
@attr.s
class RSSItem:
    """A single <item> of an RSS 2.0 channel; all fields are optional."""
    title: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    description: Optional[str] = attr.ib()
    author: Optional[str] = attr.ib()
    categories: List[str] = attr.ib()
    comments: Optional[str] = attr.ib()
    enclosures: List[RSSEnclosure] = attr.ib()
    guid: Optional[str] = attr.ib()
    pub_date: Optional[datetime] = attr.ib()
    source: Optional[RSSSource] = attr.ib()

    # Extension: full HTML content from the RSS content module
    # (content:encoded).
    content_encoded: Optional[str] = attr.ib()
+
+
@attr.s
class RSSChannel:
    """A parsed RSS 2.0 <channel>."""
    title: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    description: Optional[str] = attr.ib()
    language: Optional[str] = attr.ib()
    copyright: Optional[str] = attr.ib()
    managing_editor: Optional[str] = attr.ib()
    web_master: Optional[str] = attr.ib()
    pub_date: Optional[datetime] = attr.ib()
    last_build_date: Optional[datetime] = attr.ib()
    categories: List[str] = attr.ib()
    generator: Optional[str] = attr.ib()
    docs: Optional[str] = attr.ib()
    ttl: Optional[int] = attr.ib()  # cache lifetime in minutes
    image: Optional[RSSImage] = attr.ib()

    items: List[RSSItem] = attr.ib()

    # Extension: channel-level content:encoded (RSS content module).
    content_encoded: Optional[str] = attr.ib()
+
+
def _get_image(element: Element, name,
               optional: bool=True) -> Optional[RSSImage]:
    """Parse a channel <image> child; None when absent (and *optional*)."""
    child = get_child(element, name, optional)
    if child is None:
        return None

    return RSSImage(
        get_text(child, 'url', optional=False),
        get_text(child, 'title'),
        get_text(child, 'link', optional=False),
        # 88 x 31 are the default image dimensions from the RSS 2.0 spec.
        get_int(child, 'width') or 88,
        get_int(child, 'height') or 31,
        get_text(child, 'description')
    )
+
+
def _get_source(element: Element, name,
                optional: bool=True) -> Optional[RSSSource]:
    """Parse an item <source> child; None when absent (and *optional*)."""
    child = get_child(element, name, optional)
    if child is None:
        return None

    # Bug fix: an empty <source url="..."/> element has child.text set to
    # None, and calling .strip() on it raised AttributeError.
    title = child.text.strip() if child.text else ''

    return RSSSource(
        title,
        child.attrib.get('url'),
    )
+
+
def _get_enclosure(element: Element) -> RSSEnclosure:
    """Build an RSSEnclosure; a missing or malformed length becomes None."""
    try:
        # TypeError covers a missing attribute (int(None)), ValueError a
        # non-numeric value.
        length = int(element.attrib.get('length'))
    except (TypeError, ValueError):
        length = None

    return RSSEnclosure(
        element.attrib['url'],
        length,
        element.attrib.get('type'),
    )
+
+
def _get_link(element: Element) -> Optional[str]:
    """Attempt to retrieve item link.

    Use the GUID as a fallback if it is a permalink.
    """
    link = get_text(element, 'link')
    if link is None:
        guid = get_child(element, 'guid')
        if guid is not None and guid.attrib.get('isPermaLink') == 'true':
            link = get_text(element, 'guid')
    return link
+
+
def _get_item(element: Element) -> RSSItem:
    """Parse an RSS <item> element into an RSSItem."""
    root = element

    title = get_text(root, 'title')
    link = _get_link(root)
    description = get_text(root, 'description')
    author = get_text(root, 'author')
    # Bug fix: skip empty <category/> elements instead of inserting None
    # into a list annotated as List[str].
    categories = [e.text for e in root.findall('category') if e.text]
    comments = get_text(root, 'comments')
    enclosures = [_get_enclosure(e) for e in root.findall('enclosure')]
    guid = get_text(root, 'guid')
    pub_date = get_datetime(root, 'pubDate')
    source = _get_source(root, 'source')

    # Content-module extension: full HTML content of the item.
    content_encoded = get_text(root, 'content:encoded')

    return RSSItem(
        title,
        link,
        description,
        author,
        categories,
        comments,
        enclosures,
        guid,
        pub_date,
        source,
        content_encoded
    )
+
+
def _parse_rss(root: Element) -> RSSChannel:
    """Parse an <rss> document root into an RSSChannel.

    Only RSS 2.0 is supported; other versions raise FeedParseError.
    """
    rss_version = root.get('version')
    if rss_version != '2.0':
        raise FeedParseError('Cannot process RSS feed version "{}"'
                             .format(rss_version))

    root = root.find('channel')
    if root is None:
        # Bug fix: fail with a clear parse error instead of crashing with
        # AttributeError on None below.
        raise FeedParseError('RSS feed does not have a channel')

    title = get_text(root, 'title')
    link = get_text(root, 'link')
    description = get_text(root, 'description')
    language = get_text(root, 'language')
    copyright = get_text(root, 'copyright')
    managing_editor = get_text(root, 'managingEditor')
    web_master = get_text(root, 'webMaster')
    pub_date = get_datetime(root, 'pubDate')
    last_build_date = get_datetime(root, 'lastBuildDate')
    # Bug fix: skip empty <category/> elements (None text) to honour the
    # List[str] annotation on RSSChannel.categories.
    categories = [e.text for e in root.findall('category') if e.text]
    generator = get_text(root, 'generator')
    docs = get_text(root, 'docs')
    ttl = get_int(root, 'ttl')

    image = _get_image(root, 'image')
    items = [_get_item(e) for e in root.findall('item')]

    content_encoded = get_text(root, 'content:encoded')

    return RSSChannel(
        title,
        link,
        description,
        language,
        copyright,
        managing_editor,
        web_master,
        pub_date,
        last_build_date,
        categories,
        generator,
        docs,
        ttl,
        image,
        items,
        content_encoded
    )
+
+
def parse_rss_file(filename: str) -> RSSChannel:
    """Parse an RSS feed from a local XML file."""
    tree = parse_xml(filename)
    return _parse_rss(tree.getroot())
+
+
def parse_rss_bytes(data: bytes) -> RSSChannel:
    """Parse an RSS feed from a byte-string containing XML data."""
    tree = parse_xml(BytesIO(data))
    return _parse_rss(tree.getroot())
diff --git a/python/atoma/simple.py b/python/atoma/simple.py
new file mode 100644
index 0000000..98bb3e1
--- /dev/null
+++ b/python/atoma/simple.py
@@ -0,0 +1,224 @@
+"""Simple API that abstracts away the differences between feed types."""
+
+from datetime import datetime, timedelta
+import html
+import os
+from typing import Optional, List, Tuple
+import urllib.parse
+
+import attr
+
+from . import atom, rss, json_feed
+from .exceptions import (
+ FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError
+)
+
+
@attr.s
class Attachment:
    """A media file attached to an article — an Atom enclosure link, an
    RSS enclosure or a JSON Feed attachment."""
    link: str = attr.ib()
    mime_type: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    size_in_bytes: Optional[int] = attr.ib()
    # Only JSON Feed attachments carry a duration; None otherwise.
    duration: Optional[timedelta] = attr.ib()
+
+
@attr.s
class Article:
    """A format-agnostic feed entry produced by the _adapt_* functions."""
    id: str = attr.ib()
    title: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    content: str = attr.ib()  # may be '' when the entry carries no content
    published_at: Optional[datetime] = attr.ib()
    updated_at: Optional[datetime] = attr.ib()
    attachments: List[Attachment] = attr.ib()
+
+
@attr.s
class Feed:
    """A format-agnostic feed: the common denominator of Atom, RSS and
    JSON feeds exposed by the simple API."""
    title: str = attr.ib()
    subtitle: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    updated_at: Optional[datetime] = attr.ib()
    articles: List[Article] = attr.ib()
+
+
def _adapt_atom_feed(atom_feed: atom.AtomFeed) -> Feed:
    """Convert a parsed Atom feed into the format-agnostic Feed model."""
    articles = list()
    for entry in atom_feed.entries:
        # Prefer the full content; fall back to the summary, then empty.
        if entry.content is not None:
            content = entry.content.value
        elif entry.summary is not None:
            content = entry.summary.value
        else:
            content = ''
        published_at, updated_at = _get_article_dates(entry.published,
                                                      entry.updated)
        # Find article link and attachments
        article_link = None
        attachments = list()
        for candidate_link in entry.links:
            # rel defaults to "alternate" when absent (hence the None).
            if candidate_link.rel in ('alternate', None):
                article_link = candidate_link.href
            elif candidate_link.rel == 'enclosure':
                attachments.append(Attachment(
                    title=_get_attachment_title(candidate_link.title,
                                                candidate_link.href),
                    link=candidate_link.href,
                    mime_type=candidate_link.type_,
                    size_in_bytes=candidate_link.length,
                    duration=None
                ))

        # HTML/XHTML titles are unescaped so the simple API always exposes
        # plain-text titles.
        if entry.title is None:
            entry_title = None
        elif entry.title.text_type in (atom.AtomTextType.html,
                                       atom.AtomTextType.xhtml):
            entry_title = html.unescape(entry.title.value).strip()
        else:
            entry_title = entry.title.value

        articles.append(Article(
            entry.id_,
            entry_title,
            article_link,
            content,
            published_at,
            updated_at,
            attachments
        ))

    # Find feed link: the feed's "self" link, when present.
    link = None
    for candidate_link in atom_feed.links:
        if candidate_link.rel == 'self':
            link = candidate_link.href
            break

    return Feed(
        # Fall back to the mandatory feed id when there is no title.
        atom_feed.title.value if atom_feed.title else atom_feed.id_,
        atom_feed.subtitle.value if atom_feed.subtitle else None,
        link,
        atom_feed.updated,
        articles
    )
+
+
def _adapt_rss_channel(rss_channel: rss.RSSChannel) -> Feed:
    """Convert a parsed RSS channel into the format-agnostic Feed model.

    Raises FeedParseError when the channel has neither a title nor a link.
    """
    articles = list()
    for item in rss_channel.items:
        attachments = [
            Attachment(link=e.url, mime_type=e.type, size_in_bytes=e.length,
                       title=_get_attachment_title(None, e.url), duration=None)
            for e in item.enclosures
        ]
        articles.append(Article(
            # NOTE(review): guid and link may both be None, leaving the
            # article with a None id — confirm downstream callers tolerate
            # that.
            item.guid or item.link,
            item.title,
            item.link,
            item.content_encoded or item.description or '',
            item.pub_date,
            None,  # RSS items carry no separate "updated" date
            attachments
        ))

    if rss_channel.title is None and rss_channel.link is None:
        raise FeedParseError('RSS feed does not have a title nor a link')

    return Feed(
        rss_channel.title if rss_channel.title else rss_channel.link,
        rss_channel.description,
        rss_channel.link,
        rss_channel.pub_date,
        articles
    )
+
+
def _adapt_json_feed(json_feed: json_feed.JSONFeed) -> Feed:
    """Convert a parsed JSON feed into the format-agnostic Feed model.

    NOTE(review): the parameter shadows the imported ``json_feed`` module
    inside the function body (the annotation still resolves to the module
    because it is evaluated before the parameter binding takes effect).
    """
    articles = list()
    for item in json_feed.items:
        attachments = [
            Attachment(a.url, a.mime_type,
                       _get_attachment_title(a.title, a.url),
                       a.size_in_bytes, a.duration)
            for a in item.attachments
        ]
        articles.append(Article(
            item.id_,
            item.title,
            item.url,
            # Prefer HTML content over plain text, then empty.
            item.content_html or item.content_text or '',
            item.date_published,
            item.date_modified,
            attachments
        ))

    return Feed(
        json_feed.title,
        json_feed.description,
        json_feed.feed_url,
        None,  # JSON feeds carry no feed-level updated date
        articles
    )
+
+
+def _get_article_dates(published_at: Optional[datetime],
+ updated_at: Optional[datetime]
+ ) -> Tuple[Optional[datetime], Optional[datetime]]:
+ if published_at and updated_at:
+ return published_at, updated_at
+
+ if updated_at:
+ return updated_at, None
+
+ if published_at:
+ return published_at, None
+
+ raise FeedParseError('Article does not have proper dates')
+
+
+def _get_attachment_title(attachment_title: Optional[str], link: str) -> str:
+ if attachment_title:
+ return attachment_title
+
+ parsed_link = urllib.parse.urlparse(link)
+ return os.path.basename(parsed_link.path)
+
+
def _simple_parse(pairs, content) -> Feed:
    """Try each (parser, adapter) pair in turn and return the first Feed.

    Distinguishes "not a supported document at all" (neither valid XML nor
    valid JSON) from "recognized document but not a valid feed".
    """
    is_xml = True
    is_json = True
    for parser, adapter in pairs:
        try:
            return adapter(parser(content))
        except FeedXMLError:
            # The document is not XML; XML-based parsers cannot apply.
            is_xml = False
        except FeedJSONError:
            # The document is not JSON; the JSON parser cannot apply.
            is_json = False
        except FeedParseError:
            # Valid document of this type but not a valid feed: try others.
            continue

    if not is_xml and not is_json:
        raise FeedDocumentError('File is not a supported feed type')

    raise FeedParseError('File is not a valid supported feed')
+
+
def simple_parse_file(filename: str) -> Feed:
    """Parse an Atom, RSS or JSON feed from a local file."""
    parser_adapter_pairs = (
        (rss.parse_rss_file, _adapt_rss_channel),
        (atom.parse_atom_file, _adapt_atom_feed),
        (json_feed.parse_json_feed_file, _adapt_json_feed),
    )
    return _simple_parse(parser_adapter_pairs, filename)
+
+
def simple_parse_bytes(data: bytes) -> Feed:
    """Parse an Atom, RSS or JSON feed from a byte-string containing data."""
    parser_adapter_pairs = (
        (rss.parse_rss_bytes, _adapt_rss_channel),
        (atom.parse_atom_bytes, _adapt_atom_feed),
        (json_feed.parse_json_feed_bytes, _adapt_json_feed),
    )
    return _simple_parse(parser_adapter_pairs, data)
diff --git a/python/atoma/utils.py b/python/atoma/utils.py
new file mode 100644
index 0000000..4dc1ab5
--- /dev/null
+++ b/python/atoma/utils.py
@@ -0,0 +1,84 @@
+from datetime import datetime, timezone
+from xml.etree.ElementTree import Element
+from typing import Optional
+
+import dateutil.parser
+from defusedxml.ElementTree import parse as defused_xml_parse, ParseError
+
+from .exceptions import FeedXMLError, FeedParseError
+
# XML namespace map shared by all find()/findall() calls: "feed" is the
# Atom namespace, "content" the RSS content-module extension.
ns = {
    'content': 'http://purl.org/rss/1.0/modules/content/',
    'feed': 'http://www.w3.org/2005/Atom'
}
+
+
def parse_xml(xml_content):
    """Parse *xml_content* (a path or file object) with defusedxml.

    Raises FeedXMLError instead of leaking the low-level ParseError.
    """
    try:
        return defused_xml_parse(xml_content)
    except ParseError as e:
        # Chain the original error so the failing line/column is kept.
        raise FeedXMLError('Not a valid XML document') from e
+
+
def get_child(element: Element, name,
              optional: bool=True) -> Optional[Element]:
    """Find the first child of *element* matching *name* (resolved through
    the shared namespace map).

    Returns None when the child is missing and *optional* is true; raises
    FeedParseError otherwise.
    """
    child = element.find(name, namespaces=ns)
    if child is not None:
        return child

    if optional:
        return None

    raise FeedParseError(
        'Could not parse feed: "{}" does not have a "{}"'
        .format(element.tag, name)
    )
+
+
def get_text(element: Element, name, optional: bool=True) -> Optional[str]:
    """Return the stripped text of the child named *name*, or None.

    Raises FeedParseError when the text is required but missing or empty.
    """
    child = get_child(element, name, optional)
    if child is None:
        return None

    if child.text is not None:
        return child.text.strip()

    if optional:
        return None

    raise FeedParseError(
        'Could not parse feed: "{}" text is required but is empty'
        .format(name)
    )
+
+
def get_int(element: Element, name, optional: bool=True) -> Optional[int]:
    """Return the integer text of the child named *name*, or None.

    Raises FeedParseError on non-numeric text instead of leaking a bare
    ValueError from int() to callers.
    """
    text = get_text(element, name, optional)
    if text is None:
        return None

    try:
        return int(text)
    except ValueError as e:
        raise FeedParseError(
            'Could not parse feed: "{}" is not an int'.format(name)
        ) from e
+
+
def get_datetime(element: Element, name,
                 optional: bool=True) -> Optional[datetime]:
    """Return the child's text parsed as a datetime, or None."""
    text = get_text(element, name, optional)
    return try_parse_date(text) if text is not None else None
+
+
def try_parse_date(date_str: str) -> Optional[datetime]:
    """Best-effort date parsing: a timezone-aware datetime, or None.

    Uses dateutil's fuzzy mode so surrounding junk in the string does not
    break parsing; unparseable values yield None instead of raising.
    """
    try:
        date = dateutil.parser.parse(date_str, fuzzy=True)
    except (ValueError, OverflowError):
        return None

    if date.tzinfo is None:
        # TZ naive datetime, make it a TZ aware datetime by assuming it
        # contains UTC time
        date = date.replace(tzinfo=timezone.utc)

    return date