/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.dynamicfiltering;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator coordinator that receives {@link DynamicFilteringData} from its operator and hands the
 * carrying event over to a configured set of listeners through the shared {@link
 * CoordinatorStore}.
 *
 * <p>For every listener ID the store entry is inspected atomically via {@link
 * CoordinatorStore#compute}:
 *
 * <ul>
 *   <li>no entry, or an entry that is an {@link OperatorEvent}: the new event is stored under the
 *       listener ID;
 *   <li>an entry that is an {@link OperatorCoordinator}: the event is delivered to it directly
 *       and {@code null} is returned to the store;
 *   <li>anything else: an {@link IllegalStateException} is raised.
 * </ul>
 *
 * <p>Only the first received data set is distributed. Later events carrying equal data are
 * ignored; later events carrying different data fail with an {@link IllegalStateException}.
 */
public class DynamicFilteringDataCollectorOperatorCoordinator
        implements OperatorCoordinator, CoordinationRequestHandler {

    private static final Logger LOG =
            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);

    /** Store shared between coordinators; used as the hand-over point to the listeners. */
    private final CoordinatorStore coordinatorStore;

    /** Keys in {@link #coordinatorStore} under which the listeners are looked up. */
    private final List<String> dynamicFilteringDataListenerIDs;

    /** First data set received from the operator; {@code null} until an event arrives. */
    private DynamicFilteringData receivedFilteringData;

    /**
     * Creates the coordinator.
     *
     * @param context coordinator context; its coordinator store must not be {@code null}
     * @param dynamicFilteringDataListenerIDs store keys of the listeners to notify; must not be
     *     {@code null}
     * @throws NullPointerException if the store or the listener ID list is {@code null}
     */
    public DynamicFilteringDataCollectorOperatorCoordinator(
            OperatorCoordinator.Context context, List<String> dynamicFilteringDataListenerIDs) {
        this.coordinatorStore = Preconditions.checkNotNull(context.getCoordinatorStore());
        this.dynamicFilteringDataListenerIDs =
                Preconditions.checkNotNull(dynamicFilteringDataListenerIDs);
    }

    /** No-op. */
    @Override
    public void start() throws Exception {}

    /** No-op. */
    @Override
    public void close() throws Exception {}

    /**
     * Extracts the {@link DynamicFilteringData} from the event and, if it is the first data set
     * seen by this instance, forwards the event to every configured listener ID.
     *
     * @param subtask index of the sending subtask (not used here)
     * @param attemptNumber attempt number of the sending subtask (not used here)
     * @param event expected to be a {@link SourceEventWrapper} wrapping a {@link
     *     DynamicFilteringEvent}; any other type fails with a {@link ClassCastException}
     * @throws IllegalStateException if data was already received and the new data differs from it,
     *     or if a store entry for a listener ID is neither an event nor a coordinator
     */
    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        DynamicFilteringData currentData =
                ((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData();

        if (receivedFilteringData != null) {
            // The data has been seen before: identical data needs no second distribution,
            // diverging data means the already distributed result can no longer be trusted.
            if (DynamicFilteringData.isEqual(receivedFilteringData, currentData)) {
                return;
            }
            throw new IllegalStateException(
                    "DynamicFilteringData is recomputed but not equal. Triggering global failover in case the result is incorrect.  It's recommended to re-run the job with dynamic filtering disabled.");
        }

        receivedFilteringData = currentData;
        for (String listenerID : dynamicFilteringDataListenerIDs) {
            coordinatorStore.compute(
                    listenerID, (key, oldValue) -> deliverOrPark(listenerID, oldValue, event));
        }
    }

    /**
     * Decides what to do with {@code event} given the current store entry of one listener.
     *
     * @param listenerID store key of the listener, used for logging and error reporting
     * @param oldValue current store entry for {@code listenerID}, possibly {@code null}
     * @param event the event to hand over
     * @return {@code event} when it is to be stored for later pick-up; {@code null} after the
     *     event was delivered to a coordinator (presumably this clears the store entry, as with
     *     {@code Map.compute} - confirm against {@link CoordinatorStore})
     * @throws IllegalStateException if {@code oldValue} is neither {@code null}, an {@link
     *     OperatorEvent} nor an {@link OperatorCoordinator}
     */
    @Nullable
    private static Object deliverOrPark(
            String listenerID, @Nullable Object oldValue, OperatorEvent event) {
        if (oldValue == null || oldValue instanceof OperatorEvent) {
            // No coordinator is available under this ID yet: leave the event in the store,
            // replacing any event stored earlier.
            LOG.info(
                    "Updating event {} before the source coordinator with ID {} is registered",
                    event,
                    listenerID);
            return event;
        }

        // Template form: the message is only rendered when the check actually fails.
        Preconditions.checkState(
                oldValue instanceof OperatorCoordinator,
                "The existing value for %s is expected to be an operator coordinator, but it is in fact %s",
                listenerID,
                oldValue);

        LOG.info("Distributing event {} to source coordinator with ID {}", event, listenerID);
        try {
            // Subtask index and attempt number are passed as fixed zeros.
            ((OperatorCoordinator) oldValue).handleEventFromOperator(0, 0, event);
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
        return null;
    }

    /**
     * Coordination requests are not supported.
     *
     * @throws UnsupportedOperationException always, thrown synchronously
     */
    @Override
    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
            CoordinationRequest request) {
        throw new UnsupportedOperationException();
    }

    /** No-op. */
    @Override
    public void subtaskReset(int subtask, long checkpointId) {}

    /** No-op. */
    @Override
    public void executionAttemptFailed(
            int subtask, int attemptNumber, @Nullable Throwable reason) {}

    /** No-op. */
    @Override
    public void executionAttemptReady(
            int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {}

    /**
     * No-op.
     *
     * <p>NOTE(review): {@code result} is never completed here; presumably this coordinator is not
     * used in jobs that take checkpoints - confirm.
     */
    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
            throws Exception {}

    /** No-op. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {}

    /** No-op. */
    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
            throws Exception {}

    /**
     * Provider that creates {@link DynamicFilteringDataCollectorOperatorCoordinator} instances
     * with a fixed list of listener IDs.
     */
    public static class Provider extends RecreateOnResetOperatorCoordinator.Provider {

        /** Listener IDs passed to every coordinator created by this provider. */
        private final List<String> dynamicFilteringDataListenerIDs;

        /**
         * @param operatorID ID of the operator the coordinator belongs to
         * @param dynamicFilteringDataListenerIDs store keys of the listeners to notify; must not
         *     be {@code null}
         * @throws NullPointerException if the listener ID list is {@code null}
         */
        public Provider(OperatorID operatorID, List<String> dynamicFilteringDataListenerIDs) {
            super(operatorID);
            this.dynamicFilteringDataListenerIDs =
                    Preconditions.checkNotNull(dynamicFilteringDataListenerIDs);
        }

        /** Creates a new coordinator bound to {@code context} and the configured listener IDs. */
        @Override
        protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context)
                throws Exception {
            return new DynamicFilteringDataCollectorOperatorCoordinator(
                    context, dynamicFilteringDataListenerIDs);
        }
    }
}

