import { Subject, buffer, fromEvent, interval, map, withLatestFrom } from "rxjs";
import type { Level, LogEvent } from "pino";
import { errWithCause } from "pino-std-serializers";
import { configure } from "safe-stable-stringify";
import { mergeMap, mergeWith, skipUntil } from "rxjs/operators";

const LOG_BATCH_MAX_SIZE = 10;
const LOG_BATCH_INTERVAL = 5000;

export interface LogToServerProps {
    accessToken: string;
    collectorUrl: string;
}

const logToServerProps$ = new Subject<LogToServerProps>();

const flush$ = interval(LOG_BATCH_INTERVAL).pipe(
    mergeWith(fromEvent(window, "beforeunload")),
    mergeWith(fromEvent(window, "pagehide")),
    mergeWith(fromEvent(document, "visibilitychange")),
    // Don't emit until authorized so we can send events to the backend
    skipUntil(logToServerProps$),
);

const logs$ = new Subject<LogEvent>();
logs$
    .pipe(
        // Aggregate all the log events in time interval or until window event
        buffer(flush$),
        // Split log events batches if the size is too big
        mergeMap((array) => {
            const chunks: LogEvent[][] = [];
            for (let i = 0; i < array.length; i += LOG_BATCH_MAX_SIZE) {
                const chunk = array.slice(i, i + LOG_BATCH_MAX_SIZE);
                chunks.push(chunk);
            }
            return chunks;
        }),
        // Receive access token to be able to send
        withLatestFrom(logToServerProps$),
        // Send log batches
        map(async ([logRecords, { accessToken, collectorUrl }]) => {
            if (!logRecords.length) return;
            await sendLogs(logRecords, accessToken, collectorUrl);
        }),
    )
    // We need this to make Observable to work
    .subscribe({
        error: (error) => {
            console.error("==== ERROR in Log Event sending Observable ====", error);
        },
    });

const safeStringify = configure({
    deterministic: false,
    maximumBreadth: 30,
    maximumDepth: 10,
    strict: false,
});

function jsonReplacer(key: string, value: unknown) {
    // Twilio data has a LOT of private members that may hold sensitive information like our auth token
    // These are all prefixed with an underscore, so we can filter them out here
    if (key.startsWith("_")) return undefined;

    // https://github.com/pinojs/pino/issues/2132
    if (value instanceof Error) return errWithCause(value);

    return value;
}

async function gzip(string: string): Promise<Uint8Array> {
    const stream = new Blob([string]).stream().pipeThrough(new CompressionStream("gzip"));
    const arrayBuffer = await new Response(stream).arrayBuffer();
    return new Uint8Array(arrayBuffer);
}

async function sendLogs(logRecords: LogEvent[], accessToken: string, logCollectorUrl: string) {
    const body = safeStringify(logRecords, jsonReplacer);
    if (!body) return;

    await fetch(logCollectorUrl, {
        method: "POST",
        headers: {
            Accept: "application/json",
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",
            Authorization: `Bearer ${accessToken}`,
        },
        keepalive: true,
        body: await gzip(body),
    });
}

export function initLoggingTransmission(props: { accessToken: string; collectorUrl: string }) {
    logToServerProps$.next(props);
}

export function send(_level: Level, logEvent: LogEvent) {
    logs$.next(logEvent);
}
