/*-------------------------------------------------------------------------
 *
 * slot.c
 *	   Replication slot management.
 *
 *
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  src/backend/replication/slot.c
 *
 * NOTES
 *
 * Replication slots are used to keep state about replication streams
 * originating from this cluster.  Their primary purpose is to prevent the
 * premature removal of WAL or of old tuple versions in a manner that would
 * interfere with replication; they are also useful for monitoring purposes.
 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
 * on standbys (to support cascading setups).  The requirement that slots be
 * usable on standbys precludes storing them in the system catalogs.
 *
 * Each replication slot gets its own directory inside the directory
 * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
 * contain the slot's own data.  Additional data can be stored alongside that
 * file if required.  While the server is running, the state data is also
 * cached in memory for efficiency.
 *
 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
 * to iterate over the slots, and in exclusive mode to change the in_use flag
 * of a slot.  The remaining data in each slot is protected by its mutex.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/injection_point.h"
#include "utils/varlena.h"

/*
 * Replication slot on-disk data structure.
 *
 * The struct begins with a small header whose layout must not depend on the
 * format version, followed by the slot's persistent data.  The size macros
 * defined after the type declarations in this file are derived from the
 * offsets of 'version' and 'slotdata', so the order of the fields matters.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;
	pg_crc32c	checksum;

	/* data covered by checksum */
	uint32		version;
	uint32		length;

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;

/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.  That is why the names are packed into one
 * trailing character array rather than referenced through pointers.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;

/*
 * Lookup table for slot invalidation causes.
 *
 * Each entry pairs one ReplicationSlotInvalidationCause value with the
 * textual name used for it.
 */
typedef struct SlotInvalidationCauseMap
{
	ReplicationSlotInvalidationCause cause; /* enum value */
	const char *cause_name;		/* its textual name */
} SlotInvalidationCauseMap;

/*
 * Mapping from each invalidation cause to its name.  A new cause added to
 * ReplicationSlotInvalidationCause needs a matching entry here; the static
 * assertion below catches a mismatch in the number of entries.
 */
static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
	{RS_INVAL_NONE, "none"},
	{RS_INVAL_WAL_REMOVED, "wal_removed"},
	{RS_INVAL_HORIZON, "rows_removed"},
	{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
	{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
};

/*
 * Ensure that the lookup table is up-to-date with the enums defined in
 * ReplicationSlotInvalidationCause.  The table is expected to hold
 * RS_INVAL_MAX_CAUSES + 1 entries.
 */
StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");

/*
 * Sizes of the different parts of ReplicationSlotOnDisk.
 *
 * The replacement lists that contain a subtraction are fully parenthesized
 * so that the macros evaluate correctly when used inside a larger
 * expression (for example as an operand of '*' or of a comparison).
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize  \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */

/*
 * Control array for replication slot management.  Set up by
 * ReplicationSlotsShmemInit; stays NULL when max_replication_slots is 0.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/*
 * My backend's replication slot in the shared memory array, or NULL while
 * this backend has no slot acquired.
 */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots */

/*
 * Invalidate replication slots that have remained idle longer than this
 * duration; '0' disables it.
 */
int			idle_replication_slot_timeout_mins = 0;

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *synchronized_standby_slots;

/* This is the parsed and cached configuration for synchronized_standby_slots */
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

/* before_shmem_exit callback; registered by ReplicationSlotInitialize */
static void ReplicationSlotShmemExit(int code, Datum arg);
/* drops the given slot; shared by the drop and cleanup paths */
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);

/*
 * Compute how much shared memory ReplicationSlotsShmemInit will request.
 *
 * The result is zero when max_replication_slots is 0.  Otherwise it is the
 * size of the control struct up to its slot array plus room for
 * max_replication_slots ReplicationSlot entries, combined with
 * add_size/mul_size.
 */
Size
ReplicationSlotsShmemSize(void)
{
	Size		slots_bytes;

	/* No slots configured, so no shared memory is needed. */
	if (max_replication_slots == 0)
		return 0;

	slots_bytes = mul_size(max_replication_slots, sizeof(ReplicationSlot));

	return add_size(offsetof(ReplicationSlotCtlData, replication_slots),
					slots_bytes);
}

/*
 * Set up the shared-memory control structure for replication slots.
 *
 * Does nothing when max_replication_slots is 0.  Otherwise the control
 * struct is obtained from ShmemInitStruct; if it did not exist yet, it is
 * zeroed and the per-slot spinlock, I/O LWLock and condition variable of
 * every array entry are initialized.
 */
void
ReplicationSlotsShmemInit(void)
{
	bool		already_there;
	ReplicationSlot *cur;
	ReplicationSlot *stop;

	if (max_replication_slots == 0)
		return;

	ReplicationSlotCtl = (ReplicationSlotCtlData *)
		ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
						&already_there);

	/* The struct was found already present; leave its contents alone. */
	if (already_there)
		return;

	/* First time through: start from an all-zeroes state. */
	MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());

	/* Only the synchronization primitives need more than zeroing. */
	cur = ReplicationSlotCtl->replication_slots;
	stop = cur + max_replication_slots;
	while (cur < stop)
	{
		SpinLockInit(&cur->mutex);
		LWLockInitialize(&cur->io_in_progress_lock,
						 LWTRANCHE_REPLICATION_SLOT_IO);
		ConditionVariableInit(&cur->active_cv);
		cur++;
	}
}

/*
 * Register the callback for replication slot cleanup and releasing.
 *
 * The callback, ReplicationSlotShmemExit, is registered through
 * before_shmem_exit with a zero argument.  It releases the slot this backend
 * still has acquired, if any, and cleans up its temporary slots.
 */
void
ReplicationSlotInitialize(void)
{
	before_shmem_exit(ReplicationSlotShmemExit, 0);
}

/*
 * before_shmem_exit callback: let go of this backend's replication slots.
 *
 * Neither 'code' nor 'arg' is used.
 */
static void
ReplicationSlotShmemExit(int code, Datum arg)
{
	/* A slot that is still acquired has to be released first. */
	if (MyReplicationSlot)
		ReplicationSlotRelease();

	/* Then clean up the temporary slots, not only the synced ones. */
	ReplicationSlotCleanup(false);
}

/*
 * Validate a candidate replication slot name, reporting problems at elevel.
 *
 * A valid name is between 1 and NAMEDATALEN-1 characters long and consists
 * solely of characters from [a-z0-9_], which should allow the name to be
 * used as a directory name on every supported OS.
 *
 * Returns true if the name is valid.  When elevel < ERROR, false is returned
 * after the problem has been reported.
 */
