import { interval } from 'rxjs';
import { filter, map, take, takeUntil } from 'rxjs/operators';
import { clone, isEmpty, setNestedValue } from 'src/app/shared/common';
import { log } from 'src/app/shared/log';
import { CanDatabase } from 'src/app/shared/models/can-database';
import { Device } from 'src/app/shared/models/device';
import {
    IncomingPacket,
    MqttClient,
    OutgoingPacket,
} from 'src/app/shared/mqtt/mqtt';
import { DataPack } from '../../../shared/data-pack';
import {
    Datasource,
    DatasourceConfig,
    DatasourceConnectContext,
    DatasourceTopicOption,
} from '../datasource';
import { DatasourceType } from '../datasource-type';
import { SandboxDevice } from '../sandbox/device';
import { ensureLoaded } from 'src/app/services/api/cached-api';

/**
 * Datasource that talks to a single device over MQTT.
 *
 * It publishes outgoing messages below the device prefix
 * (`mrs/d/<device>/`), keeps the device's MQTT output enabled by periodically
 * publishing on the enable topic, and forwards non-retained `mon/#` packets
 * to `data$`, decoding them through the matching CAN databases when possible.
 */
export class DatasourceDevice extends Datasource {
    /**
     * Matches the MRS CAN topic format, e.g. `mrs/d/1234/mon/0/3fa4567` or
     * `mrs/d/1234/mon/can1/568732`.
     * Groups: 1 = device, 2 = action, 3 = bus prefix, 4 = bus number, 5 = CAN ID.
     */
    private static readonly CAN_TOPIC_PATTERN =
        /mrs\/d\/([0-9]*)\/(mon|ctrl)\/([a-zA-Z]*)([0-9]*)\/([a-fA-F0-9]*)/i;

    /** A CAN ID segment made only of decimal digits. */
    private static readonly DECIMAL_PATTERN = /^[0-9]+$/;

    /** CAN databases whose model matches the connected device. */
    canDatabases: CanDatabase[] = [];
    client: MqttClient;
    /** Topic used to enable/disable the device's MQTT output; unset without a device ID. */
    enableTopic: string;

    busNames: { [key: string | number]: string | number };
    devices: { [key: string]: Device };

    /** Handle of the periodic "enable" publisher, kept so a reconnect can replace it. */
    private keepAliveSubscription?: { unsubscribe(): void };
    /** Handle of the `mon/#` subscription, kept so a reconnect can replace it. */
    private monitorSubscription?: { unsubscribe(): void };

    /** Topic prefix shared by every topic of the configured device. */
    get prefix(): string {
        return `mrs/d/${this.settings.device}/`;
    }

    /**
     * @param config Datasource configuration; `settings.device` is coerced to
     * a number, and the enable topic is only derived when it is truthy.
     */
    constructor(config: DatasourceConfig) {
        super(DatasourceType.DEVICE, config);
        this.settings.device = +this.settings.device;
        if (this.settings.device) {
            this.enableTopic = `${this.prefix}resp/enable_mqtt`;
        }
    }

    /** Serializes this datasource back into its persisted configuration. */
    build(): DatasourceConfig {
        return {
            uuid: this.uuid,
            name: this.name,
            type: this.type,
            settings: { device: this.settings.device },
        };
    }

    /**
     * Connects the MQTT client and, when the configured device is known,
     * builds the publish/subscribe topic options and the sandbox device.
     * Logs an error (and sets up nothing else) when the device is unknown.
     *
     * @param context Connection context supplying bus names, devices, CAN
     * databases and the dashboard page service.
     */
    async connect(context: DatasourceConnectContext) {
        await ensureLoaded([context.dashboardPageService.api.mqttCommands]);
        const { busNames, devices } = context;

        this.busNames = busNames;
        this.devices = devices;

        await this.connectMqtt();
        const deviceId = this.settings.device;
        const deviceInfo = deviceId ? this.devices?.[deviceId] : undefined;
        if (!deviceInfo) {
            log.error(
                'Can not connect device datasource. Device ID:',
                deviceId,
                'Devices:',
                this.devices,
            );
            return;
        }

        const topicOptions: DatasourceTopicOption[] =
            context.dashboardPageService.api.mqttCommands
                .current({ modelId: deviceInfo.modelId, sort: 'name' })
                .map(({ name, topic }) => ({
                    direction: 'publish',
                    name: name ?? topic,
                    group: 'commands',
                    topicSegments: [topic],
                }));
        // This is for supporting the old publish topics system: a
        // space-separated list of topics stored in the settings.
        const oldTopicOptions: DatasourceTopicOption[] = (
            this.settings['publish_topics'] ?? ''
        )
            .split(' ')
            .filter((topic) => !['#', ''].includes(topic))
            .map((topic) => ({
                direction: 'publish',
                name: topic,
                group: 'commands',
                topicSegments: [topic],
            }));
        this.topicOptions = topicOptions.concat(oldTopicOptions);

        // Awaited so a failure rejects connect() instead of being lost as an
        // unhandled rejection.
        await this.createDatasourcePath(context);
        this.sandboxDatasource.device = new SandboxDevice(
            deviceInfo,
            this.send.bind(this),
        );
    }

