/*-------------------------------------------------------------------------
 *
 * xloginsert.c
 *		Functions for constructing WAL records
 *
 * Constructing a WAL record begins with a call to XLogBeginInsert,
 * followed by a number of XLogRegister* calls. The registered data is
 * collected in private working memory, and finally assembled into a chain
 * of XLogRecData structs by a call to XLogRecordAssemble(). See
 * access/transam/README for details.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/xloginsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#ifdef USE_LZ4
#include <lz4.h>
#endif

#ifdef USE_ZSTD
#include <zstd.h>
#endif

#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"

/*
 * Guess the maximum buffer size required to store a compressed version of
 * backup block image.
 */
#ifdef USE_LZ4
#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
#else
#define LZ4_MAX_BLCKSZ		0
#endif

#ifdef USE_ZSTD
#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
#else
#define ZSTD_MAX_BLCKSZ		0
#endif

#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)

/* Buffer size required to store a compressed version of backup block image */
#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)

/*
 * For each block reference registered with XLogRegisterBuffer, we fill in
 * a registered_buffer struct.
 */
typedef struct
{
	bool		in_use;			/* is this slot in use? */
	uint8		flags;			/* REGBUF_* flags */
	RelFileLocator rlocator;	/* identifies the relation and block */
	ForkNumber	forkno;
	BlockNumber block;
	const PageData *page;		/* page content */
	uint32		rdata_len;		/* total length of data in rdata chain */
	XLogRecData *rdata_head;	/* head of the chain of data registered with
								 * this block */
	XLogRecData *rdata_tail;	/* last entry in the chain, or &rdata_head if
								 * empty */

	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
								 * backup block data in XLogRecordAssemble() */

	/* buffer to store a compressed version of backup block image */
	char		compressed_page[COMPRESS_BUFSIZE];
} registered_buffer;

static registered_buffer *registered_buffers;
static int	max_registered_buffers; /* allocated size */
static int	max_registered_block_id = 0;	/* highest block_id + 1 currently
											 * registered */

/*
 * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
 * with XLogRegisterData(...).
 */
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint64 mainrdata_len;	/* total # of bytes in chain */

/* flags for the in-progress insertion */
static uint8 curinsert_flags = 0;

/*
 * These are used to hold the record header while constructing a record.
 * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
 * because we want it to be MAXALIGNed and padding bytes zeroed.
 *
 * For simplicity, it's allocated large enough to hold the headers for any
 * WAL record.
 */
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;

#define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
#define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))

#define HEADER_SCRATCH_SIZE \
	(SizeOfXLogRecord + \
	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
	 SizeOfXLogTransactionId)

/*
 * An array of XLogRecData structs, to hold registered data.
 */
static XLogRecData *rdatas;
static int	num_rdatas;			/* entries currently used */
static int	max_rdatas;			/* allocated size */

static bool begininsert_called = false;

/* Memory context to hold the registered buffer and data references. */
static MemoryContext xloginsert_cxt;

static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
									   XLogRecPtr RedoRecPtr, bool doPageWrites,
									   XLogRecPtr *fpw_lsn, int *num_fpi,
									   bool *topxid_included);
static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
									uint16 hole_length, void *dest, uint16 *dlen);

/*
 * Begin constructing a WAL record. This must be called before the
 * XLogRegister* functions and XLogInsert().
 */
void
XLogBeginInsert(void)
{
	Assert(max_registered_block_id == 0);
	Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
	Assert(mainrdata_len == 0);

	/* cross-check on whether we should be here or not */
	if (!XLogInsertAllowed())
		elog(ERROR, "cannot make new WAL entries during recovery");

	if (begininsert_called)
		elog(ERROR, "XLogBeginInsert was already called");

	begininsert_called = true;
}

/*
 * Ensure that there are enough buffer and data slots in the working area,
 * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
 * calls.
 *
 * There is always space for a small number of buffers and data chunks, enough
 * for most record types. This function is for the exceptional cases that need
 * more.
 */