bool
ReplicationSlotValidateName(const char *name, int elevel)
{
	size_t		namelen = strlen(name);
	const char *pos;

	/* An empty name is never acceptable. */
	if (namelen == 0)
	{
		ereport(elevel,
				(errcode(ERRCODE_INVALID_NAME),
				 errmsg("replication slot name \"%s\" is too short",
						name)));
		return false;
	}

	/* The name, including its terminator, must fit in NAMEDATALEN bytes. */
	if (namelen >= NAMEDATALEN)
	{
		ereport(elevel,
				(errcode(ERRCODE_NAME_TOO_LONG),
				 errmsg("replication slot name \"%s\" is too long",
						name)));
		return false;
	}

	/* Stop at the first character outside the permitted set. */
	for (pos = name; *pos != '\0'; pos++)
	{
		char		ch = *pos;
		bool		is_lower = (ch >= 'a' && ch <= 'z');
		bool		is_digit = (ch >= '0' && ch <= '9');

		if (is_lower || is_digit || ch == '_')
			continue;

		ereport(elevel,
				(errcode(ERRCODE_INVALID_NAME),
				 errmsg("replication slot name \"%s\" contains invalid character",
						name),
				 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
		return false;
	}

	return true;
}

/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 * persistency: Persistency level to record in the slot's persistent data.
 *     A temporary slot cannot be failover-enabled, except during slot
 *     synchronization.
 * two_phase: Allows decoding of prepared transactions. We allow this option
 *     to be enabled only at the slot creation time. If we allow this option
 *     to be changed during decoding then it is quite possible that we skip
 *     prepare first time because this option was not enabled. Now next time
 *     during getting changes, if the two_phase option is enabled it can skip
 *     prepare because by that time start decoding point has been moved. So the
 *     user will only get commit prepared.
 * failover: If enabled, allows the slot to be synced to standbys so
 *     that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * The caller must not have a slot acquired already.  An ERROR is reported if
 * the name is invalid, if failover is requested where it is not supported,
 * if a slot of the same name already exists, or if every slot is in use.
 * On success MyReplicationSlot points to the new slot, which is marked
 * active for this process.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* Reports an ERROR itself if the name is not acceptable. */
	ReplicationSlotValidateName(name, ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 *
	 * The scan always visits every entry, so a name collision is detected
	 * even after a free entry has been picked; the first free entry wins.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase \"max_replication_slots\".")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->last_saved_restart_lsn = InvalidXLogRecPtr;
	slot->inactive_since = 0;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}

/*
 * Look up an in-use replication slot by name.
 *
 * If need_lock is true, ReplicationSlotControlLock is taken in shared mode
 * for the duration of the scan; otherwise no lock is taken here.
 *
 * Returns the first matching slot, or NULL if no in-use slot has that name.
 */
ReplicationSlot *
SearchNamedReplicationSlot(const char *name, bool need_lock)
{
	ReplicationSlot *match = NULL;
	int			slotno = 0;

	if (need_lock)
		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Walk the array until a match turns up or the entries run out. */
	while (match == NULL && slotno < max_replication_slots)
	{
		ReplicationSlot *candidate = &ReplicationSlotCtl->replication_slots[slotno];

		if (candidate->in_use &&
			strcmp(name, NameStr(candidate->data.name)) == 0)
			match = candidate;

		slotno++;
	}

	if (need_lock)
		LWLockRelease(ReplicationSlotControlLock);

	return match;
}

/*
 * Translate a slot pointer into its position within
 * ReplicationSlotCtl->replication_slots.
 *
 * This is mainly useful to have an efficient key for storing replication slot
 * stats.  The pointer must refer to an element of that array; this is
 * checked only by an assertion.
 */
int
ReplicationSlotIndex(ReplicationSlot *slot)
{
	ReplicationSlot *first = ReplicationSlotCtl->replication_slots;

	Assert(slot >= first && slot < first + max_replication_slots);

	return slot - first;
}

/*
 * Fetch the name of the slot stored at position 'index'.
 *
 * Returns false, leaving 'name' untouched, if that slot is unused.
 * Otherwise the slot's name is copied into 'name' and true is returned.
 *
 * This likely is only useful for pgstat_replslot.c during shutdown, in other
 * cases there are obvious TOCTOU issues.
 */
bool
ReplicationSlotName(int index, Name name)
{
	ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[index];
	bool		in_use;

	/*
	 * Holding the control lock keeps the slot from being dropped while the
	 * name is copied.  The spinlock is not needed because the name of an
	 * existing slot cannot change.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	in_use = slot->in_use;
	if (in_use)
		namestrcpy(name, NameStr(slot->data.name));
	LWLockRelease(ReplicationSlotControlLock);

	return in_use;
}

/*
 * Find a previously created slot and mark it as used by this process.
 *
 * name: name of the slot to acquire; must not be NULL.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 *
 * An error is raised if error_if_invalid is true and the slot is found to
 * be invalid. It should always be set to true, except when we are temporarily
 * acquiring the slot and don't intend to change it.
 *
 * An error is also raised if no slot with the given name exists.  On
 * success MyReplicationSlot points to the acquired slot.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
	ReplicationSlot *s;
	int			active_pid;

	Assert(name != NULL);

	/* We come back here after having waited for another process' release. */
retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Check if the slot exists with the given name. */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * This is the slot we want; check if it's active under some other
	 * process.  In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active.  (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/*
		 * It is important to reset the inactive_since under spinlock here to
		 * avoid race conditions with slot invalidation. See comments related
		 * to inactive_since in InvalidatePossiblyObsoleteSlot.
		 *
		 * We claim the slot only if nobody holds it; either way active_pid
		 * ends up telling us who the owner is now.
		 */
		SpinLockAcquire(&s->mutex);
		if (s->active_pid == 0)
			s->active_pid = MyProcPid;
		active_pid = s->active_pid;
		ReplicationSlotSetInactiveSince(s, 0, false);
		SpinLockRelease(&s->mutex);
	}
	else
	{
		active_pid = MyProcPid;
		ReplicationSlotSetInactiveSince(s, 0, true);
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_pid != MyProcPid)
	{
		if (!nowait)
		{
			/* Wait here until we get signaled, and then restart */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * We need to check for invalidation after making the slot ours to avoid
	 * the possible race condition with the checkpointer that can otherwise
	 * invalidate the slot immediately after the check.
	 */
	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("can no longer access replication slot \"%s\"",
					   NameStr(s->data.name)),
				errdetail("This replication slot has been invalidated due to \"%s\".",
						  GetSlotInvalidationCauseName(s->data.invalidated)));

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);


	/* In a walsender, log the acquisition (at LOG or DEBUG1). */
	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}

/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * The exception is an ephemeral slot, which is dropped rather than merely
 * released.  The caller must currently have a slot acquired; on return
 * MyReplicationSlot is NULL.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical = false; /* keep compiler quiet */
	TimestampTz now = 0;

	Assert(slot != NULL && slot->active_pid != 0);

	/*
	 * Remember what the log message at the end needs while the slot is
	 * still ours; both values are only used when am_walsender is set.
	 */
	if (am_walsender)
	{
		slotname = pstrdup(NameStr(slot->data.name));
		is_logical = SlotIsLogical(slot);
	}

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 *
		 * Note that the code below keeps using the local 'slot' pointer
		 * even though the slot has been dropped at this point.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	/*
	 * Set the time since the slot has become inactive. We get the current
	 * time beforehand to avoid system call while holding the spinlock.
	 */
	now = GetCurrentTimestamp();

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive.  We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		ReplicationSlotSetInactiveSince(slot, now, false);
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}
	else
		ReplicationSlotSetInactiveSince(slot, now, true);

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	/* In a walsender, log the release (at LOG or DEBUG1). */
	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}

/*
 * Drop the temporary slots that belong to the current session.
 *
 * With 'synced_only' set, only those of our temporary slots that are marked
 * as synced are dropped; otherwise all of them are.
 *
 * The caller must not have a slot acquired.
 */
void
ReplicationSlotCleanup(bool synced_only)
{
	Assert(MyReplicationSlot == NULL);

	/*
	 * Each pass scans the array from the start for one slot to drop.  The
	 * control lock cannot be held across the drop, so after every drop the
	 * scan begins again; we are done once a full pass finds nothing.
	 */
	for (;;)
	{
		ReplicationSlot *victim = NULL;
		int			slotno;

		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
		for (slotno = 0; slotno < max_replication_slots; slotno++)
		{
			ReplicationSlot *candidate = &ReplicationSlotCtl->replication_slots[slotno];
			bool		drop_it;

			if (!candidate->in_use)
				continue;

			/* Decide under the slot's spinlock whether it is one of ours. */
			SpinLockAcquire(&candidate->mutex);
			drop_it = (candidate->active_pid == MyProcPid);
			if (drop_it && synced_only)
				drop_it = candidate->data.synced;
			Assert(!drop_it || candidate->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&candidate->mutex);

			if (drop_it)
			{
				victim = candidate;
				break;
			}
		}
		LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

		if (victim == NULL)
			break;

		ReplicationSlotDropPtr(victim);

		ConditionVariableBroadcast(&victim->active_cv);
	}
}

