/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/utils/cache/relcache.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
#include "postgres.h"

#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>

#include "access/htup_details.h"
#include "access/multixact.h"
#include "access/parallel.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/tupdesc_details.h"
#include "access/xact.h"
#include "catalog/binary_upgrade.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_am.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_auth_members.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_database.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_shseclabel.h"
#include "catalog/pg_statistic_ext.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "catalog/schemapg.h"
#include "catalog/storage.h"
#include "commands/policy.h"
#include "commands/publicationcmds.h"
#include "commands/trigger.h"
#include "common/int.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rowsecurity.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

#define RELCACHE_INIT_FILEMAGIC		0x573266	/* version ID value */

/*
 * Whether to bother checking if relation cache memory needs to be freed
 * eagerly.  See also RelationBuildDesc() and pg_config_manual.h.
 *
 * Two symbols result from this block:
 *
 * RECOVER_RELATION_BUILD_MEMORY is always defined afterwards (it is set to 0
 * here when it was not already defined as nonzero), so it can be tested in
 * an ordinary C expression.
 *
 * MAYBE_RECOVER_RELATION_BUILD_MEMORY is defined only when recovery is
 * forced on or DISCARD_CACHES_ENABLED is defined; RelationBuildDesc() uses
 * it with #ifdef to decide whether the recovery code is compiled at all.
 */
#if defined(RECOVER_RELATION_BUILD_MEMORY) && (RECOVER_RELATION_BUILD_MEMORY != 0)
#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
#else
#define RECOVER_RELATION_BUILD_MEMORY 0
#ifdef DISCARD_CACHES_ENABLED
#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
#endif
#endif

/*
 *		hardcoded tuple descriptors, contents generated by genbki.pl
 */
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};

/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 */
/* One entry of the OID-indexed relcache hash table */
typedef struct relidcacheent
{
	Oid			reloid;			/* relation OID (the lookup macros below
								 * search by a relation's rd_id) */
	Relation	reldesc;		/* the cached relation descriptor */
} RelIdCacheEnt;

/* The hash table itself; accessed through the RelationCache* macros below */
static HTAB *RelationIdCache;

/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
bool		criticalRelcachesBuilt = false;

/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).  Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;

/*
 * in_progress_list is a stack of ongoing RelationBuildDesc() calls.  CREATE
 * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock.
 * It critically relies on each backend absorbing those changes no later than
 * next transaction start.  Hence, RelationBuildDesc() loops until it finishes
 * without accepting a relevant invalidation.  (Most invalidation consumers
 * don't do this.)
 */
typedef struct inprogressent
{
	Oid			reloid;			/* OID of relation being built */
	bool		invalidated;	/* whether an invalidation arrived for it */
} InProgressEnt;

/* the stack storage; RelationBuildDesc() doubles it with repalloc when full */
static InProgressEnt *in_progress_list;
/* number of entries currently in use (index of the next free slot) */
static int	in_progress_list_len;
/* number of entries in_progress_list currently has room for */
static int	in_progress_list_maxlen;

/*
 * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
 * cleanup work.  This list intentionally has limited size; if it overflows,
 * we fall back to scanning the whole hashtable.  There is no value in a very
 * large list because (1) at some point, a hash_seq_search scan is faster than
 * retail lookups, and (2) the value of this is to reduce EOXact work for
 * short transactions, which can't have dirtied all that many tables anyway.
 * EOXactListAdd() does not bother to prevent duplicate list entries, so the
 * cleanup processing must be idempotent.
 */
#define MAX_EOXACT_LIST 32
static Oid	eoxact_list[MAX_EOXACT_LIST];
static int	eoxact_list_len = 0;
static bool eoxact_list_overflowed = false;

/*
 * Append rel's OID to eoxact_list[], or just record that the list has
 * overflowed when there is no room left.
 */
#define EOXactListAdd(rel) \
	do { \
		if (eoxact_list_len >= MAX_EOXACT_LIST) \
			eoxact_list_overflowed = true; \
		else \
		{ \
			eoxact_list[eoxact_list_len] = (rel)->rd_id; \
			eoxact_list_len++; \
		} \
	} while (0)

/*
 * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
 * cleanup work.  The array expands as needed; there is no hashtable because
 * we don't need to access individual items except at EOXact.
 */
static TupleDesc *EOXactTupleDescArray;
static int	NextEOXactTupleDescNum = 0;
static int	EOXactTupleDescArrayLen = 0;

/*
 *		macros to manipulate the lookup hashtable
 */
/*
 * Enter RELATION into the hash table under its rd_id.  If an entry for that
 * OID already exists it is overwritten (after asserting replace_allowed);
 * the displaced descriptor is destroyed when its reference count is zero,
 * otherwise a WARNING is issued unless we are in bootstrap mode.
 */
#define RelationCacheInsert(RELATION, replace_allowed)	\
do { \
	bool found; \
	RelIdCacheEnt *hentry = (RelIdCacheEnt *) \
		hash_search(RelationIdCache, &((RELATION)->rd_id), \
					HASH_ENTER, &found); \
	if (!found) \
		hentry->reldesc = (RELATION); \
	else \
	{ \
		/* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
		Relation _old_rel = hentry->reldesc; \
		Assert(replace_allowed); \
		hentry->reldesc = (RELATION); \
		if (!RelationHasReferenceCountZero(_old_rel)) \
		{ \
			if (!IsBootstrapProcessingMode()) \
				elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
					 RelationGetRelationName(_old_rel)); \
		} \
		else \
			RelationDestroyRelation(_old_rel, false); \
	} \
} while(0)

/*
 * Set RELATION to the descriptor stored in the hash table for OID ID, or to
 * NULL when the table has no entry for it.  ID must be an lvalue.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry = (RelIdCacheEnt *) \
		hash_search(RelationIdCache, &(ID), HASH_FIND, NULL); \
	RELATION = (hentry != NULL) ? hentry->reldesc : NULL; \
} while(0)

/*
 * Remove the hash table entry for RELATION's rd_id, issuing a WARNING if
 * the table did not contain one.
 */
#define RelationCacheDelete(RELATION) \
do { \
	if (hash_search(RelationIdCache, &((RELATION)->rd_id), \
					HASH_REMOVE, NULL) == NULL) \
		elog(WARNING, "failed to delete relcache entry for OID %u", \
			 (RELATION)->rd_id); \
} while(0)


/*
 * Special cache for opclass-related information
 *
 * Note: only default support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set true after successful fill-in */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
	RegProcedure *supportProcs; /* OIDs of support procedures */
} OpClassCacheEnt;

/*
 * Hash table of OpClassCacheEnt.  Starts out NULL.
 * NOTE(review): presumably created on first use by LookupOpclassInfo() ---
 * confirm against that function.
 */
static HTAB *OpClassCache = NULL;


/* non-export function prototypes */

static void RelationCloseCleanup(Relation relation);
static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
static void RelationInvalidateRelation(Relation relation);
static void RelationClearRelation(Relation relation);
static void RelationRebuildRelation(Relation relation);

static void RelationReloadIndexInfo(Relation relation);
static void RelationReloadNailed(Relation relation);
static void RelationFlushRelation(Relation relation);
static void RememberToFreeTupleDescAtEOX(TupleDesc td);
#ifdef USE_ASSERT_CHECKING
static void AssertPendingSyncConsistency(Relation relation);
#endif
static void AtEOXact_cleanup(Relation relation, bool isCommit);
static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
								SubTransactionId mySubid, SubTransactionId parentSubid);
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
static void write_item(const void *data, Size len, FILE *fp);

static void formrdesc(const char *relationName, Oid relationReltype,
					  bool isshared, int natts, const FormData_pg_attribute *attrs);

static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
static Relation AllocateRelationDesc(Form_pg_class relp);
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
static void RelationBuildTupleDesc(Relation relation);
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
static void RelationInitPhysicalAddr(Relation relation);
static void load_critical_index(Oid indexoid, Oid heapoid);
static TupleDesc GetPgClassDescriptor(void);
static TupleDesc GetPgIndexDescriptor(void);
static void AttrDefaultFetch(Relation relation, int ndef);
static int	AttrDefaultCmp(const void *a, const void *b);
static void CheckNNConstraintFetch(Relation relation);
static int	CheckConstraintCmp(const void *a, const void *b);
static void InitIndexAmRoutine(Relation relation);
static void IndexSupportInitialize(oidvector *indclass,
								   RegProcedure *indexSupport,
								   Oid *opFamily,
								   Oid *opcInType,
								   StrategyNumber maxSupportNumber,
								   AttrNumber maxAttributeNumber);
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
										  StrategyNumber numSupport);
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename, int elevel);


/*
 *		ScanPgRelation
 *
 *		Look up the pg_class row for targetRelId on behalf of
 *		RelationBuildDesc.  The caller must already hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios: the scans that assemble a relcache entry are not
 *		guaranteed to share one snapshot, so if, for example, an attribute
 *		were added after pg_class is scanned and before pg_attribute is
 *		scanned, relnatts wouldn't match.
 *
 *		indexOK = false forces a heap scan.  force_non_historic = true makes
 *		the scan run under a registered non-historic catalog snapshot.
 *
 *		NB: a valid result is a copy in palloc'd storage and must
 *		eventually be freed with heap_freetuple.  If no row matched, the
 *		(invalid) result of the scan is returned as-is.
 */
static HeapTuple
ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
{
	Relation	classRel;
	SysScanDesc scan;
	ScanKeyData scankey;
	Snapshot	snap;
	HeapTuple	result;
	bool		useIndex;

	/*
	 * Reading pg_class before a database has been selected cannot work; that
	 * can happen if something goes wrong during backend startup, and most
	 * likely means a relcache entry that needs to be nailed isn't.  Fail
	 * with a useful message rather than something obscure.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

	/* the scan key: pg_class.oid = targetRelId */
	ScanKeyInit(&scankey,
				Anum_pg_class_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(targetRelId));

	classRel = table_open(RelationRelationId, AccessShareLock);

	/*
	 * The caller might need a tuple that's newer than what the historic
	 * snapshot can see; currently the only such case is looking up the
	 * relfilenumber of non mapped system relations during decoding.
	 */
	snap = force_non_historic
		? RegisterSnapshot(GetNonHistoricCatalogSnapshot(RelationRelationId))
		: NULL;

	/*
	 * Use the index only if the caller permits it and the critical relcache
	 * entries exist already.  They don't during initdb, nor when starting up
	 * without a pg_internal.init file; a heap scan is forced then.
	 */
	useIndex = indexOK && criticalRelcachesBuilt;

	scan = systable_beginscan(classRel, ClassOidIndexId,
							  useIndex,
							  snap,
							  1, &scankey);

	/* The tuple must be copied before the scan releases its buffer. */
	result = systable_getnext(scan);
	if (HeapTupleIsValid(result))
		result = heap_copytuple(result);

	/* clean up in reverse order of acquisition */
	systable_endscan(scan);
	if (snap != NULL)
		UnregisterSnapshot(snap);
	table_close(classRel, AccessShareLock);

	return result;
}

/*
 *		AllocateRelationDesc
 *
 *		Create a zeroed relation descriptor in CacheMemoryContext, give it a
 *		private copy of the fixed part of the supplied pg_class form as
 *		rd_rel, and attach an empty template tuple descriptor sized for
 *		relnatts attributes.
 */
static Relation
AllocateRelationDesc(Form_pg_class relp)
{
	MemoryContext callerCxt;
	Relation	rel;
	Form_pg_class formCopy;

	/* Everything hanging off a relcache entry must be in CacheMemoryContext */
	callerCxt = MemoryContextSwitchTo(CacheMemoryContext);

	/* zero-filled descriptor; explicitly note there is no open file yet */
	rel = (Relation) palloc0(sizeof(RelationData));
	rel->rd_smgr = NULL;

	/*
	 * Duplicate the pg_class form, but only its fixed fields (that is,
	 * CLASS_TUPLE_SIZE bytes).  The variable-length fields (relacl,
	 * reloptions) are NOT kept in the relcache --- there'd be little point,
	 * since the tuple's nulls bitmap is not copied and so we couldn't tell
	 * whether the values are valid.  Bottom line: relacl *cannot* be
	 * retrieved from the relcache; get it from the syscache if you need it.
	 * The same goes for the original form of reloptions (the parsed form,
	 * however, is stored in rd_options).
	 */
	formCopy = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
	memcpy(formCopy, relp, CLASS_TUPLE_SIZE);
	rel->rd_rel = formCopy;

	/* attribute storage, marked as a reference-counted tupdesc */
	rel->rd_att = CreateTemplateTupleDesc(rel->rd_rel->relnatts);
	rel->rd_att->tdrefcount = 1;

	MemoryContextSwitchTo(callerCxt);

	return rel;
}

/*
 * RelationParseRelOptions
 *		Fill rd_options with the pre-parsed form of pg_class.reloptions
 *
 * tuple must be the actual pg_class tuple for the relation (rd_rel will not
 * do).
 *
 * Note: rd_rel and (if an index) rd_indam must be valid already
 */
static void
RelationParseRelOptions(Relation relation, HeapTuple tuple)
{
	char		relkind = relation->rd_rel->relkind;
	amoptions_function amoptsfn;
	bytea	   *parsed;
	Size		parsedLen;

	relation->rd_options = NULL;

	/*
	 * Choose the AM-specific parse function, if any; relkinds that should
	 * not have options are done already.
	 */
	if (relkind == RELKIND_INDEX ||
		relkind == RELKIND_PARTITIONED_INDEX)
		amoptsfn = relation->rd_indam->amoptions;
	else if (relkind == RELKIND_RELATION ||
			 relkind == RELKIND_TOASTVALUE ||
			 relkind == RELKIND_VIEW ||
			 relkind == RELKIND_MATVIEW ||
			 relkind == RELKIND_PARTITIONED_TABLE)
		amoptsfn = NULL;
	else
		return;

	/*
	 * Extract reloptions from the tuple.  A hardwired descriptor is needed
	 * because no other descriptor for pg_class may exist yet (consider
	 * running this code for pg_class itself).
	 */
	parsed = extractRelOptions(tuple, GetPgClassDescriptor(), amoptsfn);
	if (parsed == NULL)
		return;

	/*
	 * The parsing above ran in the caller's memory context, which guards
	 * against leaks in the reloptions code; only the finished result gets
	 * copied into CacheMemoryContext.
	 */
	parsedLen = VARSIZE(parsed);
	relation->rd_options = MemoryContextAlloc(CacheMemoryContext, parsedLen);
	memcpy(relation->rd_options, parsed, parsedLen);
	pfree(parsed);
}

/*
 *		RelationBuildTupleDesc
 *
 *		Populate relation->rd_att from the relation's pg_attribute rows, and
 *		attach constraint data (defaults, missing values, CHECK and not-null
 *		information from pg_attrdef & pg_constraint) when there is any.
 */
static void
RelationBuildTupleDesc(Relation relation)
{
	const int	natts = RelationGetNumberOfAttributes(relation);
	Relation	attrRel;
	SysScanDesc attrScan;
	HeapTuple	attTuple;
	ScanKeyData scankeys[2];
	TupleConstr *constr;
	AttrMissing *missingVals = NULL;
	int			remaining = natts;	/* attributes not yet seen */
	int			numDefaults = 0;
	bool		is_catalog;

	/* fill rd_att's type ID fields (compare heap.c's AddNewRelationTuple) */
	if (relation->rd_rel->reltype)
		relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	else
		relation->rd_att->tdtypeid = RECORDOID;
	relation->rd_att->tdtypmod = -1;	/* just to be sure */

	constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext,
													sizeof(TupleConstr));

	/*
	 * Scan keys: attrelid = this relation, attnum > 0.  Restricting to user
	 * attributes at the index level is lots faster than fetching the system
	 * attribute rows and discarding them.
	 */
	ScanKeyInit(&scankeys[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&scankeys[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));

	/*
	 * Scan pg_attribute.  A heap scan is forced while the critical relcache
	 * entries are not built yet (this includes initdb and startup without a
	 * pg_internal.init file).
	 */
	attrRel = table_open(AttributeRelationId, AccessShareLock);
	attrScan = systable_beginscan(attrRel,
								  AttributeRelidNumIndexId,
								  criticalRelcachesBuilt,
								  NULL,
								  2, scankeys);

	/* Copy each attribute row into its slot of relation->rd_att */
	for (;;)
	{
		Form_pg_attribute attForm;
		int			attno;

		attTuple = systable_getnext(attrScan);
		if (!HeapTupleIsValid(attTuple))
			break;

		attForm = (Form_pg_attribute) GETSTRUCT(attTuple);
		attno = attForm->attnum;

		if (attno <= 0 || attno > natts)
			elog(ERROR, "invalid attribute number %d for relation \"%s\"",
				 attForm->attnum, RelationGetRelationName(relation));

		memcpy(TupleDescAttr(relation->rd_att, attno - 1),
			   attForm,
			   ATTRIBUTE_FIXED_PART_SIZE);

		populate_compact_attribute(relation->rd_att, attno - 1);

		/* Note what kinds of constraint/default info will be needed */
		if (attForm->attnotnull)
			constr->has_not_null = true;

		if (attForm->attgenerated == ATTRIBUTE_GENERATED_STORED)
			constr->has_generated_stored = true;
		else if (attForm->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
			constr->has_generated_virtual = true;

		if (attForm->atthasdef)
			numDefaults++;

		/* A column with a "missing" value goes into the missingVals array */
		if (attForm->atthasmissing)
		{
			Datum		arrayDatum;
			bool		arrayIsNull;

			/* Is a missing value actually stored? */
			arrayDatum = heap_getattr(attTuple,
									  Anum_pg_attribute_attmissingval,
									  attrRel->rd_att,
									  &arrayIsNull);
			if (!arrayIsNull)
			{
				AttrMissing *slot;
				Datum		elem;
				bool		elemIsNull;
				int			firstIndex = 1;

				/* the array is allocated only once some column needs it */
				if (missingVals == NULL)
					missingVals = (AttrMissing *)
						MemoryContextAllocZero(CacheMemoryContext,
											   relation->rd_rel->relnatts *
											   sizeof(AttrMissing));
				slot = &missingVals[attno - 1];

				/* the value is the first element of the stored array */
				elem = array_get_element(arrayDatum,
										 1,
										 &firstIndex,
										 -1,
										 attForm->attlen,
										 attForm->attbyval,
										 attForm->attalign,
										 &elemIsNull);
				Assert(!elemIsNull);

				if (!attForm->attbyval)
				{
					/* by-reference: the copy must live in the cache context */
					MemoryContext saved;

					saved = MemoryContextSwitchTo(CacheMemoryContext);
					slot->am_value = datumCopy(elem,
											   attForm->attbyval,
											   attForm->attlen);
					MemoryContextSwitchTo(saved);
				}
				else
				{
					/* by-value: the datum itself is the value */
					slot->am_value = elem;
				}
				slot->am_present = true;
			}
		}

		/* stop as soon as every expected attribute has been seen */
		if (--remaining == 0)
			break;
	}

	/* done with pg_attribute: end the scan and close the relation */
	systable_endscan(attrScan);
	table_close(attrRel, AccessShareLock);

	if (remaining != 0)
		elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u",
			 remaining, RelationGetRelid(relation));

	/*
	 * The first attribute's attcacheoff is always zero, so set it now.  That
	 * removes the need for the attnum=1 special cases that used to exist in
	 * fastgetattr() and index_getattr().
	 */
	if (natts > 0)
		TupleDescCompactAttr(relation->rd_att, 0)->attcacheoff = 0;

	/*
	 * With no constraint/default info of any kind, the tupdesc gets no
	 * TupleConstr at all.
	 */
	if (!constr->has_not_null &&
		!constr->has_generated_stored &&
		!constr->has_generated_virtual &&
		numDefaults <= 0 &&
		missingVals == NULL &&
		relation->rd_rel->relchecks <= 0)
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
		return;
	}

	/*
	 * Otherwise set up constraint/default info
	 */
	is_catalog = IsCatalogRelation(relation);

	relation->rd_att->constr = constr;

	/* DEFAULTs */
	if (numDefaults > 0)
		AttrDefaultFetch(relation, numDefaults);
	else
		constr->num_defval = 0;

	constr->missing = missingVals;

	/* CHECK and NOT NULLs */
	if (relation->rd_rel->relchecks > 0 ||
		(!is_catalog && constr->has_not_null))
		CheckNNConstraintFetch(relation);

	/*
	 * Any not-null constraint that wasn't marked invalid by
	 * CheckNNConstraintFetch must necessarily be valid; make it so in the
	 * CompactAttribute array.
	 */
	if (!is_catalog)
	{
		for (int attidx = 0; attidx < relation->rd_rel->relnatts; attidx++)
		{
			CompactAttribute *cattr;

			cattr = TupleDescCompactAttr(relation->rd_att, attidx);

			if (cattr->attnullability != ATTNULLABLE_UNKNOWN)
				Assert(cattr->attnullability == ATTNULLABLE_INVALID ||
					   cattr->attnullability == ATTNULLABLE_UNRESTRICTED);
			else
				cattr->attnullability = ATTNULLABLE_VALID;
		}
	}

	if (relation->rd_rel->relchecks == 0)
		constr->num_check = 0;
}

/*
 *		RelationBuildRuleLock
 *
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationRebuildRelation()
 * manageable.  The other subsidiary data structures are simple enough
 * to be easy to free explicitly, anyway.
 *
 * Note: The relation's reloptions must have been extracted first.
 */
static void
RelationBuildRuleLock(Relation relation)
{
	MemoryContext rulescxt;
	MemoryContext oldcxt;
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
	SysScanDesc rewrite_scan;
	ScanKeyData key;
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;

	/*
	 * Make the private context.  Assume it'll not contain much data.
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 "relation rules",
									 ALLOCSET_SMALL_SIZES);
	relation->rd_rulescxt = rulescxt;
	MemoryContextCopyAndSetIdentifier(rulescxt,
									  RelationGetRelationName(relation));

	/*
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
	 */
	maxlocks = 4;
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
	numlocks = 0;

	/*
	 * form a scan key
	 */
	ScanKeyInit(&key,
				Anum_pg_rewrite_ev_class,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	/*
	 * open pg_rewrite and begin a scan
	 *
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
	 */
	rewrite_desc = table_open(RewriteRelationId, AccessShareLock);
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);
	rewrite_scan = systable_beginscan(rewrite_desc,
									  RewriteRelRulenameIndexId,
									  true, NULL,
									  1, &key);

	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
	{
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
		bool		isnull;
		Datum		rule_datum;
		char	   *rule_str;
		RewriteRule *rule;
		Oid			check_as_user;

		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));

		rule->ruleId = rewrite_form->oid;

		rule->event = rewrite_form->ev_type - '0';
		rule->enabled = rewrite_form->ev_enabled;
		rule->isInstead = rewrite_form->is_instead;

		/*
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
		 */
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_action,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->actions = (List *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->qual = (Node *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		/*
		 * If this is a SELECT rule defining a view, and the view has
		 * "security_invoker" set, we must perform all permissions checks on
		 * relations referred to by the rule as the invoking user.
		 *
		 * In all other cases (including non-SELECT rules on security invoker
		 * views), perform the permissions checks as the relation owner.
		 */
		if (rule->event == CMD_SELECT &&
			relation->rd_rel->relkind == RELKIND_VIEW &&
			RelationHasSecurityInvoker(relation))
			check_as_user = InvalidOid;
		else
			check_as_user = relation->rd_rel->relowner;

		/*
		 * Scan through the rule's actions and set the checkAsUser field on
		 * all RTEPermissionInfos. We have to look at the qual as well, in
		 * case it contains sublinks.
		 *
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
		 */
		setRuleCheckAsUser((Node *) rule->actions, check_as_user);
		setRuleCheckAsUser(rule->qual, check_as_user);

		if (numlocks >= maxlocks)
		{
			maxlocks *= 2;
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
		}
		rules[numlocks++] = rule;
	}

	/*
	 * end the scan and close the attribute relation
	 */
	systable_endscan(rewrite_scan);
	table_close(rewrite_desc, AccessShareLock);

	/*
	 * there might not be any rules (if relhasrules is out-of-date)
	 */
	if (numlocks == 0)
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
		MemoryContextDelete(rulescxt);
		return;
	}

	/*
	 * form a RuleLock and insert into relation
	 */
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
}

/*
 *		equalRuleLocks
 *
 *		Report whether two RuleLocks are equivalent.  Either argument may be
 *		NULL; two NULLs are equivalent, a NULL and a non-NULL are not.
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
	/* with a NULL on either side, they match only if both are NULL */
	if (rlock1 == NULL || rlock2 == NULL)
		return rlock1 == rlock2;

	if (rlock1->numLocks != rlock2->numLocks)
		return false;

	/*
	 * As of 7.3 the rule ordering is assumed to be repeatable, because
	 * RelationBuildRuleLock should read 'em in a consistent order.  Hence it
	 * is enough to compare corresponding slots.
	 */
	for (int slot = 0; slot < rlock1->numLocks; slot++)
	{
		RewriteRule *lhs = rlock1->rules[slot];
		RewriteRule *rhs = rlock2->rules[slot];

		/* cheap scalar fields first, node trees last */
		if (lhs->ruleId != rhs->ruleId ||
			lhs->event != rhs->event ||
			lhs->enabled != rhs->enabled ||
			lhs->isInstead != rhs->isInstead ||
			!equal(lhs->qual, rhs->qual) ||
			!equal(lhs->actions, rhs->actions))
			return false;
	}

	return true;
}

/*
 *		equalPolicy
 *
 *		Report whether two policies are equivalent.  Either argument may be
 *		NULL; two NULLs are equivalent, a NULL and a non-NULL are not.
 */
