/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.krystal.krystex.node;

import com.flipkart.krystal.data.Inputs;
import com.flipkart.krystal.krystex.KrystalExecutor;
import com.flipkart.krystal.krystex.MainLogicDefinition;
import com.flipkart.krystal.krystex.RequestId;
import com.flipkart.krystal.krystex.commands.ExecuteWithInputs;
import com.flipkart.krystal.krystex.commands.Flush;
import com.flipkart.krystal.krystex.commands.NodeCommand;
import com.flipkart.krystal.krystex.commands.NodeRequestCommand;
import com.flipkart.krystal.krystex.decoration.InitiateActiveDepChains;
import com.flipkart.krystal.krystex.decoration.LogicDecorationOrdering;
import com.flipkart.krystal.krystex.decoration.LogicExecutionContext;
import com.flipkart.krystal.krystex.decoration.MainLogicDecorator;
import com.flipkart.krystal.krystex.decoration.MainLogicDecoratorConfig;
import com.flipkart.krystal.krystex.node.DependantChain;
import com.flipkart.krystal.krystex.node.DependantChainStart;
import com.flipkart.krystal.krystex.node.KrystalNodeExecutorMetrics;
import com.flipkart.krystal.krystex.node.Node;
import com.flipkart.krystal.krystex.node.NodeDefinition;
import com.flipkart.krystal.krystex.node.NodeDefinitionRegistry;
import com.flipkart.krystal.krystex.node.NodeId;
import com.flipkart.krystal.krystex.node.NodeRegistry;
import com.flipkart.krystal.krystex.node.NodeResponse;
import com.flipkart.krystal.utils.Futures;
import com.flipkart.krystal.utils.MultiLeasePool;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KrystalNodeExecutor
implements KrystalExecutor {
    private static final Logger log = LoggerFactory.getLogger(KrystalNodeExecutor.class);
    private final NodeDefinitionRegistry nodeDefinitionRegistry;
    private final LogicDecorationOrdering logicDecorationOrdering;
    private final MultiLeasePool.Lease<? extends ExecutorService> commandQueueLease;
    private final RequestId requestId;
    private final ImmutableMap<String, List<MainLogicDecoratorConfig>> requestScopedLogicDecoratorConfigs;
    private final Map<String, Map<String, MainLogicDecorator>> requestScopedMainDecorators = new LinkedHashMap<String, Map<String, MainLogicDecorator>>();
    private final NodeRegistry nodeRegistry = new NodeRegistry();
    private final KrystalNodeExecutorMetrics krystalNodeMetrics;
    private volatile boolean closed;
    private final Map<RequestId, List<NodeResult>> allRequests = new LinkedHashMap<RequestId, List<NodeResult>>();
    private final Map<RequestId, List<NodeResult>> unFlushedRequests = new LinkedHashMap<RequestId, List<NodeResult>>();
    private final Map<NodeId, Set<DependantChain>> dependantChainsPerNode = new LinkedHashMap<NodeId, Set<DependantChain>>();

    public KrystalNodeExecutor(NodeDefinitionRegistry nodeDefinitionRegistry, LogicDecorationOrdering logicDecorationOrdering, MultiLeasePool<? extends ExecutorService> commandQueuePool, String requestId, Map<String, List<MainLogicDecoratorConfig>> requestScopedLogicDecoratorConfigs) {
        this.nodeDefinitionRegistry = nodeDefinitionRegistry;
        this.logicDecorationOrdering = logicDecorationOrdering;
        this.commandQueueLease = commandQueuePool.lease();
        this.requestId = new RequestId(requestId);
        this.requestScopedLogicDecoratorConfigs = ImmutableMap.copyOf(requestScopedLogicDecoratorConfigs);
        this.krystalNodeMetrics = new KrystalNodeExecutorMetrics();
    }

    private ImmutableMap<String, MainLogicDecorator> getRequestScopedDecorators(LogicExecutionContext logicExecutionContext) {
        NodeId nodeId = logicExecutionContext.nodeId();
        NodeDefinition nodeDefinition = this.nodeDefinitionRegistry.get(nodeId);
        MainLogicDefinition mainLogicDefinition = nodeDefinition.getMainLogicDefinition();
        LinkedHashMap decorators = new LinkedHashMap();
        Stream.concat(mainLogicDefinition.getRequestScopedLogicDecoratorConfigs().entrySet().stream(), this.requestScopedLogicDecoratorConfigs.entrySet().stream()).forEach(entry -> {
            String decoratorType = (String)entry.getKey();
            ArrayList decoratorConfigList = new ArrayList((Collection)entry.getValue());
            decoratorConfigList.forEach(decoratorConfig -> {
                String instanceId = decoratorConfig.instanceIdGenerator().apply(logicExecutionContext);
                if (decoratorConfig.shouldDecorate().test(logicExecutionContext)) {
                    MainLogicDecorator mainLogicDecorator = this.requestScopedMainDecorators.computeIfAbsent(decoratorType, t -> new LinkedHashMap()).computeIfAbsent(instanceId, _i -> decoratorConfig.factory().apply(new MainLogicDecoratorConfig.DecoratorContext(instanceId, logicExecutionContext)));
                    mainLogicDecorator.executeCommand(new InitiateActiveDepChains(nodeId, (ImmutableSet<DependantChain>)ImmutableSet.copyOf((Collection)this.dependantChainsPerNode.get(nodeId))));
                    decorators.put(decoratorType, mainLogicDecorator);
                }
            });
        });
        return ImmutableMap.copyOf(decorators);
    }

    @Override
    public <T> CompletableFuture<T> executeNode(NodeId nodeId, Inputs inputs) {
        return this.executeNode(nodeId, inputs, this.requestId);
    }

    @Override
    public <T> CompletableFuture<T> executeNode(NodeId nodeId, Inputs inputs, String executionId) {
        if (executionId == null) {
            throw new IllegalArgumentException("Execution id can not be null");
        }
        return this.executeNode(nodeId, inputs, this.requestId.append(executionId));
    }

    private CompletableFuture<?> executeNode(NodeId nodeId, Inputs inputs, RequestId requestId) {
        if (this.closed) {
            throw new RejectedExecutionException("KrystalNodeExecutor is already closed");
        }
        return this.enqueueCommand(() -> {
            this.createDependantNodes(nodeId, DependantChainStart.instance());
            CompletableFuture<Object> future = new CompletableFuture<Object>();
            NodeResult nodeResult = new NodeResult(nodeId, inputs, future);
            this.allRequests.computeIfAbsent(requestId, r -> new ArrayList()).add(nodeResult);
            this.unFlushedRequests.computeIfAbsent(requestId, r -> new ArrayList()).add(nodeResult);
            return future;
        }).thenCompose(Function.identity());
    }

    private void createDependantNodes(NodeId nodeId, DependantChain dependantChain) {
        NodeDefinition nodeDefinition = this.nodeDefinitionRegistry.get(nodeId);
        if (dependantChain.contains(nodeId)) {
            this.nodeDefinitionRegistry.registerRecursive(nodeDefinition);
            this.nodeRegistry.tryGet(nodeId).ifPresent(Node::markRecursive);
            this.dependantChainsPerNode.computeIfAbsent(nodeId, _n -> new LinkedHashSet()).add(dependantChain);
        } else {
            this.nodeRegistry.createIfAbsent(nodeId, _n -> new Node(nodeDefinition, this, logicExecutionContext -> this.getRequestScopedDecorators((LogicExecutionContext)logicExecutionContext), this.logicDecorationOrdering));
            ImmutableMap<String, NodeId> dependencyNodes = nodeDefinition.dependencyNodes();
            dependencyNodes.forEach((dependencyName, depNodeId) -> this.createDependantNodes((NodeId)depNodeId, DependantChain.from(nodeId, dependencyName, dependantChain)));
            this.dependantChainsPerNode.computeIfAbsent(nodeId, _n -> new LinkedHashSet()).add(dependantChain);
        }
    }

    CompletableFuture<NodeResponse> enqueueNodeCommand(Supplier<NodeRequestCommand> nodeCommand) {
        return this.enqueueCommand(() -> this._executeCommand((NodeCommand)nodeCommand.get())).thenCompose(Function.identity());
    }

    CompletableFuture<NodeResponse> executeCommand(NodeCommand nodeCommand) {
        this.krystalNodeMetrics.commandQueueBypassed();
        return this._executeCommand(nodeCommand);
    }

    private CompletableFuture<NodeResponse> _executeCommand(NodeCommand nodeCommand) {
        if (nodeCommand instanceof NodeRequestCommand) {
            NodeRequestCommand nodeRequestCommand = (NodeRequestCommand)nodeCommand;
            return this.nodeRegistry.get(nodeCommand.nodeId()).executeRequestCommand(nodeRequestCommand);
        }
        if (nodeCommand instanceof Flush) {
            Flush flush = (Flush)nodeCommand;
            this.nodeRegistry.get(flush.nodeId()).executeCommand(flush);
            return CompletableFuture.failedFuture(new UnsupportedOperationException("No data returned for flush command"));
        }
        throw new UnsupportedOperationException("Unknown NodeCommand type %s".formatted(nodeCommand.getClass()));
    }

    @Override
    public void flush() {
        this.enqueueCommand(() -> {
            this.unFlushedRequests.forEach((requestId, nodeExecutionInfos) -> nodeExecutionInfos.forEach(nodeResult -> {
                NodeId nodeId = nodeResult.nodeId();
                if (nodeResult.future().isDone()) {
                    return;
                }
                NodeDefinition nodeDefinition = this.nodeDefinitionRegistry.get(nodeId);
                CompletionStage submissionResult = ((CompletableFuture)this.executeCommand(new ExecuteWithInputs(nodeId, (ImmutableSet<String>)((ImmutableSet)nodeDefinition.getMainLogicDefinition().inputNames().stream().filter(s -> !nodeDefinition.dependencyNodes().containsKey(s)).collect(ImmutableSet.toImmutableSet())), nodeResult.inputs(), DependantChainStart.instance(), (RequestId)requestId)).thenApply(NodeResponse::response)).thenApply(valueOrError -> {
                    if (valueOrError.error().isPresent()) {
                        throw new RuntimeException((Throwable)valueOrError.error().get());
                    }
                    return valueOrError.value().orElse(null);
                });
                Futures.linkFutures((CompletableFuture)submissionResult, nodeResult.future());
            }));
            this.unFlushedRequests.forEach((requestId, nodeExecutionInfos) -> nodeExecutionInfos.forEach(nodeResult -> this.executeCommand(new Flush(nodeResult.nodeId()))));
            this.unFlushedRequests.clear();
        });
    }

    public KrystalNodeExecutorMetrics getKrystalNodeMetrics() {
        return this.krystalNodeMetrics;
    }

    @Override
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.flush();
        this.enqueueCommand(() -> CompletableFuture.allOf((CompletableFuture[])this.allRequests.values().stream().flatMap(nodeExecutionInfos -> nodeExecutionInfos.stream().map(NodeResult::future)).toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> this.commandQueueLease.close()));
    }

    private void enqueueCommand(Runnable command) {
        this.enqueueCommand(() -> {
            command.run();
            return null;
        });
    }

    private <T> CompletableFuture<T> enqueueCommand(Supplier<T> command) {
        return CompletableFuture.supplyAsync(() -> {
            this.krystalNodeMetrics.commandQueued();
            return command.get();
        }, (Executor)this.commandQueueLease.get());
    }

    private record NodeResult(NodeId nodeId, Inputs inputs, CompletableFuture<Object> future) {
    }
}

