import { Pipe, PipeTransform } from '@angular/core';
import { Observable, of } from 'rxjs';
import { catchError, map, pairwise, scan, startWith, takeUntil } from 'rxjs/operators';
import { WebSocketService } from '@shared/helpers/websocket.service';
import { BaseObject } from '@shared/base/base-object';
import { isNotNull } from '@shared/base/core';

/**
 * Direction of a price move relative to the previously known price:
 * `'success'` when the price went up, `'danger'` when it went down.
 */
type PriceChange = 'success' | 'danger';

/** Message shape received from the per-instrument price channel. */
interface RecalculatedSymbolInstrument {
  symbol: string;
  // NOTE(review): presumably the ISO 10383 market identifier code — confirm with the backend contract.
  micCode: string;
  price: number;
}

/** Value emitted by the pipe: the price to display plus the direction of the last move. */
interface PriceByMicCodeSocketData {
  price: number;
  /** `null` when there is no movement to report (initial value, unchanged price, or error fallback). */
  type: PriceChange | null;
}
/**
 * Pipe `getInstrumentPriceByMicCode`.
 *
 * Turns a last known price plus an instrument's symbol / MIC code into a stream of
 * `{ price, type }` values fed by the WebSocket price topic for that instrument.
 * `type` tells whether the price moved up (`'success'`) or down (`'danger'`)
 * compared with the previously emitted price.
 */
@Pipe({
  name: 'getInstrumentPriceByMicCode',
})
export class GetInstrumentPriceByMicCodePipe extends BaseObject implements PipeTransform {
  constructor(private ws: WebSocketService) {
    super();
  }

  /**
   * Builds the price stream for one instrument.
   *
   * The stream emits `{ price: lastPrice, type: null }` immediately, then one value
   * per socket message. Messages without a usable price re-emit the last known price
   * with `type: null` instead of falling back to the initial `lastPrice`.
   *
   * @param lastPrice Price shown until the first socket message arrives; also used as the error fallback.
   * @param symbol Instrument symbol, used in the topic name and the subscription headers.
   * @param micCode MIC code of the instrument, used in the topic name and the subscription headers.
   * @returns Stream of display prices with their direction; ends when `destroy$` emits.
   */
  public transform(
    lastPrice: number,
    symbol: string,
    micCode: string,
  ): Observable<PriceByMicCodeSocketData> {
    const defaultInstrument: RecalculatedSymbolInstrument = {
      symbol,
      micCode,
      price: lastPrice,
    };

    // `destroy$` is not declared in this class, so it comes from BaseObject
    // (presumably it emits on teardown — confirm there).
    return this.getRecalculatedInstrument(symbol, micCode, [this.destroy$]).pipe(
      // Carry the last instrument with a usable price forward, so a price-less message
      // neither resets the comparison baseline nor reverts the displayed price.
      scan(
        (lastValid: RecalculatedSymbolInstrument, incoming: RecalculatedSymbolInstrument) =>
          isNotNull(incoming?.price) ? incoming : lastValid,
        defaultInstrument,
      ),
      // Two seeds on purpose: `pairwise` needs two values before it emits, and the
      // consumer must get the initial price without waiting for the first socket message.
      startWith(defaultInstrument),
      startWith(defaultInstrument),
      pairwise(),
      map(([previous, current]) => ({
        price: current.price,
        type: this.getPriceChange(previous.price, current.price),
      })),
      // On a stream error, emit the initial price once; the stream then completes.
      catchError(() => of({ price: lastPrice, type: null })),
      takeUntil(this.destroy$),
    );
  }

  /**
   * Compares two prices and reports the direction of the move.
   *
   * @param previousPrice Baseline price; may be missing when no price is known yet.
   * @param currentPrice Newly received price.
   * @returns `'success'` on a rise, `'danger'` on a fall, `null` when the price is
   * unchanged or either side is missing.
   */
  private getPriceChange(previousPrice: number, currentPrice: number): PriceChange | null {
    // Without both prices there is nothing to compare; this also avoids JavaScript
    // treating a null baseline as 0 in relational comparisons.
    if (!isNotNull(previousPrice) || !isNotNull(currentPrice)) {
      return null;
    }

    if (currentPrice > previousPrice) {
      return 'success';
    }

    if (currentPrice < previousPrice) {
      return 'danger';
    }

    return null;
  }

  /**
   * Subscribes to the price topic of a single instrument.
   *
   * @param symbol Instrument symbol.
   * @param micCode MIC code of the instrument.
   * @param stopListenEvents Observables handed to `listenChannel`
   * (presumably each one ends the subscription when it emits — confirm in WebSocketService).
   * @returns Stream of instrument messages for the topic.
   */
  private getRecalculatedInstrument(
    symbol: string,
    micCode: string,
    stopListenEvents: Observable<unknown>[],
  ): Observable<RecalculatedSymbolInstrument> {
    const headers = {
      symbol,
      micCode,
    };

    return this.ws.listenChannel<RecalculatedSymbolInstrument>(
      `/topic/instruments.symb.${symbol}.${micCode}.price`,
      stopListenEvents,
      headers,
    );
  }
}