void
XLogEnsureRecordSpace(int max_block_id, int ndatas)
{
	int			nbuffers;

	/*
	 * This must be called before entering a critical section, because
	 * allocating memory inside a critical section can fail. repalloc() will
	 * check the same, but better to check it here too so that we fail
	 * consistently even if the arrays happen to be large enough already.
	 */
	Assert(CritSectionCount == 0);

	/* the minimum values can't be decreased */
	if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
		max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
	if (ndatas < XLR_NORMAL_RDATAS)
		ndatas = XLR_NORMAL_RDATAS;

	if (max_block_id > XLR_MAX_BLOCK_ID)
		elog(ERROR, "maximum number of WAL record block references exceeded");
	nbuffers = max_block_id + 1;

	if (nbuffers > max_registered_buffers)
	{
		registered_buffers = (registered_buffer *)
			repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);

		/*
		 * At least the padding bytes in the structs must be zeroed, because
		 * they are included in WAL data, but initialize it all for tidiness.
		 */
		MemSet(&registered_buffers[max_registered_buffers], 0,
			   (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
		max_registered_buffers = nbuffers;
	}

	if (ndatas > max_rdatas)
	{
		rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
		max_rdatas = ndatas;
	}
}

/*
 * Reset WAL record construction buffers.
 */
void
XLogResetInsertion(void)
{
	int			i;

	for (i = 0; i < max_registered_block_id; i++)
		registered_buffers[i].in_use = false;

	num_rdatas = 0;
	max_registered_block_id = 0;
	mainrdata_len = 0;
	mainrdata_last = (XLogRecData *) &mainrdata_head;
	curinsert_flags = 0;
	begininsert_called = false;
}

/*
 * Register a reference to a buffer with the WAL record being constructed.
 * This must be called for every page that the WAL-logged operation modifies.
 */
void
XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
{
	registered_buffer *regbuf;

	/* NO_IMAGE doesn't make sense with FORCE_IMAGE */
	Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
	Assert(begininsert_called);

	/*
	 * Ordinarily, buffer should be exclusive-locked and marked dirty before
	 * we get here, otherwise we could end up violating one of the rules in
	 * access/transam/README.
	 *
	 * Some callers intentionally register a clean page and never update that
	 * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
	 * bypass these checks.
	 */
#ifdef USE_ASSERT_CHECKING
	if (!(flags & REGBUF_NO_CHANGE))
		Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer));
#endif

	if (block_id >= max_registered_block_id)
	{
		if (block_id >= max_registered_buffers)
			elog(ERROR, "too many registered buffers");
		max_registered_block_id = block_id + 1;
	}

	regbuf = &registered_buffers[block_id];

	BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
	regbuf->page = BufferGetPage(buffer);
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Like XLogRegisterBuffer, but for registering a block that's not in the
 * shared buffer pool (i.e. when you don't have a Buffer for it).
 */
void
XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
				  BlockNumber blknum, const PageData *page, uint8 flags)
{
	registered_buffer *regbuf;

	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
		max_registered_block_id = block_id + 1;

	if (block_id >= max_registered_buffers)
		elog(ERROR, "too many registered buffers");

	regbuf = &registered_buffers[block_id];

	regbuf->rlocator = *rlocator;
	regbuf->forkno = forknum;
	regbuf->block = blknum;
	regbuf->page = page;
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Add data to the WAL record that's being constructed.
 *
 * The data is appended to the "main chunk", available at replay with
 * XLogRecGetData().
 */
void
XLogRegisterData(const void *data, uint32 len)
{
	XLogRecData *rdata;

	Assert(begininsert_called);

	if (num_rdatas >= max_rdatas)
		ereport(ERROR,
				(errmsg_internal("too much WAL data"),
				 errdetail_internal("%d out of %d data segments are already in use.",
									num_rdatas, max_rdatas)));
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	/*
	 * we use the mainrdata_last pointer to track the end of the chain, so no
	 * need to clear 'next' here.
	 */

	mainrdata_last->next = rdata;
	mainrdata_last = rdata;

	mainrdata_len += len;
}

/*
 * Add buffer-specific data to the WAL record that's being constructed.
 *
 * Block_id must reference a block previously registered with
 * XLogRegisterBuffer(). If this is called more than once for the same
 * block_id, the data is appended.
 *
 * The maximum amount of data that can be registered per block is 65535
 * bytes. That should be plenty; if you need more than BLCKSZ bytes to
 * reconstruct the changes to the page, you might as well just log a full
 * copy of it. (the "main data" that's not associated with a block is not
 * limited)
 */
void
XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
{
	registered_buffer *regbuf;
	XLogRecData *rdata;

	Assert(begininsert_called);

	/* find the registered buffer struct */
	regbuf = &registered_buffers[block_id];
	if (!regbuf->in_use)
		elog(ERROR, "no block with id %d registered with WAL insertion",
			 block_id);

	/*
	 * Check against max_rdatas and ensure we do not register more data per
	 * buffer than can be handled by the physical data format; i.e. that
	 * regbuf->rdata_len does not grow beyond what
	 * XLogRecordBlockHeader->data_length can hold.
	 */
	if (num_rdatas >= max_rdatas)
		ereport(ERROR,
				(errmsg_internal("too much WAL data"),
				 errdetail_internal("%d out of %d data segments are already in use.",
									num_rdatas, max_rdatas)));
	if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
		ereport(ERROR,
				(errmsg_internal("too much WAL data"),
				 errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
									UINT16_MAX, block_id, regbuf->rdata_len, len)));

	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	regbuf->rdata_tail->next = rdata;
	regbuf->rdata_tail = rdata;
	regbuf->rdata_len += len;
}