/*
 * Acquire the replication slot with the given name and drop it for good.
 *
 * 'nowait' is handed on to ReplicationSlotAcquire.  The slot is acquired
 * without the invalidation check.
 *
 * An ERROR is reported if, during recovery, the slot is marked as synced.
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
	Assert(MyReplicationSlot == NULL);

	ReplicationSlotAcquire(name, nowait, false);

	if (RecoveryInProgress())
	{
		/*
		 * Slots that are currently being synced from the primary to the
		 * standby must not be dropped by users.
		 */
		if (MyReplicationSlot->data.synced)
			ereport(ERROR,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("cannot drop replication slot \"%s\"", name),
					errdetail("This replication slot is being synchronized from the primary server."));
	}

	ReplicationSlotDropAcquired();
}

/*
 * Alter the failover and/or two_phase property of the named slot.
 *
 * 'failover' and 'two_phase' each point to the requested new value, or are
 * NULL to leave that property alone; at least one must be given.
 *
 * The slot is acquired (waiting for it if necessary), changed, saved if
 * anything actually changed, and released again.  An ERROR is reported for
 * physical slots, for synced slots during recovery, and when failover is to
 * be enabled during recovery or for a temporary slot.
 */
void
ReplicationSlotAlter(const char *name, const bool *failover,
					 const bool *two_phase)
{
	bool		changed = false;

	Assert(MyReplicationSlot == NULL);
	Assert(failover || two_phase);

	ReplicationSlotAcquire(name, false, true);

	if (SlotIsPhysical(MyReplicationSlot))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				errmsg("cannot use %s with a physical replication slot",
					   "ALTER_REPLICATION_SLOT"));

	if (RecoveryInProgress())
	{
		/*
		 * Slots that are currently being synced from the primary to the
		 * standby must not be altered by users.
		 */
		if (MyReplicationSlot->data.synced)
			ereport(ERROR,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("cannot alter replication slot \"%s\"", name),
					errdetail("This replication slot is being synchronized from the primary server."));

		/*
		 * Enabling failover on the standby is refused because syncing to a
		 * cascading standby is not supported.
		 */
		if (failover)
		{
			if (*failover)
				ereport(ERROR,
						errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot enable failover for a replication slot"
							   " on the standby"));
		}
	}

	if (failover)
	{
		bool		want_failover = *failover;

		/*
		 * Temporary slots are not synced to the standby, so failover
		 * cannot be enabled for them.
		 */
		if (want_failover &&
			MyReplicationSlot->data.persistency == RS_TEMPORARY)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));

		if (MyReplicationSlot->data.failover != want_failover)
		{
			SpinLockAcquire(&MyReplicationSlot->mutex);
			MyReplicationSlot->data.failover = want_failover;
			SpinLockRelease(&MyReplicationSlot->mutex);

			changed = true;
		}
	}

	if (two_phase)
	{
		bool		want_two_phase = *two_phase;

		if (MyReplicationSlot->data.two_phase != want_two_phase)
		{
			SpinLockAcquire(&MyReplicationSlot->mutex);
			MyReplicationSlot->data.two_phase = want_two_phase;
			SpinLockRelease(&MyReplicationSlot->mutex);

			changed = true;
		}
	}

	/* Mark the slot dirty and save it only if something changed. */
	if (changed)
	{
		ReplicationSlotMarkDirty();
		ReplicationSlotSave();
	}

	ReplicationSlotRelease();
}

/*
 * Drop, permanently, the slot this backend currently has acquired.
 *
 * The caller must have a slot acquired.  MyReplicationSlot is reset to NULL
 * before the slot is handed to ReplicationSlotDropPtr.
 */
void
ReplicationSlotDropAcquired(void)
{
	ReplicationSlot *acquired = MyReplicationSlot;

	Assert(acquired != NULL);

	/* From here on the slot no longer counts as acquired by us. */
	MyReplicationSlot = NULL;

	ReplicationSlotDropPtr(acquired);
}

/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * ReplicationSlotAllocationLock is held exclusively from start to finish.
 * The steps are, in order: rename the slot's directory to "<name>.tmp" and
 * fsync it and its parent; mark the in-memory slot unused under
 * ReplicationSlotControlLock and wake up waiters; recompute the required
 * xmin and LSN; remove the renamed directory; and, for logical slots, drop
 * the statistics entry.
 *
 * If the rename fails, the slot is marked inactive and the failure is
 * reported at ERROR level for persistent slots and at WARNING level for all
 * other slots.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames: the slot's directory and its ".tmp" sibling. */
	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
	sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname(PG_REPLSLOT_DIR, true);
		END_CRIT_SECTION();
	}
	else
	{
		/* only a persistent slot is allowed to fail hard, see above */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		/* mark the slot inactive before reporting the failure */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * Drop the statistics entry for the replication slot.  Do this while
	 * holding ReplicationSlotAllocationLock so that we don't drop a
	 * statistics entry for another slot with the same name just created in
	 * another session.
	 */
	if (SlotIsLogical(slot))
		pgstat_drop_replslot(slot);

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}

/*
 * Serialize the currently acquired slot's state from memory to disk, thereby
 * guaranteeing the current state will survive a crash.
 */
void
ReplicationSlotSave(void)
{
	char		path[MAXPGPATH];

	Assert(MyReplicationSlot != NULL);

	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
	SaveSlotToPath(MyReplicationSlot, path, ERROR);
}

/*
 * Signal that it would be useful if the currently acquired slot would be
 * flushed out to disk.
 *
 * Note that the actual flush to disk can be delayed for a long time, if
 * required for correctness explicitly do a ReplicationSlotSave().
 */
void
ReplicationSlotMarkDirty(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(MyReplicationSlot != NULL);

	SpinLockAcquire(&slot->mutex);
	MyReplicationSlot->just_dirtied = true;
	MyReplicationSlot->dirty = true;
	SpinLockRelease(&slot->mutex);
}

/*
 * Turn the currently acquired RS_EPHEMERAL or RS_TEMPORARY slot into a
 * RS_PERSISTENT one, and save it so that it will still be there after an
 * eventual crash.
 */
void
ReplicationSlotPersist(void)
{
	ReplicationSlot *myslot = MyReplicationSlot;

	Assert(myslot != NULL);
	Assert(myslot->data.persistency != RS_PERSISTENT);

	/* flip the persistency in the shared slot ... */
	SpinLockAcquire(&myslot->mutex);
	myslot->data.persistency = RS_PERSISTENT;
	SpinLockRelease(&myslot->mutex);

	/* ... and write the new state out right away */
	ReplicationSlotMarkDirty();
	ReplicationSlotSave();
}

/*
 * Find the oldest data xmin and the oldest catalog xmin over all in-use,
 * non-invalidated slots, and pass both on to the ProcArray.
 *
 * If already_locked is true, ProcArrayLock has already been acquired
 * exclusively.
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	TransactionId oldest_xmin = InvalidTransactionId;
	TransactionId oldest_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (int slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
		TransactionId slot_xmin;
		TransactionId slot_catalog_xmin;
		bool		is_invalidated;

		if (!slot->in_use)
			continue;

		/* copy what we need while holding the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		slot_xmin = slot->effective_xmin;
		slot_catalog_xmin = slot->effective_catalog_xmin;
		is_invalidated = slot->data.invalidated != RS_INVAL_NONE;
		SpinLockRelease(&slot->mutex);

		/* an invalidated slot does not hold back either horizon */
		if (is_invalidated)
			continue;

		/* data xmin: keep the oldest valid one seen so far */
		if (TransactionIdIsValid(slot_xmin))
		{
			if (!TransactionIdIsValid(oldest_xmin) ||
				TransactionIdPrecedes(slot_xmin, oldest_xmin))
				oldest_xmin = slot_xmin;
		}

		/* catalog xmin: likewise */
		if (TransactionIdIsValid(slot_catalog_xmin))
		{
			if (!TransactionIdIsValid(oldest_catalog_xmin) ||
				TransactionIdPrecedes(slot_catalog_xmin, oldest_catalog_xmin))
				oldest_catalog_xmin = slot_catalog_xmin;
		}
	}

	LWLockRelease(ReplicationSlotControlLock);

	ProcArraySetReplicationSlotXmin(oldest_xmin, oldest_catalog_xmin,
									already_locked);
}