    /**
     * Runs the base-class disconnect, then publishes `'0'` on the enable
     * topic and closes the MQTT client.
     */
    async disconnect() {
        await super.disconnect();
        if (!this.client) {
            return;
        }
        try {
            await this.publishEnabling('0');
        } finally {
            // Close the client even when the final publish fails.
            this.client.disconnect();
        }
    }

    /**
     * Publishes a message below the device prefix.
     *
     * @param topic Topic relative to the device prefix.
     * @param message Binary payloads are sent as `payload`, strings as `text`.
     * @returns Whatever the client's `publish` resolves to.
     */
    async send(topic: string, message: Uint8Array | string) {
        const packet: OutgoingPacket = { topic: this.prefix + topic };
        if (message instanceof Uint8Array) {
            packet.payload = message;
        } else {
            packet.text = message;
        }
        return await this.client.publish(packet);
    }

    // TODO: this MAJORLY needs to be replaced!
    /**
     * Selects the CAN databases of the device's model, records their
     * CAN ID <-> descriptor mapping in `canIds$`, and appends one
     * `subscribe` topic option per CAN signal.
     *
     * @param context Connection context supplying devices, CAN databases and
     * the dashboard page service.
     */
    async createDatasourcePath(context: DatasourceConnectContext) {
        const { dashboardPageService, devices, canDatabases } = context;
        const device = devices[this.settings.device];
        const canNames = dashboardPageService.canIds$.getValue() ?? {
            names: {},
            ids: {},
        };

        this.canDatabases = canDatabases.filter(
            (db) => db.modelId == device.modelId,
        );

        this.canDatabases.forEach((db: CanDatabase) => {
            db.messages.forEach((m) => {
                // Save can_ids; 0 is a valid CAN ID, so test it explicitly.
                if (m.canId || m.canId === 0) {
                    canNames.names[m.canId] = m.descriptor;
                    canNames.ids[m.descriptor] = m.canId;
                }
                this.topicOptions.push(
                    ...m.signals.map<DatasourceTopicOption>((signal) => ({
                        direction: 'subscribe',
                        name: signal.name,
                        group: m.descriptor,
                        topicSegments: [
                            'mon',
                            db.busNumberType,
                            m.descriptor,
                            signal.name,
                        ],
                    })),
                );
            });
        });
        dashboardPageService.canIds$.next(clone(canNames));
    }

    /**
     * (Re)creates the MQTT client, registers the last-will on the enable
     * topic, starts the keep-alive and subscribes to the device's `mon/#`
     * topics. An abnormal disconnect triggers one automatic reconnect
     * attempt through this same method.
     *
     * @returns An empty array when no device is configured, otherwise nothing.
     */
    async connectMqtt() {
        if (!this.settings.device) {
            return [];
        }
        if (this.client) {
            this.client.disconnect();
        }
        this.client = new MqttClient();
        this.client.abnormalDisconnect$
            // Stop listening once the datasource itself is disconnected, so
            // an intentional disconnect is never followed by a reconnect.
            .pipe(takeUntil(this.disconnect$), take(1))
            .subscribe(() => {
                log.info('Abnormal disconnect, attempting automatic reconnect');
                this.connectMqtt().catch((error) =>
                    log.error('Automatic reconnect failed:', error),
                );
            });
        await this.client.connect({
            will: { topic: this.enableTopic, text: '0' },
        });
        await this.enableMqtt();

        // Release the subscription of a previous connection so reconnects do
        // not accumulate duplicate streams into data$.
        this.monitorSubscription?.unsubscribe();
        this.monitorSubscription = this.client
            .observe(`${this.prefix}mon/#`)
            .pipe(
                takeUntil(this.disconnect$),
                filter((packet) => !!packet),
                filter(({ retain }) => !retain),
                map((packet) => this.parsePacket(packet)),
                filter((data) => data && !isEmpty(data)),
            )
            .subscribe((data) => this.data$.next(data));
    }

