/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sort;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.sort.FixedLengthByteKeyComparator;
import org.apache.flink.streaming.api.operators.sort.KeyAndValueSerializer;
import org.apache.flink.streaming.api.operators.sort.ObservableStreamTaskInput;
import org.apache.flink.streaming.api.operators.sort.VariableLengthByteKeyComparator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MutableObjectIterator;

public final class MultiInputSortingDataInput<IN, K>
implements StreamTaskInput<IN> {
    // Index reported by getInputIndex(); also used as this input's bit position in the
    // CommonContext masks and as the tag stored in the HeadElements it enqueues.
    private final int idx;
    // Underlying input that is drained while records are being collected for sorting.
    private final StreamTaskInput<IN> wrappedInput;
    // Receives (serialized key, record) pairs during the sorting phase and later hands
    // them back through the iterator stored in sortedInput.
    private final PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter;
    // State shared by all sorting inputs created by the same wrapInputs(...) call.
    private final CommonContext commonContext;
    // Output handed to wrappedInput.emitNext(...); it writes records into the sorter
    // instead of forwarding them downstream.
    private final SortingPhaseDataOutput sortingPhaseDataOutput = new SortingPhaseDataOutput();
    // Extracts the key of a record; the key is serialized with keySerializer into
    // dataOutputSerializer to obtain the byte[] sort key.
    private final KeySelector<IN, K> keySelector;
    private final TypeSerializer<K> keySerializer;
    // Scratch buffer for key serialization; cleared after every record. wrapInputs(...)
    // passes the same instance to every sorting input it creates.
    private final DataOutputSerializer dataOutputSerializer;
    // null while this input is still in the sorting phase; set by endSorting().
    private MutableObjectIterator<Tuple2<byte[], StreamRecord<IN>>> sortedInput;
    // Largest watermark timestamp observed during the sorting phase; emitted once after
    // the last sorted record if it is greater than Long.MIN_VALUE.
    private long seenWatermark = Long.MIN_VALUE;

    /**
     * Creates one sorting input. Instances are only created by
     * {@code wrapInputs(...)}, hence the private visibility.
     *
     * @param commonContext state shared by all sorting inputs of the same operator
     * @param wrappedInput the input whose records are sorted
     * @param inputIdx index reported by {@link #getInputIndex()}
     * @param sorter sorter that buffers the records of this input
     * @param keySelector extracts the key of a record
     * @param keySerializer serializes the extracted key
     * @param dataOutputSerializer scratch buffer used while serializing keys
     */
    private MultiInputSortingDataInput(
            CommonContext commonContext,
            StreamTaskInput<IN> wrappedInput,
            int inputIdx,
            PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            DataOutputSerializer dataOutputSerializer) {
        // Identity of this input and the state it shares with its siblings.
        this.idx = inputIdx;
        this.commonContext = commonContext;

        // Where records come from and where they are buffered.
        this.wrappedInput = wrappedInput;
        this.sorter = sorter;

        // Everything needed to turn a record into its serialized sort key.
        this.keySelector = keySelector;
        this.keySerializer = keySerializer;
        this.dataOutputSerializer = dataOutputSerializer;
    }

    /**
     * Wraps the given inputs so that the {@code sortingInputs} are first fully sorted by key
     * and then emitted in key order, while the {@code passThroughInputs} are merely observed
     * by the returned input selector.
     *
     * <p>Each sorting input gets its own sorter. The managed memory fraction and the
     * configured maximum spilling fan are both divided by the total number of inputs
     * (sorting plus pass-through).
     *
     * @param containingTask task that owns the sorters
     * @param sortingInputs inputs whose records are sorted
     * @param keySelectors key selector per sorting input, indexed like {@code sortingInputs}
     * @param inputSerializers record serializer per sorting input, indexed like
     *     {@code sortingInputs}
     * @param keySerializer serializer for the keys; a positive {@code getLength()} selects the
     *     fixed-length key comparator, anything else the variable-length one
     * @param passThroughInputs inputs that are not sorted
     * @param memoryManager memory manager handed to the sorters
     * @param ioManager I/O manager used for spilling
     * @param objectReuse whether the sorters may reuse objects
     * @param managedMemoryFraction fraction of managed memory available to all inputs together
     * @param jobConfiguration source of the spilling threshold and maximum fan settings
     * @param executionConfig execution config handed to the sorters
     * @return the wrapped sorting inputs, the wrapped pass-through inputs and the selector
     *     that coordinates them
     * @throws RuntimeException if memory for one of the sorters cannot be allocated; the
     *     original {@link MemoryAllocationException} is attached as the cause
     */
    // NOTE(review): the comparator/serializer/sorter helper types are used raw because their
    // generic signatures are not visible from this file — restore type arguments once confirmed.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <K> SelectableSortingInputs wrapInputs(
            TaskInvokable containingTask,
            StreamTaskInput<Object>[] sortingInputs,
            KeySelector<Object, K>[] keySelectors,
            TypeSerializer<Object>[] inputSerializers,
            TypeSerializer<K> keySerializer,
            StreamTaskInput<Object>[] passThroughInputs,
            MemoryManager memoryManager,
            IOManager ioManager,
            boolean objectReuse,
            double managedMemoryFraction,
            Configuration jobConfiguration,
            ExecutionConfig executionConfig) {
        final int keyLength = keySerializer.getLength();
        final TypeComparator comparator;
        final DataOutputSerializer dataOutputSerializer;
        if (keyLength > 0) {
            // Fixed-length keys: the buffer can be sized exactly.
            dataOutputSerializer = new DataOutputSerializer(keyLength);
            comparator = new FixedLengthByteKeyComparator(keyLength);
        } else {
            // Key length is not fixed: start with a small buffer.
            dataOutputSerializer = new DataOutputSerializer(64);
            comparator = new VariableLengthByteKeyComparator();
        }

        final List<Integer> passThroughInputIndices =
                Arrays.stream(passThroughInputs)
                        .map(StreamTaskInput::getInputIndex)
                        .collect(Collectors.toList());
        final int numberOfInputs = sortingInputs.length + passThroughInputs.length;
        final CommonContext commonContext = new CommonContext(sortingInputs);
        final InputSelector inputSelector =
                new InputSelector(commonContext, numberOfInputs, passThroughInputIndices);

        final StreamTaskInput[] wrappedSortingInputs =
                IntStream.range(0, sortingInputs.length)
                        .mapToObj(
                                i -> {
                                    try {
                                        KeyAndValueSerializer keyAndValueSerializer =
                                                new KeyAndValueSerializer(inputSerializers[i], keyLength);
                                        return new MultiInputSortingDataInput(
                                                commonContext,
                                                sortingInputs[i],
                                                sortingInputs[i].getInputIndex(),
                                                ExternalSorter.newBuilder(
                                                                memoryManager,
                                                                containingTask,
                                                                keyAndValueSerializer,
                                                                comparator,
                                                                executionConfig)
                                                        .memoryFraction(managedMemoryFraction / (double) numberOfInputs)
                                                        .enableSpilling(
                                                                ioManager,
                                                                jobConfiguration
                                                                        .get(AlgorithmOptions.SORT_SPILLING_THRESHOLD)
                                                                        .floatValue())
                                                        .maxNumFileHandles(
                                                                jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN)
                                                                        / numberOfInputs)
                                                        .objectReuse(objectReuse)
                                                        .largeRecords(true)
                                                        .build(),
                                                keySelectors[i],
                                                keySerializer,
                                                dataOutputSerializer);
                                    } catch (MemoryAllocationException e) {
                                        // Keep the cause: without it the failure is undiagnosable.
                                        throw new RuntimeException(
                                                "Could not allocate memory for the sorter of a sorting input.", e);
                                    }
                                })
                        .toArray(StreamTaskInput[]::new);

        final StreamTaskInput[] wrappedPassThroughInputs =
                Arrays.stream(passThroughInputs)
                        .map(input -> new ObservableStreamTaskInput(input, inputSelector))
                        .toArray(StreamTaskInput[]::new);

        return new SelectableSortingInputs(wrappedSortingInputs, wrappedPassThroughInputs, inputSelector);
    }

    /** Returns the input index this instance was constructed with. */
    @Override
    public int getInputIndex() {
        return idx;
    }

    /**
     * Always fails: this input does not take part in checkpoints.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) {
        final String reason = "Checkpoints are not supported with sorted inputs in the BATCH runtime.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Closes the wrapped input and then the sorter. The sorter is closed even if closing the
     * wrapped input failed; failures are combined with
     * {@code ExceptionUtils.firstOrSuppressed} and rethrown at the end.
     *
     * @throws IOException if closing the wrapped input or the sorter failed
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;

        try {
            wrappedInput.close();
        } catch (IOException wrappedInputFailure) {
            failure = ExceptionUtils.firstOrSuppressed(wrappedInputFailure, failure);
        }

        try {
            sorter.close();
        } catch (IOException sorterFailure) {
            failure = ExceptionUtils.firstOrSuppressed(sorterFailure, failure);
        }

        if (failure == null) {
            return;
        }
        throw failure;
    }

    /**
     * Drives this input through its phases.
     *
     * <ul>
     *   <li>Sorting phase ({@code sortedInput == null}): the wrapped input is polled and its
     *       records go into the sorter. When the wrapped input reports
     *       {@code END_OF_DATA}, sorting is finished and the first sorted record is queued.
     *   <li>Emitting phase: delegates to {@code emitNextAfterSorting}.
     *   <li>After this input is marked as finished emitting: the wrapped input is polled
     *       again and its status is returned as is.
     * </ul>
     *
     * @param output downstream output that receives the sorted records
     * @return the status of this input after the call
     */
    @Override
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<IN> output) throws Exception {
        final boolean sortingPhase = sortedInput == null;

        if (!sortingPhase && !commonContext.isFinishedEmitting(idx)) {
            return emitNextAfterSorting(output);
        }

        // Both the sorting phase and the "finished emitting" state poll the wrapped input.
        // NOTE(review): after emitting finished this presumably only surfaces the terminal
        // status of the wrapped input — confirm it cannot deliver further records.
        final DataInputStatus wrappedStatus = wrappedInput.emitNext(sortingPhaseDataOutput);
        if (!sortingPhase || wrappedStatus != DataInputStatus.END_OF_DATA) {
            return wrappedStatus;
        }

        // The wrapped input is exhausted: switch from sorting to emitting.
        endSorting();
        return addNextToQueue(new HeadElement(idx), output);
    }

    /**
     * Emits the smallest queued record if it belongs to this input.
     *
     * <p>Nothing is emitted until every sorting input has finished sorting, or while the
     * head of the shared queue belongs to a different input (or the queue is empty).
     *
     * @param output downstream output that receives the record
     * @return the result of queueing this input's next record if one was emitted, otherwise
     *     {@code NOTHING_AVAILABLE}
     */
    @Nonnull
    private DataInputStatus emitNextAfterSorting(PushingAsyncDataInput.DataOutput<IN> output) throws Exception {
        if (!commonContext.allSorted()) {
            return DataInputStatus.NOTHING_AVAILABLE;
        }

        final PriorityQueue<HeadElement> heads = commonContext.getQueueOfHeads();
        final HeadElement smallest = heads.peek();
        if (smallest == null || smallest.inputIndex != idx) {
            return DataInputStatus.NOTHING_AVAILABLE;
        }

        final HeadElement own = heads.poll();
        output.emitRecord((StreamRecord) own.streamElement.f1);
        // Refill the queue with this input's next record, reusing the polled holder.
        return addNextToQueue(own, output);
    }

    /**
     * Ends the sorting phase of this input: tells the sorter that no more records will be
     * written, marks this input as sorted in the shared context and obtains the sorted
     * iterator. If this was the last input to finish sorting, the shared availability
     * future is completed.
     */
    private void endSorting() throws Exception {
        sorter.finishReading();
        commonContext.setFinishedSorting(idx);
        sortedInput = sorter.getIterator();

        if (!commonContext.allSorted()) {
            return;
        }
        commonContext.getAllFinished().getUnavailableToResetAvailable().complete(null);
    }

    /**
     * Pulls the next sorted record of this input and puts it into the shared queue of heads.
     *
     * <p>If the sorted iterator is exhausted, this input is marked as finished emitting and
     * the largest watermark seen during sorting (if any was seen) is emitted.
     *
     * @param reuse holder that is filled with the next record and added to the queue
     * @param output downstream output, used only for the final watermark
     * @return {@code END_OF_DATA} if no record is left; {@code MORE_AVAILABLE} if all inputs
     *     are sorted and the queue head belongs to this input; otherwise
     *     {@code NOTHING_AVAILABLE}
     */
    @Nonnull
    private DataInputStatus addNextToQueue(HeadElement reuse, PushingAsyncDataInput.DataOutput<IN> output) throws Exception {
        final Tuple2<byte[], StreamRecord<IN>> nextRecord = sortedInput.next();

        if (nextRecord == null) {
            commonContext.setFinishedEmitting(idx);
            if (seenWatermark > Long.MIN_VALUE) {
                output.emitWatermark(new Watermark(seenWatermark));
            }
            return DataInputStatus.END_OF_DATA;
        }

        reuse.streamElement = getAsObject(nextRecord);
        commonContext.getQueueOfHeads().add(reuse);

        if (!commonContext.allSorted()) {
            return DataInputStatus.NOTHING_AVAILABLE;
        }
        final HeadElement smallest = commonContext.getQueueOfHeads().peek();
        final boolean ownRecordIsNext = smallest != null && smallest.inputIndex == idx;
        return ownRecordIsNext ? DataInputStatus.MORE_AVAILABLE : DataInputStatus.NOTHING_AVAILABLE;
    }

    /**
     * Views a sorted element as an element with {@code Object} payload so that it can be
     * stored in a {@code HeadElement}, which is shared between inputs of different types.
     *
     * <p>The same instance is returned; only the static type changes.
     *
     * @param next element read from the sorted iterator
     * @return {@code next}, retyped
     */
    @SuppressWarnings("unchecked")
    private Tuple2<byte[], StreamRecord<Object>> getAsObject(Tuple2<byte[], StreamRecord<IN>> next) {
        // A direct return/cast is rejected by the compiler because the two parameterizations
        // are unrelated; going through the wildcard type makes it a legal unchecked cast.
        return (Tuple2<byte[], StreamRecord<Object>>) (Tuple2<?, ?>) next;
    }

    /**
     * Returns the availability future of the wrapped input while this input is still
     * sorting, and the shared "all finished" availability future afterwards.
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        final boolean stillSorting = sortedInput == null;
        if (stillSorting) {
            return wrappedInput.getAvailableFuture();
        }
        return commonContext.getAllFinished().getAvailableFuture();
    }

    /**
     * State shared by all sorting inputs of one operator: the queue holding the current
     * head record of every input, an availability helper, and two bit masks that track
     * which inputs have not yet finished sorting / emitting.
     *
     * <p>Each input occupies the bit at position {@code inputIndex} of a {@code long}.
     * Java masks the shift distance of a {@code long} shift to six bits, so indices of 64
     * and above would alias lower ones.
     */
    private static final class CommonContext {
        private final PriorityQueue<HeadElement> queueOfHeads = new PriorityQueue<>();
        private final AvailabilityProvider.AvailabilityHelper allFinished =
                new AvailabilityProvider.AvailabilityHelper();
        // A set bit means the input with that index has NOT finished the respective phase.
        private long notFinishedSortingMask;
        private long notFinishedEmitting;

        /** Marks every given input as neither sorted nor done emitting. */
        public CommonContext(StreamTaskInput<Object>[] sortingInputs) {
            long pendingSorting = 0L;
            long pendingEmitting = 0L;
            for (StreamTaskInput<Object> sortingInput : sortingInputs) {
                final int inputIndex = sortingInput.getInputIndex();
                pendingSorting = setBitMask(pendingSorting, inputIndex);
                pendingEmitting = setBitMask(pendingEmitting, inputIndex);
            }
            notFinishedSortingMask = pendingSorting;
            notFinishedEmitting = pendingEmitting;
        }

        /** Returns whether every input has finished sorting. */
        public boolean allSorted() {
            return notFinishedSortingMask == 0L;
        }

        /** Returns whether every input has finished emitting. */
        public boolean allEndOfPartition() {
            return notFinishedEmitting == 0L;
        }

        public void setFinishedSorting(int inputIndex) {
            notFinishedSortingMask = unsetBitMask(notFinishedSortingMask, inputIndex);
        }

        public void setFinishedEmitting(int inputIndex) {
            notFinishedEmitting = unsetBitMask(notFinishedEmitting, inputIndex);
        }

        public boolean isFinishedEmitting(int inputIndex) {
            final boolean stillEmitting = checkBitMask(notFinishedEmitting, inputIndex);
            return !stillEmitting;
        }

        public PriorityQueue<HeadElement> getQueueOfHeads() {
            return queueOfHeads;
        }

        public AvailabilityProvider.AvailabilityHelper getAllFinished() {
            return allFinished;
        }

        /** Returns {@code mask} with the bit of {@code inputIndex} set. */
        private static long setBitMask(long mask, int inputIndex) {
            final long bit = 1L << inputIndex;
            return mask | bit;
        }

        /** Returns {@code mask} with the bit of {@code inputIndex} cleared. */
        private static long unsetBitMask(long mask, int inputIndex) {
            final long bit = 1L << inputIndex;
            return mask & ~bit;
        }

        /** Returns whether the bit of {@code inputIndex} is set in {@code mask}. */
        private static boolean checkBitMask(long mask, int inputIndex) {
            final long bit = 1L << inputIndex;
            return (mask & bit) != 0L;
        }
    }

    /**
     * Holder for the current head record of one input inside the shared priority queue.
     *
     * <p>Ordering: serialized key bytes first (signed byte-wise, shorter key first on a
     * common prefix), then record timestamp.
     */
    private static final class HeadElement implements Comparable<HeadElement> {
        // Index of the input this head belongs to.
        final int inputIndex;
        // Serialized key and record; replaced each time the holder is reused.
        Tuple2<byte[], StreamRecord<Object>> streamElement;

        private HeadElement(int inputIndex) {
            this.inputIndex = inputIndex;
        }

        @Override
        public int compareTo(HeadElement o) {
            final int byKey = compare(streamElement.f0, o.streamElement.f0);
            if (byKey == 0) {
                // Same key: fall back to the record timestamps.
                final long ownTimestamp = streamElement.f1.asRecord().getTimestamp();
                final long otherTimestamp = o.streamElement.f1.asRecord().getTimestamp();
                return Long.compare(ownTimestamp, otherTimestamp);
            }
            return byKey;
        }

        /** Lexicographic comparison of two serialized keys using signed byte order. */
        private int compare(byte[] first, byte[] second) {
            final int commonLength = Math.min(first.length, second.length);
            for (int pos = 0; pos < commonLength; pos++) {
                final int byteOrder = Byte.compare(first[pos], second[pos]);
                if (byteOrder != 0) {
                    return byteOrder;
                }
            }
            // One key is a prefix of the other (or they are equal): shorter sorts first.
            return Integer.compare(first.length, second.length);
        }
    }

    /**
     * Output used while the wrapped input is being drained for sorting. Records are written
     * into the sorter together with their serialized key; watermarks are only remembered;
     * watermark statuses and latency markers are dropped.
     */
    private class SortingPhaseDataOutput implements PushingAsyncDataInput.DataOutput<IN> {
        private SortingPhaseDataOutput() {
        }

        /**
         * Extracts and serializes the key of the record and writes the (key bytes, record)
         * pair into the sorter.
         */
        @Override
        public void emitRecord(StreamRecord<IN> streamRecord) throws Exception {
            // The key must be typed as K: the key serializer does not accept a plain Object.
            final K key = keySelector.getKey(streamRecord.getValue());
            keySerializer.serialize(key, dataOutputSerializer);
            // Copy the bytes out before clearing so the buffer can be reused for the next key.
            final byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
            dataOutputSerializer.clear();
            sorter.writeRecord(Tuple2.of(serializedKey, streamRecord));
        }

        /** Remembers the largest watermark timestamp seen so far; nothing is forwarded. */
        @Override
        public void emitWatermark(Watermark watermark) {
            seenWatermark = Math.max(seenWatermark, watermark.getTimestamp());
        }

        /** Ignored during the sorting phase. */
        @Override
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        }

        /** Ignored during the sorting phase. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    /**
     * Decides which input is read next.
     *
     * <p>Pass-through inputs come first, one at a time in queue order, until each has been
     * reported through {@link #endInput(int)}. Afterwards, once all sorting inputs are
     * sorted, the input owning the smallest queued record is selected. In every other
     * situation all inputs are selected.
     */
    private static class InputSelector implements InputSelectable, BoundedMultiInput {
        private final CommonContext commonContext;
        private final int numInputs;
        // Pass-through inputs that have not ended yet, in the order they are consumed.
        private final Queue<Integer> passThroughInputsIndices;

        private InputSelector(CommonContext commonContext, int numInputs, List<Integer> passThroughInputIndices) {
            this.commonContext = commonContext;
            this.numInputs = numInputs;
            this.passThroughInputsIndices = new LinkedList<Integer>(passThroughInputIndices);
        }

        /**
         * Removes the given value from the queue of pending pass-through inputs.
         *
         * <p>NOTE(review): the value is matched against the stored input indices as is —
         * confirm that callers pass the same kind of index that was put into the queue.
         */
        @Override
        public void endInput(int inputId) throws Exception {
            // Removal is by value (Collection.remove(Object)), not by position.
            passThroughInputsIndices.remove(Integer.valueOf(inputId));
        }

        @Override
        public InputSelection nextSelection() {
            final Integer pendingPassThrough = passThroughInputsIndices.peek();
            if (pendingPassThrough != null) {
                return selectOnly(pendingPassThrough);
            }

            if (commonContext.allEndOfPartition() || !commonContext.allSorted()) {
                return InputSelection.ALL;
            }

            final HeadElement smallest = commonContext.getQueueOfHeads().peek();
            if (smallest == null) {
                return InputSelection.ALL;
            }
            return selectOnly(smallest.inputIndex);
        }

        /** Builds a selection of the single input with the given index (selected as index + 1). */
        private InputSelection selectOnly(int inputIndex) {
            return new InputSelection.Builder().select(inputIndex + 1).build(numInputs);
        }
    }

    /**
     * Result of {@code wrapInputs(...)}: the wrapped sorted inputs, the wrapped pass-through
     * inputs and the {@link InputSelectable} that coordinates reading from them.
     */
    public static class SelectableSortingInputs {
        private final StreamTaskInput<?>[] sortedInputs;
        private final StreamTaskInput<?>[] passThroughInputs;
        private final InputSelectable inputSelectable;

        /**
         * @param sortedInputs the wrapped inputs that are sorted before emitting
         * @param passThroughInputs the wrapped inputs that are not sorted
         * @param inputSelectable selector deciding which input to read next
         */
        public SelectableSortingInputs(
                StreamTaskInput<?>[] sortedInputs,
                StreamTaskInput<?>[] passThroughInputs,
                InputSelectable inputSelectable) {
            this.sortedInputs = sortedInputs;
            this.passThroughInputs = passThroughInputs;
            this.inputSelectable = inputSelectable;
        }

        /** Returns the selector deciding which input to read next. */
        public InputSelectable getInputSelectable() {
            return inputSelectable;
        }

        /** Returns the array of sorted inputs passed to the constructor (not copied). */
        public StreamTaskInput<?>[] getSortedInputs() {
            return sortedInputs;
        }

        /** Returns the array of pass-through inputs passed to the constructor (not copied). */
        public StreamTaskInput<?>[] getPassThroughInputs() {
            return passThroughInputs;
        }
    }
}

