const fs = require('fs');
const is = require('is-type-of');
const util = require('util');
const path = require('path');
const mime = require('mime');
const { isFile } = require('./common/utils/isFile');
const { isArray } = require('./common/utils/isArray');
const { isBuffer } = require('./common/utils/isBuffer');
const { retry } = require('./common/utils/retry');

const proto = exports;

/**
 * Multipart operations
 */

/**
 * Upload a file to OSS, switching to a multipart upload when the file is
 * at least 100 KB. Smaller files are sent with a single `put`.
 *
 * Note: the `options` object passed in is modified in place
 * (`mime`, `headers` and, for small files, `contentLength` are set on it).
 *
 * @param {String} name object name
 * @param {String|File|Buffer} file local path, browser File, or Buffer
 * @param {Object} [options]
 *        {Object} options.checkpoint when it carries an `uploadId`, the upload is resumed from it
 *        {String} options.mime content type; derived from the file extension when absent ('' for a Buffer)
 *        {Object} options.headers request headers
 *        {Object} options.meta user meta, converted into headers via `_convertMetaToHeaders`
 *        {Number} options.partSize integer part size, not smaller than 100 KB
 *        {Function} options.progress awaited as progress(1) after a single put,
 *                   or progress(0, checkpoint, res) once the multipart upload is initiated
 *        {Object} options.callback The callback parameter is composed of a JSON string encoded in Base64
 *        {String} options.callback.url the OSS sends a callback request to this URL
 *        {String} options.callback.host The host header value for initiating callback requests
 *        {String} options.callback.body The value of the request body when a callback is initiated
 *        {String} options.callback.contentType The Content-Type of the callback requests initiated
 *        {Object} options.callback.customValue Custom parameters are a map of key-values, e.g:
 *                  customValue = {
 *                    key1: 'value1',
 *                    key2: 'value2'
 *                  }
 * @return {Object} for a single put: `{ res, bucket, name, etag }` (plus `data` when a
 *                  callback was configured); otherwise the result of `_resumeMultipart`
 */
proto.multipartUpload = async function multipartUpload(name, file, options) {
  this.resetCancelFlag();
  options = options || {};

  // A checkpoint that already owns an uploadId means "continue that upload".
  const previous = options.checkpoint;
  if (previous && previous.uploadId) {
    return await this._resumeMultipart(previous, options);
  }

  const minPartSize = 100 * 1024;

  if (!options.mime) {
    // Buffers carry no name to derive a type from, so they get an empty mime.
    let guessedMime = '';
    if (isFile(file)) {
      guessedMime = mime.getType(path.extname(file.name));
    } else if (!isBuffer(file)) {
      guessedMime = mime.getType(path.extname(file));
    }
    options.mime = guessedMime;
  }

  options.headers = options.headers || {};
  this._convertMetaToHeaders(options.meta, options.headers);

  const fileSize = await this._getFileSize(file);

  // Files below the minimum part size are uploaded in one request.
  if (fileSize < minPartSize) {
    options.contentLength = fileSize;
    const putResult = await this.put(name, file, options);

    if (options.progress) {
      await options.progress(1);
    }

    const summary = {
      res: putResult.res,
      bucket: this.options.bucket,
      name,
      etag: putResult.res.headers.etag
    };

    const callbackConfigured = (options.headers && options.headers['x-oss-callback']) || options.callback;
    if (callbackConfigured) {
      summary.data = putResult.data;
    }

    return summary;
  }

  if (options.partSize) {
    if (parseInt(options.partSize, 10) !== options.partSize) {
      throw new Error('partSize must be int number');
    }
    if (options.partSize < minPartSize) {
      throw new Error(`partSize must not be smaller than ${minPartSize}`);
    }
  }

  const initResult = await this.initMultipartUpload(name, options);
  const checkpoint = {
    file,
    name,
    fileSize,
    partSize: this._getPartSize(fileSize, options.partSize),
    uploadId: initResult.uploadId,
    doneParts: []
  };

  if (options.progress) {
    await options.progress(0, checkpoint, initResult.res);
  }

  return await this._resumeMultipart(checkpoint, options);
};