static bool
equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
{
	int			nroles;
	Oid		   *roles1;
	Oid		   *roles2;

	/* with a NULL on either side, they match only if both are NULL */
	if (policy1 == NULL || policy2 == NULL)
		return policy1 == policy2;

	if (policy1->polcmd != policy2->polcmd ||
		policy1->hassublinks != policy2->hassublinks)
		return false;

	if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
		return false;

	/* the role arrays must have equal length and equal members, in order */
	nroles = ARR_DIMS(policy1->roles)[0];
	if (nroles != ARR_DIMS(policy2->roles)[0])
		return false;

	roles1 = (Oid *) ARR_DATA_PTR(policy1->roles);
	roles2 = (Oid *) ARR_DATA_PTR(policy2->roles);

	for (int r = 0; r < nroles; r++)
	{
		if (roles1[r] != roles2[r])
			return false;
	}

	/* finally the expression trees */
	if (!equal(policy1->qual, policy2->qual) ||
		!equal(policy1->with_check_qual, policy2->with_check_qual))
		return false;

	return true;
}

/*
 *		equalRSDesc
 *
 *		Report whether two RowSecurityDesc's are equivalent.  Either argument
 *		may be NULL; two NULLs are equivalent, a NULL and a non-NULL are not.
 */
static bool
equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
{
	ListCell   *cell1;
	ListCell   *cell2;

	/* with a NULL on either side, they match only if both are NULL */
	if (rsdesc1 == NULL || rsdesc2 == NULL)
		return rsdesc1 == rsdesc2;

	if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
		return false;

	/*
	 * RelationBuildRowSecurity should build policies in order, so compare
	 * them pairwise.
	 */
	forboth(cell1, rsdesc1->policies, cell2, rsdesc2->policies)
	{
		if (!equalPolicy((RowSecurityPolicy *) lfirst(cell1),
						 (RowSecurityPolicy *) lfirst(cell2)))
			return false;
	}

	return true;
}

/*
 *		RelationBuildDesc
 *
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		targetRelId is the OID of the relation whose pg_class row is read.
 *		The new descriptor is inserted into the hash table if insertIt is true.
 *
 *		The descriptor is returned with rd_refcnt = 0 and rd_isvalid = true;
 *		this function does not pin it.
 *
 *		While the build runs, an entry for targetRelId is kept in
 *		in_progress_list.  If that entry is found to be marked invalidated
 *		once the build completes, the freshly built descriptor is destroyed
 *		and the build is restarted from the pg_class scan.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 */
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
	int			in_progress_offset;
	Relation	relation;
	Oid			relid;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/*
	 * This function and its subroutines can allocate a good deal of transient
	 * data in CurrentMemoryContext.  Traditionally we've just leaked that
	 * data, reasoning that the caller's context is at worst of transaction
	 * scope, and relcache loads shouldn't happen so often that it's essential
	 * to recover transient data before end of statement/transaction.  However
	 * that's definitely not true when debug_discard_caches is active, and
	 * perhaps it's not true in other cases.
	 *
	 * When debug_discard_caches is active or when forced to by
	 * RECOVER_RELATION_BUILD_MEMORY=1, arrange to allocate the junk in a
	 * temporary context that we'll free before returning.  Make it a child of
	 * caller's context so that it will get cleaned up appropriately if we
	 * error out partway through.
	 */
#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
	MemoryContext tmpcxt = NULL;
	MemoryContext oldcxt = NULL;

	if (RECOVER_RELATION_BUILD_MEMORY || debug_discard_caches > 0)
	{
		tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
									   "RelationBuildDesc workspace",
									   ALLOCSET_DEFAULT_SIZES);
		oldcxt = MemoryContextSwitchTo(tmpcxt);
	}
#endif

	/* Register to catch invalidation messages */
	if (in_progress_list_len >= in_progress_list_maxlen)
	{
		int			allocsize;

		/* the list is full: double its capacity */
		allocsize = in_progress_list_maxlen * 2;
		in_progress_list = repalloc(in_progress_list,
									allocsize * sizeof(*in_progress_list));
		in_progress_list_maxlen = allocsize;
	}
	in_progress_offset = in_progress_list_len++;
	in_progress_list[in_progress_offset].reloid = targetRelId;
retry:
	/* each build attempt starts with a clean "invalidated" flag */
	in_progress_list[in_progress_offset].invalidated = false;

	/*
	 * find the tuple in pg_class corresponding to the given relation id
	 */
	pg_class_tuple = ScanPgRelation(targetRelId, true, false);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
	{
#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
		if (tmpcxt)
		{
			/* Return to caller's context, and blow away the temporary context */
			MemoryContextSwitchTo(oldcxt);
			MemoryContextDelete(tmpcxt);
		}
#endif
		/* our entry must be the last one in the list; remove it */
		Assert(in_progress_offset + 1 == in_progress_list_len);
		in_progress_list_len--;
		return NULL;
	}

	/*
	 * get information from the pg_class_tuple
	 */
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	relid = relp->oid;
	Assert(relid == targetRelId);

	/*
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel.
	 */
	relation = AllocateRelationDesc(relp);

	/*
	 * initialize the relation's relation id (relation->rd_id)
	 */
	RelationGetRelid(relation) = relid;

	/*
	 * Normal relations are not nailed into the cache.  Since we don't flush
	 * new relations, it won't be new.  It could be temp though.
	 */
	relation->rd_refcnt = 0;
	relation->rd_isnailed = false;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilelocatorSubid = InvalidSubTransactionId;
	relation->rd_firstRelfilelocatorSubid = InvalidSubTransactionId;
	relation->rd_droppedSubid = InvalidSubTransactionId;
	switch (relation->rd_rel->relpersistence)
	{
		case RELPERSISTENCE_UNLOGGED:
		case RELPERSISTENCE_PERMANENT:
			relation->rd_backend = INVALID_PROC_NUMBER;
			relation->rd_islocaltemp = false;
			break;
		case RELPERSISTENCE_TEMP:
			if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
			{
				relation->rd_backend = ProcNumberForTempRelations();
				relation->rd_islocaltemp = true;
			}
			else
			{
				/*
				 * If it's a temp table, but not one of ours, we have to use
				 * the slow, grotty method to figure out the owning backend.
				 *
				 * Note: it's possible that rd_backend gets set to
				 * MyProcNumber here, in case we are looking at a pg_class
				 * entry left over from a crashed backend that coincidentally
				 * had the same ProcNumber we're using.  We should *not*
				 * consider such a table to be "ours"; this is why we need the
				 * separate rd_islocaltemp flag.  The pg_class entry will get
				 * flushed if/when we clean out the corresponding temp table
				 * namespace in preparation for using it.
				 */
				relation->rd_backend =
					GetTempNamespaceProcNumber(relation->rd_rel->relnamespace);
				Assert(relation->rd_backend != INVALID_PROC_NUMBER);
				relation->rd_islocaltemp = false;
			}
			break;
		default:
			elog(ERROR, "invalid relpersistence: %c",
				 relation->rd_rel->relpersistence);
			break;
	}

	/*
	 * initialize the tuple descriptor (relation->rd_att).
	 */
	RelationBuildTupleDesc(relation);

	/* foreign key data is not loaded till asked for */
	relation->rd_fkeylist = NIL;
	relation->rd_fkeyvalid = false;

	/* partitioning data is not loaded till asked for */
	relation->rd_partkey = NULL;
	relation->rd_partkeycxt = NULL;
	relation->rd_partdesc = NULL;
	relation->rd_partdesc_nodetached = NULL;
	relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
	relation->rd_pdcxt = NULL;
	relation->rd_pddcxt = NULL;
	relation->rd_partcheck = NIL;
	relation->rd_partcheckvalid = false;
	relation->rd_partcheckcxt = NULL;

	/*
	 * initialize access method information
	 *
	 * Indexes get index AM info; relkinds with a table AM, plus sequences,
	 * get table AM info.  Any other relkind except partitioned tables is
	 * expected to have no access method at all.
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX ||
		relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
		RelationInitIndexAccessInfo(relation);
	else if (RELKIND_HAS_TABLE_AM(relation->rd_rel->relkind) ||
			 relation->rd_rel->relkind == RELKIND_SEQUENCE)
		RelationInitTableAccessMethod(relation);
	else if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
	{
		/*
		 * Do nothing: access methods are a setting that partitions can
		 * inherit.
		 */
	}
	else
		Assert(relation->rd_rel->relam == InvalidOid);

	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

	/*
	 * Fetch rules and triggers that affect this relation.
	 *
	 * Note that RelationBuildRuleLock() relies on this being done after
	 * extracting the relation's reloptions.
	 */
	if (relation->rd_rel->relhasrules)
		RelationBuildRuleLock(relation);
	else
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
	}

	if (relation->rd_rel->relhastriggers)
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

	/* likewise load row-security policies only if the relation has any */
	if (relation->rd_rel->relrowsecurity)
		RelationBuildRowSecurity(relation);
	else
		relation->rd_rsdesc = NULL;

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation); /* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

	/*
	 * If an invalidation arrived mid-build, start over.  Between here and the
	 * end of this function, don't add code that does or reasonably could read
	 * system catalogs.  That range must be free from invalidation processing
	 * for the !insertIt case.  For the insertIt case, RelationCacheInsert()
	 * will enroll this relation in ordinary relcache invalidation processing.
	 */
	if (in_progress_list[in_progress_offset].invalidated)
	{
		RelationDestroyRelation(relation, false);
		goto retry;
	}
	/* build succeeded: our entry must be last in the list; remove it */
	Assert(in_progress_offset + 1 == in_progress_list_len);
	in_progress_list_len--;

	/*
	 * Insert newly created relation into relcache hash table, if requested.
	 *
	 * There is one scenario in which we might find a hashtable entry already
	 * present, even though our caller failed to find it: if the relation is a
	 * system catalog or index that's used during relcache load, we might have
	 * recursively created the same relcache entry during the preceding steps.
	 * So allow RelationCacheInsert to delete any already-present relcache
	 * entry for the same OID.  The already-present entry should have refcount
	 * zero (else somebody forgot to close it); in the event that it doesn't,
	 * we'll elog a WARNING and leak the already-present entry.
	 */
	if (insertIt)
		RelationCacheInsert(relation, true);

	/* It's fully valid */
	relation->rd_isvalid = true;

#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
	if (tmpcxt)
	{
		/* Return to caller's context, and blow away the temporary context */
		MemoryContextSwitchTo(oldcxt);
		MemoryContextDelete(tmpcxt);
	}
#endif

	return relation;
}

/*
 * Set up rd_locator (tablespace OID, database OID and relfilenumber) for a
 * relcache entry.  Relation kinds that have no storage are left untouched.
 *
 * Note: at the physical level, relations in the pg_global tablespace must
 * be treated as shared, even if relisshared isn't set.  Hence the database
 * OID is derived from the tablespace rather than from relisshared.
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	RelFileNumber prev_relnumber = relation->rd_locator.relNumber;

	/* these relations kinds never have storage */
	if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
		return;

	/* a zero reltablespace selects the database's default tablespace */
	relation->rd_locator.spcOid = relation->rd_rel->reltablespace ?
		relation->rd_rel->reltablespace : MyDatabaseTableSpace;

	/* anything stored in the global tablespace gets no database OID */
	relation->rd_locator.dbOid =
		(relation->rd_locator.spcOid == GLOBALTABLESPACE_OID) ?
		InvalidOid : MyDatabaseId;

	if (!relation->rd_rel->relfilenode)
	{
		/* No relfilenode in pg_class, so consult the relation mapper */
		relation->rd_locator.relNumber =
			RelationMapOidToFilenumber(relation->rd_id,
									   relation->rd_rel->relisshared);
		if (!RelFileNumberIsValid(relation->rd_locator.relNumber))
			elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
				 RelationGetRelationName(relation), relation->rd_id);
	}
	else
	{
		/*
		 * A decoding snapshot may not show the catalog's current state, yet
		 * the filenode must name the current file: the older file will be
		 * gone (or truncated), while the new file still contains the older
		 * rows, so lookups in it work.  That would break if rewrites could
		 * change the schema incompatibly, but such rewrites are prevented
		 * both on catalog tables and on user tables declared as additional
		 * catalog tables.  So re-read pg_class and adopt its current
		 * tablespace and filenode.
		 */
		if (HistoricSnapshotActive()
			&& RelationIsAccessibleInLogicalDecoding(relation)
			&& IsTransactionState())
		{
			HeapTuple	current_tuple;
			Form_pg_class current_form;

			current_tuple = ScanPgRelation(RelationGetRelid(relation),
										   RelationGetRelid(relation) != ClassOidIndexId,
										   true);
			if (!HeapTupleIsValid(current_tuple))
				elog(ERROR, "could not find pg_class entry for %u",
					 RelationGetRelid(relation));
			current_form = (Form_pg_class) GETSTRUCT(current_tuple);

			relation->rd_rel->reltablespace = current_form->reltablespace;
			relation->rd_rel->relfilenode = current_form->relfilenode;
			heap_freetuple(current_tuple);
		}

		relation->rd_locator.relNumber = relation->rd_rel->relfilenode;
	}

	/*
	 * The remaining fixup applies only in a parallel worker, and only when
	 * the relfilenumber actually changed.
	 */
	if (!IsParallelWorker() ||
		prev_relnumber == relation->rd_locator.relNumber)
		return;

	/*
	 * For RelationNeedsWAL() to answer correctly on parallel workers, restore
	 * rd_firstRelfilelocatorSubid.  No subtransactions start or end while in
	 * parallel mode, so the specific SubTransactionId does not matter.
	 */
	relation->rd_firstRelfilelocatorSubid =
		RelFileLocatorSkippingWAL(relation->rd_locator) ?
		TopSubTransactionId : InvalidSubTransactionId;
}

/*
 * Fill in the IndexAmRoutine for an index relation.
 *
 * relation's rd_amhandler and rd_indexcxt must be valid already.  On return,
 * rd_indam points at a copy of the routine struct allocated in rd_indexcxt.
 */
static void
InitIndexAmRoutine(Relation relation)
{
	IndexAmRoutine *transient;
	IndexAmRoutine *persistent;

	/*
	 * Run the amhandler while still in the current, short-lived memory
	 * context, just in case it leaks anything (it probably won't, but let's
	 * be paranoid).
	 */
	transient = GetIndexAmRoutine(relation->rd_amhandler);

	/* Make the copy that lives in the relation's rd_indexcxt. */
	persistent = (IndexAmRoutine *)
		MemoryContextAlloc(relation->rd_indexcxt, sizeof(*persistent));
	memcpy(persistent, transient, sizeof(*persistent));
	relation->rd_indam = persistent;

	/* The handler's own result struct is no longer needed. */
	pfree(transient);
}

/*
 * Initialize index-access-method support data for an index relation
 *
 * Fills in the index-specific fields of the relcache entry: a copy of the
 * pg_index tuple (rd_indextuple / rd_index), the AM handler OID and AM
 * routine, and the per-column opfamily, opcintype, support-procedure,
 * collation and option arrays.  The arrays are allocated in a newly created
 * "index info" context (rd_indexcxt) that is a child of CacheMemoryContext.
 *
 * Raises an error if the pg_index or pg_am syscache lookup fails, or if
 * relnatts disagrees with indnatts.
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
	HeapTuple	tuple;
	Form_pg_am	aform;
	Datum		indcollDatum;
	Datum		indclassDatum;
	Datum		indoptionDatum;
	bool		isnull;
	oidvector  *indcoll;
	oidvector  *indclass;
	int2vector *indoption;
	MemoryContext indexcxt;
	MemoryContext oldcontext;
	int			indnatts;
	int			indnkeyatts;
	uint16		amsupport;

	/*
	 * Make a copy of the pg_index entry for the index.  Since pg_index
	 * contains variable-length and possibly-null fields, we have to do this
	 * honestly rather than just treating it as a Form_pg_index struct.
	 */
	tuple = SearchSysCache1(INDEXRELID,
							ObjectIdGetDatum(RelationGetRelid(relation)));
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for index %u",
			 RelationGetRelid(relation));
	/* the copy is made while CacheMemoryContext is the current context */
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(tuple);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(oldcontext);
	ReleaseSysCache(tuple);

	/*
	 * Look up the index's access method, save the OID of its handler function
	 */
	Assert(relation->rd_rel->relam != InvalidOid);
	tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for access method %u",
			 relation->rd_rel->relam);
	aform = (Form_pg_am) GETSTRUCT(tuple);
	relation->rd_amhandler = aform->amhandler;
	ReleaseSysCache(tuple);

	/* cross-check the attribute count in pg_class against pg_index */
	indnatts = RelationGetNumberOfAttributes(relation);
	if (indnatts != IndexRelationGetNumberOfAttributes(relation))
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
			 RelationGetRelid(relation));
	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);

	/*
	 * Make the private context to hold index access info.  The reason we need
	 * a context, and not just a couple of pallocs, is so that we won't leak
	 * any subsidiary info attached to fmgr lookup records.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 "index info",
									 ALLOCSET_SMALL_SIZES);
	relation->rd_indexcxt = indexcxt;
	MemoryContextCopyAndSetIdentifier(indexcxt,
									  RelationGetRelationName(relation));

	/*
	 * Now we can fetch the index AM's API struct
	 */
	InitIndexAmRoutine(relation);

	/*
	 * Allocate arrays to hold data. Opclasses are not used for included
	 * columns, so allocate them for indnkeyatts only.
	 */
	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));

	amsupport = relation->rd_indam->amsupport;
	if (amsupport > 0)
	{
		/* note these arrays are sized by indnatts, not indnkeyatts */
		int			nsupport = indnatts * amsupport;

		relation->rd_support = (RegProcedure *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
		relation->rd_supportinfo = (FmgrInfo *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
	}
	else
	{
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
	}

	relation->rd_indcollation = (Oid *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));

	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(int16));

	/*
	 * indcollation cannot be referenced directly through the C struct,
	 * because it comes after the variable-width indkey field.  Must extract
	 * the datum the hard way...
	 */
	indcollDatum = fastgetattr(relation->rd_indextuple,
							   Anum_pg_index_indcollation,
							   GetPgIndexDescriptor(),
							   &isnull);
	Assert(!isnull);
	indcoll = (oidvector *) DatumGetPointer(indcollDatum);
	/* only the key columns' collations are copied */
	memcpy(relation->rd_indcollation, indcoll->values, indnkeyatts * sizeof(Oid));

	/*
	 * indclass cannot be referenced directly through the C struct, because it
	 * comes after the variable-width indkey field.  Must extract the datum
	 * the hard way...
	 */
	indclassDatum = fastgetattr(relation->rd_indextuple,
								Anum_pg_index_indclass,
								GetPgIndexDescriptor(),
								&isnull);
	Assert(!isnull);
	indclass = (oidvector *) DatumGetPointer(indclassDatum);

	/*
	 * Fill the support procedure OID array, as well as the info about
	 * opfamilies and opclass input types.  (aminfo and supportinfo are left
	 * as zeroes, and are filled on-the-fly when used)
	 */
	IndexSupportInitialize(indclass, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
						   amsupport, indnkeyatts);

	/*
	 * Similarly extract indoption and copy it to the cache entry
	 */
	indoptionDatum = fastgetattr(relation->rd_indextuple,
								 Anum_pg_index_indoption,
								 GetPgIndexDescriptor(),
								 &isnull);
	Assert(!isnull);
	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
	memcpy(relation->rd_indoption, indoption->values, indnkeyatts * sizeof(int16));

	/* called for its effect on the cache entry; the result is not needed here */
	(void) RelationGetIndexAttOptions(relation, false);

	/*
	 * expressions, predicate, exclusion caches will be filled later
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
	relation->rd_exclops = NULL;
	relation->rd_exclprocs = NULL;
	relation->rd_exclstrats = NULL;
	relation->rd_amcache = NULL;
}

/*
 * IndexSupportInitialize
 *		Populate an index's cached opclass information from the index's
 *		pg_index.indclass entry.
 *
 * Results are stored into the caller-allocated arrays indexSupport, opFamily
 * and opcInType.  For column k, the support procedures occupy the
 * maxSupportNumber consecutive slots of indexSupport starting at
 * k * maxSupportNumber; indexSupport is not touched when maxSupportNumber
 * is zero.
 *
 * maxSupportNumber and maxAttributeNumber describe the size of the arrays
 * the caller has allocated --- but in practice these numbers must always
 * match those obtainable from the system catalog entries for the index and
 * access method.
 *
 * An invalid opclass OID in indclass is reported via elog(ERROR).
 */
static void
IndexSupportInitialize(oidvector *indclass,
					   RegProcedure *indexSupport,
					   Oid *opFamily,
					   Oid *opcInType,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			col;

	for (col = 0; col < maxAttributeNumber; col++)
	{
		Oid			opclassOid = indclass->values[col];
		OpClassCacheEnt *entry;

		if (!OidIsValid(opclassOid))
			elog(ERROR, "bogus pg_index tuple");

		/* fetch this opclass's details through the opclass cache */
		entry = LookupOpclassInfo(opclassOid, maxSupportNumber);

		/* transfer the cached details into the caller's arrays */
		opFamily[col] = entry->opcfamily;
		opcInType[col] = entry->opcintype;

		if (maxSupportNumber > 0)
		{
			RegProcedure *dest = indexSupport + col * maxSupportNumber;

			memcpy(dest, entry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
		}
	}
}

/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc.
 *
 * operatorClassOid is the hash key of the cache entry.  numSupport is the
 * number of support-procedure slots in the entry's supportProcs array; it
 * must be the same on every call for a given opclass (this is asserted).
 *
 * The information from pg_am about expected range of support function
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
 * Returns a pointer to the cache entry, which has valid = true.  Raises an
 * error if the pg_opclass row is not found or if pg_amproc contains a
 * support number outside 1..numSupport.
 *
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
 * a useless but harmless dead entry in the cache.  To support altering
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
	Relation	rel;
	SysScanDesc scan;
	ScanKeyData skey[3];
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		/* Also make sure CacheMemoryContext exists */
		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/* find the entry for this opclass; "found" reports whether it existed */
	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   &operatorClassOid,
											   HASH_ENTER, &found);

	if (!found)
	{
		/* Initialize new entry */
		opcentry->valid = false;	/* until known OK */
		opcentry->numSupport = numSupport;
		opcentry->supportProcs = NULL;	/* filled below */
	}
	else
	{
		Assert(numSupport == opcentry->numSupport);
	}

	/*
	 * When aggressively testing cache-flush hazards, we disable the operator
	 * class cache and force reloading of the info on each call.  This models
	 * no real-world behavior, since the cache entries are never invalidated
	 * otherwise.  However it can be helpful for detecting bugs in the cache
	 * loading logic itself, such as reliance on a non-nailed index.  Given
	 * the limited use-case and the fact that this adds a great deal of
	 * expense, we enable it only for high values of debug_discard_caches.
	 */
#ifdef DISCARD_CACHES_ENABLED
	if (debug_discard_caches > 2)
		opcentry->valid = false;
#endif

	/* fast path: entry was completely filled by an earlier call */
	if (opcentry->valid)
		return opcentry;

	/*
	 * Need to fill in new entry.  First allocate space, unless we already did
	 * so in some previous attempt.
	 */
	if (opcentry->supportProcs == NULL && numSupport > 0)
		opcentry->supportProcs = (RegProcedure *)
			MemoryContextAllocZero(CacheMemoryContext,
								   numSupport * sizeof(RegProcedure));

	/*
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up related operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_opclass_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = table_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  NULL, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	table_close(rel, AccessShareLock);

	/*
	 * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
	 * the default ones (those with lefttype = righttype = opcintype).
	 */
	if (numSupport > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amproc_amprocfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amproc_amproclefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = table_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
								  NULL, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			/* support numbers are 1-based and must fit the array */
			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
				elog(ERROR, "invalid amproc number %d for opclass %u",
					 amprocform->amprocnum, operatorClassOid);

			/* support number N is stored in slot N - 1 */
			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

		systable_endscan(scan);
		table_close(rel, AccessShareLock);
	}

	/* mark the entry usable only now that every field has been filled */
	opcentry->valid = true;
	return opcentry;
}

/*
 * Fill in the TableAmRoutine for a relation
 *
 * relation's rd_amhandler must be valid already; the routine obtained from
 * that handler is stored in rd_tableam.
 */
static void
InitTableAmRoutine(Relation relation)
{
	Oid			handler = relation->rd_amhandler;

	relation->rd_tableam = GetTableAmRoutine(handler);
}

/*
 * Initialize table access method support for a table like relation
 *
 * Determines the OID of the table AM handler function, stores it in
 * rd_amhandler, and then loads the AM's routine struct into rd_tableam.
 * Raises an error if the pg_am syscache lookup fails.
 */
void
RelationInitTableAccessMethod(Relation relation)
{
	Oid			handler;

	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
	{
		/*
		 * Sequences are currently accessed like heap tables, but it doesn't
		 * seem prudent to show that in the catalog. So just overwrite it
		 * here.
		 */
		Assert(relation->rd_rel->relam == InvalidOid);
		handler = F_HEAP_TABLEAM_HANDLER;
	}
	else if (IsCatalogRelation(relation))
	{
		/* Catalog tables use the heap AM; skip the syscache lookup. */
		Assert(relation->rd_rel->relam == HEAP_TABLE_AM_OID);
		handler = F_HEAP_TABLEAM_HANDLER;
	}
	else
	{
		HeapTuple	amtup;

		/* Fetch the handler function's OID from the relation's pg_am row. */
		Assert(relation->rd_rel->relam != InvalidOid);
		amtup = SearchSysCache1(AMOID,
								ObjectIdGetDatum(relation->rd_rel->relam));
		if (!HeapTupleIsValid(amtup))
			elog(ERROR, "cache lookup failed for access method %u",
				 relation->rd_rel->relam);
		handler = ((Form_pg_am) GETSTRUCT(amtup))->amhandler;
		ReleaseSysCache(amtup);
	}

	relation->rd_amhandler = handler;

	/*
	 * Now we can fetch the table AM's API struct
	 */
	InitTableAmRoutine(relation);
}

