package org.apache.sling.distribution.journal.impl.publisher;

import java.io.Closeable;
import java.util.Collections;
import java.util.Dictionary;
import java.util.EnumMap;
import java.util.Hashtable;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.journal.shared.Strings;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;

/**
 * Journal based distribution agent for the publishing side.
 *
 * <p>On activation the component registers itself as a {@link DistributionAgent} service,
 * creates a sender for the package topic and a poller for the status topic, and exposes a
 * JMX bean plus a subscriber-count gauge. Distribution requests are turned into
 * {@link PackageMessage}s by the {@link PackageMessageFactory} and handed to the sender.
 */
@ParametersAreNonnullByDefault
@Designate(ocd = PublisherConfiguration.class, factory = true)
@Component(service = {}, immediate = true, configurationPid = {DistributionPublisher.FACTORY_PID})
public class DistributionPublisher implements DistributionAgent {
    public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";

    @Reference
    private MessagingProvider messagingProvider;

    @Reference(name = "packageBuilder")
    private DistributionPackageBuilder packageBuilder;

    @Reference
    private DiscoveryService discoveryService;

    @Reference
    private PackageMessageFactory factory;

    @Reference
    private EventAdmin eventAdmin;

    @Reference
    private Topics topics;

    @Reference
    JournalAvailable journalAvailable;

    @Reference
    private DistributionMetricsService distributionMetricsService;

    @Reference
    private PubQueueProvider pubQueueProvider;

    private String pkgType;
    private long queuedTimeout;
    private ServiceRegistration<DistributionAgent> componentReg;
    private Consumer<PackageMessage> sender;
    private JMXRegistration reg;
    private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
    private Closeable statusPoller;
    private DistributionLogEventListener distributionLogEventListener;

    /** Maps each supported request type to the strategy used to put the package on the journal. */
    private final EnumMap<DistributionRequestType, ToLongFunction<PackageMessage>> reqTypes = new EnumMap<>(DistributionRequestType.class);

    private String pubAgentName;

    /**
     * Agent log. Created in {@link #activate} once the agent name is known, so that the log is
     * not constructed with a {@code null} name; it is therefore {@code null} before activation.
     */
    private DefaultDistributionLog log;

    /**
     * Registers the supported request types: ADD and DELETE wait for the queued notification,
     * TEST is sent without waiting.
     */
    public DistributionPublisher() {
        reqTypes.put(DistributionRequestType.ADD, this::sendAndWait);
        reqTypes.put(DistributionRequestType.DELETE, this::sendAndWait);
        reqTypes.put(DistributionRequestType.TEST, this::send);
    }

