/*
 * Decompiled with CFR 0.152.
 */
package net.tomp2p.replication;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureChannelCreator;
import net.tomp2p.futures.FutureDHT;
import net.tomp2p.futures.FutureResponse;
import net.tomp2p.p2p.Peer;
import net.tomp2p.p2p.builder.PutBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.Number480;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.replication.Replication;
import net.tomp2p.replication.ResponsibilityListener;
import net.tomp2p.rpc.StorageRPC;
import net.tomp2p.storage.Data;
import net.tomp2p.storage.StorageGeneric;
import net.tomp2p.utils.Timings;
import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reacts to responsibility changes reported by the peer's {@link Replication}
 * storage and pushes locally stored data to other peers.
 *
 * <p>The executor registers itself as a {@link ResponsibilityListener} in its
 * constructor. It can also be executed as a {@link Runnable}; each run re-sends
 * the content found via
 * {@code storage.findContentForResponsiblePeerID(peer.getPeerID())}.
 *
 * <p>Stored entries are keyed by {@link Number480} (location, domain, content).
 * A single put request carries one domain key, so entries are sent in batches
 * that share the same domain key.
 */
public class ReplicationExecutor
implements ResponsibilityListener,
Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationExecutor.class);
    /** Replication factor configured on the replication storage by the constructor. */
    private static final int REPLICATION = 6;
    private final StorageGeneric storage;
    private final StorageRPC storageRPC;
    private final Peer peer;
    /** Futures started by this executor, mapped to the time they were started (taken from the peer). */
    private final Map<BaseFuture, Long> pendingFutures;
    private final Replication replicationStorage;

    /**
     * Wires the executor to the given peer, registers it as responsibility
     * listener and sets the replication factor to {@link #REPLICATION}.
     *
     * @param peer the peer whose storage, store RPC, pending-future map and
     *             replication storage are used
     */
    public ReplicationExecutor(Peer peer) {
        this.peer = peer;
        this.storage = peer.getPeerBean().getStorage();
        this.storageRPC = peer.getStoreRPC();
        this.pendingFutures = peer.getPendingFutures();
        this.replicationStorage = peer.getPeerBean().getReplicationStorage();
        this.replicationStorage.addResponsibilityListener(this);
        this.replicationStorage.setReplicationFactor(REPLICATION);
    }

    /**
     * Transfers everything stored under {@code locationKey} directly to
     * {@code other}, one {@link #sendDirect} call per run of entries that share
     * a domain key.
     *
     * @param locationKey the location key whose content is transferred
     * @param other       the peer that receives the content
     */
    @Override
    public void otherResponsible(Number160 locationKey, PeerAddress other) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Other peer " + other + " is responsible for " + locationKey + " I'm " + this.storageRPC.getPeerAddress());
        }
        Map<Number480, Data> dataMap = this.storage.subMap(locationKey);
        Number160 domainKeyOld = null;
        Map<Number160, Data> dataMapConverted = new HashMap<Number160, Data>();
        for (Map.Entry<Number480, Data> entry : dataMap.entrySet()) {
            Number160 domainKey = entry.getKey().getDomainKey();
            Number160 contentKey = entry.getKey().getContentKey();
            Data data = entry.getValue();
            if (LOG.isDebugEnabled()) {
                LOG.debug("transfer from " + this.storageRPC.getPeerAddress() + " to " + other + " for key " + locationKey);
            }
            if (domainKeyOld != null && !domainKeyOld.equals(domainKey)) {
                // Domain changed: the accumulated entries belong to the PREVIOUS
                // domain, so flush them under domainKeyOld. A copy is sent because
                // the working map is reused right away.
                this.sendDirect(other, locationKey, domainKeyOld, new HashMap<Number160, Data>(dataMapConverted));
                dataMapConverted.clear();
            }
            // The current entry always joins the (possibly fresh) batch; otherwise
            // the first entry of every new domain would be lost.
            dataMapConverted.put(contentKey, data);
            domainKeyOld = domainKey;
        }
        if (!dataMapConverted.isEmpty()) {
            this.sendDirect(other, locationKey, domainKeyOld, dataMapConverted);
        }
    }

    /**
     * Re-sends the content stored under {@code locationKey} via
     * {@link #send}.
     *
     * @param locationKey the location key this peer is now responsible for
     */
    @Override
    public void meResponsible(Number160 locationKey) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("I (" + this.storageRPC.getPeerAddress() + ") now responsible for " + locationKey);
        }
        this.synchronizeData(locationKey);
    }

    /**
     * Re-sends the content of every location key returned by
     * {@code storage.findContentForResponsiblePeerID(peer.getPeerID())}.
     */
    @Override
    public void run() {
        Collection<Number160> locationKeys = this.storage.findContentForResponsiblePeerID(this.peer.getPeerID());
        for (Number160 locationKey : locationKeys) {
            this.synchronizeData(locationKey);
        }
    }

    /**
     * Sends everything stored under {@code locationKey} through {@link #send},
     * one request per run of entries sharing a domain key, and records each
     * returned future in {@code pendingFutures} together with the current time.
     *
     * @param locationKey the location key whose content is re-sent
     */
    private void synchronizeData(Number160 locationKey) {
        Map<Number480, Data> dataMap = this.storage.subMap(locationKey);
        Number160 domainKeyOld = null;
        Map<Number160, Data> dataMapConverted = new HashMap<Number160, Data>();
        for (Map.Entry<Number480, Data> entry : dataMap.entrySet()) {
            Number160 domainKey = entry.getKey().getDomainKey();
            Number160 contentKey = entry.getKey().getContentKey();
            Data data = entry.getValue();
            if (LOG.isDebugEnabled()) {
                LOG.debug("[storage refresh] I (" + this.storageRPC.getPeerAddress() + ") restore " + locationKey);
            }
            if (domainKeyOld != null && !domainKeyOld.equals(domainKey)) {
                // Flush the finished batch under the domain it was collected for
                // (domainKeyOld), not under the domain of the entry that ended it.
                // NOTE(review): sendDirect stamps futures with Timings.currentTimeMillis()
                // while this method uses System.currentTimeMillis() - confirm whether
                // the two clocks are meant to differ.
                this.pendingFutures.put(this.send(locationKey, domainKeyOld, new HashMap<Number160, Data>(dataMapConverted)), System.currentTimeMillis());
                dataMapConverted.clear();
            }
            dataMapConverted.put(contentKey, data);
            domainKeyOld = domainKey;
        }
        if (!dataMapConverted.isEmpty() && domainKeyOld != null) {
            this.pendingFutures.put(this.send(locationKey, domainKeyOld, dataMapConverted), System.currentTimeMillis());
        }
    }

    /**
     * Starts a put for the given data with the put-if-absent flag set.
     *
     * @param locationKey      the location key to store under
     * @param domainKey        the domain key shared by all entries of the map
     * @param dataMapConverted content key to data mapping to store
     * @return the future of the started put operation
     */
    protected FutureDHT send(Number160 locationKey, Number160 domainKey, Map<Number160, Data> dataMapConverted) {
        return ((PutBuilder)this.peer.put(locationKey).setDataMap(dataMapConverted).setDomainKey(domainKey)).setPutIfAbsent(true).start();
    }

    /**
     * Reserves one connection and, once the reservation succeeds, puts the data
     * on {@code other} through the storage RPC. The resulting response future is
     * recorded in {@code pendingFutures}; a failed reservation is only logged.
     *
     * @param other            the peer that receives the data
     * @param locationKey      the location key to store under
     * @param domainKey        the domain key shared by all entries of the map
     * @param dataMapConverted content key to data mapping to store
     */
    protected void sendDirect(final PeerAddress other, final Number160 locationKey, final Number160 domainKey, final Map<Number160, Data> dataMapConverted) {
        FutureChannelCreator futureChannelCreator = this.peer.getConnectionBean().getConnectionReservation().reserve(1);
        futureChannelCreator.addListener((BaseFutureListener<BaseFuture>)new BaseFutureAdapter<FutureChannelCreator>(){

            @Override
            public void operationComplete(FutureChannelCreator future) throws Exception {
                if (future.isSuccess()) {
                    FutureResponse futureResponse = ReplicationExecutor.this.storageRPC.put(other, locationKey, domainKey, dataMapConverted, false, false, false, future.getChannelCreator(), false, null);
                    // Ties the reserved connection (count 1) to the response future;
                    // presumably releases it when the response completes - verify in Utils.
                    Utils.addReleaseListener(futureResponse, ReplicationExecutor.this.peer.getConnectionBean().getConnectionReservation(), future.getChannelCreator(), 1);
                    ReplicationExecutor.this.pendingFutures.put(futureResponse, Timings.currentTimeMillis());
                } else if (LOG.isErrorEnabled()) {
                    LOG.error("otherResponsible failed " + future.getFailedReason());
                }
            }
        });
    }
}

