import {webSocket, WebSocketSubject} from "rxjs/webSocket";
import {EMPTY, Observable} from "rxjs";
import {catchError, repeat, retry, tap} from "rxjs/operators";
import {CampaignProcessingProgress} from "./campaign-processing-progress-socket";

/**
 * Wrapper around an RxJS `WebSocketSubject` for the endpoint
 * `/ws/<socketPath>` on the host the page was loaded from.
 *
 * Each time the underlying socket reports "open", the access token is pushed
 * into it. The stream returned by {@link connect} resubscribes to the socket
 * after a fixed delay when it errors or completes.
 *
 * @typeParam T - Type the caller expects for emitted messages. Messages are
 *   not validated against it; the socket itself is typed as `any`.
 */
export class ProgressSocket<T> {
  // Created lazily by connect(), hence optional. `any` is kept because the
  // outgoing value is the access token string while incoming values are
  // handed to callers as T.
  private socket?: WebSocketSubject<any>;
  private readonly url: string;

  /** Delay in milliseconds passed to both `retry` and `repeat`. */
  private readonly RECONNECT_INTERVAL = 2000;

  /**
   * @param socketPath - Path segment appended after `/ws/` to build the URL.
   * @param accessToken - Value sent through the socket whenever it opens.
   */
  constructor(private socketPath: string, private accessToken: string) {
    // 'http:' becomes 'ws:' and 'https:' becomes 'wss:'
    const protocol = window.location.protocol.replace('http', 'ws');
    const host = window.location.host;
    this.url = `${protocol}//${host}/ws/${socketPath}`;
  }

  /**
   * Returns a stream of socket messages.
   *
   * The existing subject is reused unless it is missing or already closed, in
   * which case a new one is created first. Errors that reach the end of the
   * pipeline are logged (except `ObjectUnsubscribedError`, which is ignored
   * silently) and then converted into a plain completion, so subscribers
   * never receive an error notification.
   *
   * @returns Observable of messages, typed as `T` without runtime checks.
   */
  public connect(): Observable<T> {
    let socket = this.socket;
    if (!socket || socket.closed) {
      socket = this.getNewWebSocket();
      this.socket = socket;
    }
    return socket.pipe(
      // resubscribe after the delay when the socket stream errors ...
      retry({ delay: this.RECONNECT_INTERVAL }),
      // ... and also when it completes
      repeat({ delay: this.RECONNECT_INTERVAL }),
      tap({
        error: (error) => {
          // Optional chaining: the error value is not guaranteed to be an object.
          if (error?.name === 'ObjectUnsubscribedError') {
            return;
          }
          console.log('ProgressSocket error', {error});
        },
      }),
      catchError((_) => EMPTY)
    );
  }

  /**
   * Creates a subject for {@link url} whose open observer triggers
   * {@link authenticate}.
   */
  private getNewWebSocket(): WebSocketSubject<any> {
    return webSocket<any>({
      url: this.url,
      openObserver: {
        next: () => {
          this.authenticate();
        },
      },
    });
  }

  /**
   * Pushes the access token into the current socket subject, or logs a
   * warning when there is no subject or it is closed.
   */
  private authenticate(): void {
    if (this.socket && !this.socket.closed) {
      this.socket.next(this.accessToken);
    } else {
      console.warn('Failed to authenticate, Socket is not open');
    }
  }

  /**
   * Unsubscribes the socket subject, if one was ever created.
   *
   * Safe to call before {@link connect}; in that case it does nothing.
   */
  public close(): void {
    this.socket?.unsubscribe();
  }
}
