#!/usr/bin/env python
# coding: utf-8
from __future__ import unicode_literals

# Allow direct execution
import os
import re
import sys
import subprocess
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from test.helper import (
    FakeLogger,
    FakeYDL,
    http_server_port,
    try_rm,
)
from youtube_dl import YoutubeDL
from youtube_dl.compat import (
    compat_http_cookiejar_Cookie,
    compat_http_server,
    compat_kwargs,
)
from youtube_dl.utils import (
    encodeFilename,
    join_nonempty,
)
from youtube_dl.downloader.external import (
    Aria2cFD,
    Aria2pFD,
    AxelFD,
    CurlFD,
    FFmpegFD,
    HttpieFD,
    WgetFD,
)
import threading

# Default size, in bytes, of the dummy payload served by the test HTTP server
# and the size the downloaded file is expected to have on disk.
TEST_SIZE = 10 * 1024

# Keyword arguments for cookiejar_Cookie() describing the single cookie that
# the command-line tests add to the downloader's cookie jar.
# NOTE(review): 'domain' starts with a dot while 'domain_initial_dot' is
# False -- confirm that this mismatch is intentional.
TEST_COOKIE = {
    'version': 0,
    'name': 'test',
    'value': 'ytdlp',
    'port': None,
    'port_specified': False,
    'domain': '.example.com',
    'domain_specified': True,
    'domain_initial_dot': False,
    'path': '/',
    'path_specified': True,
    'secure': False,
    'expires': None,
    'discard': False,
    'comment': None,
    'comment_url': None,
    'rest': {},
}

# The cookie as the tests expect to find it on downloader command lines;
# presumably 'test=ytdlp' (name and value joined by '=') -- confirm against
# join_nonempty().
TEST_COOKIE_VALUE = join_nonempty('name', 'value', delim='=', from_dict=TEST_COOKIE)

# Info dict handed to the downloaders under test; only the URL is set here.
TEST_INFO = {'url': 'http://www.example.com/'}


def cookiejar_Cookie(**cookie_args):
    """Create a cookie object from keyword arguments.

    The arguments are first passed through compat_kwargs() and the result is
    then expanded into compat_http_cookiejar_Cookie().
    """
    prepared_kwargs = compat_kwargs(cookie_args)
    return compat_http_cookiejar_Cookie(**prepared_kwargs)


def ifExternalFDAvailable(externalFD):
    """Return a decorator that skips a test unless `externalFD` is available.

    `externalFD` must provide `available()` and `get_basename()`; the skip
    reason reads '<basename> not found'.
    """
    is_available = externalFD.available()
    skip_reason = externalFD.get_basename() + ' not found'
    return unittest.skipUnless(is_available, skip_reason)


class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
    """HTTP handler that serves a dummy payload of '#' bytes for download tests.

    Four GET paths are recognised; they differ only in whether the Range
    request header is honoured and whether Content-Length is sent.
    """

    def log_message(self, format, *args):
        # Deliberately a no-op so that requests produce no log output.
        pass

    def send_content_range(self, total=None):
        """Emit a Content-Range header if the request has a closed byte range.

        Only ranges of the form 'bytes=<start>-<end>' are recognised.

        Returns the number of bytes in the requested range, or `total` when
        the request carries no (recognised) Range header.
        """
        range_header = self.headers.get('Range')
        mobj = re.match(r'bytes=(\d+)-(\d+)', range_header) if range_header else None
        if not mobj:
            # No usable range: nothing is sent, the caller serves everything
            return total
        start, end = int(mobj.group(1)), int(mobj.group(2))
        content_range = 'bytes %d-%d' % (start, end)
        if total:
            content_range += '/%d' % total
        self.send_header('Content-Range', content_range)
        # Both ends of the range are inclusive
        return end - start + 1

    def serve(self, range=True, content_length=True):
        """Send a 200 response whose body is a run of '#' bytes.

        range -- honour the Range request header (the response status stays
                 200 even when a Content-Range header is emitted). The name
                 shadows the builtin but is part of the keyword interface.
        content_length -- include a Content-Length header.
        """
        self.send_response(200)
        self.send_header('Content-Type', 'video/mp4')
        size = TEST_SIZE
        if range:
            size = self.send_content_range(TEST_SIZE)
        if content_length:
            self.send_header('Content-Length', size)
        self.end_headers()
        self.wfile.write(b'#' * size)

    def do_GET(self):
        """Dispatch a GET request to serve() according to the request path."""
        if self.path == '/regular':
            self.serve()
        elif self.path == '/no-content-length':
            self.serve(content_length=False)
        elif self.path == '/no-range':
            self.serve(range=False)
        elif self.path == '/no-range-no-content-length':
            self.serve(range=False, content_length=False)
        else:
            # Raised explicitly: an `assert` statement would be stripped
            # under `python -O`, leaving the request silently unanswered.
            raise AssertionError('unrecognised server path')


