/*-------------------------------------------------------------------------
 *
 * logtape.c
 *	  Management of "logical tapes" within temporary files.
 *
 * This module exists to support sorting via multiple merge passes (see
 * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
 * we implement it on disk by creating a separate file for each "tape",
 * there is an annoying problem: the peak space usage is at least twice
 * the volume of actual data to be sorted.  (This must be so because each
 * datum will appear in both the input and output tapes of the final
 * merge pass.)
 *
 * We can work around this problem by recognizing that any one tape
 * dataset (with the possible exception of the final output) is written
 * and read exactly once in a perfectly sequential manner.  Therefore,
 * a datum once read will not be required again, and we can recycle its
 * space for use by the new tape dataset(s) being generated.  In this way,
 * the total space usage is essentially just the actual data volume, plus
 * insignificant bookkeeping and start/stop overhead.
 *
 * Few OSes allow arbitrary parts of a file to be released back to the OS,
 * so we have to implement this space-recycling ourselves within a single
 * logical file.  logtape.c exists to perform this bookkeeping and provide
 * the illusion of N independent tape devices to tuplesort.c.  Note that
 * logtape.c itself depends on buffile.c to provide a "logical file" of
 * larger size than the underlying OS may support.
 *
 * For simplicity, we allocate and release space in the underlying file
 * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
 * of which blocks in the underlying file belong to which logical tape,
 * plus any blocks that are free (recycled and not yet reused).
 * The blocks in each logical tape form a chain, with a prev- and next-
 * pointer in each block.
 *
 * The initial write pass is guaranteed to fill the underlying file
 * perfectly sequentially, no matter how data is divided into logical tapes.
 * Once we begin merge passes, the access pattern becomes considerably
 * less predictable --- but the seeking involved should be comparable to
 * what would happen if we kept each logical tape in a separate file,
 * so there's no serious performance penalty paid to obtain the space
 * savings of recycling.  We try to localize the write accesses by always
 * writing to the lowest-numbered free block when we have a choice; it's
 * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
 * policy for free blocks would be better?)
 *
 * To further make the I/Os more sequential, we can use a larger buffer
 * when reading, and read multiple blocks from the same tape in one go,
 * whenever the buffer becomes empty.
 *
 * To support the above policy of writing to the lowest free block, the
 * freelist is a min heap.
 *
 * Since all the bookkeeping and buffer memory is allocated with palloc(),
 * and the underlying file(s) are made with OpenTemporaryFile, all resources
 * for a logical tape set are certain to be cleaned up even if processing
 * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
 * care that all calls for a single LogicalTapeSet are made in the same
 * palloc context.
 *
 * To support parallel sort operations involving coordinated callers to
 * tuplesort.c routines across multiple workers, it is necessary to
 * concatenate each worker BufFile/tapeset into one single logical tapeset
 * managed by the leader.  Workers should have produced one final
 * materialized tape (their entire output) when this happens in leader.
 * There will always be the same number of runs as input tapes, and the same
 * number of input tapes as participants (worker Tuplesortstates).
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/logtape.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <fcntl.h>

#include "storage/buffile.h"
#include "utils/builtins.h"
#include "utils/logtape.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"

/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, inverted, in 'next'
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	int64		prev;			/* previous block on this tape, or -1 on first
								 * block */
	int64		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))

#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))

/*
 * When multiple tapes are being written to concurrently (as in HashAgg),
 * avoid excessive fragmentation by preallocating block numbers to individual
 * tapes. Each preallocation doubles in size starting at
 * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks.
 *
 * No filesystem operations are performed for preallocation; only the block
 * numbers are reserved. This may lead to sparse writes, which will cause
 * ltsWriteBlock() to fill in holes with zeros.
 */
#define TAPE_WRITE_PREALLOC_MIN 8
#define TAPE_WRITE_PREALLOC_MAX 128

/*
 * This data structure represents a single "logical tape" within the set
 * of logical tapes stored in the same file.
 *
 * While writing, we hold the current partially-written data block in the
 * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
 * that we don't retain the trailers of a block when it's read into the
 * buffer.  The buffer therefore contains one large contiguous chunk of data
 * from the tape.
 */
struct LogicalTape
{
	LogicalTapeSet *tapeSet;	/* tape set this tape is part of */

