/*
 * Decompiled with CFR 0.152.
 */
package net.tomp2p.dht;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import net.tomp2p.connection.ChannelCreator;
import net.tomp2p.dht.AddBuilder;
import net.tomp2p.dht.DigestBuilder;
import net.tomp2p.dht.FutureDHT;
import net.tomp2p.dht.FutureDigest;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
import net.tomp2p.dht.FutureSend;
import net.tomp2p.dht.GetBuilder;
import net.tomp2p.dht.OperationMapper;
import net.tomp2p.dht.PutBuilder;
import net.tomp2p.dht.RemoveBuilder;
import net.tomp2p.dht.SearchableBuilder;
import net.tomp2p.dht.SendBuilder;
import net.tomp2p.dht.StorageLayer;
import net.tomp2p.dht.StorageRPC;
import net.tomp2p.dht.UtilsDHT;
import net.tomp2p.dht.VotingSchemeDHT;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureChannelCreator;
import net.tomp2p.futures.FutureDone;
import net.tomp2p.futures.FutureForkJoin;
import net.tomp2p.futures.FutureResponse;
import net.tomp2p.futures.FutureRouting;
import net.tomp2p.message.KeyMap640Keys;
import net.tomp2p.message.Message;
import net.tomp2p.p2p.DistributedRouting;
import net.tomp2p.p2p.RequestP2PConfiguration;
import net.tomp2p.p2p.builder.BasicBuilder;
import net.tomp2p.p2p.builder.RoutingBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.Number640;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.rpc.DefaultBloomfilterFactory;
import net.tomp2p.rpc.DigestResult;
import net.tomp2p.rpc.DirectDataRPC;
import net.tomp2p.rpc.RPC;
import net.tomp2p.rpc.SendDirectBuilderI;
import net.tomp2p.rpc.SimpleBloomFilter;
import net.tomp2p.storage.Data;
import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedHashTable {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(DistributedHashTable.class);
    /**
     * Shared empty set that is passed as the "direct hit" set whenever an operation has no direct hits.
     * NOTE(review): this is a mutable {@link TreeSet} shared by all operations. The visible code only iterates
     * it and polls from it (both no-ops on an empty set), so nothing must ever add elements to it.
     */
    private static final NavigableSet<PeerAddress> EMPTY_NAVIGABLE_SET = new TreeSet<PeerAddress>();
    // Not referenced in the visible part of this file; presumably a reason code for cancelled requests - TODO confirm.
    public static final int REASON_CANCEL = 254;
    // Not referenced in the visible part of this file; presumably a reason code for unknown failures - TODO confirm.
    // The misspelling ("UNKOWN") is part of the public API and therefore kept.
    public static final int REASON_UNKOWN = 255;
    /** Routing used to find the peers that the DHT operations below are sent to. */
    private final DistributedRouting routing;
    /** Storage RPC used to issue the add/put/get/digest/remove requests to individual peers. */
    private final StorageRPC storeRCP;
    /** RPC used by {@code direct(...)} to send data directly to individual peers. */
    private final DirectDataRPC directDataRPC;

    /**
     * Creates the DHT operation layer on top of the given routing and RPC handlers.
     *
     * @param routing       used to route to the peers an operation is sent to
     * @param storeRCP      RPC used for add, put, get, digest and remove requests
     * @param directDataRPC RPC used for direct send requests
     */
    public DistributedHashTable(DistributedRouting routing, StorageRPC storeRCP, DirectDataRPC directDataRPC) {
        this.directDataRPC = directDataRPC;
        this.storeRCP = storeRCP;
        this.routing = routing;
    }

    /**
     * Starts an asynchronous add operation described by the given builder.
     *
     * <p>As soon as the builder's channel creator is available, a routing request is issued. When routing
     * succeeds, an add request is sent through the storage RPC to the potential hits, and the key map of
     * every peer that answered with an OK message is collected and handed to the returned future. If the
     * channel creator or the routing fails, the returned future is failed with that future as origin.
     *
     * @param builder the configuration of the add request
     * @return the future tracking the operation; it is returned immediately and completed later
     */
    public FuturePut add(final AddBuilder builder) {
        final int minimumResults = builder.requestP2PConfiguration().minimumResults();
        final int dataCount = builder.dataSet().size();
        final FuturePut futureDHT = new FuturePut(builder, minimumResults, dataCount);
        builder.futureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    // No channel creator available: nothing can be sent.
                    futureDHT.failed((BaseFuture) future);
                    return;
                }
                final RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                final FutureRouting routingFuture = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_1, future.channelCreator());
                futureDHT.futureRouting(routingFuture);
                routingFuture.addListener(new BaseFutureAdapter<FutureRouting>() {

                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.failed((BaseFuture) futureRouting);
                            return;
                        }
                        final NavigableSet<PeerAddress> targets = (NavigableSet<PeerAddress>) futureRouting.potentialHits();
                        logger.debug("adding lkey={} on {}", builder.locationKey(), targets);
                        final OperationMapper<FuturePut> mapper = new OperationMapper<FuturePut>() {
                            // Key maps reported by the peers that answered OK, keyed by the contacted peer.
                            private final Map<PeerAddress, Map<Number640, Byte>> rawData = new HashMap<PeerAddress, Map<Number640, Byte>>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.storeRCP.add(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FuturePut futureDHT, FutureDone<Void> futuresCompleted) {
                                futureDHT.storedKeys(rawData, futuresCompleted);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (!future.isSuccess() || !future.responseMessage().isOk()) {
                                    // Failed or non-OK responses contribute nothing to the result.
                                    return;
                                }
                                rawData.put(future.request().recipient(), future.responseMessage().keyMapByte(0).keysMap());
                            }
                        };
                        DistributedHashTable.parallelRequests(builder.requestP2PConfiguration(), EMPTY_NAVIGABLE_SET, targets, futureDHT, false, future.channelCreator(), mapper);
                    }
                });
                // Registered after the routing listener, exactly as before.
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Starts an asynchronous direct-send operation described by the given builder.
     *
     * <p>As soon as the builder's channel creator is available, a routing request is issued. When routing
     * succeeds, the data is sent through the direct-data RPC to the potential hits. For every peer that
     * answered with an OK message the reply is collected: as a raw buffer if {@code builder.isRaw()},
     * otherwise as a deserialized object. If deserialization fails, the exception itself is stored as that
     * peer's result. If the channel creator or the routing fails, the returned future is failed with that
     * future as origin.
     *
     * @param builder the configuration of the send request
     * @return the future tracking the operation; it is returned immediately and completed later
     */
    public FutureSend direct(final SendBuilder builder) {
        final int minimumResults = builder.requestP2PConfiguration().minimumResults();
        final FutureSend futureDHT = new FutureSend(builder, minimumResults, new VotingSchemeDHT());
        builder.futureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    // No channel creator available: nothing can be sent.
                    futureDHT.failed((BaseFuture) future);
                    return;
                }
                final RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                final FutureRouting routingFuture = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_1, future.channelCreator());
                futureDHT.futureRouting(routingFuture);
                routingFuture.addListener(new BaseFutureAdapter<FutureRouting>() {

                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.failed((BaseFuture) futureRouting);
                            return;
                        }
                        final NavigableSet<PeerAddress> targets = (NavigableSet<PeerAddress>) futureRouting.potentialHits();
                        // The message used to say "storing" (copied from put), which was misleading for a send.
                        logger.debug("sending lkey={} on {}", builder.locationKey(), targets);
                        final OperationMapper<FutureSend> mapper = new OperationMapper<FutureSend>() {
                            // Raw reply buffers per peer; only filled when builder.isRaw().
                            private final Map<PeerAddress, ByteBuf> rawChannels = new HashMap<PeerAddress, ByteBuf>();
                            // Deserialized replies (or the deserialization exception) per peer; only filled when !builder.isRaw().
                            private final Map<PeerAddress, Object> rawObjects = new HashMap<PeerAddress, Object>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.directDataRPC.send(address, (SendDirectBuilderI) builder, channelCreator);
                            }

                            @Override
                            public void response(FutureSend futureDHT, FutureDone<Void> futuresCompleted) {
                                if (builder.isRaw()) {
                                    futureDHT.directData1(rawChannels, futuresCompleted);
                                } else {
                                    futureDHT.directData2(rawObjects, futuresCompleted);
                                }
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (!future.isSuccess() || !future.responseMessage().isOk()) {
                                    // Failed or non-OK responses contribute nothing to the result.
                                    return;
                                }
                                if (builder.isRaw()) {
                                    rawChannels.put(future.request().recipient(), future.responseMessage().buffer(0).buffer());
                                    return;
                                }
                                try {
                                    rawObjects.put(future.request().recipient(), future.responseMessage().buffer(0).object());
                                } catch (ClassNotFoundException e) {
                                    // Deliberate: the caller receives the exception as this peer's result.
                                    rawObjects.put(future.request().recipient(), e);
                                } catch (IOException e) {
                                    // Deliberate: the caller receives the exception as this peer's result.
                                    rawObjects.put(future.request().recipient(), e);
                                }
                            }
                        };
                        DistributedHashTable.parallelRequests(builder.requestP2PConfiguration(), EMPTY_NAVIGABLE_SET, targets, futureDHT, builder.isCancelOnFinish(), future.channelCreator(), mapper);
                    }
                });
                // Registered after the routing listener, exactly as before.
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Starts an asynchronous put operation described by the given builder.
     *
     * <p>As soon as the builder's channel creator is available, a routing request is issued. When routing
     * succeeds, a request is sent through the storage RPC to the potential hits; the RPC variant
     * (put-if-absent, put-meta, put-confirm or plain put) is chosen from the builder flags, in that order.
     * For every contacted peer a key map is recorded: the map returned by the peer if it answered OK,
     * otherwise an error map built from the keys of the request. If the channel creator or the routing
     * fails, the returned future is failed with that future as origin.
     *
     * @param putBuilder the configuration of the put request
     * @return the future tracking the operation; it is returned immediately and completed later
     */
    public FuturePut put(final PutBuilder putBuilder) {
        final int dataSize = UtilsDHT.dataSize(putBuilder);
        final FuturePut futureDHT = new FuturePut(putBuilder, putBuilder.requestP2PConfiguration().minimumResults(), dataSize);
        putBuilder.futureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    // No channel creator available: nothing can be sent.
                    futureDHT.failed((BaseFuture) future);
                    return;
                }
                final RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(putBuilder);
                final FutureRouting routingFuture = DistributedHashTable.this.routing.route(routingBuilder, Message.Type.REQUEST_1, future.channelCreator());
                futureDHT.futureRouting(routingFuture);
                routingFuture.addListener(new BaseFutureAdapter<FutureRouting>() {

                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.failed((BaseFuture) futureRouting);
                            return;
                        }
                        final NavigableSet<PeerAddress> targets = (NavigableSet<PeerAddress>) futureRouting.potentialHits();
                        logger.debug("storing lkey={} on {}", putBuilder.locationKey(), targets);
                        final OperationMapper<FuturePut> mapper = new OperationMapper<FuturePut>() {
                            // Key map per contacted peer: the peer's answer, or an error map if it did not answer OK.
                            private final Map<PeerAddress, Map<Number640, Byte>> rawData = new HashMap<PeerAddress, Map<Number640, Byte>>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                if (putBuilder.isPutIfAbsent()) {
                                    return DistributedHashTable.this.storeRCP.putIfAbsent(address, putBuilder, channelCreator);
                                }
                                if (putBuilder.isPutMeta()) {
                                    return DistributedHashTable.this.storeRCP.putMeta(address, putBuilder, channelCreator);
                                }
                                if (putBuilder.isPutConfirm()) {
                                    return DistributedHashTable.this.storeRCP.putConfirm(address, putBuilder, channelCreator);
                                }
                                return DistributedHashTable.this.storeRCP.put(address, putBuilder, channelCreator);
                            }

                            @Override
                            public void response(FuturePut futureDHT, FutureDone<Void> futuresCompleted) {
                                futureDHT.storedKeys(rawData, futuresCompleted);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (future.isSuccess() && future.responseMessage().isOk()) {
                                    rawData.put(future.request().recipient(), future.responseMessage().keyMapByte(0).keysMap());
                                    return;
                                }
                                // NOTE(review): -2 and -1 are raw status bytes whose meaning is defined outside this file.
                                // -2 is used when emptyResponse() is null, -1 otherwise - confirm against the status definitions.
                                final byte errorCode;
                                if (future.emptyResponse() == null) {
                                    errorCode = (byte) -2;
                                } else {
                                    // Parameterized logging: no string concatenation when debug is disabled.
                                    logger.debug("future failed: {}", future.failedReason());
                                    errorCode = (byte) -1;
                                }
                                // Report the error code for every key that was part of the request.
                                final Map<Number640, Byte> error = Utils.setMapError(future.request().dataMap(0).dataMap(), errorCode);
                                rawData.put(future.request().recipient(), error);
                            }
                        };
                        DistributedHashTable.parallelRequests(putBuilder.requestP2PConfiguration(), EMPTY_NAVIGABLE_SET, targets, futureDHT, false, future.channelCreator(), mapper);
                    }
                });
                // Registered after the routing listener, exactly as before.
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Starts an asynchronous get operation described by the given builder.
     *
     * <p>As soon as the builder's channel creator is available, a routing request is issued
     * ({@code REQUEST_2} for a fast get, {@code REQUEST_1} otherwise). When routing succeeds, a get request
     * (or a get-latest request, optionally with digest) is sent through the storage RPC. For a fast get the
     * direct hits of the routing are passed along as direct hits, otherwise an empty set is used. For every
     * completed request the returned data, the returned digest and a status byte (OK, NOT_FOUND or FAILED)
     * are recorded per peer. If the channel creator or the routing fails, the returned future is failed
     * with that future as origin.
     *
     * @param builder the configuration of the get request
     * @return the future tracking the operation; it is returned immediately and completed later
     */
    public FutureGet get(final GetBuilder builder) {
        final int minimumResults = builder.requestP2PConfiguration().minimumResults();
        final FutureGet futureDHT = new FutureGet(builder, minimumResults, new VotingSchemeDHT());
        builder.futureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    // No channel creator available: nothing can be sent.
                    futureDHT.failed((BaseFuture) future);
                    return;
                }
                final RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                DistributedHashTable.fillRoutingBuilder(builder, routingBuilder);
                final Message.Type routingType;
                if (builder.isFastGet()) {
                    routingType = Message.Type.REQUEST_2;
                } else {
                    routingType = Message.Type.REQUEST_1;
                }
                final FutureRouting routingFuture = DistributedHashTable.this.routing.route(routingBuilder, routingType, future.channelCreator());
                futureDHT.futureRouting(routingFuture);
                routingFuture.addListener(new BaseFutureAdapter<FutureRouting>() {

                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.failed((BaseFuture) futureRouting);
                            return;
                        }
                        logger.debug("found direct hits for get: {}", futureRouting.directHits());
                        final NavigableSet<PeerAddress> candidates = (NavigableSet<PeerAddress>) futureRouting.potentialHits();
                        final RequestP2PConfiguration p2pConfiguration2 = DistributedHashTable.adjustConfiguration(builder.requestP2PConfiguration, candidates.size());
                        final NavigableSet<PeerAddress> firstContacts;
                        if (builder.isFastGet()) {
                            firstContacts = (NavigableSet<PeerAddress>) futureRouting.directHits();
                        } else {
                            firstContacts = EMPTY_NAVIGABLE_SET;
                        }
                        final OperationMapper<FutureGet> mapper = new OperationMapper<FutureGet>() {
                            // Non-empty data maps per peer.
                            private final Map<PeerAddress, Map<Number640, Data>> rawData = new HashMap<PeerAddress, Map<Number640, Data>>();
                            // Digests per peer.
                            private final Map<PeerAddress, DigestResult> rawDigest = new HashMap<PeerAddress, DigestResult>();
                            // Status byte (ordinal of StorageLayer.PutStatus) per peer.
                            private final Map<PeerAddress, Byte> rawStatus = new HashMap<PeerAddress, Byte>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                if (!builder.isGetLatest()) {
                                    return DistributedHashTable.this.storeRCP.get(address, builder, channelCreator);
                                }
                                if (builder.isWithDigest()) {
                                    return DistributedHashTable.this.storeRCP.getLatest(address, builder, channelCreator, RPC.Commands.GET_LATEST_WITH_DIGEST);
                                }
                                return DistributedHashTable.this.storeRCP.getLatest(address, builder, channelCreator, RPC.Commands.GET_LATEST);
                            }

                            @Override
                            public void response(FutureGet futureDHT, FutureDone<Void> futuresCompleted) {
                                futureDHT.receivedData(rawData, rawDigest, rawStatus, futuresCompleted);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                final PeerAddress peer = future.request().recipient();
                                if (!future.isSuccess()) {
                                    rawStatus.put(peer, (byte) StorageLayer.PutStatus.FAILED.ordinal());
                                    return;
                                }
                                boolean hasData = false;
                                final Map<Number640, Data> data = future.responseMessage().dataMap(0).dataMap();
                                if (data != null && !data.isEmpty()) {
                                    rawData.put(peer, data);
                                    hasData = true;
                                }
                                final KeyMap640Keys keyMaps = future.responseMessage().keyMap640Keys(0);
                                if (keyMaps != null && keyMaps.keysMap() != null) {
                                    rawDigest.put(peer, new DigestResult(keyMaps.keysMap()));
                                    hasData = true;
                                }
                                // A successful reply that carried neither data nor digest counts as NOT_FOUND.
                                final StorageLayer.PutStatus status = hasData ? StorageLayer.PutStatus.OK : StorageLayer.PutStatus.NOT_FOUND;
                                rawStatus.put(peer, (byte) status.ordinal());
                                logger.debug("set data from {}", peer);
                            }
                        };
                        DistributedHashTable.parallelRequests(p2pConfiguration2, firstContacts, candidates, futureDHT, true, future.channelCreator(), mapper);
                    }
                });
                // Registered after the routing listener, exactly as before.
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Starts an asynchronous digest operation described by the given builder.
     *
     * <p>As soon as the builder's channel creator is available, a routing request is issued
     * ({@code REQUEST_2} for a fast get, {@code REQUEST_1} otherwise). When routing succeeds, a digest
     * request is sent through the storage RPC. For every successful response a {@link DigestResult} is
     * recorded per peer, built from the data map, from three bloom filters, or from the key digest,
     * depending on the builder flags. If the channel creator or the routing fails, the returned future is
     * failed with that future as origin.
     *
     * @param builder the configuration of the digest request
     * @return the future tracking the operation; it is returned immediately and completed later
     */
    public FutureDigest digest(final DigestBuilder builder) {
        final int minimumResults = builder.requestP2PConfiguration().minimumResults();
        final FutureDigest futureDHT = new FutureDigest(builder, minimumResults, new VotingSchemeDHT());
        builder.futureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    // No channel creator available: nothing can be sent.
                    futureDHT.failed((BaseFuture) future);
                    return;
                }
                final RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                DistributedHashTable.fillRoutingBuilder(builder, routingBuilder);
                final Message.Type routingType;
                if (builder.isFastGet()) {
                    routingType = Message.Type.REQUEST_2;
                } else {
                    routingType = Message.Type.REQUEST_1;
                }
                final FutureRouting routingFuture = DistributedHashTable.this.routing.route(routingBuilder, routingType, future.channelCreator());
                futureDHT.futureRouting(routingFuture);
                routingFuture.addListener(new BaseFutureAdapter<FutureRouting>() {

                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.failed((BaseFuture) futureRouting);
                            return;
                        }
                        logger.debug("found direct hits for digest: {}", futureRouting.directHits());
                        final OperationMapper<FutureDigest> mapper = new OperationMapper<FutureDigest>() {
                            // Digest per peer that answered successfully.
                            private final Map<PeerAddress, DigestResult> rawDigest = new HashMap<PeerAddress, DigestResult>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.storeRCP.digest(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FutureDigest futureDHT, FutureDone<Void> futuresCompleted) {
                                futureDHT.receivedDigest(rawDigest, futuresCompleted);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (!future.isSuccess()) {
                                    // Failed requests contribute nothing to the result.
                                    return;
                                }
                                final DigestResult digest;
                                if (builder.isReturnMetaValues()) {
                                    final Map<Number640, Data> metaValues = future.responseMessage().dataMap(0).dataMap();
                                    digest = new DigestResult(metaValues);
                                } else if (builder.isReturnBloomFilter() || builder.isReturnAllBloomFilter()) {
                                    digest = new DigestResult(future.responseMessage().bloomFilter(0), future.responseMessage().bloomFilter(1), future.responseMessage().bloomFilter(2));
                                } else {
                                    // NOTE(review): keyMap640Keys(0) is dereferenced without a null check here - confirm it cannot be null.
                                    digest = new DigestResult(future.responseMessage().keyMap640Keys(0).keysMap());
                                }
                                rawDigest.put(future.request().recipient(), digest);
                                logger.debug("set data from {}", future.request().recipient());
                            }
                        };
                        final RequestP2PConfiguration configuration = builder.requestP2PConfiguration();
                        final NavigableSet<PeerAddress> firstContacts;
                        if (builder.isFastGet()) {
                            firstContacts = (NavigableSet<PeerAddress>) futureRouting.directHits();
                        } else {
                            firstContacts = EMPTY_NAVIGABLE_SET;
                        }
                        final NavigableSet<PeerAddress> candidates = (NavigableSet<PeerAddress>) futureRouting.potentialHits();
                        DistributedHashTable.parallelRequests(configuration, firstContacts, candidates, futureDHT, true, future.channelCreator(), mapper);
                    }
                });
                // Registered after the routing listener, exactly as before.
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Starts an asynchronous remove operation described by the given builder.
     *
     * <p>As soon as the builder's channel creator is available, a routing request is issued
     * ({@code REQUEST_2} for a fast get, {@code REQUEST_1} otherwise). When routing succeeds, a remove
     * request is sent through the storage RPC. For every peer that answered with an OK message, either the
     * returned data map (if {@code builder.isReturnResults()}) or the returned key map is recorded. If the
     * channel creator or the routing fails, the returned future is failed with that future as origin.
     *
     * @param builder the configuration of the remove request
     * @return the future tracking the operation; it is returned immediately and completed later
     */
    public FutureRemove remove(final RemoveBuilder builder) {
        final FutureRemove futureDHT = new FutureRemove(builder, new VotingSchemeDHT());
        builder.futureChannelCreator().addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            public void operationComplete(final FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    // No channel creator available: nothing can be sent.
                    futureDHT.failed((BaseFuture) future);
                    return;
                }
                final RoutingBuilder routingBuilder = DistributedHashTable.createBuilder(builder);
                DistributedHashTable.fillRoutingBuilder(builder, routingBuilder);
                final Message.Type routingType;
                if (builder.isFastGet()) {
                    routingType = Message.Type.REQUEST_2;
                } else {
                    routingType = Message.Type.REQUEST_1;
                }
                final FutureRouting routingFuture = DistributedHashTable.this.routing.route(routingBuilder, routingType, future.channelCreator());
                futureDHT.futureRouting(routingFuture);
                routingFuture.addListener(new BaseFutureAdapter<FutureRouting>() {

                    public void operationComplete(FutureRouting futureRouting) throws Exception {
                        if (!futureRouting.isSuccess()) {
                            futureDHT.failed((BaseFuture) futureRouting);
                            return;
                        }
                        logger.debug("found direct hits for remove: {}", futureRouting.directHits());
                        final NavigableSet<PeerAddress> candidates = (NavigableSet<PeerAddress>) futureRouting.potentialHits();
                        final RequestP2PConfiguration p2pConfiguration2 = DistributedHashTable.adjustConfiguration(builder.requestP2PConfiguration, candidates.size());
                        final NavigableSet<PeerAddress> firstContacts;
                        if (builder.isFastGet()) {
                            firstContacts = (NavigableSet<PeerAddress>) futureRouting.directHits();
                        } else {
                            firstContacts = EMPTY_NAVIGABLE_SET;
                        }
                        final OperationMapper<FutureRemove> mapper = new OperationMapper<FutureRemove>() {
                            // Removed data per peer; only filled when builder.isReturnResults().
                            private final Map<PeerAddress, Map<Number640, Data>> rawDataResult = new HashMap<PeerAddress, Map<Number640, Data>>();
                            // Key maps per peer; only filled when !builder.isReturnResults().
                            private final Map<PeerAddress, Map<Number640, Byte>> rawDataNoResult = new HashMap<PeerAddress, Map<Number640, Byte>>();

                            @Override
                            public FutureResponse create(ChannelCreator channelCreator, PeerAddress address) {
                                return DistributedHashTable.this.storeRCP.remove(address, builder, channelCreator);
                            }

                            @Override
                            public void response(FutureRemove futureDHT, FutureDone<Void> futuresCompleted) {
                                if (!builder.isReturnResults()) {
                                    futureDHT.storedKeys(rawDataNoResult, futuresCompleted);
                                    return;
                                }
                                futureDHT.receivedData(rawDataResult, futuresCompleted);
                            }

                            @Override
                            public void interMediateResponse(FutureResponse future) {
                                if (!future.isSuccess() || !future.responseMessage().isOk()) {
                                    // Failed or non-OK responses contribute nothing to the result.
                                    return;
                                }
                                if (builder.isReturnResults()) {
                                    rawDataResult.put(future.request().recipient(), future.responseMessage().dataMap(0).dataMap());
                                } else {
                                    rawDataNoResult.put(future.request().recipient(), future.responseMessage().keyMapByte(0).keysMap());
                                }
                            }
                        };
                        DistributedHashTable.parallelRequests(p2pConfiguration2, firstContacts, candidates, futureDHT, false, future.channelCreator(), mapper);
                    }
                });
                // Registered after the routing listener, exactly as before.
                futureDHT.addFutureDHTReleaseListener(future.channelCreator());
            }
        });
        return futureDHT;
    }

    /**
     * Sends the given operation to the given peers once the channel creator becomes available.
     *
     * <p>When the channel creator future succeeds, the requests are started and a release listener for the
     * channel creator is registered via {@code UtilsDHT.addReleaseListener}. When it fails, the given
     * future is failed with the channel creator future as origin.
     *
     * @param p2pConfiguration     the request configuration (minimum results, parallelism, failures)
     * @param directHit            peers that are contacted first
     * @param potentialHit         further peers to contact
     * @param cancleOnFinish       whether outstanding requests are cancelled once the operation finishes
     * @param futureChannelCreator the future that provides the channel creator
     * @param operation            creates the per-peer requests and collects their responses
     * @param futureDHT            the future tracking the whole operation
     * @param <K>                  the concrete future type
     * @return {@code futureDHT}, returned immediately
     */
    public static <K extends FutureDHT<?>> K parallelRequests(final RequestP2PConfiguration p2pConfiguration, final NavigableSet<PeerAddress> directHit, final NavigableSet<PeerAddress> potentialHit, final boolean cancleOnFinish, FutureChannelCreator futureChannelCreator, final OperationMapper<K> operation, final K futureDHT) {
        futureChannelCreator.addListener(new BaseFutureAdapter<FutureChannelCreator>() {

            public void operationComplete(FutureChannelCreator future) throws Exception {
                if (!future.isSuccess()) {
                    futureDHT.failed((BaseFuture) future);
                    return;
                }
                DistributedHashTable.parallelRequests(p2pConfiguration, directHit, potentialHit, futureDHT, cancleOnFinish, future.channelCreator(), operation);
                // Registered only after the requests have been started, exactly as before.
                UtilsDHT.addReleaseListener(future.channelCreator(), futureDHT);
            }
        });
        return futureDHT;
    }

    /**
     * Starts the parallel requests of an operation on an already available channel creator.
     *
     * <p>Every direct hit is first removed from the potential hits. If the configured minimum number of
     * results is 0, the operation is answered immediately with a {@code null} completion future; otherwise
     * the request loop is started with a failure counter of 0 and one empty slot per parallel request.
     *
     * @param p2pConfiguration the request configuration (minimum results, parallelism, failures)
     * @param directHit        peers that are contacted first
     * @param potentialHit     further peers to contact; modified by this method
     * @param future           the future tracking the whole operation
     * @param cancleOnFinish   whether outstanding requests are cancelled once the operation finishes
     * @param channelCreator   the channel creator used to create the requests
     * @param operation        creates the per-peer requests and collects their responses
     */
    private static <K extends FutureDHT<?>> void parallelRequests(RequestP2PConfiguration p2pConfiguration, NavigableSet<PeerAddress> directHit, NavigableSet<PeerAddress> potentialHit, K future, boolean cancleOnFinish, ChannelCreator channelCreator, OperationMapper<K> operation) {
        for (final PeerAddress alreadyDirect : directHit) {
            potentialHit.remove(alreadyDirect);
        }
        final int minimumResults = p2pConfiguration.minimumResults();
        if (minimumResults != 0) {
            // One slot per parallel request, all initially empty (null).
            final AtomicReferenceArray<FutureResponse> slots = new AtomicReferenceArray<FutureResponse>(p2pConfiguration.parallel());
            final AtomicInteger failures = new AtomicInteger(0);
            DistributedHashTable.loopRec(directHit, potentialHit, minimumResults, failures, p2pConfiguration.maxFailure(), p2pConfiguration.parallelDiff(), slots, future, cancleOnFinish, channelCreator, operation);
        } else {
            // Nothing is required: finish right away without sending any request.
            operation.response(future, null);
        }
    }

    /**
     * Fills the free request slots, waits for the fork/join result and either finishes
     * the operation or calls itself again with a reduced minimum.
     *
     * <p>For the first {@code min + parallelDiff} slots of {@code futures}: a slot that
     * already holds a future counts as active; an empty slot is filled with a request to
     * the next peer polled from {@code directHit}, falling back to {@code potentialHit}.
     * Both sets are consumed via {@code pollFirst()}. If no slot is active,
     * {@code operation.response(futureDHT, null)} is invoked and the method returns.
     *
     * <p>Otherwise a {@code FutureForkJoin} over {@code futures} is created with
     * {@code Math.min(min, active)} as its first argument. When it completes, every
     * completed response is passed to {@code operation.interMediateResponse(...)}. On
     * success, or once the failure counter exceeds {@code maxFailure}, the final response
     * is delivered; otherwise the loop runs again with
     * {@code min - future.successCounter()}.
     *
     * @param directHit      peers tried first
     * @param potentialHit   peers tried when {@code directHit} is empty
     * @param min            number of results still required
     * @param nrFailure      shared failure counter, incremented on each unsuccessful round
     * @param maxFailure     failure count above which the operation stops retrying
     * @param parallelDiff   number of slots used in addition to {@code min}
     * @param futures        request slots shared across rounds
     * @param futureDHT      DHT future that collects the requests and receives the result
     * @param cancelOnFinish whether the futures in {@code futures} are cancelled when finishing
     * @param channelCreator passed to {@code operation.create(...)}
     * @param operation      creates requests and consumes intermediate and final responses
     */
    private static <K extends FutureDHT<?>> void loopRec(final NavigableSet<PeerAddress> directHit, final NavigableSet<PeerAddress> potentialHit, final int min, final AtomicInteger nrFailure, final int maxFailure, final int parallelDiff, final AtomicReferenceArray<FutureResponse> futures, final K futureDHT, final boolean cancelOnFinish, final ChannelCreator channelCreator, final OperationMapper<K> operation) {
        final int slotCount = min + parallelDiff;
        int active = 0;
        for (int slot = 0; slot < slotCount; slot++) {
            if (futures.get(slot) != null) {
                // Slot already holds a future: it counts as active and is left alone.
                active++;
                continue;
            }
            PeerAddress target = directHit.pollFirst();
            if (target == null) {
                target = potentialHit.pollFirst();
            }
            if (target == null) {
                // Both peer sets are exhausted; the slot stays empty.
                continue;
            }
            active++;
            FutureResponse request = operation.create(channelCreator, target);
            futures.set(slot, request);
            futureDHT.addRequests(request);
        }

        if (active == 0) {
            // Nothing running and nobody left to ask: finish without a fork/join result.
            operation.response(futureDHT, null);
            if (cancelOnFinish) {
                DistributedHashTable.cancel(futures);
            }
            return;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("fork/join status: " + min + "/" + active + " (" + parallelDiff + ")");
        }

        // Never wait for more results than there are active requests.
        final int required = Math.min(min, active);
        FutureForkJoin fp = new FutureForkJoin(required, false, futures);
        fp.addListener((BaseFutureListener)new BaseFutureAdapter<FutureForkJoin<FutureResponse>>(){

            @Override
            public void operationComplete(FutureForkJoin<FutureResponse> future) throws Exception {
                for (FutureResponse completed : future.completed()) {
                    operation.interMediateResponse(completed);
                }

                // The failure counter is only incremented when the round was not successful.
                final boolean finished = future.isSuccess() || nrFailure.incrementAndGet() > maxFailure;
                if (!finished) {
                    // Retry; the successes of this round reduce the remaining minimum.
                    DistributedHashTable.loopRec(directHit, potentialHit, min - future.successCounter(), nrFailure, maxFailure, parallelDiff, futures, futureDHT, cancelOnFinish, channelCreator, operation);
                    return;
                }

                if (cancelOnFinish) {
                    DistributedHashTable.cancel(futures);
                }
                operation.response(futureDHT, (FutureDone<Void>)future.futuresCompleted());
            }
        });
    }

    /**
     * Builds a {@link RoutingBuilder} for the given operation builder.
     *
     * <p>The routing builder is obtained from {@code builder.createBuilder(...)} using the
     * builder's request and routing configuration; afterwards the location key, domain
     * key, peer-map filters and post-routing filters are copied over.
     *
     * @param builder the operation builder to read the settings from
     * @return the configured routing builder
     */
    private static RoutingBuilder createBuilder(BasicBuilder<?> builder) {
        final RequestP2PConfiguration requestConfiguration = builder.requestP2PConfiguration();
        final RoutingBuilder routing = builder.createBuilder(requestConfiguration, builder.routingConfiguration());

        // Keys first, then the two filter collections.
        routing.locationKey(builder.locationKey());
        routing.domainKey(builder.domainKey());
        routing.peerMapFilters(builder.peerMapFilters());
        routing.postRoutingFilters(builder.postRoutingFilters());

        return routing;
    }

    /**
     * Transfers the search criteria of {@code builder} to {@code routingBuilder}.
     *
     * <ul>
     *   <li>If both {@code from()} and {@code to()} are set, the range is applied and
     *       content keys are ignored.</li>
     *   <li>Otherwise, exactly one content key is set directly.</li>
     *   <li>Otherwise, more than one content key is added to a content-key bloom filter,
     *       which is then set on the routing builder.</li>
     *   <li>With no range and no (or an empty set of) content keys, nothing is changed.</li>
     * </ul>
     *
     * @param builder        source of the range / content keys
     * @param routingBuilder routing builder to configure
     */
    private static void fillRoutingBuilder(SearchableBuilder builder, RoutingBuilder routingBuilder) {
        // A complete range takes precedence over content keys.
        if (builder.from() != null && builder.to() != null) {
            routingBuilder.range(builder.from(), builder.to());
            return;
        }
        if (builder.contentKeys() == null) {
            return;
        }

        final int keyCount = builder.contentKeys().size();
        if (keyCount == 1) {
            routingBuilder.contentKey(builder.contentKeys().iterator().next());
            return;
        }
        if (keyCount > 1) {
            // Several keys are packed into a bloom filter instead of being set one by one.
            SimpleBloomFilter keyFilter = new DefaultBloomfilterFactory().createContentKeyBloomFilter();
            for (Number160 key : builder.contentKeys()) {
                keyFilter.add(key);
            }
            routingBuilder.keyBloomFilter(keyFilter);
        }
    }

    /**
     * Calls {@code cancel()} on every non-null future in the array, in index order.
     *
     * @param futures the request slots; empty ({@code null}) slots are skipped
     */
    private static void cancel(AtomicReferenceArray<FutureResponse> futures) {
        final int slotCount = futures.length();
        for (int slot = 0; slot < slotCount; slot++) {
            BaseFuture pending = futures.get(slot);
            if (pending != null) {
                pending.cancel();
            }
        }
    }

    /**
     * Caps the minimum number of results of a request configuration at {@code size}.
     *
     * <p>When {@code size} is at least {@code p2pConfiguration.minimumResults()}, the
     * given configuration is returned as is. Otherwise a new configuration is created
     * with {@code size} as its first constructor argument (presumably the minimum
     * results; confirm against {@code RequestP2PConfiguration}) and the original
     * {@code maxFailure()} and {@code parallelDiff()} values.
     *
     * @param p2pConfiguration the configuration to adjust
     * @param size             the upper bound for the minimum results
     * @return {@code p2pConfiguration} itself, or a new configuration limited to {@code size}
     */
    public static RequestP2PConfiguration adjustConfiguration(RequestP2PConfiguration p2pConfiguration, int size) {
        final boolean withinLimit = size >= p2pConfiguration.minimumResults();
        return withinLimit
                ? p2pConfiguration
                : new RequestP2PConfiguration(size, p2pConfiguration.maxFailure(), p2pConfiguration.parallelDiff());
    }
}

