import _curryN from "lodash/fp/curryN.js";
import _get from "lodash/fp/get.js";
import _isEmpty from "lodash/fp/isEmpty.js";
import _isFunction from "lodash/fp/isFunction.js";
import _omit from "lodash/fp/omit.js";
import crossFetch from "cross-fetch";

import {ServiceSocketError} from "../error/service_socket_error.js";
import {JobError} from "../error/job_error.js";
import {websocket} from "../service/websocket.js";
import Utility from "../utility/index.js";

const debug = Utility.debug("aqumen:sdk:job:dispatch");

const LOCAL_PROPERTIES = [
  "__typename", "download", "log", "progress", "scratch"
];

const WS_OPEN = 1;

export const dispatch = _curryN(2, (session, spec, socket) => {
  const handlers = [];
  const tempSocket = (socket) ? null : websocket(session);
  const ws = (socket || tempSocket);

  return new Promise((resolve, reject) => {
    const callbacks = [];
    const handleError = (event) => {
      const ev = event?.event || event;
      debug("WebSocket error event", ev);

      reject(new ServiceSocketError(ev));
    };
    const handleMessage = (event) => {
      const message = JSON.parse(event.data);
      debug("WebSocket message: %j", message);
      if (!_isEmpty(message.errors)) {
        const je = new JobError(message.errors);
        return reject(je);
      }

      const artifact = _get("data.artifact", message);
      if (artifact) {
        if (artifact.job.id === spec.id && artifact.direction == "output") {
          if (artifact.type === "log" && _isFunction(spec.log)) {
            let resolve;
            callbacks.push(new Promise((r) => resolve = r));
            crossFetch(artifact.url).then((r) => r.text()).then((l) => {
              spec.log(l);
              resolve();
            });
          }
          if (_isFunction(spec.download)) {
            let resolve;
            callbacks.push(new Promise((r) => resolve = r));
            crossFetch(artifact.url).then(async (response) => {
              spec.download(
                artifact.type,
                artifact.name,
                (response.body) ? response.body : (await response.arrayBuffer()),
                response.headers.get("Content-Length"),
                response.headers.get("Content-Type")
              );
              resolve();
            });
          }
        }
        return;
      }

      const job = _get("data.job", message);
      if (job) {
        if (job.id === spec.id) {
          if (_isFunction(spec.progress)) {
            spec.progress(job);
          }
          if (!_isEmpty(job.errors)) {
            return reject(new JobError(job.errors));
          } else if (job.exitedAt) {
            return Promise.all(callbacks).then(() => resolve(job));
          }
        }
        return;
      }
    };

    const payload = _omit(LOCAL_PROPERTIES, spec);
    const sendSpec = () => {
      debug("Sending job spec: %O", payload);
      ws.send(JSON.stringify(payload));
    };
    let sendNow ;
    if (ws.readyState === WS_OPEN) {
      sendNow = true;
    } else {
      handlers.push({e: "open", f: sendSpec});
      sendNow = false;
    }
    handlers.push({e: "error", f: handleError});
    handlers.push({e: "message", f: handleMessage});
    handlers.forEach((h) => ws.addEventListener(h.e, h.f));
    if (sendNow) {
      sendSpec();
    }
  }).finally(() => {
    handlers.forEach((h) => ws.removeEventListener(h.e, h.f));
    if (tempSocket) {
      tempSocket.close();
    }
  });
});