	bool		writing;		/* T while in write phase */
	bool		frozen;			/* T if blocks should not be freed when read */
	bool		dirty;			/* does buffer need to be written? */

	/*
	 * Block numbers of the first, current, and next block of the tape.
	 *
	 * The "current" block number is only valid when writing, or reading from
	 * a frozen tape.  (When reading from an unfrozen tape, we use a larger
	 * read buffer that holds multiple blocks, so the "current" block is
	 * ambiguous.)
	 *
	 * When concatenation of worker tape BufFiles is performed, an offset to
	 * the first block in the unified BufFile space is applied during reads.
	 */
	int64		firstBlockNumber;
	int64		curBlockNumber;
	int64		nextBlockNumber;
	int64		offsetBlockNumber;

	/*
	 * Buffer for current data block(s).
	 */
	char	   *buffer;			/* physical buffer (separately palloc'd) */
	int			buffer_size;	/* allocated size of the buffer */
	int			max_size;		/* highest useful, safe buffer_size */
	int			pos;			/* next read/write position in buffer */
	int			nbytes;			/* total # of valid bytes in buffer */

	/*
	 * Preallocated block numbers are held in an array sorted in descending
	 * order; blocks are consumed from the end of the array (lowest block
	 * numbers first).
	 */
	int64	   *prealloc;
	int			nprealloc;		/* number of elements in list */
	int			prealloc_size;	/* number of elements list can hold */
};

/*
 * This data structure represents a set of related "logical tapes" sharing
 * space in a single underlying file.  (But that "file" may be multiple files
 * if needed to escape OS limits on file size; buffile.c handles that for us.)
 * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
 * demand.
 */
struct LogicalTapeSet
{
	BufFile    *pfile;			/* underlying file for whole tape set */
	SharedFileSet *fileset;
	int			worker;			/* worker # if shared, -1 for leader/serial */

	/*
	 * File size tracking.  nBlocksWritten is the size of the underlying file,
	 * in BLCKSZ blocks.  nBlocksAllocated is the number of blocks allocated
	 * by ltsReleaseBlock(), and it is always greater than or equal to
	 * nBlocksWritten.  Blocks between nBlocksAllocated and nBlocksWritten are
	 * blocks that have been allocated for a tape, but have not been written
	 * to the underlying file yet.  nHoleBlocks tracks the total number of
	 * blocks that are in unused holes between worker spaces following BufFile
	 * concatenation.
	 */
	int64		nBlocksAllocated;	/* # of blocks allocated */
	int64		nBlocksWritten; /* # of blocks used in underlying file */
	int64		nHoleBlocks;	/* # of "hole" blocks left */

	/*
	 * We store the numbers of recycled-and-available blocks in freeBlocks[].
	 * When there are no such blocks, we extend the underlying file.
	 *
	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
	 * rather than being remembered in freeBlocks[].  See notes for
	 * LogicalTapeSetForgetFreeSpace().
	 */
	bool		forgetFreeSpace;	/* are we remembering free blocks? */
	int64	   *freeBlocks;		/* resizable array holding minheap */
	int64		nFreeBlocks;	/* # of currently free blocks */
	Size		freeBlocksLen;	/* current allocated length of freeBlocks[] */
	bool		enable_prealloc;	/* preallocate write blocks? */
};

static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
static void ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer);
static int64 ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
static int64 ltsGetFreeBlock(LogicalTapeSet *lts);
static int64 ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
static void ltsReleaseBlock(LogicalTapeSet *lts, int64 blocknum);
static void ltsInitReadBuffer(LogicalTape *lt);


/*
 * Write a block-sized buffer to the specified block of the underlying file.
 *
 * No need for an error return convention; we ereport() on any error.
 */
static void
ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer)
{
	/*
	 * BufFile does not support "holes", so if we're about to write a block
	 * that's past the current end of file, fill the space between the current
	 * end of file and the target block with zeros.
	 *
	 * This can happen either when tapes preallocate blocks; or for the last
	 * block of a tape which might not have been flushed.
	 *
	 * Note that BufFile concatenation can leave "holes" in BufFile between
	 * worker-owned block ranges.  These are tracked for reporting purposes
	 * only.  We never read from nor write to these hole blocks, and so they
	 * are not considered here.
	 */
	while (blocknum > lts->nBlocksWritten)
	{
		PGIOAlignedBlock zerobuf;

		MemSet(zerobuf.data, 0, sizeof(zerobuf));

		ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
	}

	/* Write the requested block */
	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to block %" PRId64 " of temporary file",
						blocknum)));
	BufFileWrite(lts->pfile, buffer, BLCKSZ);

	/* Update nBlocksWritten, if we extended the file */
	if (blocknum == lts->nBlocksWritten)
		lts->nBlocksWritten++;
}

