Class BookKeeper

  • All Implemented Interfaces:
    Closeable, AutoCloseable

    public class BookKeeper
    extends Object
    implements Closeable
    Keeps track of the offset and processed status, and coordinates the import/retry handling. The offset store is identified by the agentName only. In a non-clustered deployment of publish instances, each instance stores the offset in its own node store, thus avoiding mix-ups. Moreover, when an instance is cloned from a node store, the cloned instance implicitly recovers the offsets and starts from the last processed offset. In a clustered deployment of publish instances, only one Subscriber agent must run on the cluster in order to avoid mix-ups. Both the clustered and non-clustered use cases can be supported by running the Subscriber agent only on the leader instance.
    • Constructor Detail

      • BookKeeper

        public BookKeeper​(org.apache.sling.api.resource.ResourceResolverFactory resolverFactory,
                          DistributionMetricsService distributionMetricsService,
                          org.apache.sling.distribution.journal.bookkeeper.PackageHandler packageHandler,
                          org.osgi.service.event.EventAdmin eventAdmin,
                          Consumer<org.apache.sling.distribution.journal.messages.PackageStatusMessage> sender,
                          Consumer<org.apache.sling.distribution.journal.messages.LogMessage> logSender,
                          BookKeeperConfig config)
      • BookKeeper

        public BookKeeper​(org.apache.sling.api.resource.ResourceResolverFactory resolverFactory,
                          DistributionMetricsService distributionMetricsService,
                          org.apache.sling.distribution.journal.bookkeeper.PackageHandler packageHandler,
                          org.osgi.service.event.EventAdmin eventAdmin,
                          Consumer<org.apache.sling.distribution.journal.messages.PackageStatusMessage> sender,
                          Consumer<org.apache.sling.distribution.journal.messages.LogMessage> logSender,
                          BookKeeperConfig config,
                          org.apache.sling.distribution.ImportPostProcessor importPostProcessor,
                          org.apache.sling.distribution.InvalidationProcessor invalidationProcessor)
    • Method Detail

      • importPackage

        public void importPackage​(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg,
                                  long offset,
                                  long createdTime)
                           throws org.apache.sling.distribution.common.DistributionException
        We aim to process each package exactly once. Exactly-once processing is possible under the following conditions:
        I. The package importer is configured to disable auto-committing changes.
        II. A single commit aggregates three content updates:
            C1. install the package
            C2. store the processing status
            C3. store the processed offset
        Some package importers require auto-saving or issue partial commits before failing. For those package importers, we aim to process packages at least once, thanks to the order in which the content updates are applied.
        Throws:
        org.apache.sling.distribution.common.DistributionException
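        The single-commit idea described for importPackage can be illustrated with a minimal, self-contained sketch. It does not use the Sling API: the Store class, the install helper, and the key names are hypothetical stand-ins for a transactional store, and the real BookKeeper implementation may differ. The sketch only shows why staging C1, C2 and C3 and committing them together leaves the stored offset untouched when the install fails, so the package can be retried.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleCommitSketch {

    /** Hypothetical transactional store: changes are staged, then committed together. */
    static class Store {
        private final Map<String, Object> committed = new HashMap<>();
        private final Map<String, Object> staged = new HashMap<>();

        void put(String key, Object value) { staged.put(key, value); }

        /** Applies all staged changes at once. */
        void commit() { committed.putAll(staged); staged.clear(); }

        /** Discards all staged changes. */
        void revert() { staged.clear(); }

        /** Reads committed state only. */
        Object get(String key) { return committed.get(key); }
    }

    /** Hypothetical installer (assumption): fails when the content equals "broken". */
    static void install(Store store, String pkgId, String content) {
        if ("broken".equals(content)) {
            throw new IllegalStateException("cannot install " + pkgId);
        }
        store.put("content/" + pkgId, content);
    }

    /** Stages C1, C2 and C3, then commits once; reverts everything on failure. */
    static boolean importPackage(Store store, String pkgId, String content, long offset) {
        try {
            install(store, pkgId, content);            // C1: install the package
            store.put("status/" + pkgId, "IMPORTED");  // C2: store the processing status
            store.put("offset", offset);               // C3: store the processed offset
            store.commit();                            // one commit for all three updates
            return true;
        } catch (RuntimeException e) {
            store.revert();                            // nothing is persisted, offset unchanged
            return false;
        }
    }

    /** Returns the last committed offset, or -1 if none has been stored. */
    static long loadOffset(Store store) {
        Object value = store.get("offset");
        return value == null ? -1L : (Long) value;
    }

    public static void main(String[] args) {
        Store store = new Store();
        System.out.println(importPackage(store, "pkg1", "hello", 10L));  // true
        System.out.println(loadOffset(store));                           // 10
        System.out.println(importPackage(store, "pkg2", "broken", 11L)); // false
        System.out.println(loadOffset(store));                           // 10
    }
}
```

        In this sketch, a failed import leaves the committed offset at 10, so a consumer resuming from loadOffset would see the failed package again. An importer that commits partway through (the auto-saving case mentioned above) would break this atomicity, which is why only at-least-once processing is claimed for such importers.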
      • invalidateCache

        public void invalidateCache​(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg,
                                    long offset)
                             throws org.apache.sling.distribution.common.DistributionException
        Throws:
        org.apache.sling.distribution.common.DistributionException
      • removePackage

        public void removePackage​(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg,
                                  long offset)
                           throws org.apache.sling.api.resource.LoginException,
                                  org.apache.sling.api.resource.PersistenceException
        Throws:
        org.apache.sling.api.resource.LoginException
        org.apache.sling.api.resource.PersistenceException
      • skipPackage

        public void skipPackage​(long offset)
                         throws org.apache.sling.api.resource.LoginException,
                                org.apache.sling.api.resource.PersistenceException
        Throws:
        org.apache.sling.api.resource.LoginException
        org.apache.sling.api.resource.PersistenceException
      • shouldCommitSkipped

        public boolean shouldCommitSkipped()
      • sendStoredStatus

        public boolean sendStoredStatus​(int retry)
        Returns:
        true if the status has been sent; false otherwise.
      • markStatusSent

        public void markStatusSent()
      • loadOffset

        public long loadOffset()
      • getRetries

        public int getRetries​(String pubAgentName)
      • clearPackageRetriesOnSuccess

        public void clearPackageRetriesOnSuccess​(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg)
        Clears the packageRetries storage for the given package and emits metrics on the success of the retry.
        Parameters:
        pkgMsg - the distributed package