# -*- coding: utf-8 -*-
# This code is part of the Ansible collection community.docker, but is an independent component.
# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
#
# Copyright (c) 2016-2022 Docker, Inc.
#
# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
# SPDX-License-Identifier: Apache-2.0

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import base64
import collections
import json
import os
import os.path
import shlex
import string
from ansible_collections.community.docker.plugins.module_utils.version import StrictVersion

from ansible.module_utils.six import PY2, PY3, binary_type, integer_types, iteritems, string_types, text_type

from .. import errors
from ..constants import DEFAULT_HTTP_HOST
from ..constants import DEFAULT_UNIX_SOCKET
from ..constants import DEFAULT_NPIPE
from ..constants import BYTE_UNITS
from ..tls import TLSConfig

if PY2:
    from urlparse import urlparse, urlunparse
else:
    from urllib.parse import urlparse, urlunparse


# Six-field record in the order expected by ``urlunparse``; the path
# component is stored under the name ``url``.
URLComponents = collections.namedtuple(
    'URLComponents',
    ['scheme', 'netloc', 'url', 'params', 'query', 'fragment'],
)


def create_ipam_pool(*args, **kwargs):
    """Placeholder for a removed helper.

    All arguments are ignored; calling this always raises
    ``errors.DeprecatedMethod``.
    """
    message = (
        'utils.create_ipam_pool has been removed. Please use a '
        'docker.types.IPAMPool object instead.'
    )
    raise errors.DeprecatedMethod(message)


def create_ipam_config(*args, **kwargs):
    """Placeholder for a removed helper.

    All arguments are ignored; calling this always raises
    ``errors.DeprecatedMethod``.
    """
    message = (
        'utils.create_ipam_config has been removed. Please use a '
        'docker.types.IPAMConfig object instead.'
    )
    raise errors.DeprecatedMethod(message)


def decode_json_header(header):
    """Decode a base64-encoded JSON header and return the parsed value."""
    decoded = base64.b64decode(header)
    # On Python 3 the decoded payload is bytes; turn it into text (UTF-8)
    # before handing it to the JSON parser.
    payload = decoded.decode('utf-8') if PY3 else decoded
    return json.loads(payload)


def compare_version(v1, v2):
    """Compare two docker version strings.

    Returns 0 when both versions are equal, 1 when ``v1`` is the lower
    version and -1 when ``v1`` is the higher version.

    >>> v1 = '1.9'
    >>> v2 = '1.10'
    >>> compare_version(v1, v2)
    1
    >>> compare_version(v2, v1)
    -1
    >>> compare_version(v2, v2)
    0
    """
    left, right = StrictVersion(v1), StrictVersion(v2)
    if left == right:
        return 0
    # Note the sign convention: a *greater* first argument yields -1.
    return -1 if left > right else 1


def version_lt(v1, v2):
    """Return True when version ``v1`` is strictly lower than ``v2``."""
    # compare_version() reports 1 when its first argument is the lower one.
    comparison = compare_version(v1, v2)
    return comparison > 0


def version_gte(v1, v2):
    """Return True when version ``v1`` is greater than or equal to ``v2``."""
    is_lower = version_lt(v1, v2)
    return not is_lower


def _convert_port_binding(binding):
    """Normalize one port binding into a ``HostIp``/``HostPort`` dict.

    Accepted shapes:

    * a 2-tuple ``(host_ip, host_port)``;
    * any other tuple, whose first item is taken as the host IP when it is
      a string and as the host port otherwise;
    * a dict that must contain ``HostPort`` and may contain ``HostIp``
      (``ValueError`` is raised when ``HostPort`` is missing);
    * anything else, which is used as the host port.

    ``HostPort`` is always returned as a string; ``None`` becomes ``''``.
    """
    host_ip = ''
    host_port = ''

    if isinstance(binding, tuple):
        if len(binding) == 2:
            host_ip, host_port = binding
        elif isinstance(binding[0], string_types):
            host_ip = binding[0]
        else:
            host_port = binding[0]
    elif isinstance(binding, dict):
        if 'HostPort' not in binding:
            raise ValueError(binding)
        host_port = binding['HostPort']
        host_ip = binding['HostIp'] if 'HostIp' in binding else ''
    else:
        host_port = binding

    return {
        'HostIp': host_ip,
        'HostPort': '' if host_port is None else str(host_port),
    }