/*
 *		formrdesc
 *
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
 *		The relation descriptor is built just from the supplied parameters,
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
 *		catalogs.
 *
 *		relationName is copied into relname; relationReltype is stored as
 *		both reltype and the tuple descriptor's tdtypeid; isshared sets
 *		relisshared (and, when true, places the relation in the global
 *		tablespace); attrs is an array of natts attribute entries that are
 *		copied into the tuple descriptor.  The relation's OID is taken from
 *		the first attribute's attrelid.
 *
 *		The resulting entry is nailed (rd_isnailed, rd_refcnt = 1), is
 *		inserted into the relcache, and is marked valid.
 *
 * The catalogs this is used for can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static void
formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared,
		  int natts, const FormData_pg_attribute *attrs)
{
	Relation	relation;
	int			i;
	bool		has_not_null;

	/*
	 * allocate new relation desc, clear all fields of reldesc
	 */
	relation = (Relation) palloc0(sizeof(RelationData));

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * initialize reference count: 1 because it is nailed in cache
	 */
	relation->rd_refcnt = 1;

	/*
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
	 */
	relation->rd_isnailed = true;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilelocatorSubid = InvalidSubTransactionId;
	relation->rd_firstRelfilelocatorSubid = InvalidSubTransactionId;
	relation->rd_droppedSubid = InvalidSubTransactionId;
	relation->rd_backend = INVALID_PROC_NUMBER;
	relation->rd_islocaltemp = false;

	/*
	 * initialize relation tuple form
	 *
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase3() will read the real
	 * data from pg_class and replace what we've done here.  Note in
	 * particular that relowner is left as zero; this cues
	 * RelationCacheInitializePhase3 that the real data isn't there yet.
	 */
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
	relation->rd_rel->reltype = relationReltype;

	/*
	 * It's important to distinguish between shared and non-shared relations,
	 * even at bootstrap time, to make sure we know where they are stored.
	 */
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;

	/* formrdesc is used only for permanent relations */
	relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;

	/* ... and they're always populated, too */
	relation->rd_rel->relispopulated = true;

	/* placeholder statistics and basic properties */
	relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
	relation->rd_rel->relpages = 0;
	relation->rd_rel->reltuples = -1;
	relation->rd_rel->relallvisible = 0;
	relation->rd_rel->relallfrozen = 0;
	relation->rd_rel->relkind = RELKIND_RELATION;
	relation->rd_rel->relnatts = (int16) natts;

	/*
	 * initialize attribute tuple form
	 *
	 * Unlike the case with the relation tuple, this data had better be right
	 * because it will never be replaced.  The data comes from
	 * src/include/catalog/ headers via genbki.pl.
	 */
	relation->rd_att = CreateTemplateTupleDesc(natts);
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* just to be sure */

	/*
	 * initialize tuple desc info
	 *
	 * Copy the fixed-size part of each supplied attribute, noting along the
	 * way whether any of them is marked attnotnull.
	 */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		memcpy(TupleDescAttr(relation->rd_att, i),
			   &attrs[i],
			   ATTRIBUTE_FIXED_PART_SIZE);
		has_not_null |= attrs[i].attnotnull;

		populate_compact_attribute(relation->rd_att, i);
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	TupleDescCompactAttr(relation->rd_att, 0)->attcacheoff = 0;

	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

	/*
	 * initialize relation id from info in att array (my, this is ugly)
	 */
	RelationGetRelid(relation) = TupleDescAttr(relation->rd_att, 0)->attrelid;

	/*
	 * All relations made with formrdesc are mapped.  This is necessarily so
	 * because there is no other way to know what filenumber they currently
	 * have.  In bootstrap mode, add them to the initial relation mapper data,
	 * specifying that the initial filenumber is the same as the OID.
	 */
	relation->rd_rel->relfilenode = InvalidRelFileNumber;
	if (IsBootstrapProcessingMode())
		RelationMapUpdateMap(RelationGetRelid(relation),
							 RelationGetRelid(relation),
							 isshared, true);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation); /* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/*
	 * initialize the table am handler
	 *
	 * The heap AM routine is installed directly rather than being looked up
	 * through rd_amhandler.
	 */
	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
	relation->rd_tableam = GetHeapamTableAmRoutine();

	/*
	 * initialize the rel-has-index flag, using hardwired knowledge
	 */
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
	{
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
	}

	/*
	 * add new reldesc to relcache
	 */
	RelationCacheInsert(relation, false);

	/* It's fully valid */
	relation->rd_isvalid = true;
}

#ifdef USE_ASSERT_CHECKING
/*
 *		AssertCouldGetRelation
 *
 *		Check safety of calling RelationIdGetRelation().
 *
 *		In code that reads catalogs in the event of a cache miss, call this
 *		before checking the cache.
 *
 *		This definition is compiled only when USE_ASSERT_CHECKING is defined.
 *		It asserts that we are in a transaction state and then runs
 *		AssertBufferLocksPermitCatalogRead() (defined elsewhere).
 */
void
AssertCouldGetRelation(void)
{
	/* building a relcache entry requires being inside a transaction */
	Assert(IsTransactionState());
	AssertBufferLocksPermitCatalogRead();
}
#endif


/* ----------------------------------------------------------------
 *				 Relation Descriptor Lookup Interface
 * ----------------------------------------------------------------
 */

/*
 *		RelationIdGetRelation
 *
 *		Lookup a reldesc by OID; make one if not already in cache.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 *
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
 */
Relation
RelationIdGetRelation(Oid relationId)
{
	Relation	rd;

	AssertCouldGetRelation();

	/*
	 * Probe the relcache hash table first.
	 */
	RelationIdCacheLookup(relationId, rd);

	if (!RelationIsValid(rd))
	{
		/*
		 * Cache miss: let RelationBuildDesc() construct a descriptor and
		 * insert it into the cache.  It hands back NULL when there is nothing
		 * to build, in which case there is no reference to count.
		 */
		rd = RelationBuildDesc(relationId, true);
		if (RelationIsValid(rd))
			RelationIncrementReferenceCount(rd);
		return rd;
	}

	/*
	 * Cache hit.  An entry that has been marked dropped is reported to the
	 * caller as "no such relation"; such entries are always invalid.
	 */
	if (rd->rd_droppedSubid != InvalidSubTransactionId)
	{
		Assert(!rd->rd_isvalid);
		return NULL;
	}

	/* Pin the entry before doing anything that might touch the catalogs */
	RelationIncrementReferenceCount(rd);

	/* An up-to-date entry can be handed out as is */
	if (rd->rd_isvalid)
		return rd;

	/* Otherwise bring the stale entry up to date in place */
	RelationRebuildRelation(rd);

	/*
	 * The rebuild normally leaves the entry valid.  The one exception is a
	 * nailed entry revisited before the critical relcache entries exist:
	 * pg_class cannot be consulted yet, so the entry deliberately stays
	 * marked invalid and will be refreshed by a later lookup.
	 */
	Assert(rd->rd_isvalid ||
		   (rd->rd_isnailed && !criticalRelcachesBuilt));

	return rd;
}

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

/* ResourceOwner callbacks to track relcache references */
static void ResOwnerReleaseRelation(Datum res);
static char *ResOwnerPrintRelCache(Datum res);

/*
 * Descriptor of the "relcache reference" resource kind.  It is the
 * descriptor handed to ResourceOwnerRemember()/ResourceOwnerForget() by the
 * convenience wrappers in this file, and it wires in the two callbacks
 * declared just above (their definitions are not part of this section).
 */
static const ResourceOwnerDesc relref_resowner_desc =
{
	.name = "relcache reference",
	/* release phase and priority constants come from the resowner headers */
	.release_phase = RESOURCE_RELEASE_BEFORE_LOCKS,
	.release_priority = RELEASE_PRIO_RELCACHE_REFS,
	.ReleaseResource = ResOwnerReleaseRelation,
	.DebugPrint = ResOwnerPrintRelCache
};

/* Convenience wrappers over ResourceOwnerRemember/Forget */
/*
 * Record in "owner" that it holds one reference to relcache entry "rel".
 * The Relation pointer itself, converted to a Datum, is the resource value.
 */
static inline void
ResourceOwnerRememberRelationRef(ResourceOwner owner, Relation rel)
{
	ResourceOwnerRemember(owner, PointerGetDatum(rel), &relref_resowner_desc);
}
/*
 * Remove from "owner" one previously remembered reference to relcache entry
 * "rel".  Uses the same Datum encoding as ResourceOwnerRememberRelationRef.
 */
static inline void
ResourceOwnerForgetRelationRef(ResourceOwner owner, Relation rel)
{
	ResourceOwnerForget(owner, PointerGetDatum(rel), &relref_resowner_desc);
}

/*
 * RelationIncrementReferenceCount
 *		Increments relation reference count.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior; we ought to fix it someday, but for now, just disable
 * reference count ownership tracking in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	/* Make room in the current resource owner before bumping the count */
	ResourceOwnerEnlarge(CurrentResourceOwner);

	rel->rd_refcnt++;

	/* Bootstrap mode keeps the count but skips ownership tracking */
	if (IsBootstrapProcessingMode())
		return;

	ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Decrements relation reference count.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	/* Dropping a reference nobody holds would be a caller bug */
	Assert(rel->rd_refcnt > 0);

	rel->rd_refcnt--;

	/* Bootstrap mode never registered the reference, so nothing to forget */
	if (IsBootstrapProcessingMode())
		return;

	ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationClose - close an open relation
 *
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
 */
void
RelationClose(Relation relation)
{
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);

	/*
	 * If that was the last reference, RelationCloseCleanup() discards stale
	 * partition descriptors (and, when built with RELCACHE_FORCE_RELEASE, may
	 * discard the whole entry).
	 */
	RelationCloseCleanup(relation);
}

/*
 * RelationCloseCleanup - housekeeping after a reference has been released
 *
 *	Does nothing unless the entry's reference count has reached zero.  In
 *	that case, child contexts hanging off the partition descriptor contexts
 *	are deleted, and with -DRELCACHE_FORCE_RELEASE the entry itself is
 *	cleared unless the rel or its storage was created in this transaction.
 */
static void
RelationCloseCleanup(Relation relation)
{
	MemoryContext pdcxt;
	MemoryContext pddcxt;

	/* Still open somewhere in this session: leave everything alone */
	if (!RelationHasReferenceCountZero(relation))
		return;

	/*
	 * Stale partition descriptors live in child contexts of rd_pdcxt and
	 * rd_pddcxt.  Having any is unusual, so peek at firstchild before paying
	 * for a call into mcxt.c.
	 */
	pdcxt = relation->rd_pdcxt;
	if (pdcxt != NULL && pdcxt->firstchild != NULL)
		MemoryContextDeleteChildren(pdcxt);

	pddcxt = relation->rd_pddcxt;
	if (pddcxt != NULL && pddcxt->firstchild != NULL)
		MemoryContextDeleteChildren(pddcxt);

#ifdef RELCACHE_FORCE_RELEASE
	/* Debug aid: drop the unreferenced entry right away when permissible */
	if (relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_firstRelfilelocatorSubid == InvalidSubTransactionId)
		RelationClearRelation(relation);
#endif
}

/*
 * RelationReloadIndexInfo - reload minimal information for an open index
 *
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.
 *
 *	If the target index is an index on pg_class or pg_index, we'd better have
 *	previously gotten at least AccessShareLock on its underlying catalog,
 *	else we are at risk of deadlock against someone trying to exclusive-lock
 *	the heap and index in that order.  This is ensured in current usage by
 *	only applying this to indexes being opened or having positive refcount.
 */
static void
RelationReloadIndexInfo(Relation relation)
{
	Oid			indexoid = RelationGetRelid(relation);
	bool		scan_via_index;
	HeapTuple	classtup;

	/* Should be called only for invalidated, live indexes */
	Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
			relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
		   !relation->rd_isvalid &&
		   relation->rd_droppedSubid == InvalidSubTransactionId);

	/*
	 * A shared index may be revisited before backend startup has picked a
	 * database, when pg_class is not readable yet.  Shared indexes never
	 * undergo significant schema changes, so the invalidation can mostly be
	 * ignored: only the physical relfilenumber might differ.  Refresh that,
	 * declare the entry valid, and stop here.
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		RelationInitPhysicalAddr(relation);
		relation->rd_isvalid = true;
		return;
	}

	/*
	 * Fetch the index's pg_class row.  When the index being reloaded is
	 * pg_class_oid_index itself, an indexscan through it is off limits, so
	 * fall back to not using the index for the lookup.
	 */
	scan_via_index = (indexoid != ClassOidIndexId);
	classtup = ScanPgRelation(indexoid, scan_via_index, false);
	if (!HeapTupleIsValid(classtup))
		elog(ERROR, "could not find pg_class tuple for index %u",
			 indexoid);

	/* Overwrite the cached pg_class image with the fresh one */
	memcpy(relation->rd_rel,
		   (Form_pg_class) GETSTRUCT(classtup),
		   CLASS_TUPLE_SIZE);

	/* reloptions may have changed as well: drop the old copy and re-parse */
	if (relation->rd_options)
		pfree(relation->rd_options);
	RelationParseRelOptions(relation, classtup);

	/* the pg_class tuple has served its purpose */
	heap_freetuple(classtup);

	/* the physical address may have moved, so recompute it */
	RelationInitPhysicalAddr(relation);

	/*
	 * Non-system indexes have a handful of pg_index fields that may legally
	 * change.  Most of what is derived from pg_index (support function lookup
	 * info and so on) cannot change, and keeping that data intact is the
	 * whole reason this routine exists, so patch individual fields rather
	 * than replacing the cached row wholesale.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	indextup;
		Form_pg_index fresh;
		Form_pg_index cached;

		indextup = SearchSysCache1(INDEXRELID,
								   ObjectIdGetDatum(indexoid));
		if (!HeapTupleIsValid(indextup))
			elog(ERROR, "cache lookup failed for index %u",
				 indexoid);
		fresh = (Form_pg_index) GETSTRUCT(indextup);
		cached = relation->rd_index;

		/*
		 * Copy every boolean field.  A couple of them cannot change in
		 * current code, but tracking which is not worth the effort.  The
		 * array fields are never allowed to change and are left alone.
		 */
		cached->indisunique = fresh->indisunique;
		cached->indnullsnotdistinct = fresh->indnullsnotdistinct;
		cached->indisprimary = fresh->indisprimary;
		cached->indisexclusion = fresh->indisexclusion;
		cached->indimmediate = fresh->indimmediate;
		cached->indisclustered = fresh->indisclustered;
		cached->indisvalid = fresh->indisvalid;
		cached->indcheckxmin = fresh->indcheckxmin;
		cached->indisready = fresh->indisready;
		cached->indislive = fresh->indislive;
		cached->indisreplident = fresh->indisreplident;

		/* indcheckxmin is meaningless without the matching xmin */
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(indextup->t_data));

		ReleaseSysCache(indextup);
	}

	/* The entry is trustworthy again */
	relation->rd_isvalid = true;
}

/*
 * RelationReloadNailed - reload minimal information for nailed relations.
 *
 * The structure of a nailed relation can never change (which is good, because
 * we rely on knowing their structure to be able to read catalog content). But
 * some parts, e.g. pg_class.relfrozenxid, are still important to have
 * accurate content for. Therefore those need to be reloaded after the arrival
 * of invalidations.
 */
static void
RelationReloadNailed(Relation relation)
{
	/* Should be called only for invalidated, nailed relations */
	Assert(!relation->rd_isvalid);
	Assert(relation->rd_isnailed);
	/* nailed indexes are handled by RelationReloadIndexInfo() */
	Assert(relation->rd_rel->relkind == RELKIND_RELATION);
	AssertCouldGetRelation();

	/*
	 * Redo RelationInitPhysicalAddr in case it is a mapped relation whose
	 * mapping changed.
	 */
	RelationInitPhysicalAddr(relation);

	/*
	 * Reload a non-index entry.  We can't easily do so if relcaches aren't
	 * yet built, but that's fine because at that stage the attributes that
	 * need to be current (like relfrozenxid) aren't yet accessed.  To ensure
	 * the entry will later be revalidated, we leave it in invalid state, but
	 * allow use (cf. RelationIdGetRelation()).
	 */
	if (criticalRelcachesBuilt)
	{
		HeapTuple	pg_class_tuple;
		Form_pg_class relp;

		/*
		 * NB: Mark the entry as valid before starting to scan, to avoid
		 * self-recursion when re-building pg_class.
		 */
		relation->rd_isvalid = true;

		pg_class_tuple = ScanPgRelation(RelationGetRelid(relation),
										true, false);

		/*
		 * Don't dereference a missing tuple.  Put the entry back into the
		 * invalid state first, so that the pending invalidation is not
		 * silently lost, then report the problem the same way
		 * RelationReloadIndexInfo() does for indexes.
		 */
		if (!HeapTupleIsValid(pg_class_tuple))
		{
			relation->rd_isvalid = false;
			elog(ERROR, "could not find pg_class tuple for nailed relation %u",
				 RelationGetRelid(relation));
		}

		relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
		memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
		heap_freetuple(pg_class_tuple);

		/*
		 * Again mark as valid, to protect against concurrently arriving
		 * invalidations.
		 */
		relation->rd_isvalid = true;
	}
}

/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
RelationDestroyRelation(Relation relation, bool remember_tupdesc)
{
	/* The entry is about to be freed, so nobody may still reference it */
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/* break mutual link with stats entry */
	pgstat_unlink_relation(relation);

	/*
	 * Free all the subsidiary data structures of the relcache entry, then the
	 * entry itself.
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
	{
		/*
		 * If we Rebuilt a relcache entry during a transaction then its
		 * possible we did that because the TupDesc changed as the result of
		 * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
		 * possible someone copied that TupDesc, in which case the copy would
		 * point to free'd memory. So if we rebuild an entry we keep the
		 * TupDesc around until end of transaction, to be safe.
		 */
		if (remember_tupdesc)
			RememberToFreeTupleDescAtEOX(relation->rd_att);
		else
			FreeTupleDesc(relation->rd_att);
	}

	/* Trigger descriptor, cached lists and attribute bitmapsets */
	FreeTriggerDesc(relation->trigdesc);
	list_free_deep(relation->rd_fkeylist);
	list_free(relation->rd_indexlist);
	list_free(relation->rd_statlist);
	bms_free(relation->rd_keyattr);
	bms_free(relation->rd_pkattr);
	bms_free(relation->rd_idattr);
	bms_free(relation->rd_hotblockingattr);
	bms_free(relation->rd_summarizedattr);

	/* Individually allocated chunks; each is released only if present */
	if (relation->rd_pubdesc)
		pfree(relation->rd_pubdesc);
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	if (relation->rd_fdwroutine)
		pfree(relation->rd_fdwroutine);

	/* Per-entry memory contexts; each is deleted only if present */
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
	/* the row security descriptor is disposed of via its own context */
	if (relation->rd_rsdesc)
		MemoryContextDelete(relation->rd_rsdesc->rscxt);
	if (relation->rd_partkeycxt)
		MemoryContextDelete(relation->rd_partkeycxt);
	if (relation->rd_pdcxt)
		MemoryContextDelete(relation->rd_pdcxt);
	if (relation->rd_pddcxt)
		MemoryContextDelete(relation->rd_pddcxt);
	if (relation->rd_partcheckcxt)
		MemoryContextDelete(relation->rd_partcheckcxt);

	/* Finally, the RelationData struct itself */
	pfree(relation);
}

/*
 * RelationInvalidateRelation - mark a relation cache entry as invalid
 *
 * An entry that's marked as invalid will be reloaded on next access.
 */
static void
RelationInvalidateRelation(Relation relation)
{
	/*
	 * Have smgr and the layers below it close the relation's files if they
	 * are still open.  Unless the relation is going away, the next smgr
	 * access reopens them automatically, which is how low-level file access
	 * state gets refreshed after, for instance, a vacuum truncation.
	 */
	RelationCloseSmgr(relation);

	/* Discard whatever the access method had cached on the entry */
	if (relation->rd_amcache != NULL)
	{
		pfree(relation->rd_amcache);
		relation->rd_amcache = NULL;
	}

	/* From here on the entry must be reloaded before it is trusted */
	relation->rd_isvalid = false;
}

/*
 * RelationClearRelation - physically blow away a relation cache entry
 *
 * The caller must ensure that the entry is no longer needed, i.e. its
 * reference count is zero.  Also, the rel or its storage must not be created
 * in the current transaction (rd_createSubid and rd_firstRelfilelocatorSubid
 * must not be set).
 */
static void
RelationClearRelation(Relation relation)
{
	/* Only unreferenced, non-nailed entries may be physically removed */
	Assert(RelationHasReferenceCountZero(relation));
	Assert(!relation->rd_isnailed);

	/*
	 * Relations created in the same transaction must never be removed, see
	 * RelationFlushRelation.
	 */
	Assert(relation->rd_createSubid == InvalidSubTransactionId);
	Assert(relation->rd_firstRelfilelocatorSubid == InvalidSubTransactionId);
	Assert(relation->rd_droppedSubid == InvalidSubTransactionId);

	/* first mark it as invalid */
	RelationInvalidateRelation(relation);

	/* Remove it from the hash table */
	RelationCacheDelete(relation);

	/*
	 * And release storage.  Passing remember_tupdesc = false means the
	 * tuple descriptor is freed right away once its refcount drops to zero,
	 * rather than being kept until end of transaction.
	 */
	RelationDestroyRelation(relation, false);
}

/*
 * RelationRebuildRelation - rebuild a relation cache entry in place
 *
 * Reset and rebuild a relation cache entry from scratch (that is, from
 * catalog entries).  This is used when we are notified of a change to an open
 * relation (one with refcount > 0).  The entry is reconstructed without
 * moving the physical RelationData record, so that the refcount holder's
 * pointer is still valid.
 *
 * NB: when rebuilding, we'd better hold some lock on the relation, else the
 * catalog data we need to read could be changing under us.  Also, a rel to be
 * rebuilt had better have refcnt > 0.  This is because a sinval reset could
 * happen while we're accessing the catalogs, and the rel would get blown away
 * underneath us by RelationCacheInvalidate if it has zero refcnt.
 */
static void
RelationRebuildRelation(Relation relation)
{
	/* caller must hold a reference; see the NB in the header comment */
	Assert(!RelationHasReferenceCountZero(relation));
	AssertCouldGetRelation();
	/* there is no reason to ever rebuild a dropped relation */
	Assert(relation->rd_droppedSubid == InvalidSubTransactionId);

	/* Close and mark it as invalid until we've finished the rebuild */
	RelationInvalidateRelation(relation);

	/*
	 * Indexes only have a limited number of possible schema changes, and we
	 * don't want to use the full-blown procedure because it's a headache for
	 * indexes that reload itself depends on.
	 *
	 * As an exception, use the full procedure if the index access info hasn't
	 * been initialized yet.  Index creation relies on that: it first builds
	 * the relcache entry with RelationBuildLocalRelation(), creates the
	 * pg_index tuple only after that, and then relies on
	 * CommandCounterIncrement to load the pg_index contents.
	 */
	if ((relation->rd_rel->relkind == RELKIND_INDEX ||
		 relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
		relation->rd_indexcxt != NULL)
	{
		RelationReloadIndexInfo(relation);
		return;
	}
	/* Nailed relations are handled separately. */
	else if (relation->rd_isnailed)
	{
		RelationReloadNailed(relation);
		return;
	}
	else
	{
		/*
		 * Our strategy for rebuilding an open relcache entry is to build a
		 * new entry from scratch, swap its contents with the old entry, and
		 * finally delete the new entry (along with any infrastructure swapped
		 * over from the old entry).  This is to avoid trouble in case an
		 * error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets canceled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced new
		 * entry, and this shouldn't happen often enough for that to be a big
		 * problem.
		 *
		 * When rebuilding an open relcache entry, we must preserve ref count,
		 * rd_*Subid, and rd_toastoid state.  Also attempt to preserve the
		 * pg_class entry (rd_rel), tupledesc, rewrite-rule, partition key,
		 * and partition descriptor substructures in place, because various
		 * places assume that these structures won't move while they are
		 * working with an open relcache entry.  (Note:  the refcount
		 * mechanism for tupledescs might someday allow us to remove this hack
		 * for the tupledesc.)
		 *
		 * Note that this process does not touch CurrentResourceOwner; which
		 * is good because whatever ref counts the entry may have do not
		 * necessarily belong to that resource owner.
		 */
		Relation	newrel;
		Oid			save_relid = RelationGetRelid(relation);
		bool		keep_tupdesc;
		bool		keep_rules;
		bool		keep_policies;
		bool		keep_partkey;

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);

		/*
		 * Between here and the end of the swap, don't add code that does or
		 * reasonably could read system catalogs.  That range must be free
		 * from invalidation processing.  See RelationBuildDesc() manipulation
		 * of in_progress_list.
		 */

		if (newrel == NULL)
		{
			/*
			 * We can validly get here, if we're using a historic snapshot in
			 * which a relation, accessed from outside logical decoding, is
			 * still invisible. In that case it's fine to just mark the
			 * relation as invalid and return - it'll fully get reloaded by
			 * the cache reset at the end of logical decoding (or at the next
			 * access).  During normal processing we don't want to ignore this
			 * case as it shouldn't happen there, as explained below.
			 *
			 * (The entry is already marked invalid at this point, courtesy of
			 * the RelationInvalidateRelation() call at the top.)
			 */
			if (HistoricSnapshotActive())
				return;

			/*
			 * This shouldn't happen as dropping a relation is intended to be
			 * impossible if still referenced (cf. CheckTableNotInUse()). But
			 * if we get here anyway, we can't just delete the relcache entry,
			 * as it possibly could get accessed later (as e.g. the error
			 * might get trapped and handled via a subtransaction rollback).
			 */
			elog(ERROR, "relation %u deleted while still in use", save_relid);
		}

		/*
		 * If we were to, again, have cases of the relkind of a relcache entry
		 * changing, we would need to ensure that pgstats does not get
		 * confused.
		 */
		Assert(relation->rd_rel->relkind == newrel->rd_rel->relkind);

		/*
		 * Decide, before anything is swapped, which old substructures are
		 * logically unchanged and can therefore stay at their old addresses.
		 */
		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
		keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
		/* partkey is immutable once set up, so we can always keep it */
		keep_partkey = (relation->rd_partkey != NULL);

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must* be no
		 * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
		 * all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method is
		 * to swap the whole structures and then re-swap those few fields we
		 * didn't want swapped.
		 */
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
		{
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
		}

		/*
		 * From here on, each SWAPFIELD undoes the wholesale swap for one
		 * field, i.e. it puts the old entry's value back into "relation".
		 */

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
		SWAPFIELD(SubTransactionId, rd_newRelfilelocatorSubid);
		SWAPFIELD(SubTransactionId, rd_firstRelfilelocatorSubid);
		SWAPFIELD(SubTransactionId, rd_droppedSubid);
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc, rules, policies if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
		{
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
		}
		if (keep_policies)
			SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
		/* toast OID override must be preserved */
		SWAPFIELD(Oid, rd_toastoid);
		/* pgstat_info / enabled must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
		SWAPFIELD(bool, pgstat_enabled);
		/* preserve old partition key if we have one */
		if (keep_partkey)
		{
			SWAPFIELD(PartitionKey, rd_partkey);
			SWAPFIELD(MemoryContext, rd_partkeycxt);
		}
		if (newrel->rd_pdcxt != NULL || newrel->rd_pddcxt != NULL)
		{
			/*
			 * We are rebuilding a partitioned relation with a non-zero
			 * reference count, so we must keep the old partition descriptor
			 * around, in case there's a PartitionDirectory with a pointer to
			 * it.  This means we can't free the old rd_pdcxt yet.  (This is
			 * necessary because RelationGetPartitionDesc hands out direct
			 * pointers to the relcache's data structure, unlike our usual
			 * practice which is to hand out copies.  We'd have the same
			 * problem with rd_partkey, except that we always preserve that
			 * once created.)
			 *
			 * To ensure that it's not leaked completely, re-attach it to the
			 * new reldesc, or make it a child of the new reldesc's rd_pdcxt
			 * in the unlikely event that there is one already.  (Compare hack
			 * in RelationBuildPartitionDesc.)  RelationClose will clean up
			 * any such contexts once the reference count reaches zero.
			 *
			 * In the case where the reference count is zero, this code is not
			 * reached, which should be OK because in that case there should
			 * be no PartitionDirectory with a pointer to the old entry.
			 *
			 * Note that newrel and relation have already been swapped, so the
			 * "old" partition descriptor is actually the one hanging off of
			 * newrel.
			 *
			 * NOTE(review): this branch is entered when *either* of newrel's
			 * two contexts is non-NULL, yet each MemoryContextSetParent()
			 * call below passes the corresponding newrel context without a
			 * NULL test of its own.  Presumably the freshly built entry
			 * never carries the matching context in the case where newrel's
			 * is NULL -- confirm against RelationBuildDesc().
			 */
			relation->rd_partdesc = NULL;	/* ensure rd_partdesc is invalid */
			relation->rd_partdesc_nodetached = NULL;
			relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
			if (relation->rd_pdcxt != NULL) /* probably never happens */
				MemoryContextSetParent(newrel->rd_pdcxt, relation->rd_pdcxt);
			else
				relation->rd_pdcxt = newrel->rd_pdcxt;
			if (relation->rd_pddcxt != NULL)
				MemoryContextSetParent(newrel->rd_pddcxt, relation->rd_pddcxt);
			else
				relation->rd_pddcxt = newrel->rd_pddcxt;
			/* drop newrel's pointers so we don't destroy it below */
			newrel->rd_partdesc = NULL;
			newrel->rd_partdesc_nodetached = NULL;
			newrel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
			newrel->rd_pdcxt = NULL;
			newrel->rd_pddcxt = NULL;
		}

#undef SWAPFIELD

		/*
		 * And now we can throw away the temporary entry.  If the tuple
		 * descriptor was not kept, the one now attached to newrel is the old
		 * one, so ask for it to be remembered until end of transaction
		 * instead of being freed immediately.
		 */
		RelationDestroyRelation(newrel, !keep_tupdesc);
	}
}

