import { AbortError } from './errors';
import { fetchStreamMessagesFactory as messagesFactory } from './message-factory';
import { isFetchStreamChunkMessage, isFetchStreamErrorMessage, isFetchStreamResponseMessage } from './messages';
import { FetchStreamMessage, IFetchStreamMessageRequestPayload, IFetchStreamMessageResponsePayload } from './types';

const SERVER_RESPONSE_TIMEOUT = 30_000;

export class StreamEventsHandler {
    private readonly port: chrome.runtime.Port;
    private readonly signal: AbortSignal | null | undefined;
    private streamController: ReadableStreamDefaultController | null = null;
    private stream: ReadableStream | null = null;
    private streamClosed: boolean = true;
    private streamId: string | null = null;
    private timeoutTimerId: NodeJS.Timeout | undefined = undefined;

    constructor(port: chrome.runtime.Port, signal?: AbortSignal | null) {
        this.port = port;
        this.signal = signal;

        if (signal) {
            signal.addEventListener('abort', this.abortStream);
        }
    }

    public getStream(): ReadableStream {
        if (this.stream) {
            return this.stream;
        }

        this.stream = new ReadableStream({
            start: this.startStream,
            cancel: this.stopStream,
        });
        this.streamClosed = false;

        return this.stream;
    }

    private readonly startStream: UnderlyingDefaultSource['start'] = (controller) => {
        this.streamController = controller;
        this.port.onMessage.addListener(this.onMessage);
        this.setResponseTimeoutTimer();
    };

    private readonly stopStream: UnderlyingSourceCancelCallback = (reason) => {
        this.clearResponseTimout();
        this.port.onMessage.removeListener(this.onMessage);
        this.port.disconnect();

        if (!this.streamController || this.streamClosed) {
            return;
        }

        try {
            this.streamController.close();
        } catch {}
        this.streamClosed = true;
    };

    private sendMessage(message: FetchStreamMessage) {
        // avoid sending messages to disconnected port
        try {
            this.port.postMessage(message);
        } catch {}
    }

    public async initStream(payload: IFetchStreamMessageRequestPayload): Promise<IFetchStreamMessageResponsePayload> {
        const { streamId } = payload;
        this.streamId = streamId;

        if (this.signal?.aborted) {
            return Promise.reject(new AbortError(`Aborting background fetch request with requestId: ${streamId}`));
        }

        const requestMessage = messagesFactory.createRequestMessage(payload);

        return new Promise((resolve, reject) => {
            let initialTimeoutTimerId: NodeJS.Timeout | undefined = undefined;

            const onResponse = (message: unknown) => {
                clearTimeout(initialTimeoutTimerId);

                if (isFetchStreamResponseMessage(message)) {
                    resolve(message);
                }

                if (isFetchStreamErrorMessage(message)) {
                    reject(new Error(message.options.statusText));
                }

                this.port.onMessage.removeListener(onResponse);
            };

            this.port.onMessage.addListener(onResponse);
            this.sendMessage(requestMessage);

            initialTimeoutTimerId = setTimeout(() => {
                this.stopStream();
                reject(new Error('Server response timeout'));
            }, SERVER_RESPONSE_TIMEOUT);
        });
    }

    private readonly abortStream = () => {
        if (this.signal) {
            this.signal.removeEventListener('abort', this.abortStream);
        }

        this.sendMessage(
            messagesFactory.createAbortMessage({
                streamId: this.streamId ?? '',
            })
        );
        this.stopStream();
    };

    private readonly onMessage = (message: unknown) => {
        if (!this.streamController) {
            return;
        }

        this.setResponseTimeoutTimer();

        if (isFetchStreamErrorMessage(message)) {
            if (!this.streamClosed) {
                this.streamController.error(new Error('Stream error'));
            }

            this.stopStream();
        }

        if (isFetchStreamChunkMessage(message)) {
            const { value, done } = message;

            if (done) {
                this.stopStream();
            } else {
                const uint8array = Uint8Array.from((value ?? '').split(',').map(Number));

                this.streamController.enqueue(uint8array);
            }
        }
    };

    private setResponseTimeoutTimer() {
        this.clearResponseTimout();

        this.timeoutTimerId = setTimeout(() => {
            if (this.streamController && !this.streamClosed) {
                this.streamController.error(new Error('Server response timeout'));
            }

            this.stopStream();
        }, SERVER_RESPONSE_TIMEOUT);
    }

    private clearResponseTimout() {
        clearTimeout(this.timeoutTimerId);
    }
}
