/*
 * task.c
 *		framework for parallelizing pg_upgrade's once-in-each-database tasks
 *
 * This framework provides an efficient way of running the various
 * once-in-each-database tasks required by pg_upgrade.  Specifically, it
 * parallelizes these tasks by managing a set of slots that follow a simple
 * state machine and by using libpq's asynchronous APIs to establish the
 * connections and run the queries.  Callers simply need to create a callback
 * function and build/execute an UpgradeTask.  A simple example follows:
 *
 *		static void
 *		my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
 *		{
 *			for (int i = 0; i < PQntuples(res); i++)
 *			{
 *				... process results ...
 *			}
 *		}
 *
 *		void
 *		my_task(ClusterInfo *cluster)
 *		{
 *			UpgradeTask *task = upgrade_task_create();
 *
 *			upgrade_task_add_step(task,
 *								  "... query text ...",
 *								  my_process_cb,
 *								  true,		// let the task free the PGresult
 *								  NULL);	// "arg" pointer for callback
 *			upgrade_task_run(task, cluster);
 *			upgrade_task_free(task);
 *		}
 *
 * Note that multiple steps can be added to a given task.  When there are
 * multiple steps, the task will run all of the steps consecutively in the same
 * database connection before freeing the connection and moving on.  In other
 * words, it only ever initiates one connection to each database in the
 * cluster for a given run.
 *
 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
 * src/bin/pg_upgrade/task.c
 */

#include "postgres_fe.h"

#include "common/connect.h"
#include "fe_utils/string_utils.h"
#include "pg_upgrade.h"

/*
 * dbs_complete stores the number of databases that we have completed
 * processing.  When this value equals the number of databases in the cluster,
 * the task is finished.
 *
 * It is reset to zero at the start of each upgrade_task_run() call and is
 * incremented by process_slot() whenever a slot has processed the result of
 * the last step for its database.
 */
static int	dbs_complete;

/*
 * dbs_processing stores the index of the next database in the cluster's array
 * of databases that will be picked up for processing.  It will always be
 * greater than or equal to dbs_complete.
 *
 * It is reset to zero at the start of each upgrade_task_run() call and is
 * advanced by process_slot() whenever a free slot claims a database.
 */
static int	dbs_processing;

/*
 * This struct stores the information for a single step of a task.  Note that
 * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
 * All steps in a task are run in a single connection before moving on to the
 * next database (which requires a new connection).
 *
 * Entries are filled in by upgrade_task_add_step() and consumed by
 * process_query_result(), one entry per query result.
 */
typedef struct UpgradeTaskStep
{
	UpgradeTaskProcessCB process_cb;	/* processes the query's result; may
										 * be NULL, in which case the result
										 * is not handed to anyone */
	bool		free_result;	/* PQclear() the result after process_cb? */
	void	   *arg;			/* opaque pointer passed to process_cb */
} UpgradeTaskStep;

/*
 * This struct is a thin wrapper around an array of steps, i.e.,
 * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
 *
 * upgrade_task_add_step() grows "steps" by one element per call and appends
 * the step's query, followed by a semicolon, to "queries".  The accumulated
 * text is later sent to the server with a single PQsendQuery() call.
 */
struct UpgradeTask
{
	UpgradeTaskStep *steps;		/* array of num_steps entries */
	int			num_steps;		/* number of valid entries in steps */
	PQExpBuffer queries;		/* all query strings, each ending in ';' */
};

/*
 * The different states for a parallel slot.
 *
 * process_slot() moves a slot from FREE to CONNECTING when it claims a
 * database, from CONNECTING to RUNNING_QUERIES once the connection is
 * established and the queries have been sent, and back to FREE after the
 * result of the last step has been processed.
 *
 * FREE must remain the first (zero-valued) member: slots are allocated with
 * pg_malloc0() and reset with memset(..., 0, ...), and both rely on an
 * all-zeros slot being in the FREE state.
 */
typedef enum UpgradeTaskSlotState
{
	FREE,						/* slot available for use in a new database */
	CONNECTING,					/* waiting for connection to be established */
	RUNNING_QUERIES,			/* running/processing queries in the task */
} UpgradeTaskSlotState;