/*
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 *	 This is used when we receive a cache invalidation event for the rel.
 */
static void
RelationFlushRelation(Relation relation)
{
	bool		new_in_xact;

	/* Was the rel, or its current storage, created by this transaction? */
	new_in_xact =
		(relation->rd_createSubid != InvalidSubTransactionId ||
		 relation->rd_firstRelfilelocatorSubid != InvalidSubTransactionId);

	if (new_in_xact)
	{
		/*
		 * Such entries are never flushed, only rebuilt or invalidated;
		 * flushing would lose the "new" status of the relation or of its
		 * relfilenumber.
		 */
		if (!IsTransactionState() ||
			relation->rd_droppedSubid != InvalidSubTransactionId)
		{
			RelationInvalidateRelation(relation);
			return;
		}

		/*
		 * The refcount may well be zero here, so hold a temporary reference
		 * to make the rebuild safe.  The current transaction can be assumed
		 * to hold some lock on the rel already.
		 */
		RelationIncrementReferenceCount(relation);
		RelationRebuildRelation(relation);
		RelationDecrementReferenceCount(relation);
		return;
	}

	/*
	 * Pre-existing rels that nobody has open can simply be dropped from the
	 * relcache.
	 */
	if (RelationHasReferenceCountZero(relation))
	{
		RelationClearRelation(relation);
		return;
	}

	/*
	 * The entry is in use, so rebuild it if possible.  There are two cases
	 * in which we merely mark it invalid and let the rebuild happen when the
	 * entry is next opened:
	 *
	 * 1. We are not inside a valid transaction, so no catalog access is
	 * possible.
	 *
	 * 2. It is a nailed relation with refcnt == 1.  That is an unused entry:
	 * we cannot clear it, but there is also no need to rebuild it
	 * immediately.
	 *
	 * Note: we can come here during subtransaction abort, wanting a rebuild
	 * because the rel is open in the outer transaction.  Deferring the
	 * rebuild might look unsafe, since the code that has the rel open keeps
	 * using the entry as-is.  However, the outer transaction should hold a
	 * lock strong enough to prevent any significant change in the rel's
	 * schema, so the existing contents should be good enough for its
	 * purposes; at worst we are behind on statistics updates or the like.
	 * (See also CheckTableNotInUse() and its callers.)
	 */
	if (!IsTransactionState() ||
		(relation->rd_isnailed && relation->rd_refcnt == 1))
	{
		RelationInvalidateRelation(relation);
		return;
	}

	RelationRebuildRelation(relation);
}

/*
 * RelationForgetRelation - caller reports that it dropped the relation
 */
void
RelationForgetRelation(Oid rid)
{
	Relation	relation;

	RelationIdCacheLookup(rid, relation);

	/* Nothing cached under this OID, so nothing to forget */
	if (!PointerIsValid(relation))
		return;

	/* A dropped relation must not be open anywhere in this backend */
	if (!RelationHasReferenceCountZero(relation))
		elog(ERROR, "relation %u is still open", rid);

	Assert(relation->rd_droppedSubid == InvalidSubTransactionId);

	/*
	 * Ordinary case: neither the rel nor its storage was created in this
	 * transaction, so the entry can be destroyed on the spot.
	 */
	if (relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_firstRelfilelocatorSubid == InvalidSubTransactionId)
	{
		RelationClearRelation(relation);
		return;
	}

	/*
	 * Otherwise the rd_*Subid fields must survive a possible subtransaction
	 * rollback.  So rather than destroying the entry, stamp it as dropped in
	 * the current subtransaction and invalidate it.  (Within a top-level
	 * transaction, destroying it would be an option too.)
	 */
	relation->rd_droppedSubid = GetCurrentSubTransactionId();
	RelationInvalidateRelation(relation);
}

/*
 *		RelationCacheInvalidateEntry
 *
 *		This routine is invoked for SI cache flush messages.
 *
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
 * relation.)
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
 */
void
RelationCacheInvalidateEntry(Oid relationId)
{
	Relation	relation;
	int			idx;

	RelationIdCacheLookup(relationId, relation);

	/* A cached entry exists: count the event and flush or rebuild it */
	if (PointerIsValid(relation))
	{
		relcacheInvalsReceived++;
		RelationFlushRelation(relation);
		return;
	}

	/*
	 * No cached entry.  Flag every in-progress build of this relation as
	 * invalidated.
	 */
	for (idx = 0; idx < in_progress_list_len; idx++)
	{
		if (in_progress_list[idx].reloid == relationId)
			in_progress_list[idx].invalidated = true;
	}
}

/*
 * RelationCacheInvalidate
 *	 Blow away cached relation descriptors that have zero reference counts,
 *	 and rebuild those with positive reference counts.  Also reset the smgr
 *	 relation cache and re-read relation mapping data.
 *
 *	 Apart from debug_discard_caches, this is currently used only to recover
 *	 from SI message buffer overflow, so we do not touch relations having
 *	 new-in-transaction relfilenumbers; they cannot be targets of cross-backend
 *	 SI updates (and our own updates now go through a separate linked list
 *	 that isn't limited by the SI message buffer size).
 *
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
 *	 safety, because hash_seq_search only copes with concurrent deletion of
 *	 the element it is currently visiting.  If a second SI overflow were to
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
 *	 so hash_seq_search will complete safely; (b) during the second pass we
 *	 only hold onto pointers to nondeletable entries.
 *
 *	 The two-phase approach also makes it easy to update relfilenumbers for
 *	 mapped relations before we do anything else, and to ensure that the
 *	 second pass processes nailed-in-cache items before other nondeletable
 *	 items.  This should ensure that system catalogs are up to date before
 *	 we attempt to use them to reload information about other open relations.
 *
 *	 After those two phases of work having immediate effects, we normally
 *	 signal any RelationBuildDesc() on the stack to start over.  However, we
 *	 don't do this if called as part of debug_discard_caches.  Otherwise,
 *	 RelationBuildDesc() would become an infinite loop.
 */
void
RelationCacheInvalidate(bool debug_discard)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	Relation	relation;
	List	   *rebuildFirstList = NIL;
	List	   *rebuildList = NIL;
	ListCell   *l;
	int			i;

	/*
	 * Reload relation mapping data before starting to reconstruct cache.
	 */
	RelationMapInvalidateAll();

	/* Phase 1 */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		relation = idhentry->reldesc;

		/*
		 * Ignore new relations; no other backend will manipulate them before
		 * we commit.  Likewise, before replacing a relation's relfilelocator,
		 * we shall have acquired AccessExclusiveLock and drained any
		 * applicable pending invalidations.
		 */
		if (relation->rd_createSubid != InvalidSubTransactionId ||
			relation->rd_firstRelfilelocatorSubid != InvalidSubTransactionId)
			continue;

		relcacheInvalsReceived++;

		if (RelationHasReferenceCountZero(relation))
		{
			/* Delete this entry immediately */
			RelationClearRelation(relation);
		}
		else
		{
			/*
			 * If it's a mapped relation, immediately update its rd_locator in
			 * case its relfilenumber changed.  We must do this during phase 1
			 * in case the relation is consulted during rebuild of other
			 * relcache entries in phase 2.  It's safe since consulting the
			 * map doesn't involve any access to relcache entries.
			 */
			if (RelationIsMapped(relation))
			{
				RelationCloseSmgr(relation);
				RelationInitPhysicalAddr(relation);
			}

			/*
			 * Add this entry to list of stuff to rebuild in second pass.
			 * pg_class goes to the front of rebuildFirstList while
			 * pg_class_oid_index goes to the back of rebuildFirstList, so
			 * they are done first and second respectively.  Other nailed
			 * relations go to the front of rebuildList, so they'll be done
			 * next in no particular order; and everything else goes to the
			 * back of rebuildList.
			 */
			if (RelationGetRelid(relation) == RelationRelationId)
				rebuildFirstList = lcons(relation, rebuildFirstList);
			else if (RelationGetRelid(relation) == ClassOidIndexId)
				rebuildFirstList = lappend(rebuildFirstList, relation);
			else if (relation->rd_isnailed)
				rebuildList = lcons(relation, rebuildList);
			else
				rebuildList = lappend(rebuildList, relation);
		}
	}

	/*
	 * We cannot destroy the SMgrRelations as there might still be references
	 * to them, but close the underlying file descriptors.
	 */
	smgrreleaseall();

	/*
	 * Phase 2: rebuild (or invalidate) the items found to need rebuild in
	 * phase 1
	 */
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		if (!IsTransactionState() || (relation->rd_isnailed && relation->rd_refcnt == 1))
			RelationInvalidateRelation(relation);
		else
			RelationRebuildRelation(relation);
	}
	list_free(rebuildFirstList);
	foreach(l, rebuildList)
	{
		relation = (Relation) lfirst(l);
		if (!IsTransactionState() || (relation->rd_isnailed && relation->rd_refcnt == 1))
			RelationInvalidateRelation(relation);
		else
			RelationRebuildRelation(relation);
	}
	list_free(rebuildList);

	if (!debug_discard)
		/* Any RelationBuildDesc() on the stack must start over. */
		for (i = 0; i < in_progress_list_len; i++)
			in_progress_list[i].invalidated = true;
}

/*
 * RememberToFreeTupleDescAtEOX
 *
 *	Append 'td' to EOXactTupleDescArray, creating the array on first use and
 *	doubling its capacity whenever it is full.
 */
static void
RememberToFreeTupleDescAtEOX(TupleDesc td)
{
	if (EOXactTupleDescArray == NULL)
	{
		/* First use: allocate a 16-slot array in CacheMemoryContext. */
		MemoryContext callercxt = MemoryContextSwitchTo(CacheMemoryContext);

		EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
		EOXactTupleDescArrayLen = 16;
		NextEOXactTupleDescNum = 0;
		MemoryContextSwitchTo(callercxt);
	}
	else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
	{
		int32		grown;

		Assert(EOXactTupleDescArrayLen > 0);

		/* No free slot left: double the capacity. */
		grown = EOXactTupleDescArrayLen * 2;
		EOXactTupleDescArray = (TupleDesc *)
			repalloc(EOXactTupleDescArray, grown * sizeof(TupleDesc));
		EOXactTupleDescArrayLen = grown;
	}

	EOXactTupleDescArray[NextEOXactTupleDescNum] = td;
	NextEOXactTupleDescNum++;
}

#ifdef USE_ASSERT_CHECKING
/*
 * AssertPendingSyncConsistency
 *
 *	Check one relcache entry: the relcache's own view of whether the relation
 *	skips WAL must equal RelFileLocatorSkippingWAL(), and an entry with a
 *	valid rd_droppedSubid must be marked invalid and be new (or have a new
 *	relfilelocator) in this transaction.
 */
static void
AssertPendingSyncConsistency(Relation relation)
{
	bool		relcache_verdict = false;

	/*
	 * Per the relcache, only a permanent relation is considered, and then
	 * only if it got its first new relfilelocator in this transaction, or
	 * was created in this transaction and is of a kind that has storage.
	 */
	if (RelationIsPermanent(relation))
	{
		if (relation->rd_firstRelfilelocatorSubid != InvalidSubTransactionId)
			relcache_verdict = true;
		else if (relation->rd_createSubid != InvalidSubTransactionId)
			relcache_verdict = RELKIND_HAS_STORAGE(relation->rd_rel->relkind);
	}

	Assert(relcache_verdict == RelFileLocatorSkippingWAL(relation->rd_locator));

	/* The remaining check applies to dropped relations only. */
	if (relation->rd_droppedSubid == InvalidSubTransactionId)
		return;

	Assert(!relation->rd_isvalid &&
		   (relation->rd_createSubid != InvalidSubTransactionId ||
			relation->rd_firstRelfilelocatorSubid != InvalidSubTransactionId));
}

/*
 * AssertPendingSyncs_RelationCache
 *
 *	Assert that relcache.c and storage.c agree on whether to skip WAL.
 */
void
AssertPendingSyncs_RelationCache(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	Relation   *rels;
	int			maxrels;
	int			nrels;
	RelIdCacheEnt *idhentry;
	int			i;

	/*
	 * Open every relation that this transaction has locked.  If, for some
	 * relation, storage.c is skipping WAL and relcache.c is not skipping WAL,
	 * a CommandCounterIncrement() typically yields a local invalidation
	 * message that destroys the relcache entry.  By recreating such entries
	 * here, we detect the problem.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());
	maxrels = 1;
	rels = palloc(maxrels * sizeof(*rels));
	nrels = 0;
	hash_seq_init(&status, GetLockMethodLocalHash());
	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		Oid			relid;
		Relation	r;

		if (locallock->nLocks <= 0)
			continue;
		if ((LockTagType) locallock->tag.lock.locktag_type !=
			LOCKTAG_RELATION)
			continue;
		relid = ObjectIdGetDatum(locallock->tag.lock.locktag_field2);
		r = RelationIdGetRelation(relid);
		if (!RelationIsValid(r))
			continue;
		if (nrels >= maxrels)
		{
			maxrels *= 2;
			rels = repalloc(rels, maxrels * sizeof(*rels));
		}
		rels[nrels++] = r;
	}

	hash_seq_init(&status, RelationIdCache);
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
		AssertPendingSyncConsistency(idhentry->reldesc);

	for (i = 0; i < nrels; i++)
		RelationClose(rels[i]);
	PopActiveSnapshot();
}
#endif

/*
 * AtEOXact_RelationCache
 *
 *	Clean up the relcache at main-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
 */
void
AtEOXact_RelationCache(bool isCommit)
{
	int			n;

	/*
	 * Discard in_progress_list.  It can be non-empty only when an error
	 * inside RelationBuildDesc() is what caused this abort.
	 */
	Assert(in_progress_list_len == 0 || !isCommit);
	in_progress_list_len = 0;

	/*
	 * As long as eoxact_list[] did not overflow, the relations named in it
	 * are the only ones needing attention; otherwise every relcache entry is
	 * visited with hash_seq_search.
	 *
	 * To keep things simple, eoxact_list[] entries stay put until the end of
	 * the top-level transaction, even where they could have been dropped at
	 * subtransaction end or when the relation was cleared for some other
	 * reason.  A listed OID may therefore no longer be in the hashtable, in
	 * which case there is nothing to do for it.
	 */
	if (!eoxact_list_overflowed)
	{
		for (n = 0; n < eoxact_list_len; n++)
		{
			RelIdCacheEnt *found;

			found = (RelIdCacheEnt *) hash_search(RelationIdCache,
												  &eoxact_list[n],
												  HASH_FIND,
												  NULL);
			if (found == NULL)
				continue;
			AtEOXact_cleanup(found->reldesc, isCommit);
		}
	}
	else
	{
		HASH_SEQ_STATUS scan;
		RelIdCacheEnt *cur;

		hash_seq_init(&scan, RelationIdCache);
		while ((cur = (RelIdCacheEnt *) hash_seq_search(&scan)) != NULL)
			AtEOXact_cleanup(cur->reldesc, isCommit);
	}

	/* Free the tuple descriptors remembered for end-of-transaction. */
	if (EOXactTupleDescArrayLen > 0)
	{
		Assert(EOXactTupleDescArray != NULL);
		for (n = 0; n < NextEOXactTupleDescNum; n++)
			FreeTupleDesc(EOXactTupleDescArray[n]);
		pfree(EOXactTupleDescArray);
		EOXactTupleDescArray = NULL;
	}

	/* The transaction is over, so the bookkeeping lists can be reset. */
	eoxact_list_len = 0;
	eoxact_list_overflowed = false;
	NextEOXactTupleDescNum = 0;
	EOXactTupleDescArrayLen = 0;
}

/*
 * AtEOXact_cleanup
 *
 *	Clean up a single rel at main-transaction commit or abort
 *
 * NB: this processing must be idempotent, because EOXactListAdd() doesn't
 * bother to prevent duplicate entries in eoxact_list[].
 */
static void
AtEOXact_cleanup(Relation relation, bool isCommit)
{
	bool		clear_relcache;

	/*
	 * By now the entry's reference count ought to be at its resting value:
	 * 1 for a nailed entry, 0 for anything else.
	 *
	 * That does NOT hold in bootstrap mode, where the bootstrap code keeps
	 * relations open across start/commit transaction calls, so the check is
	 * skipped there.  (That seems bogus, but it's not worth fixing.)
	 *
	 * Ideally every relcache entry would be checked, not only the ones with
	 * eoxact work pending, but a full relcache scan is too high a price for
	 * that.  (It would also mean assert-enabled builds never exercise the
	 * hash_search path in the caller, which seems a bad idea.)
	 */
#ifdef USE_ASSERT_CHECKING
	if (!IsBootstrapProcessingMode())
	{
		int			expected_refcnt = relation->rd_isnailed ? 1 : 0;

		Assert(relation->rd_refcnt == expected_refcnt);
	}
#endif

	/*
	 * Decide whether the entry should be removed.
	 *
	 * On commit, remove an entry that was kept around after the relation was
	 * dropped, so that it does not become an orphan.  On rollback, remove an
	 * entry for a relation created in this transaction; it is of no further
	 * interest once the transaction is gone.
	 */
	if (isCommit)
		clear_relcache = (relation->rd_droppedSubid != InvalidSubTransactionId);
	else
		clear_relcache = (relation->rd_createSubid != InvalidSubTransactionId);

	/*
	 * The transaction is over, so zero all the subids.  This also makes it
	 * possible for RelationClearRelation() to drop the entry.
	 */
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilelocatorSubid = InvalidSubTransactionId;
	relation->rd_firstRelfilelocatorSubid = InvalidSubTransactionId;
	relation->rd_droppedSubid = InvalidSubTransactionId;

	if (!clear_relcache)
		return;

	if (!RelationHasReferenceCountZero(relation))
	{
		/*
		 * Someone still holds a (leaked?) reference.  Removing the entry
		 * could leave a dangling pointer behind, so just complain; the entry
		 * has already been detached from the current transaction above and
		 * will hopefully be cleaned up eventually.  This has to stay a
		 * WARNING to avoid error-during-error-recovery loops.
		 */
		elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
			 RelationGetRelationName(relation));
		return;
	}

	RelationClearRelation(relation);
}

/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
{
	int			n;

	/*
	 * Discard in_progress_list.  It can be non-empty only when an error
	 * inside RelationBuildDesc() is what caused this abort; subtransactions
	 * are never committed during RelationBuildDesc().
	 */
	Assert(in_progress_list_len == 0 || !isCommit);
	in_progress_list_len = 0;

	/*
	 * Same approach as AtEOXact_RelationCache: walk eoxact_list[] unless it
	 * overflowed, in which case scan the whole hashtable instead.
	 */
	if (!eoxact_list_overflowed)
	{
		for (n = 0; n < eoxact_list_len; n++)
		{
			RelIdCacheEnt *found;

			found = (RelIdCacheEnt *) hash_search(RelationIdCache,
												  &eoxact_list[n],
												  HASH_FIND,
												  NULL);
			if (found == NULL)
				continue;
			AtEOSubXact_cleanup(found->reldesc, isCommit,
								mySubid, parentSubid);
		}
	}
	else
	{
		HASH_SEQ_STATUS scan;
		RelIdCacheEnt *cur;

		hash_seq_init(&scan, RelationIdCache);
		while ((cur = (RelIdCacheEnt *) hash_seq_search(&scan)) != NULL)
			AtEOSubXact_cleanup(cur->reldesc, isCommit,
								mySubid, parentSubid);
	}

	/* The list is deliberately kept; more cleanup is still to come later. */
}

/*
 * AtEOSubXact_cleanup
 *
 *	Clean up a single rel at subtransaction commit or abort
 *
 * NB: this processing must be idempotent, because EOXactListAdd() doesn't
 * bother to prevent duplicate entries in eoxact_list[].
 */
static void
AtEOSubXact_cleanup(Relation relation, bool isCommit,
					SubTransactionId mySubid, SubTransactionId parentSubid)
{
	/*
	 * Value that replaces a subid owned by the ending subtransaction: the
	 * parent's subid on subcommit, nothing at all on subabort.
	 */
	SubTransactionId successor = isCommit ? parentSubid : InvalidSubTransactionId;

	/*
	 * Was the relation created in the subtransaction that is ending?
	 *
	 * On subcommit, and provided it has not been dropped, the relation is
	 * handed over to the parent.  In every other case the relcache entry is
	 * of no further interest and is deleted if possible.
	 */
	if (relation->rd_createSubid == mySubid)
	{
		bool		keep_entry;

		/*
		 * A valid rd_droppedSubid means the relation has been dropped while
		 * its relcache entry was retained for at-commit pending sync.  Such
		 * an entry must be removed explicitly here, or it would be orphaned.
		 */
		Assert(relation->rd_droppedSubid == mySubid ||
			   relation->rd_droppedSubid == InvalidSubTransactionId);

		keep_entry = isCommit &&
			relation->rd_droppedSubid == InvalidSubTransactionId;

		if (!keep_entry && RelationHasReferenceCountZero(relation))
		{
			/* reset the subids so that the entry may be removed */
			relation->rd_createSubid = InvalidSubTransactionId;
			relation->rd_newRelfilelocatorSubid = InvalidSubTransactionId;
			relation->rd_firstRelfilelocatorSubid = InvalidSubTransactionId;
			relation->rd_droppedSubid = InvalidSubTransactionId;
			RelationClearRelation(relation);
			return;
		}

		/* Either way from here on, the parent subtransaction takes over. */
		relation->rd_createSubid = parentSubid;

		if (!keep_entry)
		{
			/*
			 * We wanted to delete the entry but someone still holds a
			 * (leaked?) reference.  Removing it could leave a dangling
			 * pointer behind, so complain and let the parent subtransaction
			 * retry later.  This has to stay a WARNING to avoid
			 * error-during-error-recovery loops.
			 */
			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
				 RelationGetRelationName(relation));
		}
	}

	/*
	 * In the same way, hand over or discard any new-relfilenumber and drop
	 * records that belong to the ending subtransaction.
	 */
	if (relation->rd_newRelfilelocatorSubid == mySubid)
		relation->rd_newRelfilelocatorSubid = successor;

	if (relation->rd_firstRelfilelocatorSubid == mySubid)
		relation->rd_firstRelfilelocatorSubid = successor;

	if (relation->rd_droppedSubid == mySubid)
		relation->rd_droppedSubid = successor;
}