def convert_port_bindings(port_bindings):
    """Convert a mapping of container ports to normalized binding lists.

    Keys are stringified and get a ``/tcp`` suffix when they carry no
    protocol. Each value (a single binding or a list of bindings) becomes a
    list of dicts produced by ``_convert_port_binding``.
    """
    converted = {}
    for port, bindings in iteritems(port_bindings):
        port_key = str(port)
        if '/' not in port_key:
            port_key = port_key + '/tcp'
        # A lone binding is treated like a one-element list.
        if not isinstance(bindings, list):
            bindings = [bindings]
        converted[port_key] = [_convert_port_binding(item) for item in bindings]
    return converted


def convert_volume_binds(binds):
    """Convert volume binds into a list of ``source:target:mode`` strings.

    A list is returned unchanged. Otherwise ``binds`` is treated as a
    mapping from the source to either a target path (mounted ``rw``) or a
    dict with a required ``bind`` key and optional ``ro``, ``mode`` and
    ``propagation`` keys. Supplying both ``ro`` and ``mode`` raises
    ``ValueError``. Byte strings are decoded as UTF-8.
    """
    if isinstance(binds, list):
        return binds

    # NOTE: this is only relevant for Linux hosts
    # (does not apply in Docker Desktop)
    known_propagations = (
        'rshared',
        'shared',
        'rslave',
        'slave',
        'rprivate',
        'private',
    )

    bind_strings = []
    for source, spec in binds.items():
        if isinstance(source, binary_type):
            source = source.decode('utf-8')

        # Short form: the value is just the target path, mounted read-write.
        if not isinstance(spec, dict):
            if isinstance(spec, binary_type):
                spec = spec.decode('utf-8')
            bind_strings.append(
                text_type('{0}:{1}:rw').format(source, spec)
            )
            continue

        if 'ro' in spec and 'mode' in spec:
            raise ValueError(
                'Binding cannot contain both "ro" and "mode": {0}'
                .format(repr(spec))
            )

        target = spec['bind']
        if isinstance(target, binary_type):
            target = target.decode('utf-8')

        if 'ro' in spec:
            mode = 'ro' if spec['ro'] else 'rw'
        elif 'mode' in spec:
            mode = spec['mode']
        else:
            mode = 'rw'

        # A recognized propagation value is appended to the mode, or
        # replaces it when the mode is empty.
        if 'propagation' in spec and spec['propagation'] in known_propagations:
            propagation = spec['propagation']
            mode = ','.join([mode, propagation]) if mode else propagation

        bind_strings.append(
            text_type('{0}:{1}:{2}').format(source, target, mode)
        )
    return bind_strings


def convert_tmpfs_mounts(tmpfs):
    """Convert tmpfs mounts into a ``{path: options}`` dict.

    A dict is returned unchanged. A list must contain strings of the form
    ``path`` or ``path:options``; everything after the first colon is the
    options string, which is empty when there is no colon.

    Raises ``ValueError`` when ``tmpfs`` is neither a dict nor a list, or
    when a list item is not a string.
    """
    if isinstance(tmpfs, dict):
        return tmpfs

    if not isinstance(tmpfs, list):
        raise ValueError(
            'Expected tmpfs value to be either a list or a dict, found: {0}'
            .format(type(tmpfs).__name__)
        )

    mounts = {}
    for entry in tmpfs:
        if not isinstance(entry, string_types):
            raise ValueError(
                "Expected item in tmpfs list to be a string, found: {0}"
                .format(type(entry).__name__)
            )

        path, options = entry.split(":", 1) if ":" in entry else (entry, "")
        mounts[path] = options
    return mounts


def convert_service_networks(networks):
    """Normalize a service's network list.

    Falsy input is returned unchanged. Otherwise ``networks`` must be a
    list (``TypeError`` if not); string items are wrapped as
    ``{'Target': name}`` and all other items are passed through.
    """
    if not networks:
        return networks
    if not isinstance(networks, list):
        raise TypeError('networks parameter must be a list.')

    return [
        {'Target': network} if isinstance(network, string_types) else network
        for network in networks
    ]


