/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.mergetree;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MemTable;
import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;

public class MergeTreeWriter
implements RecordWriter<KeyValue>,
MemoryOwner {
    private final RowType keyType;
    private final RowType valueType;
    private final CompactManager compactManager;
    private final Levels levels;
    private final Comparator<RowData> keyComparator;
    private final MergeFunction mergeFunction;
    private final DataFileWriter dataFileWriter;
    private final boolean commitForceCompact;
    private final int numSortedRunStopTrigger;
    private final CoreOptions.ChangelogProducer changelogProducer;
    private final LinkedHashSet<DataFileMeta> newFiles;
    private final LinkedHashMap<String, DataFileMeta> compactBefore;
    private final LinkedHashSet<DataFileMeta> compactAfter;
    private long newSequenceNumber;
    private MemTable memTable;

    public MergeTreeWriter(RowType keyType, RowType valueType, CompactManager compactManager, Levels levels, long maxSequenceNumber, Comparator<RowData> keyComparator, MergeFunction mergeFunction, DataFileWriter dataFileWriter, boolean commitForceCompact, int numSortedRunStopTrigger, CoreOptions.ChangelogProducer changelogProducer) {
        this.keyType = keyType;
        this.valueType = valueType;
        this.compactManager = compactManager;
        this.levels = levels;
        this.newSequenceNumber = maxSequenceNumber + 1L;
        this.keyComparator = keyComparator;
        this.mergeFunction = mergeFunction;
        this.dataFileWriter = dataFileWriter;
        this.commitForceCompact = commitForceCompact;
        this.numSortedRunStopTrigger = numSortedRunStopTrigger;
        this.changelogProducer = changelogProducer;
        this.newFiles = new LinkedHashSet();
        this.compactBefore = new LinkedHashMap();
        this.compactAfter = new LinkedHashSet();
    }

    private long newSequenceNumber() {
        return this.newSequenceNumber++;
    }

    @VisibleForTesting
    Levels levels() {
        return this.levels;
    }

    @Override
    public void setMemoryPool(MemorySegmentPool memoryPool) {
        this.memTable = new SortBufferMemTable(this.keyType, this.valueType, memoryPool);
    }

    @Override
    public void write(KeyValue kv) throws Exception {
        long sequenceNumber = kv.sequenceNumber() == -1L ? this.newSequenceNumber() : kv.sequenceNumber();
        boolean success = this.memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
        if (!success) {
            this.flushMemory();
            success = this.memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
            if (!success) {
                throw new RuntimeException("Mem table is too small to hold a single element.");
            }
        }
    }

    @Override
    public long memoryOccupancy() {
        return this.memTable.memoryOccupancy();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flushMemory() throws Exception {
        if (this.memTable.size() > 0) {
            if (this.levels.numberOfSortedRuns() > this.numSortedRunStopTrigger) {
                this.finishCompaction(true);
            }
            ArrayList<String> extraFiles = new ArrayList<String>();
            if (this.changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
                extraFiles.add(this.dataFileWriter.writeLevel0Changelog(CloseableIterator.adapterForIterator(this.memTable.rawIterator())).getName());
            }
            boolean success = false;
            try {
                Iterator<KeyValue> iterator = this.memTable.mergeIterator(this.keyComparator, this.mergeFunction);
                success = this.dataFileWriter.writeLevel0(CloseableIterator.adapterForIterator(iterator)).map(file -> {
                    DataFileMeta fileMeta = file.copy(extraFiles);
                    this.newFiles.add(fileMeta);
                    this.levels.addLevel0File(fileMeta);
                    return true;
                }).orElse(false);
            }
            finally {
                if (!success) {
                    extraFiles.forEach(this.dataFileWriter::delete);
                }
            }
            this.memTable.clear();
            this.submitCompaction();
        }
    }

    @Override
    public Increment prepareCommit(boolean endOfInput) throws Exception {
        this.flushMemory();
        boolean blocking = endOfInput || this.commitForceCompact;
        this.finishCompaction(blocking);
        return this.drainIncrement();
    }

    @Override
    public void sync() throws Exception {
        this.finishCompaction(true);
    }

    private Increment drainIncrement() {
        Increment increment = new Increment(new ArrayList<DataFileMeta>(this.newFiles), new ArrayList<DataFileMeta>(this.compactBefore.values()), new ArrayList<DataFileMeta>(this.compactAfter));
        this.newFiles.clear();
        this.compactBefore.clear();
        this.compactAfter.clear();
        return increment;
    }

    private void updateCompactResult(CompactResult result) {
        Set afterFiles = result.after().stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
        for (DataFileMeta file : result.before()) {
            if (this.compactAfter.remove(file)) {
                if (this.compactBefore.containsKey(file.fileName()) || afterFiles.contains(file.fileName())) continue;
                this.dataFileWriter.delete(file);
                continue;
            }
            this.compactBefore.put(file.fileName(), file);
        }
        this.compactAfter.addAll(result.after());
    }

    private void submitCompaction() throws Exception {
        this.finishCompaction(false);
        if (this.compactManager.isCompactionFinished()) {
            this.compactManager.submitCompaction();
        }
    }

    private void finishCompaction(boolean blocking) throws Exception {
        Optional<CompactResult> result = this.compactManager.finishCompaction(blocking);
        result.ifPresent(this::updateCompactResult);
    }

    @Override
    public List<DataFileMeta> close() throws Exception {
        this.compactManager.cancelCompaction();
        this.sync();
        ArrayList<DataFileMeta> delete = new ArrayList<DataFileMeta>(this.newFiles);
        for (DataFileMeta file : this.compactAfter) {
            if (this.compactBefore.containsKey(file.fileName())) continue;
            delete.add(file);
        }
        for (DataFileMeta file : delete) {
            this.dataFileWriter.delete(file);
        }
        this.newFiles.clear();
        this.compactAfter.clear();
        return delete;
    }
}