/*
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
 *
 * The descriptor is allocated in CacheMemoryContext and is marked as created
 * in the current subtransaction.  Its tuple descriptor is a copy of tupDesc
 * made with CreateTupleDescCopy(); attidentity, attgenerated and attnotnull
 * are then copied across attribute by attribute.
 *
 *	relname			name copied into the pg_class form
 *	relnamespace	namespace OID stored in the pg_class form
 *	tupDesc			column layout to copy
 *	relid			OID of the new relation; a fixed set of catalog OIDs
 *					causes the entry to be nailed
 *	accessmtd		stored as relam
 *	relfilenumber	stored as relfilenode, or passed to the relation mapper
 *					instead when mapped_relation is true
 *	reltablespace	stored as reltablespace
 *	shared_relation	must agree with IsSharedRelation(relid), else ERROR
 *	mapped_relation	true if the relfilenumber is kept by the relation mapper
 *	relpersistence	RELPERSISTENCE_* code; any other value raises ERROR
 *	relkind			RELKIND_* code
 *
 * Returns the new entry with its reference count already incremented on
 * behalf of the caller.
 */
Relation
RelationBuildLocalRelation(const char *relname,
						   Oid relnamespace,
						   TupleDesc tupDesc,
						   Oid relid,
						   Oid accessmtd,
						   RelFileNumber relfilenumber,
						   Oid reltablespace,
						   bool shared_relation,
						   bool mapped_relation,
						   char relpersistence,
						   char relkind)
{
	Relation	rel;
	MemoryContext oldcxt;
	int			natts = tupDesc->natts;
	int			i;
	bool		has_not_null;
	bool		nailit;

	Assert(natts >= 0);

	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
	 */
	switch (relid)
	{
		case DatabaseRelationId:
		case AuthIdRelationId:
		case AuthMemRelationId:
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

	/*
	 * check that hardwired list of shared rels matches what's in the
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

	/* Shared relations had better be mapped, too */
	Assert(mapped_relation || !shared_relation);

	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate a new relation descriptor and fill in basic state fields.
	 */
	rel = (Relation) palloc0(sizeof(RelationData));

	/* make sure relation is marked as having no open file yet */
	rel->rd_smgr = NULL;

	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

	/* a nailed entry starts with a reference count of 1, others with 0 */
	rel->rd_refcnt = nailit ? 1 : 0;

	/* it's being created in this transaction */
	rel->rd_createSubid = GetCurrentSubTransactionId();
	rel->rd_newRelfilelocatorSubid = InvalidSubTransactionId;
	rel->rd_firstRelfilelocatorSubid = InvalidSubTransactionId;
	rel->rd_droppedSubid = InvalidSubTransactionId;

	/*
	 * create a new tuple descriptor from the one passed in.  We do this
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
	 */
	rel->rd_att = CreateTupleDescCopy(tupDesc);
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute satt = TupleDescAttr(tupDesc, i);
		Form_pg_attribute datt = TupleDescAttr(rel->rd_att, i);

		datt->attidentity = satt->attidentity;
		datt->attgenerated = satt->attgenerated;
		datt->attnotnull = satt->attnotnull;
		has_not_null |= satt->attnotnull;
		/* refresh the compact form after changing the attribute above */
		populate_compact_attribute(rel->rd_att, i);

		if (satt->attnotnull)
		{
			CompactAttribute *scatt = TupleDescCompactAttr(tupDesc, i);
			CompactAttribute *dcatt = TupleDescCompactAttr(rel->rd_att, i);

			/* carry the source's nullability state over to the copy */
			dcatt->attnullability = scatt->attnullability;
		}
	}

	/* attach a constraint struct only if some column is marked not-null */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;

	rel->rd_rel->relkind = relkind;
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;

	/* set up persistence and relcache fields dependent on it */
	rel->rd_rel->relpersistence = relpersistence;
	switch (relpersistence)
	{
		case RELPERSISTENCE_UNLOGGED:
		case RELPERSISTENCE_PERMANENT:
			rel->rd_backend = INVALID_PROC_NUMBER;
			rel->rd_islocaltemp = false;
			break;
		case RELPERSISTENCE_TEMP:
			Assert(isTempOrTempToastNamespace(relnamespace));
			rel->rd_backend = ProcNumberForTempRelations();
			rel->rd_islocaltemp = true;
			break;
		default:
			elog(ERROR, "invalid relpersistence: %c", relpersistence);
			break;
	}

	/* if it's a materialized view, it's not populated initially */
	if (relkind == RELKIND_MATVIEW)
		rel->rd_rel->relispopulated = false;
	else
		rel->rd_rel->relispopulated = true;

	/* set replica identity -- system catalogs and non-tables don't have one */
	if (!IsCatalogNamespace(relnamespace) &&
		(relkind == RELKIND_RELATION ||
		 relkind == RELKIND_MATVIEW ||
		 relkind == RELKIND_PARTITIONED_TABLE))
		rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
	else
		rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;

	/*
	 * Insert relation physical and logical identifiers (OIDs) into the right
	 * places.  For a mapped relation, we set relfilenumber to zero and rely
	 * on RelationInitPhysicalAddr to consult the map.
	 */
	rel->rd_rel->relisshared = shared_relation;

	/* RelationGetRelid() is used as an lvalue here to store the OID */
	RelationGetRelid(rel) = relid;

	/* stamp every attribute of the copied descriptor with the owning OID */
	for (i = 0; i < natts; i++)
		TupleDescAttr(rel->rd_att, i)->attrelid = relid;

	rel->rd_rel->reltablespace = reltablespace;

	if (mapped_relation)
	{
		rel->rd_rel->relfilenode = InvalidRelFileNumber;
		/* Add it to the active mapping information */
		RelationMapUpdateMap(relid, relfilenumber, shared_relation, true);
	}
	else
		rel->rd_rel->relfilenode = relfilenumber;

	RelationInitLockInfo(rel);	/* see lmgr.c */

	RelationInitPhysicalAddr(rel);

	rel->rd_rel->relam = accessmtd;

	/*
	 * RelationInitTableAccessMethod will do syscache lookups, so we mustn't
	 * run it in CacheMemoryContext.  Fortunately, the remaining steps don't
	 * require a long-lived current context.
	 */
	MemoryContextSwitchTo(oldcxt);

	if (RELKIND_HAS_TABLE_AM(relkind) || relkind == RELKIND_SEQUENCE)
		RelationInitTableAccessMethod(rel);

	/*
	 * Leave index access method uninitialized, because the pg_index row has
	 * not been inserted at this stage of index creation yet.  The cache
	 * invalidation after pg_index row has been inserted will initialize it.
	 */

	/*
	 * Okay to insert into the relcache hash table.
	 *
	 * Ordinarily, there should certainly not be an existing hash entry for
	 * the same OID; but during bootstrap, when we create a "real" relcache
	 * entry for one of the bootstrap relations, we'll be overwriting the
	 * phony one created with formrdesc.  So allow that to happen for nailed
	 * rels.
	 */
	RelationCacheInsert(rel, nailit);

	/*
	 * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
	 * can't do this before storing relid in it.
	 */
	EOXactListAdd(rel);

	/* It's fully valid */
	rel->rd_isvalid = true;

	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

	return rel;
}


/*
 * RelationSetNewRelfilenumber
 *
 * Assign a new relfilenumber (physical file name), and possibly a new
 * persistence setting, to the relation.
 *
 * This allows a full rewrite of the relation to be done with transactional
 * safety (since the filenumber assignment can be rolled back).  Note however
 * that there is no simple way to access the relation's old data for the
 * remainder of the current transaction.  This limits the usefulness to cases
 * such as TRUNCATE or rebuilding an index from scratch.
 *
 * Caller must already hold exclusive lock on the relation.
 *
 *	relation	the relation that is to receive new storage
 *	persistence	RELPERSISTENCE_* value for the new storage.  For a
 *				non-mapped relation it is written to
 *				pg_class.relpersistence; for a mapped relation it is only
 *				asserted to equal the existing value.
 *
 * Before returning, a CommandCounterIncrement() is performed and the relcache
 * entry is flagged through RelationAssumeNewRelfilelocator().
 */
void
RelationSetNewRelfilenumber(Relation relation, char persistence)
{
	RelFileNumber newrelfilenumber;
	Relation	pg_class;
	ItemPointerData otid;
	HeapTuple	tuple;
	Form_pg_class classform;
	MultiXactId minmulti = InvalidMultiXactId;
	TransactionId freezeXid = InvalidTransactionId;
	RelFileLocator newrlocator;

	/*
	 * Pick the new relfilenumber.  Outside binary upgrade a fresh one is
	 * allocated; in binary upgrade mode the value must have been supplied in
	 * advance through the binary_upgrade_next_* variables, and only plain
	 * indexes and plain tables are accepted.
	 */
	if (!IsBinaryUpgrade)
	{
		/* Allocate a new relfilenumber */
		newrelfilenumber = GetNewRelFileNumber(relation->rd_rel->reltablespace,
											   NULL, persistence);
	}
	else if (relation->rd_rel->relkind == RELKIND_INDEX)
	{
		if (!OidIsValid(binary_upgrade_next_index_pg_class_relfilenumber))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("index relfilenumber value not set when in binary upgrade mode")));

		newrelfilenumber = binary_upgrade_next_index_pg_class_relfilenumber;
		/* consume the supplied value; a later call must be given a new one */
		binary_upgrade_next_index_pg_class_relfilenumber = InvalidOid;
	}
	else if (relation->rd_rel->relkind == RELKIND_RELATION)
	{
		if (!OidIsValid(binary_upgrade_next_heap_pg_class_relfilenumber))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("heap relfilenumber value not set when in binary upgrade mode")));

		newrelfilenumber = binary_upgrade_next_heap_pg_class_relfilenumber;
		/* consume the supplied value; a later call must be given a new one */
		binary_upgrade_next_heap_pg_class_relfilenumber = InvalidOid;
	}
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("unexpected request for new relfilenumber in binary upgrade mode")));

	/*
	 * Get a writable copy of the pg_class tuple for the given relation.
	 */
	pg_class = table_open(RelationRelationId, RowExclusiveLock);

	tuple = SearchSysCacheLockedCopy1(RELOID,
									  ObjectIdGetDatum(RelationGetRelid(relation)));
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find tuple for relation %u",
			 RelationGetRelid(relation));
	/* remember the tuple's location for the update and unlock calls below */
	otid = tuple->t_self;
	classform = (Form_pg_class) GETSTRUCT(tuple);

	/*
	 * Schedule unlinking of the old storage at transaction commit, except
	 * when performing a binary upgrade, when we must do it immediately.
	 */
	if (IsBinaryUpgrade)
	{
		SMgrRelation srel;

		/*
		 * During a binary upgrade, we use this code path to ensure that
		 * pg_largeobject and its index have the same relfilenumbers as in the
		 * old cluster. This is necessary because pg_upgrade treats
		 * pg_largeobject like a user table, not a system table. It is however
		 * possible that a table or index may need to end up with the same
		 * relfilenumber in the new cluster as what it had in the old cluster.
		 * Hence, we can't wait until commit time to remove the old storage.
		 *
		 * In general, this function needs to have transactional semantics,
		 * and removing the old storage before commit time surely isn't.
		 * However, it doesn't really matter, because if a binary upgrade
		 * fails at this stage, the new cluster will need to be recreated
		 * anyway.
		 */
		srel = smgropen(relation->rd_locator, relation->rd_backend);
		smgrdounlinkall(&srel, 1, false);
		smgrclose(srel);
	}
	else
	{
		/* Not a binary upgrade, so just schedule it to happen later. */
		RelationDropStorage(relation);
	}

	/*
	 * Create storage for the main fork of the new relfilenumber.  If it's a
	 * table-like object, call into the table AM to do so, which'll also
	 * create the table's init fork if needed.
	 *
	 * NOTE: If relevant for the AM, any conflict in relfilenumber value will
	 * be caught here, if GetNewRelFileNumber messes up for any reason.
	 */
	/* the new locator equals the current one except for the file number */
	newrlocator = relation->rd_locator;
	newrlocator.relNumber = newrelfilenumber;

	if (RELKIND_HAS_TABLE_AM(relation->rd_rel->relkind))
	{
		/* freezeXid and minmulti are passed by address for the AM to fill in */
		table_relation_set_new_filelocator(relation, &newrlocator,
										   persistence,
										   &freezeXid, &minmulti);
	}
	else if (RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
	{
		/* handle these directly, at least for now */
		SMgrRelation srel;

		srel = RelationCreateStorage(newrlocator, persistence, true);
		smgrclose(srel);
	}
	else
	{
		/* we shouldn't be called for anything else */
		elog(ERROR, "relation \"%s\" does not have storage",
			 RelationGetRelationName(relation));
	}

	/*
	 * If we're dealing with a mapped index, pg_class.relfilenode doesn't
	 * change; instead we have to send the update to the relation mapper.
	 *
	 * For mapped indexes, we don't actually change the pg_class entry at all;
	 * this is essential when reindexing pg_class itself.  That leaves us with
	 * possibly-inaccurate values of relpages etc, but those will be fixed up
	 * later.
	 */
	if (RelationIsMapped(relation))
	{
		/* This case is only supported for indexes */
		Assert(relation->rd_rel->relkind == RELKIND_INDEX);

		/* Since we're not updating pg_class, these had better not change */
		Assert(classform->relfrozenxid == freezeXid);
		Assert(classform->relminmxid == minmulti);
		Assert(classform->relpersistence == persistence);

		/*
		 * In some code paths it's possible that the tuple update we'd
		 * otherwise do here is the only thing that would assign an XID for
		 * the current transaction.  However, we must have an XID to delete
		 * files, so make sure one is assigned.
		 */
		(void) GetCurrentTransactionId();

		/* Do the deed */
		RelationMapUpdateMap(RelationGetRelid(relation),
							 newrelfilenumber,
							 relation->rd_rel->relisshared,
							 false);

		/* Since we're not updating pg_class, must trigger inval manually */
		CacheInvalidateRelcache(relation);
	}
	else
	{
		/* Normal case, update the pg_class entry */
		classform->relfilenode = newrelfilenumber;

		/* relpages etc. never change for sequences */
		if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
		{
			classform->relpages = 0;	/* it's empty until further notice */
			classform->reltuples = -1;
			classform->relallvisible = 0;
			classform->relallfrozen = 0;
		}
		classform->relfrozenxid = freezeXid;
		classform->relminmxid = minmulti;
		classform->relpersistence = persistence;

		CatalogTupleUpdate(pg_class, &otid, tuple);
	}

	/*
	 * Release the tuple lock and the tuple copy on both paths above.
	 * NOTE(review): the lock is presumably the one taken by
	 * SearchSysCacheLockedCopy1() earlier in this function -- confirm.
	 */
	UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
	heap_freetuple(tuple);

	table_close(pg_class, RowExclusiveLock);

	/*
	 * Make the pg_class row change or relation map change visible.  This will
	 * cause the relcache entry to get updated, too.
	 */
	CommandCounterIncrement();

	/* record in the relcache entry that a new relfilelocator was assigned */
	RelationAssumeNewRelfilelocator(relation);
}

/*
 * RelationAssumeNewRelfilelocator
 *
 * Code that modifies pg_class.reltablespace or pg_class.relfilenode must call
 * this.  The call shall precede any code that might insert WAL records whose
 * replay would modify bytes in the new RelFileLocator, and the call shall follow
 * any WAL modifying bytes in the prior RelFileLocator.  See struct RelationData.
 * Ideally, call this as near as possible to the CommandCounterIncrement()
 * that makes the pg_class change visible (before it or after it); that
 * minimizes the chance of future development adding a forbidden WAL insertion
 * between RelationAssumeNewRelfilelocator() and CommandCounterIncrement().
 */
void
RelationAssumeNewRelfilelocator(Relation relation)
{
	SubTransactionId cursubid = GetCurrentSubTransactionId();

	/* The most recent assignment always belongs to this subtransaction. */
	relation->rd_newRelfilelocatorSubid = cursubid;

	/* The first assignment is recorded once and then left alone. */
	if (relation->rd_firstRelfilelocatorSubid == InvalidSubTransactionId)
		relation->rd_firstRelfilelocatorSubid = cursubid;

	/* Make sure end-of-transaction cleanup visits this entry to reset them. */
	EOXactListAdd(relation);
}


/*
 *		RelationCacheInitialize
 *
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
 */

#define INITRELCACHESIZE		400

void
RelationCacheInitialize(void)
{
	HASHCTL		ctl;
	int			allocsize;

	/*
	 * make sure cache memory context exists
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	/*
	 * create hashtable that indexes the relcache
	 */
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelIdCacheEnt);
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_BLOBS);

	/*
	 * reserve enough in_progress_list slots for many cases
	 */
	allocsize = 4;
	in_progress_list =
		MemoryContextAlloc(CacheMemoryContext,
						   allocsize * sizeof(*in_progress_list));
	in_progress_list_maxlen = allocsize;

	/*
	 * relation mapper needs to be initialized too
	 */
	RelationMapInitialize();
}

/*
 *		RelationCacheInitializePhase2
 *
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
 *		pg_auth_members, pg_shseclabel, and pg_subscription.  Ideally we'd
 *		like to have reldescs for their indexes, too.  We attempt to load
 *		this information from the shared relcache init file.  If that's
 *		missing or broken, just make phony entries for the catalogs
 *		themselves.  RelationCacheInitializePhase3 will clean up as needed.
 */
void
RelationCacheInitializePhase2(void)
{
	MemoryContext callercxt;

	/* The relation mapper needs its own phase-2 initialization as well */
	RelationMapInitializePhase2();

	/* While bootstrapping the shared catalogs don't exist yet: we're done */
	if (IsBootstrapProcessingMode())
		return;

	/* Whatever gets built below must be allocated in the cache context */
	callercxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Prefer the shared relcache init file.  When it cannot be loaded, fall
	 * back to hand-made descriptors for the critical shared catalogs.
	 */
	if (!load_relcache_init_file(true))
	{
		formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
				  Natts_pg_database, Desc_pg_database);
		formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
				  Natts_pg_authid, Desc_pg_authid);
		formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
				  Natts_pg_auth_members, Desc_pg_auth_members);
		formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
				  Natts_pg_shseclabel, Desc_pg_shseclabel);
		formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
				  Natts_pg_subscription, Desc_pg_subscription);

#define NUM_CRITICAL_SHARED_RELS	5	/* fix if you change list above */
	}

	/* Restore the caller's memory context */
	MemoryContextSwitchTo(callercxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;

	/*
	 * Plan to rewrite the init files if the critical shared relcache entries
	 * have not been built yet; this is also forced below when the local init
	 * file cannot be used.
	 */
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;

	/*
	 * The relation mapper needs its phase-3 initialization too.
	 */
	RelationMapInitializePhase3();

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
	 * catalogs.  In bootstrap mode we don't even try to load the file.
	 */
	if (IsBootstrapProcessingMode() ||
		!load_relcache_init_file(false))
	{
		needNewCacheFile = true;

		formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
				  Natts_pg_class, Desc_pg_class);
		formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
				  Natts_pg_attribute, Desc_pg_attribute);
		formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
				  Natts_pg_proc, Desc_pg_proc);
		formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
				  Natts_pg_type, Desc_pg_type);

#define NUM_CRITICAL_LOCAL_RELS 4	/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);

	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * If we didn't get the critical system indexes loaded into relcache, do
	 * so now.  These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
	 * infinite-recursion problem.  We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
	 *
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
	 * rebuilt without inducing recursion.  However they are used during
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
	 */
	if (!criticalRelcachesBuilt)
	{
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);

#define NUM_CRITICAL_LOCAL_INDEXES	7	/* fix if you change list above */

		criticalRelcachesBuilt = true;
	}

	/*
	 * Process critical shared indexes too.
	 *
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
	 * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
	 * during client authentication.  SharedSecLabelObjectIndexId isn't
	 * critical for the core system, but authentication hooks might be
	 * interested in it.
	 */
	if (!criticalSharedRelcachesBuilt)
	{
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);
		load_critical_index(SharedSecLabelObjectIndexId,
							SharedSecLabelRelationId);

#define NUM_CRITICAL_SHARED_INDEXES 6	/* fix if you change list above */

		criticalSharedRelcachesBuilt = true;
	}

	/*
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules, triggers, or security policies, load that
	 * info the hard way since it isn't recorded in the cache file.
	 *
	 * Whenever we access the catalogs to read data, there is a possibility of
	 * a shared-inval cache flush causing relcache entries to be removed.
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
	 */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/* set whenever a fix-up step below ran for this entry */
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);

		/*
		 * If it's a faked-up entry, read the real pg_class tuple.  An invalid
		 * relowner is what identifies such an entry here.
		 */
		if (relation->rd_rel->relowner == InvalidOid)
		{
			HeapTuple	htup;
			Form_pg_class relp;

			htup = SearchSysCache1(RELOID,
								   ObjectIdGetDatum(RelationGetRelid(relation)));
			if (!HeapTupleIsValid(htup))
				ereport(FATAL,
						errcode(ERRCODE_UNDEFINED_OBJECT),
						errmsg_internal("cache lookup failed for relation %u",
										RelationGetRelid(relation)));
			relp = (Form_pg_class) GETSTRUCT(htup);

			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);

			/*
			 * Update rd_options while we have the tuple; any previous
			 * rd_options value is freed first.
			 */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

			/*
			 * Check the values in rd_att were set up correctly.  (We cannot
			 * just copy them over now: formrdesc must have set up the rd_att
			 * data correctly to start with, because it may already have been
			 * copied into one or more catcache entries.)
			 */
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);

			ReleaseSysCache(htup);

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
		{
			RelationBuildRuleLock(relation);
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
		if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
		{
			RelationBuildTriggers(relation);
			if (relation->trigdesc == NULL)
				relation->rd_rel->relhastriggers = false;
			restart = true;
		}

		/*
		 * Re-load the row security policies if the relation has them, since
		 * they are not preserved in the cache.  Note that we can never NOT
		 * have a policy while relrowsecurity is true:
		 * RelationBuildRowSecurity will create a single default-deny policy
		 * if there is no policy defined in pg_policy.
		 */
		if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
		{
			RelationBuildRowSecurity(relation);

			Assert(relation->rd_rsdesc != NULL);
			restart = true;
		}

		/*
		 * Reload tableam data if needed.  Sequences are tested for
		 * explicitly, in addition to the relkinds RELKIND_HAS_TABLE_AM
		 * accepts.
		 */
		if (relation->rd_tableam == NULL &&
			(RELKIND_HAS_TABLE_AM(relation->rd_rel->relkind) || relation->rd_rel->relkind == RELKIND_SEQUENCE))
		{
			RelationInitTableAccessMethod(relation);
			Assert(relation->rd_tableam != NULL);

			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/*
		 * Now, restart the hashtable scan if needed.  The current scan is
		 * terminated explicitly before a new one is started.
		 */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
	}

	/*
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
	 */
	if (needNewCacheFile)
	{
		/*
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
		 * that the init files will be most useful for future backends.
		 */
		InitCatalogCachePhase2();

		/* now write the files: first the shared one, then the local one */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
	}
}

/*
 * Load one critical system index into the relcache
 *
 * indexoid is the OID of the target index, heapoid is the OID of the catalog
 * it belongs to.
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	indexrel;

	/*
	 * Lock order matters: take the catalog first and its index second.
	 * RelationBuildDesc may need to read the catalog, and anyone taking
	 * exclusive locks on this catalog and index does so in that same order,
	 * so following it here avoids deadlock.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);

	/* Build the descriptor; failing to do so is treated as corruption */
	indexrel = RelationBuildDesc(indexoid, true);
	if (!indexrel)
		ereport(PANIC,
				errcode(ERRCODE_DATA_CORRUPTED),
				errmsg_internal("could not open critical system index %u", indexoid));

	/* Mark the entry nailed and give it a reference count of one */
	indexrel->rd_refcnt = 1;
	indexrel->rd_isnailed = true;

	/* Release the locks again, index first and then the catalog */
	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);

	/* Fetch the index's attribute options now; the result is not needed */
	(void) RelationGetIndexAttOptions(indexrel, false);
}

/*
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
 */
static TupleDesc
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs)
{
	MemoryContext callercxt;
	TupleDesc	desc;

	/* The descriptor is allocated in CacheMemoryContext */
	callercxt = MemoryContextSwitchTo(CacheMemoryContext);

	desc = CreateTemplateTupleDesc(natts);
	desc->tdtypmod = -1;
	desc->tdtypeid = RECORDOID; /* not right, but we don't care */

	/* Copy the fixed-size part of each attribute, then derive its compact form */
	for (int attno = 0; attno < natts; attno++)
	{
		memcpy(TupleDescAttr(desc, attno), attrs + attno,
			   ATTRIBUTE_FIXED_PART_SIZE);
		populate_compact_attribute(desc, attno);
	}

	/* The first attribute's attcacheoff is set to 0, cf RelationBuildTupleDesc */
	TupleDescCompactAttr(desc, 0)->attcacheoff = 0;

	/* Deliberately no TupleConstr: callers only need to extract fields */

	MemoryContextSwitchTo(callercxt);

	return desc;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	/* Built on first call and reused by every later call */
	static TupleDesc cached_desc = NULL;

	if (cached_desc != NULL)
		return cached_desc;

	cached_desc = BuildHardcodedDescriptor(Natts_pg_class, Desc_pg_class);

	return cached_desc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	/* Built on first call and reused by every later call */
	static TupleDesc cached_desc = NULL;

	if (cached_desc != NULL)
		return cached_desc;

	cached_desc = BuildHardcodedDescriptor(Natts_pg_index, Desc_pg_index);

	return cached_desc;
}

/*
 * Load any default attribute value definitions for the relation.
 *
 * ndef is the number of attributes that were marked atthasdef.
 *
 * Note: we don't make it a hard error to be missing some pg_attrdef records.
 * We can limp along as long as nothing needs to use the default value.  Code
 * that fails to find an expected AttrDefault record should throw an error.
 */