/*
 * We maintain an array of slots to execute the task.  upgrade_task_run()
 * allocates Max(1, user_opts.jobs) of them.
 *
 * "ready" is set for every slot before the first pass and afterwards by
 * wait_on_slots() when select() reports the slot's socket; process_slot()
 * does nothing for a slot that is not ready.  "select_mode" decides whether
 * wait_on_slots() puts the socket in the read set or the write set.
 */
typedef struct UpgradeTaskSlot
{
	UpgradeTaskSlotState state; /* state of the slot */
	int			db_idx;			/* index of the database assigned to slot */
	int			step_idx;		/* index of the current step of task */
	PGconn	   *conn;			/* current connection managed by slot */
	bool		ready;			/* slot is ready for processing */
	bool		select_mode;	/* select() mode: true->read, false->write */
	int			sock;			/* file descriptor for connection's socket,
								 * refreshed by wait_on_slots() */
} UpgradeTaskSlot;

/*
 * Allocates and initializes a new UpgradeTask.
 *
 * The returned task already contains one step that sets a secure
 * search_path, so that it runs ahead of any step the caller adds.  Release
 * the task with upgrade_task_free().
 */
UpgradeTask *
upgrade_task_create(void)
{
	UpgradeTask *new_task;

	/* Zeroed allocation: no steps yet, and a NULL steps array. */
	new_task = pg_malloc0(sizeof(*new_task));
	new_task->queries = createPQExpBuffer();

	/* Every task begins by locking down the search_path. */
	upgrade_task_add_step(new_task,
						  ALWAYS_SECURE_SEARCH_PATH_SQL,
						  NULL,		/* nothing to process */
						  true,		/* let the task free the PGresult */
						  NULL);	/* no callback argument */

	return new_task;
}

/*
 * Releases an UpgradeTask along with everything it owns: the array of steps
 * and the buffer holding the query text.
 */
void
upgrade_task_free(UpgradeTask *task)
{
	/* Release the members first, then the container itself. */
	pg_free(task->steps);
	destroyPQExpBuffer(task->queries);

	pg_free(task);
}

/*
 * Appends one step to an UpgradeTask.  Steps run in each database in the same
 * order in which they were added.
 *
 *	task: task object previously set up by upgrade_task_create()
 *	query: text of the query to run for this step
 *	process_cb: callback that receives the query's result (may be NULL)
 *	free_result: whether the task should free the PGresult after the callback,
 *		as opposed to leaving that to the caller
 *	arg: task-specific pointer handed to the callback on every invocation
 */
void
upgrade_task_add_step(UpgradeTask *task, const char *query,
					  UpgradeTaskProcessCB process_cb, bool free_result,
					  void *arg)
{
	int			new_idx = task->num_steps;

	/* Make room for one more entry at the tail of the array. */
	task->num_steps = new_idx + 1;
	task->steps = pg_realloc(task->steps,
							 task->num_steps * sizeof(UpgradeTaskStep));

	task->steps[new_idx].process_cb = process_cb;
	task->steps[new_idx].free_result = free_result;
	task->steps[new_idx].arg = arg;

	/* The query text lives in the shared buffer, terminated by ';'. */
	appendPQExpBuffer(task->queries, "%s;", query);
}

/*
 * Assemble a connection string for the database assigned to the slot and kick
 * off an asynchronous connection attempt.  This returns without waiting for
 * the connection to be established; the new PGconn is stored in slot->conn.
 */
static void
start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
{
	const char *target_db = cluster->dbarr.dbs[slot->db_idx].db_name;
	PQExpBufferData connstr;

	initPQExpBuffer(&connstr);

	/* Database and user names go through appendConnStrVal() for quoting. */
	appendPQExpBufferStr(&connstr, "dbname=");
	appendConnStrVal(&connstr, target_db);

	appendPQExpBufferStr(&connstr, " user=");
	appendConnStrVal(&connstr, os_info.user);

	appendPQExpBuffer(&connstr, " port=%d", cluster->port);

	/* The host is only specified when a socket directory is configured. */
	if (cluster->sockdir != NULL)
	{
		appendPQExpBufferStr(&connstr, " host=");
		appendConnStrVal(&connstr, cluster->sockdir);
	}

	slot->conn = PQconnectStart(connstr.data);
	if (slot->conn == NULL)
		pg_fatal("out of memory");

	termPQExpBuffer(&connstr);
}