/*
 * Set insert status flags for the upcoming WAL record.
 *
 * The flags that can be used here are:
 * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
 *	 included in the record.
 * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
 *	 durability, which allows to avoid triggering WAL archiving and other
 *	 background activity.
 */
void
XLogSetRecordFlags(uint8 flags)
{
	Assert(begininsert_called);
	curinsert_flags |= flags;
}

/*
 * Insert an XLOG record having the specified RMID and info bytes, with the
 * body of the record being the data and buffer references registered earlier
 * with XLogRegister* calls.
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
	XLogRecPtr	EndPos;

	/* XLogBeginInsert() must have been called. */
	if (!begininsert_called)
		elog(ERROR, "XLogBeginInsert was not called");

	/*
	 * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
	 * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
	 */
	if ((info & ~(XLR_RMGR_INFO_MASK |
				  XLR_SPECIAL_REL_UPDATE |
				  XLR_CHECK_CONSISTENCY)) != 0)
		elog(PANIC, "invalid xlog info mask %02X", info);

	TRACE_POSTGRESQL_WAL_INSERT(rmid, info);

	/*
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
	 */
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
	{
		XLogResetInsertion();
		EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
		return EndPos;
	}

	do
	{
		XLogRecPtr	RedoRecPtr;
		bool		doPageWrites;
		bool		topxid_included = false;
		XLogRecPtr	fpw_lsn;
		XLogRecData *rdt;
		int			num_fpi = 0;

		/*
		 * Get values needed to decide whether to do full-page writes. Since
		 * we don't yet have an insertion lock, these could change under us,
		 * but XLogInsertRecord will recheck them once it has a lock.
		 */
		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
								 &fpw_lsn, &num_fpi, &topxid_included);

		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
								  topxid_included);
	} while (EndPos == InvalidXLogRecPtr);

	XLogResetInsertion();

	return EndPos;
}