/*
 * Resume multipart upload from checkpoint. The checkpoint will be
 * updated after each successful part upload.
 *
 * Parts not yet listed in checkpoint.doneParts are uploaded either one by one
 * (when options.parallel is 1 or the browser is IE 10) or through
 * this._parallel, then the upload is completed with completeMultipartUpload.
 *
 * @param {Object} checkpoint the checkpoint: { file, fileSize, partSize, uploadId, doneParts, name }
 * @param {Object} options
 *        {Number} options.parallel number of concurrent part uploads, defaults to 5
 *        {Function} options.progress awaited after each stored part as
 *                   progress(doneParts.length / (numParts + 1), checkpoint, res)
 * @return {Object} the result of completeMultipartUpload
 * @throws the cancel event when the upload was cancelled, the abort event when
 *         a part upload answers 404, or the first part error otherwise
 */
proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }
  const { file, fileSize, partSize, uploadId, doneParts, name } = checkpoint;

  const partOffs = this._divideParts(fileSize, partSize);
  const numParts = partOffs.length;

  // Uploads one part (1-based partNo) and records it in the checkpoint.
  // Any failure is tagged with the part number before being re-thrown.
  const uploadOnePart = async (self, partNo) => {
    try {
      if (self.isCancel()) {
        return;
      }

      const pi = partOffs[partNo - 1];
      const stream = await self._createStream(file, pi.start, pi.end);
      const data = {
        stream,
        size: pi.end - pi.start
      };

      // Track in-flight streams on the client.
      // NOTE(review): presumably so that cancel() can tear them down — confirm against the caller.
      if (isArray(self.multipartUploadStreams)) {
        self.multipartUploadStreams.push(data.stream);
      } else {
        self.multipartUploadStreams = [data.stream];
      }

      const removeStreamFromMultipartUploadStreams = function () {
        if (!stream.destroyed) {
          stream.destroy();
        }
        const index = self.multipartUploadStreams.indexOf(stream);
        if (index !== -1) {
          self.multipartUploadStreams.splice(index, 1);
        }
      };

      stream.on('close', removeStreamFromMultipartUploadStreams);
      stream.on('error', removeStreamFromMultipartUploadStreams);

      let result;
      try {
        result = await self._uploadPart(name, uploadId, partNo, data, options);
      } catch (error) {
        removeStreamFromMultipartUploadStreams();
        // A 404 from a part upload is reported as an abort event instead of the raw error.
        if (error.status === 404) {
          throw self._makeAbortEvent();
        }
        throw error;
      }

      // A part that finished after a cancel is deliberately not recorded.
      if (!self.isCancel()) {
        doneParts.push({
          number: partNo,
          etag: result.res.headers.etag
        });
        checkpoint.doneParts = doneParts;

        if (options.progress) {
          await options.progress(doneParts.length / (numParts + 1), checkpoint, result.res);
        }
      }
    } catch (err) {
      err.partNum = partNo;
      throw err;
    }
  };

  // NOTE(review): `retry` comes from ./common/utils/retry; presumably it re-invokes the job
  // up to retryMax times while errorHandler returns true — confirm against the helper.
  let uploadPartJob = retry(uploadOnePart, this.options.retryMax, {
    // Only errors with status -1 / -2 are retried, and only if the optional
    // user-supplied requestErrorRetryHandle agrees (it defaults to "always").
    errorHandler: err => {
      const statusErr = [-1, -2].includes(err.status);
      const requestErrorRetryHandle = this.options.requestErrorRetryHandle || (() => true);
      return !!(statusErr && requestErrorRetryHandle(err));
    }
  });

  // Part numbers still to upload: every part except those already in the checkpoint.
  const done = new Set(doneParts.map(p => p.number));
  const todo = [];
  for (let partNo = 1; partNo <= numParts; partNo++) {
    if (!done.has(partNo)) {
      todo.push(partNo);
    }
  }

  const defaultParallel = 5;
  const parallel = options.parallel || defaultParallel;

  if (this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1) {
    for (let i = 0; i < todo.length; i++) {
      if (this.isCancel()) {
        throw this._makeCancelEvent();
      }
      /* eslint no-await-in-loop: [0] */
      await uploadPartJob(this, todo[i]);
    }

    // A cancel that arrives while the last part is in flight leaves that part out of
    // doneParts; completing the upload now would finish it with a part missing.
    if (this.isCancel()) {
      throw this._makeCancelEvent();
    }
  } else {
    // upload in parallel
    const jobErr = await this._parallel(todo, parallel, async value => {
      await uploadPartJob(this, value);
    });

    const abortEvent = jobErr.find(err => err.name === 'abort');
    if (abortEvent) throw abortEvent;

    if (this.isCancel()) {
      // Release the job runner before reporting the cancel.
      uploadPartJob = null;
      throw this._makeCancelEvent();
    }

    if (jobErr.length > 0) {
      const firstErr = jobErr[0];
      firstErr.message = `Failed to upload some parts with error: ${firstErr.toString()} part_num: ${firstErr.partNum}`;
      throw firstErr;
    }
  }

  return await this.completeMultipartUpload(name, uploadId, doneParts, options);
};

