import { Injectable } from '@angular/core';
import { Observable, Subject, ReplaySubject } from 'rxjs';
import { AnonymousSubject } from 'rxjs/internal/Subject';
import { LoggerService } from '../services/logger.service';

/** Key of the per-connection cache: the WebSocket URL passed to `connect()`. */
type SubjectMapKey = string;

/** Cached state for one WebSocket URL. */
interface SubjectMapValue {
  /** Subject handed out to callers: `next()` sends, `subscribe()` receives. */
  subject: Subject<string>;
  /**
   * Accessor for the most recently created underlying socket.
   * Returns `undefined` until the subject has been subscribed to for the
   * first time, because the socket is only created inside the subscriber.
   */
  socket: () => WebSocket | undefined;
}

/**
 * Exposes WebSocket connections as RxJS subjects, one cached subject per URL.
 *
 * The socket for a URL is opened lazily when the subject returned by
 * `connect()` is subscribed to, and is closed with code 1000 when that
 * subscription is torn down. Abnormal closes (any close code other than 1000)
 * trigger automatic reconnection, governed by the `reconnection*` properties.
 */
@Injectable({
  providedIn: 'root'
})
export class SocketService {
  private _opened = new Subject<boolean>();
  private subjectMap = new Map<SubjectMapKey, SubjectMapValue>();

  /** Number of reconnection attempts allowed after an abnormal close. */
  reconnectionAttempts = 30;
  /** Set to `false` to disable automatic reconnection. */
  reconnection = true;
  /** Base reconnection delay in milliseconds (doubled before every retry). */
  reconnectionDelay = 1000;
  /** Upper bound, in milliseconds, for the reconnection delay. */
  reconnectionDelayMax = 5000;

  /** Emits `true` whenever a socket opens and `false` whenever one closes. */
  get opened(): Observable<boolean> {
    return this._opened.asObservable();
  }

  constructor(private logger: LoggerService) {}

  /**
   * Returns the subject bound to `url`, creating and caching it on first use.
   *
   * @param url WebSocket endpoint; also used as the cache key.
   * @returns A subject whose `next()` sends data over the socket and whose
   *          subscribers receive the `data` of incoming messages.
   */
  connect(url: string): Subject<string> {
    this.logger.log(`SocketService:connect() called with url=${url}`);

    // Single lookup: avoids dereferencing a possibly-undefined `Map.get` result.
    const cached = this.subjectMap.get(url);
    if (cached) {
      return cached.subject;
    }

    const value = this.createSubject(url);

    this.subjectMap.set(url, value);

    return value.subject;
  }

  /**
   * Closes the open socket for `url`, if there is one.
   *
   * Does nothing when the URL is unknown, when no socket has been created yet,
   * or when the socket is not in the OPEN state. The cached subject is kept.
   *
   * @param url    WebSocket endpoint previously passed to `connect()`.
   * @param code   Close code forwarded to `WebSocket.close()`.
   * @param reason Optional close reason forwarded to `WebSocket.close()`.
   */
  disconnect(url: string, code: number, reason = '') {
    this.logger.log(`SocketService:disconnect() called with url=${url}, code: ${code}, reason: ${reason}`);

    const entry = this.subjectMap.get(url);
    if (!entry) {
      return;
    }

    const socket = entry.socket();
    if (!socket) {
      return;
    }

    if (socket.readyState === WebSocket.OPEN) {
      socket.close(code, reason);
    }
  }

  /**
   * Builds the subject/socket pair for `url`.
   *
   * The returned subject forwards `next()` calls into a `ReplaySubject`
   * (outgoing side) and takes its values from a cold observable (incoming
   * side) that opens a WebSocket on every subscription.
   */
  private createSubject(url: string): SubjectMapValue {
    const replay = new ReplaySubject<string>();

    let attempts = this.reconnectionAttempts;
    let delay = this.reconnectionDelay;

    // Most recently created socket; stays unassigned until the first subscription.
    let socket: WebSocket;

    const observable = new Observable<string>(obs => {
      // Set on unsubscribe so a late close event cannot schedule a reconnect.
      let tornDown = false;
      // Handle of the pending reconnect timer, if any.
      let reconnectTimer: ReturnType<typeof setTimeout> | undefined;

      const onOpen = (ev: Event) => {
        this.logger.log(
          `SocketObservable: socket opened at ${url}: ${JSON.stringify(ev)}`
        );

        this._opened.next(true);

        // A successful connection resets the reconnection budget and backoff.
        attempts = this.reconnectionAttempts;
        delay = this.reconnectionDelay;
      };

      const onMessage = (ev: MessageEvent) => {
        obs.next(ev.data);
      };

      const onError = (ev: Event) => {
        this.logger.log(
          `SocketObservable: socket error at ${url}: ${JSON.stringify(ev)}`
        );

        // Force a close; the reconnection decision is made in onClose.
        closeSocket(4000, 'Socket error');
      };

      const onClose = (ev: CloseEvent) => {
        this.logger.log(
          `SocketObservable: socket closed at ${url}: ${JSON.stringify(ev)}`
        );

        this._opened.next(false);

        // Never reconnect once the subscriber is gone: closing a socket that is
        // still connecting produces a non-1000 close code, which would otherwise
        // open a new socket that nobody listens to.
        if (!tornDown && ev.code !== 1000 && this.reconnection && attempts-- > 0) {
          // The delay is doubled (capped at reconnectionDelayMax) before use.
          delay = Math.min(delay * 2, this.reconnectionDelayMax);
          reconnectTimer = setTimeout(() => {
            reconnectTimer = undefined;
            openSocket();
          }, delay);
        } else {
          obs.complete();
        }
      };

      // Outgoing side: data is JSON-stringified and sent only while the socket
      // is OPEN; anything pushed at another time is dropped.
      const subs = replay.subscribe(data => {
        if (socket && socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify(data));
        }
      });

      const openSocket = () => {
        this.logger.log(
          `SocketObservable: connecting to ${url} (attempts remaining: ${attempts})...`
        );

        socket = new WebSocket(url);
        this.logger.log(`created new socket: ${socket.url}`);

        socket.onopen = onOpen;
        socket.onmessage = onMessage;
        socket.onerror = onError;
        socket.onclose = onClose;
      };

      const closeSocket = (code: number, reason = '') => {
        this.logger.log(`SocketObservable: disconnecting from ${url}, code: ${code}, reason: ${reason}...`);

        if (
          socket.readyState !== WebSocket.CLOSED &&
          socket.readyState !== WebSocket.CLOSING
        ) {
          socket.close(code, reason);
        }
      };

      openSocket();

      // Teardown: stop forwarding outgoing data, cancel any pending reconnect
      // and close the socket normally.
      return () => {
        this.logger.log(
          `SocketObservable: unsubscribing at ${url}`
        );

        tornDown = true;

        if (reconnectTimer !== undefined) {
          clearTimeout(reconnectTimer);
          reconnectTimer = undefined;
        }

        subs.unsubscribe();

        closeSocket(1000);
      };
    });

    return {
      socket: () => socket,
      subject: new AnonymousSubject<string>(replay, observable)
    };
  }
}