/*
 * Assemble a WAL record from the registered data and buffers into an
 * XLogRecData chain, ready for insertion with XLogInsertRecord().
 *
 * The record header fields are filled in, except for the xl_prev field. The
 * calculated CRC does not include the record header yet.
 *
 * If there are any registered buffers, and a full-page image was not taken
 * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
 * signals that the assembled record is only good for insertion on the
 * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
 *
 * *topxid_included is set if the topmost transaction ID is logged with the
 * current subtransaction.
 */
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
{
	XLogRecData *rdt;
	uint64		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;

	/*
	 * Note: this function can be called multiple times for the same record.
	 * All the modifications we do to the rdata chains below must handle that.
	 */

	/* The record begins with the fixed-size header */
	rechdr = (XLogRecord *) scratch;
	scratch += SizeOfXLogRecord;

	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Enforce consistency checks for this record if user is looking for it.
	 * Do this before at the beginning of this routine to give the possibility
	 * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
	 * a record.
	 */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;
		bool		include_image;

		if (!regbuf->in_use)
			continue;

		/* Determine if this block needs to be backed up */
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/*
			 * We assume page LSN is first data on *every* page that can be
			 * passed to XLogInsert, whether it has the standard page layout
			 * or not.
			 */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/* Determine if the buffer data needs to included */
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		/*
		 * If needs_backup is true or WAL checking is enabled for current
		 * resource manager, log a full-page write for the current block.
		 */
		include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

		if (include_image)
		{
			const PageData *page = regbuf->page;
			uint16		compressed_len = 0;

			/*
			 * The page needs to be backed up, so calculate its hole length
			 * and offset.
			 */
			if (regbuf->flags & REGBUF_STANDARD)
			{
				/* Assume we can omit data between pd_lower and pd_upper */
				uint16		lower = ((PageHeader) page)->pd_lower;
				uint16		upper = ((PageHeader) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* No "hole" to remove */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Not a standard page header, don't try to eliminate "hole" */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress a block image if wal_compression is enabled
			 */
			if (wal_compression != WAL_COMPRESSION_NONE)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/*
			 * Fill in the remaining fields in the XLogRecordBlockHeader
			 * struct
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/* Report a full page image constructed for the WAL record */
			*num_fpi += 1;

			/*
			 * Construct XLogRecData entries for the page content.
			 */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			/*
			 * If WAL consistency checking is enabled for the resource manager
			 * of this WAL record, a full-page image is included in the record
			 * for the block modified. During redo, the full-page is replayed
			 * only if BKPIMAGE_APPLY is set.
			 */
			if (needs_backup)
				bimg.bimg_info |= BKPIMAGE_APPLY;

			if (is_compressed)
			{
				/* The current compression is stored in the WAL record */
				bimg.length = compressed_len;

				/* Set the compression method used for this block */
				switch ((WalCompression) wal_compression)
				{
					case WAL_COMPRESSION_PGLZ:
						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
						break;

					case WAL_COMPRESSION_LZ4:
#ifdef USE_LZ4
						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
#else
						elog(ERROR, "LZ4 is not supported by this build");
#endif
						break;

					case WAL_COMPRESSION_ZSTD:
#ifdef USE_ZSTD
						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
#else
						elog(ERROR, "zstd is not supported by this build");
#endif
						break;

					case WAL_COMPRESSION_NONE:
						Assert(false);	/* cannot happen */
						break;
						/* no default case, so that compiler will warn */
				}

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* must skip the hole */
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data =
						page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len =
						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;
		}

		if (needs_data)
		{
			/*
			 * When copying to XLogRecordBlockHeader, the length is narrowed
			 * to an uint16.  Double-check that it is still correct.
			 */
			Assert(regbuf->rdata_len <= UINT16_MAX);

			/*
			 * Link the caller-supplied rdata chain for this buffer to the
			 * overall list.
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = (uint16) regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/* Ok, copy the header to the scratch buffer */
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		if (include_image)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
			scratch += sizeof(RelFileLocator);
		}
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* followed by the record's origin, if any */
	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
		replorigin_session_origin != InvalidRepOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
		scratch += sizeof(replorigin_session_origin);
	}

	/* followed by toplevel XID, if not already included in previous record */
	if (IsSubxactTopXidLogPending())
	{
		TransactionId xid = GetTopTransactionIdIfAny();

		/* Set the flag that the top xid is included in the WAL */
		*topxid_included = true;

		*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
		memcpy(scratch, &xid, sizeof(TransactionId));
		scratch += sizeof(TransactionId);
	}

	/* followed by main data, if any */
	if (mainrdata_len > 0)
	{
		if (mainrdata_len > 255)
		{
			uint32		mainrdata_len_4b;

			if (mainrdata_len > PG_UINT32_MAX)
				ereport(ERROR,
						(errmsg_internal("too much WAL data"),
						 errdetail_internal("Main data length is %" PRIu64 " bytes for a maximum of %u bytes.",
											mainrdata_len,
											PG_UINT32_MAX)));

			mainrdata_len_4b = (uint32) mainrdata_len;
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
			memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
			scratch += sizeof(uint32);
		}
		else
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
			*(scratch++) = (uint8) mainrdata_len;
		}
		rdt_datas_last->next = mainrdata_head;
		rdt_datas_last = mainrdata_last;
		total_len += mainrdata_len;
	}
	rdt_datas_last->next = NULL;

	hdr_rdt.len = (scratch - hdr_scratch);
	total_len += hdr_rdt.len;

	/*
	 * Calculate CRC of the data
	 *
	 * Note that the record header isn't added into the CRC initially since we
	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
	 * the whole record in the order: rdata, then backup blocks, then record
	 * header.
	 */
	INIT_CRC32C(rdata_crc);
	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

	/*
	 * Ensure that the XLogRecord is not too large.
	 *
	 * XLogReader machinery is only able to handle records up to a certain
	 * size (ignoring machine resource limitations), so make sure that we will
	 * not emit records larger than the sizes advertised to be supported.
	 */
	if (total_len > XLogRecordMaxSize)
		ereport(ERROR,
				(errmsg_internal("oversized WAL record"),
				 errdetail_internal("WAL record would be %" PRIu64 " bytes (of maximum %u bytes); rmid %u flags %u.",
									total_len, XLogRecordMaxSize, rmid, info)));

	/*
	 * Fill in the fields in the record header. Prev-link is filled in later,
	 * once we know where in the WAL the record will be inserted. The CRC does
	 * not include the record header yet.
	 */
	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
	rechdr->xl_tot_len = (uint32) total_len;
	rechdr->xl_info = info;
	rechdr->xl_rmid = rmid;
	rechdr->xl_prev = InvalidXLogRecPtr;
	rechdr->xl_crc = rdata_crc;

	return &hdr_rdt;
}

/*
 * Create a compressed version of a backup block image.
 *
 * Returns false if compression fails (i.e., compressed result is actually
 * bigger than original). Otherwise, returns true and sets 'dlen' to
 * the length of compressed block image.
 */
static bool
XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
						void *dest, uint16 *dlen)
{
	int32		orig_len = BLCKSZ - hole_length;
	int32		len = -1;
	int32		extra_bytes = 0;
	const void *source;
	PGAlignedBlock tmp;

	if (hole_length != 0)
	{
		/* must skip the hole */
		memcpy(tmp.data, page, hole_offset);
		memcpy(tmp.data + hole_offset,
			   page + (hole_offset + hole_length),
			   BLCKSZ - (hole_length + hole_offset));
		source = tmp.data;

		/*
		 * Extra data needs to be stored in WAL record for the compressed
		 * version of block image if the hole exists.
		 */
		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
	}
	else
		source = page;

	switch ((WalCompression) wal_compression)
	{
		case WAL_COMPRESSION_PGLZ:
			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
			break;

		case WAL_COMPRESSION_LZ4:
#ifdef USE_LZ4
			len = LZ4_compress_default(source, dest, orig_len,
									   COMPRESS_BUFSIZE);
			if (len <= 0)
				len = -1;		/* failure */
#else
			elog(ERROR, "LZ4 is not supported by this build");
#endif
			break;

		case WAL_COMPRESSION_ZSTD:
#ifdef USE_ZSTD
			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
								ZSTD_CLEVEL_DEFAULT);
			if (ZSTD_isError(len))
				len = -1;		/* failure */
#else
			elog(ERROR, "zstd is not supported by this build");
#endif
			break;

		case WAL_COMPRESSION_NONE:
			Assert(false);		/* cannot happen */
			break;
			/* no default case, so that compiler will warn */
	}

	/*
	 * We recheck the actual size even if compression reports success and see
	 * if the number of bytes saved by compression is larger than the length
	 * of extra data needed for the compressed version of block image.
	 */
	if (len >= 0 &&
		len + extra_bytes < orig_len)
	{
		*dlen = (uint16) len;	/* successful compression */
		return true;
	}
	return false;
}

