/*
 * Decompiled with CFR 0.152.
 */
package net.tomp2p.p2p;

import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import net.tomp2p.connection.ChannelCreator;
import net.tomp2p.connection.ConnectionReservation;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureAsyncTask;
import net.tomp2p.futures.FutureChannelCreator;
import net.tomp2p.futures.FutureForkJoin;
import net.tomp2p.futures.FutureRouting;
import net.tomp2p.futures.FutureTask;
import net.tomp2p.message.Message;
import net.tomp2p.p2p.DistributedRouting;
import net.tomp2p.p2p.RequestP2PConfiguration;
import net.tomp2p.p2p.RoutingConfiguration;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.peers.PeerMap;
import net.tomp2p.rpc.DigestInfo;
import net.tomp2p.storage.Data;
import net.tomp2p.task.AsyncTask;
import net.tomp2p.task.Worker;
import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Distributes a task to peers that are close to a location key.
 *
 * <p>A submission first routes towards the location key, then ranks the peers
 * found by the routing (see {@link #findBest}) and finally sends the task to the
 * best-ranked peers in parallel until enough of them have answered or too many
 * rounds have failed.
 */
public class DistributedTask {
    private static final Logger logger = LoggerFactory.getLogger(DistributedTask.class);
    private final DistributedRouting routing;
    private final AsyncTask asyncTask;

    /**
     * @param routing   used to find the peers close to a location key
     * @param asyncTask used to send the task to a single peer
     */
    public DistributedTask(DistributedRouting routing, AsyncTask asyncTask) {
        this.routing = routing;
        this.asyncTask = asyncTask;
    }

    /**
     * Submits a task: waits for the channel creator, routes to {@code locationKey}
     * and then sends the task to the best peers found.
     *
     * @param locationKey             key to route to; also passed on as the task id
     * @param dataMap                 data sent along with the task
     * @param worker                  the worker sent along with the task
     * @param routingConfiguration    settings for the routing phase
     * @param requestP2PConfiguration settings for the parallel request phase
     * @param futureChannelCreator    future that supplies the channel creator
     * @param signMessage             passed through to the task requests
     * @param isManualCleanup         if {@code false}, a release listener for the
     *                                channel creator is registered on the returned future
     * @param connectionReservation   passed to the release listener
     * @return the future for the whole operation; it is set to failed if the channel
     *         creation or the routing fails
     */
    public FutureTask submit(final Number160 locationKey, final Map<Number160, Data> dataMap, final Worker worker, final RoutingConfiguration routingConfiguration, final RequestP2PConfiguration requestP2PConfiguration, FutureChannelCreator futureChannelCreator, final boolean signMessage, final boolean isManualCleanup, final ConnectionReservation connectionReservation) {
        final FutureTask futureTask = new FutureTask();
        futureChannelCreator.addListener((BaseFutureListener<BaseFuture>)new BaseFutureAdapter<FutureChannelCreator>(){

            @Override
            public void operationComplete(FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    // Without a channel creator nothing can be sent (and nothing has to be
                    // released), so the task fails right away.
                    futureTask.setFailed(future);
                    return;
                }
                final ChannelCreator channelCreator = future.getChannelCreator();
                final FutureRouting futureRouting = DistributedTask.this.createRouting(locationKey, null, null, routingConfiguration, requestP2PConfiguration, Message.Type.REQUEST_4, channelCreator);
                futureRouting.addListener((BaseFutureListener<BaseFuture>)new BaseFutureAdapter<FutureRouting>(){

                    @Override
                    public void operationComplete(FutureRouting future) throws Exception {
                        if (futureRouting.isSuccess()) {
                            SortedMap<PeerAddress, DigestInfo> map = future.getDirectHitsDigest();
                            NavigableSet<Pair> queue = DistributedTask.findBest(map, future.getPotentialHits(), locationKey);
                            DistributedTask.this.parallelRequests(futureTask, queue, requestP2PConfiguration, channelCreator, locationKey, dataMap, worker, requestP2PConfiguration.isForceUPD(), signMessage);
                        } else {
                            futureTask.setFailed(futureRouting);
                        }
                    }
                });
                // Only register the release listener when the channel creator was actually
                // obtained and the caller did not ask to clean up manually.
                if (!isManualCleanup) {
                    Utils.addReleaseListenerAll(futureTask, connectionReservation, channelCreator);
                }
            }
        });
        return futureTask;
    }

    /**
     * Starts the parallel request phase with a fresh failure counter and a slot array
     * sized by {@code requestP2PConfiguration.getParallel()}.
     */
    private void parallelRequests(FutureTask futureTask, NavigableSet<Pair> queue, RequestP2PConfiguration requestP2PConfiguration, ChannelCreator channelCreator, Number160 taskId, Map<Number160, Data> dataMap, Worker worker, boolean forceUDP, boolean sign) {
        FutureAsyncTask[] futures = new FutureAsyncTask[requestP2PConfiguration.getParallel()];
        this.loopRec(queue, requestP2PConfiguration.getMinimumResults(), new AtomicInteger(0), requestP2PConfiguration.getMaxFailure(), requestP2PConfiguration.getParallelDiff(), futures, futureTask, true, channelCreator, taskId, dataMap, worker, forceUDP, sign);
    }

    /**
     * One round of the request loop: fills the empty slots in {@code futures} with new
     * requests to the next peers of {@code queue}, waits for the round to finish and
     * either completes {@code futureTask} or starts another round.
     *
     * @param queue          candidate peers, best first; peers are removed as they are used
     * @param min            number of successful results still required
     * @param nrFailure      number of failed rounds so far, shared between rounds
     * @param maxFailure     the loop stops once more than this many rounds failed
     * @param parallelDiff   number of requests issued in addition to {@code min}
     * @param futures        request slots; the first {@code min + parallelDiff} are used
     * @param futureTask     the overall future to report progress and completion to
     * @param cancelOnFinish whether the remaining requests are cancelled when the loop ends
     */
    private void loopRec(final NavigableSet<Pair> queue, final int min, final AtomicInteger nrFailure, final int maxFailure, final int parallelDiff, final FutureAsyncTask[] futures, final FutureTask futureTask, final boolean cancelOnFinish, final ChannelCreator channelCreator, final Number160 taskId, final Map<Number160, Data> dataMap, final Worker mapper, final boolean forceUDP, final boolean sign) {
        int active = 0;
        for (int i = 0; i < min + parallelDiff; ++i) {
            if (futures[i] == null) {
                // pollFirst() returns null once the queue is exhausted; the slot then
                // simply stays empty instead of triggering a NullPointerException.
                Pair candidate = queue.pollFirst();
                if (candidate == null || candidate.peerAddress == null) {
                    continue;
                }
                ++active;
                futures[i] = this.asyncTask.submit(candidate.peerAddress, channelCreator, taskId, dataMap, mapper, forceUDP, sign);
                futureTask.addRequests(futures[i]);
                continue;
            }
            ++active;
        }
        if (active == 0) {
            // No request is running and no candidate is left: nothing more can be done.
            futureTask.setDone();
            DistributedRouting.cancel(cancelOnFinish, min + parallelDiff, futures);
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("fork/join status: " + min + "/" + active + " (" + parallelDiff + ")");
        }
        // Never wait for more successes than there are active requests.
        FutureForkJoin fp = new FutureForkJoin(Math.min(min, active), false, (BaseFuture[])futures);
        fp.addListener((BaseFutureListener<BaseFuture>)new BaseFutureAdapter<FutureForkJoin<FutureAsyncTask>>(){

            @Override
            public void operationComplete(FutureForkJoin<FutureAsyncTask> future) throws Exception {
                for (FutureAsyncTask futureAsyncTask : future.getCompleted()) {
                    futureTask.setProgress(futureAsyncTask);
                }
                if (future.isSuccess() || nrFailure.incrementAndGet() > maxFailure) {
                    if (cancelOnFinish) {
                        DistributedRouting.cancel(cancelOnFinish, min + parallelDiff, futures);
                    }
                    futureTask.setDone();
                } else {
                    // Not enough successes yet: retry with the remaining candidates and
                    // only ask for the results that are still missing.
                    DistributedTask.this.loopRec(queue, min - future.getSuccessCounter(), nrFailure, maxFailure, parallelDiff, futures, futureTask, cancelOnFinish, channelCreator, taskId, dataMap, mapper, forceUDP, sign);
                }
            }
        });
    }

    /**
     * Starts the routing towards {@code locationKey} using the limits from
     * {@code routingConfiguration}; the "max no new info" limit is derived from the
     * minimum number of results in {@code requestP2PConfiguration}.
     */
    private FutureRouting createRouting(Number160 locationKey, Number160 domainKey, Set<Number160> contentKeys, RoutingConfiguration routingConfiguration, RequestP2PConfiguration requestP2PConfiguration, Message.Type type, ChannelCreator channelCreator) {
        return this.routing.route(locationKey, domainKey, contentKeys, type, routingConfiguration.getDirectHits(), routingConfiguration.getMaxNoNewInfo(requestP2PConfiguration.getMinimumResults()), routingConfiguration.getMaxFailures(), routingConfiguration.getMaxSuccess(), routingConfiguration.getParallel(), routingConfiguration.isForceTCP(), channelCreator);
    }

    /**
     * Ranks the peers found by the routing.
     *
     * <p>Peers from {@code map} are ranked with the size reported in their digest, peers
     * from {@code navigableSet} with a size of 0. The returned set is ordered by
     * {@link Pair#compareTo(Pair)}, i.e. smaller sizes first. As with any sorted set,
     * entries that compare as equal are kept only once.
     *
     * @param map          direct hits with their digest
     * @param navigableSet potential hits without digest information
     * @param locationKey  the key the peers are compared against
     * @return the ranked candidates, best first
     */
    static NavigableSet<Pair> findBest(SortedMap<PeerAddress, DigestInfo> map, NavigableSet<PeerAddress> navigableSet, Number160 locationKey) {
        TreeSet<Pair> set = new TreeSet<Pair>();
        for (Map.Entry<PeerAddress, DigestInfo> entry : map.entrySet()) {
            set.add(new Pair(entry.getKey(), entry.getValue().getSize(), locationKey));
        }
        for (PeerAddress peerAddress : navigableSet) {
            set.add(new Pair(peerAddress, 0, locationKey));
        }
        return set;
    }

    /**
     * A candidate peer together with its reported size, ordered by size first and by
     * {@link PeerMap#isKadCloser} relative to the location key second.
     */
    private static class Pair
    implements Comparable<Pair> {
        private final PeerAddress peerAddress;
        private final int queueSize;
        private final Number160 locationKey;

        public Pair(PeerAddress peerAddress, int queueSize, Number160 locationKey) {
            this.peerAddress = peerAddress;
            this.queueSize = queueSize;
            this.locationKey = locationKey;
        }

        @Override
        public int compareTo(Pair o) {
            // Explicit comparison instead of subtraction, which can overflow.
            if (this.queueSize != o.queueSize) {
                return this.queueSize < o.queueSize ? -1 : 1;
            }
            // Tie-break on the peers themselves; presumably this orders by distance to
            // locationKey -- confirm against PeerMap.isKadCloser.
            return PeerMap.isKadCloser(this.locationKey, this.peerAddress, o.peerAddress);
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof Pair)) {
                return false;
            }
            return this.compareTo((Pair)obj) == 0;
        }

        @Override
        public int hashCode() {
            // equals() implies equal queueSize, so this keeps the equals/hashCode contract.
            return this.queueSize;
        }
    }
}

