import { Socket } from "./core/socket";
import { StreamingCachePrintType } from "../dataflow/streamingCache";
import {
  SchemaType,
  schemaTypeToJSON
} from "../../gen/schema/schema_path/schema_path";
import { Stream, StreamReducer, StreamTransformer } from "./core/stream";
import {
  Message,
  MonitoredObjects
} from "../../gen/schema/syncfollow/syncfollow";
import { Observable } from "./core/observable";

/** Literal type of the event names StreamManager declares; currently only "prune". */
type EVENT = "prune";
/**
 * Base class for managers that own one socket and a set of streams keyed by
 * string id.
 *
 * Concrete subclasses supply the socket, decide how incoming messages and
 * reconnects are handled, and create streams. This class wires the socket
 * events to those handlers and offers read-only inspection helpers.
 */
export abstract class StreamManager extends Observable<object> {
  /** Event name registered with the Observable base in the constructor. */
  static PRUNE: EVENT = "prune";
  /** Socket used by this manager; provided by the concrete subclass. */
  protected abstract sock: Socket;
  /** Streams known to this manager, keyed by string id. */
  protected streams: Map<string, Stream<Message, Message>> = new Map();
  /** URL given to the most recent init() call that had a non-empty url. */
  private url?: string;

  constructor() {
    super();
    this.addEvents([StreamManager.PRUNE]);
  }

  /** Invoked for every Socket.MESSAGE event once init() has subscribed it. */
  abstract handleMessage(m: MessageEvent<string>): void;
  /** Invoked for every Socket.RECONNECT event once init() has subscribed it. */
  abstract handleReconnect(): void;

  /**
   * Creates a stream for the given request message.
   *
   * @param message request message the stream is created for
   * @param transformer stream transformer handed to the new stream
   * @param reducer stream reducer handed to the new stream
   * @returns the stream, or undefined when the subclass does not create one
   */
  abstract addStream<TMessage extends Message, TResponse>(
    message: TMessage,
    transformer: StreamTransformer<TResponse>,
    reducer: StreamReducer<TResponse>
  ): Stream<Message, Message> | undefined;

  /**
   * Initializes the socket with `url` and subscribes handleMessage and
   * handleReconnect to the socket's MESSAGE and RECONNECT events.
   * Does nothing when `url` is empty.
   *
   * NOTE(review): every call subscribes the handlers again; confirm whether
   * Socket.on de-duplicates before calling init() more than once.
   *
   * @param url address handed to the socket's init()
   */
  init(url: string): void {
    if (!url) {
      return;
    }
    // The socket is dereferenced unguarded here, so optional chaining on the
    // subscriptions below would be dead code.
    this.sock.init(url);
    // Third argument is presumably the `this` context for the handler —
    // TODO confirm against Socket.on.
    this.sock.on(Socket.MESSAGE, this.handleMessage, this);
    this.sock.on(Socket.RECONNECT, this.handleReconnect, this);
    this.url = url;
  }

  /** Closes the socket if one is present. */
  close(): void {
    this.sock?.close();
  }

  /** @returns the socket's `connected` flag, or false when it is unavailable. */
  isConnected(): boolean {
    return this.sock?.connected ?? false;
  }

  /**
   * Methods for Dev tools
   */

  /** @returns a new array holding every stream currently in the map. */
  getStreams = (): Stream<Message, Message>[] => {
    return Array.from(this.streams.values());
  };

  /**
   * @param id key of the stream in the map
   * @returns the stream stored under `id`, or undefined when there is none
   */
  getStream(id: string): Stream<Message, Message> | undefined {
    return this.streams.get(id);
  }

  /**
   * Shows the active streams and what each one is currently monitoring.
   *
   * Produces one entry per stream, built from the first object of the
   * stream's request: its first schema type (passed through
   * schemaTypeToJSON), its first fabric/switch/object id, plus the stream's
   * message count, followers, last message and data.
   *
   * @returns one summary entry per stream, in map iteration order
   */
  showContent = (): StreamingCachePrintType[] => {
    return this.getStreams().map(
      (stream: Stream<Message, Message>): StreamingCachePrintType => {
        const messages = stream.count;

        // Guard both the request and its `objects` list: a stream whose
        // request carries no objects must not abort the whole dump.
        // The value may therefore be undefined at runtime despite the cast,
        // which is why every access below is optional-chained.
        const requestObject: MonitoredObjects = stream.getRequest()
          ?.objects?.[0] as unknown as MonitoredObjects;

        // `??` instead of `||` so that only a missing type falls back to
        // UNRECOGNIZED; a falsy-but-valid enum value (e.g. numeric 0, if the
        // enum defines one) is reported as itself.
        const schemaType = schemaTypeToJSON(
          requestObject?.types?.[0] ?? SchemaType.UNRECOGNIZED
        );

        return {
          url: this.url ?? "",
          requestObject,
          schemaType,
          messages,
          observers: stream.followers(),
          lastMessage: stream.lastMessage,
          data: stream.data,
          fabricId: requestObject?.fabricId?.[0],
          switchId: requestObject?.switchId?.[0],
          objectId: requestObject?.objectId?.[0]
        };
      }
    );
  };
}