/*
 * Read a block-sized buffer from the specified block of the underlying file.
 *
 * No need for an error return convention; we ereport() on any error.   This
 * module should never attempt to read a block it doesn't know is there.
 */
static void
ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer)
{
	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to block %" PRId64 " of temporary file",
						blocknum)));
	BufFileReadExact(lts->pfile, buffer, BLCKSZ);
}

/*
 * Read as many blocks as we can into the per-tape buffer.
 *
 * Returns true if anything was read, 'false' on EOF.
 */
static bool
ltsReadFillBuffer(LogicalTape *lt)
{
	lt->pos = 0;
	lt->nbytes = 0;

	do
	{
		char	   *thisbuf = lt->buffer + lt->nbytes;
		int64		datablocknum = lt->nextBlockNumber;

		/* Fetch next block number */
		if (datablocknum == -1L)
			break;				/* EOF */
		/* Apply worker offset, needed for leader tapesets */
		datablocknum += lt->offsetBlockNumber;

		/* Read the block */
		ltsReadBlock(lt->tapeSet, datablocknum, thisbuf);
		if (!lt->frozen)
			ltsReleaseBlock(lt->tapeSet, datablocknum);
		lt->curBlockNumber = lt->nextBlockNumber;

		lt->nbytes += TapeBlockGetNBytes(thisbuf);
		if (TapeBlockIsLast(thisbuf))
		{
			lt->nextBlockNumber = -1L;
			/* EOF */
			break;
		}
		else
			lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;

		/* Advance to next block, if we have buffer space left */
	} while (lt->buffer_size - lt->nbytes > BLCKSZ);

	return (lt->nbytes > 0);
}

static inline uint64
left_offset(uint64 i)
{
	return 2 * i + 1;
}

static inline uint64
right_offset(uint64 i)
{
	return 2 * i + 2;
}

static inline uint64
parent_offset(uint64 i)
{
	return (i - 1) / 2;
}

/*
 * Get the next block for writing.
 */
static int64
ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
{
	if (lts->enable_prealloc)
		return ltsGetPreallocBlock(lts, lt);
	else
		return ltsGetFreeBlock(lts);
}

/*
 * Select the lowest currently unused block from the tape set's global free
 * list min heap.
 */
static int64
ltsGetFreeBlock(LogicalTapeSet *lts)
{
	int64	   *heap = lts->freeBlocks;
	int64		blocknum;
	int64		heapsize;
	int64		holeval;
	uint64		holepos;

	/* freelist empty; allocate a new block */
	if (lts->nFreeBlocks == 0)
		return lts->nBlocksAllocated++;

	/* easy if heap contains one element */
	if (lts->nFreeBlocks == 1)
	{
		lts->nFreeBlocks--;
		return lts->freeBlocks[0];
	}

	/* remove top of minheap */
	blocknum = heap[0];

	/* we'll replace it with end of minheap array */
	holeval = heap[--lts->nFreeBlocks];

	/* sift down */
	holepos = 0;				/* holepos is where the "hole" is */
	heapsize = lts->nFreeBlocks;
	while (true)
	{
		uint64		left = left_offset(holepos);
		uint64		right = right_offset(holepos);
		uint64		min_child;

		if (left < heapsize && right < heapsize)
			min_child = (heap[left] < heap[right]) ? left : right;
		else if (left < heapsize)
			min_child = left;
		else if (right < heapsize)
			min_child = right;
		else
			break;

		if (heap[min_child] >= holeval)
			break;

		heap[holepos] = heap[min_child];
		holepos = min_child;
	}
	heap[holepos] = holeval;

	return blocknum;
}

/*
 * Return the lowest free block number from the tape's preallocation list.
 * Refill the preallocation list with blocks from the tape set's free list if
 * necessary.
 */
