import { useQueries, UseQueryResult } from '@tanstack/react-query';
import { useSeinoAuth } from './use-seino-auth';
import { useContext, useMemo } from 'react';
import { DuckContext, DuckContextProps } from '../providers/duck';
import { DataModelField, DataModelSchema } from '../types/datamodel/schema';
import * as Sentry from '@sentry/react';
import { useActiveWorkspaceId } from '../providers/useActiveWorkspaceId.ts';

/**
 * A dataset that has been downloaded and exposed as a DuckDB view.
 */
export type DuckDataset = {
    /** Name of the dataset, exactly as it was requested. */
    id: string;
    /** Quoted, schema-qualified view name: `"<workspaceId>"."<dataset>"`. */
    tableId: string;
    /** Workspace the dataset was loaded for; also used as the DuckDB schema name. */
    workspaceId: string;
    /** Data model schema that describes the dataset's fields. */
    schema: DataModelSchema;
};

// Step, in bytes (100 KiB), by which the progress-reporting threshold advances
// while a download is streamed in `fetchWithProgress`.
const PROGRESS_INTERVAL = 1024 * 100;

/**
 * Builds the async query function that downloads one dataset and exposes it
 * as a DuckDB view named `"<workspaceId>"."<dataset>"`.
 *
 * The returned function:
 *  1. downloads the dataset file (reporting progress when the server sends an
 *     `x-content-length` header) and records the download time in Sentry,
 *  2. registers the bytes with DuckDB and makes sure the workspace schema exists,
 *  3. for datasets whose name contains `engagement`, creates a plain view and
 *     returns a fixed placeholder schema without requesting one,
 *  4. otherwise fetches the dataset schema, drops fields that are missing from
 *     the file, and creates a view that also exposes the schema's virtual
 *     (non weighted-average) fields as extra SQL columns.
 *
 * @param duck DuckDB handles, or `null` while DuckDB is not initialised yet.
 * @param fetchToken Resolves the bearer token used for both HTTP requests.
 * @param workspaceId Workspace that owns the dataset.
 * @param dataset Name of the dataset to load.
 * @param onProgress Receives the download progress as a percentage.
 * @returns A function that resolves to the loaded {@link DuckDataset}.
 * @throws Error (from the returned function) when DuckDB is not initialised,
 *         when either HTTP request answers with a non-2xx status, or when the
 *         dataset response has no body.
 */
function fetchDataset(
    duck: DuckContextProps | null,
    fetchToken: () => Promise<string>,
    workspaceId: string,
    dataset: string,
    onProgress: (progress: number) => void = () => {}
) {
    // Datasets with "engagement" in their name are loaded without a schema request.
    const fetchSchema = !dataset.includes('engagement');

    return async (): Promise<DuckDataset> => {
        if (!duck) {
            throw new Error('DuckDB is not initialized');
        }

        const token = await fetchToken();
        const filename = `workspaces/${workspaceId}/datasets/${dataset}`;
        const url = `${import.meta.env.VITE_API_RS_URL}/${filename}`;
        const headers = { Authorization: `Bearer ${token}` };

        const startDownload = performance.now();
        const response = await fetch(url, { headers });

        // An error response still carries a body; without this check the error
        // payload would be registered with DuckDB as if it were a parquet file.
        if (!response.ok) {
            throw new Error(
                `Failed to download dataset ${dataset}: ${response.status} ${response.statusText}`
            );
        }

        if (!response.body) {
            throw new Error(`Body for dataset ${dataset} is empty`);
        }

        // A missing, zero or non-numeric header is falsy here and selects the
        // download path without progress reporting.
        const length = +(response.headers.get('x-content-length') ?? 0);
        const buffer = length
            ? await fetchWithProgress(length, response.body, onProgress)
            : await fetchWithoutProgress(response);

        Sentry.metrics.timing(
            'duckdb_dataset_download',
            performance.now() - startDownload,
            'millisecond',
            { tags: { workspace_id: workspaceId, dataset } }
        );

        const tableId = `"${workspaceId}"."${dataset}"`;

        // NOTE(review): workspaceId, dataset, field ids and field SQL are
        // interpolated into the statements below without escaping — confirm
        // they can never contain quote characters.
        await duck.db.registerFileBuffer(filename, buffer);
        await duck.connection.query(
            `create schema if not exists "${workspaceId}"`
        );

        if (!fetchSchema) {
            await duck.connection.query(
                `create or replace view ${tableId} as select * from read_parquet('${filename}')`
            );

            return {
                id: dataset,
                tableId,
                workspaceId,
                schema: { dateField: 'date', fields: [] },
            };
        }

        const schemaResponse = await fetch(`${url}?schema=true`, { headers });

        // Fail with a clear message instead of a JSON parse error on an error page.
        if (!schemaResponse.ok) {
            throw new Error(
                `Failed to download schema for dataset ${dataset}: ${schemaResponse.status} ${schemaResponse.statusText}`
            );
        }

        // Columns that are physically present in the downloaded file.
        const columnNames: string[] = (
            await duck.connection.query(
                `describe select * from read_parquet('${filename}')`
            )
        )
            .toArray()
            .map((row: { column_name: string }) => row['column_name']);

        // todo: parse incoming JSON
        const schema: DataModelSchema = filterMissingFields(
            await schemaResponse.json(),
            dataset,
            columnNames
        );

        // Virtual fields (except weighted averages) are added to the view as
        // extra columns computed from their SQL expression.
        const extraFields =
            schema.fields?.filter(
                f =>
                    f.dataSourceType === 'virtual' &&
                    f.agg?.type !== 'weighted_avg'
            ) ?? [];

        const extraFieldsSql =
            extraFields.length > 0
                ? `, ${extraFields.map(f => `${f.sql} as ${f.id}`).join(',\n')}`
                : '';

        await duck.connection.query(
            `create or replace view ${tableId} as select * ${extraFieldsSql} from read_parquet('${filename}')`
        );

        return { id: dataset, tableId, workspaceId, schema };
    };
}