/*
 * Fetch the next query result from the slot's connection, hand it to the
 * current step's process_cb callback (if there is one), and free the result
 * when the step was registered with free_result set.
 *
 * A broken connection, or a result that is neither PGRES_TUPLES_OK nor
 * PGRES_COMMAND_OK, is fatal.
 */
static void
process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
					 const UpgradeTask *task)
{
	UpgradeTaskStep *cur_step = &task->steps[slot->step_idx];
	PGresult   *res;
	ExecStatusType res_status;

	res = PQgetResult(slot->conn);

	if (PQstatus(slot->conn) == CONNECTION_BAD)
		pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

	res_status = PQresultStatus(res);
	if (res_status != PGRES_TUPLES_OK && res_status != PGRES_COMMAND_OK)
		pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

	/*
	 * A step without a callback has nothing to process.  The search_path
	 * step that upgrade_task_create() adds to every task is such a step.
	 */
	if (cur_step->process_cb != NULL)
		cur_step->process_cb(&cluster->dbarr.dbs[slot->db_idx],
							 res,
							 cur_step->arg);

	if (cur_step->free_result)
		PQclear(res);
}

/*
 * Drives the state machine of one slot forward, if the slot is ready.
 *
 * A FREE slot claims the next unprocessed database and starts connecting.  A
 * CONNECTING slot polls the connection and, once it is established, sends all
 * of the task's queries.  A RUNNING_QUERIES slot processes whatever results
 * are available and, after the last one, releases the connection and becomes
 * FREE again.
 */
static void
process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
{
	/* Slots that were not flagged as ready have nothing to do yet. */
	if (!slot->ready)
		return;

	if (slot->state == FREE)
	{
		/*
		 * Every database is either finished or already owned by some slot,
		 * so this slot stays idle.
		 */
		if (dbs_processing >= cluster->dbarr.ndbs)
			return;

		/* Take ownership of the next database and begin connecting to it. */
		slot->db_idx = dbs_processing++;
		slot->state = CONNECTING;
		start_conn(cluster, slot);
	}
	else if (slot->state == CONNECTING)
	{
		PostgresPollingStatusType poll_result = PQconnectPoll(slot->conn);

		if (poll_result == PGRES_POLLING_FAILED)
			pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

		/*
		 * Still connecting: remember whether we must wait for the socket to
		 * become readable or writable, and try again later.
		 */
		if (poll_result != PGRES_POLLING_OK)
		{
			slot->select_mode = (poll_result == PGRES_POLLING_READING);
			return;
		}

		/*
		 * Connected.  Send every query of the task at once and wait for the
		 * socket to become readable.
		 */
		slot->state = RUNNING_QUERIES;
		slot->select_mode = true;
		if (!PQsendQuery(slot->conn, task->queries->data))
			pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
	}
	else if (slot->state == RUNNING_QUERIES)
	{
		/* Pull in whatever the server has sent so far. */
		if (!PQconsumeInput(slot->conn))
			pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

		/*
		 * Handle as many results as are already available, so that the slot
		 * can be handed to another database as early as possible.
		 */
		while (slot->step_idx < task->num_steps)
		{
			/* The next result has not fully arrived; come back later. */
			if (PQisBusy(slot->conn))
				return;

			process_query_result(cluster, slot, task);
			slot->step_idx++;
		}

		/*
		 * The last step is done: close the connection and reset the slot to
		 * its all-zeros (FREE) state.  Marking it ready and recursing lets
		 * it start the next connection right away rather than on the next
		 * pass over the slots.
		 */
		dbs_complete++;
		PQfinish(slot->conn);
		memset(slot, 0, sizeof(UpgradeTaskSlot));
		slot->ready = true;

		process_slot(cluster, slot, task);
	}
}

