/*
 * Decompiled with CFR 0.152.
 */
package org.openstreetmap.osmosis.replicationhttp.v0_6.impl;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
import org.openstreetmap.osmosis.replicationhttp.v0_6.impl.SequenceServerChannelPipelineFactory;
import org.openstreetmap.osmosis.replicationhttp.v0_6.impl.SequenceServerControl;
import org.openstreetmap.osmosis.replicationhttp.v0_6.impl.ServerStatistics;

public class SequenceServer
implements SequenceServerControl {
    private static final Logger LOG = Logger.getLogger(SequenceServer.class.getName());
    private int port;
    private SequenceServerChannelPipelineFactory channelPipelineFactory;
    private Lock sharedLock;
    private boolean serverStarted;
    private long sequenceNumber;
    private ChannelFactory factory;
    private ChannelGroup allChannels;
    private List<Channel> waitingChannels;
    private ExecutorService sendService;
    private int totalRequests;

    public SequenceServer(int port, SequenceServerChannelPipelineFactory channelPipelineFactory) {
        this.port = port;
        this.channelPipelineFactory = channelPipelineFactory;
        channelPipelineFactory.setControl(this);
        this.sharedLock = new ReentrantLock();
        this.waitingChannels = new ArrayList<Channel>();
    }

    public int getPort() {
        return this.port;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(long initialSequenceNumber) {
        this.sharedLock.lock();
        try {
            if (this.serverStarted) {
                throw new OsmosisRuntimeException("The server has already been started");
            }
            this.sequenceNumber = initialSequenceNumber;
            this.totalRequests = 0;
            this.allChannels = new DefaultChannelGroup("sequence-server");
            this.factory = new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newCachedThreadPool());
            ServerBootstrap bootstrap = new ServerBootstrap(this.factory);
            bootstrap.setPipelineFactory((ChannelPipelineFactory)this.channelPipelineFactory);
            bootstrap.setOption("child.tcpNoDelay", (Object)true);
            bootstrap.setOption("child.keepAlive", (Object)true);
            Channel serverChannel = bootstrap.bind((SocketAddress)new InetSocketAddress(this.port));
            this.allChannels.add((Object)serverChannel);
            InetSocketAddress address = (InetSocketAddress)serverChannel.getLocalAddress();
            this.port = address.getPort();
            if (LOG.isLoggable(Level.INFO)) {
                LOG.info("Server listening on port " + this.port);
            }
            this.sendService = Executors.newSingleThreadExecutor();
            this.serverStarted = true;
        }
        finally {
            this.sharedLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void update(long newSequenceNumber) {
        this.sharedLock.lock();
        try {
            if (!this.serverStarted) {
                throw new OsmosisRuntimeException("The server has not been started");
            }
            if (LOG.isLoggable(Level.FINER)) {
                LOG.finer("Updating with new sequence " + newSequenceNumber);
            }
            if (newSequenceNumber < this.sequenceNumber) {
                throw new OsmosisRuntimeException("Received sequence number " + newSequenceNumber + " from server, expected " + this.sequenceNumber + " or greater");
            }
            long oldSequenceNumber = this.sequenceNumber;
            this.sequenceNumber = newSequenceNumber;
            if (oldSequenceNumber < this.sequenceNumber) {
                final long nextSequenceNumber = oldSequenceNumber + 1L;
                List<Channel> existingWaitingChannels = this.waitingChannels;
                this.waitingChannels = new ArrayList<Channel>();
                for (final Channel channel : existingWaitingChannels) {
                    if (LOG.isLoggable(Level.FINEST)) {
                        LOG.finest("Waking up channel " + channel + " with sequence " + this.sequenceNumber);
                    }
                    this.sendService.submit(new Runnable(){

                        @Override
                        public void run() {
                            SequenceServer.this.sendSequence(channel, nextSequenceNumber, true);
                        }
                    });
                }
            }
        }
        finally {
            this.sharedLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        this.sharedLock.lock();
        try {
            if (this.serverStarted) {
                this.sendService.shutdownNow();
                this.allChannels.close().awaitUninterruptibly();
                this.factory.releaseExternalResources();
                this.serverStarted = false;
            }
        }
        finally {
            this.sharedLock.unlock();
        }
    }

    private void sendSequence(final Channel channel, final long currentSequenceNumber, final boolean follow) {
        ChannelFuture future = channel.write((Object)currentSequenceNumber);
        if (follow) {
            future.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        SequenceServer.this.determineNextChannelAction(channel, currentSequenceNumber + 1L, follow);
                    }
                }
            });
        } else {
            future.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    channel.close();
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void determineNextChannelActionImpl(Channel channel, long nextSequenceNumber, boolean follow) {
        boolean sequenceAvailable;
        this.sharedLock.lock();
        try {
            long currentSequenceNumber = this.sequenceNumber;
            boolean bl = sequenceAvailable = nextSequenceNumber <= currentSequenceNumber;
            if (!sequenceAvailable && nextSequenceNumber - currentSequenceNumber > 1L) {
                channel.close();
                throw new OsmosisRuntimeException("Requested sequence number " + nextSequenceNumber + " is more than 1 past current number " + currentSequenceNumber);
            }
            if (!sequenceAvailable) {
                if (LOG.isLoggable(Level.FINEST)) {
                    LOG.finest("Next sequence " + nextSequenceNumber + " is not available yet so adding channel " + channel + " to waiting list.");
                }
                this.waitingChannels.add(channel);
            }
        }
        finally {
            this.sharedLock.unlock();
        }
        if (sequenceAvailable) {
            if (LOG.isLoggable(Level.FINEST)) {
                LOG.finest("Next sequence " + nextSequenceNumber + " is available.");
            }
            this.sendSequence(channel, nextSequenceNumber, follow);
        }
    }

    @Override
    public void determineNextChannelAction(final Channel channel, final long nextSequenceNumber, final boolean follow) {
        this.sendService.submit(new Runnable(){

            @Override
            public void run() {
                SequenceServer.this.determineNextChannelActionImpl(channel, nextSequenceNumber, follow);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public long getLatestSequenceNumber() {
        this.sharedLock.lock();
        try {
            long l = this.sequenceNumber;
            return l;
        }
        finally {
            this.sharedLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerChannel(Channel channel) {
        this.sharedLock.lock();
        try {
            ++this.totalRequests;
        }
        finally {
            this.sharedLock.unlock();
        }
        this.allChannels.add((Object)channel);
    }

    @Override
    public ServerStatistics getStatistics() {
        return new ServerStatistics(this.totalRequests, this.allChannels.size() - 1);
    }
}

