Diffstat (limited to 'python/atoma')
-rw-r--r--  python/atoma/__init__.py    12
-rw-r--r--  python/atoma/atom.py       284
-rw-r--r--  python/atoma/const.py        1
-rw-r--r--  python/atoma/exceptions.py  14
-rw-r--r--  python/atoma/json_feed.py  223
-rw-r--r--  python/atoma/opml.py       107
-rw-r--r--  python/atoma/rss.py        221
-rw-r--r--  python/atoma/simple.py     224
-rw-r--r--  python/atoma/utils.py       84
9 files changed, 0 insertions, 1170 deletions
diff --git a/python/atoma/__init__.py b/python/atoma/__init__.py
deleted file mode 100644
index 0768081..0000000
--- a/python/atoma/__init__.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from .atom import parse_atom_file, parse_atom_bytes
-from .rss import parse_rss_file, parse_rss_bytes
-from .json_feed import (
- parse_json_feed, parse_json_feed_file, parse_json_feed_bytes
-)
-from .opml import parse_opml_file, parse_opml_bytes
-from .exceptions import (
- FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError
-)
-from .const import VERSION
-
-__version__ = VERSION
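
Note: the removed __init__.py was the entire public surface of the package: the per-format parsers, the exception types, and the version string. For reference, a minimal sketch of consuming those re-exports (the inline JSON document is made up for illustration):

    import atoma

    # Any of the re-exported parsers is reachable from the package root.
    feed = atoma.parse_json_feed_bytes(
        b'{"version": "https://jsonfeed.org/version/1", "title": "Example", "items": []}'
    )
    print(feed.title)          # "Example"
    print(atoma.__version__)   # "0.0.13", re-exported from atoma.const.VERSION
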
diff --git a/python/atoma/atom.py b/python/atoma/atom.py
deleted file mode 100644
index d4e676c..0000000
--- a/python/atoma/atom.py
+++ /dev/null
@@ -1,284 +0,0 @@
-from datetime import datetime
-import enum
-from io import BytesIO
-from typing import Optional, List
-from xml.etree.ElementTree import Element
-
-import attr
-
-from .utils import (
- parse_xml, get_child, get_text, get_datetime, FeedParseError, ns
-)
-
-
-class AtomTextType(enum.Enum):
- text = "text"
- html = "html"
- xhtml = "xhtml"
-
-
-@attr.s
-class AtomTextConstruct:
- text_type: str = attr.ib()
- lang: Optional[str] = attr.ib()
- value: str = attr.ib()
-
-
-@attr.s
-class AtomEntry:
- title: AtomTextConstruct = attr.ib()
- id_: str = attr.ib()
-
- # Should be mandatory but many feeds use published instead
- updated: Optional[datetime] = attr.ib()
-
- authors: List['AtomPerson'] = attr.ib()
- contributors: List['AtomPerson'] = attr.ib()
- links: List['AtomLink'] = attr.ib()
- categories: List['AtomCategory'] = attr.ib()
- published: Optional[datetime] = attr.ib()
- rights: Optional[AtomTextConstruct] = attr.ib()
- summary: Optional[AtomTextConstruct] = attr.ib()
- content: Optional[AtomTextConstruct] = attr.ib()
- source: Optional['AtomFeed'] = attr.ib()
-
-
-@attr.s
-class AtomFeed:
- title: Optional[AtomTextConstruct] = attr.ib()
- id_: str = attr.ib()
-
- # Should be mandatory but many feeds do not include it
- updated: Optional[datetime] = attr.ib()
-
- authors: List['AtomPerson'] = attr.ib()
- contributors: List['AtomPerson'] = attr.ib()
- links: List['AtomLink'] = attr.ib()
- categories: List['AtomCategory'] = attr.ib()
- generator: Optional['AtomGenerator'] = attr.ib()
- subtitle: Optional[AtomTextConstruct] = attr.ib()
- rights: Optional[AtomTextConstruct] = attr.ib()
- icon: Optional[str] = attr.ib()
- logo: Optional[str] = attr.ib()
-
- entries: List[AtomEntry] = attr.ib()
-
-
-@attr.s
-class AtomPerson:
- name: str = attr.ib()
- uri: Optional[str] = attr.ib()
- email: Optional[str] = attr.ib()
-
-
-@attr.s
-class AtomLink:
- href: str = attr.ib()
- rel: Optional[str] = attr.ib()
- type_: Optional[str] = attr.ib()
- hreflang: Optional[str] = attr.ib()
- title: Optional[str] = attr.ib()
- length: Optional[int] = attr.ib()
-
-
-@attr.s
-class AtomCategory:
- term: str = attr.ib()
- scheme: Optional[str] = attr.ib()
- label: Optional[str] = attr.ib()
-
-
-@attr.s
-class AtomGenerator:
- name: str = attr.ib()
- uri: Optional[str] = attr.ib()
- version: Optional[str] = attr.ib()
-
-
-def _get_generator(element: Element, name,
- optional: bool=True) -> Optional[AtomGenerator]:
- child = get_child(element, name, optional)
- if child is None:
- return None
-
- return AtomGenerator(
- child.text.strip(),
- child.attrib.get('uri'),
- child.attrib.get('version'),
- )
-
-
-def _get_text_construct(element: Element, name,
- optional: bool=True) -> Optional[AtomTextConstruct]:
- child = get_child(element, name, optional)
- if child is None:
- return None
-
- try:
- text_type = AtomTextType(child.attrib['type'])
- except KeyError:
- text_type = AtomTextType.text
-
- try:
- lang = child.lang
- except AttributeError:
- lang = None
-
- if child.text is None:
- if optional:
- return None
-
- raise FeedParseError(
- 'Could not parse atom feed: "{}" text is required but is empty'
- .format(name)
- )
-
- return AtomTextConstruct(
- text_type,
- lang,
- child.text.strip()
- )
-
-
-def _get_person(element: Element) -> Optional[AtomPerson]:
- try:
- return AtomPerson(
- get_text(element, 'feed:name', optional=False),
- get_text(element, 'feed:uri'),
- get_text(element, 'feed:email')
- )
- except FeedParseError:
- return None
-
-
-def _get_link(element: Element) -> AtomLink:
- length = element.attrib.get('length')
- length = int(length) if length else None
- return AtomLink(
- element.attrib['href'],
- element.attrib.get('rel'),
- element.attrib.get('type'),
- element.attrib.get('hreflang'),
- element.attrib.get('title'),
- length
- )
-
-
-def _get_category(element: Element) -> AtomCategory:
- return AtomCategory(
- element.attrib['term'],
- element.attrib.get('scheme'),
- element.attrib.get('label'),
- )
-
-
-def _get_entry(element: Element,
- default_authors: List[AtomPerson]) -> AtomEntry:
- root = element
-
- # Mandatory
- title = _get_text_construct(root, 'feed:title')
- id_ = get_text(root, 'feed:id')
-
- # Optional
- try:
- source = _parse_atom(get_child(root, 'feed:source', optional=False),
- parse_entries=False)
- except FeedParseError:
- source = None
- source_authors = []
- else:
- source_authors = source.authors
-
- authors = [_get_person(e)
- for e in root.findall('feed:author', ns)] or default_authors
- authors = [a for a in authors if a is not None]
- authors = authors or default_authors or source_authors
-
- contributors = [_get_person(e)
- for e in root.findall('feed:contributor', ns) if e]
- contributors = [c for c in contributors if c is not None]
-
- links = [_get_link(e) for e in root.findall('feed:link', ns)]
- categories = [_get_category(e) for e in root.findall('feed:category', ns)]
-
- updated = get_datetime(root, 'feed:updated')
- published = get_datetime(root, 'feed:published')
- rights = _get_text_construct(root, 'feed:rights')
- summary = _get_text_construct(root, 'feed:summary')
- content = _get_text_construct(root, 'feed:content')
-
- return AtomEntry(
- title,
- id_,
- updated,
- authors,
- contributors,
- links,
- categories,
- published,
- rights,
- summary,
- content,
- source
- )
-
-
-def _parse_atom(root: Element, parse_entries: bool=True) -> AtomFeed:
- # Mandatory
- id_ = get_text(root, 'feed:id', optional=False)
-
- # Optional
- title = _get_text_construct(root, 'feed:title')
- updated = get_datetime(root, 'feed:updated')
- authors = [_get_person(e)
- for e in root.findall('feed:author', ns) if e]
- authors = [a for a in authors if a is not None]
- contributors = [_get_person(e)
- for e in root.findall('feed:contributor', ns) if e]
- contributors = [c for c in contributors if c is not None]
- links = [_get_link(e)
- for e in root.findall('feed:link', ns)]
- categories = [_get_category(e)
- for e in root.findall('feed:category', ns)]
-
- generator = _get_generator(root, 'feed:generator')
- subtitle = _get_text_construct(root, 'feed:subtitle')
- rights = _get_text_construct(root, 'feed:rights')
- icon = get_text(root, 'feed:icon')
- logo = get_text(root, 'feed:logo')
-
- if parse_entries:
- entries = [_get_entry(e, authors)
- for e in root.findall('feed:entry', ns)]
- else:
- entries = []
-
- atom_feed = AtomFeed(
- title,
- id_,
- updated,
- authors,
- contributors,
- links,
- categories,
- generator,
- subtitle,
- rights,
- icon,
- logo,
- entries
- )
- return atom_feed
-
-
-def parse_atom_file(filename: str) -> AtomFeed:
- """Parse an Atom feed from a local XML file."""
- root = parse_xml(filename).getroot()
- return _parse_atom(root)
-
-
-def parse_atom_bytes(data: bytes) -> AtomFeed:
- """Parse an Atom feed from a byte-string containing XML data."""
- root = parse_xml(BytesIO(data)).getroot()
- return _parse_atom(root)
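
Note: a minimal sketch of the removed Atom parser in use, relying only on parse_atom_bytes and the attr classes above. The feed document is illustrative; in this parser only feed:id is strictly mandatory.

    from atoma import parse_atom_bytes

    raw = b"""<feed xmlns="http://www.w3.org/2005/Atom">
      <id>urn:example:feed</id>
      <title>Example feed</title>
      <updated>2018-11-05T10:00:00Z</updated>
      <entry>
        <id>urn:example:entry-1</id>
        <title>First entry</title>
        <link href="https://example.org/1" rel="alternate"/>
      </entry>
    </feed>"""

    feed = parse_atom_bytes(raw)           # raises FeedParseError if feed:id is missing
    print(feed.title.value)                # "Example feed"
    print(feed.entries[0].links[0].href)   # "https://example.org/1"
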
diff --git a/python/atoma/const.py b/python/atoma/const.py
deleted file mode 100644
index d52d0f6..0000000
--- a/python/atoma/const.py
+++ /dev/null
@@ -1 +0,0 @@
-VERSION = '0.0.13'
diff --git a/python/atoma/exceptions.py b/python/atoma/exceptions.py
deleted file mode 100644
index 88170c5..0000000
--- a/python/atoma/exceptions.py
+++ /dev/null
@@ -1,14 +0,0 @@
-class FeedParseError(Exception):
- """Document is an invalid feed."""
-
-
-class FeedDocumentError(Exception):
- """Document is not a supported file."""
-
-
-class FeedXMLError(FeedDocumentError):
- """Document is not valid XML."""
-
-
-class FeedJSONError(FeedDocumentError):
- """Document is not valid JSON."""
diff --git a/python/atoma/json_feed.py b/python/atoma/json_feed.py
deleted file mode 100644
index 410ff4a..0000000
--- a/python/atoma/json_feed.py
+++ /dev/null
@@ -1,223 +0,0 @@
-from datetime import datetime, timedelta
-import json
-from typing import Optional, List
-
-import attr
-
-from .exceptions import FeedParseError, FeedJSONError
-from .utils import try_parse_date
-
-
-@attr.s
-class JSONFeedAuthor:
-
- name: Optional[str] = attr.ib()
- url: Optional[str] = attr.ib()
- avatar: Optional[str] = attr.ib()
-
-
-@attr.s
-class JSONFeedAttachment:
-
- url: str = attr.ib()
- mime_type: str = attr.ib()
- title: Optional[str] = attr.ib()
- size_in_bytes: Optional[int] = attr.ib()
- duration: Optional[timedelta] = attr.ib()
-
-
-@attr.s
-class JSONFeedItem:
-
- id_: str = attr.ib()
- url: Optional[str] = attr.ib()
- external_url: Optional[str] = attr.ib()
- title: Optional[str] = attr.ib()
- content_html: Optional[str] = attr.ib()
- content_text: Optional[str] = attr.ib()
- summary: Optional[str] = attr.ib()
- image: Optional[str] = attr.ib()
- banner_image: Optional[str] = attr.ib()
- date_published: Optional[datetime] = attr.ib()
- date_modified: Optional[datetime] = attr.ib()
- author: Optional[JSONFeedAuthor] = attr.ib()
-
- tags: List[str] = attr.ib()
- attachments: List[JSONFeedAttachment] = attr.ib()
-
-
-@attr.s
-class JSONFeed:
-
- version: str = attr.ib()
- title: str = attr.ib()
- home_page_url: Optional[str] = attr.ib()
- feed_url: Optional[str] = attr.ib()
- description: Optional[str] = attr.ib()
- user_comment: Optional[str] = attr.ib()
- next_url: Optional[str] = attr.ib()
- icon: Optional[str] = attr.ib()
- favicon: Optional[str] = attr.ib()
- author: Optional[JSONFeedAuthor] = attr.ib()
- expired: bool = attr.ib()
-
- items: List[JSONFeedItem] = attr.ib()
-
-
-def _get_items(root: dict) -> List[JSONFeedItem]:
- rv = []
- items = root.get('items', [])
- if not items:
- return rv
-
- for item in items:
- rv.append(_get_item(item))
-
- return rv
-
-
-def _get_item(item_dict: dict) -> JSONFeedItem:
- return JSONFeedItem(
- id_=_get_text(item_dict, 'id', optional=False),
- url=_get_text(item_dict, 'url'),
- external_url=_get_text(item_dict, 'external_url'),
- title=_get_text(item_dict, 'title'),
- content_html=_get_text(item_dict, 'content_html'),
- content_text=_get_text(item_dict, 'content_text'),
- summary=_get_text(item_dict, 'summary'),
- image=_get_text(item_dict, 'image'),
- banner_image=_get_text(item_dict, 'banner_image'),
- date_published=_get_datetime(item_dict, 'date_published'),
- date_modified=_get_datetime(item_dict, 'date_modified'),
- author=_get_author(item_dict),
- tags=_get_tags(item_dict, 'tags'),
- attachments=_get_attachments(item_dict, 'attachments')
- )
-
-
-def _get_attachments(root, name) -> List[JSONFeedAttachment]:
- rv = list()
- for attachment_dict in root.get(name, []):
- rv.append(JSONFeedAttachment(
- _get_text(attachment_dict, 'url', optional=False),
- _get_text(attachment_dict, 'mime_type', optional=False),
- _get_text(attachment_dict, 'title'),
- _get_int(attachment_dict, 'size_in_bytes'),
- _get_duration(attachment_dict, 'duration_in_seconds')
- ))
- return rv
-
-
-def _get_tags(root, name) -> List[str]:
- tags = root.get(name, [])
- return [tag for tag in tags if isinstance(tag, str)]
-
-
-def _get_datetime(root: dict, name, optional: bool=True) -> Optional[datetime]:
- text = _get_text(root, name, optional)
- if text is None:
- return None
-
- return try_parse_date(text)
-
-
-def _get_expired(root: dict) -> bool:
- if root.get('expired') is True:
- return True
-
- return False
-
-
-def _get_author(root: dict) -> Optional[JSONFeedAuthor]:
- author_dict = root.get('author')
- if not author_dict:
- return None
-
- rv = JSONFeedAuthor(
- name=_get_text(author_dict, 'name'),
- url=_get_text(author_dict, 'url'),
- avatar=_get_text(author_dict, 'avatar'),
- )
- if rv.name is None and rv.url is None and rv.avatar is None:
- return None
-
- return rv
-
-
-def _get_int(root: dict, name: str, optional: bool=True) -> Optional[int]:
- rv = root.get(name)
- if not optional and rv is None:
- raise FeedParseError('Could not parse feed: "{}" int is required but '
- 'is empty'.format(name))
-
- if optional and rv is None:
- return None
-
- if not isinstance(rv, int):
- raise FeedParseError('Could not parse feed: "{}" is not an int'
- .format(name))
-
- return rv
-
-
-def _get_duration(root: dict, name: str,
- optional: bool=True) -> Optional[timedelta]:
- duration = _get_int(root, name, optional)
- if duration is None:
- return None
-
- return timedelta(seconds=duration)
-
-
-def _get_text(root: dict, name: str, optional: bool=True) -> Optional[str]:
- rv = root.get(name)
- if not optional and rv is None:
- raise FeedParseError('Could not parse feed: "{}" text is required but '
- 'is empty'.format(name))
-
- if optional and rv is None:
- return None
-
- if not isinstance(rv, str):
- raise FeedParseError('Could not parse feed: "{}" is not a string'
- .format(name))
-
- return rv
-
-
-def parse_json_feed(root: dict) -> JSONFeed:
- return JSONFeed(
- version=_get_text(root, 'version', optional=False),
- title=_get_text(root, 'title', optional=False),
- home_page_url=_get_text(root, 'home_page_url'),
- feed_url=_get_text(root, 'feed_url'),
- description=_get_text(root, 'description'),
- user_comment=_get_text(root, 'user_comment'),
- next_url=_get_text(root, 'next_url'),
- icon=_get_text(root, 'icon'),
- favicon=_get_text(root, 'favicon'),
- author=_get_author(root),
- expired=_get_expired(root),
- items=_get_items(root)
- )
-
-
-def parse_json_feed_file(filename: str) -> JSONFeed:
- """Parse a JSON feed from a local json file."""
- with open(filename) as f:
- try:
- root = json.load(f)
- except json.decoder.JSONDecodeError:
- raise FeedJSONError('Not a valid JSON document')
-
- return parse_json_feed(root)
-
-
-def parse_json_feed_bytes(data: bytes) -> JSONFeed:
- """Parse a JSON feed from a byte-string containing JSON data."""
- try:
- root = json.loads(data)
- except json.decoder.JSONDecodeError:
- raise FeedJSONError('Not a valid JSON document')
-
- return parse_json_feed(root)
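
Note: in the JSON Feed parser above only version and title are mandatory; everything else degrades to None or an empty list. A short usage sketch with an illustrative document:

    from atoma import parse_json_feed_bytes

    raw = b"""{
      "version": "https://jsonfeed.org/version/1",
      "title": "Example JSON feed",
      "items": [
        {"id": "1", "url": "https://example.org/1", "content_text": "Hello"}
      ]
    }"""

    feed = parse_json_feed_bytes(raw)
    print(feed.title)                  # "Example JSON feed"
    print(feed.items[0].content_text)  # "Hello"
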
diff --git a/python/atoma/opml.py b/python/atoma/opml.py
deleted file mode 100644
index a73105e..0000000
--- a/python/atoma/opml.py
+++ /dev/null
@@ -1,107 +0,0 @@
-from datetime import datetime
-from io import BytesIO
-from typing import Optional, List
-from xml.etree.ElementTree import Element
-
-import attr
-
-from .utils import parse_xml, get_text, get_int, get_datetime
-
-
-@attr.s
-class OPMLOutline:
- text: Optional[str] = attr.ib()
- type: Optional[str] = attr.ib()
- xml_url: Optional[str] = attr.ib()
- description: Optional[str] = attr.ib()
- html_url: Optional[str] = attr.ib()
- language: Optional[str] = attr.ib()
- title: Optional[str] = attr.ib()
- version: Optional[str] = attr.ib()
-
- outlines: List['OPMLOutline'] = attr.ib()
-
-
-@attr.s
-class OPML:
- title: Optional[str] = attr.ib()
- owner_name: Optional[str] = attr.ib()
- owner_email: Optional[str] = attr.ib()
- date_created: Optional[datetime] = attr.ib()
- date_modified: Optional[datetime] = attr.ib()
- expansion_state: Optional[str] = attr.ib()
-
- vertical_scroll_state: Optional[int] = attr.ib()
- window_top: Optional[int] = attr.ib()
- window_left: Optional[int] = attr.ib()
- window_bottom: Optional[int] = attr.ib()
- window_right: Optional[int] = attr.ib()
-
- outlines: List[OPMLOutline] = attr.ib()
-
-
-def _get_outlines(element: Element) -> List[OPMLOutline]:
- rv = list()
-
- for outline in element.findall('outline'):
- rv.append(OPMLOutline(
- outline.attrib.get('text'),
- outline.attrib.get('type'),
- outline.attrib.get('xmlUrl'),
- outline.attrib.get('description'),
- outline.attrib.get('htmlUrl'),
- outline.attrib.get('language'),
- outline.attrib.get('title'),
- outline.attrib.get('version'),
- _get_outlines(outline)
- ))
-
- return rv
-
-
-def _parse_opml(root: Element) -> OPML:
- head = root.find('head')
- body = root.find('body')
-
- return OPML(
- get_text(head, 'title'),
- get_text(head, 'ownerName'),
- get_text(head, 'ownerEmail'),
- get_datetime(head, 'dateCreated'),
- get_datetime(head, 'dateModified'),
- get_text(head, 'expansionState'),
- get_int(head, 'vertScrollState'),
- get_int(head, 'windowTop'),
- get_int(head, 'windowLeft'),
- get_int(head, 'windowBottom'),
- get_int(head, 'windowRight'),
- outlines=_get_outlines(body)
- )
-
-
-def parse_opml_file(filename: str) -> OPML:
- """Parse an OPML document from a local XML file."""
- root = parse_xml(filename).getroot()
- return _parse_opml(root)
-
-
-def parse_opml_bytes(data: bytes) -> OPML:
- """Parse an OPML document from a byte-string containing XML data."""
- root = parse_xml(BytesIO(data)).getroot()
- return _parse_opml(root)
-
-
-def get_feed_list(opml_obj: OPML) -> List[str]:
- """Walk an OPML document to extract the list of feed it contains."""
- rv = list()
-
- def collect(obj):
- for outline in obj.outlines:
- if outline.type == 'rss' and outline.xml_url:
- rv.append(outline.xml_url)
-
- if outline.outlines:
- collect(outline)
-
- collect(opml_obj)
- return rv
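
Note: besides the two parse entry points, the removed module exposed get_feed_list, which walks nested outlines and keeps the xmlUrl of every outline whose type is "rss". Sketched below with a made-up subscription list:

    from atoma import parse_opml_bytes
    from atoma.opml import get_feed_list   # not re-exported in __init__.py

    raw = b"""<opml version="2.0">
      <head><title>Subscriptions</title></head>
      <body>
        <outline text="News">
          <outline type="rss" text="Example" xmlUrl="https://example.org/feed.xml"/>
        </outline>
      </body>
    </opml>"""

    subscriptions = parse_opml_bytes(raw)
    print(subscriptions.title)           # "Subscriptions"
    print(get_feed_list(subscriptions))  # ['https://example.org/feed.xml']
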
diff --git a/python/atoma/rss.py b/python/atoma/rss.py
deleted file mode 100644
index f447a2f..0000000
--- a/python/atoma/rss.py
+++ /dev/null
@@ -1,221 +0,0 @@
-from datetime import datetime
-from io import BytesIO
-from typing import Optional, List
-from xml.etree.ElementTree import Element
-
-import attr
-
-from .utils import (
- parse_xml, get_child, get_text, get_int, get_datetime, FeedParseError
-)
-
-
-@attr.s
-class RSSImage:
- url: str = attr.ib()
- title: Optional[str] = attr.ib()
- link: str = attr.ib()
- width: int = attr.ib()
- height: int = attr.ib()
- description: Optional[str] = attr.ib()
-
-
-@attr.s
-class RSSEnclosure:
- url: str = attr.ib()
- length: Optional[int] = attr.ib()
- type: Optional[str] = attr.ib()
-
-
-@attr.s
-class RSSSource:
- title: str = attr.ib()
- url: Optional[str] = attr.ib()
-
-
-@attr.s
-class RSSItem:
- title: Optional[str] = attr.ib()
- link: Optional[str] = attr.ib()
- description: Optional[str] = attr.ib()
- author: Optional[str] = attr.ib()
- categories: List[str] = attr.ib()
- comments: Optional[str] = attr.ib()
- enclosures: List[RSSEnclosure] = attr.ib()
- guid: Optional[str] = attr.ib()
- pub_date: Optional[datetime] = attr.ib()
- source: Optional[RSSSource] = attr.ib()
-
- # Extension
- content_encoded: Optional[str] = attr.ib()
-
-
-@attr.s
-class RSSChannel:
- title: Optional[str] = attr.ib()
- link: Optional[str] = attr.ib()
- description: Optional[str] = attr.ib()
- language: Optional[str] = attr.ib()
- copyright: Optional[str] = attr.ib()
- managing_editor: Optional[str] = attr.ib()
- web_master: Optional[str] = attr.ib()
- pub_date: Optional[datetime] = attr.ib()
- last_build_date: Optional[datetime] = attr.ib()
- categories: List[str] = attr.ib()
- generator: Optional[str] = attr.ib()
- docs: Optional[str] = attr.ib()
- ttl: Optional[int] = attr.ib()
- image: Optional[RSSImage] = attr.ib()
-
- items: List[RSSItem] = attr.ib()
-
- # Extension
- content_encoded: Optional[str] = attr.ib()
-
-
-def _get_image(element: Element, name,
- optional: bool=True) -> Optional[RSSImage]:
- child = get_child(element, name, optional)
- if child is None:
- return None
-
- return RSSImage(
- get_text(child, 'url', optional=False),
- get_text(child, 'title'),
- get_text(child, 'link', optional=False),
- get_int(child, 'width') or 88,
- get_int(child, 'height') or 31,
- get_text(child, 'description')
- )
-
-
-def _get_source(element: Element, name,
- optional: bool=True) -> Optional[RSSSource]:
- child = get_child(element, name, optional)
- if child is None:
- return None
-
- return RSSSource(
- child.text.strip(),
- child.attrib.get('url'),
- )
-
-
-def _get_enclosure(element: Element) -> RSSEnclosure:
- length = element.attrib.get('length')
- try:
- length = int(length)
- except (TypeError, ValueError):
- length = None
-
- return RSSEnclosure(
- element.attrib['url'],
- length,
- element.attrib.get('type'),
- )
-
-
-def _get_link(element: Element) -> Optional[str]:
- """Attempt to retrieve item link.
-
- Use the GUID as a fallback if it is a permalink.
- """
- link = get_text(element, 'link')
- if link is not None:
- return link
-
- guid = get_child(element, 'guid')
- if guid is not None and guid.attrib.get('isPermaLink') == 'true':
- return get_text(element, 'guid')
-
- return None
-
-
-def _get_item(element: Element) -> RSSItem:
- root = element
-
- title = get_text(root, 'title')
- link = _get_link(root)
- description = get_text(root, 'description')
- author = get_text(root, 'author')
- categories = [e.text for e in root.findall('category')]
- comments = get_text(root, 'comments')
- enclosure = [_get_enclosure(e) for e in root.findall('enclosure')]
- guid = get_text(root, 'guid')
- pub_date = get_datetime(root, 'pubDate')
- source = _get_source(root, 'source')
-
- content_encoded = get_text(root, 'content:encoded')
-
- return RSSItem(
- title,
- link,
- description,
- author,
- categories,
- comments,
- enclosure,
- guid,
- pub_date,
- source,
- content_encoded
- )
-
-
-def _parse_rss(root: Element) -> RSSChannel:
- rss_version = root.get('version')
- if rss_version != '2.0':
- raise FeedParseError('Cannot process RSS feed version "{}"'
- .format(rss_version))
-
- root = root.find('channel')
-
- title = get_text(root, 'title')
- link = get_text(root, 'link')
- description = get_text(root, 'description')
- language = get_text(root, 'language')
- copyright = get_text(root, 'copyright')
- managing_editor = get_text(root, 'managingEditor')
- web_master = get_text(root, 'webMaster')
- pub_date = get_datetime(root, 'pubDate')
- last_build_date = get_datetime(root, 'lastBuildDate')
- categories = [e.text for e in root.findall('category')]
- generator = get_text(root, 'generator')
- docs = get_text(root, 'docs')
- ttl = get_int(root, 'ttl')
-
- image = _get_image(root, 'image')
- items = [_get_item(e) for e in root.findall('item')]
-
- content_encoded = get_text(root, 'content:encoded')
-
- return RSSChannel(
- title,
- link,
- description,
- language,
- copyright,
- managing_editor,
- web_master,
- pub_date,
- last_build_date,
- categories,
- generator,
- docs,
- ttl,
- image,
- items,
- content_encoded
- )
-
-
-def parse_rss_file(filename: str) -> RSSChannel:
- """Parse an RSS feed from a local XML file."""
- root = parse_xml(filename).getroot()
- return _parse_rss(root)
-
-
-def parse_rss_bytes(data: bytes) -> RSSChannel:
- """Parse an RSS feed from a byte-string containing XML data."""
- root = parse_xml(BytesIO(data)).getroot()
- return _parse_rss(root)
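
Note: two details of the RSS parser above are worth illustrating: anything other than version="2.0" is rejected, and an item without a <link> falls back to its GUID when isPermaLink="true". The sample document is made up:

    from atoma import parse_rss_bytes

    raw = b"""<rss version="2.0">
      <channel>
        <title>Example channel</title>
        <link>https://example.org/</link>
        <description>Demo</description>
        <item>
          <title>First item</title>
          <guid isPermaLink="true">https://example.org/1</guid>
          <pubDate>Mon, 05 Nov 2018 10:00:00 GMT</pubDate>
        </item>
      </channel>
    </rss>"""

    channel = parse_rss_bytes(raw)
    print(channel.title)          # "Example channel"
    print(channel.items[0].link)  # GUID permalink fallback: "https://example.org/1"
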
diff --git a/python/atoma/simple.py b/python/atoma/simple.py
deleted file mode 100644
index 98bb3e1..0000000
--- a/python/atoma/simple.py
+++ /dev/null
@@ -1,224 +0,0 @@
-"""Simple API that abstracts away the differences between feed types."""
-
-from datetime import datetime, timedelta
-import html
-import os
-from typing import Optional, List, Tuple
-import urllib.parse
-
-import attr
-
-from . import atom, rss, json_feed
-from .exceptions import (
- FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError
-)
-
-
-@attr.s
-class Attachment:
- link: str = attr.ib()
- mime_type: Optional[str] = attr.ib()
- title: Optional[str] = attr.ib()
- size_in_bytes: Optional[int] = attr.ib()
- duration: Optional[timedelta] = attr.ib()
-
-
-@attr.s
-class Article:
- id: str = attr.ib()
- title: Optional[str] = attr.ib()
- link: Optional[str] = attr.ib()
- content: str = attr.ib()
- published_at: Optional[datetime] = attr.ib()
- updated_at: Optional[datetime] = attr.ib()
- attachments: List[Attachment] = attr.ib()
-
-
-@attr.s
-class Feed:
- title: str = attr.ib()
- subtitle: Optional[str] = attr.ib()
- link: Optional[str] = attr.ib()
- updated_at: Optional[datetime] = attr.ib()
- articles: List[Article] = attr.ib()
-
-
-def _adapt_atom_feed(atom_feed: atom.AtomFeed) -> Feed:
- articles = list()
- for entry in atom_feed.entries:
- if entry.content is not None:
- content = entry.content.value
- elif entry.summary is not None:
- content = entry.summary.value
- else:
- content = ''
- published_at, updated_at = _get_article_dates(entry.published,
- entry.updated)
- # Find article link and attachments
- article_link = None
- attachments = list()
- for candidate_link in entry.links:
- if candidate_link.rel in ('alternate', None):
- article_link = candidate_link.href
- elif candidate_link.rel == 'enclosure':
- attachments.append(Attachment(
- title=_get_attachment_title(candidate_link.title,
- candidate_link.href),
- link=candidate_link.href,
- mime_type=candidate_link.type_,
- size_in_bytes=candidate_link.length,
- duration=None
- ))
-
- if entry.title is None:
- entry_title = None
- elif entry.title.text_type in (atom.AtomTextType.html,
- atom.AtomTextType.xhtml):
- entry_title = html.unescape(entry.title.value).strip()
- else:
- entry_title = entry.title.value
-
- articles.append(Article(
- entry.id_,
- entry_title,
- article_link,
- content,
- published_at,
- updated_at,
- attachments
- ))
-
- # Find feed link
- link = None
- for candidate_link in atom_feed.links:
- if candidate_link.rel == 'self':
- link = candidate_link.href
- break
-
- return Feed(
- atom_feed.title.value if atom_feed.title else atom_feed.id_,
- atom_feed.subtitle.value if atom_feed.subtitle else None,
- link,
- atom_feed.updated,
- articles
- )
-
-
-def _adapt_rss_channel(rss_channel: rss.RSSChannel) -> Feed:
- articles = list()
- for item in rss_channel.items:
- attachments = [
- Attachment(link=e.url, mime_type=e.type, size_in_bytes=e.length,
- title=_get_attachment_title(None, e.url), duration=None)
- for e in item.enclosures
- ]
- articles.append(Article(
- item.guid or item.link,
- item.title,
- item.link,
- item.content_encoded or item.description or '',
- item.pub_date,
- None,
- attachments
- ))
-
- if rss_channel.title is None and rss_channel.link is None:
- raise FeedParseError('RSS feed does not have a title nor a link')
-
- return Feed(
- rss_channel.title if rss_channel.title else rss_channel.link,
- rss_channel.description,
- rss_channel.link,
- rss_channel.pub_date,
- articles
- )
-
-
-def _adapt_json_feed(json_feed: json_feed.JSONFeed) -> Feed:
- articles = list()
- for item in json_feed.items:
- attachments = [
- Attachment(a.url, a.mime_type,
- _get_attachment_title(a.title, a.url),
- a.size_in_bytes, a.duration)
- for a in item.attachments
- ]
- articles.append(Article(
- item.id_,
- item.title,
- item.url,
- item.content_html or item.content_text or '',
- item.date_published,
- item.date_modified,
- attachments
- ))
-
- return Feed(
- json_feed.title,
- json_feed.description,
- json_feed.feed_url,
- None,
- articles
- )
-
-
-def _get_article_dates(published_at: Optional[datetime],
- updated_at: Optional[datetime]
- ) -> Tuple[Optional[datetime], Optional[datetime]]:
- if published_at and updated_at:
- return published_at, updated_at
-
- if updated_at:
- return updated_at, None
-
- if published_at:
- return published_at, None
-
- raise FeedParseError('Article does not have proper dates')
-
-
-def _get_attachment_title(attachment_title: Optional[str], link: str) -> str:
- if attachment_title:
- return attachment_title
-
- parsed_link = urllib.parse.urlparse(link)
- return os.path.basename(parsed_link.path)
-
-
-def _simple_parse(pairs, content) -> Feed:
- is_xml = True
- is_json = True
- for parser, adapter in pairs:
- try:
- return adapter(parser(content))
- except FeedXMLError:
- is_xml = False
- except FeedJSONError:
- is_json = False
- except FeedParseError:
- continue
-
- if not is_xml and not is_json:
- raise FeedDocumentError('File is not a supported feed type')
-
- raise FeedParseError('File is not a valid supported feed')
-
-
-def simple_parse_file(filename: str) -> Feed:
- """Parse an Atom, RSS or JSON feed from a local file."""
- pairs = (
- (rss.parse_rss_file, _adapt_rss_channel),
- (atom.parse_atom_file, _adapt_atom_feed),
- (json_feed.parse_json_feed_file, _adapt_json_feed)
- )
- return _simple_parse(pairs, filename)
-
-
-def simple_parse_bytes(data: bytes) -> Feed:
- """Parse an Atom, RSS or JSON feed from a byte-string containing data."""
- pairs = (
- (rss.parse_rss_bytes, _adapt_rss_channel),
- (atom.parse_atom_bytes, _adapt_atom_feed),
- (json_feed.parse_json_feed_bytes, _adapt_json_feed)
- )
- return _simple_parse(pairs, data)
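
Note: the simple API above tries each (parser, adapter) pair in turn, so one call accepts Atom, RSS or JSON Feed bytes and returns the same Feed/Article shape. Sketched here with an illustrative JSON Feed payload; RSS or Atom bytes go through the identical call:

    from atoma.simple import simple_parse_bytes   # not re-exported in __init__.py

    raw = b"""{
      "version": "https://jsonfeed.org/version/1",
      "title": "Example",
      "items": [{"id": "1", "content_text": "Hello",
                 "date_published": "2018-11-05T10:00:00Z"}]
    }"""

    feed = simple_parse_bytes(raw)
    print(feed.title)                # "Example"
    print(feed.articles[0].content)  # "Hello"
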
diff --git a/python/atoma/utils.py b/python/atoma/utils.py
deleted file mode 100644
index 4dc1ab5..0000000
--- a/python/atoma/utils.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from datetime import datetime, timezone
-from xml.etree.ElementTree import Element
-from typing import Optional
-
-import dateutil.parser
-from defusedxml.ElementTree import parse as defused_xml_parse, ParseError
-
-from .exceptions import FeedXMLError, FeedParseError
-
-ns = {
- 'content': 'http://purl.org/rss/1.0/modules/content/',
- 'feed': 'http://www.w3.org/2005/Atom'
-}
-
-
-def parse_xml(xml_content):
- try:
- return defused_xml_parse(xml_content)
- except ParseError:
- raise FeedXMLError('Not a valid XML document')
-
-
-def get_child(element: Element, name,
- optional: bool=True) -> Optional[Element]:
- child = element.find(name, namespaces=ns)
-
- if child is None and not optional:
- raise FeedParseError(
- 'Could not parse feed: "{}" does not have a "{}"'
- .format(element.tag, name)
- )
-
- elif child is None:
- return None
-
- return child
-
-
-def get_text(element: Element, name, optional: bool=True) -> Optional[str]:
- child = get_child(element, name, optional)
- if child is None:
- return None
-
- if child.text is None:
- if optional:
- return None
-
- raise FeedParseError(
- 'Could not parse feed: "{}" text is required but is empty'
- .format(name)
- )
-
- return child.text.strip()
-
-
-def get_int(element: Element, name, optional: bool=True) -> Optional[int]:
- text = get_text(element, name, optional)
- if text is None:
- return None
-
- return int(text)
-
-
-def get_datetime(element: Element, name,
- optional: bool=True) -> Optional[datetime]:
- text = get_text(element, name, optional)
- if text is None:
- return None
-
- return try_parse_date(text)
-
-
-def try_parse_date(date_str: str) -> Optional[datetime]:
- try:
- date = dateutil.parser.parse(date_str, fuzzy=True)
- except (ValueError, OverflowError):
- return None
-
- if date.tzinfo is None:
- # TZ naive datetime, make it a TZ aware datetime by assuming it
- # contains UTC time
- date = date.replace(tzinfo=timezone.utc)
-
- return date
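
Note: the helpers above centralise namespace lookups and date handling for the XML parsers; try_parse_date returns None on unparseable input and assumes UTC for timezone-naive timestamps. A small sketch of that behaviour:

    from atoma.utils import try_parse_date

    aware = try_parse_date("Mon, 05 Nov 2018 10:00:00 GMT")
    naive = try_parse_date("2018-11-05 10:00:00")   # no timezone in the input

    print(aware.isoformat())  # 2018-11-05T10:00:00+00:00
    print(naive.tzinfo)       # UTC, assumed for naive timestamps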