/*
 * Calls select() on the given read and write sets with no timeout, retrying
 * whenever the call is interrupted.
 *
 * Returns -1 on error, else the number of ready descriptors.  When maxFd is
 * zero, returns 0 immediately without calling select().
 */
static int
select_loop(int maxFd, fd_set *input, fd_set *output)
{
	fd_set		orig_input = *input;
	fd_set		orig_output = *output;

	if (maxFd == 0)
		return 0;

	for (;;)
	{
		int			nready;
		bool		interrupted;

		/* Start each attempt from the caller's original sets. */
		*input = orig_input;
		*output = orig_output;

		nready = select(maxFd + 1, input, output, NULL, NULL);

#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

		if (!interrupted)
			return nready;
	}
}

/*
 * Wait on the slots to either finish connecting or to receive query results if
 * possible.  This avoids a tight loop in upgrade_task_run().
 *
 *	slots: array of slots to wait on
 *	numslots: number of entries in slots
 *
 * On return, the "ready" flag of each slot says whether select() reported its
 * socket as readable or writable.  Failure to obtain a valid socket, a socket
 * that cannot be represented in an fd_set, and select() failure are all
 * fatal.
 */
static void
wait_on_slots(UpgradeTaskSlot *slots, int numslots)
{
	fd_set		input;
	fd_set		output;
	int			maxFd = 0;

	FD_ZERO(&input);
	FD_ZERO(&output);

	for (int i = 0; i < numslots; i++)
	{
		/*
		 * We assume the previous call to process_slot() handled everything
		 * that was marked ready in the previous call to wait_on_slots(), if
		 * any.
		 */
		slots[i].ready = false;

		/*
		 * This function should only ever see free slots as we are finishing
		 * processing the last few databases, at which point we don't have any
		 * databases left for them to process.  We'll never use these slots
		 * again, so we can safely ignore them.
		 */
		if (slots[i].state == FREE)
			continue;

		/*
		 * Fetch the connection's current socket.
		 */
		slots[i].sock = PQsocket(slots[i].conn);
		if (slots[i].sock < 0)
			pg_fatal("invalid socket");

		/*
		 * On platforms where an fd_set is indexed by descriptor number,
		 * FD_SET() has undefined behavior for descriptors that are not below
		 * FD_SETSIZE, so fail cleanly instead of corrupting the stack.  (On
		 * Windows, an fd_set is a list of sockets rather than a bitmap, so
		 * this comparison does not apply.)
		 */
#ifndef WIN32
		if (slots[i].sock >= FD_SETSIZE)
			pg_fatal("too many jobs for this platform: %d", numslots);
#endif

		/*
		 * Add the socket to the read or write set, depending on what the
		 * slot is waiting for.
		 */
		FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
		maxFd = Max(maxFd, slots[i].sock);
	}

	/*
	 * If we found socket(s) to wait on, wait.
	 */
	if (select_loop(maxFd, &input, &output) == -1)
		pg_fatal("%s() failed: %m", "select");

	/*
	 * Mark which sockets appear to be ready.
	 */
	for (int i = 0; i < numslots; i++)
		slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
						   FD_ISSET(slots[i].sock, &output));
}

/*
 * Executes every step of the task in each database of the cluster, spreading
 * the databases over Max(1, user_opts.jobs) parallel slots.  Returns once all
 * databases have been processed.
 */
void
upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
{
	int			nslots = Max(1, user_opts.jobs);
	UpgradeTaskSlot *slots;
	int			i;

	slots = pg_malloc0(sizeof(UpgradeTaskSlot) * nslots);

	/* Start counting from scratch for this run. */
	dbs_processing = 0;
	dbs_complete = 0;

	/*
	 * Flag all slots as ready so that the first pass visits each of them.
	 */
	for (i = 0; i < nslots; i++)
		slots[i].ready = true;

	for (;;)
	{
		if (dbs_complete >= cluster->dbarr.ndbs)
			break;

		/* Advance each slot, then sleep until some socket has activity. */
		for (i = 0; i < nslots; i++)
			process_slot(cluster, slots + i, task);

		wait_on_slots(slots, nslots);
	}

	pg_free(slots);
}