static int64
ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
{
	/* sorted in descending order, so return the last element */
	if (lt->nprealloc > 0)
		return lt->prealloc[--lt->nprealloc];

	if (lt->prealloc == NULL)
	{
		lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
		lt->prealloc = (int64 *) palloc(sizeof(int64) * lt->prealloc_size);
	}
	else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
	{
		/* when the preallocation list runs out, double the size */
		lt->prealloc_size *= 2;
		if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX)
			lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX;
		lt->prealloc = (int64 *) repalloc(lt->prealloc,
										  sizeof(int64) * lt->prealloc_size);
	}

	/* refill preallocation list */
	lt->nprealloc = lt->prealloc_size;
	for (int i = lt->nprealloc; i > 0; i--)
	{
		lt->prealloc[i - 1] = ltsGetFreeBlock(lts);

		/* verify descending order */
		Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
	}

	return lt->prealloc[--lt->nprealloc];
}

/*
 * Return a block# to the freelist.
 */
static void
ltsReleaseBlock(LogicalTapeSet *lts, int64 blocknum)
{
	int64	   *heap;
	uint64		holepos;

	/*
	 * Do nothing if we're no longer interested in remembering free space.
	 */
	if (lts->forgetFreeSpace)
		return;

	/*
	 * Enlarge freeBlocks array if full.
	 */
	if (lts->nFreeBlocks >= lts->freeBlocksLen)
	{
		/*
		 * If the freelist becomes very large, just return and leak this free
		 * block.
		 */
		if (lts->freeBlocksLen * 2 * sizeof(int64) > MaxAllocSize)
			return;

		lts->freeBlocksLen *= 2;
		lts->freeBlocks = (int64 *) repalloc(lts->freeBlocks,
											 lts->freeBlocksLen * sizeof(int64));
	}

	/* create a "hole" at end of minheap array */
	heap = lts->freeBlocks;
	holepos = lts->nFreeBlocks;
	lts->nFreeBlocks++;

	/* sift up to insert blocknum */
	while (holepos != 0)
	{
		uint64		parent = parent_offset(holepos);

		if (heap[parent] < blocknum)
			break;

		heap[holepos] = heap[parent];
		holepos = parent;
	}
	heap[holepos] = blocknum;
}

/*
 * Lazily allocate and initialize the read buffer. This avoids waste when many
 * tapes are open at once, but not all are active between rewinding and
 * reading.
 */
static void
ltsInitReadBuffer(LogicalTape *lt)
{
	Assert(lt->buffer_size > 0);
	lt->buffer = palloc(lt->buffer_size);

	/* Read the first block, or reset if tape is empty */
	lt->nextBlockNumber = lt->firstBlockNumber;
	lt->pos = 0;
	lt->nbytes = 0;
	ltsReadFillBuffer(lt);
}

/*
 * Create a tape set, backed by a temporary underlying file.
 *
 * The tape set is initially empty. Use LogicalTapeCreate() to create
 * tapes in it.
 *
 * In a single-process sort, pass NULL argument for fileset, and -1 for
 * worker.
 *
 * In a parallel sort, parallel workers pass the shared fileset handle and
 * their own worker number.  After the workers have finished, create the
 * tape set in the leader, passing the shared fileset handle and -1 for
 * worker, and use LogicalTapeImport() to import the worker tapes into it.
 *
 * Currently, the leader will only import worker tapes into the set, it does
 * not create tapes of its own, although in principle that should work.
 *
 * If preallocate is true, blocks for each individual tape are allocated in
 * batches.  This avoids fragmentation when writing multiple tapes at the
 * same time.
 */
LogicalTapeSet *
LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
{
	LogicalTapeSet *lts;

	/*
	 * Create top-level struct including per-tape LogicalTape structs.
	 */
	lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
	lts->nBlocksAllocated = 0L;
	lts->nBlocksWritten = 0L;
	lts->nHoleBlocks = 0L;
	lts->forgetFreeSpace = false;
	lts->freeBlocksLen = 32;	/* reasonable initial guess */
	lts->freeBlocks = (int64 *) palloc(lts->freeBlocksLen * sizeof(int64));
	lts->nFreeBlocks = 0;
	lts->enable_prealloc = preallocate;

	lts->fileset = fileset;
	lts->worker = worker;

	/*
	 * Create temp BufFile storage as required.
	 *
	 * In leader, we hijack the BufFile of the first tape that's imported, and
	 * concatenate the BufFiles of any subsequent tapes to that. Hence don't
	 * create a BufFile here. Things are simpler for the worker case and the
	 * serial case, though.  They are generally very similar -- workers use a
	 * shared fileset, whereas serial sorts use a conventional serial BufFile.
	 */
	if (fileset && worker == -1)
		lts->pfile = NULL;
	else if (fileset)
	{
		char		filename[MAXPGPATH];

		pg_itoa(worker, filename);
		lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
	}
	else
		lts->pfile = BufFileCreateTemp(false);

	return lts;
}

