package org.codehaus.activemq.store.howl;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.util.Callback;
import org.codehaus.activemq.util.JMSExceptionHelper;
import org.codehaus.activemq.util.TransactionTemplate;
import org.objectweb.howl.log.LogConfigurationException;
import org.objectweb.howl.log.LogException;
import org.objectweb.howl.log.LogRecord;
import org.objectweb.howl.log.Logger;
import org.objectweb.howl.log.ReplayListener;

/* loaded from: input_file:org/codehaus/activemq/store/howl/HowlMessageStore.class */
/**
 * A {@link MessageStore} that writes every message and acknowledgement to a
 * HOWL transaction log and keeps recently added messages in an in-memory
 * cache, delegating to a long term {@link MessageStore} for everything else.
 *
 * <p>Messages are written to the transaction log first and then cached, keyed
 * by their JMS message ID. When the cache cannot take another message a
 * checkpoint is forced: the cached messages are copied into the long term
 * store inside a transaction and the transaction log is marked.
 */
public class HowlMessageStore implements MessageStore {
    /** Size, in bytes, of the buffer handed to HOWL when replaying the log. */
    private static final int DEFAULT_RECORD_SIZE = 65536;
    private static final Log log = LogFactory.getLog(HowlMessageStore.class);

    private final HowlPersistenceAdapter longTermPersistence;
    private final MessageStore longTermStore;
    private final Logger transactionLog;
    private final WireFormat wireFormat;
    private final TransactionTemplate transactionTemplate;
    private int maximumCacheSize = 100;
    /** Cache of messages not yet checkpointed, keyed by JMS message ID; also used as its own lock. */
    private final Map<String, ActiveMQMessage> map = new LinkedHashMap<String, ActiveMQMessage>();
    /** Passed straight through to {@code Logger.put} as its second argument. */
    private boolean sync = true;
    /** Value returned by the most recent {@code Logger.put}; handed to {@code Logger.mark} on checkpoint. */
    private long lastLogMark;
    /** First failure seen by {@link #readPacket} during the current {@link #recover} call. */
    private Exception firstException;

    /**
     * @param howlPersistenceAdapter adapter consulted for cache capacity, notified of removals,
     *                               and used to build the transaction template
     * @param messageStore           long term store that checkpointed messages are copied into
     * @param logger                 HOWL transaction log to write to and replay from
     * @param wireFormat             converts packets to and from the bytes stored in the log
     */
    public HowlMessageStore(HowlPersistenceAdapter howlPersistenceAdapter, MessageStore messageStore, Logger logger, WireFormat wireFormat) {
        this.longTermPersistence = howlPersistenceAdapter;
        this.longTermStore = messageStore;
        this.transactionLog = logger;
        this.wireFormat = wireFormat;
        this.transactionTemplate = new TransactionTemplate(howlPersistenceAdapter);
    }

    /**
     * Writes the message to the transaction log and then caches it. If the
     * cache has no room a checkpoint is forced, which persists the cached
     * messages and this message to the long term store.
     *
     * @param activeMQMessage the message to store
     * @return the identity reported by the message itself
     * @throws JMSException if the log write or the forced checkpoint fails
     */
    @Override // org.codehaus.activemq.store.MessageStore
    public synchronized MessageIdentity addMessage(ActiveMQMessage activeMQMessage) throws JMSException {
        writePacket(activeMQMessage);
        if (!addMessageToCache(activeMQMessage)) {
            log.warn("Not enough RAM to store the active transaction log and so we're having to force a checkpoint so that we can ensure that reads are efficient and do not have to replay the transaction log");
            // checkpoint(message) already adds the message to the long term store
            // inside its transaction, so it must not be added a second time here.
            checkpoint(activeMQMessage);
        }
        return activeMQMessage.getJMSMessageIdentity();
    }