/*
 * Find the oldest restart LSN over all in-use, non-invalidated slots and
 * hand it to the xlog module.
 *
 * Note: while max_slot_wal_keep_size is theoretically relevant for this
 * purpose, we don't try to account for that, because this module doesn't
 * know what to compare against.
 */
void
ReplicationSlotsComputeRequiredLSN(void)
{
	XLogRecPtr	oldest_needed = InvalidXLogRecPtr;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (int slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
		ReplicationSlotPersistency persistency;
		XLogRecPtr	needed_lsn;
		XLogRecPtr	saved_lsn;
		bool		is_invalidated;

		if (!slot->in_use)
			continue;

		/* copy what we need while holding the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		persistency = slot->data.persistency;
		needed_lsn = slot->data.restart_lsn;
		is_invalidated = slot->data.invalidated != RS_INVAL_NONE;
		saved_lsn = slot->last_saved_restart_lsn;
		SpinLockRelease(&slot->mutex);

		/* an invalidated slot does not hold back WAL removal */
		if (is_invalidated)
			continue;

		/*
		 * After a crash a persistent slot comes back with the restart_lsn
		 * that was last saved, so the WAL between last_saved_restart_lsn and
		 * restart_lsn may still be needed; use the older of the two.
		 * Non-persistent slots can't survive a crash, so for them only the
		 * in-memory restart_lsn matters.
		 */
		if (persistency == RS_PERSISTENT &&
			saved_lsn != InvalidXLogRecPtr &&
			saved_lsn < needed_lsn)
			needed_lsn = saved_lsn;

		/* a slot without a restart LSN requires nothing */
		if (needed_lsn == InvalidXLogRecPtr)
			continue;

		if (oldest_needed == InvalidXLogRecPtr || needed_lsn < oldest_needed)
			oldest_needed = needed_lsn;
	}

	LWLockRelease(ReplicationSlotControlLock);

	XLogSetReplicationSlotMinimumLSN(oldest_needed);
}

/*
 * Compute the oldest WAL LSN required by *logical* decoding slots.
 *
 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
 * slots exist.
 *
 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
 * ignores physical replication slots.
 *
 * The results aren't required frequently, so we don't maintain a precomputed
 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 */
XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)
{
	XLogRecPtr	oldest_needed = InvalidXLogRecPtr;

	if (max_replication_slots <= 0)
		return InvalidXLogRecPtr;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (int slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
		ReplicationSlotPersistency persistency;
		XLogRecPtr	needed_lsn;
		XLogRecPtr	saved_lsn;
		bool		is_invalidated;

		/* in_use cannot change while ReplicationSlotControlLock is held */
		if (!slot->in_use)
			continue;

		/* physical slots are of no interest here */
		if (!SlotIsLogical(slot))
			continue;

		/* read once, it's ok if it increases while we're checking */
		SpinLockAcquire(&slot->mutex);
		persistency = slot->data.persistency;
		needed_lsn = slot->data.restart_lsn;
		is_invalidated = slot->data.invalidated != RS_INVAL_NONE;
		saved_lsn = slot->last_saved_restart_lsn;
		SpinLockRelease(&slot->mutex);

		/* an invalidated slot does not hold back WAL removal */
		if (is_invalidated)
			continue;

		/*
		 * After a crash a persistent slot comes back with the restart_lsn
		 * that was last saved, so the WAL between last_saved_restart_lsn and
		 * restart_lsn may still be needed; use the older of the two.
		 * Non-persistent slots can't survive a crash, so for them only the
		 * in-memory restart_lsn matters.
		 */
		if (persistency == RS_PERSISTENT &&
			saved_lsn != InvalidXLogRecPtr &&
			saved_lsn < needed_lsn)
			needed_lsn = saved_lsn;

		/* a slot without a restart LSN requires nothing */
		if (needed_lsn == InvalidXLogRecPtr)
			continue;

		if (oldest_needed == InvalidXLogRecPtr || needed_lsn < oldest_needed)
			oldest_needed = needed_lsn;
	}

	LWLockRelease(ReplicationSlotControlLock);

	return oldest_needed;
}

/*
 * ReplicationSlotsCountDBSlots -- count the slots that belong to the database
 * with the passed oid.
 *
 * *nslots receives the total number of such slots (invalidated ones
 * included) and *nactive the number of those that are currently active.
 * Both are set to zero if there are none.
 *
 * Returns true if at least one slot references the database.
 */
bool
ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
{
	*nslots = *nactive = 0;

	if (max_replication_slots <= 0)
		return false;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (int slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];

		/*
		 * Skip unused entries (in_use cannot change while
		 * ReplicationSlotControlLock is held), physical slots (only logical
		 * slots are database specific) and slots of other databases.
		 */
		if (!slot->in_use ||
			!SlotIsLogical(slot) ||
			slot->data.database != dboid)
			continue;

		/* NB: invalidated slots are counted on purpose */

		/* look at active_pid, and count, with the spinlock held */
		SpinLockAcquire(&slot->mutex);
		(*nslots)++;
		if (slot->active_pid != 0)
			(*nactive)++;
		SpinLockRelease(&slot->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);

	return *nslots > 0;
}

/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * The slot array is rescanned from the start after every dropped slot,
 * because ReplicationSlotControlLock has to be released to perform the drop.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

	/* each pass drops at most one slot and then jumps back here */
restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotControlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots */

		/*
		 * Acquire the slot, so ReplicationSlotDropAcquired can be reused.
		 * The slot is only taken over if no process currently has it active;
		 * otherwise the owner's PID is remembered for the error report below.
		 */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}

	/* a complete pass found nothing left to drop */
	LWLockRelease(ReplicationSlotControlLock);
}


/*
 * Raise an ERROR unless the server's configuration allows replication slots
 * to be used: "max_replication_slots" must be nonzero and "wal_level" must be
 * at least "replica".
 *
 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
 * needs the same check.
 */
void
CheckSlotRequirements(void)
{
	/* there has to be room for at least one slot */
	if (max_replication_slots == 0)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("replication slots can only be used if \"max_replication_slots\" > 0"));

	/* and WAL must be generated at "replica" level or above */
	if (wal_level < WAL_LEVEL_REPLICA)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("replication slots can only be used if \"wal_level\" >= \"replica\""));
}

/*
 * Raise an ERROR unless the current user has the REPLICATION attribute,
 * which is what is needed to use replication slots.
 */
void
CheckSlotPermissions(void)
{
	/* nothing to complain about for replication roles */
	if (has_rolreplication(GetUserId()))
		return;

	ereport(ERROR,
			errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
			errmsg("permission denied to use replication slots"),
			errdetail("Only roles with the %s attribute may use replication slots.",
					  "REPLICATION"));
}