/*
 * Determine whether the buffer referenced has to be backed up.
 *
 * Since we don't yet have the insert lock, fullPageWrites and runningBackups
 * (which forces full-page writes) could change later, so the result should
 * be used for optimization purposes only.
 */
bool
XLogCheckBufferNeedsBackup(Buffer buffer)
{
	XLogRecPtr	RedoRecPtr;
	bool		doPageWrites;
	Page		page;

	GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

	page = BufferGetPage(buffer);

	if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
		return true;			/* buffer requires backup */

	return false;				/* buffer does not need to be backed up */
}

/*
 * Write a backup block if needed when we are setting a hint. Note that
 * this may be called for a variety of page types, not just heaps.
 *
 * Callable while holding just share lock on the buffer content.
 *
 * We can't use the plain backup block mechanism since that relies on the
 * Buffer being exclusively locked. Since some modifications (setting LSN, hint
 * bits) are allowed in a sharelocked buffer that can lead to wal checksum
 * failures. So instead we copy the page and insert the copied data as normal
 * record data.
 *
 * We only need to do something if page has not yet been full page written in
 * this checkpoint round. The LSN of the inserted wal record is returned if we
 * had to write, InvalidXLogRecPtr otherwise.
 *
 * It is possible that multiple concurrent backends could attempt to write WAL
 * records. In that case, multiple copies of the same block would be recorded
 * in separate WAL records by different backends, though that is still OK from
 * a correctness perspective.
 */