@ifExternalFDAvailable(Aria2pFD)
class TestAria2pFD(unittest.TestCase):
    """Real downloads through the 'aria2p' external downloader.

    Every test fetches TEST_SIZE bytes from a local HTTP server while an
    `aria2c --enable-rpc` process is running.
    """

    def setUp(self):
        # Port 0: the port actually used is obtained via http_server_port()
        self.httpd = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()

    def tearDown(self):
        # Stop serve_forever() and release the listening socket so that
        # servers do not pile up across tests.
        self.httpd.shutdown()
        self.httpd.server_close()

    def download(self, params, ep):
        """Download endpoint `ep` of the test server and check the result.

        params -- YoutubeDL options; a copy is extended with 'logger' and
                  'outtmpl', the caller's dict is left untouched.
        ep -- path (without leading slash) served by HTTPTestRequestHandler.
        """
        filename = 'testfile.mp4'
        params = dict(params)
        params['logger'] = FakeLogger()
        params['outtmpl'] = filename
        with subprocess.Popen(
            ['aria2c', '--enable-rpc'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        ) as process:
            try:
                # poll() is None only while the process is still running; an
                # aria2c that has already exited cannot take part in the test
                if process.poll() is None:
                    ydl = YoutubeDL(params)
                    try_rm(encodeFilename(filename))
                    try:
                        self.assertEqual(ydl.download(['http://127.0.0.1:%d/%s' % (self.port, ep)]), 0)
                        self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
                    finally:
                        # Do not leave the downloaded file behind on failure
                        try_rm(encodeFilename(filename))
            finally:
                # Leaving the `with` block waits for the process; it must be
                # killed first, even after a failed assertion, or the wait
                # would never return.
                process.kill()

    def download_all(self, params):
        """Run download() against every endpoint of the test server."""
        for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
            self.download(params, ep)

    def test_regular(self):
        self.download_all({'external_downloader': 'aria2p'})

    def test_chunked(self):
        self.download_all({
            'external_downloader': 'aria2p',
            'http_chunk_size': 1000,
        })


@ifExternalFDAvailable(HttpieFD)
class TestHttpieFD(unittest.TestCase):
    """Checks the command line built by HttpieFD."""

    def test_make_cmd(self):
        plain_cmd = ['http', '--download', '--output', 'test', 'http://www.example.com/']
        cookie_item = 'Cookie:' + TEST_COOKIE_VALUE

        with FakeYDL() as ydl:
            fd = HttpieFD(ydl, {})
            self.assertEqual(fd._make_cmd('test', TEST_INFO), plain_cmd)

            # Once the jar holds a cookie, a Cookie header item is expected last
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            self.assertEqual(
                fd._make_cmd('test', TEST_INFO), plain_cmd + [cookie_item])


@ifExternalFDAvailable(AxelFD)
class TestAxelFD(unittest.TestCase):
    """Checks the command line built by AxelFD."""

    def test_make_cmd(self):
        output_args = ['axel', '-o', 'test']
        url_args = ['--', 'http://www.example.com/']
        cookie_args = ['-H', 'Cookie: ' + TEST_COOKIE_VALUE, '--max-redirect=0']

        with FakeYDL() as ydl:
            fd = AxelFD(ydl, {})
            self.assertEqual(
                fd._make_cmd('test', TEST_INFO), output_args + url_args)

            # Once the jar holds a cookie, the header arguments are expected
            # between the output file and the URL
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            self.assertEqual(
                fd._make_cmd('test', TEST_INFO),
                output_args + cookie_args + url_args)


@ifExternalFDAvailable(WgetFD)
class TestWgetFD(unittest.TestCase):
    """Checks that WgetFD adds '--load-cookies' only when a cookie is set."""

    def test_make_cmd(self):
        cookie_flag = '--load-cookies'
        with FakeYDL() as ydl:
            fd = WgetFD(ydl, {})
            cmd_before = fd._make_cmd('test', TEST_INFO)
            self.assertNotIn(cookie_flag, cmd_before)

            # After a cookie is added the flag has to show up
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            cmd_after = fd._make_cmd('test', TEST_INFO)
            self.assertIn(cookie_flag, cmd_after)


@ifExternalFDAvailable(CurlFD)
class TestCurlFD(unittest.TestCase):
    """Checks that CurlFD passes the cookie via '--cookie' once one is set."""

    def test_make_cmd(self):
        with FakeYDL() as ydl:
            fd = CurlFD(ydl, {})
            cmd_before = fd._make_cmd('test', TEST_INFO)
            self.assertNotIn('--cookie', cmd_before)

            # After a cookie is added, both the flag and the cookie value
            # must be present; the command is rebuilt for each check
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            for expected_arg in ('--cookie', TEST_COOKIE_VALUE):
                self.assertIn(expected_arg, fd._make_cmd('test', TEST_INFO))


@ifExternalFDAvailable(Aria2cFD)
class TestAria2cFD(unittest.TestCase):
    """Checks that Aria2cFD adds a '--load-cookies=' argument once a cookie is set."""

    def test_make_cmd(self):
        with FakeYDL() as ydl:
            fd = Aria2cFD(ydl, {})

            # Without cookies, building the command must not set _cookies_tempfile
            fd._make_cmd('test', TEST_INFO)
            has_tempfile = hasattr(fd, '_cookies_tempfile')
            self.assertFalse(has_tempfile)

            # With a cookie, the command must reference _cookies_tempfile
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            built_cmd = fd._make_cmd('test', TEST_INFO)
            expected_arg = '--load-cookies=%s' % fd._cookies_tempfile
            self.assertIn(expected_arg, built_cmd)


@ifExternalFDAvailable(FFmpegFD)
class TestFFmpegFD(unittest.TestCase):
    """Checks the argument list FFmpegFD reports through its _debug_cmd hook."""

    # Argument list most recently handed to _test_cmd()
    _args = []

    def _test_cmd(self, args):
        # Installed as the downloader's _debug_cmd: remember what it is given
        self._args = args

    def test_make_cmd(self):
        leading_args = ['ffmpeg', '-y']
        trailing_args = [
            '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test']
        cookie_args = [
            '-cookies', TEST_COOKIE_VALUE + '; path=/; domain=.example.com;\r\n']

        # Work on a copy so the shared TEST_INFO stays unchanged
        info_dict = dict(TEST_INFO)
        info_dict['ext'] = 'mp4'

        with FakeYDL() as ydl:
            fd = FFmpegFD(ydl, {})
            fd._debug_cmd = self._test_cmd

            fd._call_downloader('test', info_dict)
            self.assertEqual(self._args, leading_args + trailing_args)

            # With a cookie set, the -cookies option is expected before the input
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            fd._call_downloader('test', info_dict)
            self.assertEqual(
                self._args, leading_args + cookie_args + trailing_args)


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()