    /**
     * Activates the agent: reads the configuration, registers the agent service and sets up
     * the log listener, JMX bean, subscriber gauge and status poller.
     *
     * <p>If any of those steps fails, everything that was already set up is released before
     * the failure is propagated, because a failed activation is not followed by a
     * {@link #deactivate()} call.
     *
     * @param publisherConfiguration the factory configuration of this agent
     * @param bundleContext context used to register the agent service
     * @throws RuntimeException wrapping a {@link NotCompliantMBeanException} raised while
     *         creating the JMX bean, or any runtime failure raised by the set-up steps
     */
    @Activate
    public void activate(PublisherConfiguration publisherConfiguration, BundleContext bundleContext) {
        Objects.requireNonNull(factory);
        Objects.requireNonNull(distributionMetricsService);
        pubAgentName = Strings.requireNotBlank(publisherConfiguration.name());
        log = new DefaultDistributionLog(pubAgentName, getClass(), DefaultDistributionLog.LogLevel.INFO);
        queuedTimeout = publisherConfiguration.queuedTimeout();
        pkgType = packageBuilder.getType();
        sender = messagingProvider.createSender(topics.getPackageTopic());
        try {
            componentReg = Objects.requireNonNull(
                    bundleContext.registerService(DistributionAgent.class, this, createServiceProps(publisherConfiguration)));
            distributionLogEventListener = new DistributionLogEventListener(bundleContext, log, pubAgentName);
            reg = new JMXRegistration(new DistPublisherJMX(pubAgentName, discoveryService, this), "agent", pubAgentName);
            subscriberCountGauge = distributionMetricsService.createGauge(
                    "distribution.journal.publisher.subscriber_count;pub_name=" + pubAgentName,
                    "Current number of publish subscribers",
                    () -> discoveryService.getTopologyView().getSubscribedAgentIds().size());
            statusPoller = messagingProvider.createPoller(
                    topics.getStatusTopic(),
                    Reset.earliest,
                    HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus));
        } catch (NotCompliantMBeanException e) {
            releaseResources();
            throw new RuntimeException(e);
        } catch (RuntimeException e) {
            releaseResources();
            throw e;
        }
        log.info(String.format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                pubAgentName, pkgType, queuedTimeout));
    }

    /**
     * Deactivates the agent: closes the poller, log listener, JMX registration and gauge and
     * unregisters the agent service.
     */
    @Deactivate
    public void deactivate() {
        releaseResources();
        log.info(String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                pubAgentName, pkgType, queuedTimeout));
    }

    /**
     * Releases whatever {@link #activate} managed to set up. Safe to call when only part of
     * the set-up happened: closing is quiet and null tolerant, and the service registration
     * is only unregistered when present.
     */
    private void releaseResources() {
        IOUtils.closeQuietly(statusPoller, distributionLogEventListener, reg);
        if (componentReg != null) {
            componentReg.unregister();
            // Cleared so that a second call cannot unregister the same registration twice.
            componentReg = null;
        }
        IOUtils.closeQuietly(subscriberCountGauge);
    }

    /**
     * Builds the properties under which the agent service is registered.
     *
     * @param publisherConfiguration the configuration to read the values from
     * @return the service properties
     */
    private Dictionary<String, Object> createServiceProps(PublisherConfiguration publisherConfiguration) {
        Hashtable<String, Object> props = new Hashtable<>();
        props.put("name", publisherConfiguration.name());
        props.put("title", publisherConfiguration.name());
        props.put("details", publisherConfiguration.name());
        props.put("packageBuilder.target", publisherConfiguration.packageBuilder_target());
        props.put("webconsole.configurationFactory.nameHint", publisherConfiguration.webconsole_configurationFactory_nameHint());
        return props;
    }

    /**
     * @return an unmodifiable view of the queue names the queue provider reports for this agent
     */
    @Override
    @Nonnull
    public Iterable<String> getQueueNames() {
        return Collections.unmodifiableCollection(pubQueueProvider.getQueueNames(pubAgentName));
    }

    /**
     * Looks up a queue of this agent. The queue access error counter is incremented both when
     * the provider returns {@code null} and when the lookup throws.
     *
     * @param str the queue name
     * @return the queue, or {@code null} if the provider returned none
     */
    @Override
    public DistributionQueue getQueue(String str) {
        try {
            DistributionQueue queue = pubQueueProvider.getQueue(pubAgentName, str);
            if (queue == null) {
                distributionMetricsService.getQueueAccessErrorCount().increment();
            }
            return queue;
        } catch (Exception e) {
            distributionMetricsService.getQueueAccessErrorCount().increment();
            throw e;
        }
    }

    /**
     * @return the log of this agent
     */
    @Override
    @Nonnull
    public DistributionLog getLog() {
        return log;
    }

    /**
     * @return the agent state as computed by {@code AgentState.getState}
     */
    @Override
    @Nonnull
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    /**
     * Executes a distribution request. Request types without a registered strategy are
     * answered with a DROPPED response instead of an exception.
     *
     * @param resourceResolver resolver used to build the package
     * @param distributionRequest the request to execute
     * @return the response describing whether the request was accepted or dropped
     * @throws DistributionException if the package cannot be created or appended to the journal
     */
    @Override
    @Nonnull
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest distributionRequest) throws DistributionException {
        ToLongFunction<PackageMessage> strategy = reqTypes.get(distributionRequest.getRequestType());
        if (strategy == null) {
            return executeUnsupported(distributionRequest);
        }
        return execute(resourceResolver, distributionRequest, strategy);
    }

    /**
     * Builds the package message and hands it to the given send strategy.
     *
     * <p>The two phases are handled one after the other so that a failure while enqueueing is
     * counted and logged exactly once, and is not reported as a package creation failure.
     *
     * @param resourceResolver resolver used to build the package
     * @param distributionRequest the request to execute
     * @param toLongFunction send strategy returning the offset reported for the package
     * @return an ACCEPTED response
     * @throws DistributionException if either phase fails
     */
    private DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest distributionRequest, ToLongFunction<PackageMessage> toLongFunction) throws DistributionException {
        final PackageMessage packageMessage;
        try {
            packageMessage = DistributionMetricsService.timed(
                    distributionMetricsService.getBuildPackageDuration(),
                    () -> factory.create(packageBuilder, resourceResolver, pubAgentName, distributionRequest));
        } catch (Exception e) {
            distributionMetricsService.getDroppedRequests().mark();
            log.error("Failed to create content package for requestType={}, paths={}", distributionRequest.getRequestType(), distributionRequest.getPaths(), e);
            throw new DistributionException(e);
        }

        try {
            long offset = DistributionMetricsService.timed(
                    distributionMetricsService.getEnqueuePackageDuration(),
                    () -> toLongFunction.applyAsLong(packageMessage));
            distributionMetricsService.getExportedPackageSize().update(packageMessage.getPkgLength());
            distributionMetricsService.getAcceptedRequests().mark();
            String accepted = String.format("Request accepted with distribution package %s at offset=%s", packageMessage, offset);
            log.info(accepted);
            return new SimpleDistributionResponse(DistributionRequestState.ACCEPTED, accepted);
        } catch (Throwable th) {
            distributionMetricsService.getDroppedRequests().mark();
            String failed = String.format("Failed to append distribution package %s to the journal", packageMessage);
            log.error(failed, th);
            // Errors are propagated untouched; everything else becomes a DistributionException.
            if (th instanceof Error) {
                throw (Error) th;
            }
            throw new DistributionException(failed, th);
        }
    }

    /**
     * Sends the package without waiting for it to be queued.
     *
     * @param packageMessage the message to send
     * @return always {@code -1}, as no offset is awaited
     */
    private long send(PackageMessage packageMessage) {
        sender.accept(packageMessage);
        return -1L;
    }

    /**
     * Sends the package and waits up to {@code queuedTimeout} milliseconds for the queued
     * notification of its package id.
     *
     * @param packageMessage the message to send
     * @return the value the queued notifier completes the wait with
     * @throws RuntimeException wrapping any failure, including timeout and interruption; the
     *         wait is unregistered first and the interrupt flag is restored on interruption
     */
    private long sendAndWait(PackageMessage packageMessage) {
        PackageQueuedNotifier queuedNotifier = pubQueueProvider.getQueuedNotifier();
        try {
            // The wait is registered before sending so the notification cannot be missed.
            CompletableFuture<Long> queued = queuedNotifier.registerWait(packageMessage.getPkgId());
            eventAdmin.postEvent(DistributionEvent.eventPackageCreated(packageMessage, pubAgentName));
            sender.accept(packageMessage);
            return queued.get(queuedTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            queuedNotifier.unRegisterWait(packageMessage.getPkgId());
            throw new RuntimeException(e);
        } catch (Exception e) {
            queuedNotifier.unRegisterWait(packageMessage.getPkgId());
            throw new RuntimeException(e);
        }
    }

    /**
     * Answers a request whose type has no registered send strategy.
     *
     * @param distributionRequest the unsupported request
     * @return a DROPPED response naming the supported request types
     */
    @Nonnull
    private DistributionResponse executeUnsupported(DistributionRequest distributionRequest) {
        String message = String.format("Request requestType=%s not supported by this agent, expected one of %s", distributionRequest.getRequestType(), reqTypes.keySet());
        log.info(message);
        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, message);
    }
}