/*
 * Claim ownership of a logical tape from an existing shared BufFile.
 *
 * Caller should be leader process.  Though tapes are marked as frozen in
 * workers, they are not frozen when opened within leader, since unfrozen tapes
 * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
 * for random access.)
 */
LogicalTape *
LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
{
	LogicalTape *lt;
	int64		tapeblocks;
	char		filename[MAXPGPATH];
	BufFile    *file;
	int64		filesize;

	lt = ltsCreateTape(lts);

	/*
	 * build concatenated view of all buffiles, remembering the block number
	 * where each source file begins.
	 */
	pg_itoa(worker, filename);
	file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
	filesize = BufFileSize(file);

	/*
	 * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
	 * block offset into each tape as we go.
	 */
	lt->firstBlockNumber = shared->firstblocknumber;
	if (lts->pfile == NULL)
	{
		lts->pfile = file;
		lt->offsetBlockNumber = 0L;
	}
	else
	{
		lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
	}
	/* Don't allocate more for read buffer than could possibly help */
	lt->max_size = Min(MaxAllocSize, filesize);
	tapeblocks = filesize / BLCKSZ;

	/*
	 * Update # of allocated blocks and # blocks written to reflect the
	 * imported BufFile.  Allocated/written blocks include space used by holes
	 * left between concatenated BufFiles.  Also track the number of hole
	 * blocks so that we can later work backwards to calculate the number of
	 * physical blocks for instrumentation.
	 */
	lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;

	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
	lts->nBlocksWritten = lts->nBlocksAllocated;

	return lt;
}

/*
 * Close a logical tape set and release all resources.
 *
 * NOTE: This doesn't close any of the tapes!  You must close them
 * first, or you can let them be destroyed along with the memory context.
 */
void
LogicalTapeSetClose(LogicalTapeSet *lts)
{
	BufFileClose(lts->pfile);
	pfree(lts->freeBlocks);
	pfree(lts);
}

/*
 * Create a logical tape in the given tapeset.
 *
 * The tape is initialized in write state.
 */
LogicalTape *
LogicalTapeCreate(LogicalTapeSet *lts)
{
	/*
	 * The only thing that currently prevents creating new tapes in leader is
	 * the fact that BufFiles opened using BufFileOpenShared() are read-only
	 * by definition, but that could be changed if it seemed worthwhile.  For
	 * now, writing to the leader tape will raise a "Bad file descriptor"
	 * error, so tuplesort must avoid writing to the leader tape altogether.
	 */
	if (lts->fileset && lts->worker == -1)
		elog(ERROR, "cannot create new tapes in leader process");

	return ltsCreateTape(lts);
}

static LogicalTape *
ltsCreateTape(LogicalTapeSet *lts)
{
	LogicalTape *lt;

	/*
	 * Create per-tape struct.  Note we allocate the I/O buffer lazily.
	 */
	lt = palloc(sizeof(LogicalTape));
	lt->tapeSet = lts;
	lt->writing = true;
	lt->frozen = false;
	lt->dirty = false;
	lt->firstBlockNumber = -1L;
	lt->curBlockNumber = -1L;
	lt->nextBlockNumber = -1L;
	lt->offsetBlockNumber = 0L;
	lt->buffer = NULL;
	lt->buffer_size = 0;
	/* palloc() larger than MaxAllocSize would fail */
	lt->max_size = MaxAllocSize;
	lt->pos = 0;
	lt->nbytes = 0;
	lt->prealloc = NULL;
	lt->nprealloc = 0;
	lt->prealloc_size = 0;

	return lt;
}