/*
 * Reserve WAL for the currently acquired slot.
 *
 * Picks a restart_lsn that suits the type of the slot and installs it in a
 * way that is safe against concurrent WAL removal.  The slot must not have a
 * restart_lsn (in memory or last saved) yet.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	XLogSegNo	reserved_segno;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
	Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);

	/*
	 * Slots exist to keep required WAL from being removed, but nothing
	 * interlocks this routine with checkpoints: a checkpoint working from a
	 * stale result of ReplicationSlotsComputeRequiredLSN() may remove
	 * segments while we are choosing our position.  So pick a position,
	 * publish it, and verify afterwards that the WAL is still there; in the
	 * unlikely case that it is not, simply try again.
	 */
	do
	{
		XLogRecPtr	candidate;

		/*
		 * For logical slots a standby snapshot is logged (below) and
		 * decoding starts at exactly that position, which lets the slot
		 * start up more quickly.  On a standby we cannot write WAL, so the
		 * replay pointer is used instead; creating a logical slot there
		 * effectively waits for an xl_running_xact record to be logged
		 * independently on the primary, so that a snapshot can be built
		 * from it.
		 *
		 * Physical slots gain nothing from that, as they start replay at
		 * the last logged checkpoint anyway.  They get the last redo LSN:
		 * that slightly increases the chance of a retry, but it is where a
		 * base backup has to start replay at.
		 */
		if (SlotIsPhysical(slot))
			candidate = GetRedoRecPtr();
		else if (RecoveryInProgress())
			candidate = GetXLogReplayRecPtr(NULL);
		else
			candidate = GetXLogInsertRecPtr();

		SpinLockAcquire(&slot->mutex);
		slot->data.restart_lsn = candidate;
		SpinLockRelease(&slot->mutex);

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * We are done if the segment holding our position has not been
		 * removed.  The slot now blocks further removal, unless a
		 * concurrent ReplicationSlotsComputeRequiredLSN() ran after the new
		 * restart_lsn was written above, so normally no more than two
		 * iterations should ever be needed.
		 */
		XLByteToSeg(slot->data.restart_lsn, reserved_segno, wal_segment_size);
	} while (XLogGetLastRemovedSegno() >= reserved_segno);

	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	snapshot_end;

		/* log a standby snapshot so there is enough information to start */
		snapshot_end = LogStandbySnapshot();

		/* and make sure it has reached disk */
		XLogFlush(snapshot_end);
	}
}

/*
 * Write a LOG message about a replication slot that is being invalidated,
 * or about its owning process being terminated so that the slot can be
 * released.
 *
 * "terminating" selects between those two primary messages; "pid" appears
 * only in the termination message.  The detail text, and for some causes a
 * hint, is chosen according to "cause", which must not be RS_INVAL_NONE.
 */
static void
ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
					   bool terminating,
					   int pid,
					   NameData slotname,
					   XLogRecPtr restart_lsn,
					   XLogRecPtr oldestLSN,
					   TransactionId snapshotConflictHorizon,
					   long slot_idle_seconds)
{
	StringInfoData detail;
	StringInfoData hint;

	initStringInfo(&detail);
	initStringInfo(&hint);

	/* build the cause-specific detail, plus a hint where one is useful */
	switch (cause)
	{
		case RS_INVAL_WAL_REMOVED:
			{
				/* how far behind the oldest kept WAL the slot is */
				uint64		excess = oldestLSN - restart_lsn;

				appendStringInfo(&detail,
								 ngettext("The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " byte.",
										  "The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " bytes.",
										  excess),
								 LSN_FORMAT_ARGS(restart_lsn),
								 excess);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&hint, _("You might need to increase \"%s\"."),
								 "max_slot_wal_keep_size");
				break;
			}

		case RS_INVAL_HORIZON:
			appendStringInfo(&detail, _("The slot conflicted with xid horizon %u."),
							 snapshotConflictHorizon);
			break;

		case RS_INVAL_WAL_LEVEL:
			appendStringInfoString(&detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
			break;

		case RS_INVAL_IDLE_TIMEOUT:
			{
				/* split the idle time into whole minutes and a remainder */
				int			idle_mins = slot_idle_seconds / SECS_PER_MINUTE;
				int			idle_secs = slot_idle_seconds % SECS_PER_MINUTE;

				/* translator: %s is a GUC variable name */
				appendStringInfo(&detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."),
								 idle_mins, idle_secs, "idle_replication_slot_timeout",
								 idle_replication_slot_timeout_mins);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&hint, _("You might need to increase \"%s\"."),
								 "idle_replication_slot_timeout");
				break;
			}

		case RS_INVAL_NONE:
			pg_unreachable();
	}

	/* the hint is attached only if one was built above */
	ereport(LOG,
			terminating ?
			errmsg("terminating process %d to release replication slot \"%s\"",
				   pid, NameStr(slotname)) :
			errmsg("invalidating obsolete replication slot \"%s\"",
				   NameStr(slotname)),
			errdetail_internal("%s", detail.data),
			hint.len ? errhint("%s", hint.data) : 0);

	pfree(detail.data);
	pfree(hint.data);
}

/*
 * Is this slot a candidate for invalidation due to idle timeout?
 *
 * That is the case only if all of the following hold:
 *
 * 1. An idle timeout is configured.
 * 2. The slot has reserved WAL.
 * 3. The slot is inactive.
 * 4. The slot is not one that is being synced from the primary while the
 *	  server is in recovery.  Synced slots are always considered to be
 *	  inactive because they don't perform logical decoding to produce changes.
 */
static inline bool
CanInvalidateIdleSlot(ReplicationSlot *s)
{
	/* 1. the timeout is disabled when set to zero */
	if (idle_replication_slot_timeout_mins == 0)
		return false;

	/* 2. no restart_lsn means no WAL has been reserved */
	if (XLogRecPtrIsInvalid(s->data.restart_lsn))
		return false;

	/* 3. inactive_since is set only for inactive slots */
	if (s->inactive_since <= 0)
		return false;

	/* 4. leave synced slots alone during recovery */
	if (RecoveryInProgress() && s->data.synced)
		return false;

	return true;
}

/*
 * DetermineSlotInvalidationCause - work out which of the given possible
 * causes, if any, makes the slot eligible for invalidation.
 *
 * The causes present in possible_causes are examined in a fixed order (WAL
 * removed, horizon, WAL level, idle timeout) and the first one the slot
 * qualifies for is returned.  RS_INVAL_NONE is returned if none applies.
 *
 * When RS_INVAL_IDLE_TIMEOUT is returned, *inactive_since has been set to the
 * time from which the slot's idle period is to be measured.
 */
static ReplicationSlotInvalidationCause
DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
							   XLogRecPtr oldestLSN, Oid dboid,
							   TransactionId snapshotConflictHorizon,
							   TransactionId initial_effective_xmin,
							   TransactionId initial_catalog_effective_xmin,
							   XLogRecPtr initial_restart_lsn,
							   TimestampTz *inactive_since, TimestampTz now)
{
	Assert(possible_causes != RS_INVAL_NONE);

	/* the slot's restart LSN lies before oldestLSN */
	if ((possible_causes & RS_INVAL_WAL_REMOVED) &&
		initial_restart_lsn != InvalidXLogRecPtr &&
		initial_restart_lsn < oldestLSN)
		return RS_INVAL_WAL_REMOVED;

	/*
	 * A logical slot's xmin or catalog xmin does not follow the conflict
	 * horizon.  Only slots of the given database are considered; an invalid
	 * DB oid signals a shared relation, which concerns every database.
	 */
	if ((possible_causes & RS_INVAL_HORIZON) &&
		SlotIsLogical(s) &&
		(dboid == InvalidOid || dboid == s->data.database))
	{
		if (TransactionIdIsValid(initial_effective_xmin) &&
			TransactionIdPrecedesOrEquals(initial_effective_xmin,
										  snapshotConflictHorizon))
			return RS_INVAL_HORIZON;

		if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
			TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
										  snapshotConflictHorizon))
			return RS_INVAL_HORIZON;
	}

	/* a WAL level problem concerns every logical slot */
	if ((possible_causes & RS_INVAL_WAL_LEVEL) && SlotIsLogical(s))
		return RS_INVAL_WAL_LEVEL;

	/* idle timeout is the last cause; anything that fails here means none */
	if (!(possible_causes & RS_INVAL_IDLE_TIMEOUT))
		return RS_INVAL_NONE;

	Assert(now > 0);

	if (!CanInvalidateIdleSlot(s))
		return RS_INVAL_NONE;

	/*
	 * The minimum idle time is one minute, which would make tests take a
	 * long time, so an injection point can be used to simulate the timeout.
	 */
