package se.streamsource.streamflow.infrastructure.event.domain.source.helper;

import java.util.Iterator;
import org.qi4j.api.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.streamsource.streamflow.infrastructure.event.domain.TransactionDomainEvents;
import se.streamsource.streamflow.infrastructure.event.domain.source.EventSource;
import se.streamsource.streamflow.infrastructure.event.domain.source.EventStream;
import se.streamsource.streamflow.infrastructure.event.domain.source.TransactionListener;
import se.streamsource.streamflow.infrastructure.event.domain.source.TransactionVisitor;

/* loaded from: input_file:se/streamsource/streamflow/infrastructure/event/domain/source/helper/TransactionTracker.class */
/**
 * Forwards transactions to a delegate {@link TransactionVisitor} and records, in the
 * supplied configuration, the timestamp of the last transaction the delegate accepted.
 *
 * <p>When started, and again after the delegate has failed, the tracker asks the
 * {@link EventSource} for the transactions after the recorded timestamp (passing itself as
 * the visitor) before it continues with transactions delivered by the {@link EventStream}.
 *
 * <p>{@link #start()}, {@link #stop()} and {@link #visit(TransactionDomainEvents)} are
 * {@code synchronized} on this instance.
 */
public class TransactionTracker implements TransactionListener, TransactionVisitor {
    private final Configuration<? extends TransactionTrackerConfiguration> configuration;
    private final TransactionVisitor visitor;
    private final EventStream stream;
    private final EventSource source;
    private final Logger logger;

    /** True between a call to {@link #start()} and the next call to {@link #stop()}. */
    private boolean started = false;

    /** Cleared when the delegate throws; while false, the next visit triggers a catch-up. */
    private boolean upToSpeed = false;

    /**
     * Creates a tracker; it does nothing until {@link #start()} is called.
     *
     * @param eventStream        stream on which this tracker registers itself as a listener
     * @param eventSource        source queried for transactions after the recorded timestamp
     * @param configuration      holds the {@code enabled} flag and the {@code lastEventDate} value
     * @param transactionVisitor delegate that receives each transaction; its class is also used
     *                           as the logger name
     */
    public TransactionTracker(EventStream eventStream, EventSource eventSource, Configuration<? extends TransactionTrackerConfiguration> configuration, TransactionVisitor transactionVisitor) {
        this.stream = eventStream;
        this.configuration = configuration;
        this.visitor = transactionVisitor;
        this.source = eventSource;
        this.logger = LoggerFactory.getLogger(transactionVisitor.getClass());
    }

    /**
     * Starts tracking: requests the transactions after the recorded timestamp from the source,
     * then registers this tracker as a listener on the stream.
     *
     * <p>Calling this while the tracker is already started is a no-op, so the listener is
     * never registered twice.
     */
    public synchronized void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        // Must be true before the catch-up so that visit() forwards to the delegate
        // instead of requesting yet another catch-up.
        this.upToSpeed = true;
        catchUp();
        this.stream.registerListener(this);
    }

    /**
     * Stops tracking and unregisters this tracker from the stream. Does nothing if the
     * tracker is not started.
     */
    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        this.stream.unregisterListener(this);
    }

    /**
     * Visits the given transactions in iteration order, stopping at the first one for which
     * {@link #visit(TransactionDomainEvents)} returns {@code false}.
     *
     * @param iterable transactions to process
     */
    @Override
    public void notifyTransactions(Iterable<TransactionDomainEvents> iterable) {
        for (TransactionDomainEvents transaction : iterable) {
            if (!visit(transaction)) {
                break;
            }
        }
    }

    /**
     * Handles one transaction.
     *
     * <ul>
     *   <li>If the tracker is not started, or the configuration's {@code enabled} value is
     *       false, nothing happens and {@code false} is returned.</li>
     *   <li>If the tracker is not up to speed (the delegate threw earlier), a catch-up is
     *       requested from the source and the given transaction is not forwarded directly
     *       by this call.</li>
     *   <li>Otherwise the transaction is forwarded to the delegate; when the delegate returns
     *       {@code true}, the transaction's timestamp is stored as {@code lastEventDate} and
     *       the configuration is saved.</li>
     * </ul>
     *
     * @param transactionDomainEvents the transaction to handle
     * @return the delegate's result; {@code false} if the tracker is stopped or disabled or
     *         the delegate threw; after a catch-up, the value of the up-to-speed flag
     */
    @Override
    public synchronized boolean visit(TransactionDomainEvents transactionDomainEvents) {
        if (!this.started || !isEnabled()) {
            return false;
        }
        if (!this.upToSpeed) {
            this.upToSpeed = true;
            catchUp();
            // The flag is re-read on purpose: the catch block below clears it again if the
            // delegate throws while the catch-up is being visited (presumably the source
            // calls back into visit() before transactionsAfter returns -- TODO confirm).
            return this.upToSpeed;
        }
        try {
            boolean accepted = this.visitor.visit(transactionDomainEvents);
            if (accepted) {
                settings().lastEventDate().set(transactionDomainEvents.timestamp().get());
                this.configuration.save();
            }
            return accepted;
        } catch (Throwable th) {
            // Deliberately broad: any failure is logged and marks the tracker as behind,
            // so the next visit requests a catch-up from the recorded timestamp.
            this.logger.error("Could not handle events", th);
            this.upToSpeed = false;
            return false;
        }
    }

    /** Asks the source for the transactions after the recorded timestamp, with this tracker as visitor. */
    private void catchUp() {
        this.source.transactionsAfter(lastEventDate(), this);
    }

    /** Returns the configuration instance; the explicit cast is kept from the original code. */
    private TransactionTrackerConfiguration settings() {
        return (TransactionTrackerConfiguration) this.configuration.configuration();
    }

    /** Reads the configuration's {@code enabled} value. */
    private boolean isEnabled() {
        return ((Boolean) settings().enabled().get()).booleanValue();
    }

    /** Reads the configuration's {@code lastEventDate} value. */
    private long lastEventDate() {
        return ((Long) settings().lastEventDate().get()).longValue();
    }
}