/**
 * Get file size
 *
 * @param {Buffer|File|String} file a Buffer (its `length`), a File (its `size`),
 *        or a path string (the `size` reported by `this._statFile`)
 * @return {Number} size as reported by the source
 * @throws {Error} when `file` is none of the supported kinds
 */
proto._getFileSize = async function _getFileSize(file) {
  if (isBuffer(file)) {
    return file.length;
  }

  if (isFile(file)) {
    return file.size;
  }

  if (!is.string(file)) {
    throw new Error('_getFileSize requires Buffer/File/String.');
  }

  const fileStat = await this._statFile(file);
  return fileStat.size;
};

/*
 * Readable stream for Web File
 */
const { Readable } = require('stream');

/**
 * Readable stream over a browser file object.
 *
 * On the first read the whole file is loaded through a FileReader into a
 * Buffer; that Buffer is then pushed downstream in slices.
 * May be called with or without `new`.
 *
 * @param {Object} file object exposing `size` and accepted by
 *        FileReader#readAsArrayBuffer (presumably a File/Blob — confirm with callers)
 * @param {Object} [options] passed through to the Readable constructor
 */
function WebFileReadStream(file, options) {
  if (!(this instanceof WebFileReadStream)) {
    return new WebFileReadStream(file, options);
  }

  Readable.call(this, options);

  this.file = file; // source file; set to null once it has been loaded
  this.reader = new FileReader();
  this.start = 0; // offset of the next byte of fileBuffer to push
  this.finish = false; // true once end-of-stream has been signalled
  this.fileBuffer = null; // loaded file content
}
util.inherits(WebFileReadStream, Readable);

/**
 * Push slices of at most `size` bytes from the loaded buffer until either
 * the buffer is exhausted or `push` reports back-pressure.
 * Does nothing when the file has not been loaded yet.
 *
 * @param {Number} size maximum slice length in bytes
 */
WebFileReadStream.prototype.readFileAndPush = function readFileAndPush(size) {
  if (this.fileBuffer) {
    let pushRet = true;
    while (pushRet && this.fileBuffer && this.start < this.fileBuffer.length) {
      const { start } = this;
      let end = start + size;
      end = end > this.fileBuffer.length ? this.fileBuffer.length : end;
      // Advance before pushing: push() may synchronously trigger another _read.
      this.start = end;
      pushRet = this.push(this.fileBuffer.slice(start, end));
    }
  }
};

/**
 * Readable#_read implementation.
 *
 * Ends the stream when everything has been pushed (or there is nothing to
 * read); otherwise loads the file on the first call and pushes slices on
 * later calls. A FileReader failure destroys the stream with that error.
 *
 * @param {Number} [size] requested chunk size, defaults to 16 KB
 */
