From 5122028a4bcac4ae577ef7fbd55ccad5cb34ef5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs?= Date: Mon, 18 Oct 2021 15:24:21 -0500 Subject: update from upstream --- hypervideo_dl/downloader/websocket.py | 59 +++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 hypervideo_dl/downloader/websocket.py (limited to 'hypervideo_dl/downloader/websocket.py') diff --git a/hypervideo_dl/downloader/websocket.py b/hypervideo_dl/downloader/websocket.py new file mode 100644 index 0000000..0882220 --- /dev/null +++ b/hypervideo_dl/downloader/websocket.py @@ -0,0 +1,59 @@ +import os +import signal +import asyncio +import threading + +try: + import websockets + has_websockets = True +except ImportError: + has_websockets = False + +from .common import FileDownloader +from .external import FFmpegFD + + +class FFmpegSinkFD(FileDownloader): + """ A sink to ffmpeg for downloading fragments in any form """ + + def real_download(self, filename, info_dict): + info_copy = info_dict.copy() + info_copy['url'] = '-' + + async def call_conn(proc, stdin): + try: + await self.real_connection(stdin, info_dict) + except (BrokenPipeError, OSError): + pass + finally: + try: + stdin.flush() + stdin.close() + except OSError: + pass + os.kill(os.getpid(), signal.SIGINT) + + class FFmpegStdinFD(FFmpegFD): + @classmethod + def get_basename(cls): + return FFmpegFD.get_basename() + + def on_process_started(self, proc, stdin): + thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), )) + thread.start() + + return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy) + + async def real_connection(self, sink, info_dict): + """ Override this in subclasses """ + raise NotImplementedError('This method must be implemented by subclasses') + + +class WebSocketFragmentFD(FFmpegSinkFD): + async def real_connection(self, sink, info_dict): + async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws: + while True: + recv = await ws.recv() + if isinstance(recv, str): + recv = recv.encode('utf8') + sink.write(recv) -- cgit v1.2.3