static void
AttrDefaultFetch(Relation relation, int ndef)
{
	AttrDefault *defaults;
	Relation	pgattrdef;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	int			nfound = 0;

	/* Zero-filled array with one slot per default the caller expects */
	defaults = (AttrDefault *)
		MemoryContextAllocZero(CacheMemoryContext,
							   ndef * sizeof(AttrDefault));

	/* Scan pg_attrdef for rows whose adrelid is this relation */
	ScanKeyInit(&key,
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pgattrdef = table_open(AttrDefaultRelationId, AccessShareLock);
	scan = systable_beginscan(pgattrdef, AttrDefaultIndexId, true,
							  NULL, 1, &key);

	for (tup = systable_getnext(scan);
		 HeapTupleIsValid(tup);
		 tup = systable_getnext(scan))
	{
		Form_pg_attrdef row = (Form_pg_attrdef) GETSTRUCT(tup);
		Datum		adbin;
		bool		adbin_isnull;
		char	   *adbin_str;

		/* Never write past the end of the array; stop at the first extra row */
		if (nfound >= ndef)
		{
			elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"",
				 row->adnum, RelationGetRelationName(relation));
			break;
		}

		adbin = fastgetattr(tup,
							Anum_pg_attrdef_adbin,
							pgattrdef->rd_att, &adbin_isnull);

		/* A row without adbin is reported and does not consume a slot */
		if (adbin_isnull)
		{
			elog(WARNING, "null adbin for attribute %d of relation \"%s\"",
				 row->adnum, RelationGetRelationName(relation));
			continue;
		}

		/*
		 * Detoast and convert to a cstring in the caller's context, keep a
		 * copy in CacheMemoryContext, and release the temporary.
		 */
		adbin_str = TextDatumGetCString(adbin);
		defaults[nfound].adnum = row->adnum;
		defaults[nfound].adbin = MemoryContextStrdup(CacheMemoryContext,
													 adbin_str);
		pfree(adbin_str);
		nfound++;
	}

	systable_endscan(scan);
	table_close(pgattrdef, AccessShareLock);

	/* Missing records are only worth a warning here */
	if (nfound != ndef)
		elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"",
			 ndef - nfound, RelationGetRelationName(relation));

	/*
	 * Keep the entries ordered by adnum for equalTupleDescs().  They are
	 * usually in order already, but might not be if systable_getnext isn't
	 * using an index.
	 */
	if (nfound > 1)
		qsort(defaults, nfound, sizeof(AttrDefault), AttrDefaultCmp);

	/* Publish the array only now that it is complete */
	relation->rd_att->constr->defval = defaults;
	relation->rd_att->constr->num_defval = nfound;
}

/*
 * qsort comparator to sort AttrDefault entries by adnum
 */
static int
AttrDefaultCmp(const void *a, const void *b)
{
	/* Both arguments point at AttrDefault entries; order them by adnum */
	return pg_cmp_s16(((const AttrDefault *) a)->adnum,
					  ((const AttrDefault *) b)->adnum);
}

/*
 * Load any check constraints for the relation, and update not-null validity
 * of invalid constraints.
 *
 * As with defaults, if we don't find the expected number of them, just warn
 * here.  The executor should throw an error if an INSERT/UPDATE is attempted.
 *
 * The expected number of CHECK constraints is taken from
 * relation->rd_rel->relchecks.  The resulting array (possibly NULL when no
 * checks are expected) and its length are stored into
 * relation->rd_att->constr, which this function dereferences and therefore
 * expects to exist already.
 *
 * A CHECK constraint row whose conbin is null is reported with a WARNING and
 * skipped entirely: nothing is copied into the array slot for it, so no
 * CacheMemoryContext allocation is made on its behalf and no half-filled
 * entry is left behind.
 */
static void
CheckNNConstraintFetch(Relation relation)
{
	ConstrCheck *check;
	int			ncheck = relation->rd_rel->relchecks;
	Relation	conrel;
	SysScanDesc conscan;
	ScanKeyData skey[1];
	HeapTuple	htup;
	int			found = 0;

	/* Allocate array with room for as many entries as expected, if needed */
	if (ncheck > 0)
		check = (ConstrCheck *)
			MemoryContextAllocZero(CacheMemoryContext,
								   ncheck * sizeof(ConstrCheck));
	else
		check = NULL;

	/* Search pg_constraint for relevant entries */
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	conrel = table_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
								 NULL, 1, skey);

	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
		Datum		val;
		bool		isnull;

		/*
		 * If this is a not-null constraint, then only look at it if it's
		 * invalid, and if so, mark the TupleDesc entry as known invalid.
		 * Otherwise move on.  We'll mark any remaining columns that are still
		 * in UNKNOWN state as known valid later.  This allows us not to have
		 * to extract the attnum from this constraint tuple in the vast
		 * majority of cases.
		 */
		if (conform->contype == CONSTRAINT_NOTNULL)
		{
			if (!conform->convalidated)
			{
				AttrNumber	attnum;

				attnum = extractNotNullColumn(htup);
				Assert(relation->rd_att->compact_attrs[attnum - 1].attnullability ==
					   ATTNULLABLE_UNKNOWN);
				relation->rd_att->compact_attrs[attnum - 1].attnullability =
					ATTNULLABLE_INVALID;
			}

			continue;
		}

		/* For what follows, consider check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

		/* protect limited size of array (also covers check == NULL) */
		if (found >= ncheck)
		{
			elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"",
				 RelationGetRelationName(relation));
			break;
		}

		/* Grab and test conbin is actually set */
		val = fastgetattr(htup,
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
		if (isnull)
			elog(WARNING, "null conbin for relation \"%s\"",
				 RelationGetRelationName(relation));
		else
		{
			/* detoast and convert to cstring in caller's context */
			char	   *s = TextDatumGetCString(val);

			/*
			 * Fill the slot only now that the row is known to be usable.
			 * Doing the ccname copy before the null test would leak it in
			 * CacheMemoryContext whenever conbin turns out to be null,
			 * because "found" is not advanced in that case and the slot gets
			 * overwritten (or left dangling past num_check).
			 */
			check[found].ccenforced = conform->conenforced;
			check[found].ccvalid = conform->convalidated;
			check[found].ccnoinherit = conform->connoinherit;
			check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
													  NameStr(conform->conname));
			check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);

			pfree(s);
			found++;
		}
	}

	systable_endscan(conscan);
	table_close(conrel, AccessShareLock);

	if (found != ncheck)
		elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"",
			 ncheck - found, RelationGetRelationName(relation));

	/*
	 * Sort the records by name.  This ensures that CHECKs are applied in a
	 * deterministic order, and it also makes equalTupleDescs() faster.
	 */
	if (found > 1)
		qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp);

	/* Install array only after it's fully valid */
	relation->rd_att->constr->check = check;
	relation->rd_att->constr->num_check = found;
}

/*
 * qsort comparator to sort ConstrCheck entries by name
 */
static int
CheckConstraintCmp(const void *a, const void *b)
{
	/* Both arguments point at ConstrCheck entries; order them by ccname */
	return strcmp(((const ConstrCheck *) a)->ccname,
				  ((const ConstrCheck *) b)->ccname);
}

/*
 * RelationGetFKeyList -- get a list of foreign key info for the relation
 *
 * Returns a list of ForeignKeyCacheInfo structs, one per FK constraining
 * the given relation.  This data is a direct copy of relevant fields from
 * pg_constraint.  The list items are in no particular order.
 *
 * CAUTION: the returned list is part of the relcache's data, and could
 * vanish in a relcache entry reset.  Callers must inspect or copy it
 * before doing anything that might trigger a cache flush, such as
 * system catalog accesses.  copyObject() can be used if desired.
 * (We define it this way because current callers want to filter and
 * modify the list entries anyway, so copying would be a waste of time.)
 */
List *
RelationGetFKeyList(Relation relation)
{
	List	   *fkeys = NIL;
	List	   *stale;
	Relation	pgconstraint;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	MemoryContext callercxt;

	/* A list that is already valid is returned as-is (not copied) */
	if (relation->rd_fkeyvalid)
		return relation->rd_fkeylist;

	/*
	 * The list is assembled in the caller's context during the scan and only
	 * copied into the relcache entry once the scan has finished, so an error
	 * partway through does not leak cache-context memory.
	 */

	/* Scan pg_constraint for rows whose conrelid is this relation */
	ScanKeyInit(&key,
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pgconstraint = table_open(ConstraintRelationId, AccessShareLock);
	scan = systable_beginscan(pgconstraint, ConstraintRelidTypidNameIndexId,
							  true, NULL, 1, &key);

	for (tup = systable_getnext(scan);
		 HeapTupleIsValid(tup);
		 tup = systable_getnext(scan))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);

		/* Only foreign-key rows contribute to the list */
		if (conform->contype == CONSTRAINT_FOREIGN)
		{
			ForeignKeyCacheInfo *fk = makeNode(ForeignKeyCacheInfo);

			fk->conoid = conform->oid;
			fk->conrelid = conform->conrelid;
			fk->confrelid = conform->confrelid;
			fk->conenforced = conform->conenforced;

			/* Key columns and equality operators come from the row itself */
			DeconstructFkConstraintRow(tup, &fk->nkeys,
									   fk->conkey,
									   fk->confkey,
									   fk->conpfeqop,
									   NULL, NULL, NULL, NULL);

			fkeys = lappend(fkeys, fk);
		}
	}

	systable_endscan(scan);
	table_close(pgconstraint, AccessShareLock);

	/* Store a deep copy of the finished list in the relcache entry */
	callercxt = MemoryContextSwitchTo(CacheMemoryContext);
	stale = relation->rd_fkeylist;
	relation->rd_fkeylist = copyObject(fkeys);
	relation->rd_fkeyvalid = true;
	MemoryContextSwitchTo(callercxt);

	/* Free whatever list the entry held before, including its elements */
	list_free_deep(stale);

	return fkeys;
}

/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is created only if someone requests it.  We scan pg_index
 * to find relevant indexes, and add the list to the relcache entry so that
 * we won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_indexvalid to false,
 * so that we must recompute the index list on next request.  This handles
 * creation or deletion of an index.
 *
 * Indexes that are marked not indislive are omitted from the returned list.
 * Such indexes are expected to be dropped momentarily, and should not be
 * touched at all by any caller of this function.
 *
 * The returned list is guaranteed to be sorted in order by OID.  This is
 * needed by the executor, since for index types that we obtain exclusive
 * locks on when updating the index, all backends must lock the indexes in
 * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
 * consistent ordering would do, but ordering by OID is easy.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it. This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * indexes, and syscache lookup could cause SI messages to be processed!
 *
 * In exactly the same way, we update rd_pkindex, which is the OID of the
 * relation's primary key index if any, else InvalidOid; and rd_replidindex,
 * which is the pg_class OID of an index to be used as the relation's
 * replication identity index, or InvalidOid if there is no such index.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	pgindex;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	List	   *indexoids = NIL;
	List	   *stale;
	char		replident = relation->rd_rel->relreplident;
	Oid			pkoid = InvalidOid;
	Oid			replcandidate = InvalidOid;
	Oid			replidoid;
	bool		pk_is_deferrable = false;
	MemoryContext callercxt;

	/* If the cached list is valid, hand back a copy of it */
	if (relation->rd_indexvalid)
		return list_copy(relation->rd_indexlist);

	/*
	 * The list is assembled in the caller's context during the scan and only
	 * copied into the relcache entry once the scan has finished, so an error
	 * partway through does not leak cache-context memory.
	 */

	/* Scan pg_index for rows whose indrelid is this relation */
	ScanKeyInit(&key,
				Anum_pg_index_indrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pgindex = table_open(IndexRelationId, AccessShareLock);
	scan = systable_beginscan(pgindex, IndexIndrelidIndexId, true,
							  NULL, 1, &key);

	for (tup = systable_getnext(scan);
		 HeapTupleIsValid(tup);
		 tup = systable_getnext(scan))
	{
		Form_pg_index idx = (Form_pg_index) GETSTRUCT(tup);

		/*
		 * Indexes that are being dropped are left out entirely: their
		 * catalog entries could disappear at any instant, so they must not
		 * be searched, inserted into, or considered for HOT-safety.
		 */
		if (!idx->indislive)
			continue;

		indexoids = lappend_oid(indexoids, idx->indexrelid);

		/*
		 * Only unique indexes without a predicate can serve as primary key
		 * or replica identity; nothing more to do for the others.
		 */
		if (!idx->indisunique)
			continue;
		if (!heap_attisnull(tup, Anum_pg_index_indpred, NULL))
			continue;

		/*
		 * Remember the primary key index.  A regular table's PK must be
		 * valid; for a partitioned table an invalid one is remembered too,
		 * since it is needed to prevent dropping the not-null constraints
		 * that may underlie such a primary key.  Deferrable PKs are recorded
		 * along with their deferrability.
		 */
		if (idx->indisprimary &&
			(idx->indisvalid ||
			 relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE))
		{
			pkoid = idx->indexrelid;
			pk_is_deferrable = !idx->indimmediate;
		}

		/*
		 * An explicitly chosen replica identity index only counts when it is
		 * both immediate and valid.
		 */
		if (idx->indimmediate && idx->indisvalid && idx->indisreplident)
			replcandidate = idx->indexrelid;
	}

	systable_endscan(scan);

	table_close(pgindex, AccessShareLock);

	/* The API promises the list in OID order */
	list_sort(indexoids, list_oid_cmp);

	/* Work out which index, if any, is the replica identity index */
	if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkoid) && !pk_is_deferrable)
		replidoid = pkoid;
	else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(replcandidate))
		replidoid = replcandidate;
	else
		replidoid = InvalidOid;

	/* Store a copy of the finished list, plus the derived OIDs, in the entry */
	callercxt = MemoryContextSwitchTo(CacheMemoryContext);
	stale = relation->rd_indexlist;
	relation->rd_indexlist = list_copy(indexoids);
	relation->rd_pkindex = pkoid;
	relation->rd_ispkdeferrable = pk_is_deferrable;
	relation->rd_replidindex = replidoid;
	relation->rd_indexvalid = true;
	MemoryContextSwitchTo(callercxt);

	/* Free whatever list the entry held before */
	list_free(stale);

	return indexoids;
}

/*
 * RelationGetStatExtList
 *		get a list of OIDs of statistics objects on this relation
 *
 * The statistics list is created only if someone requests it, in a way
 * similar to RelationGetIndexList().  We scan pg_statistic_ext to find
 * relevant statistics, and add the list to the relcache entry so that we
 * won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_statvalid to 0,
 * so that we must recompute the statistics list on next request.  This
 * handles creation or deletion of a statistics object.
 *
 * The returned list is guaranteed to be sorted in order by OID, although
 * this is not currently needed.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it. This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * statistics, and syscache lookup could cause SI messages to be processed!
 */
List *
RelationGetStatExtList(Relation relation)
{
	Relation	pgstatext;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	List	   *statoids = NIL;
	List	   *stale;
	MemoryContext callercxt;

	/* If the cached list is valid, hand back a copy of it */
	if (relation->rd_statvalid != 0)
		return list_copy(relation->rd_statlist);

	/*
	 * The list is assembled in the caller's context during the scan and only
	 * copied into the relcache entry once the scan has finished, so an error
	 * partway through does not leak cache-context memory.
	 */

	/* Scan pg_statistic_ext for rows whose stxrelid is this relation */
	ScanKeyInit(&key,
				Anum_pg_statistic_ext_stxrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pgstatext = table_open(StatisticExtRelationId, AccessShareLock);
	scan = systable_beginscan(pgstatext, StatisticExtRelidIndexId, true,
							  NULL, 1, &key);

	/* Collect the OID of every statistics object found */
	for (tup = systable_getnext(scan);
		 HeapTupleIsValid(tup);
		 tup = systable_getnext(scan))
	{
		Form_pg_statistic_ext statform = (Form_pg_statistic_ext) GETSTRUCT(tup);

		statoids = lappend_oid(statoids, statform->oid);
	}

	systable_endscan(scan);

	table_close(pgstatext, AccessShareLock);

	/* The API promises the list in OID order */
	list_sort(statoids, list_oid_cmp);

	/* Store a copy of the finished list in the relcache entry */
	callercxt = MemoryContextSwitchTo(CacheMemoryContext);
	stale = relation->rd_statlist;
	relation->rd_statlist = list_copy(statoids);

	relation->rd_statvalid = true;
	MemoryContextSwitchTo(callercxt);

	/* Free whatever list the entry held before */
	list_free(stale);

	return statoids;
}

/*
 * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
 *
 * Returns InvalidOid if there is no such index, or if the primary key is
 * DEFERRABLE and the caller isn't OK with that.
 */
Oid
RelationGetPrimaryKeyIndex(Relation relation, bool deferrable_ok)
{
	/*
	 * rd_pkindex and rd_ispkdeferrable are filled in as a side effect of
	 * RelationGetIndexList; run it (and discard its result) if the cached
	 * index information is not currently valid.
	 */
	if (!relation->rd_indexvalid)
	{
		List	   *indexoids = RelationGetIndexList(relation);

		list_free(indexoids);
		Assert(relation->rd_indexvalid);
	}

	/* A deferrable PK is hidden from callers that cannot cope with one. */
	if (!deferrable_ok && relation->rd_ispkdeferrable)
		return InvalidOid;

	return relation->rd_pkindex;
}

/*
 * RelationGetReplicaIndex -- get OID of the relation's replica identity index
 *
 * Returns InvalidOid if there is no such index.
 */
Oid
RelationGetReplicaIndex(Relation relation)
{
	/* Nothing to compute when the cached index information is valid. */
	if (relation->rd_indexvalid)
		return relation->rd_replidindex;

	/*
	 * Otherwise let RelationGetIndexList recompute rd_replidindex; the list
	 * it returns is of no interest here, so free it right away.
	 */
	list_free(RelationGetIndexList(relation));
	Assert(relation->rd_indexvalid);

	return relation->rd_replidindex;
}

/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * We cache the result of transforming pg_index.indexprs into a node tree.
 * If the rel is not an index or has no expressional columns, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *exprs;
	Datum		rawDatum;
	bool		attIsNull;
	char	   *serialized;
	MemoryContext callercxt;

	/* Already cached: give the caller its own copy. */
	if (relation->rd_indexprs != NIL)
		return copyObject(relation->rd_indexprs);

	/* Not an index at all? */
	if (relation->rd_indextuple == NULL)
		return NIL;

	/* An index, but without any expression columns? */
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
		return NIL;

	/*
	 * Deserialize pg_index.indexprs into a node tree.  Everything up to the
	 * final copy happens in the caller's context, so an error partway
	 * through leaves the relcache entry untouched.
	 */
	rawDatum = heap_getattr(relation->rd_indextuple,
							Anum_pg_index_indexprs,
							GetPgIndexDescriptor(),
							&attIsNull);
	Assert(!attIsNull);
	serialized = TextDatumGetCString(rawDatum);
	exprs = (List *) stringToNode(serialized);
	pfree(serialized);

	/*
	 * Const-simplify the expressions.  This is required, not merely nice to
	 * have: the planner compares these trees against qual clauses that went
	 * through the same processing, and could miss valid matches otherwise.
	 * canonicalize_qual must not be applied, as these are not quals.
	 */
	exprs = (List *) eval_const_expressions(NULL, (Node *) exprs);

	/* Fill in operator function OIDs while we are at it. */
	fix_opfuncids((Node *) exprs);

	/* Stash a copy of the finished tree in the index's own context. */
	callercxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = copyObject(exprs);
	MemoryContextSwitchTo(callercxt);

	return exprs;
}

/*
 * RelationGetDummyIndexExpressions -- get dummy expressions for an index
 *
 * Return a list of dummy expressions (just Const nodes) with the same
 * types/typmods/collations as the index's real expressions.  This is
 * useful in situations where we don't want to run any user-defined code.
 */
List *
RelationGetDummyIndexExpressions(Relation relation)
{
	List	   *dummies = NIL;
	List	   *realExprs;
	ListCell   *cell;
	Datum		rawDatum;
	bool		attIsNull;
	char	   *serialized;

	/* Not an index at all? */
	if (relation->rd_indextuple == NULL)
		return NIL;

	/* An index, but without any expression columns? */
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
		return NIL;

	/* Deserialize the stored expression tree(s) without processing them. */
	rawDatum = heap_getattr(relation->rd_indextuple,
							Anum_pg_index_indexprs,
							GetPgIndexDescriptor(),
							&attIsNull);
	Assert(!attIsNull);
	serialized = TextDatumGetCString(rawDatum);
	realExprs = (List *) stringToNode(serialized);
	pfree(serialized);

	/*
	 * Emit one null Const per real expression, carrying over only its type,
	 * typmod and collation.  The typlen and typbyval passed are arbitrary.
	 */
	foreach(cell, realExprs)
	{
		Node	   *expr = (Node *) lfirst(cell);

		dummies = lappend(dummies,
						  makeConst(exprType(expr),
									exprTypmod(expr),
									exprCollation(expr),
									1,
									(Datum) 0,
									true,
									true));
	}

	return dummies;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * We cache the result of transforming pg_index.indpred into an implicit-AND
 * node tree (suitable for use in planning).
 * If the rel is not an index or has no predicate, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *pred;
	Datum		rawDatum;
	bool		attIsNull;
	char	   *serialized;
	MemoryContext callercxt;

	/* Already cached: give the caller its own copy. */
	if (relation->rd_indpred != NIL)
		return copyObject(relation->rd_indpred);

	/* Not an index at all? */
	if (relation->rd_indextuple == NULL)
		return NIL;

	/* An index, but not a partial one? */
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred, NULL))
		return NIL;

	/*
	 * Deserialize pg_index.indpred into a node tree.  Everything up to the
	 * final copy happens in the caller's context, so an error partway
	 * through leaves the relcache entry untouched.
	 */
	rawDatum = heap_getattr(relation->rd_indextuple,
							Anum_pg_index_indpred,
							GetPgIndexDescriptor(),
							&attIsNull);
	Assert(!attIsNull);
	serialized = TextDatumGetCString(rawDatum);
	pred = (List *) stringToNode(serialized);
	pfree(serialized);

	/*
	 * Const-simplify and canonicalize the predicate.  This is required, not
	 * merely nice to have: the planner compares it against qual clauses that
	 * went through the same steps in preprocess_expression(), and the two
	 * must stay in sync or valid matches may be missed.  The subquery-related
	 * steps are skipped because index predicates cannot contain subqueries.
	 */
	pred = (List *) eval_const_expressions(NULL, (Node *) pred);
	pred = (List *) canonicalize_qual((Expr *) pred, false);

	/* Flatten into implicit-AND list form. */
	pred = make_ands_implicit((Expr *) pred);

	/* Fill in operator function OIDs while we are at it. */
	fix_opfuncids((Node *) pred);

	/* Stash a copy of the finished tree in the index's own context. */
	callercxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = copyObject(pred);
	MemoryContextSwitchTo(callercxt);

	return pred;
}

