/*
 * Decompiled with CFR 0.152.
 */
package net.tomp2p.task;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.tomp2p.connection.ConnectionBean;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureChannelCreator;
import net.tomp2p.futures.FutureResponse;
import net.tomp2p.p2p.Peer;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.Number320;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.rpc.DigestInfo;
import net.tomp2p.rpc.TaskRPC;
import net.tomp2p.storage.Data;
import net.tomp2p.task.TaskResultListener;
import net.tomp2p.task.TaskStatus;
import net.tomp2p.task.Worker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queues {@link Worker} tasks on a fixed-size thread pool, tracks their
 * status per task key and sends the result of each task back to the peer
 * that submitted it.
 *
 * <p>The {@code status}, {@code exceptions} and {@code listeners}
 * collections are all guarded by {@code lock}.
 */
public class TaskManager {
    private static final Logger logger = LoggerFactory.getLogger(TaskManager.class);
    private final ConnectionBean connectionBean;
    /** Guards {@link #status}, {@link #exceptions} and {@link #listeners}. */
    private final Object lock = new Object();
    private final ThreadPoolExecutor executor;
    /** Last known status per task key. */
    private final Map<Number320, TaskStatus.Status> status = new HashMap<Number320, TaskStatus.Status>();
    /** Failure reason per task key; an entry here takes precedence over {@link #status}. */
    private final Map<Number320, String> exceptions = new HashMap<Number320, String>();
    private final Collection<TaskResultListener> listeners = new ArrayList<TaskResultListener>();
    private TaskRPC taskRPC;

