import { Injectable } from '@angular/core';
import { io } from 'socket.io-client';
import * as moment from 'moment';
import { EasternTimeZoneName, ExchangeCountriesCodes, isNullOrUndefined } from '@const';
import { UserDataService } from '@s/user-data.service';
import { MarketTimeService } from '@s/market-time.service';
import { environment } from '@env/environment';

/**
 * Book-keeping record for one streamed symbol: the symbol payload that is
 * sent to the server and the ids of the local subscriptions attached to it.
 */
interface ISymbolData {
  /** Symbol descriptor; the service reads its `symbol` and `country_code` fields. */
  symbol: any;

  /** Ids of the subscriptions currently attached to this symbol. */
  subscriptionIds: string[];
}

/**
 * Application-wide client for the live price stream.
 *
 * Opens a single websocket connection, multiplexes any number of local
 * subscriptions (identified by `subscriptionId`) onto per-symbol server
 * subscriptions, and dispatches incoming `pxo_live_data` messages to the
 * handlers registered for the matching symbol.
 */
@Injectable({
  providedIn: 'root'
})
export class StreamingService {
  private socket: any = null;
  /** Active symbols keyed by `getSymbolKey(symbol, country)`. */
  private symbols: Map<string, ISymbolData> = new Map<string, ISymbolData>();
  /** Per symbol key: a date-keyed ('YYYY-MM-DD') lookup supplied by the subscriber. */
  private processedData: any = {};
  /** Data handlers keyed by subscription id. */
  private handlers: any = {};
  /** Optional callbacks awaited before handlers run, keyed by subscription id. */
  private onBeforeHandlerCallbacks: any = {};

  constructor(
    private userDataService: UserDataService,
    private marketTimeService: MarketTimeService,
  ) {
    const token = this.userDataService.getAuthToken();
    this.socket = io(environment.WsBaseUrl, {
      transports: ['websocket'],
      auth: { token }
    });

    // Re-announce every known subscription to the server on each (re)connect.
    this.socket.on('connect', () => this.resubscribeAll());

    // Registered exactly once. Attaching this listener inside subscribe() would
    // add another copy per call, so every tick would be dispatched many times.
    this.socket.on('pxo_live_data', (data: any) => this.handleLiveData(data));
  }

  /**
   * Attaches a subscription to a symbol, subscribing on the server when the
   * symbol is not tracked yet, and then pings the server for that symbol.
   *
   * @param subscriptionId Caller-chosen id used to address this subscription later.
   * @param symbol Symbol descriptor; `symbol` and `country_code` form the key.
   * @param processedData Date-keyed lookup; a shallow copy is stored for the symbol,
   *   replacing whatever was stored for it before.
   * @param handler Invoked with each accepted data message for the symbol.
   * @param onBeforeHandlerCallback Optional callback awaited before the message is filtered and dispatched.
   */
  subscribe(subscriptionId: string, symbol: any, processedData: any, handler: any, onBeforeHandlerCallback: any = null) {
    const key = this.getSymbolKey(symbol.symbol, symbol.country_code);
    let symbolData = this.symbols.get(key);

    if (!symbolData) {
      symbolData = { symbol: { ...symbol }, subscriptionIds: [subscriptionId] };
      this.symbols.set(key, symbolData);
      this.socket.emit('pxo_live_data_subscribe', symbolData.symbol);
    } else if (!symbolData.subscriptionIds.includes(subscriptionId)) {
      symbolData.subscriptionIds.push(subscriptionId);
    }

    this.handlers[subscriptionId] = handler;
    this.onBeforeHandlerCallbacks[subscriptionId] = onBeforeHandlerCallback;
    this.processedData[key] = { ...processedData };

    this.socket.emit('pxo_live_data_ping', symbolData.symbol);
  }