def parse_repository_tag(repo_name):
    """Split an image reference into ``(repository, tag_or_digest)``.

    A digest (the part after the last ``@``) takes precedence. Otherwise
    the part after the last ``:`` is the tag, unless it contains a ``/``,
    in which case the colon belongs to a registry ``host:port`` prefix.
    When neither applies the second element is ``None``.
    """
    repository, at_sign, digest = repo_name.rpartition('@')
    if at_sign:
        return repository, digest

    repository, colon, tag = repo_name.rpartition(':')
    if colon and '/' not in tag:
        return repository, tag

    return repo_name, None


def parse_host(addr, is_win32=False, tls=False):
    """Normalize a Docker host address into the base URL used internally.

    Args:
        addr: Host specification, e.g. ``tcp://host:2376``,
            ``unix:///var/run/docker.sock``, ``npipe://...``,
            ``ssh://user@host`` or a bare ``host:port``. An empty value
            selects a platform default.
        is_win32: When true and ``addr`` is empty, ``DEFAULT_NPIPE`` is
            returned instead of ``DEFAULT_UNIX_SOCKET``.
        tls: Whether TLS is in use. For ``tcp`` addresses it selects the
            ``https`` scheme and the default port. An explicit ``http`` or
            ``https`` scheme in ``addr`` overrides this flag.

    Returns:
        The normalized URL as a string. ``tcp`` becomes ``http``/``https``
        and ``unix`` becomes ``http+unix``; trailing slashes are stripped
        from computed URLs. When the port is omitted, ``ssh`` defaults to
        22 and ``tcp`` to 2376 with TLS or 2375 without.

    Raises:
        errors.DockerException: if the protocol is unsupported, or the
            address is malformed (no host for ``tcp://``, a path for
            ``ssh``, or any params/query/fragment/password component).
    """
    # Sensible defaults
    if not addr and is_win32:
        return DEFAULT_NPIPE
    if not addr or addr.strip() == 'unix://':
        return DEFAULT_UNIX_SOCKET

    addr = addr.strip()

    parsed_url = urlparse(addr)
    proto = parsed_url.scheme
    if not proto or any(x not in string.ascii_letters + '+' for x in proto):
        # https://bugs.python.org/issue754016
        parsed_url = urlparse('//' + addr, 'tcp')
        proto = 'tcp'

    if proto == 'fd':
        raise errors.DockerException('fd protocol is not implemented')

    # These protos are valid aliases for our library but not for the
    # official spec
    if proto == 'http' or proto == 'https':
        tls = proto == 'https'
        proto = 'tcp'
    elif proto == 'http+unix':
        proto = 'unix'

    if proto not in ('tcp', 'unix', 'npipe', 'ssh'):
        raise errors.DockerException(
            "Invalid bind address protocol: {0}".format(addr)
        )

    if proto == 'tcp' and not parsed_url.netloc:
        # "tcp://" is exceptionally disallowed by convention;
        # omitting a hostname for other protocols is fine
        raise errors.DockerException(
            'Invalid bind address format: {0}'.format(addr)
        )

    if any([
        parsed_url.params, parsed_url.query, parsed_url.fragment,
        parsed_url.password
    ]):
        raise errors.DockerException(
            'Invalid bind address format: {0}'.format(addr)
        )

    if parsed_url.path and proto == 'ssh':
        raise errors.DockerException(
            'Invalid bind address format: no path allowed for this protocol:'
            ' {0}'.format(addr)
        )
    else:
        path = parsed_url.path
        if proto == 'unix' and parsed_url.hostname is not None:
            # For legacy reasons, we consider unix://path
            # to be valid and equivalent to unix:///path
            path = '/'.join((parsed_url.hostname, path))

    netloc = parsed_url.netloc
    if proto in ('tcp', 'ssh'):
        port = parsed_url.port or 0
        if port <= 0:
            if proto == 'ssh':
                port = 22
            else:
                # Docker's convention: 2376 is the TLS port, 2375 the
                # unencrypted one.
                port = 2376 if tls else 2375
            netloc = '{0}:{1}'.format(parsed_url.netloc, port)

        if not parsed_url.hostname:
            netloc = '{0}:{1}'.format(DEFAULT_HTTP_HOST, port)

    # Rewrite schemes to fit library internals (requests adapters)
    if proto == 'tcp':
        proto = 'http{0}'.format('s' if tls else '')
    elif proto == 'unix':
        proto = 'http+unix'

    if proto in ('http+unix', 'npipe'):
        return "{0}://{1}".format(proto, path).rstrip('/')
    # urlunparse takes a 6-item sequence:
    # (scheme, netloc, path, params, query, fragment).
    return urlunparse((proto, netloc, path, '', '', '')).rstrip('/')


