package se.streamsource.streamflow.infrastructure.event.domain.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.qi4j.api.entity.Identity;
import org.qi4j.api.injection.scope.Service;
import org.qi4j.api.injection.scope.Structure;
import org.qi4j.api.injection.scope.This;
import org.qi4j.api.service.Activatable;
import org.qi4j.api.util.Iterables;
import org.qi4j.api.value.ValueBuilder;
import org.qi4j.spi.property.ValueType;
import org.qi4j.spi.structure.ModuleSPI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.streamsource.streamflow.infrastructure.event.domain.DomainEvent;
import se.streamsource.streamflow.infrastructure.event.domain.TransactionDomainEvents;
import se.streamsource.streamflow.infrastructure.time.Time;

/**
 * Base class for event store mixins.
 *
 * <p>It wraps incoming domain events in a timestamped {@link TransactionDomainEvents}
 * value, hands that value to the subclass for persistence while holding {@link #lock},
 * and then notifies all registered {@link TransactionListener}s on a dedicated
 * single-threaded executor.
 *
 * <p>Subclasses supply the persistence through {@link #storeEvents(TransactionDomainEvents)},
 * {@link #commit()} and {@link #rollback()}.
 */
public abstract class AbstractEventStoreMixin implements EventStore, EventStream, Activatable {

    /** How long a single lock acquisition attempt waits before retrying. */
    private static final long LOCK_RETRY_MILLIS = 1000L;

    /** How long {@link #passivate()} waits for pending notifications to finish. */
    private static final long SHUTDOWN_TIMEOUT_MILLIS = 10000L;

    @Service
    Time time;

    @This
    protected Identity identity;
    protected Logger logger;
    protected ValueType domainEventType;
    protected ValueType transactionEventsType;

    @Structure
    protected ModuleSPI module;

    /** Single-threaded executor on which listeners are notified; created in {@link #activate()}. */
    private ExecutorService transactionNotifier;

    /** Guards the subclass' {@link #storeEvents(TransactionDomainEvents)} call. */
    protected Lock lock = new ReentrantLock();

    private final List<TransactionListener> listeners =
            Collections.synchronizedList(new ArrayList<TransactionListener>());

    /** Last timestamp handed out by {@link #getCurrentTimestamp()}. */
    private long lastTimestamp = 0;

    /**
     * Sets up the logger (named after this service's identity), resolves the value types
     * for {@link DomainEvent} and {@link TransactionDomainEvents}, and starts the
     * notification executor.
     *
     * @throws IOException declared for subclasses; not thrown by this implementation
     */
    public void activate() throws IOException {
        this.logger = LoggerFactory.getLogger(this.identity.identity().get());
        this.domainEventType = this.module.valueDescriptor(DomainEvent.class.getName()).valueType();
        this.transactionEventsType = this.module.valueDescriptor(TransactionDomainEvents.class.getName()).valueType();
        this.transactionNotifier = Executors.newSingleThreadExecutor();
    }

    /**
     * Stops accepting new notifications and waits up to ten seconds for the queued ones
     * to complete.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    public void passivate() throws Exception {
        this.transactionNotifier.shutdown();
        boolean drained = this.transactionNotifier.awaitTermination(SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        if (!drained) {
            // Previously the timeout was silently ignored; make it visible.
            this.logger.warn("Timed out waiting for event listener notifications to finish");
        }
    }

    /**
     * Wraps the given events in a new timestamped transaction, stores it while holding
     * the store lock, and schedules listener notification.
     *
     * @param iterable the domain events that make up the transaction
     * @return the stored transaction value
     * @throws IOException if the subclass fails to store the transaction
     */
    @Override
    public TransactionDomainEvents storeEvents(Iterable<DomainEvent> iterable) throws IOException {
        ValueBuilder<TransactionDomainEvents> builder =
                this.module.valueBuilderFactory().newValueBuilder(TransactionDomainEvents.class);
        TransactionDomainEvents prototype = builder.prototype();
        Iterables.addAll(prototype.events().get(), iterable);
        prototype.timestamp().set(getCurrentTimestamp());
        final TransactionDomainEvents transaction = builder.newInstance();

        // Only the store call runs under the lock. try/finally guarantees exactly one
        // unlock, whether or not the subclass throws.
        lock();
        try {
            storeEvents(transaction);
        } finally {
            this.lock.unlock();
        }

        // Submitted outside the locked region so a rejected submission (for example after
        // passivate()) surfaces as-is instead of triggering a second unlock.
        this.transactionNotifier.submit(new Runnable() {
            @Override
            public void run() {
                // Iterating a synchronizedList requires manual synchronization on the list.
                synchronized (listeners) {
                    for (TransactionListener listener : listeners) {
                        try {
                            listener.notifyTransactions(Collections.singleton(transaction));
                        } catch (Exception e) {
                            // One failing listener must not prevent the others from being notified.
                            logger.warn("Could not notify event listener", e);
                        }
                    }
                }
            }
        });
        return transaction;
    }

    /**
     * Adds a listener that will be notified of every subsequently stored transaction.
     *
     * @param transactionListener the listener to add
     */
    @Override
    public void registerListener(TransactionListener transactionListener) {
        this.listeners.add(transactionListener);
    }

    /**
     * Removes a previously registered listener.
     *
     * @param transactionListener the listener to remove
     */
    @Override
    public void unregisterListener(TransactionListener transactionListener) {
        this.listeners.remove(transactionListener);
    }

    protected abstract void rollback() throws IOException;

    protected abstract void commit() throws IOException;

    /**
     * Persists one transaction. {@link #storeEvents(Iterable)} calls this while holding
     * {@link #lock}.
     *
     * @param transactionDomainEvents the transaction to persist
     * @throws IOException if the transaction cannot be stored
     */
    protected abstract void storeEvents(TransactionDomainEvents transactionDomainEvents) throws IOException;

    /**
     * Blocks until {@link #lock} has been acquired by the calling thread.
     *
     * <p>Acquisition is attempted in one-second slices and retried both on timeout and on
     * interruption, so this method only returns once the lock is actually held. The
     * caller must release it with {@code lock.unlock()}.
     */
    protected void lock() {
        while (true) {
            try {
                // tryLock returns false on timeout; returning then would leave the caller
                // without the lock and make its unlock() throw IllegalMonitorStateException.
                if (this.lock.tryLock(LOCK_RETRY_MILLIS, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } catch (InterruptedException ignored) {
                // Retry: lock acquisition here is not interruptible.
                // NOTE(review): the interrupt status is dropped, as in the original code;
                // consider restoring it once the critical section has completed.
            }
        }
    }

    /**
     * Returns a timestamp that is strictly greater than any previously returned one,
     * even if the injected clock reports the same or an earlier time.
     */
    private synchronized long getCurrentTimestamp() {
        long next = Math.max(this.time.timeNow(), this.lastTimestamp + 1);
        this.lastTimestamp = next;
        return next;
    }
}