WebFileReadStream.prototype._read = function _read(size) {
  if (
    (this.file && this.start >= this.file.size) ||
    (this.fileBuffer && this.start >= this.fileBuffer.length) ||
    this.finish ||
    (this.start === 0 && !this.file)
  ) {
    if (!this.finish) {
      this.fileBuffer = null;
      this.finish = true;
    }
    this.push(null);
    return;
  }

  const defaultReadSize = 16 * 1024;
  const readSize = size || defaultReadSize;

  // The file is already loaded: just keep draining the buffer.
  if (this.start !== 0) {
    this.readFileAndPush(readSize);
    return;
  }

  const that = this;
  this.reader.onload = function onload(e) {
    that.fileBuffer = Buffer.from(new Uint8Array(e.target.result));
    that.file = null;
    that.readFileAndPush(readSize);
  };
  this.reader.onerror = function onerror(e) {
    // Throwing from an event handler cannot be caught by the stream's consumer and
    // would leave it waiting forever; fail the stream instead so 'error' is emitted.
    const error = e.srcElement && e.srcElement.error;
    that.destroy(error || e);
  };

  this.reader.readAsArrayBuffer(this.file);
};

/**
 * Build a readable stream for the byte range [start, end) of `file`.
 *
 * @param {Readable|File|Buffer|String} file an existing readable stream (returned as is,
 *        the range is ignored), a browser File, a Buffer, or a local path
 * @param {Number} start offset of the first byte (inclusive)
 * @param {Number} end offset just past the last byte (exclusive)
 * @return {Readable}
 * @throws {Error} when `file` is none of the supported kinds
 */
proto._createStream = function _createStream(file, start, end) {
  if (is.readableStream(file)) {
    return file;
  }

  if (isFile(file)) {
    return new WebFileReadStream(file.slice(start, end));
  }

  if (isBuffer(file)) {
    const chunk = file.subarray(start, end);
    // Readable.from() is avoided here: it is not available on older Node versions.
    return new Readable({
      read() {
        this.push(chunk);
        this.push(null);
      }
    });
  }

  if (!is.string(file)) {
    throw new Error('_createStream requires Buffer/File/String.');
  }

  // fs.createReadStream treats `end` as inclusive, hence the -1.
  return fs.createReadStream(file, { start, end: end - 1 });
};

/**
 * Pick the part size for a multipart upload.
 *
 * Falls back to 1 MB when no part size is given, and raises the value (with a
 * console warning) when it would split the file into more than 10,000 parts.
 *
 * @param {Number} fileSize total size in bytes
 * @param {Number} [partSize] requested part size in bytes
 * @return {Number} the part size to use
 */
proto._getPartSize = function _getPartSize(fileSize, partSize) {
  const maxNumParts = 10 * 1000;
  const defaultPartSize = 1 * 1024 * 1024;

  const requested = partSize || defaultPartSize;
  // Smallest part size that still keeps the part count within maxNumParts.
  const smallestAllowed = Math.ceil(fileSize / maxNumParts);

  const tooSmall = requested < smallestAllowed;
  if (!tooSmall) {
    return requested;
  }

  console.warn(
    `partSize has been set to ${smallestAllowed}, because the partSize you provided causes partNumber to be greater than 10,000`
  );
  return smallestAllowed;
};

/**
 * Split a file of `fileSize` bytes into consecutive ranges of `partSize` bytes.
 * The last range is shortened so that it stops at `fileSize`.
 *
 * @param {Number} fileSize total size in bytes
 * @param {Number} partSize size of each part in bytes
 * @return {Array<{start: Number, end: Number}>} ranges with inclusive `start` and exclusive `end`
 */
proto._divideParts = function _divideParts(fileSize, partSize) {
  const partCount = Math.ceil(fileSize / partSize);
  const ranges = [];

  while (ranges.length < partCount) {
    const begin = partSize * ranges.length;
    ranges.push({
      start: begin,
      end: Math.min(begin + partSize, fileSize)
    });
  }

  return ranges;
};