def parse_devices(devices):
    """Convert device specifications into device-mapping dicts.

    Dict items are passed through untouched. String items use the form
    ``host_path[:container_path[:permissions]]``; the container path
    defaults to the host path and the permissions default to ``rwm``.

    Raises ``errors.DockerException`` for items that are neither dicts nor
    strings.
    """
    parsed = []
    for device in devices:
        if isinstance(device, dict):
            parsed.append(device)
            continue
        if not isinstance(device, string_types):
            raise errors.DockerException(
                'Invalid device type {0}'.format(type(device))
            )

        # split() always yields at least one field, so index 0 is safe.
        fields = device.split(':')
        host_path = fields[0]
        container_path = fields[1] if len(fields) > 1 else host_path
        cgroup_permissions = fields[2] if len(fields) > 2 else 'rwm'
        parsed.append({
            'PathOnHost': host_path,
            'PathInContainer': container_path,
            'CgroupPermissions': cgroup_permissions
        })
    return parsed


def kwargs_from_env(ssl_version=None, assert_hostname=None, environment=None):
    """Build client keyword arguments from Docker environment variables.

    Reads ``DOCKER_HOST``, ``DOCKER_CERT_PATH`` and ``DOCKER_TLS_VERIFY``
    from ``environment`` (``os.environ`` when it is empty or not given).

    Returns a dict that contains ``base_url`` when a host is set and
    ``tls`` (a ``TLSConfig``) when a certificate path is set or TLS
    verification is requested.
    """
    env = environment if environment else os.environ
    params = {}

    host = env.get('DOCKER_HOST')
    if host:
        params['base_url'] = host

    # An empty certificate path is treated like an unset one.
    cert_path = env.get('DOCKER_CERT_PATH') or None

    # Verification is on only for a set, non-empty DOCKER_TLS_VERIFY;
    # both an unset variable and an empty string turn it off.
    verify_setting = env.get('DOCKER_TLS_VERIFY')
    tls_verify = False if verify_setting == '' else verify_setting is not None

    if not (cert_path or tls_verify):
        return params

    if not cert_path:
        cert_path = os.path.join(os.path.expanduser('~'), '.docker')

    # assert_hostname is a subset of TLS verification, so an unspecified
    # value is forced to False when verification itself is off.
    if assert_hostname is None and not tls_verify:
        assert_hostname = False

    client_cert = (
        os.path.join(cert_path, 'cert.pem'),
        os.path.join(cert_path, 'key.pem'),
    )
    params['tls'] = TLSConfig(
        client_cert=client_cert,
        ca_cert=os.path.join(cert_path, 'ca.pem'),
        verify=tls_verify,
        ssl_version=ssl_version,
        assert_hostname=assert_hostname,
    )

    return params


def convert_filters(filters):
    """Serialize a filters mapping to a JSON string.

    Every value ends up as a list of strings: booleans become ``'true'`` /
    ``'false'``, scalars are wrapped in a one-element list, and non-string
    items are converted with ``str``.
    """
    encoded = {}
    for name, value in iteritems(filters):
        if isinstance(value, bool):
            value = 'true' if value else 'false'
        values = value if isinstance(value, list) else [value, ]
        encoded[name] = [
            item if isinstance(item, string_types) else str(item)
            for item in values
        ]
    return json.dumps(encoded)