/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * The result has a bit set for each attribute used anywhere in the index
 * definitions of all the indexes on this relation.  (This includes not only
 * simple index keys, but attributes used in expressions and partial-index
 * predicates.)
 *
 * Depending on attrKind, a bitmap covering attnums for certain columns is
 * returned:
 *	INDEX_ATTR_BITMAP_KEY			Columns in non-partial unique indexes not
 *									in expressions (i.e., usable for FKs)
 *	INDEX_ATTR_BITMAP_PRIMARY_KEY	Columns in the table's primary key
 *									(beware: even if PK is deferrable!)
 *	INDEX_ATTR_BITMAP_IDENTITY_KEY	Columns in the table's replica identity
 *									index (empty if FULL)
 *	INDEX_ATTR_BITMAP_HOT_BLOCKING	Columns that block updates from being HOT
 *	INDEX_ATTR_BITMAP_SUMMARIZED	Columns included in summarizing indexes
 *
 * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
 * we can include system attributes (e.g., OID) in the bitmap representation.
 *
 * Deferred indexes are considered for the primary key, but not for replica
 * identity.
 *
 * Caller had better hold at least RowExclusiveLock on the target relation
 * to ensure it is safe (deadlock-free) for us to take locks on the relation's
 * indexes.  Note that since the introduction of CREATE INDEX CONCURRENTLY,
 * that lock level doesn't guarantee a stable set of indexes, so we have to
 * be prepared to retry here in case of a change in the set of indexes.
 *
 * The returned result is palloc'd in the caller's memory context and should
 * be bms_free'd when not needed anymore.
 */
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
{
	Bitmapset  *uindexattrs;	/* columns in unique indexes */
	Bitmapset  *pkindexattrs;	/* columns in the primary index */
	Bitmapset  *idindexattrs;	/* columns in the replica identity */
	Bitmapset  *hotblockingattrs;	/* columns with HOT blocking indexes */
	Bitmapset  *summarizedattrs;	/* columns with summarizing indexes */
	List	   *indexoidlist;
	List	   *newindexoidlist;
	Oid			relpkindex;
	Oid			relreplindex;
	ListCell   *l;
	MemoryContext oldcxt;

	/*
	 * Quick exit if we already computed the result.  All five bitmaps are
	 * computed and cached together (see the end of this function), so a
	 * single validity flag covers whichever kind is being asked for.  The
	 * caller always gets a private copy, never the cached bitmap itself.
	 */
	if (relation->rd_attrsvalid)
	{
		switch (attrKind)
		{
			case INDEX_ATTR_BITMAP_KEY:
				return bms_copy(relation->rd_keyattr);
			case INDEX_ATTR_BITMAP_PRIMARY_KEY:
				return bms_copy(relation->rd_pkattr);
			case INDEX_ATTR_BITMAP_IDENTITY_KEY:
				return bms_copy(relation->rd_idattr);
			case INDEX_ATTR_BITMAP_HOT_BLOCKING:
				return bms_copy(relation->rd_hotblockingattr);
			case INDEX_ATTR_BITMAP_SUMMARIZED:
				return bms_copy(relation->rd_summarizedattr);
			default:
				elog(ERROR, "unknown attrKind %u", attrKind);
		}
	}

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/*
	 * Get cached list of index OIDs. If we have to start over, we do so here.
	 */
restart:
	indexoidlist = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoidlist == NIL)
		return NULL;

	/*
	 * Copy the rd_pkindex and rd_replidindex values computed by
	 * RelationGetIndexList before proceeding.  This is needed because a
	 * relcache flush could occur inside index_open below, resetting the
	 * fields managed by RelationGetIndexList.  We need to do the work with
	 * stable values of these fields.
	 */
	relpkindex = relation->rd_pkindex;
	relreplindex = relation->rd_replidindex;

	/*
	 * For each index, add referenced attributes to indexattrs.
	 *
	 * Note: we consider all indexes returned by RelationGetIndexList, even if
	 * they are not indisready or indisvalid.  This is important because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
	 * CONCURRENTLY is far enough along that we should ignore the index, it
	 * won't be returned at all by RelationGetIndexList.
	 *
	 * The working bitmaps are (re)initialized here, after the restart label,
	 * so that every retry starts from empty sets.
	 */
	uindexattrs = NULL;
	pkindexattrs = NULL;
	idindexattrs = NULL;
	hotblockingattrs = NULL;
	summarizedattrs = NULL;
	foreach(l, indexoidlist)
	{
		Oid			indexOid = lfirst_oid(l);
		Relation	indexDesc;
		Datum		datum;
		bool		isnull;
		Node	   *indexExpressions;
		Node	   *indexPredicate;
		int			i;
		bool		isKey;		/* candidate key */
		bool		isPK;		/* primary key */
		bool		isIDKey;	/* replica identity index */
		Bitmapset **attrs;

		indexDesc = index_open(indexOid, AccessShareLock);

		/*
		 * Extract index expressions and index predicate.  Note: Don't use
		 * RelationGetIndexExpressions()/RelationGetIndexPredicate(), because
		 * those might run constant expressions evaluation, which needs a
		 * snapshot, which we might not have here.  (Also, it's probably more
		 * sound to collect the bitmaps before any transformations that might
		 * eliminate columns, but the practical impact of this is limited.)
		 */

		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indexprs,
							 GetPgIndexDescriptor(), &isnull);
		if (!isnull)
			indexExpressions = stringToNode(TextDatumGetCString(datum));
		else
			indexExpressions = NULL;

		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indpred,
							 GetPgIndexDescriptor(), &isnull);
		if (!isnull)
			indexPredicate = stringToNode(TextDatumGetCString(datum));
		else
			indexPredicate = NULL;

		/*
		 * Can this index be referenced by a foreign key?  Only a unique
		 * index with neither expressions nor a predicate qualifies.
		 */
		isKey = indexDesc->rd_index->indisunique &&
			indexExpressions == NULL &&
			indexPredicate == NULL;

		/* Is this a primary key? (compared against the saved, stable OID) */
		isPK = (indexOid == relpkindex);

		/* Is this index the configured (or default) replica identity? */
		isIDKey = (indexOid == relreplindex);

		/*
		 * If the index is summarizing, it doesn't block HOT updates, but we
		 * may still need to update it (if the attributes were modified). So
		 * decide which bitmap we'll update in the following loop.
		 */
		if (indexDesc->rd_indam->amsummarizing)
			attrs = &summarizedattrs;
		else
			attrs = &hotblockingattrs;

		/* Collect simple attribute references */
		for (i = 0; i < indexDesc->rd_index->indnatts; i++)
		{
			int			attrnum = indexDesc->rd_index->indkey.values[i];

			/*
			 * Since we have covering indexes with non-key columns, we must
			 * handle them accurately here. non-key columns must be added into
			 * hotblockingattrs or summarizedattrs, since they are in index,
			 * and update shouldn't miss them.
			 *
			 * Summarizing indexes do not block HOT, but do need to be updated
			 * when the column value changes, thus require a separate
			 * attribute bitmapset.
			 *
			 * Obviously, non-key columns couldn't be referenced by foreign
			 * key or identity key. Hence we do not include them into
			 * uindexattrs, pkindexattrs and idindexattrs bitmaps.
			 *
			 * NOTE(review): a zero attrnum presumably marks an expression
			 * column; the attributes such columns use are picked up by the
			 * pull_varattnos() calls after this loop.
			 */
			if (attrnum != 0)
			{
				*attrs = bms_add_member(*attrs,
										attrnum - FirstLowInvalidHeapAttributeNumber);

				/* positions below indnkeyatts are the key columns */
				if (isKey && i < indexDesc->rd_index->indnkeyatts)
					uindexattrs = bms_add_member(uindexattrs,
												 attrnum - FirstLowInvalidHeapAttributeNumber);

				if (isPK && i < indexDesc->rd_index->indnkeyatts)
					pkindexattrs = bms_add_member(pkindexattrs,
												  attrnum - FirstLowInvalidHeapAttributeNumber);

				if (isIDKey && i < indexDesc->rd_index->indnkeyatts)
					idindexattrs = bms_add_member(idindexattrs,
												  attrnum - FirstLowInvalidHeapAttributeNumber);
			}
		}

		/* Collect all attributes used in expressions, too */
		pull_varattnos(indexExpressions, 1, attrs);

		/* Collect all attributes in the index predicate, too */
		pull_varattnos(indexPredicate, 1, attrs);

		index_close(indexDesc, AccessShareLock);
	}

	/*
	 * During one of the index_opens in the above loop, we might have received
	 * a relcache flush event on this relcache entry, which might have been
	 * signaling a change in the rel's index list.  If so, we'd better start
	 * over to ensure we deliver up-to-date attribute bitmaps.
	 *
	 * Besides the list itself, the primary key and replica identity OIDs are
	 * compared against the values saved before the loop.
	 */
	newindexoidlist = RelationGetIndexList(relation);
	if (equal(indexoidlist, newindexoidlist) &&
		relpkindex == relation->rd_pkindex &&
		relreplindex == relation->rd_replidindex)
	{
		/* Still the same index set, so proceed */
		list_free(newindexoidlist);
		list_free(indexoidlist);
	}
	else
	{
		/* Gotta do it over ... might as well not leak memory */
		list_free(newindexoidlist);
		list_free(indexoidlist);
		bms_free(uindexattrs);
		bms_free(pkindexattrs);
		bms_free(idindexattrs);
		bms_free(hotblockingattrs);
		bms_free(summarizedattrs);

		goto restart;
	}

	/*
	 * Don't leak the old values of these bitmaps, if any.  The validity flag
	 * is cleared first and each pointer is reset to NULL right after being
	 * freed, so the entry never advertises freed bitmaps as valid.
	 */
	relation->rd_attrsvalid = false;
	bms_free(relation->rd_keyattr);
	relation->rd_keyattr = NULL;
	bms_free(relation->rd_pkattr);
	relation->rd_pkattr = NULL;
	bms_free(relation->rd_idattr);
	relation->rd_idattr = NULL;
	bms_free(relation->rd_hotblockingattr);
	relation->rd_hotblockingattr = NULL;
	bms_free(relation->rd_summarizedattr);
	relation->rd_summarizedattr = NULL;

	/*
	 * Now save copies of the bitmaps in the relcache entry.  We intentionally
	 * set rd_attrsvalid last, because that's the one that signals validity of
	 * the values; if we run out of memory before making that copy, we won't
	 * leave the relcache entry looking like the other ones are valid but
	 * empty.
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_keyattr = bms_copy(uindexattrs);
	relation->rd_pkattr = bms_copy(pkindexattrs);
	relation->rd_idattr = bms_copy(idindexattrs);
	relation->rd_hotblockingattr = bms_copy(hotblockingattrs);
	relation->rd_summarizedattr = bms_copy(summarizedattrs);
	relation->rd_attrsvalid = true;
	MemoryContextSwitchTo(oldcxt);

	/*
	 * We return our original working copy for caller to play with.  Only the
	 * requested bitmap is handed back; the other working bitmaps are not
	 * freed here.
	 */
	switch (attrKind)
	{
		case INDEX_ATTR_BITMAP_KEY:
			return uindexattrs;
		case INDEX_ATTR_BITMAP_PRIMARY_KEY:
			return pkindexattrs;
		case INDEX_ATTR_BITMAP_IDENTITY_KEY:
			return idindexattrs;
		case INDEX_ATTR_BITMAP_HOT_BLOCKING:
			return hotblockingattrs;
		case INDEX_ATTR_BITMAP_SUMMARIZED:
			return summarizedattrs;
		default:
			elog(ERROR, "unknown attrKind %u", attrKind);
			return NULL;
	}
}

/*
 * RelationGetIdentityKeyBitmap -- get a bitmap of replica identity attribute
 * numbers
 *
 * A bitmap of index attribute numbers for the configured replica identity
 * index is returned.
 *
 * See also comments of RelationGetIndexAttrBitmap().
 *
 * This is a special purpose function used during logical replication. Here,
 * unlike RelationGetIndexAttrBitmap(), we don't acquire a lock on the required
 * index as we build the cache entry using a historic snapshot and all the
 * later changes are absorbed while decoding WAL. Due to this reason, we don't
 * need to retry here in case of a change in the set of indexes.
 */
Bitmapset *
RelationGetIdentityKeyBitmap(Relation relation)
{
	Bitmapset  *idindexattrs = NULL;	/* columns in the replica identity */
	Relation	indexDesc;
	int			i;
	Oid			replidindex;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result */
	if (relation->rd_idattr != NULL)
		return bms_copy(relation->rd_idattr);

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/* Historic snapshot must be set. */
	Assert(HistoricSnapshotActive());

	/* Take a local copy of the OID; all further work uses this value. */
	replidindex = RelationGetReplicaIndex(relation);

	/* Fall out if there is no replica identity index */
	if (!OidIsValid(replidindex))
		return NULL;

	/* Look up the description for the replica identity index */
	indexDesc = RelationIdGetRelation(replidindex);

	/*
	 * Report the OID we actually tried to open.  The cached
	 * relation->rd_replidindex field is not a safe substitute: a relcache
	 * flush in the meantime could have reset it (RelationGetIndexAttrBitmap
	 * saves local copies for the same reason), which would make the message
	 * name the wrong OID.
	 */
	if (!RelationIsValid(indexDesc))
		elog(ERROR, "could not open relation with OID %u",
			 replidindex);

	/* Add referenced attributes to idindexattrs */
	for (i = 0; i < indexDesc->rd_index->indnatts; i++)
	{
		int			attrnum = indexDesc->rd_index->indkey.values[i];

		/*
		 * We don't include non-key columns into idindexattrs bitmaps. See
		 * RelationGetIndexAttrBitmap.  Entries with a zero attrnum are
		 * skipped as well.
		 */
		if (attrnum != 0)
		{
			if (i < indexDesc->rd_index->indnkeyatts)
				idindexattrs = bms_add_member(idindexattrs,
											  attrnum - FirstLowInvalidHeapAttributeNumber);
		}
	}

	RelationClose(indexDesc);

	/* Don't leak the old values of these bitmaps, if any */
	bms_free(relation->rd_idattr);
	relation->rd_idattr = NULL;

	/* Now save copy of the bitmap in the relcache entry */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_idattr = bms_copy(idindexattrs);
	MemoryContextSwitchTo(oldcxt);

	/* We return our original working copy for caller to play with */
	return idindexattrs;
}

/*
 * RelationGetExclusionInfo -- get info about index's exclusion constraint
 *
 * This should be called only for an index that is known to have an associated
 * exclusion constraint or primary key/unique constraint using WITHOUT
 * OVERLAPS.
 *
 * It returns arrays (palloc'd in caller's context) of the exclusion operator
 * OIDs, their underlying functions' OIDs, and their strategy numbers in the
 * index's opclasses.  We cache all this information since it requires a fair
 * amount of work to get.
 */
void
RelationGetExclusionInfo(Relation indexRelation,
						 Oid **operators,
						 Oid **procs,
						 uint16 **strategies)
{
	int			indnkeyatts;
	Oid		   *ops;
	Oid		   *funcs;
	uint16	   *strats;
	Relation	conrel;
	SysScanDesc conscan;
	ScanKeyData skey[1];
	HeapTuple	htup;
	bool		found;
	MemoryContext oldcxt;
	int			i;

	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);

	/*
	 * Allocate result space in caller context.  Each output array has one
	 * entry per key column of the index.
	 */
	*operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	*procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);

	/*
	 * Quick exit if we have the data cached already.  The three cached
	 * arrays are filled in together at the bottom of this function, so
	 * testing one of them is enough.
	 */
	if (indexRelation->rd_exclstrats != NULL)
	{
		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
		return;
	}

	/*
	 * Search pg_constraint for the constraint associated with the index. To
	 * make this not too painfully slow, we use the index on conrelid; that
	 * will hold the parent relation's OID not the index's own OID.
	 *
	 * Note: if we wanted to rely on the constraint name matching the index's
	 * name, we could just do a direct lookup using pg_constraint's unique
	 * index.  For the moment it doesn't seem worth requiring that.
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(indexRelation->rd_index->indrelid));

	conrel = table_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
								 NULL, 1, skey);
	found = false;

	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
		Datum		val;
		bool		isnull;
		ArrayType  *arr;

		/*
		 * We want the exclusion constraint owning the index.  A primary key
		 * or unique constraint with conperiod set is accepted as well; any
		 * other constraint type, or a constraint on a different index, is
		 * skipped.
		 */
		if ((conform->contype != CONSTRAINT_EXCLUSION &&
			 !(conform->conperiod && (conform->contype == CONSTRAINT_PRIMARY
									  || conform->contype == CONSTRAINT_UNIQUE))) ||
			conform->conindid != RelationGetRelid(indexRelation))
			continue;

		/* There should be only one */
		if (found)
			elog(ERROR, "unexpected exclusion constraint record found for rel %s",
				 RelationGetRelationName(indexRelation));
		found = true;

		/* Extract the operator OIDS from conexclop */
		val = fastgetattr(htup,
						  Anum_pg_constraint_conexclop,
						  conrel->rd_att, &isnull);
		if (isnull)
			elog(ERROR, "null conexclop for rel %s",
				 RelationGetRelationName(indexRelation));

		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */

		/*
		 * Check the dimension count before looking at ARR_DIMS: an array
		 * with zero dimensions has no dims entry, so fetching element 0
		 * first would read past the array header.  The short-circuit order
		 * of this test guarantees ARR_DIMS is only consulted for a 1-D
		 * array.
		 */
		if (ARR_NDIM(arr) != 1 ||
			ARR_DIMS(arr)[0] != indnkeyatts ||
			ARR_HASNULL(arr) ||
			ARR_ELEMTYPE(arr) != OIDOID)
			elog(ERROR, "conexclop is not a 1-D Oid array");

		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
	}

	systable_endscan(conscan);
	table_close(conrel, AccessShareLock);

	if (!found)
		elog(ERROR, "exclusion constraint record missing for rel %s",
			 RelationGetRelationName(indexRelation));

	/* We need the func OIDs and strategy numbers too */
	for (i = 0; i < indnkeyatts; i++)
	{
		funcs[i] = get_opcode(ops[i]);
		strats[i] = get_op_opfamily_strategy(ops[i],
											 indexRelation->rd_opfamily[i]);
		/* shouldn't fail, since it was checked at index creation */
		if (strats[i] == InvalidStrategy)
			elog(ERROR, "could not find strategy for operator %u in family %u",
				 ops[i], indexRelation->rd_opfamily[i]);
	}

	/*
	 * Save a copy of the results in the relcache entry.  This is done only
	 * after every lookup above has succeeded, so an error cannot leave a
	 * partially filled cache behind.
	 */
	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
	MemoryContextSwitchTo(oldcxt);
}

/*
 * Get the publication information for the given relation.
 *
 * Traverse all the publications which the relation is in to get the
 * publication actions and validate:
 * 1. The row filter expressions for such publications if any. We consider the
 *    row filter expression as invalid if it references any column which is not
 *    part of REPLICA IDENTITY.
 * 2. The column list for such publication if any. We consider the column list
 *    invalid if REPLICA IDENTITY contains any column that is not part of it.
 * 3. The generated columns of the relation for such publications. We consider
 *    any reference of an unpublished generated column in REPLICA IDENTITY as
 *    invalid.
 *
 * To avoid fetching the publication information repeatedly, we cache the
 * publication actions, row filter validation information, column list
 * validation information, and generated column validation information.
 */
void
RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
{
	List	   *puboids;
	ListCell   *lc;
	MemoryContext oldcxt;
	Oid			schemaid;
	List	   *ancestors = NIL;
	Oid			relid = RelationGetRelid(relation);

	/*
	 * If not publishable, it publishes no actions.  (pgoutput_change() will
	 * ignore it.)  All validity flags are reported as true, and nothing is
	 * cached in the relcache entry.
	 */
	if (!is_publishable_relation(relation))
	{
		memset(pubdesc, 0, sizeof(PublicationDesc));
		pubdesc->rf_valid_for_update = true;
		pubdesc->rf_valid_for_delete = true;
		pubdesc->cols_valid_for_update = true;
		pubdesc->cols_valid_for_delete = true;
		pubdesc->gencols_valid_for_update = true;
		pubdesc->gencols_valid_for_delete = true;
		return;
	}

	/* Use the cached descriptor when there is one. */
	if (relation->rd_pubdesc)
	{
		memcpy(pubdesc, relation->rd_pubdesc, sizeof(PublicationDesc));
		return;
	}

	/*
	 * Start from "no actions published, everything valid".  The loop below
	 * only ever turns actions on and validity flags off.
	 */
	memset(pubdesc, 0, sizeof(PublicationDesc));
	pubdesc->rf_valid_for_update = true;
	pubdesc->rf_valid_for_delete = true;
	pubdesc->cols_valid_for_update = true;
	pubdesc->cols_valid_for_delete = true;
	pubdesc->gencols_valid_for_update = true;
	pubdesc->gencols_valid_for_delete = true;

	/*
	 * Fetch the publication membership info: publications naming the
	 * relation directly, plus those naming its schema.
	 */
	puboids = GetRelationPublications(relid);
	schemaid = RelationGetNamespace(relation);
	puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid));

	if (relation->rd_rel->relispartition)
	{
		/* Add publications that the ancestors are in too. */
		ancestors = get_partition_ancestors(relid);

		foreach(lc, ancestors)
		{
			Oid			ancestor = lfirst_oid(lc);

			puboids = list_concat_unique_oid(puboids,
											 GetRelationPublications(ancestor));
			schemaid = get_rel_namespace(ancestor);
			puboids = list_concat_unique_oid(puboids,
											 GetSchemaPublications(schemaid));
		}
	}
	puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());

	foreach(lc, puboids)
	{
		Oid			pubid = lfirst_oid(lc);
		HeapTuple	tup;
		Form_pg_publication pubform;
		bool		invalid_column_list;
		bool		invalid_gen_col;

		tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));

		if (!HeapTupleIsValid(tup))
			elog(ERROR, "cache lookup failed for publication %u", pubid);

		pubform = (Form_pg_publication) GETSTRUCT(tup);

		/* An action is published if any publication publishes it. */
		pubdesc->pubactions.pubinsert |= pubform->pubinsert;
		pubdesc->pubactions.pubupdate |= pubform->pubupdate;
		pubdesc->pubactions.pubdelete |= pubform->pubdelete;
		pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;

		/*
		 * Check if all columns referenced in the filter expression are part
		 * of the REPLICA IDENTITY index or not.
		 *
		 * If the publication is FOR ALL TABLES then it means the table has no
		 * row filters and we can skip the validation.
		 */
		if (!pubform->puballtables &&
			(pubform->pubupdate || pubform->pubdelete) &&
			pub_rf_contains_invalid_column(pubid, relation, ancestors,
										   pubform->pubviaroot))
		{
			if (pubform->pubupdate)
				pubdesc->rf_valid_for_update = false;
			if (pubform->pubdelete)
				pubdesc->rf_valid_for_delete = false;
		}

		/*
		 * Check if all columns are part of the REPLICA IDENTITY index or not.
		 *
		 * Check if all generated columns included in the REPLICA IDENTITY are
		 * published.
		 *
		 * As with the row filter flags above, a validity flag is only ever
		 * cleared here, never set back to true.  The descriptor summarizes
		 * all of the relation's publications, so a publication that is fine
		 * in one respect must not erase a problem recorded for an earlier
		 * publication.  (invalid_column_list and invalid_gen_col are only
		 * meaningful inside this branch, i.e. after the call was made.)
		 */
		if ((pubform->pubupdate || pubform->pubdelete) &&
			pub_contains_invalid_column(pubid, relation, ancestors,
										pubform->pubviaroot,
										pubform->pubgencols,
										&invalid_column_list,
										&invalid_gen_col))
		{
			if (pubform->pubupdate)
			{
				if (invalid_column_list)
					pubdesc->cols_valid_for_update = false;
				if (invalid_gen_col)
					pubdesc->gencols_valid_for_update = false;
			}

			if (pubform->pubdelete)
			{
				if (invalid_column_list)
					pubdesc->cols_valid_for_delete = false;
				if (invalid_gen_col)
					pubdesc->gencols_valid_for_delete = false;
			}
		}

		ReleaseSysCache(tup);

		/*
		 * If we know everything is replicated and the row filter is invalid
		 * for update and delete, there is no point to check for other
		 * publications.
		 */
		if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
			pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
			!pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete)
			break;

		/*
		 * If we know everything is replicated and the column list is invalid
		 * for update and delete, there is no point to check for other
		 * publications.
		 */
		if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
			pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
			!pubdesc->cols_valid_for_update && !pubdesc->cols_valid_for_delete)
			break;

		/*
		 * If we know everything is replicated and replica identity has an
		 * unpublished generated column, there is no point to check for other
		 * publications.
		 */
		if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
			pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
			!pubdesc->gencols_valid_for_update &&
			!pubdesc->gencols_valid_for_delete)
			break;
	}

	/* Release any descriptor that got cached while we were working. */
	if (relation->rd_pubdesc)
	{
		pfree(relation->rd_pubdesc);
		relation->rd_pubdesc = NULL;
	}

	/* Now save copy of the descriptor in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_pubdesc = palloc(sizeof(PublicationDesc));
	memcpy(relation->rd_pubdesc, pubdesc, sizeof(PublicationDesc));
	MemoryContextSwitchTo(oldcxt);
}

/*
 * CopyIndexAttOptions
 *		Duplicate an array of per-attribute option values.
 *
 * Returns a newly palloc'd array of natts entries.  A NULL entry in srcopts
 * stays NULL in the result; every other entry is duplicated with datumCopy()
 * as a pass-by-reference value of length -1 (i.e. a varlena).
 */
static bytea **
CopyIndexAttOptions(bytea **srcopts, int natts)
{
	bytea	  **result;
	int			attno;

	result = palloc(sizeof(*result) * natts);

	for (attno = 0; attno < natts; attno++)
	{
		bytea	   *src = srcopts[attno];

		if (src == NULL)
			result[attno] = NULL;
		else
			result[attno] = (bytea *)
				DatumGetPointer(datumCopy(PointerGetDatum(src), false, -1));
	}

	return result;
}

/*
 * RelationGetIndexAttOptions
 *		get AM/opclass-specific options for an index parsed into a binary form
 *
 * The result is an array with one entry per attribute of the relation; an
 * entry may be NULL.  The parsed options are remembered in
 * relation->rd_opcoptions (allocated in relation->rd_indexcxt) the first
 * time they are computed.
 *
 * If "copy" is true the caller receives a private array it may keep;
 * otherwise the cached array itself is returned.
 */
bytea	  **
RelationGetIndexAttOptions(Relation relation, bool copy)
{
	Oid			indexoid = RelationGetRelid(relation);

	/* XXX IndexRelationGetNumberOfKeyAttributes */
	int			nattrs = RelationGetNumberOfAttributes(relation);
	bytea	  **parsed;
	MemoryContext savedcxt;
	int			attno;

	/* Already cached?  Then hand back the cached array or a copy of it. */
	if (relation->rd_opcoptions != NULL)
	{
		if (copy)
			return CopyIndexAttOptions(relation->rd_opcoptions, nattrs);
		return relation->rd_opcoptions;
	}

	/* Not cached yet: fetch and parse the opclass options per attribute. */
	parsed = palloc0(sizeof(*parsed) * nattrs);

	for (attno = 1; attno <= nattrs; attno++)
	{
		Datum		rawopts;

		/*
		 * Entries stay NULL while the critical relcache entries are not
		 * built yet, and always for AttributeRelidNumIndexId.
		 */
		if (!criticalRelcachesBuilt || indexoid == AttributeRelidNumIndexId)
			continue;

		rawopts = get_attoptions(indexoid, attno);

		parsed[attno - 1] = index_opclass_options(relation, attno, rawopts, false);

		if (rawopts != (Datum) 0)
			pfree(DatumGetPointer(rawopts));
	}

	/* Stash a copy of the parsed options in the relcache entry. */
	savedcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_opcoptions = CopyIndexAttOptions(parsed, nattrs);
	MemoryContextSwitchTo(savedcxt);

	if (!copy)
	{
		/* Caller wants the cached version, so drop our working array. */
		for (attno = 0; attno < nattrs; attno++)
		{
			if (parsed[attno] != NULL)
				pfree(parsed[attno]);
		}

		pfree(parsed);

		parsed = relation->rd_opcoptions;
	}

	return parsed;
}

/*
 * Routines to support ereport() reports of relation-related errors
 *
 * These could have been put into elog.c, but it seems like a module layering
 * violation to have elog.c calling relcache or syscache stuff --- and we
 * definitely don't want elog.h including rel.h.  So we put them here.
 */

/*
 * errtable --- record a table's schema_name and table_name
 * in the current errordata.
 *
 * The schema name is looked up from the relation's namespace; the table name
 * comes straight from the relcache entry.
 */
int
errtable(Relation rel)
{
	char	   *schemaname = get_namespace_name(RelationGetNamespace(rel));

	err_generic_string(PG_DIAG_SCHEMA_NAME, schemaname);
	err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));

	/* the return value is of no interest to anyone */
	return 0;
}