    /**
     * Converts an incoming packet into a single-key data object.
     *
     * CAN-decodable packets yield `{ <action>: decoded }`; every other `mon`
     * packet yields `{ <topic without prefix>: value }`, where the value is
     * the payload text, JSON-parsed when it is valid JSON.
     *
     * @returns The data object, or `undefined` for non-`mon` topics and
     * null values.
     */
    private parsePacket(packet: IncomingPacket) {
        // for selection like mon/xxx | ctrl/xxx ....
        let key = packet.topic.replace(this.prefix, '');
        if (!key.startsWith('mon')) {
            return;
        }

        let rawValue;
        const parsedData = this.parseCanData(packet);
        if (parsedData && !isEmpty(parsedData)) {
            // Handle CAN data
            key = Object.keys(parsedData)[0];
            rawValue = parsedData[key];
        } else {
            // Decode the string from the payload.
            rawValue = new TextDecoder().decode(packet.payload);
        }

        let value;
        if (typeof rawValue === 'string') {
            try {
                // try to parse JSON
                value = JSON.parse(rawValue);
            } catch {
                // Not JSON: keep the plain text.
                value = rawValue;
            }
        } else {
            // Already-decoded CAN values are used as they are.
            value = rawValue;
        }
        if (value == null) {
            // Invalid value, so don't do anything with it.
            return;
        }

        return { [key]: value };
    }

    /**
     * Tries to decode a packet as a CAN message of one of the device's CAN
     * databases.
     *
     * @returns `{ <action>: { <busName>: { <descriptor>: values } } }` as
     * built by `setNestedValue`, or `undefined` when the topic does not match
     * the MRS format or no matching bus, bus name or message is known.
     */
    private parseCanData({ topic, payload }: IncomingPacket) {
        // If we're going to try to extract CAN data, it needs to match the MRS topic format
        const matches = DatasourceDevice.CAN_TOPIC_PATTERN.exec(topic);
        if (matches === null) {
            return;
        }

        const action = matches[2];
        const busId = parseInt(matches[4], 10);
        const rawCanId = matches[5];
        if (rawCanId === '') {
            return;
        }
        // A digits-only ID is decimal, anything else is hex. Testing the text
        // (rather than isNaN) keeps hex IDs such as "1e5" from being read as
        // scientific notation.
        const canId = DatasourceDevice.DECIMAL_PATTERN.test(rawCanId)
            ? parseInt(rawCanId, 10)
            : parseInt(rawCanId, 16);

        // Find the messages for all applicable CAN databases
        const messages = this.canDatabases
            .filter((db) => db.busNumberId == busId)
            .flatMap((db) => db.messages);
        if (messages.length === 0) {
            return;
        }
        // Find the CAN message (loose equality: stored IDs may be numeric strings)
        const canMessage = messages.find((m) => m.canId == canId);
        if (!canMessage) {
            return;
        }
        const busName = this.busNames?.[busId];
        if (busName == null) {
            // Unknown bus name: treat the packet as not CAN-decodable instead
            // of throwing inside the packet stream.
            return;
        }
        const messageValues = DataPack.decode(payload, canMessage);
        const data = {};
        const path = [action, busName.toString(), canMessage.descriptor];
        setNestedValue(data, path, messageValues);
        return data;
    }

    /**
     * The MQTT datasource needs to publish a "1" on the topic mrs/d/<device_id>/resp/enable_mqtt every 10 seconds.  When the dashboard is closed, or the MQTT client disconnects, it needs to publish a "0" on this topic as a LWT (last will and testament).
     *
     * Calling this again replaces the running 10-second publisher instead of
     * adding another one.
     */
    async enableMqtt() {
        await this.publishEnabling('1');
        this.keepAliveSubscription?.unsubscribe();
        this.keepAliveSubscription = interval(10000)
            .pipe(takeUntil(this.disconnect$))
            .subscribe(() => {
                this.publishEnabling('1').catch((error) =>
                    log.error('Failed to publish MQTT enable message:', error),
                );
            });
    }

    /**
     * Publishes the given flag on the enable topic; does nothing when no
     * enable topic was derived (no device configured).
     */
    private async publishEnabling(text: '0' | '1') {
        if (!this.enableTopic) {
            return;
        }
        await this.client.publish({ topic: this.enableTopic, text });
    }
}