def parse_bytes(s):
    """Convert a size given as a number or string into a byte count.

    Integers and floats are returned unchanged and an empty string yields
    0. Strings consist of a number followed by an optional unit suffix
    looked up (case-insensitively) in ``BYTE_UNITS``. A trailing ``b``/``B``
    after another letter is ignored (``"kb"`` is read as ``"k"``), and a
    string without a suffix is taken as bytes.

    Raises ``errors.DockerException`` when the suffix is unknown or the
    numeric part cannot be parsed.
    """
    if isinstance(s, integer_types + (float,)):
        return s
    if len(s) == 0:
        return 0

    # Two trailing letters ending in "b"/"B": drop the redundant "b".
    if s[-2:-1].isalpha() and s[-1].isalpha() and s[-1] in ('b', 'B'):
        s = s[:-1]

    suffix = s[-1].lower()
    if suffix.isdigit():
        # Plain number without a unit; assume bytes.
        number_text = s
        suffix = 'b'
    else:
        number_text = s[:-1]

    if suffix not in BYTE_UNITS:
        raise errors.DockerException(
            'The specified value for memory ({0}) should specify the'
            ' units. The postfix should be one of the `b` `k` `m` `g`'
            ' characters'.format(s)
        )

    try:
        number = float(number_text)
    except ValueError:
        raise errors.DockerException(
            'Failed converting the string value for memory ({0}) to'
            ' an integer.'.format(number_text)
        )

    # Scale by the unit and truncate to an integer for the final result.
    return int(number * BYTE_UNITS[suffix])


def normalize_links(links):
    """Return container links as a sorted list of strings.

    ``links`` is a dict or an iterable of ``(name, alias)`` pairs. Each
    pair is rendered as ``name:alias``, or just ``name`` when the alias is
    falsy.
    """
    pairs = iteritems(links) if isinstance(links, dict) else links

    formatted = []
    for name, alias in sorted(pairs):
        if alias:
            formatted.append('{0}:{1}'.format(name, alias))
        else:
            formatted.append(name)
    return formatted


def parse_env_file(env_file):
    """
    Reads a line-separated environment file.
    The format of each line should be "key=value".

    Surrounding whitespace is stripped from every line. Blank lines and
    lines whose first non-blank character is ``#`` are ignored. The value
    is everything after the first ``=``.

    Returns a dict mapping variable names to values.

    Raises ``errors.DockerException`` for a line that has no ``=``.
    """
    environment = {}

    with open(env_file, 'r') as f:
        for raw_line in f:
            # Strip before testing for a comment so that indented comment
            # lines are skipped rather than rejected as invalid.
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue

            key, separator, value = line.partition('=')
            if not separator:
                raise errors.DockerException(
                    'Invalid line in environment file {0}:\n{1}'.format(
                        env_file, line))
            environment[key] = value

    return environment


def split_command(command):
    """Split a command line into a list of arguments using shell rules."""
    # On Python 2, text input is encoded to UTF-8 bytes before splitting.
    needs_encoding = PY2 and not isinstance(command, binary_type)
    if needs_encoding:
        command = command.encode('utf-8')
    return shlex.split(command)


def format_environment(environment):
    """Render an environment mapping as a list of ``KEY=value`` strings.

    A value of ``None`` yields the bare key; byte-string values are decoded
    as UTF-8 first.
    """
    entries = []
    for key, value in iteritems(environment):
        if value is None:
            entries.append(key)
            continue
        if isinstance(value, binary_type):
            value = value.decode('utf-8')
        entries.append(u'{key}={value}'.format(key=key, value=value))
    return entries


def format_extra_hosts(extra_hosts, task=False):
    """Render an extra-hosts mapping as a sorted list of strings.

    Entries are ``host:address`` by default. With ``task`` set they use the
    format dictated by the Swarm API for task containers:
    ``address host``.
    """
    entries = sorted(iteritems(extra_hosts))
    if task:
        template = '{1} {0}'
    else:
        template = '{0}:{1}'
    return [template.format(host, address) for host, address in entries]


def create_host_config(self, *args, **kwargs):
    """Placeholder for a removed helper.

    All arguments are ignored; calling this always raises
    ``errors.DeprecatedMethod``.
    """
    message = (
        'utils.create_host_config has been removed. Please use a '
        'docker.types.HostConfig object instead.'
    )
    raise errors.DeprecatedMethod(message)
