import { replicateRxCollection } from 'rxdb/plugins/replication';
import { reportError } from '@/features/tracker/tracker.service';
import { getAPIHeaders } from '@/services/api.service';
import { toCamelCase } from '@/services/sanitize.service';
import addToCache from '@/services/cache.service';
import { interval, Subject, throttle } from 'rxjs';
import onMessage from '@/services/socket.service';
import { setCollectionSyncStatus } from '@/state/state';
import { getCollectionKeySuffix } from '@/rxdb/database';

export default function (collection, projectId, { cacheURLsFn, pullLimit, mapFn } = {}) {
    const collectionSuffix = getCollectionKeySuffix(projectId);
    const collectionName = toCamelCase(collection.name.replace('_' + collectionSuffix, ''));
    const replicationState = replicateRxCollection({
        live: true,
        liveInterval: 0,
        retryTime: 10000,
        autoStart: false,
        collection,
        replicationIdentifier: `${collectionName}-${collectionSuffix}-replication`,
        pull: {
            async handler(latestPullDocument) {
                const minTimestamp = latestPullDocument ? latestPullDocument.updatedAt : '';
                const limitPerPull = typeof pullLimit === 'undefined' ? 1000 : pullLimit > 0 ? pullLimit : '';
                let response;
                try {
                    response = await fetch(
                        `/api/projects/${projectId}/${collectionName}?from=${minTimestamp}&limit=${limitPerPull}`,
                        {
                            headers: getAPIHeaders({ 'x-client-version': 'PATCH_VERSION' }),
                        },
                    );
                } catch (error) {
                    if (!navigator.onLine) {
                        await setCollectionSyncStatus(projectId, collectionName, { state: 'error' });
                        //ignore fetch error id offline
                        return {
                            documents: [],
                            hasMoreDocuments: false,
                        };
                    } else {
                        throw new Error('replication pull failed');
                    }
                }
                if (response && response.ok) {
                    let documents = await response.json();
                    if (cacheURLsFn) {
                        addToCache(cacheURLsFn(documents));
                    }
                    if (mapFn) {
                        documents = mapFn(collection, projectId, documents);
                    }
                    if (documents.length === 0 || documents.length !== limitPerPull) {
                        await setCollectionSyncStatus(projectId, collectionName, { state: 'ok' });
                    }
                    return {
                        documents: documents.map((document) => ({
                            ...document,
                            rootVersion: document.updatedAt,
                            _deleted: !!document.deletedAt,
                        })),
                        hasMoreDocuments: documents.length === limitPerPull,
                    };
                } else if (
                    response.status === 401 ||
                    response.status === 403 ||
                    response.status === 429 ||
                    response.status === 404
                ) {
                    await setCollectionSyncStatus(projectId, collectionName, { state: 'error' });
                    if (response.status === 401 && window.location.pathname !== 'login') {
                        window.location = '/login';
                    }
                    return {
                        documents: [],
                        hasMoreDocuments: false,
                    };
                } else {
                    throw new Error('replication pull failed');
                }
            },
        },
        push: {
            async handler(docs) {
                if (
                    docs.length > 0 &&
                    (!docs[0].rootVersion || (docs[0].rootVersion && docs[0].updatedAt !== docs[0].rootVersion))
                ) {
                    const response = await fetch(`/api/projects/${projectId}/${collectionName}/${docs[0].id}`, {
                        method: 'PUT',
                        headers: {
                            Accept: 'application/json',
                            'Content-Type': 'application/json',
                        },
                        body: JSON.stringify(docs[0]),
                    });
                    if (!(response.ok || response.status === 403)) {
                        // do not throw error if ok ou if 401 : drop pushed patchs when not allowed
                        throw new Error(await response.text());
                    }
                }
            },
            /**
             * Batch size, optional
             * Defines how many documents will be given to the push handler at once.
             */
            batchSize: 1,
        },
    });
    replicationState.error$.subscribe((error) => {
        if (navigator.onLine) {
            reportError(collectionName + 'Replication_' + projectId + ' ' + error.message);
            console.error(error);
        }
        setCollectionSyncStatus(projectId, collectionName, { state: 'error' });
    });
    replicationState.active$.subscribe((bool) => {
        if (bool) {
            setCollectionSyncStatus(projectId, collectionName, { state: 'pending' });
        }
    });
    const socketObservable = new Subject();
    onMessage(collectionName + '_' + projectId, (params) => {
        if (params.updatedBy && localStorage.getItem('userId')) {
            // ignore my own changes
            return;
        }
        socketObservable.next(event);
    });
    socketObservable.pipe(throttle(() => interval(200))).subscribe(() => replicationState.run());
    return replicationState;
}