XLogRecPtr
XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
{
	XLogRecPtr	recptr = InvalidXLogRecPtr;
	XLogRecPtr	lsn;
	XLogRecPtr	RedoRecPtr;

	/*
	 * Ensure no checkpoint can change our view of RedoRecPtr.
	 */
	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);

	/*
	 * Update RedoRecPtr so that we can make the right decision
	 */
	RedoRecPtr = GetRedoRecPtr();

	/*
	 * We assume page LSN is first data on *every* page that can be passed to
	 * XLogInsert, whether it has the standard page layout or not. Since we're
	 * only holding a share-lock on the page, we must take the buffer header
	 * lock when we look at the LSN.
	 */
	lsn = BufferGetLSNAtomic(buffer);

	if (lsn <= RedoRecPtr)
	{
		int			flags = 0;
		PGAlignedBlock copied_buffer;
		char	   *origdata = (char *) BufferGetBlock(buffer);
		RelFileLocator rlocator;
		ForkNumber	forkno;
		BlockNumber blkno;

		/*
		 * Copy buffer so we don't have to worry about concurrent hint bit or
		 * lsn updates. We assume pd_lower/upper cannot be changed without an
		 * exclusive lock, so the contents bkp are not racy.
		 */
		if (buffer_std)
		{
			/* Assume we can omit data between pd_lower and pd_upper */
			Page		page = BufferGetPage(buffer);
			uint16		lower = ((PageHeader) page)->pd_lower;
			uint16		upper = ((PageHeader) page)->pd_upper;

			memcpy(copied_buffer.data, origdata, lower);
			memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
		}
		else
			memcpy(copied_buffer.data, origdata, BLCKSZ);

		XLogBeginInsert();

		if (buffer_std)
			flags |= REGBUF_STANDARD;

		BufferGetTag(buffer, &rlocator, &forkno, &blkno);
		XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
	}

	return recptr;
}

/*
 * Write a WAL record containing a full image of a page. Caller is responsible
 * for writing the page to disk after calling this routine.
 *
 * Note: If you're using this function, you should be building pages in private
 * memory and writing them directly to smgr.  If you're using buffers, call
 * log_newpage_buffer instead.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
			Page page, bool page_std)
{
	int			flags;
	XLogRecPtr	recptr;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	XLogBeginInsert();
	XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);
	recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

	/*
	 * The page may be uninitialized. If so, we can't set the LSN because that
	 * would corrupt the page.
	 */
	if (!PageIsNew(page))
	{
		PageSetLSN(page, recptr);
	}

	return recptr;
}

/*
 * Like log_newpage(), but allows logging multiple pages in one operation.
 * It is more efficient than calling log_newpage() for each page separately,
 * because we can write multiple pages in a single WAL record.
 */
void
log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
			 BlockNumber *blknos, Page *pages, bool page_std)
{
	int			flags;
	XLogRecPtr	recptr;
	int			i;
	int			j;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	/*
	 * Iterate over all the pages. They are collected into batches of
	 * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
	 * batch.
	 */
	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);

	i = 0;
	while (i < num_pages)
	{
		int			batch_start = i;
		int			nbatch;

		XLogBeginInsert();

		nbatch = 0;
		while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
		{
			XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
			i++;
			nbatch++;
		}

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

		for (j = batch_start; j < i; j++)
		{
			/*
			 * The page may be uninitialized. If so, we can't set the LSN
			 * because that would corrupt the page.
			 */
			if (!PageIsNew(pages[j]))
			{
				PageSetLSN(pages[j], recptr);
			}
		}
	}
}

