# -*- coding: utf-8 -*- #
# Copyright 2013 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A git credential helper that provides Google git repository passwords.

Reads a session from stdin that looks a lot like:
  protocol=https
  host=code.google.com
And writes out a session to stdout that looks a lot like:
  username=me
  password=secret

Errors will be reported on stderr.

Note that spaces may be part of key names so, for example, "protocol" must not
be proceeded by leading spaces.
"""


import os
import re
import subprocess
import sys
import textwrap

from google.auth import exceptions as google_auth_exceptions
from googlecloudsdk.api_lib.auth import exceptions as auth_exceptions
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions as c_exc
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import requests
from googlecloudsdk.core.credentials import creds as c_creds
from googlecloudsdk.core.credentials import exceptions as creds_exceptions
from googlecloudsdk.core.credentials import store as c_store
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import platforms


_KEYVAL_RE = re.compile(r'(.+)=(.*)')
_BLANK_LINE_RE = re.compile(r'^ *$')


def _GetAccessTokenFromAdc():
  """Obtains an access token using Application Default Credentials.

  Writes error messages to stderr and returns None if an error occurs.

  Returns:
    str or None: The access token if successful, otherwise None.
  """
  try:
    creds, _ = c_creds.GetGoogleAuthDefault().default()
    creds.refresh(requests.GoogleAuthRequest())
  except google_auth_exceptions.DefaultCredentialsError as e:
    sys.stderr.write(textwrap.dedent(f"""\
        ERROR: Failed to get application default credentials: {e}
        """))
    return None
  else:
    return creds.token


@base.Hidden
@base.DefaultUniverseOnly
class GitHelper(base.Command):
  """A git credential helper to provide access to Google git repositories."""

  GET = 'get'
  STORE = 'store'
  METHODS = [GET, STORE]

  GOOGLESOURCE = 'googlesource.com'

  @staticmethod
  def Args(parser):
    parser.add_argument('method', help='The git credential helper method.')
    parser.add_argument(
        '--ignore-unknown',
        action='store_true',
        help=(
            'Produce no output and exit with 0 when given '
            'an unknown method (e.g. store) or host.'
        ),
    )

  @c_exc.RaiseErrorInsteadOf(auth_exceptions.AuthenticationError,
                             google_auth_exceptions.GoogleAuthError)
  def Run(self, args):
    """Run the helper command."""

    # Disable self signed jwt for this command.
    properties.VALUES.auth.service_account_use_self_signed_jwt.Set(False)

    if args.method not in GitHelper.METHODS:
      if args.ignore_unknown:
        return
      raise auth_exceptions.GitCredentialHelperError(
          'Unexpected method [{meth}]. One of [{methods}] expected.'.format(
              meth=args.method, methods=', '.join(GitHelper.METHODS)
          )
      )

    info = self._ParseInput()
    credentialed_domains = [
        'source.developers.google.com',
        GitHelper.GOOGLESOURCE,  # Requires a different username value.
    ]
    credentialed_domains_suffix = [
        '.sourcemanager.dev',
        '.blueoryx.dev',
        '.developerconnect.dev',
        '.' + GitHelper.GOOGLESOURCE,
    ]
    extra = properties.VALUES.core.credentialed_hosted_repo_domains.Get()
    if extra:
      credentialed_domains.extend(extra.split(','))
    host = info.get('host')

    def _ValidateHost(host):
      if host in credentialed_domains:
        return True
      for suffix in credentialed_domains_suffix:
        if host.endswith(suffix):
          return True
      return False

    if not _ValidateHost(host):
      if not args.ignore_unknown:
        raise auth_exceptions.GitCredentialHelperError(
            'Unknown host [{host}].'.format(host=host)
        )
      return

    if args.method == GitHelper.GET:
      access_token = self._GetAccessToken()

      if not access_token:
        # Error messages are already written to stderr by _GetAccessToken
        return

      self._CheckNetrc()

      # For googlesource.com, any username beginning with "git-" is accepted
      # and the identity of the user is extracted from the token server-side.
      if (
          host == GitHelper.GOOGLESOURCE
          or host.endswith('.' + GitHelper.GOOGLESOURCE)
      ):
        sent_account = 'git-account'
      else:
        sent_account = properties.VALUES.core.account.Get()

      sys.stdout.write(textwrap.dedent(f"""\
          username={sent_account}
          password={access_token}
          """))
    elif args.method == GitHelper.STORE:
      # On OSX, there is an additional credential helper that gets called before
      # ours does.  When we return a token, it gets cached there.  Git continues
      # to get it from there first until it expires.  That command then fails,
      # and the token is deleted, but it does not retry the operation.  The next
      # command gets a new token from us and it starts working again, for an
      # hour.  This erases our credential from the other cache whenever 'store'
      # is called on us.  Because they are called first, the token will already
      # be stored there, and so we can successfully erase it to prevent caching.
      if (
          platforms.OperatingSystem.Current()
          == platforms.OperatingSystem.MACOSX
      ):
        log.debug('Clearing OSX credential cache.')
        try:
          input_string = 'protocol={protocol}\nhost={host}\n\n'.format(
              protocol=info.get('protocol'), host=info.get('host')
          )
          log.debug('Calling erase with input:\n%s', input_string)
          p = subprocess.Popen(
              ['git-credential-osxkeychain', 'erase'],
              stdin=subprocess.PIPE,
              stdout=subprocess.PIPE,
              stderr=subprocess.PIPE,
          )
          out, err = p.communicate(input_string)
          if p.returncode:
            log.debug(
                'Failed to clear OSX keychain:\nstdout: {%s}\nstderr: {%s}',
                out,
                err,
            )
        # pylint:disable=broad-except, This can fail and should only be done as
        # best effort.
        except Exception as e:
          log.debug('Failed to clear OSX keychain', exc_info=True)

  def _GetAccessTokenFromGcloud(self):
    """Obtains an access token using gcloud credentials.

    Writes error messages to stderr and returns None if an error occurs.

    Returns:
      str or None: The access token if successful, otherwise None.
    """
    account = properties.VALUES.core.account.Get()
    try:
      cred = c_store.Load(account)
      c_store.Refresh(cred)
    except creds_exceptions.Error as e:
      sys.stderr.write(textwrap.dedent(f"""\
          ERROR: {e}
          Run 'gcloud auth login' to log in.
          """))
      return None
    except Exception as e:  # pylint: disable=broad-except
      sys.stderr.write(f'ERROR: Failed to obtain access token: {e}\n')
      return None
    else:
      return cred.token

  def _GetAccessToken(self):
    """Obtains an access token using either ADC or gcloud credentials.

    Writes error messages to stderr and returns None if an error occurs.

    Returns:
      str or None: The access token if successful, otherwise None.
    """
    if properties.VALUES.auth.git_helper_use_adc.GetBool():
      return _GetAccessTokenFromAdc()
    else:
      return self._GetAccessTokenFromGcloud()

  def _ParseInput(self):
    """Parse the fields from stdin.

    Returns:
      {str: str}, The parsed parameters given on stdin.
    """
    info = {}
    for line in sys.stdin:
      if _BLANK_LINE_RE.match(line):
        continue
      match = _KEYVAL_RE.match(line)
      if not match:
        raise auth_exceptions.GitCredentialHelperError(
            'Invalid input line format: [{format}].'.format(
                format=line.rstrip('\n')
            )
        )
      key, val = match.groups()
      info[key] = val.strip()

    if 'protocol' not in info:
      raise auth_exceptions.GitCredentialHelperError(
          'Required key "protocol" missing.'
      )

    if 'host' not in info:
      raise auth_exceptions.GitCredentialHelperError(
          'Required key "host" missing.'
      )

    if info.get('protocol') != 'https':
      raise auth_exceptions.GitCredentialHelperError(
          'Invalid protocol [{p}].  "https" expected.'.format(
              p=info.get('protocol')
          )
      )
    return info

  def _CheckNetrc(self):
    """Warn on stderr if ~/.netrc contains redundant credentials."""

    def Check(p):
      """Warn about other credential helpers that will be ignored."""
      if not os.path.exists(p):
        return
      try:
        data = files.ReadFileContents(p)
        if 'source.developers.google.com' in data:
          sys.stderr.write(textwrap.dedent("""\
You have credentials for your Google repository in [{path}]. This repository's
git credential helper is set correctly, so the credentials in [{path}] will not
be used, but you may want to remove them to avoid confusion.
""".format(path=p)))
      # pylint:disable=broad-except, If something went wrong, forget about it.
      except Exception:
        pass

    Check(files.ExpandHomeDir(os.path.join('~', '.netrc')))
    Check(files.ExpandHomeDir(os.path.join('~', '_netrc')))
