/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StateMigrationException;

@Internal
public class HeapPriorityQueuesManager {
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
    private final KeyGroupRange keyGroupRange;
    private final int numberOfKeyGroups;

    public HeapPriorityQueuesManager(Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, HeapPriorityQueueSetFactory priorityQueueSetFactory, KeyGroupRange keyGroupRange, int numberOfKeyGroups) {
        this.registeredPQStates = registeredPQStates;
        this.priorityQueueSetFactory = priorityQueueSetFactory;
        this.keyGroupRange = keyGroupRange;
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> createOrUpdate(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return this.createOrUpdate(stateName, byteOrderedElementSerializer, false);
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> createOrUpdate(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) {
        HeapPriorityQueueSnapshotRestoreWrapper<?> existingState = this.registeredPQStates.get(stateName);
        if (existingState != null) {
            TypeSerializerSchemaCompatibility<?> compatibilityResult = existingState.getMetaInfo().updateElementSerializer(byteOrderedElementSerializer);
            if (compatibilityResult.isIncompatible()) {
                throw new FlinkRuntimeException(new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible."));
            }
            this.registeredPQStates.put(stateName, existingState.forUpdatedSerializer(byteOrderedElementSerializer, allowFutureMetadataUpdates));
            return existingState.getPriorityQueue();
        }
        RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo<T>(stateName, byteOrderedElementSerializer);
        metaInfo = allowFutureMetadataUpdates ? metaInfo.withSerializerUpgradesAllowed() : metaInfo;
        return this.createInternal(metaInfo);
    }

    @Nonnull
    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> createInternal(RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
        String stateName = metaInfo.getName();
        KeyGroupedInternalPriorityQueue priorityQueue = this.priorityQueueSetFactory.create(stateName, (TypeSerializer)metaInfo.getElementSerializer());
        HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper = new HeapPriorityQueueSnapshotRestoreWrapper<T>(priorityQueue, metaInfo, KeyExtractorFunction.forKeyedObjects(), this.keyGroupRange, this.numberOfKeyGroups);
        this.registeredPQStates.put(stateName, wrapper);
        return priorityQueue;
    }

    public Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> getRegisteredPQStates() {
        return this.registeredPQStates;
    }
}