/*
 * Close a logical tape.
 *
 * Note: This doesn't return any blocks to the free list!  You must read
 * the tape to the end first, to reuse the space.  In current use, though,
 * we only close tapes after fully reading them.
 */
void
LogicalTapeClose(LogicalTape *lt)
{
	if (lt->buffer)
		pfree(lt->buffer);
	pfree(lt);
}

/*
 * Mark a logical tape set as not needing management of free space anymore.
 *
 * This should be called if the caller does not intend to write any more data
 * into the tape set, but is reading from un-frozen tapes.  Since no more
 * writes are planned, remembering free blocks is no longer useful.  Setting
 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
 * is not designed to handle large numbers of free blocks.
 */
void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
{
	lts->forgetFreeSpace = true;
}

/*
 * Write to a logical tape.
 *
 * There are no error returns; we ereport() on failure.
 */
void
LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
{
	LogicalTapeSet *lts = lt->tapeSet;
	size_t		nthistime;

	Assert(lt->writing);
	Assert(lt->offsetBlockNumber == 0L);

	/* Allocate data buffer and first block on first write */
	if (lt->buffer == NULL)
	{
		lt->buffer = (char *) palloc(BLCKSZ);
		lt->buffer_size = BLCKSZ;
	}
	if (lt->curBlockNumber == -1)
	{
		Assert(lt->firstBlockNumber == -1);
		Assert(lt->pos == 0);

		lt->curBlockNumber = ltsGetBlock(lts, lt);
		lt->firstBlockNumber = lt->curBlockNumber;

		TapeBlockGetTrailer(lt->buffer)->prev = -1L;
	}

	Assert(lt->buffer_size == BLCKSZ);
	while (size > 0)
	{
		if (lt->pos >= (int) TapeBlockPayloadSize)
		{
			/* Buffer full, dump it out */
			int64		nextBlockNumber;

			if (!lt->dirty)
			{
				/* Hmm, went directly from reading to writing? */
				elog(ERROR, "invalid logtape state: should be dirty");
			}

			/*
			 * First allocate the next block, so that we can store it in the
			 * 'next' pointer of this block.
			 */
			nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);

			/* set the next-pointer and dump the current block. */
			TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
			ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);

			/* initialize the prev-pointer of the next block */
			TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
			lt->curBlockNumber = nextBlockNumber;
			lt->pos = 0;
			lt->nbytes = 0;
		}

		nthistime = TapeBlockPayloadSize - lt->pos;
		if (nthistime > size)
			nthistime = size;
		Assert(nthistime > 0);

		memcpy(lt->buffer + lt->pos, ptr, nthistime);

		lt->dirty = true;
		lt->pos += nthistime;
		if (lt->nbytes < lt->pos)
			lt->nbytes = lt->pos;
		ptr = (const char *) ptr + nthistime;
		size -= nthistime;
	}
}

/*
 * Rewind logical tape and switch from writing to reading.
 *
 * The tape must currently be in writing state, or "frozen" in read state.
 *
 * 'buffer_size' specifies how much memory to use for the read buffer.
 * Regardless of the argument, the actual amount of memory used is between
 * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ.  The given value is
 * rounded down and truncated to fit those constraints, if necessary.  If the
 * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
 * byte buffer is used.
 */
void
LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
{
	LogicalTapeSet *lts = lt->tapeSet;

	/*
	 * Round and cap buffer_size if needed.
	 */
	if (lt->frozen)
		buffer_size = BLCKSZ;
	else
	{
		/* need at least one block */
		if (buffer_size < BLCKSZ)
			buffer_size = BLCKSZ;

		/* palloc() larger than max_size is unlikely to be helpful */
		if (buffer_size > lt->max_size)
			buffer_size = lt->max_size;

		/* round down to BLCKSZ boundary */
		buffer_size -= buffer_size % BLCKSZ;
	}

	if (lt->writing)
	{
		/*
		 * Completion of a write phase.  Flush last partial data block, and
		 * rewind for normal (destructive) read.
		 */
		if (lt->dirty)
		{
			/*
			 * As long as we've filled the buffer at least once, its contents
			 * are entirely defined from valgrind's point of view, even though
			 * contents beyond the current end point may be stale.  But it's
			 * possible - at least in the case of a parallel sort - to sort
			 * such small amount of data that we do not fill the buffer even
			 * once.  Tell valgrind that its contents are defined, so it
			 * doesn't bleat.
			 */
			VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
									  lt->buffer_size - lt->nbytes);

			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
			ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
		}
		lt->writing = false;
	}
	else
	{
		/*
		 * This is only OK if tape is frozen; we rewind for (another) read
		 * pass.
		 */
		Assert(lt->frozen);
	}

	if (lt->buffer)
		pfree(lt->buffer);

	/* the buffer is lazily allocated, but set the size here */
	lt->buffer = NULL;
	lt->buffer_size = buffer_size;

	/* free the preallocation list, and return unused block numbers */
	if (lt->prealloc != NULL)
	{
		for (int i = lt->nprealloc; i > 0; i--)
			ltsReleaseBlock(lts, lt->prealloc[i - 1]);
		pfree(lt->prealloc);
		lt->prealloc = NULL;
		lt->nprealloc = 0;
		lt->prealloc_size = 0;
	}
}

