import { Observable, Subject } from 'rxjs';
import { map } from 'rxjs/operators';
import mqtt_client from 'u8-mqtt/esm/web/v4.js';
import { environment } from '../../../environments/environment';
import { getAccessToken, getUserId } from '../common';

export interface ClientOptions {
    protocol: 'wss' | 'ws';
    hostname: string;
    port: number;
}

export const defaultClientOptions: ClientOptions = {
    protocol: environment.production ? 'wss' : 'ws',
    hostname: environment.mqttUrl,
    port: environment.mqttPort,
};

export interface ConnectOptions {
    will?: OutgoingPacket;
    username?: string;
    password?: string;
}

export interface ObserveOptions {
    parseJson?: boolean;
}

export interface Packet {
    topic: string;
    payload: Uint8Array;
    retain: boolean;
}

export type IncomingPacket = Packet & {
    json?;
};

export type OutgoingPacket = Pick<Packet, 'topic'> &
    Partial<Omit<Packet, 'topic'>> & {
        text?: string;
        qos?: number;
    };

export class MqttClient {
    private _id: string;
    private _client;
    private _topics = new Set<string>();

    public abnormalDisconnect$ = new Subject();

    constructor(options: ClientOptions = defaultClientOptions) {
        this._id =
            'spoke-zone-web.' +
            getUserId() +
            '.' +
            Math.random().toString(16).substr(2, 8);
        const { protocol, hostname, port } = options;
        this._client = mqtt_client({
            on_reconnect: () => this.abnormalDisconnect$.next(),
        }).with_websock(`${protocol}://${hostname}:${port}`);
    }

    async connect(options: ConnectOptions = {}) {
        const finalOptions = {
            client_id: this._id,
            will: null,
            username: getAccessToken(),
            password: 'gib-gibbery',
            flags: { clean_start: true },
        };
        if (options.will) {
            finalOptions.will = polishOutgoingPacket(options.will);
        }
        if (options.username) {
            finalOptions.username = options.username;
        }
        if (options.password) {
            finalOptions.password = options.password;
        }
        return await this._client.connect(finalOptions);
    }

    disconnect() {
        for (const topic of this._topics) {
            this._client.unsubscribe(topic);
        }
        this._client.disconnect();
    }

    observe(
        topic: string,
        options: ObserveOptions = {}
    ): Observable<IncomingPacket> {
        return this.observeRoute(topic.replace('+', '*'), topic, options).pipe(
            map(({ packet }) => packet)
        );
    }

    observeRoute(
        route: string,
        topic: string,
        options: ObserveOptions = {}
    ): Observable<{ packet: IncomingPacket; params: Record<string, string> }> {
        options.parseJson ??= false;
        return new Observable((subscriber) => {
            this._client.on_topic(route, (pkt, params, ctx) => {
                const { topic, payload, retain } = pkt;
                let json = null;
                if (options.parseJson) {
                    try {
                        json = pkt.json();
                    } catch {
                        json = null;
                    }
                }
                const packet: IncomingPacket = { topic, payload, json, retain };
                subscriber.next({ packet, params });
            });
            this._topics.add(topic);
            this._client.subscribe(topic);
        });
    }

    async publish(packet: OutgoingPacket) {
        const { topic, payload, qos, retain } = polishOutgoingPacket(packet);
        await this._client.publish({ topic, payload, qos, retain });
    }
}

function polishOutgoingPacket(packet: OutgoingPacket) {
    packet.qos ??= 0;
    packet.retain ??= false;
    if (packet.text && packet.payload == undefined) {
        const encoder = new TextEncoder();
        packet.payload = encoder.encode(packet.text);
    }
    return packet;
}
