/*
 * Decompiled with CFR 0.152.
 */
package net.tomp2p.p2p;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import net.tomp2p.connection.ChannelCreator;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureChannelCreator;
import net.tomp2p.futures.FutureDHT;
import net.tomp2p.futures.FutureDigest;
import net.tomp2p.futures.FutureForkJoin;
import net.tomp2p.futures.FutureGet;
import net.tomp2p.futures.FuturePut;
import net.tomp2p.futures.FutureRemove;
import net.tomp2p.futures.FutureResponse;
import net.tomp2p.futures.FutureRouting;
import net.tomp2p.futures.FutureSend;
import net.tomp2p.futures.FutureShutdown;
import net.tomp2p.message.Message;
import net.tomp2p.p2p.DistributedRouting;
import net.tomp2p.p2p.OperationMapper;
import net.tomp2p.p2p.RequestP2PConfiguration;
import net.tomp2p.p2p.VotingSchemeDHT;
import net.tomp2p.p2p.builder.AddBuilder;
import net.tomp2p.p2p.builder.BasicBuilder;
import net.tomp2p.p2p.builder.DigestBuilder;
import net.tomp2p.p2p.builder.GetBuilder;
import net.tomp2p.p2p.builder.PutBuilder;
import net.tomp2p.p2p.builder.RemoveBuilder;
import net.tomp2p.p2p.builder.RoutingBuilder;
import net.tomp2p.p2p.builder.SearchableBuilder;
import net.tomp2p.p2p.builder.SendBuilder;
import net.tomp2p.p2p.builder.ShutdownBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.Number640;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.rpc.DefaultBloomfilterFactory;
import net.tomp2p.rpc.DigestInfo;
import net.tomp2p.rpc.DigestResult;
import net.tomp2p.rpc.DirectDataRPC;
import net.tomp2p.rpc.QuitRPC;
import net.tomp2p.rpc.SimpleBloomFilter;
import net.tomp2p.rpc.StorageRPC;
import net.tomp2p.storage.Data;
import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedHashTable {
    /** SLF4J logger shared by all operations of this class. */
    private static final Logger logger = LoggerFactory.getLogger(DistributedHashTable.class);
    /** Public reason code; not referenced inside this class (presumably signals a cancelled operation - confirm with callers). */
    public static final int REASON_CANCEL = 254;
    /**
     * Public reason code; not referenced inside this class (presumably signals an unknown cause - confirm with callers).
     * NOTE(review): the identifier is misspelled ("UNKOWN"); it is public API, so it is left unchanged here.
     */
    public static final int REASON_UNKOWN = 255;
    /** Used to route towards a location key and to obtain the peer map for {@code quit}. */
    private final DistributedRouting routing;
    /** RPC used for add, put, get, digest and remove requests. */
    private final StorageRPC storeRCP;
    /** RPC used by {@code direct} to send data to the routed peers. */
    private final DirectDataRPC directDataRPC;
    /** RPC used by {@code quit} to send the shutdown message. */
    private final QuitRPC quitRPC;

    /**
     * Creates the DHT facade and stores the collaborators without any further initialisation.
     *
     * @param routing       routing component used to locate peers
     * @param storeRCP      storage RPC used for add/put/get/digest/remove
     * @param directDataRPC RPC used for direct sends
     * @param quitRPC       RPC used for the shutdown announcement
     */
    public DistributedHashTable(DistributedRouting routing, StorageRPC storeRCP, DirectDataRPC directDataRPC, QuitRPC quitRPC) {
        this.routing = routing;
        this.storeRCP = storeRCP;
        this.directDataRPC = directDataRPC;
        this.quitRPC = quitRPC;
    }

    /**
     * Routes towards the builder's location key and issues add requests to the potential hits.
     * <p>
     * The work starts once the builder's channel creator future completes. Routing uses
     * {@code REQUEST_1}; on routing success the add RPC is sent in parallel to the peers returned by
     * {@code getPotentialHits()}. For every response that succeeded and is OK, the key map at index 0
     * is recorded per recipient and finally handed to {@link FuturePut#setStoredKeys}.
     *
     * @param builder the add configuration (data set, keys, request and routing configuration)
     * @return the future that tracks the add operation; it is failed if the channel creator or the
     *         routing fails
     */
    public FuturePut add(final AddBuilder builder) {
        final FuturePut futureDHT = new FuturePut(builder, builder.getRequestP2PConfiguration().getMinimumResults(), builder.getDataSet().size());
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        builder.getFutureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.setFailed(future);
                    return;
                }
                RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                FutureRouting futureRouting = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_1, future.channelCreator());
                futureDHT.setFutureRouting(futureRouting);
                futureRouting.addListener(new BaseFutureAdapter<FutureRouting>() {

                    @Override
                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.setFailed("routing failed");
                            return;
                        }
                        logger.debug("adding lkey={} on {}", (Object)builder.getLocationKey(), futureRouting.getPotentialHits());
                        DistributedHashTable.parallelRequests(builder.getRequestP2PConfiguration(), (NavigableSet<PeerAddress>)futureRouting.getPotentialHits(), futureDHT, false, future.channelCreator(), new OperationMapper<FuturePut>() {
                            // Per-recipient result of the add, filled by interMediateResponse.
                            Map<PeerAddress, Map<Number640, Byte>> rawData = new HashMap<PeerAddress, Map<Number640, Byte>>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.storeRCP.add(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FuturePut futureDHT) {
                                futureDHT.setStoredKeys(this.rawData);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                // Only responses that arrived and report OK contribute to the result.
                                if (future.isSuccess() && future.getResponse().isOk()) {
                                    this.rawData.put(future.getRequest().getRecipient(), future.getResponse().getKeyMapByte(0).keysMap());
                                }
                            }
                        });
                    }
                });
                // Registered after routing has been started, as in the original flow
                // (presumably releases the channel creator when the future completes - confirm).
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Routes towards the builder's location key and sends the builder's payload directly to the
     * potential hits.
     * <p>
     * Routing uses {@code REQUEST_1}. Depending on {@code builder.isRaw()}, each successful OK
     * response contributes either its raw buffer (index 0) or the object decoded from that buffer.
     * If decoding throws {@link ClassNotFoundException} or {@link IOException}, the exception itself
     * is stored as the value for that recipient instead of aborting the operation.
     *
     * @param builder the send configuration
     * @return the future that tracks the send; it is failed if the channel creator or the routing
     *         fails
     */
    public FutureSend direct(final SendBuilder builder) {
        final FutureSend futureDHT = new FutureSend(builder, builder.getRequestP2PConfiguration().getMinimumResults(), new VotingSchemeDHT());
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        builder.getFutureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.setFailed(future);
                    return;
                }
                RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                FutureRouting futureRouting = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_1, future.channelCreator());
                futureDHT.setFutureRouting(futureRouting);
                futureRouting.addListener(new BaseFutureAdapter<FutureRouting>() {

                    @Override
                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.setFailed("routing failed");
                            return;
                        }
                        logger.debug("storing lkey={} on {}", (Object)builder.getLocationKey(), futureRouting.getPotentialHits());
                        DistributedHashTable.parallelRequests(builder.getRequestP2PConfiguration(), (NavigableSet<PeerAddress>)futureRouting.getPotentialHits(), futureDHT, builder.isCancelOnFinish(), future.channelCreator(), new OperationMapper<FutureSend>() {
                            // Only one of the two maps is filled, depending on builder.isRaw().
                            Map<PeerAddress, ByteBuf> rawChannels = new HashMap<PeerAddress, ByteBuf>();
                            Map<PeerAddress, Object> rawObjects = new HashMap<PeerAddress, Object>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.directDataRPC.send(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FutureSend futureDHT) {
                                if (builder.isRaw()) {
                                    futureDHT.setDirectData1(this.rawChannels);
                                } else {
                                    futureDHT.setDirectData2(this.rawObjects);
                                }
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (!future.isSuccess() || !future.getResponse().isOk()) {
                                    return;
                                }
                                if (builder.isRaw()) {
                                    this.rawChannels.put(future.getRequest().getRecipient(), future.getResponse().getBuffer(0).buffer());
                                    return;
                                }
                                try {
                                    this.rawObjects.put(future.getRequest().getRecipient(), future.getResponse().getBuffer(0).object());
                                }
                                catch (ClassNotFoundException e) {
                                    // Deliberate: the decoding failure is reported as this peer's result.
                                    this.rawObjects.put(future.getRequest().getRecipient(), e);
                                }
                                catch (IOException e) {
                                    // Deliberate: the decoding failure is reported as this peer's result.
                                    this.rawObjects.put(future.getRequest().getRecipient(), e);
                                }
                            }
                        });
                    }
                });
                // Registered after routing has been started, as in the original flow
                // (presumably releases the channel creator when the future completes - confirm).
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Routes towards the builder's location key and stores the builder's data on the potential hits.
     * <p>
     * Routing uses {@code REQUEST_1}. The RPC variant is chosen per request: {@code putIfAbsent} if
     * {@code isPutIfAbsent()}, otherwise {@code putMeta} if {@code isPutMeta()}, otherwise {@code put}.
     * Every peer ends up in the result map: with the returned key map on a successful OK response,
     * with an error map built by {@code Utils.setMapError(..., -2)} when there is no response at
     * all, and with {@code Utils.setMapError(..., -1)} for any other outcome.
     *
     * @param putBuilder the put configuration
     * @return the future that tracks the put; it is failed if the channel creator or the routing
     *         fails
     */
    public FuturePut put(final PutBuilder putBuilder) {
        int dataSize = Utils.dataSize(putBuilder);
        final FuturePut futureDHT = new FuturePut(putBuilder, putBuilder.getRequestP2PConfiguration().getMinimumResults(), dataSize);
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        putBuilder.getFutureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.setFailed(future);
                    return;
                }
                RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(putBuilder);
                FutureRouting futureRouting = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_1, future.channelCreator());
                futureDHT.setFutureRouting(futureRouting);
                futureRouting.addListener(new BaseFutureAdapter<FutureRouting>() {

                    @Override
                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.setFailed("routing failed");
                            return;
                        }
                        logger.debug("storing lkey={} on {}", (Object)putBuilder.getLocationKey(), futureRouting.getPotentialHits());
                        DistributedHashTable.parallelRequests(putBuilder.getRequestP2PConfiguration(), (NavigableSet<PeerAddress>)futureRouting.getPotentialHits(), futureDHT, false, future.channelCreator(), new OperationMapper<FuturePut>() {
                            // Per-recipient outcome: stored keys on success, error map otherwise.
                            Map<PeerAddress, Map<Number640, Byte>> rawData = new HashMap<PeerAddress, Map<Number640, Byte>>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                if (putBuilder.isPutIfAbsent()) {
                                    return DistributedHashTable.this.storeRCP.putIfAbsent(address, putBuilder, channelCreator);
                                }
                                if (putBuilder.isPutMeta()) {
                                    return DistributedHashTable.this.storeRCP.putMeta(address, putBuilder, channelCreator);
                                }
                                return DistributedHashTable.this.storeRCP.put(address, putBuilder, channelCreator);
                            }

                            @Override
                            public void response(FuturePut futureDHT) {
                                futureDHT.setStoredKeys(this.rawData);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (future.isSuccess() && future.getResponse().isOk()) {
                                    this.rawData.put(future.getRequest().getRecipient(), future.getResponse().getKeyMapByte(0).keysMap());
                                } else if (future.getResponse() == null) {
                                    // No response at all: mark every requested key with error code -2.
                                    Map<Number640, Byte> error = Utils.setMapError(future.getRequest().getDataMap(0).dataMap(), (byte)-2);
                                    this.rawData.put(future.getRequest().getRecipient(), error);
                                } else {
                                    // A response exists but the request did not succeed with OK: error code -1.
                                    // Parameterized logging avoids building the message when debug is off.
                                    logger.debug("future failed: {}", future.getFailedReason());
                                    Map<Number640, Byte> error = Utils.setMapError(future.getRequest().getDataMap(0).dataMap(), (byte)-1);
                                    this.rawData.put(future.getRequest().getRecipient(), error);
                                }
                            }
                        });
                    }
                });
                // Registered after routing has been started, as in the original flow
                // (presumably releases the channel creator when the future completes - confirm).
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Routes towards the builder's location key and fetches data from the peers found.
     * <p>
     * Routing uses {@code REQUEST_2} after the search criteria of the builder have been copied into
     * the routing builder. For range queries the potential hits are asked with the unmodified
     * request configuration; otherwise only the direct hits are asked and the configuration is
     * narrowed through {@link #adjustConfiguration}. Requests still pending when the operation
     * finishes are cancelled ({@code cancleOnFinish} is {@code true}). The data map at index 0 of
     * every successful response is recorded per recipient.
     *
     * @param builder the get configuration
     * @return the future that tracks the get; it is failed if the channel creator or the routing
     *         fails
     */
    public FutureGet get(final GetBuilder builder) {
        final FutureGet futureDHT = new FutureGet(builder, builder.getRequestP2PConfiguration().getMinimumResults(), new VotingSchemeDHT());
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        builder.getFutureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.setFailed(future);
                    return;
                }
                RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                DistributedHashTable.fillRoutingBuilder(builder, routingBuilder);
                FutureRouting futureRouting = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_2, future.channelCreator());
                futureDHT.setFutureRouting(futureRouting);
                futureRouting.addListener(new BaseFutureAdapter<FutureRouting>() {

                    @Override
                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.setFailed("routing failed");
                            return;
                        }
                        logger.debug("found direct hits for get: {}", futureRouting.getDirectHits());
                        // Range queries keep the requested configuration; key lookups cannot expect
                        // more results than there are direct hits.
                        RequestP2PConfiguration p2pConfiguration2 = builder.isRange() ? builder.getRequestP2PConfiguration() : DistributedHashTable.adjustConfiguration(builder.getRequestP2PConfiguration(), futureRouting.getDirectHitsDigest());
                        DistributedHashTable.parallelRequests(p2pConfiguration2, (NavigableSet<PeerAddress>)(builder.isRange() ? futureRouting.getPotentialHits() : futureRouting.getDirectHits()), futureDHT, true, future.channelCreator(), new OperationMapper<FutureGet>() {
                            // Per-recipient data, filled by interMediateResponse.
                            Map<PeerAddress, Map<Number640, Data>> rawData = new HashMap<PeerAddress, Map<Number640, Data>>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.storeRCP.get(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FutureGet futureDHT) {
                                futureDHT.setReceivedData(this.rawData);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (future.isSuccess()) {
                                    this.rawData.put(future.getRequest().getRecipient(), future.getResponse().getDataMap(0).dataMap());
                                    logger.debug("set data from {}", (Object)future.getRequest().getRecipient());
                                }
                            }
                        });
                    }
                });
                // Registered after routing has been started, as in the original flow
                // (presumably releases the channel creator when the future completes - confirm).
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Routes towards the builder's location key and collects digests from the peers found.
     * <p>
     * Routing uses {@code REQUEST_2} after the search criteria of the builder have been copied into
     * the routing builder. Range queries ask the potential hits, all other queries the direct hits.
     * Requests still pending when the operation finishes are cancelled. The shape of the digest
     * taken from each successful response depends on the builder: the data map at index 0 if
     * {@code isReturnMetaValues()}, the bloom filters at index 0 and 1 if
     * {@code isReturnBloomFilter()}, otherwise the key map at index 0.
     *
     * @param builder the digest configuration
     * @return the future that tracks the digest; it is failed if the channel creator or the routing
     *         fails
     */
    public FutureDigest digest(final DigestBuilder builder) {
        final FutureDigest futureDHT = new FutureDigest(builder, builder.getRequestP2PConfiguration().getMinimumResults(), new VotingSchemeDHT());
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        builder.getFutureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.setFailed(future);
                    return;
                }
                RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                DistributedHashTable.fillRoutingBuilder(builder, routingBuilder);
                FutureRouting futureRouting = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_2, future.channelCreator());
                futureDHT.setFutureRouting(futureRouting);
                futureRouting.addListener(new BaseFutureAdapter<FutureRouting>() {

                    @Override
                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.setFailed("routing failed");
                            return;
                        }
                        logger.debug("found direct hits for digest: {}", futureRouting.getDirectHits());
                        DistributedHashTable.parallelRequests(builder.getRequestP2PConfiguration(), (NavigableSet<PeerAddress>)(builder.isRange() ? futureRouting.getPotentialHits() : futureRouting.getDirectHits()), futureDHT, true, future.channelCreator(), new OperationMapper<FutureDigest>() {
                            // Per-recipient digest, filled by interMediateResponse.
                            Map<PeerAddress, DigestResult> rawDigest = new HashMap<PeerAddress, DigestResult>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.storeRCP.digest(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FutureDigest futureDHT) {
                                futureDHT.setReceivedDigest(this.rawDigest);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (!future.isSuccess()) {
                                    return;
                                }
                                DigestResult digest;
                                if (builder.isReturnMetaValues()) {
                                    Map<Number640, Data> dataMap = future.getResponse().getDataMap(0).dataMap();
                                    digest = new DigestResult(dataMap);
                                } else if (builder.isReturnBloomFilter()) {
                                    SimpleBloomFilter<Number160> sbf1 = future.getResponse().getBloomFilter(0);
                                    SimpleBloomFilter<Number160> sbf2 = future.getResponse().getBloomFilter(1);
                                    digest = new DigestResult(sbf1, sbf2);
                                } else {
                                    NavigableMap<Number640, Set<Number160>> keyDigest = future.getResponse().getKeyMap640Keys(0).keysMap();
                                    digest = new DigestResult(keyDigest);
                                }
                                this.rawDigest.put(future.getRequest().getRecipient(), digest);
                                logger.debug("set data from {}", (Object)future.getRequest().getRecipient());
                            }
                        });
                    }
                });
                // Registered after routing has been started, as in the original flow
                // (presumably releases the channel creator when the future completes - confirm).
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Routes towards the builder's location key and removes data from the direct hits.
     * <p>
     * Routing uses {@code REQUEST_2} after the search criteria of the builder have been copied into
     * the routing builder. The request configuration is narrowed through
     * {@link #adjustConfiguration} to the number of direct hits. For each successful OK response
     * either the data map (when {@code isReturnResults()}) or the key map at index 0 is recorded per
     * recipient, and the matching map is handed to the future at the end.
     *
     * @param builder the remove configuration
     * @return the future that tracks the removal; it is failed if the channel creator or the routing
     *         fails
     */
    public FutureRemove remove(final RemoveBuilder builder) {
        int dataSize = Utils.dataSize(builder);
        final FutureRemove futureDHT = new FutureRemove(builder, builder.getRequestP2PConfiguration().getMinimumResults(), new VotingSchemeDHT(), dataSize);
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        builder.getFutureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.setFailed(future);
                    return;
                }
                RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                DistributedHashTable.fillRoutingBuilder(builder, routingBuilder);
                FutureRouting futureRouting = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_2, future.channelCreator());
                futureDHT.setFutureRouting(futureRouting);
                futureRouting.addListener(new BaseFutureAdapter<FutureRouting>() {

                    @Override
                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.setFailed("routing failed");
                            return;
                        }
                        logger.debug("found direct hits for remove: {}", futureRouting.getDirectHits());
                        RequestP2PConfiguration p2pConfiguration2 = DistributedHashTable.adjustConfiguration(builder.getRequestP2PConfiguration(), futureRouting.getDirectHitsDigest());
                        DistributedHashTable.parallelRequests(p2pConfiguration2, (NavigableSet<PeerAddress>)futureRouting.getDirectHits(), futureDHT, false, future.channelCreator(), new OperationMapper<FutureRemove>() {
                            // Only one of the two maps is filled, depending on builder.isReturnResults().
                            Map<PeerAddress, Map<Number640, Data>> rawDataResult = new HashMap<PeerAddress, Map<Number640, Data>>();
                            Map<PeerAddress, Map<Number640, Byte>> rawDataNoResult = new HashMap<PeerAddress, Map<Number640, Byte>>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.storeRCP.remove(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FutureRemove futureDHT) {
                                if (builder.isReturnResults()) {
                                    futureDHT.setReceivedData(this.rawDataResult);
                                } else {
                                    futureDHT.setStoredKeys(this.rawDataNoResult);
                                }
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (!future.isSuccess() || !future.getResponse().isOk()) {
                                    return;
                                }
                                if (builder.isReturnResults()) {
                                    this.rawDataResult.put(future.getRequest().getRecipient(), future.getResponse().getDataMap(0).dataMap());
                                } else {
                                    this.rawDataNoResult.put(future.getRequest().getRecipient(), future.getResponse().getKeyMapByte(0).keysMap());
                                }
                            }
                        });
                    }
                });
                // Registered after routing has been started, as in the original flow
                // (presumably releases the channel creator when the future completes - confirm).
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Sends the quit message to peers taken from the local peer map.
     * <p>
     * No routing is performed: the candidates are the result of {@code peerMap().closePeers(20)},
     * passed through {@code builder.filter(...)}. The outcome of every individual request is
     * reported to the future, and the future is set to done once the parallel requests finish.
     *
     * @param builder the shutdown configuration
     * @return the future that tracks the shutdown announcement; it is failed if the channel creator
     *         fails
     */
    public FutureShutdown quit(final ShutdownBuilder builder) {
        final FutureShutdown futureShutdown = new FutureShutdown(builder);
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        builder.getFutureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureShutdown.setFailed(future);
                    return;
                }
                NavigableSet<PeerAddress> closePeers = DistributedHashTable.this.routing.peerMap().closePeers(20);
                closePeers = builder.filter(closePeers);
                DistributedHashTable.parallelRequests(builder.getRequestP2PConfiguration(), (NavigableSet<PeerAddress>)closePeers, futureShutdown, false, future.channelCreator(), new OperationMapper<FutureShutdown>() {

                    @Override
                    public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                        return DistributedHashTable.this.quitRPC.quit(address, builder, channelCreator);
                    }

                    @Override
                    public void response(FutureShutdown future) {
                        future.setDone();
                    }

                    @Override
                    public void interMediateResponse(FutureResponse future) {
                        // Both successful and failed requests are reported, together with their outcome.
                        futureShutdown.report(future.getRequest().getRecipient(), future.isSuccess());
                    }
                });
                // Registered after the requests have been started, as in the original flow
                // (presumably releases the channel creator when the future completes - confirm).
                futureShutdown.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureShutdown;
    }

    /**
     * Runs the given operation in parallel against the queued peers once a channel creator is
     * available.
     * <p>
     * When the channel creator future succeeds, the parallel request loop is started and
     * {@code Utils.addReleaseListener} is called with the channel creator and the DHT future. When it
     * fails, the DHT future is failed with the channel creator future.
     *
     * @param p2pConfiguration     request configuration (minimum results, failures, parallelism)
     * @param queue                peers to contact; entries are polled from this set
     * @param cancleOnFinish       whether still-pending requests are cancelled once the operation
     *                             finishes
     * @param futureChannelCreator future that provides the channel creator
     * @param operation            creates the per-peer requests and consumes their responses
     * @param futureDHT            future that tracks the whole operation
     * @param <K>                  concrete DHT future type
     * @return {@code futureDHT}, returned immediately
     */
    public static <K extends FutureDHT<?>> K parallelRequests(final RequestP2PConfiguration p2pConfiguration, final NavigableSet<PeerAddress> queue, final boolean cancleOnFinish, FutureChannelCreator futureChannelCreator, final OperationMapper<K> operation, final K futureDHT) {
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureChannelCreator> are provably distinct parameterizations, so the
        // decompiler-generated cast between them is rejected by the compiler.
        futureChannelCreator.addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            @Override
            public void operationComplete(FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.setFailed(future);
                    return;
                }
                DistributedHashTable.parallelRequests(p2pConfiguration, (NavigableSet<PeerAddress>)queue, futureDHT, cancleOnFinish, future.channelCreator(), operation);
                Utils.addReleaseListener(future.channelCreator(), futureDHT);
            }
        });
        return futureDHT;
    }

    /**
     * Starts the parallel request loop with an already available channel creator.
     * <p>
     * If the configuration asks for zero results, no request is sent and the operation is told to
     * produce its response right away. Otherwise an array with {@code getParallel()} empty slots is
     * created and handed, together with a fresh failure counter, to {@code loopRec}.
     *
     * @param p2pConfiguration request configuration (minimum results, failures, parallelism)
     * @param queue            peers to contact
     * @param future           future that tracks the whole operation
     * @param cancleOnFinish   whether still-pending requests are cancelled once the operation finishes
     * @param channelCreator   channel creator used for the requests
     * @param operation        creates the per-peer requests and consumes their responses
     * @param <K>              concrete DHT future type
     */
    private static <K extends FutureDHT<?>> void parallelRequests(RequestP2PConfiguration p2pConfiguration, NavigableSet<PeerAddress> queue, K future, boolean cancleOnFinish, ChannelCreator channelCreator, OperationMapper<K> operation) {
        if (p2pConfiguration.getMinimumResults() != 0) {
            final FutureResponse[] emptySlots = new FutureResponse[p2pConfiguration.getParallel()];
            final int minimumResults = p2pConfiguration.getMinimumResults();
            final AtomicInteger failureCounter = new AtomicInteger(0);
            final int maxFailure = p2pConfiguration.getMaxFailure();
            final int parallelDiff = p2pConfiguration.getParallelDiff();
            final AtomicReferenceArray<FutureResponse> slots = new AtomicReferenceArray<FutureResponse>(emptySlots);
            DistributedHashTable.loopRec(queue, minimumResults, failureCounter, maxFailure, parallelDiff, slots, future, cancleOnFinish, channelCreator, operation);
        } else {
            // Nothing was requested, so the operation can answer immediately.
            operation.response(future);
        }
    }

    /**
     * One round of the parallel request loop; re-invokes itself from the completion listener until
     * the operation is finished.
     * <p>
     * Each round fills the empty slots among the first {@code min + parallelDiff} entries of
     * {@code futures} with new requests for peers polled from {@code queue}. If no slot is active
     * afterwards, the operation produces its response and the loop ends. Otherwise a
     * {@link FutureForkJoin} over the slots is awaited. When it completes, every completed response
     * is passed to {@code operation.interMediateResponse}; the loop ends if the fork/join succeeded
     * or the incremented failure counter exceeds {@code maxFailure}, and otherwise continues with
     * {@code min} reduced by the number of successes.
     *
     * @param queue          peers not yet contacted; polled from the front
     * @param min            number of successful results still required
     * @param nrFailure      failure counter shared across rounds
     * @param maxFailure     number of failed rounds tolerated before giving up
     * @param parallelDiff   number of requests issued in addition to {@code min}
     * @param futures        request slots shared across rounds
     * @param futureDHT      future that tracks the whole operation
     * @param cancelOnFinish whether the requests in {@code futures} are cancelled when the loop ends
     * @param channelCreator channel creator used for the requests
     * @param operation      creates the per-peer requests and consumes their responses
     * @param <K>            concrete DHT future type
     */
    private static <K extends FutureDHT<?>> void loopRec(final NavigableSet<PeerAddress> queue, final int min, final AtomicInteger nrFailure, final int maxFailure, final int parallelDiff, final AtomicReferenceArray<FutureResponse> futures, final K futureDHT, final boolean cancelOnFinish, final ChannelCreator channelCreator, final OperationMapper<K> operation) {
        int active = 0;
        // NOTE(review): assumes futures.length() >= min + parallelDiff (the caller sizes the array
        // with getParallel()) - confirm that getParallel() equals minimumResults + parallelDiff.
        for (int i = 0; i < min + parallelDiff; ++i) {
            if (futures.get(i) == null) {
                PeerAddress next = queue.pollFirst();
                if (next != null) {
                    ++active;
                    FutureResponse futureResponse = operation.create(channelCreator, next);
                    futures.set(i, futureResponse);
                    futureDHT.addRequests(futureResponse);
                }
            } else {
                // Slot still holds a request from an earlier round.
                ++active;
            }
        }
        if (active == 0) {
            // No request in flight and no peer left to ask.
            operation.response(futureDHT);
            if (cancelOnFinish) {
                DistributedHashTable.cancel(futures);
            }
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("fork/join status: " + min + "/" + active + " (" + parallelDiff + ")");
        }
        // Never wait for more results than there are active requests.
        FutureForkJoin<FutureResponse> fp = new FutureForkJoin<FutureResponse>(Math.min(min, active), false, futures);
        // The adapter is passed without a cast: BaseFutureListener<BaseFuture> and
        // BaseFutureListener<FutureForkJoin<FutureResponse>> are provably distinct parameterizations,
        // so the decompiler-generated cast between them is rejected by the compiler.
        fp.addListener(new BaseFutureAdapter<FutureForkJoin<FutureResponse>>() {

            @Override
            public void operationComplete(FutureForkJoin<FutureResponse> future) throws Exception {
                for (FutureResponse futureResponse : future.getCompleted()) {
                    operation.interMediateResponse(futureResponse);
                }
                // The failure counter is only incremented when the fork/join did not succeed.
                if (future.isSuccess() || nrFailure.incrementAndGet() > maxFailure) {
                    if (cancelOnFinish) {
                        DistributedHashTable.cancel(futures);
                    }
                    operation.response(futureDHT);
                } else {
                    DistributedHashTable.loopRec(queue, min - future.getSuccessCounter(), nrFailure, maxFailure, parallelDiff, futures, futureDHT, cancelOnFinish, channelCreator, operation);
                }
            }
        });
    }

    /**
     * Derives a routing builder from a DHT operation builder.
     * <p>
     * The routing builder is created by the operation builder itself from its request and routing
     * configuration; location key, domain key and peer filters are then copied over.
     *
     * @param builder the operation builder to take the settings from
     * @return the configured routing builder
     */
    private static RoutingBuilder createBuilder(BasicBuilder<?> builder) {
        final RoutingBuilder result = builder.createBuilder(builder.getRequestP2PConfiguration(), builder.getRoutingConfiguration());
        // Copy the addressing information and the filters one by one.
        result.setLocationKey(builder.getLocationKey());
        result.setDomainKey(builder.getDomainKey());
        result.peerFilters(builder.peerFilters());
        return result;
    }

    /**
     * Copies the search criteria of a searchable builder into the routing builder.
     * <p>
     * Exactly one criterion is applied, in this order of precedence: a range if both {@code from()}
     * and {@code to()} are set; a single content key if exactly one is given; a bloom filter of all
     * content keys if more than one is given. With no range and no content keys the routing builder
     * is left untouched.
     *
     * @param builder        source of the search criteria
     * @param routingBuilder routing builder to configure
     */
    private static void fillRoutingBuilder(SearchableBuilder builder, RoutingBuilder routingBuilder) {
        final boolean rangeGiven = builder.from() != null && builder.to() != null;
        if (rangeGiven) {
            routingBuilder.setRange(builder.from(), builder.to());
            return;
        }
        if (builder.contentKeys() == null) {
            return;
        }
        final int keyCount = builder.contentKeys().size();
        if (keyCount == 1) {
            routingBuilder.setContentKey(builder.contentKeys().iterator().next());
            return;
        }
        if (keyCount > 1) {
            // Several keys are condensed into one bloom filter.
            final SimpleBloomFilter<Number160> keyFilter = new DefaultBloomfilterFactory().createContentKeyBloomFilter();
            for (Number160 key : builder.contentKeys()) {
                keyFilter.add(key);
            }
            routingBuilder.setKeyBloomFilter(keyFilter);
        }
    }

    /**
     * Cancels every request currently stored in the given slots; empty slots are skipped.
     *
     * @param futures request slots, possibly containing {@code null} entries
     */
    private static void cancel(AtomicReferenceArray<FutureResponse> futures) {
        final int slotCount = futures.length();
        int slot = 0;
        while (slot < slotCount) {
            final BaseFuture pending = futures.get(slot);
            if (pending != null) {
                pending.cancel();
            }
            ++slot;
        }
    }

    /**
     * Limits the number of requested results to the number of direct hits.
     * <p>
     * If there are at least as many direct hits as results requested, the given configuration is
     * returned as is. Otherwise a new configuration is returned whose minimum results equal the
     * number of direct hits, with max failure and parallel diff taken from the given configuration.
     *
     * @param p2pConfiguration the requested configuration
     * @param directHitsDigest direct hits found by routing; only its size is used
     * @return the given configuration or a narrowed copy
     */
    public static RequestP2PConfiguration adjustConfiguration(RequestP2PConfiguration p2pConfiguration, SortedMap<PeerAddress, DigestInfo> directHitsDigest) {
        final int available = directHitsDigest.size();
        final int wanted = p2pConfiguration.getMinimumResults();
        if (available < wanted) {
            // Fewer hits than requested: do not wait for results that cannot arrive.
            return new RequestP2PConfiguration(available, p2pConfiguration.getMaxFailure(), p2pConfiguration.getParallelDiff());
        }
        return p2pConfiguration;
    }
}