/**
 * Loads a single dataset into DuckDB.
 *
 * Thin wrapper around `useDuckDatasets` for the one-dataset case.
 *
 * @param dataset Name of the dataset to load.
 * @returns The query result for that dataset.
 */
export const useDuckDataset = (
    dataset: string
): UseQueryResult<DuckDataset> => {
    // Memoised so the array identity only changes when the dataset name does.
    const singleDataset = useMemo(() => [dataset], [dataset]);
    const results = useDuckDatasets({ datasets: singleDataset });

    return results[0];
};

/** Options accepted by `useDuckDatasets`. */
type UseDuckDatasetsProps = {
    /** Names of the datasets to load; one query is created per entry. */
    datasets: string[];
};

/**
 * Loads several datasets into DuckDB, one query per dataset.
 *
 * Queries are keyed by `['duckdb', workspaceId, dataset]`, stay disabled until
 * the DuckDB context is available, and forward download progress to the
 * context's `setProgress`.
 *
 * @returns One query result per entry of `datasets`, in the same order.
 */
const useDuckDatasets = ({
    datasets,
}: UseDuckDatasetsProps): UseQueryResult<DuckDataset>[] => {
    const duck = useContext(DuckContext);
    const { fetchToken } = useSeinoAuth();
    const workspaceId = useActiveWorkspaceId();

    const queries = useMemo(() => {
        // Describes the query that loads a single named dataset.
        const toQuery = (name: string) => {
            const reportProgress = (progress: number) =>
                duck?.setProgress(workspaceId, name, progress);

            return {
                queryKey: ['duckdb', workspaceId, name],
                enabled: !!duck,
                staleTime: 30_000,
                queryFn: fetchDataset(
                    duck,
                    fetchToken,
                    workspaceId,
                    name,
                    reportProgress
                ),
            };
        };

        return datasets.map(name => toQuery(name));
    }, [datasets, workspaceId, duck, fetchToken]);

    return useQueries({ queries });
};

/**
 * Drains a response body stream into memory while reporting download progress.
 *
 * `length` is the size advertised by the server. It pre-sizes the buffer and
 * is the denominator of the reported percentage, but it is treated as a hint:
 * when the stream delivers more bytes the buffer grows, and when it delivers
 * fewer the result is trimmed, so the returned array always contains exactly
 * the bytes that were received.
 *
 * @param length Expected total number of bytes.
 * @param body Stream to read until it is done.
 * @param onProgress Called with the percentage received (capped at 100), at
 *        most once per chunk and only after the received byte count passes the
 *        next `PROGRESS_INTERVAL` threshold.
 * @returns The downloaded bytes.
 */
async function fetchWithProgress(
    length: number,
    body: ReadableStream<Uint8Array>,
    onProgress: (progress: number) => void
) {
    let buffer = new Uint8Array(length);
    const reader = body.getReader();
    let received = 0;
    let nextProgress = PROGRESS_INTERVAL;

    for (;;) {
        const { done, value } = await reader.read();

        if (done) {
            break;
        }

        const needed = received + value.length;

        // The advertised length was too small: grow instead of letting
        // `buffer.set` throw a RangeError.
        if (needed > buffer.length) {
            const grown = new Uint8Array(Math.max(needed, buffer.length * 2));
            grown.set(buffer.subarray(0, received));
            buffer = grown;
        }

        buffer.set(value, received);
        received = needed;

        if (received > nextProgress) {
            nextProgress += PROGRESS_INTERVAL;
            onProgress(Math.min(100, (100 * received) / length));
        }
    }

    // Drop unused trailing bytes so a short stream is not zero-padded.
    return received === buffer.length ? buffer : buffer.slice(0, received);
}

/**
 * Reads the whole response body in one go, without progress reporting.
 *
 * @param response Response whose body has not been consumed yet.
 * @returns The body as a byte array.
 */
async function fetchWithoutProgress(response: Response) {
    const raw = await response.arrayBuffer();

    return new Uint8Array(raw);
}

/**
 * Returns a copy of `schema` that only keeps fields the dataset can serve.
 *
 * A field is kept when:
 *  - it is a virtual field, or
 *  - it is a weighted average whose numerator and denominator are each both a
 *    column of the dataset and a field of the schema, or
 *  - its id is a column of the dataset.
 *
 * Every removed field is reported with `console.warn`.
 *
 * @param schema Schema as received for the dataset.
 * @param dataset Dataset name, used only in the warning message.
 * @param columnNames Column names that exist in the dataset.
 * @returns A new schema object with the filtered `fields` array.
 */
const filterMissingFields = (
    schema: DataModelSchema,
    dataset: string,
    columnNames: string[]
): DataModelSchema => {
    const declaredIds = schema.fields.map(field => field.id);

    // An operand is usable only if it is both a real column and a schema field.
    const isResolvable = (name: string) =>
        columnNames.includes(name) && declaredIds.includes(name);

    const isAvailable = (field: DataModelField) => {
        if (field.dataSourceType === 'virtual') {
            return true;
        }

        if (field.agg?.type !== 'weighted_avg') {
            return columnNames.includes(field.id);
        }

        return (
            isResolvable(field.agg.numerator) &&
            isResolvable(field.agg.denominator)
        );
    };

    const keptFields = schema.fields.filter(field => {
        if (isAvailable(field)) {
            return true;
        }

        console.warn(
            `Removing field ${field.id} because it is missing in ${dataset}`
        );
        return false;
    });

    return { ...schema, fields: keptFields };
};