#ifdef USE_INJECTION_POINTS
	if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
	{
		*inactive_since = 0;	/* since the beginning of time */
		return RS_INVAL_IDLE_TIMEOUT;
	}
#endif

	/*
	 * Has the slot been idle for longer than the
	 * idle_replication_slot_timeout GUC allows?
	 */
	if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
										  idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
	{
		*inactive_since = s->inactive_since;
		return RS_INVAL_IDLE_TIMEOUT;
	}

	return RS_INVAL_NONE;
}

/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and marks it invalid, if necessary and possible.
 * If the slot is currently owned by another process, that process is
 * signalled once and we wait on the slot's condition variable before
 * re-checking.
 *
 * possible_causes is the bitmask of invalidation causes to check for; s is
 * the slot to examine.  oldestLSN, dboid and snapshotConflictHorizon are the
 * thresholds handed to DetermineSlotInvalidationCause() and are also used for
 * the log message.
 *
 * The caller must hold ReplicationSlotControlLock in shared mode.
 *
 * Returns whether ReplicationSlotControlLock was released in the interim (and
 * in that case we're not holding the lock at return, otherwise we are).
 *
 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *invalidated)
{
	int			last_signaled_pid = 0;
	bool		released_lock = false;
	bool		terminated = false;
	TransactionId initial_effective_xmin = InvalidTransactionId;
	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
	TimestampTz inactive_since = 0;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/*
		 * The slot may have been dropped while we were not holding the lock.
		 * If we released the lock earlier we must also return without it.
		 */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		/* remembered for the log message only */
		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
		{
			/*
			 * The slot's mutex will be released soon, and it is possible that
			 * those values change since the process holding the slot has been
			 * terminated (if any), so record them here to ensure that we
			 * would report the correct invalidation cause.
			 *
			 * Unlike other slot attributes, slot's inactive_since can't be
			 * changed until the acquired slot is released or the owning
			 * process is terminated. So, the inactive slot can only be
			 * invalidated immediately without being terminated.
			 */
			if (!terminated)
			{
				initial_restart_lsn = s->data.restart_lsn;
				initial_effective_xmin = s->effective_xmin;
				initial_catalog_effective_xmin = s->effective_catalog_xmin;
			}

			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																initial_effective_xmin,
																initial_catalog_effective_xmin,
																initial_restart_lsn,
																&inactive_since,
																now);
		}

		/*
		 * The invalidation cause recorded previously should not change while
		 * the process owning the slot (if any) has been terminated.
		 */
		Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
				 invalidation_cause_prev != invalidation_cause));

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		/* copy the name and owner while still holding the spinlock */
		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * The logical replication slots shouldn't be invalidated as GUC
		 * max_slot_wal_keep_size is set to -1 and
		 * idle_replication_slot_timeout is set to 0 during the binary
		 * upgrade. See check_old_cluster_for_valid_slots() where we ensure
		 * that no slot was invalidated before the upgrade.
		 */
		Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));

		/*
		 * Calculate the idle time duration of the slot if slot is marked
		 * invalidated with RS_INVAL_IDLE_TIMEOUT.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				/*
				 * The startup process raises a recovery conflict in the
				 * owner; any other process sends it SIGTERM.
				 */
				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
				terminated = true;
				invalidation_cause_prev = invalidation_cause;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	/* we hold the lock at return if and only if we never released it */
	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}

/*
 * Invalidate slots that require resources about to be removed.
 *
 * Returns true if any slot has been invalidated.
 *
 * Whether a slot needs to be invalidated depends on the invalidation cause.
 * A slot is invalidated if it:
 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
 *   db; dboid may be InvalidOid for shared relations
 * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
 *   "idle_replication_slot_timeout" duration.
 *
 * Note: This function attempts to invalidate the slot for multiple possible
 * causes in a single pass, minimizing redundant iterations. The "cause"
 * parameter can be a MASK representing one or more of the defined causes.
 *
 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
 */
bool
InvalidateObsoleteReplicationSlots(uint32 possible_causes,
								   XLogSegNo oldestSegno, Oid dboid,
								   TransactionId snapshotConflictHorizon)
{
	XLogRecPtr	oldestLSN;
	bool		invalidated = false;

	Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
	Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
	Assert(possible_causes != RS_INVAL_NONE);

	if (max_replication_slots == 0)
		return invalidated;

	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
										   snapshotConflictHorizon,
										   &invalidated))
		{
			/* if the lock was released, start from scratch */
			goto restart;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If any slots have been invalidated, recalculate the resource limits.
	 */
	if (invalidated)
	{
		ReplicationSlotsComputeRequiredXmin(false);
		ReplicationSlotsComputeRequiredLSN();
	}

	return invalidated;
}

/*
 * Flush all dirty replication slots to disk.
 *
 * Checkpoint time is a convenient moment to do this.  When is_shutdown is
 * true, logical slots whose confirmed_flush LSN has advanced since it was
 * last saved are additionally marked dirty, so they are written out too.
 */
void
CheckPointReplicationSlots(bool is_shutdown)
{
	elog(DEBUG1, "performing replication slot checkpoint");

	/*
	 * Prevent any slot from being created/dropped while we're active. As we
	 * explicitly do *not* want to block iterating over replication_slots or
	 * acquiring a slot we cannot take the control lock - but that's OK,
	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
	 * enough to guarantee that nobody can change the in_use bits on us.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);

	for (int slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
		char		slotdir[MAXPGPATH];

		if (!slot->in_use)
			continue;

		/*
		 * Slot's data is not flushed each time the confirmed_flush LSN is
		 * updated as that could lead to frequent writes.  At shutdown,
		 * however, force a flush of every valid logical slot whose
		 * confirmed_flush LSN is ahead of the value last written to disk.
		 * This helps in avoiding an unnecessary retreat of the
		 * confirmed_flush LSN after restart.
		 */
		if (is_shutdown && SlotIsLogical(slot))
		{
			SpinLockAcquire(&slot->mutex);
			if (slot->data.invalidated == RS_INVAL_NONE &&
				slot->last_saved_confirmed_flush < slot->data.confirmed_flush)
			{
				slot->dirty = true;
				slot->just_dirtied = true;
			}
			SpinLockRelease(&slot->mutex);
		}

		/* save the slot to disk, locking is handled in SaveSlotToPath() */
		sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
		SaveSlotToPath(slot, slotdir, LOG);
	}

	LWLockRelease(ReplicationSlotAllocationLock);

	/*
	 * Recompute the required LSN as SaveSlotToPath() updated
	 * last_saved_restart_lsn for slots.
	 */
	ReplicationSlotsComputeRequiredLSN();
}

/*
 * Load all replication slots from disk into memory at server startup. This
 * needs to be run before we start crash recovery.
 *
 * Every entry of PG_REPLSLOT_DIR is examined: leftover "*.tmp" directories
 * are removed, everything else that may be a directory is handed to
 * RestoreSlotFromDisk().
 */