/*
 * Write a WAL record containing a full image of a page.
 *
 * Caller should initialize the buffer and mark it dirty before calling this
 * function.  This function will set the page LSN.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage_buffer(Buffer buffer, bool page_std)
{
	Page		page = BufferGetPage(buffer);
	RelFileLocator rlocator;
	ForkNumber	forknum;
	BlockNumber blkno;

	/* Shared buffers should be modified in a critical section. */
	Assert(CritSectionCount > 0);

	BufferGetTag(buffer, &rlocator, &forknum, &blkno);

	return log_newpage(&rlocator, forknum, blkno, page, page_std);
}

/*
 * WAL-log a range of blocks in a relation.
 *
 * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
 * written to the WAL. If the range is large, this is done in multiple WAL
 * records.
 *
 * If all page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL records, making them smaller.
 *
 * NOTE: This function acquires exclusive-locks on the pages. Typically, this
 * is used on a newly-built relation, and the caller is holding a
 * AccessExclusiveLock on it, so no other backend can be accessing it at the
 * same time. If that's not the case, you must ensure that this does not
 * cause a deadlock through some other means.
 */
void
log_newpage_range(Relation rel, ForkNumber forknum,
				  BlockNumber startblk, BlockNumber endblk,
				  bool page_std)
{
	int			flags;
	BlockNumber blkno;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	/*
	 * Iterate over all the pages in the range. They are collected into
	 * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
	 * for each batch.
	 */
	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);

	blkno = startblk;
	while (blkno < endblk)
	{
		Buffer		bufpack[XLR_MAX_BLOCK_ID];
		XLogRecPtr	recptr;
		int			nbufs;
		int			i;

		CHECK_FOR_INTERRUPTS();

		/* Collect a batch of blocks. */
		nbufs = 0;
		while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
		{
			Buffer		buf = ReadBufferExtended(rel, forknum, blkno,
												 RBM_NORMAL, NULL);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			/*
			 * Completely empty pages are not WAL-logged. Writing a WAL record
			 * would change the LSN, and we don't want that. We want the page
			 * to stay empty.
			 */
			if (!PageIsNew(BufferGetPage(buf)))
				bufpack[nbufs++] = buf;
			else
				UnlockReleaseBuffer(buf);
			blkno++;
		}

		/* Nothing more to do if all remaining blocks were empty. */
		if (nbufs == 0)
			break;

		/* Write WAL record for this batch. */
		XLogBeginInsert();

		START_CRIT_SECTION();
		for (i = 0; i < nbufs; i++)
		{
			MarkBufferDirty(bufpack[i]);
			XLogRegisterBuffer(i, bufpack[i], flags);
		}

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

		for (i = 0; i < nbufs; i++)
		{
			PageSetLSN(BufferGetPage(bufpack[i]), recptr);
			UnlockReleaseBuffer(bufpack[i]);
		}
		END_CRIT_SECTION();
	}
}

/*
 * Allocate working buffers needed for WAL record construction.
 */
void
InitXLogInsert(void)
{
#ifdef USE_ASSERT_CHECKING

	/*
	 * Check that any records assembled can be decoded.  This is capped based
	 * on what XLogReader would require at its maximum bound.  The XLOG_BLCKSZ
	 * addend covers the larger allocate_recordbuf() demand.  This code path
	 * is called once per backend, more than enough for this check.
	 */
	size_t		max_required =
		DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);

	Assert(AllocSizeIsValid(max_required));
#endif

	/* Initialize the working areas */
	if (xloginsert_cxt == NULL)
	{
		xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
											   "WAL record construction",
											   ALLOCSET_DEFAULT_SIZES);
	}

	if (registered_buffers == NULL)
	{
		registered_buffers = (registered_buffer *)
			MemoryContextAllocZero(xloginsert_cxt,
								   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
		max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
	}
	if (rdatas == NULL)
	{
		rdatas = MemoryContextAlloc(xloginsert_cxt,
									sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
		max_rdatas = XLR_NORMAL_RDATAS;
	}

	/*
	 * Allocate a buffer to hold the header information for a WAL record.
	 */
	if (hdr_scratch == NULL)
		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
											 HEADER_SCRATCH_SIZE);
}
