/*
 * Decompiled with CFR 0.152.
 */
package net.tomp2p.connection;

import java.net.SocketAddress;
import net.tomp2p.connection.ChannelCreator;
import net.tomp2p.connection.ReplyTimeoutHandler;
import net.tomp2p.connection.Sender;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.Cancellable;
import net.tomp2p.futures.FutureChannel;
import net.tomp2p.futures.FutureResponse;
import net.tomp2p.message.Message;
import net.tomp2p.p2p.ConnectionConfiguration;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.rpc.RequestHandlerTCP;
import net.tomp2p.rpc.RequestHandlerUDP;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-backed {@link Sender} that writes TomP2P messages over TCP or UDP channels
 * obtained from a {@link ChannelCreator}.
 *
 * <p>Outcomes are reported through the supplied {@link FutureResponse}: failures to
 * obtain a channel or to write are turned into a failed future, and fire-and-forget
 * sends (those without a request handler) are completed with a {@code null} response
 * as soon as the write succeeds.
 *
 * <p>Once {@link #shutdown()} has been called, every public send method fails the
 * future immediately without touching the network.
 */
public class SenderNetty
implements Sender {
    private static final Logger logger = LoggerFactory.getLogger(SenderNetty.class);

    /**
     * Failure reason reported when a send is attempted after {@link #shutdown()}.
     * The spelling is kept exactly as it has always been emitted, in case callers
     * or tests compare against the text.
     */
    private static final String SHUTDOWN_REASON = "shutdown in progres";

    private final Timer timer;
    private final ConnectionConfiguration configuration;
    /** Set once by {@link #shutdown()}; volatile so that sending threads observe it. */
    private volatile boolean shutdown = false;

    /**
     * @param configuration source of the connect timeout (TCP) and idle timeout (UDP)
     * @param timer         timer handed to every {@link ReplyTimeoutHandler} created here
     */
    public SenderNetty(ConnectionConfiguration configuration, Timer timer) {
        this.configuration = configuration;
        this.timer = timer;
    }

    /**
     * Sends {@code message} to its recipient over TCP.
     *
     * @param handler        handler for the reply, or {@code null} for a fire-and-forget request
     * @param futureResponse future that receives the outcome
     * @param message        message to send; its recipient determines the remote peer
     * @param channelCreator factory for the TCP channel
     * @param idleTCPMillis  idle timeout passed to the reply timeout handler
     */
    @Override
    public void sendTCP(RequestHandlerTCP<? extends BaseFuture> handler, FutureResponse futureResponse, Message message, ChannelCreator channelCreator, int idleTCPMillis) {
        logger.debug("send TCP {}", Thread.currentThread().getName());
        if (this.failIfShutdown(futureResponse)) {
            return;
        }
        this.sendTCP0(message.getRecipient(), handler, futureResponse, message, channelCreator, idleTCPMillis);
    }

    /**
     * Sends {@code message} to its recipient over UDP (non-broadcast).
     *
     * @param handler        handler for the reply, or {@code null} for a fire-and-forget request
     * @param futureResponse future that receives the outcome
     * @param message        message to send; its recipient determines the remote peer
     * @param channelCreator factory for the UDP channel
     */
    @Override
    public void sendUDP(RequestHandlerUDP<? extends BaseFuture> handler, FutureResponse futureResponse, Message message, ChannelCreator channelCreator) {
        logger.debug("send UDP {}", Thread.currentThread().getName());
        if (this.failIfShutdown(futureResponse)) {
            return;
        }
        this.sendUDP0(message.getRecipient(), handler, futureResponse, message, false, channelCreator);
    }

    /**
     * Sends {@code message} over a UDP channel created with the broadcast flag set.
     *
     * @param handler        handler for the reply, or {@code null} for a fire-and-forget request
     * @param futureResponse future that receives the outcome
     * @param message        message to send; its recipient determines the target address
     * @param channelCreator factory for the UDP channel
     */
    @Override
    public void sendBroadcastUDP(RequestHandlerUDP<? extends BaseFuture> handler, FutureResponse futureResponse, Message message, ChannelCreator channelCreator) {
        logger.debug("send broadcast UDP {}", Thread.currentThread().getName());
        if (this.failIfShutdown(futureResponse)) {
            return;
        }
        this.sendUDP0(message.getRecipient(), handler, futureResponse, message, true, channelCreator);
    }

    /**
     * Fails {@code futureResponse} if this sender has been shut down.
     *
     * @return {@code true} if the future was failed and the caller must not send
     */
    private boolean failIfShutdown(FutureResponse futureResponse) {
        if (!this.shutdown) {
            return false;
        }
        futureResponse.setFailed(SHUTDOWN_REASON);
        return true;
    }

    /**
     * Creates a TCP channel and writes {@code message} once the connect completes.
     *
     * <p>Does nothing if {@code futureResponse} is already completed. Any exception
     * raised while setting up the channel (including the fire-and-forget check) is
     * converted into a failed future rather than propagated.
     */
    private void sendTCP0(PeerAddress remotePeer, final RequestHandlerTCP<? extends BaseFuture> requestHandler, final FutureResponse futureResponse, final Message message, ChannelCreator channelCreator, int idleTCPMillis) {
        if (futureResponse.isCompleted()) {
            return;
        }
        try {
            ReplyTimeoutHandler replyTimeoutHandler = null;
            if (requestHandler != null) {
                replyTimeoutHandler = new ReplyTimeoutHandler(this.timer, idleTCPMillis, remotePeer);
                // The timeout handler is only attached to the future when the
                // channel creator is not in keep-alive-and-reuse mode.
                if (!channelCreator.isKeepAliveAndReuse()) {
                    futureResponse.setReplyTimeoutHandler(replyTimeoutHandler);
                }
            } else if (message.getType() != Message.Type.REQUEST_FF_1) {
                // Without a handler nobody processes a reply, so only
                // fire-and-forget requests are acceptable.
                throw new RuntimeException("This send needs to be a fire and forget request");
            }
            final FutureChannel channelFutureConnect = channelCreator.createTCPChannel(replyTimeoutHandler, requestHandler, this.configuration.getConnectTimeoutMillis(), message.getRecipient().createSocketTCP());
            futureResponse.addCancellation(new Cancellable(){

                @Override
                public void cancel() {
                    logger.debug("cancel TCP connect");
                    channelFutureConnect.getChannelFuture().getChannel().close();
                    channelFutureConnect.getChannelFuture().cancel();
                    futureResponse.setFailed("cancelled");
                }
            });
            channelFutureConnect.addListener(new BaseFutureAdapter<FutureChannel>(){

                @Override
                public void operationComplete(FutureChannel future) throws Exception {
                    if (future.isSuccess()) {
                        logger.debug("send TCP message {}", message);
                        ChannelFuture writeFuture = future.getChannel().write(message);
                        SenderNetty.this.afterSend(writeFuture, futureResponse, requestHandler == null);
                    } else {
                        futureResponse.setFailed(future);
                        logger.warn("Failed to connect TCP channel:{}/{}", message, futureResponse.getFailedReason());
                    }
                }
            });
        }
        catch (Exception ce) {
            futureResponse.setFailed("Could not get channel " + ce.toString());
            logChannelFailure(ce);
        }
    }

    /**
     * Creates a UDP channel and writes {@code message} to {@code remotePeer} once the
     * channel is available.
     *
     * <p>Does nothing if {@code futureResponse} is already completed.
     *
     * @throws RuntimeException if {@code requestHandler} is {@code null} and the message
     *                          is not one of the fire-and-forget request types
     */
    private void sendUDP0(final PeerAddress remotePeer, final RequestHandlerUDP<? extends BaseFuture> requestHandler, final FutureResponse futureResponse, final Message message, boolean broadcast, ChannelCreator channelCreator) {
        if (futureResponse.isCompleted()) {
            return;
        }
        ReplyTimeoutHandler replyTimeoutHandler = null;
        if (requestHandler != null) {
            replyTimeoutHandler = new ReplyTimeoutHandler(this.timer, this.configuration.getIdleUDPMillis(), remotePeer);
            futureResponse.setReplyTimeoutHandler(replyTimeoutHandler);
        } else if (message.getType() != Message.Type.REQUEST_FF_1 && message.getType() != Message.Type.REQUEST_FF_2) {
            throw new RuntimeException("This send needs to be a fire and forget request");
        }
        try {
            FutureChannel futureChannelCreation = channelCreator.createUDPChannel(replyTimeoutHandler, requestHandler, broadcast);
            futureChannelCreation.addListener(new BaseFutureAdapter<FutureChannel>(){

                @Override
                public void operationComplete(FutureChannel future) throws Exception {
                    if (future.isSuccess()) {
                        SocketAddress recipient = remotePeer.createSocketUDP();
                        ChannelFuture writeFuture = future.getChannel().write(message, recipient);
                        SenderNetty.this.afterSend(writeFuture, futureResponse, requestHandler == null);
                    } else {
                        // Propagate the channel future's own failure (as the TCP path
                        // does) instead of always blaming a shutdown.
                        futureResponse.setFailed(future);
                        logger.warn("Failed to create UDP channel:{}/{}", message, futureResponse.getFailedReason());
                    }
                }
            });
        }
        catch (Exception ce) {
            futureResponse.setFailed("Could not get channel " + ce.toString());
            logChannelFailure(ce);
            return;
        }
        logger.debug("send UDP message {}", message);
    }

    /**
     * Logs a channel-creation failure: always a one-line warning, plus the full stack
     * trace through the logger when debug logging is enabled.
     */
    private static void logChannelFailure(Exception cause) {
        logger.warn(cause.toString());
        if (logger.isDebugEnabled()) {
            logger.debug("Could not get channel", cause);
        }
    }

    /**
     * Wires cancellation and completion handling onto a pending write.
     *
     * @param writeFuture     the Netty future of the write that was just issued
     * @param futureResponse  future that receives the outcome
     * @param isFireAndForget {@code true} if no reply is expected; the future is then
     *                        completed with a {@code null} response after a successful write
     */
    private void afterSend(final ChannelFuture writeFuture, final FutureResponse futureResponse, final boolean isFireAndForget) {
        futureResponse.addCancellation(new Cancellable(){

            @Override
            public void cancel() {
                writeFuture.cancel();
                writeFuture.getChannel().close();
                futureResponse.setFailed("canceled");
            }
        });
        writeFuture.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture completed) {
                if (!completed.isSuccess()) {
                    completed.getChannel().close();
                    futureResponse.setFailed("Write failed");
                    String warning = "Failed to write channel the request " + futureResponse.getRequest();
                    Throwable cause = completed.getCause();
                    if (cause != null) {
                        logger.warn(warning, cause);
                    } else {
                        logger.warn(warning);
                    }
                    // The future has been failed and the channel closed; do not go on
                    // to report a (null) response for a write that never went out.
                    return;
                }
                if (isFireAndForget) {
                    // No reply will arrive, so a datagram channel has no further use.
                    if (completed.getChannel() instanceof DatagramChannel) {
                        completed.getChannel().close();
                    }
                    futureResponse.setResponse(null);
                }
            }
        });
    }

    /**
     * Marks this sender as shut down; subsequent sends fail their future immediately.
     * Sends already in flight are not affected by this method.
     */
    @Override
    public void shutdown() {
        this.shutdown = true;
    }
}