void
StartupReplicationSlots(void)
{
	DIR		   *slots_dir;
	struct dirent *entry;

	elog(DEBUG1, "starting up replication slots");

	/* restore all slots by iterating over all on-disk entries */
	slots_dir = AllocateDir(PG_REPLSLOT_DIR);
	while ((entry = ReadDir(slots_dir, PG_REPLSLOT_DIR)) != NULL)
	{
		const char *entry_name = entry->d_name;
		char		entry_path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
		PGFileType	entry_type;

		if (strcmp(entry_name, ".") == 0 ||
			strcmp(entry_name, "..") == 0)
			continue;

		snprintf(entry_path, sizeof(entry_path), "%s/%s", PG_REPLSLOT_DIR, entry_name);
		entry_type = get_dirent_type(entry_path, entry, false, DEBUG1);

		/*
		 * Slots live in directories, so skip anything else.  Entries whose
		 * type lookup reported PGFILETYPE_ERROR are not skipped here.
		 */
		if (entry_type != PGFILETYPE_DIR && entry_type != PGFILETYPE_ERROR)
			continue;

		/* looks like a slot in a normal state, restore */
		if (!pg_str_endswith(entry_name, ".tmp"))
		{
			RestoreSlotFromDisk(entry_name);
			continue;
		}

		/* we crashed while a slot was being setup or deleted, clean up */
		if (rmtree(entry_path, true))
			fsync_fname(PG_REPLSLOT_DIR, true);
		else
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							entry_path)));
	}
	FreeDir(slots_dir);

	/*
	 * Now that we have recovered all the data, compute replication xmin and
	 * the required LSN -- unless no slots can exist at all.
	 */
	if (max_replication_slots > 0)
	{
		ReplicationSlotsComputeRequiredXmin(false);
		ReplicationSlotsComputeRequiredLSN();
	}
}

/* ----
 * Manipulation of on-disk state of replication slots
 *
 * NB: none of the routines below should take any notice whether a slot is the
 * current one or not, that's all handled a layer above.
 * ----
 */
/*
 * Create the on-disk directory and initial state file for a new slot.
 *
 * The slot is first written into "<PG_REPLSLOT_DIR>/<name>.tmp", which is
 * then renamed to its final name and fsynced.
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	const char *slotname = NameStr(slot->data.name);
	char		final_dir[MAXPGPATH];
	char		temp_dir[MAXPGPATH];
	struct stat statbuf;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(final_dir, "%s/%s", PG_REPLSLOT_DIR, slotname);
	sprintf(temp_dir, "%s/%s.tmp", PG_REPLSLOT_DIR, slotname);

	/*
	 * A previous attempt to create or drop a slot of this name may, very
	 * rarely, have left its temp directory behind; try to get rid of it.  The
	 * result of rmtree() is deliberately not checked: if removal failed,
	 * MakePGDirectory() below fails and we error out there.
	 */
	if (stat(temp_dir, &statbuf) == 0 && S_ISDIR(statbuf.st_mode))
		rmtree(temp_dir, true);

	/* Create and fsync the temporary slot directory. */
	if (MakePGDirectory(temp_dir) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						temp_dir)));
	fsync_fname(temp_dir, true);

	/* Write the state file; force it by marking the slot dirty first. */
	slot->dirty = true;
	SaveSlotToPath(slot, temp_dir, ERROR);

	/* Rename the directory into place. */
	if (rename(temp_dir, final_dir) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						temp_dir, final_dir)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	fsync_fname(final_dir, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();
}

/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Writes the slot's persistent data to the file "state" inside directory
 * "dir".  The data goes to "state.tmp" first, is fsynced, and is then renamed
 * over "state".  Nothing is written if the slot is not marked dirty.
 *
 * Failures up to and including the rename are reported at "elevel"; if that
 * level lets ereport() return, the function returns early, after releasing
 * io_in_progress_lock and without clearing the dirty flag.  The fsyncs after
 * the rename run inside a critical section.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/*
	 * first check whether there's something to write out; just_dirtied is
	 * reset here so that we can tell at the end whether the slot was dirtied
	 * again while we were writing
	 */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* held in exclusive mode for the whole write */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	/* O_EXCL: fail rather than reuse an already existing temp file */
	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	/* fill in the fixed header of the on-disk image */
	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data under the mutex to get a consistent image */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/* the checksum covers the image except for its leading, unchecksummed part */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	/* clear errno so that a short write which leaves it unset can be detected */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		/* save errno before the cleanup calls get a chance to change it */
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	/* the state file itself, then its directory, then the parent directory */
	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already and remember the confirmed_flush LSN value.  The remembered
	 * values are taken from the copy that was written, not from the live
	 * slot.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
	slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}

/*
 * Load a single slot from disk into memory.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
	char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;
	TimestampTz now = 0;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname(PG_REPLSLOT_DIR, true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid)
	{
		if (wal_level < WAL_LEVEL_LOGICAL)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"wal_level\" to be \"logical\" or higher.")));

		/*
		 * In standby mode, the hot standby must be enabled. This check is
		 * necessary to ensure logical slots are invalidated when they become
		 * incompatible due to insufficient wal_level. Otherwise, if the
		 * primary reduces wal_level < logical while hot standby is disabled,
		 * logical slots would remain valid even after promotion.
		 */
		if (StandbyMode && !EnableHotStandby)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"hot_standby\" to be \"on\".")));
	}
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
		slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		/*
		 * Set the time since the slot has become inactive after loading the
		 * slot from the disk into memory. Whoever acquires the slot i.e.
		 * makes the slot active will reset it. Use the same inactive_since
		 * time for all the slots.
		 */
		if (now == 0)
			now = GetCurrentTimestamp();

		ReplicationSlotSetInactiveSince(slot, now, false);

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase \"max_replication_slots\" and try again.")));
}

/*
 * Look up the ReplicationSlotInvalidationCause whose textual name in
 * SlotInvalidationCauses equals cause_name.
 *
 * cause_name must not be NULL and is expected to be one of the names in the
 * table; an unknown name trips an assertion.
 */
ReplicationSlotInvalidationCause
GetSlotInvalidationCause(const char *cause_name)
{
	int			idx = 0;

	Assert(cause_name);

	/* Walk the lookup table until the name matches */
	while (idx <= RS_INVAL_MAX_CAUSES)
	{
		if (strcmp(SlotInvalidationCauses[idx].cause_name, cause_name) == 0)
			return SlotInvalidationCauses[idx].cause;
		idx++;
	}

	Assert(false);
	return RS_INVAL_NONE;		/* to keep compiler quiet */
}

/*
 * Return the textual name recorded in SlotInvalidationCauses for the given
 * ReplicationSlotInvalidationCause.
 *
 * The cause is expected to be present in the table; an unknown value trips
 * an assertion.
 */
const char *
GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
{
	int			idx = 0;

	/* Walk the lookup table until the cause matches */
	while (idx <= RS_INVAL_MAX_CAUSES)
	{
		if (SlotInvalidationCauses[idx].cause == cause)
			return SlotInvalidationCauses[idx].cause_name;
		idx++;
	}

	Assert(false);
	return "none";				/* to keep compiler quiet */
}

/*
 * A helper function to validate slots specified in GUC synchronized_standby_slots.
 *
 * rawname is split at commas and the resulting identifiers are stored in
 * *elemlist.  Returns false, with a GUC errdetail set, if the list syntax is
 * invalid or -- when the check can be performed -- if a listed slot does not
 * exist or is not a physical slot.
 */