/*
 * Read from a logical tape.
 *
 * Early EOF is indicated by return value less than #bytes requested.
 */
size_t
LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
{
	size_t		nread = 0;
	size_t		nthistime;

	Assert(!lt->writing);

	if (lt->buffer == NULL)
		ltsInitReadBuffer(lt);

	while (size > 0)
	{
		if (lt->pos >= lt->nbytes)
		{
			/* Try to load more data into buffer. */
			if (!ltsReadFillBuffer(lt))
				break;			/* EOF */
		}

		nthistime = lt->nbytes - lt->pos;
		if (nthistime > size)
			nthistime = size;
		Assert(nthistime > 0);

		memcpy(ptr, lt->buffer + lt->pos, nthistime);

		lt->pos += nthistime;
		ptr = (char *) ptr + nthistime;
		size -= nthistime;
		nread += nthistime;
	}

	return nread;
}

/*
 * "Freeze" the contents of a tape so that it can be read multiple times
 * and/or read backwards.  Once a tape is frozen, its contents will not
 * be released until the LogicalTapeSet is destroyed.  This is expected
 * to be used only for the final output pass of a merge.
 *
 * This *must* be called just at the end of a write pass, before the
 * tape is rewound (after rewind is too late!).  It performs a rewind
 * and switch to read mode "for free".  An immediately following rewind-
 * for-read call is OK but not necessary.
 *
 * share output argument is set with details of storage used for tape after
 * freezing, which may be passed to LogicalTapeSetCreate within leader
 * process later.  This metadata is only of interest to worker callers
 * freezing their final output for leader (single materialized tape).
 * Serial sorts should set share to NULL.
 */
void
LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
{
	LogicalTapeSet *lts = lt->tapeSet;

	Assert(lt->writing);
	Assert(lt->offsetBlockNumber == 0L);

	/*
	 * Completion of a write phase.  Flush last partial data block, and rewind
	 * for nondestructive read.
	 */
	if (lt->dirty)
	{
		/*
		 * As long as we've filled the buffer at least once, its contents are
		 * entirely defined from valgrind's point of view, even though
		 * contents beyond the current end point may be stale.  But it's
		 * possible - at least in the case of a parallel sort - to sort such
		 * small amount of data that we do not fill the buffer even once. Tell
		 * valgrind that its contents are defined, so it doesn't bleat.
		 */
		VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
								  lt->buffer_size - lt->nbytes);

		TapeBlockSetNBytes(lt->buffer, lt->nbytes);
		ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
	}
	lt->writing = false;
	lt->frozen = true;

	/*
	 * The seek and backspace functions assume a single block read buffer.
	 * That's OK with current usage.  A larger buffer is helpful to make the
	 * read pattern of the backing file look more sequential to the OS, when
	 * we're reading from multiple tapes.  But at the end of a sort, when a
	 * tape is frozen, we only read from a single tape anyway.
	 */
	if (!lt->buffer || lt->buffer_size != BLCKSZ)
	{
		if (lt->buffer)
			pfree(lt->buffer);
		lt->buffer = palloc(BLCKSZ);
		lt->buffer_size = BLCKSZ;
	}

	/* Read the first block, or reset if tape is empty */
	lt->curBlockNumber = lt->firstBlockNumber;
	lt->pos = 0;
	lt->nbytes = 0;

	if (lt->firstBlockNumber == -1L)
		lt->nextBlockNumber = -1L;
	ltsReadBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
	if (TapeBlockIsLast(lt->buffer))
		lt->nextBlockNumber = -1L;
	else
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
	lt->nbytes = TapeBlockGetNBytes(lt->buffer);

	/* Handle extra steps when caller is to share its tapeset */
	if (share)
	{
		BufFileExportFileSet(lts->pfile);
		share->firstblocknumber = lt->firstBlockNumber;
	}
}