    /**
     * Creates a task manager backed by a fixed-size thread pool with an
     * unbounded queue.
     *
     * @param connectionBean used to reserve and release connections when sending results back
     * @param threads number of worker threads (used as both core and maximum pool size)
     */
    public TaskManager(ConnectionBean connectionBean, int threads) {
        this.connectionBean = connectionBean;
        this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Registers a listener that is called by {@link #notifyListeners(Number320, Map)}.
     *
     * @param taskResultListener the listener to add
     */
    public void addListener(TaskResultListener taskResultListener) {
        synchronized (lock) {
            listeners.add(taskResultListener);
        }
    }

    /**
     * Removes a previously registered listener.
     *
     * @param taskResultListener the listener to remove
     */
    public void removeListener(TaskResultListener taskResultListener) {
        synchronized (lock) {
            listeners.remove(taskResultListener);
        }
    }

    /**
     * Calls {@code taskReceived} on every registered listener.
     *
     * @param taskKey key of the task the data belongs to
     * @param dataMap data passed on to each listener
     */
    public void notifyListeners(Number320 taskKey, Map<Number160, Data> dataMap) {
        // Iterate over a snapshot so that listeners may be added or removed
        // concurrently (or from within a callback), and so that no listener
        // code runs while the lock is held.
        final List<TaskResultListener> snapshot;
        synchronized (lock) {
            snapshot = new ArrayList<TaskResultListener>(listeners);
        }
        for (TaskResultListener listener : snapshot) {
            listener.taskReceived(taskKey, dataMap);
        }
    }

    /**
     * Sets the RPC used to send task results back.
     *
     * @param taskRPC the RPC returned by {@link #getTaskRPC()} afterwards
     */
    public void init(TaskRPC taskRPC) {
        this.taskRPC = taskRPC;
    }

    /**
     * @return the RPC set via {@link #init(TaskRPC)}
     * @throws IllegalStateException if {@link #init(TaskRPC)} has not been called yet
     */
    public TaskRPC getTaskRPC() {
        if (this.taskRPC == null) {
            throw new IllegalStateException("init() was not called yet");
        }
        return this.taskRPC;
    }

    /**
     * Reports the current state of a task.
     *
     * <p>Checks, in this order: a recorded failure (status {@code FAILED}
     * plus the failure reason), the executor queue (status {@code QUEUE}
     * plus the zero-based queue position), and finally the last recorded
     * status. For a key that was never submitted the recorded status is
     * {@code null}.
     *
     * @param taskKey key of the task to look up
     * @return a freshly created status object
     */
    public TaskStatus taskStatus(Number320 taskKey) {
        final TaskStatus statusResult = new TaskStatus();
        final String failure;
        synchronized (lock) {
            failure = exceptions.get(taskKey);
        }
        if (failure != null) {
            statusResult.setFaildeReason(failure);
            statusResult.setStatus(TaskStatus.Status.FAILED);
            logger.debug("finished task failed for task with ID {}", taskKey);
            return statusResult;
        }
        int pos = 0;
        for (Runnable runnable : executor.getQueue()) {
            // Only Task instances are ever handed to execute(), so the cast is safe.
            if (((Task) runnable).taskKey.equals(taskKey)) {
                statusResult.setQueuePosition(pos);
                statusResult.setStatus(TaskStatus.Status.QUEUE);
                logger.debug("finished task queue for task with ID {}", taskKey);
                return statusResult;
            }
            ++pos;
        }
        synchronized (lock) {
            statusResult.setStatus(status.get(taskKey));
        }
        logger.debug("finished task status for task with ID {}", taskKey);
        return statusResult;
    }

    /**
     * Marks the task as queued and hands it to the thread pool.
     *
     * @param peer the peer the task runs on; its ID is part of the task key
     * @param taskId the ID of the task
     * @param mapper the worker that is executed
     * @param data the input data passed to the worker
     * @param senderAddress the address the result is sent back to
     * @param sign forwarded to {@code TaskRPC.sendResult} when the result is sent
     * @return the number of tasks waiting in the executor queue after submission
     */
    public int submitTask(Peer peer, Number160 taskId, Worker mapper, Map<Number160, Data> data, PeerAddress senderAddress, boolean sign) {
        final Task task = new Task(peer, taskId, mapper, data, senderAddress, sign);
        // Record the status before the task can start, so that STARTED is
        // never overwritten by QUEUE.
        synchronized (lock) {
            status.put(task.taskKey, TaskStatus.Status.QUEUE);
        }
        executor.execute(task);
        return executor.getQueue().size();
    }

    /**
     * Records a failure reason for a task; {@link #taskStatus(Number320)}
     * reports such a task as {@code FAILED}.
     */
    private void registerException(Number320 taskKey, String string) {
        synchronized (lock) {
            exceptions.put(taskKey, string);
        }
    }

    /**
     * @return a digest built from the current number of queued tasks
     */
    public DigestInfo digest() {
        return new DigestInfo(executor.getQueue().size());
    }

    /**
     * Shuts the thread pool down immediately and logs a warning if queued
     * jobs were dropped.
     */
    public void shutdown() {
        final List<Runnable> jobs = executor.shutdownNow();
        if (!jobs.isEmpty()) {
            logger.warn("shutting down and not executing {} jobs", jobs.size());
        }
    }

    /**
     * A single queued unit of work: runs the worker, then sends the output
     * back to the sender over a freshly reserved connection.
     */
    private class Task
    implements Runnable {
        private final Number160 taskId;
        private final Worker mapper;
        private final Map<Number160, Data> inputData;
        private final PeerAddress senderAddress;
        private final boolean sign;
        private final Peer peer;
        /** Combination of task ID and peer ID; key into the status and exception maps. */
        final Number320 taskKey;

        public Task(Peer peer, Number160 taskId, Worker mapper, Map<Number160, Data> inputData, PeerAddress senderAddress, boolean sign) {
            this.peer = peer;
            this.taskId = taskId;
            this.mapper = mapper;
            this.inputData = inputData;
            this.senderAddress = senderAddress;
            this.sign = sign;
            this.taskKey = new Number320(taskId, peer.getPeerID());
        }

        /**
         * Executes the worker, records status transitions and sends the
         * (possibly {@code null}) output back to the sender.
         */
        @Override
        public void run() {
            Thread.currentThread().setName("task-manager " + this.taskId);
            logger.debug("started task {} which came from {}", this.taskId, this.senderAddress);
            synchronized (lock) {
                status.put(this.taskKey, TaskStatus.Status.STARTED);
            }
            Map<Number160, Data> outputData = null;
            try {
                outputData = this.mapper.execute(this.peer, this.taskId, this.inputData);
            }
            catch (Exception e) {
                outputData = null;
                registerException(this.taskKey, e.toString());
            }
            // The status is updated even when the worker failed; taskStatus()
            // checks recorded failures first, so FAILED is still reported.
            synchronized (lock) {
                status.put(this.taskKey, TaskStatus.Status.SUCCESS_RESULT_NOT_SENT);
            }
            final Map<Number160, Data> outputData2 = outputData;
            FutureChannelCreator futureChannelCreator = connectionBean.getConnectionReservation().reserve(1);
            // NOTE(review): the explicit casts on the listeners look like decompiler
            // artifacts - confirm against addListener's signature before removing them.
            futureChannelCreator.addListener((BaseFutureListener<BaseFuture>)new BaseFutureAdapter<FutureChannelCreator>(){

                @Override
                public void operationComplete(final FutureChannelCreator futureChannelCreator) throws Exception {
                    if (!futureChannelCreator.isSuccess()) {
                        TaskManager.this.registerException(Task.this.taskKey, "could not reserve connection");
                        return;
                    }
                    final FutureResponse futureResponse;
                    try {
                        futureResponse = TaskManager.this.getTaskRPC().sendResult(Task.this.senderAddress, futureChannelCreator.getChannelCreator(), Task.this.taskId, outputData2, Task.this.peer.getPeerBean().getKeyPair(), false, Task.this.sign);
                    }
                    catch (Exception e) {
                        // Nothing was sent, so the response listener below will never
                        // run: give the reservation back here and record the failure.
                        TaskManager.this.connectionBean.getConnectionReservation().release(futureChannelCreator.getChannelCreator());
                        TaskManager.this.registerException(Task.this.taskKey, "could not send result back");
                        throw e;
                    }
                    futureResponse.addListener((BaseFutureListener<BaseFuture>)new BaseFutureAdapter<FutureResponse>(){

                        @Override
                        public void operationComplete(FutureResponse future) throws Exception {
                            try {
                                if (future.isSuccess()) {
                                    synchronized (TaskManager.this.lock) {
                                        TaskManager.this.status.put(Task.this.taskKey, TaskStatus.Status.SUCCESS_RESULT_SENT);
                                    }
                                } else {
                                    TaskManager.this.registerException(Task.this.taskKey, "could not send result back");
                                }
                            }
                            finally {
                                // Always return the reserved connection, whatever happened above.
                                TaskManager.this.connectionBean.getConnectionReservation().release(futureChannelCreator.getChannelCreator());
                            }
                        }
                    });
                }
            });
        }
    }
}