static bool
validate_sync_standby_slots(char *rawname, List **elemlist)
{
	bool		valid = true;

	/* Verify syntax and parse string into a list of identifiers */
	if (!SplitIdentifierString(rawname, ',', elemlist))
	{
		GUC_check_errdetail("List syntax is invalid.");
		return false;
	}

	/*
	 * Checking the slots themselves needs an LWLock, which we cannot take in
	 * a process without a PGPROC, so skip the check there; see comments in
	 * StandbySlotsHaveCaughtup() as to why that's not a problem.
	 */
	if (!MyProc)
		return true;

	/* Check that each specified slot exists and is physical. */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	foreach_ptr(char, slot_name, *elemlist)
	{
		ReplicationSlot *slot = SearchNamedReplicationSlot(slot_name, false);

		if (slot == NULL)
		{
			GUC_check_errdetail("Replication slot \"%s\" does not exist.",
								slot_name);
			valid = false;
		}
		else if (!SlotIsPhysical(slot))
		{
			GUC_check_errdetail("\"%s\" is not a physical replication slot.",
								slot_name);
			valid = false;
		}

		/* stop at the first offending entry */
		if (!valid)
			break;
	}

	LWLockRelease(ReplicationSlotControlLock);

	return valid;
}

/*
 * GUC check_hook for synchronized_standby_slots
 *
 * An empty string is accepted as is.  Otherwise the value is validated with
 * validate_sync_standby_slots() and, if it yields a non-empty list, converted
 * into a SyncStandbySlotsConfigData that is returned through *extra: the
 * number of names followed by the names themselves, stored back to back as
 * NUL-terminated strings.
 *
 * Returns false if validation fails or the extra data cannot be allocated.
 * The working copy of the string and the parsed list are released on every
 * path that created them.
 */
bool
check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
{
	char	   *rawname;
	char	   *ptr;
	List	   *elemlist;
	int			size;
	bool		ok;
	SyncStandbySlotsConfigData *config;

	if ((*newval)[0] == '\0')
		return true;

	/* Need a modifiable copy of the GUC string */
	rawname = pstrdup(*newval);

	/* Now verify if the specified slots exist and have correct type */
	ok = validate_sync_standby_slots(rawname, &elemlist);

	if (ok && elemlist != NIL)
	{
		/* Compute the size required for the SyncStandbySlotsConfigData struct */
		size = offsetof(SyncStandbySlotsConfigData, slot_names);
		foreach_ptr(char, slot_name, elemlist)
			size += strlen(slot_name) + 1;

		/* GUC extra value must be guc_malloc'd, not palloc'd */
		config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
		if (config == NULL)
		{
			/* fall through to the common cleanup instead of returning here */
			ok = false;
		}
		else
		{
			/* Transform the data into SyncStandbySlotsConfigData */
			config->nslotnames = list_length(elemlist);

			ptr = config->slot_names;
			foreach_ptr(char, slot_name, elemlist)
			{
				strcpy(ptr, slot_name);
				ptr += strlen(slot_name) + 1;
			}

			*extra = config;
		}
	}

	pfree(rawname);
	list_free(elemlist);
	return ok;
}

/*
 * GUC assign_hook for synchronized_standby_slots
 *
 * Installs the parsed configuration passed in "extra" and discards the
 * cached oldest flush LSN.
 */
void
assign_synchronized_standby_slots(const char *newval, void *extra)
{
	synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;

	/*
	 * The set of standby slots may be different now, so the oldest LSN has to
	 * be recomputed; invalidate the cached value.
	 */
	ss_oldest_flush_lsn = InvalidXLogRecPtr;
}

/*
 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
 *
 * Returns false when the GUC has no value.
 */
bool
SlotExistsInSyncStandbySlots(const char *slot_name)
{
	const char *candidate;
	int			remaining;

	/* Nothing can match if there is no value in synchronized_standby_slots */
	if (synchronized_standby_slots_config == NULL)
		return false;

	/*
	 * The names are stored back to back, each NUL-terminated, so step from
	 * one to the next by its length.
	 *
	 * XXX: We are not expecting this list to be long so a linear search
	 * shouldn't hurt but if that turns out not to be true then we can cache
	 * this information for each WalSender as well.
	 */
	candidate = synchronized_standby_slots_config->slot_names;
	remaining = synchronized_standby_slots_config->nslotnames;
	while (remaining-- > 0)
	{
		if (strcmp(candidate, slot_name) == 0)
			return true;

		candidate += strlen(candidate) + 1;
	}

	return false;
}

/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * Also returns true right away if synchronized_standby_slots has no value,
 * if recovery is in progress, or if the cached oldest flush LSN is already at
 * or beyond wait_for_lsn.  On success the cached value is replaced by the
 * smallest restart_lsn seen among the listed slots.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *slot_name;
	int			n_caught_up = 0;
	XLogRecPtr	oldest_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.
	 */
	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
		wait_for_lsn <= ss_oldest_flush_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	slot_name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		ReplicationSlot *slot = SearchNamedReplicationSlot(slot_name, false);
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;

		/*
		 * If a slot name provided in synchronized_standby_slots does not
		 * exist, report a message and exit the loop.
		 *
		 * Though validate_sync_standby_slots (the GUC check_hook) tries to
		 * avoid this, it can nonetheless happen because the user can specify
		 * a nonexistent slot name before server startup. That function cannot
		 * validate such a slot during startup, as ReplicationSlotCtl is not
		 * initialized by then.  Also, the user might have dropped one slot.
		 */
		if (slot == NULL)
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
						   slot_name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  slot_name),
					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
							slot_name, "synchronized_standby_slots"));
			break;
		}

		/* Same as above: if a slot is not physical, exit the loop. */
		if (SlotIsLogical(slot))
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
						   slot_name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
							  slot_name),
					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
							slot_name, "synchronized_standby_slots"));
			break;
		}

		/* Take a snapshot of the fields we need under the slot's mutex. */
		SpinLockAcquire(&slot->mutex);
		inactive = (slot->active_pid == 0);
		invalidated = (slot->data.invalidated != RS_INVAL_NONE);
		restart_lsn = slot->data.restart_lsn;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
						   slot_name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  slot_name),
					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
							slot_name, "synchronized_standby_slots"));
			break;
		}

		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
							   slot_name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
								  slot_name),
						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
								slot_name, "synchronized_standby_slots"));

			/* This slot hasn't caught up, so stop checking the others. */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* Track the smallest restart_lsn among the caught-up slots. */
		if (XLogRecPtrIsInvalid(oldest_restart_lsn) ||
			restart_lsn < oldest_restart_lsn)
			oldest_restart_lsn = restart_lsn;

		n_caught_up++;

		/* Advance to the next NUL-terminated name. */
		slot_name += strlen(slot_name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (n_caught_up != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
		   oldest_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = oldest_restart_lsn;

	return true;
}

/*
 * Wait for physical standbys to confirm receiving the given lsn.
 *
 * Used by logical decoding SQL functions. It waits for physical standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 */
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
{
	bool		caught_up;

	/*
	 * Waiting is only needed when the currently acquired slot is a logical
	 * failover slot and synchronized_standby_slots has a value.
	 */
	if (!(MyReplicationSlot->data.failover && synchronized_standby_slots_config))
		return;

	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);

	do
	{
		CHECK_FOR_INTERRUPTS();

		/* Pick up configuration changes before checking the slots again. */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		caught_up = StandbySlotsHaveCaughtup(wait_for_lsn, WARNING);

		/*
		 * Wait for the slots in the synchronized_standby_slots to catch up,
		 * but use a timeout (1s) so we can also check if the
		 * synchronized_standby_slots has been changed.
		 */
		if (!caught_up)
			ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
										WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
	} while (!caught_up);

	ConditionVariableCancelSleep();
}

/*
 * GUC check_hook for idle_replication_slot_timeout
 *
 * During a binary upgrade only the value 0 is accepted; see
 * start_postmaster() in pg_upgrade for more details.  Outside of binary
 * upgrade mode any value passes this check.
 */
bool
check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
{
	if (!IsBinaryUpgrade || *newval == 0)
		return true;

	GUC_check_errdetail("\"%s\" must be set to 0 during binary upgrade mode.",
						"idle_replication_slot_timeout");
	return false;
}