/*
 * Backspace the tape a given number of bytes.  (We also support a more
 * general seek interface, see below.)
 *
 * *Only* a frozen-for-read tape can be backed up; we don't support
 * random access during write, and an unfrozen read tape may have
 * already discarded the desired data!
 *
 * Returns the number of bytes backed up.  It can be less than the
 * requested amount, if there isn't that much data before the current
 * position.  The tape is positioned to the beginning of the tape in
 * that case.
 */
size_t
LogicalTapeBackspace(LogicalTape *lt, size_t size)
{
	size_t		seekpos = 0;

	Assert(lt->frozen);
	Assert(lt->buffer_size == BLCKSZ);

	if (lt->buffer == NULL)
		ltsInitReadBuffer(lt);

	/*
	 * Easy case for seek within current block.
	 */
	if (size <= (size_t) lt->pos)
	{
		lt->pos -= (int) size;
		return size;
	}

	/*
	 * Not-so-easy case, have to walk back the chain of blocks.  This
	 * implementation would be pretty inefficient for long seeks, but we
	 * really aren't doing that (a seek over one tuple is typical).
	 */
	seekpos = (size_t) lt->pos; /* part within this block */
	while (size > seekpos)
	{
		int64		prev = TapeBlockGetTrailer(lt->buffer)->prev;

		if (prev == -1L)
		{
			/* Tried to back up beyond the beginning of tape. */
			if (lt->curBlockNumber != lt->firstBlockNumber)
				elog(ERROR, "unexpected end of tape");
			lt->pos = 0;
			return seekpos;
		}

		ltsReadBlock(lt->tapeSet, prev, lt->buffer);

		if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
			elog(ERROR, "broken tape, next of block %" PRId64 " is %" PRId64 ", expected %" PRId64,
				 prev,
				 TapeBlockGetTrailer(lt->buffer)->next,
				 lt->curBlockNumber);

		lt->nbytes = TapeBlockPayloadSize;
		lt->curBlockNumber = prev;
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;

		seekpos += TapeBlockPayloadSize;
	}

	/*
	 * 'seekpos' can now be greater than 'size', because it points to the
	 * beginning the target block.  The difference is the position within the
	 * page.
	 */
	lt->pos = seekpos - size;
	return size;
}

/*
 * Seek to an arbitrary position in a logical tape.
 *
 * *Only* a frozen-for-read tape can be seeked.
 *
 * Must be called with a block/offset previously returned by
 * LogicalTapeTell().
 */
void
LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
{
	Assert(lt->frozen);
	Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
	Assert(lt->buffer_size == BLCKSZ);

	if (lt->buffer == NULL)
		ltsInitReadBuffer(lt);

	if (blocknum != lt->curBlockNumber)
	{
		ltsReadBlock(lt->tapeSet, blocknum, lt->buffer);
		lt->curBlockNumber = blocknum;
		lt->nbytes = TapeBlockPayloadSize;
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
	}

	if (offset > lt->nbytes)
		elog(ERROR, "invalid tape seek position");
	lt->pos = offset;
}

/*
 * Obtain current position in a form suitable for a later LogicalTapeSeek.
 *
 * NOTE: it'd be OK to do this during write phase with intention of using
 * the position for a seek after freezing.  Not clear if anyone needs that.
 */
void
LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
{
	if (lt->buffer == NULL)
		ltsInitReadBuffer(lt);

	Assert(lt->offsetBlockNumber == 0L);

	/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
	Assert(lt->buffer_size == BLCKSZ);

	*blocknum = lt->curBlockNumber;
	*offset = lt->pos;
}

/*
 * Obtain total disk space currently used by a LogicalTapeSet, in blocks. Does
 * not account for open write buffer, if any.
 */
int64
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
	return lts->nBlocksWritten - lts->nHoleBlocks;
}
