import { useEffect, useRef } from "react";
import { useSelector } from "react-redux";
import { Subject, bufferTime } from "rxjs";
import { PubSub } from "aws-amplify";

import { useInterval } from "utils";
import { MQTT_KEEPALIVE_TOPIC, MQTT_BROADCAST_TOPIC } from "config";
import { KEEP_ALIVE_INTERVAL, BUFFER_EMPTY_INTERVAL } from "utils/defines";
import { handlePipedTelemetry, handlePipedEvents } from "../utils/unitsUtils";

/**
 * How this works:
 * 1) Subscribing to the IoT broker opens the connection
 * 2) Whenever a new message is received, the payload is run through `addDataToMqttBuffer`
 *    - The dataPoint or event payload is buffered in telemetryMessageHandler$ or eventMessageHandler$ respectively
 *    - This buffering prevents unnecessary re-renders
 * 3) telemetryMessageHandler$ or eventMessageHandler$ releases the buffer every BUFFER_EMPTY_INTERVAL ms
 *    - handlePipedTelemetry/handlePipedEvents calls dispatchSetDataHistory/dispatchStoreEvents respectively
 */

/**
 * Message receivers and their time-windowed buffers
 */

// telemetry stream: the Subject receives mapped data points, and its piped
// output collects them into arrays released every BUFFER_EMPTY_INTERVAL ms
const telemetryMessageHandler$ = new Subject();
const telemetryOutput$ = telemetryMessageHandler$.pipe(
  bufferTime(BUFFER_EMPTY_INTERVAL)
);

// event stream: same arrangement for event records
const eventMessageHandler$ = new Subject();
const eventOutput$ = eventMessageHandler$.pipe(
  bufferTime(BUFFER_EMPTY_INTERVAL)
);

// hand each released buffer to its handler
telemetryOutput$.subscribe(handlePipedTelemetry);
eventOutput$.subscribe(handlePipedEvents);

/**
 * Callbacks for addDataToMqttBuffer
 * which add telemetry or event data to respective handlers/buffers
 */

/**
 * Maps raw telemetry data points to the record shape used downstream and
 * pushes the mapped array into telemetryMessageHandler$ as one emission.
 *
 * @param {Array<Object>} dataPoints - incoming data points; each one is read
 *   for product_id, product_sn_id, signalValue, signalId and d_tsr
 */
const pipeDataToTelemetryHandler = (dataPoints) => {
  const mappedDataArray = dataPoints.map((dataPoint) => ({
    product_id: dataPoint.product_id,
    product_sn_id: dataPoint.product_sn_id,
    data_value: dataPoint.signalValue,
    signal_id: dataPoint.signalId,
    // explicit radix so d_tsr is always parsed as decimal (a "0x…" string
    // would otherwise be interpreted as hexadecimal)
    timestamp_recorded: Number.parseInt(dataPoint.d_tsr, 10),
  }));
  telemetryMessageHandler$.next(mappedDataArray);
};

// @todo: implement lossless buffering (vs lossy buffering)
/**
 * React hook that feeds live MQTT messages into the module-level buffers
 * while `state.productSNData.isLive` is truthy.
 *
 * While live, it subscribes to MQTT_BROADCAST_TOPIC and, every
 * KEEP_ALIVE_INTERVAL ms, re-subscribes if the subscription has ended and
 * publishes a keep-alive message to MQTT_KEEPALIVE_TOPIC. The subscription is
 * released when `isLive` turns falsy or the component unmounts.
 *
 * @returns {Array} single-element array `[isLive]`
 */
export const useLiveMQTTData = () => {
  // The subscription handle is kept in a ref so that every render (and so
  // every closure handed to useEffect/useInterval) sees the same value. A
  // plain local is reset on each render, which lets the keep-alive tick open
  // additional subscriptions that are never unsubscribed.
  const mqttSubscriptionRef = useRef(null);

  const isLive = useSelector((state) => state.productSNData.isLive);

  // buffer incoming data to prevent extraneous rerenders
  const addDataToMqttBuffer = (data) => {
    const payload = data.value;
    if (payload.type === "events") {
      // event data
      eventMessageHandler$.next(payload.records);
    } else {
      // telemetry data
      pipeDataToTelemetryHandler(payload);
    }
  };

  // connect to MQTT (no-op while a subscription is already held)
  const subscribeToMqtt = () => {
    if (mqttSubscriptionRef.current) return;

    // Tracks termination, including one signalled synchronously from inside
    // subscribe() before the handle below has been stored.
    let hasEnded = false;
    const releaseSubscription = () => {
      hasEnded = true;
      // clear the handle so the next keep-alive tick can reattempt
      mqttSubscriptionRef.current = null;
    };

    const subscription = PubSub.subscribe(MQTT_BROADCAST_TOPIC).subscribe({
      next: addDataToMqttBuffer,
      // NOTE(review): assumes error/complete end the stream, as in the
      // Observable contract - confirm for the Amplify version in use
      error: (error) => {
        console.error(error);
        releaseSubscription();
      },
      complete: () => {
        console.log("Done");
        releaseSubscription();
      },
    });

    if (!hasEnded) {
      mqttSubscriptionRef.current = subscription;
    }
  };

  // disconnect
  const unsubscribeFromMqtt = () => {
    if (mqttSubscriptionRef.current) {
      mqttSubscriptionRef.current.unsubscribe();
      mqttSubscriptionRef.current = null;
    }
  };

  // ping the iot broker to prevent disconnect timeout
  const keepMQTTAlive = () => {
    // reattempt connection (only subscribes when no subscription is held)
    subscribeToMqtt();

    // keep-alive ping; Promise.resolve tolerates a non-promise return value,
    // and the catch keeps a failed publish from becoming an unhandled rejection
    Promise.resolve(PubSub.publish(MQTT_KEEPALIVE_TOPIC, "{}")).catch(
      (error) => console.error(error)
    );
  };

  /** MQTT buffer */

  useEffect(() => {
    if (isLive) {
      subscribeToMqtt();
    } else {
      unsubscribeFromMqtt();
    }
    return () => {
      unsubscribeFromMqtt();
    };
  }, [isLive]);

  useInterval(() => {
    if (isLive) keepMQTTAlive();
  }, KEEP_ALIVE_INTERVAL);

  // telemetryOutput$ / eventOutput$ are deliberately not subscribed here:
  // they are already subscribed once at module scope, and a second
  // subscription would run the handlers again for every released buffer.

  return [isLive];
};