    /**
     * Looks the message up in the cache and falls back to the long term store
     * on a miss.
     *
     * @param messageIdentity identity of the message to load
     * @return the cached message, or whatever the long term store returns
     * @throws JMSException if the long term store lookup fails
     */
    @Override // org.codehaus.activemq.store.MessageStore
    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        ActiveMQMessage cached;
        synchronized (this.map) {
            cached = this.map.get(messageIdentity.getMessageID());
        }
        if (cached != null) {
            return cached;
        }
        return this.longTermStore.getMessage(messageIdentity);
    }

    /**
     * Writes the acknowledgement to the transaction log, evicts the message
     * from the cache and notifies the persistence adapter.
     *
     * @param messageIdentity identity of the message being removed
     * @param messageAck      acknowledgement that is written to the log
     * @throws JMSException if the log write fails
     */
    @Override // org.codehaus.activemq.store.MessageStore
    public void removeMessage(MessageIdentity messageIdentity, MessageAck messageAck) throws JMSException {
        writePacket(messageAck);
        synchronized (this.map) {
            this.map.remove(messageIdentity.getMessageID());
        }
        this.longTermPersistence.onMessageRemove(this);
    }

    /**
     * Recovers the long term store into the container and then replays the
     * transaction log on top of it.
     *
     * <p>Failures while decoding or applying individual records are collected
     * by {@link #readPacket}; the first one is rethrown once the replay call
     * returns. Errors reported by HOWL through {@code onError} are only logged.
     *
     * @param queueMessageContainer container that receives the recovered messages and acks
     * @throws JMSException if the long term store recovery fails, the log is
     *                      misconfigured, or a record could not be applied
     */
    @Override // org.codehaus.activemq.store.MessageStore
    public synchronized void recover(final QueueMessageContainer queueMessageContainer) throws JMSException {
        this.longTermStore.recover(queueMessageContainer);
        this.firstException = null;
        try {
            this.transactionLog.replay(new ReplayListener() {
                private final LogRecord record = new LogRecord(DEFAULT_RECORD_SIZE);

                public void onRecord(LogRecord logRecord) {
                    HowlMessageStore.this.readPacket(logRecord, queueMessageContainer);
                }

                public void onError(LogException logException) {
                    HowlMessageStore.log.error("Error while recovering Howl transaction log: " + logException, logException);
                }

                public LogRecord getLogRecord() {
                    return this.record;
                }
            });
        } catch (LogConfigurationException e) {
            throw createRecoveryFailedException(e);
        }
        if (this.firstException != null) {
            if (this.firstException instanceof JMSException) {
                throw (JMSException) this.firstException;
            }
            throw createRecoveryFailedException(this.firstException);
        }
    }

    /** Starts the long term store. */
    @Override // org.codehaus.activemq.service.Service
    public synchronized void start() throws JMSException {
        this.longTermStore.start();
    }

    /** Stops the long term store. */
    @Override // org.codehaus.activemq.service.Service
    public synchronized void stop() throws JMSException {
        this.longTermStore.stop();
    }

    /**
     * Copies every cached message into the long term store and marks the
     * transaction log.
     *
     * @throws JMSException if the transaction or the log mark fails
     */
    public synchronized void checkpoint() throws JMSException {
        checkpoint(null);
    }

    /** @return the maximum number of messages held in the cache */
    public int getMaximumCacheSize() {
        return this.maximumCacheSize;
    }

    /** @param i the maximum number of messages to hold in the cache */
    public void setMaximumCacheSize(int i) {
        this.maximumCacheSize = i;
    }

    /**
     * Drains the cache, writes the drained messages (and the optional extra
     * message) to the long term store in one transaction, then marks the
     * transaction log at the last written position.
     *
     * <p>NOTE(review): the cache is cleared before the transaction runs, so if
     * the transaction fails the drained messages are no longer cached; they
     * have still been written to the transaction log.
     *
     * @param activeMQMessage extra message to persist with the cached ones, or {@code null}
     * @throws JMSException if the transaction or the log mark fails
     */
    protected void checkpoint(final ActiveMQMessage activeMQMessage) throws JMSException {
        final ActiveMQMessage[] drained;
        synchronized (this.map) {
            drained = this.map.values().toArray(new ActiveMQMessage[this.map.size()]);
            this.map.clear();
        }
        this.transactionTemplate.run(new Callback() {
            @Override // org.codehaus.activemq.util.Callback
            public void execute() throws Throwable {
                for (int i = 0; i < drained.length; i++) {
                    HowlMessageStore.this.longTermStore.addMessage(drained[i]);
                }
                if (activeMQMessage != null) {
                    HowlMessageStore.this.longTermStore.addMessage(activeMQMessage);
                }
            }
        });
        try {
            this.transactionLog.mark(this.lastLogMark);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw JMSExceptionHelper.newJMSException("Failed to checkpoint the Howl transaction log: " + e, e);
        }
    }

    /**
     * Caches the message unless the cache has reached its maximum size or the
     * persistence adapter reports no remaining capacity.
     *
     * @param activeMQMessage the message to cache
     * @return {@code true} if the message was cached, {@code false} otherwise
     */
    protected boolean addMessageToCache(ActiveMQMessage activeMQMessage) {
        synchronized (this.map) {
            boolean full = this.map.size() >= this.maximumCacheSize;
            if (full || !this.longTermPersistence.hasCacheCapacity(this)) {
                return false;
            }
            this.map.put(activeMQMessage.getJMSMessageID(), activeMQMessage);
            return true;
        }
    }

    /**
     * Applies one replayed log record to the container: messages are added,
     * acknowledgements delete the message they refer to, anything else is
     * logged and discarded. Control records, end-of-buffer records and empty
     * records are ignored.
     *
     * <p>The first failure is remembered in {@code firstException} so that
     * {@link #recover} can rethrow it; later failures are logged.
     *
     * @param logRecord             the record handed over by the replay
     * @param queueMessageContainer the container being recovered into
     */
    protected void readPacket(LogRecord logRecord, QueueMessageContainer queueMessageContainer) {
        if (logRecord.isCTRL() || logRecord.isEOB() || logRecord.length <= 0) {
            return;
        }
        try {
            // NOTE(review): decoding starts at offset 2; presumably the first two
            // bytes of the record data are a HOWL header - confirm.
            Packet packet = this.wireFormat.fromBytes(logRecord.data, 2, logRecord.length - 2);
            if (packet instanceof ActiveMQMessage) {
                queueMessageContainer.addMessage((ActiveMQMessage) packet);
            } else if (packet instanceof MessageAck) {
                MessageAck ack = (MessageAck) packet;
                queueMessageContainer.delete(ack.getMessageIdentity(), ack);
            } else {
                log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
            }
        } catch (Exception e) {
            if (this.firstException == null) {
                this.firstException = e;
            } else {
                // Only the first failure is rethrown by recover(); make the rest visible.
                log.error("Additional error while recovering Howl transaction log: " + e, e);
            }
        }
    }

    /**
     * Serialises the packet with the wire format, appends it to the
     * transaction log and remembers the value returned by the log.
     *
     * @param packet the message or acknowledgement to write
     * @throws JMSException if serialisation or the log write fails
     */
    protected synchronized void writePacket(Packet packet) throws JMSException {
        try {
            this.lastLogMark = this.transactionLog.put(this.wireFormat.toBytes(packet), this.sync);
        } catch (IOException e) {
            throw createWriteException(packet, e);
        } catch (InterruptedException e) {
            // Restore the interrupt status that catching the exception cleared.
            Thread.currentThread().interrupt();
            throw createWriteException(packet, e);
        } catch (LogException e) {
            throw createWriteException(packet, e);
        }
    }

    /**
     * @param exc the underlying failure
     * @return a JMSException describing a failed recovery, built from {@code exc}
     */
    protected JMSException createRecoveryFailedException(Exception exc) {
        return JMSExceptionHelper.newJMSException("Failed to recover from Howl transaction log. Reason: " + exc, exc);
    }

    /**
     * @param packet the packet that could not be written
     * @param exc    the underlying failure
     * @return a JMSException describing a failed log write, built from {@code exc}
     */
    protected JMSException createWriteException(Packet packet, Exception exc) {
        return JMSExceptionHelper.newJMSException("Failed to write to Howl transaction log for: " + packet + ". Reason: " + exc, exc);
    }
}