/*
 * errtablecol --- record schema_name, table_name and column_name
 * of a table column in the current errordata.
 *
 * Callers identify the column by attribute number, which is usually simpler
 * and less error-prone than fetching the column name themselves.
 */
int
errtablecol(Relation rel, int attnum)
{
	TupleDesc	tupdesc = RelationGetDescr(rel);
	const char *attname;

	if (attnum <= 0 || attnum > tupdesc->natts)
	{
		/* Not covered by the tuple descriptor: ask the catalogs instead */
		attname = get_attname(RelationGetRelid(rel), attnum, false);
	}
	else
	{
		/* User attribute: the name is right there in the descriptor */
		attname = NameStr(TupleDescAttr(tupdesc, attnum - 1)->attname);
	}

	return errtablecolname(rel, attname);
}

/*
 * errtablecolname --- record schema_name, table_name and column_name
 * of a table column in the current errordata, taking the column name as
 * given by the caller instead of deriving it from the relation's catalog
 * data.
 *
 * Prefer errtablecol() unless it is inconvenient for some reason; this
 * variant might be needed, for instance, during intermediate states of
 * ALTER TABLE.
 */
int
errtablecolname(Relation rel, const char *colname)
{
	/* schema and table name first, then the column */
	(void) errtable(rel);
	err_generic_string(PG_DIAG_COLUMN_NAME, colname);

	/* the return value is of no interest to anyone */
	return 0;
}

/*
 * errtableconstraint --- record schema_name, table_name and constraint_name
 * of a table-related constraint in the current errordata.
 */
int
errtableconstraint(Relation rel, const char *conname)
{
	/* schema and table name first, then the constraint */
	(void) errtable(rel);
	err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);

	/* the return value is of no interest to anyone */
	return 0;
}


/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write 'em to
 *			  the initialization file for use by subsequent backends.
 *
 *		As of Postgres 9.0, there is one local initialization file in each
 *		database, plus one shared initialization file for shared catalogs.
 *
 *		We could dispense with the initialization files and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
 *		by catcaches are stored in the initialization files.
 *
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
 */

/*
 * load_relcache_init_file -- attempt to load cache from the shared
 * or local cache init file
 *
 * If successful, return true and set criticalRelcachesBuilt or
 * criticalSharedRelcachesBuilt to true.
 * If not successful, return false.
 *
 * The file is read as an int magic number followed by a sequence of
 * relation entries.  Every item of an entry is a Size length followed by
 * that many bytes; the items are, in order:
 *		- the RelationData struct image
 *		- the pg_class tuple form
 *		- one ATTRIBUTE_FIXED_PART_SIZE item per attribute (relnatts of them)
 *		- rd_options (length zero if there are none)
 *		- for RELKIND_INDEX only: the pg_index tuple, then the opfamily,
 *		  opcintype, support-procedure, collation and indoption vectors, and
 *		  finally one opcoptions item per attribute (length zero if none)
 *
 * Nothing is inserted into the relcache unless the whole file was read
 * without a short read or failed sanity check, and the number of nailed
 * entries matches expectations.
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
load_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
				nailed_indexes,
				magic;
	int			i;

	/* The shared file lives under global/, the local one under DatabasePath */
	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);

	/* A file we cannot open is simply reported as "not loaded" */
	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;

	/*
	 * Read the relcache entries (tables and indexes alike) from the file.
	 * Note we will not enter any of them into the cache if the read fails
	 * partway through; this helps to guard against broken init files.
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;

	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

	/* One iteration per relation entry; the loop ends at a clean EOF */
	for (relno = 0;; relno++)
	{
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
		bool		has_not_null;

		/* first read the relation descriptor length */
		nread = fread(&len, 1, sizeof(len), fp);
		if (nread != sizeof(len))
		{
			/*
			 * Reading nothing at all at an entry boundary is the normal end
			 * of the file; a partial length word means the file is broken.
			 */
			if (nread == 0)
				break;			/* end of file */
			goto read_failed;
		}

		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
			goto read_failed;

		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			/* double the pointer array whenever it fills up */
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}

		/* the entry is remembered in rels[] before its contents are read */
		rel = rels[num_rels++] = (Relation) palloc(len);

		/* then, read the Relation structure */
		if (fread(rel, 1, len, fp) != len)
			goto read_failed;

		/* next read the relation tuple form */
		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
			goto read_failed;

		relform = (Form_pg_class) palloc(len);
		if (fread(relform, 1, len, fp) != len)
			goto read_failed;

		rel->rd_rel = relform;

		/* initialize attribute tuple forms */
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts);
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

		/* a relation without a rowtype of its own is labeled RECORDOID */
		rel->rd_att->tdtypeid = relform->reltype ? relform->reltype : RECORDOID;
		rel->rd_att->tdtypmod = -1; /* just to be sure */

		/* next read all the attribute tuple form data entries */
		has_not_null = false;
		for (i = 0; i < relform->relnatts; i++)
		{
			Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i);

			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;
			/* each attribute item must be exactly the fixed-size part */
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
				goto read_failed;
			if (fread(attr, 1, len, fp) != len)
				goto read_failed;

			/* remember whether any attribute is marked NOT NULL */
			has_not_null |= attr->attnotnull;

			populate_compact_attribute(rel->rd_att, i);
		}

		/* next read the access method specific field */
		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if (fread(rel->rd_options, 1, len, fp) != len)
				goto read_failed;
			/* the stored length must agree with the varlena header */
			if (len != VARSIZE(rel->rd_options))
				goto read_failed;	/* sanity check */
		}
		else
		{
			/* a zero-length item means there were no options */
			rel->rd_options = NULL;
		}

		/* mark not-null status */
		if (has_not_null)
		{
			/* only has_not_null is set; the rest of the struct stays zeroed */
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
		}

		/*
		 * If it's an index, there's more to do.  Note we explicitly ignore
		 * partitioned indexes here.
		 */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			MemoryContext indexcxt;
			Oid		   *opfamily;
			Oid		   *opcintype;
			RegProcedure *support;
			int			nsupport;
			int16	   *indoption;
			Oid		   *indcollation;

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

			/* read the pg_index tuple */
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;

			rel->rd_indextuple = (HeapTuple) palloc(len);
			if (fread(rel->rd_indextuple, 1, len, fp) != len)
				goto read_failed;

			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 "index info",
											 ALLOCSET_SMALL_SIZES);
			rel->rd_indexcxt = indexcxt;
			MemoryContextCopyAndSetIdentifier(indexcxt,
											  RelationGetRelationName(rel));

			/*
			 * Now we can fetch the index AM's API struct.  (We can't store
			 * that in the init file, since it contains function pointers that
			 * might vary across server executions.  Fortunately, it should be
			 * safe to call the amhandler even while bootstrapping indexes.)
			 */
			InitIndexAmRoutine(rel);

			/*
			 * The per-index vectors below are all allocated in indexcxt, with
			 * the size taken from the length word stored in the file.
			 */

			/* read the vector of opfamily OIDs */
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if (fread(opfamily, 1, len, fp) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* read the vector of opcintype OIDs */
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if (fread(opcintype, 1, len, fp) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

			/* read the vector of support procedure OIDs */
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if (fread(support, 1, len, fp) != len)
				goto read_failed;

			rel->rd_support = support;

			/* read the vector of collation OIDs */
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;

			indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
			if (fread(indcollation, 1, len, fp) != len)
				goto read_failed;

			rel->rd_indcollation = indcollation;

			/* read the vector of indoption values */
			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if (fread(indoption, 1, len, fp) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

			/* read the vector of opcoptions values */
			rel->rd_opcoptions = (bytea **)
				MemoryContextAllocZero(indexcxt, sizeof(*rel->rd_opcoptions) * relform->relnatts);

			for (i = 0; i < relform->relnatts; i++)
			{
				if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
					goto read_failed;

				/* a zero-length item leaves the (zeroed) entry NULL */
				if (len > 0)
				{
					rel->rd_opcoptions[i] = (bytea *) MemoryContextAlloc(indexcxt, len);
					if (fread(rel->rd_opcoptions[i], 1, len, fp) != len)
						goto read_failed;
				}
			}

			/* set up zeroed fmgr-info vector */
			nsupport = relform->relnatts * rel->rd_indam->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			/* Load table AM data */
			if (RELKIND_HAS_TABLE_AM(rel->rd_rel->relkind) || rel->rd_rel->relkind == RELKIND_SEQUENCE)
				RelationInitTableAccessMethod(rel);

			/* a non-index entry must not carry any index-only state */
			Assert(rel->rd_index == NULL);
			Assert(rel->rd_indextuple == NULL);
			Assert(rel->rd_indexcxt == NULL);
			Assert(rel->rd_indam == NULL);
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
			Assert(rel->rd_indoption == NULL);
			Assert(rel->rd_indcollation == NULL);
			Assert(rel->rd_opcoptions == NULL);
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
		 * format is complex and subject to change).  They must be rebuilt if
		 * needed by RelationCacheInitializePhase3.  This is not expected to
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for RLS policy data, partition info, index expressions, predicates,
		 * exclusion info, and FDW info.
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
		rel->rd_rsdesc = NULL;
		rel->rd_partkey = NULL;
		rel->rd_partkeycxt = NULL;
		rel->rd_partdesc = NULL;
		rel->rd_partdesc_nodetached = NULL;
		rel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
		rel->rd_pdcxt = NULL;
		rel->rd_pddcxt = NULL;
		rel->rd_partcheck = NIL;
		rel->rd_partcheckvalid = false;
		rel->rd_partcheckcxt = NULL;
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;
		rel->rd_exclops = NULL;
		rel->rd_exclprocs = NULL;
		rel->rd_exclstrats = NULL;
		rel->rd_fdwroutine = NULL;

		/*
		 * Reset transient-state fields in the relcache entry
		 */
		rel->rd_smgr = NULL;
		/* nailed entries start with a reference count of one, others zero */
		if (rel->rd_isnailed)
			rel->rd_refcnt = 1;
		else
			rel->rd_refcnt = 0;
		rel->rd_indexvalid = false;
		rel->rd_indexlist = NIL;
		rel->rd_pkindex = InvalidOid;
		rel->rd_replidindex = InvalidOid;
		rel->rd_attrsvalid = false;
		rel->rd_keyattr = NULL;
		rel->rd_pkattr = NULL;
		rel->rd_idattr = NULL;
		rel->rd_pubdesc = NULL;
		rel->rd_statvalid = false;
		rel->rd_statlist = NIL;
		rel->rd_fkeyvalid = false;
		rel->rd_fkeylist = NIL;
		rel->rd_createSubid = InvalidSubTransactionId;
		rel->rd_newRelfilelocatorSubid = InvalidSubTransactionId;
		rel->rd_firstRelfilelocatorSubid = InvalidSubTransactionId;
		rel->rd_droppedSubid = InvalidSubTransactionId;
		rel->rd_amcache = NULL;
		rel->pgstat_info = NULL;

		/*
		 * Recompute lock and physical addressing info.  This is needed in
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
		 */
		RelationInitLockInfo(rel);
		RelationInitPhysicalAddr(rel);
	}

	/*
	 * We reached the end of the init file without apparent problem.  Did we
	 * get the right number of nailed items?  This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.  However, that should
	 * not happen in a normally-running system, so let's bleat if it does.
	 *
	 * For the shared init file, we're called before client authentication is
	 * done, which means that elog(WARNING) will go only to the postmaster
	 * log, where it's easily missed.  To ensure that developers notice bad
	 * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
	 * an Assert(false) there.
	 */
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
		{
			elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
				 nailed_rels, nailed_indexes,
				 NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
			/* Make sure we get developers' attention about this */
			Assert(false);
			/* In production builds, recover by bootstrapping the relcache */
			goto read_failed;
		}
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
		{
			elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
				 nailed_rels, nailed_indexes,
				 NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
			/* We don't need an Assert() in this case */
			goto read_failed;
		}
	}

	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno], false);
	}

	/* only the pointer array is released; the entries now live in the cache */
	pfree(rels);
	FreeFile(fp);

	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
	return true;

	/*
	 * init file is broken, so do it the hard way.  We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
	 */
read_failed:
	pfree(rels);
	FreeFile(fp);

	return false;
}

/*
 * write_relcache_init_file
 *		Save the current relcache contents as a new initialization file.
 *
 * "shared" selects which file is produced: the one under global/ holding
 * the entries with relisshared set, or the per-database one under
 * DatabasePath holding unshared entries accepted by RelationIdIsInInitFile.
 *
 * The data goes to a temporary file named after our PID, which is renamed
 * into place only if no relcache invalidation has been received by the time
 * we are done; otherwise the temporary file is thrown away.  Inability to
 * create the temporary file is merely a WARNING, whereas a failed write is
 * FATAL.
 */
static void
write_relcache_init_file(bool shared)
{
	char		tmppath[MAXPGPATH];
	char		finalpath[MAXPGPATH];
	FILE	   *outfile;
	int			filemagic;
	HASH_SEQ_STATUS scan;
	RelIdCacheEnt *entry;

	/*
	 * Once any relcache inval event has arrived, the final check below is
	 * bound to fail, so don't even start.
	 */
	if (relcacheInvalsReceived != 0L)
		return;

	/*
	 * The file is built under a temporary name and renamed into place
	 * afterwards.  Otherwise a backend starting up concurrently might crash
	 * while reading a half-written file.
	 */
	if (!shared)
	{
		snprintf(tmppath, sizeof(tmppath), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalpath, sizeof(finalpath), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tmppath, sizeof(tmppath), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalpath, sizeof(finalpath), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}

	/* get rid of any leftover temp file, which might have bad permissions */
	unlink(tmppath);

	outfile = AllocateFile(tmppath, PG_BINARY_W);
	if (outfile == NULL)
	{
		/*
		 * This was once treated as fatal, but backend startup can perfectly
		 * well carry on without an init file ...
		 */
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
						tmppath),
				 errdetail("Continuing anyway, but there's something wrong.")));
		return;
	}

	/*
	 * The file starts with a magic number acting as a version identifier;
	 * it can be changed whenever the relcache layout changes.
	 */
	filemagic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&filemagic, 1, sizeof(filemagic), outfile) != sizeof(filemagic))
		ereport(FATAL,
				errcode_for_file_access(),
				errmsg_internal("could not write init file: %m"));

	/*
	 * Dump every qualifying relcache entry, in whatever order the hash
	 * table scan delivers them.
	 */
	hash_seq_init(&scan, RelationIdCache);

	while ((entry = (RelIdCacheEnt *) hash_seq_search(&scan)) != NULL)
	{
		Relation	rel = entry->reldesc;
		Form_pg_class classform = rel->rd_rel;
		int			attno;

		/* skip entries that belong in the other init file */
		if (classform->relisshared != shared)
			continue;

		/*
		 * Skip entries that are not meant to be in the init file.  Any
		 * shared relation loaded so far may go into the shared file, but an
		 * unshared relation must qualify per RelationIdIsInInitFile.  (If
		 * you want to change which rels are kept in the init file, see also
		 * inval.c.  We filter here so that the local init file never
		 * contains anything whose relcache inval would fail to invalidate
		 * that file.)
		 */
		if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
		{
			/* A nailed rel must never be left out. */
			Assert(!rel->rd_isnailed);
			continue;
		}

		/* the relcache entry struct itself comes first */
		write_item(rel, sizeof(RelationData), outfile);

		/* then the pg_class tuple form */
		write_item(classform, CLASS_TUPLE_SIZE, outfile);

		/* then one item per attribute tuple form */
		for (attno = 0; attno < classform->relnatts; attno++)
			write_item(TupleDescAttr(rel->rd_att, attno),
					   ATTRIBUTE_FIXED_PART_SIZE, outfile);

		/* then the access method specific field, empty if there is none */
		if (rel->rd_options != NULL)
			write_item(rel->rd_options, VARSIZE(rel->rd_options), outfile);
		else
			write_item(NULL, 0, outfile);

		/*
		 * Only plain indexes carry additional data; partitioned indexes are
		 * deliberately not treated as indexes here.
		 */
		if (classform->relkind != RELKIND_INDEX)
			continue;

		/* the pg_index tuple, assumed to be a heap_copytuple product */
		write_item(rel->rd_indextuple,
				   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
				   outfile);

		/* opfamily OIDs */
		write_item(rel->rd_opfamily,
				   classform->relnatts * sizeof(Oid),
				   outfile);

		/* opcintype OIDs */
		write_item(rel->rd_opcintype,
				   classform->relnatts * sizeof(Oid),
				   outfile);

		/* support procedure OIDs */
		write_item(rel->rd_support,
				   classform->relnatts * (rel->rd_indam->amsupport * sizeof(RegProcedure)),
				   outfile);

		/* collation OIDs */
		write_item(rel->rd_indcollation,
				   classform->relnatts * sizeof(Oid),
				   outfile);

		/* indoption values */
		write_item(rel->rd_indoption,
				   classform->relnatts * sizeof(int16),
				   outfile);

		Assert(rel->rd_opcoptions);

		/* opcoptions values, one item per attribute (empty if NULL) */
		for (attno = 0; attno < classform->relnatts; attno++)
		{
			bytea	   *opcopt = rel->rd_opcoptions[attno];
			Size		optlen = opcopt ? VARSIZE(opcopt) : 0;

			write_item(opcopt, optlen, outfile);
		}
	}

	if (FreeFile(outfile))
		ereport(FATAL,
				errcode_for_file_access(),
				errmsg_internal("could not write init file: %m"));

	/*
	 * What we just wrote may already be obsolete because of catalog changes
	 * somebody else committed meanwhile.  In that case the temp file is
	 * simply removed and the next backend gets to try again.  (SI message
	 * processing will bring our own relcache entries up to date, but we
	 * cannot tell whether the data we wrote out was current.)
	 *
	 * This must not overlap with the code that unlinks an init file and
	 * sends SI messages, hence the serialization lock held across the
	 * check.
	 */
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Absorb every SI message that has arrived so far */
	AcceptInvalidationMessages();

	/*
	 * If any SI relcache inval has been received since backend start, the
	 * data may be stale, so the temp file is deleted.  Otherwise it is
	 * renamed to the final name, replacing any previous init file.
	 *
	 * Note: the rename can fail under Cygwin if some other backend still
	 * holds open an unlinked-but-not-yet-gone init file.  That is treated as
	 * a noncritical failure: the now-useless temp file is just removed.
	 */
	if (relcacheInvalsReceived != 0L ||
		rename(tmppath, finalpath) < 0)
		unlink(tmppath);

	LWLockRelease(RelCacheInitLock);
}

/*
 * write_item
 *		Emit one length-prefixed chunk into the init file.
 *
 * The Size value "len" is written first, followed by "len" bytes taken from
 * "data".  With len == 0 only the length word is written and "data" is not
 * touched.  Any short write is reported as FATAL.
 */
static void
write_item(const void *data, Size len, FILE *fp)
{
	/* length word first; the payload is attempted only if that succeeded */
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len) ||
		(len > 0 && fwrite(data, 1, len, fp) != len))
		ereport(FATAL,
				errcode_for_file_access(),
				errmsg_internal("could not write init file: %m"));
}

/*
 * RelationIdIsInInitFile
 *		Should the relation with this OID be stored in a relcache init file?
 *
 * All nailed rels have to be cached, and for efficiency so should every rel
 * that supports a syscache.  The first set is almost, but not quite, a
 * subset of the second: the exceptions are relations that
 * RelationCacheInitializePhase2/3 nails for efficiency reasons although they
 * support no syscache.  Those exceptions are listed explicitly below.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
	bool		special_case;

	special_case = (relationId == SharedSecLabelRelationId ||
					relationId == TriggerRelidNameIndexId ||
					relationId == DatabaseNameIndexId ||
					relationId == SharedSecLabelObjectIndexId);

	/* Everything else is decided by syscache support alone. */
	if (!special_case)
		return RelationSupportsSysCache(relationId);

	/*
	 * Should this Assert ever fail, the corresponding special case has
	 * become unnecessary.
	 */
	Assert(!RelationSupportsSysCache(relationId));
	return true;
}

/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
 * local init file.
 *
 * To be safe against concurrent inspection or rewriting of the init file,
 * we must take RelCacheInitLock, then remove the old init file, then send
 * the SI messages that include relcache inval for such relations, and then
 * release RelCacheInitLock.  This serializes the whole affair against
 * write_relcache_init_file, so that we can be sure that any other process
 * that's concurrently trying to create a new init file won't move an
 * already-stale version into place after we unlink.  Also, because we unlink
 * before sending the SI messages, a backend that's currently starting cannot
 * read the now-obsolete init file and then miss the SI messages that will
 * force it to update its relcache entries.  (This works because the backend
 * startup sequence gets into the sinval array before trying to load the init
 * file.)
 *
 * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
 * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
 * send any pending SI messages between those calls.
 */
void
RelationCacheInitFilePreInvalidate(void)
{
	char		localinitfname[MAXPGPATH];
	char		sharedinitfname[MAXPGPATH];

	if (DatabasePath)
		snprintf(localinitfname, sizeof(localinitfname), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	snprintf(sharedinitfname, sizeof(sharedinitfname), "global/%s",
			 RELCACHE_INIT_FILENAME);

	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/*
	 * The files might not be there if no backend has been started since the
	 * last removal.  But complain about failures other than ENOENT with
	 * ERROR.  Fortunately, it's not too late to abort the transaction if we
	 * can't get rid of the would-be-obsolete init file.
	 */
	if (DatabasePath)
		unlink_initfile(localinitfname, ERROR);
	unlink_initfile(sharedinitfname, ERROR);
}

/*
 * RelationCacheInitFilePostInvalidate
 *		Release RelCacheInitLock.
 *
 * This is the counterpart of RelationCacheInitFilePreInvalidate, which
 * acquires the lock and leaves it held on return.
 */
void
RelationCacheInitFilePostInvalidate(void)
{
	LWLockRelease(RelCacheInitLock);
}

/*
 * RelationCacheInitFileRemove
 *		Remove the init files during postmaster startup.
 *
 * Init files used to be kept across restarts, but that is unsafe in PITR
 * scenarios, and even plain crash recovery leaves windows in which the init
 * files can get out of sync with the database.  So they are simply removed
 * at startup, and the first backend launched afterwards is expected to
 * rebuild them.  Naturally this must cover every database of the cluster.
 */
void
RelationCacheInitFileRemove(void)
{
	const char *tblspcdir = PG_TBLSPC_DIR;
	char		path[MAXPGPATH + sizeof(PG_TBLSPC_DIR) + sizeof(TABLESPACE_VERSION_DIRECTORY)];
	DIR		   *tsdir;
	struct dirent *entry;

	/* The shared init file comes first */
	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
	unlink_initfile(path, LOG);

	/* Then every database in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");

	/* Non-default tablespaces are found via the tablespace link directory */
	tsdir = AllocateDir(tblspcdir);

	while ((entry = ReadDirExtended(tsdir, tblspcdir, LOG)) != NULL)
	{
		const char *name = entry->d_name;

		/* only all-digit names are of interest */
		if (strspn(name, "0123456789") != strlen(name))
			continue;

		/* Look for per-database dirs inside this tablespace dir */
		snprintf(path, sizeof(path), "%s/%s/%s",
				 tblspcdir, name, TABLESPACE_VERSION_DIRECTORY);
		RelationCacheInitFileRemoveInDir(path);
	}

	FreeDir(tsdir);
}

/*
 * RelationCacheInitFileRemoveInDir
 *		Handle one per-tablespace directory for RelationCacheInitFileRemove.
 *
 * Each subdirectory of tblspcpath whose name consists only of digits is
 * taken to be a database directory, and the init file inside it is removed.
 * Problems are reported at LOG level only.
 */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
{
	char		initpath[MAXPGPATH * 2];
	DIR		   *dbdir;
	struct dirent *entry;

	/* Walk the tablespace directory looking for per-database directories */
	dbdir = AllocateDir(tblspcpath);

	while ((entry = ReadDirExtended(dbdir, tblspcpath, LOG)) != NULL)
	{
		const char *name = entry->d_name;

		/* only all-digit names are of interest */
		if (strspn(name, "0123456789") != strlen(name))
			continue;

		/* Attempt to remove this database's init file */
		snprintf(initpath, sizeof(initpath), "%s/%s/%s",
				 tblspcpath, name, RELCACHE_INIT_FILENAME);
		unlink_initfile(initpath, LOG);
	}

	FreeDir(dbdir);
}

/*
 * unlink_initfile
 *		Remove an init file, tolerating its absence.
 *
 * A failed unlink() is reported at the given elevel, except that ENOENT is
 * silently ignored since the file might simply not be there.
 */
static void
unlink_initfile(const char *initfilename, int elevel)
{
	if (unlink(initfilename) < 0 && errno != ENOENT)
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not remove cache file \"%s\": %m",
						initfilename)));
}

/*
 * ResourceOwner callbacks
 */
/*
 * ResOwnerPrintRelCache
 *		Build a description string for a relation reference stored as a
 *		Datum.
 *
 * The result is produced by psprintf() from the relation's name.
 */
static char *
ResOwnerPrintRelCache(Datum res)
{
	Relation	relation;
	const char *relname;

	relation = (Relation) DatumGetPointer(res);
	relname = RelationGetRelationName(relation);

	return psprintf("relation \"%s\"", relname);
}

/*
 * ResOwnerReleaseRelation
 *		Drop one relation reference stored as a Datum.
 *
 * "res" carries a Relation pointer.  The relation's reference count is
 * decremented and RelationCloseCleanup() is then run on it.
 */
static void
ResOwnerReleaseRelation(Datum res)
{
	Relation	rel = (Relation) DatumGetPointer(res);

	/*
	 * This reference has already been removed from the resource owner, so
	 * just decrement reference count without calling
	 * ResourceOwnerForgetRelationRef.
	 */
	Assert(rel->rd_refcnt > 0);
	rel->rd_refcnt -= 1;

	/*
	 * Pass the pointer obtained through DatumGetPointer() above rather than
	 * casting the Datum directly, so that Datum-to-pointer conversion always
	 * goes through the accessor macro.
	 */
	RelationCloseCleanup(rel);
}