  /**
   * Detaches a subscription from a symbol. When it was the last subscription
   * for that symbol, the server is told to unsubscribe and the symbol's state
   * is dropped.
   *
   * @param subscriptionId Id that was passed to `subscribe`.
   * @param symbol Symbol descriptor; `symbol` and `country_code` form the key.
   */
  unsubscribe(subscriptionId: string, symbol: any) {
    const key = this.getSymbolKey(symbol.symbol, symbol.country_code);
    const symbolData = this.symbols.get(key);

    if (symbolData) {
      // Replace the array instead of mutating it, so a dispatch loop that is
      // currently iterating the previous array is not disturbed.
      symbolData.subscriptionIds = symbolData.subscriptionIds.filter((x: string) => x !== subscriptionId);

      if (!symbolData.subscriptionIds.length) {
        this.socket.emit('pxo_live_data_unsubscribe', symbolData.symbol);
        this.symbols.delete(key);
        delete this.processedData[key];
      }
    }

    delete this.onBeforeHandlerCallbacks[subscriptionId];
    delete this.handlers[subscriptionId];
  }

  /** Builds the lookup key for a symbol; a missing country becomes 'N/A'. */
  private getSymbolKey = (symbol: string, country: string) => `${symbol}_${country || 'N/A'}`;

  /**
   * Awaits, one after another, the before-handler callbacks of every
   * subscription attached to the given symbol key. Does nothing when the
   * symbol is not tracked.
   *
   * @param dataKey Symbol key as produced by `getSymbolKey`.
   * @param data Raw message passed through to each callback.
   */
  async _handleOnBeforeHandlerCallbacks(dataKey: string, data: any) {
    const subscriptionIds = this.symbols.get(dataKey)?.subscriptionIds;

    if (!subscriptionIds) {
      return;
    }

    for (const subscriptionId of subscriptionIds) {
      if (this.onBeforeHandlerCallbacks[subscriptionId]) {
        await this.onBeforeHandlerCallbacks[subscriptionId](data);
      }
    }
  }

  /**
   * Snapshots every tracked subscription, clears the symbol table and replays
   * the snapshot through `subscribe`, so the subscribe and ping events are
   * emitted again for each symbol.
   */
  private resubscribeAll(): void {
    const snapshot = Array.from(this.symbols.values()).map(({ symbol, subscriptionIds }) => ({
      symbol,
      processedData: this.processedData[this.getSymbolKey(symbol.symbol, symbol.country_code)],
      handlers: subscriptionIds.map((subscriptionId: string) => ({
        subscriptionId,
        handler: this.handlers[subscriptionId],
        onBeforeHandlerCallback: this.onBeforeHandlerCallbacks[subscriptionId]
      }))
    }));

    this.symbols = new Map<string, ISymbolData>();

    for (const { symbol, processedData, handlers } of snapshot) {
      for (const { subscriptionId, handler, onBeforeHandlerCallback } of handlers) {
        this.subscribe(subscriptionId, symbol, processedData, handler, onBeforeHandlerCallback);
      }
    }
  }

  /**
   * Processes one `pxo_live_data` message: runs the before-handler callbacks,
   * filters out messages that must not be dispatched, then calls every handler
   * subscribed to the message's symbol.
   */
  private async handleLiveData(data: any): Promise<void> {
    if (!data || !data.symbol || !data.country) {
      return;
    }

    const dataKey = this.getSymbolKey(data.symbol, data.country);
    await this._handleOnBeforeHandlerCallbacks(dataKey, data);

    if (isNullOrUndefined(data.high) || isNullOrUndefined(data.low) || isNullOrUndefined(data.open) || isNullOrUndefined(data.volume)) {
      return;
    }

    // Looked up after the await above: the subscription may have been removed meanwhile.
    const symbolData = this.symbols.get(dataKey);
    const processed = this.processedData[dataKey];

    if (!symbolData || !processed) {
      return;
    }

    if (symbolData.symbol?.country_code !== ExchangeCountriesCodes.CC && this.marketTimeService.isBeforePreMarket()) {
      return;
    }

    // The date key uses UTC for the CC country code and Eastern time otherwise.
    const dateKey = data.country === ExchangeCountriesCodes.CC
      ? moment.utc(data.time).format('YYYY-MM-DD')
      : moment(data.time).tz(EasternTimeZoneName).format('YYYY-MM-DD');

    if (processed[dateKey] && !this.marketTimeService.isPrePostMarketTime()) {
      return;
    }

    for (const subscriptionId of symbolData.subscriptionIds) {
      if (this.handlers[subscriptionId]) {
        this.handlers[subscriptionId](data);
      }
    }
  }
}
