# Copyright (c) 2024-2025, PostgreSQL Global Development Group

#
# Test using a standby server as the subscriber.

use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Run the standard --help, --version and option-handling checks on the
# binary.
for my $standard_check (\&program_help_ok, \&program_version_ok,
	\&program_options_handling_ok)
{
	$standard_check->('pg_createsubscriber');
}

# Temporary directory handed to the argument-validation tests below as
# --pgdata.
my $datadir = PostgreSQL::Test::Utils::tempdir;

# Create a database on $node and return its name.  The name is $prefix,
# followed by every ASCII character whose code lies in
# [$from_char, $to_char] except BEL (7), LF (10) and CR (13), followed by
# $suffix.  Adapted from 002_pg_upgrade.pl.
sub generate_db
{
	my ($node, $prefix, $from_char, $to_char, $suffix) = @_;

	# BEL, LF and CR are left out of the generated name.
	my @codes = grep { $_ != 7 && $_ != 10 && $_ != 13 } $from_char .. $to_char;
	my $dbname = join('', $prefix, (map { chr($_) } @codes), $suffix);

	# Older IPC::Run releases on Windows can mis-quote command-line arguments
	# that contain a double quote or a backslash, so drop those characters
	# there.
	$dbname =~ tr/\"\\//d if $windows_os;

	$node->command_ok(
		[ 'createdb', $dbname ],
		"created database with ASCII characters from $from_char to $to_char");

	return $dbname;
}

#
# Test mandatory options and validation of the object-name options.  Every
# invocation below must fail.
command_fails(['pg_createsubscriber'],
	'no subscriber data directory specified');
command_fails(
	[ 'pg_createsubscriber', '--pgdata' => $datadir ],
	'no publisher connection string specified');
{
	# The remaining cases all start with this prefix and differ only in the
	# --publication/--subscription/--replication-slot/--database options
	# appended after it.
	my @common_args = (
		'pg_createsubscriber',
		'--verbose',
		'--pgdata' => $datadir,
		'--publisher-server' => 'port=5432',
	);

	# Each entry is [ extra arguments, test description ], run in order.
	my @invalid_cases = (
		[ [], 'no database name specified' ],
		[
			[ '--database' => 'pg1', '--database' => 'pg1' ],
			'duplicate database name'
		],
		[
			[
				'--publication' => 'foo1',
				'--publication' => 'foo1',
				'--database' => 'pg1',
				'--database' => 'pg2',
			],
			'duplicate publication name'
		],
		[
			[
				'--publication' => 'foo1',
				'--database' => 'pg1',
				'--database' => 'pg2',
			],
			'wrong number of publication names'
		],
		[
			[
				'--publication' => 'foo1',
				'--publication' => 'foo2',
				'--subscription' => 'bar1',
				'--database' => 'pg1',
				'--database' => 'pg2',
			],
			'wrong number of subscription names'
		],
		[
			[
				'--publication' => 'foo1',
				'--publication' => 'foo2',
				'--subscription' => 'bar1',
				'--subscription' => 'bar2',
				'--replication-slot' => 'baz1',
				'--database' => 'pg1',
				'--database' => 'pg2',
			],
			'wrong number of replication slot names'
		],
	);

	for my $case (@invalid_cases)
	{
		my ($extra_args, $description) = @$case;
		command_fails([ @common_args, @$extra_args ], $description);
	}
}

# Set up node P as the primary; it is the publisher in every
# pg_createsubscriber run below.
my $node_p = PostgreSQL::Test::Cluster->new('node_p');
# Connection string of node P, reused as node S's primary_conninfo.
my $pconnstr = $node_p->connstr;
$node_p->init(allows_streaming => 'logical');
# Disable autovacuum to avoid generating xids during stats updates; otherwise
# a new XID could be replicated to the standby at some random point, making
# the slots on the primary lag behind the standby during slot sync.
$node_p->append_conf('postgresql.conf', 'autovacuum = off');
$node_p->start;

# Set up node F as the about-to-fail node.
# force_initdb makes it run a fresh initdb instead of copying a previously
# initdb'd cluster.  The new cluster therefore has a different system
# identifier, which lets us test whether the target cluster is a copy of the
# source cluster.  Node F is only initialized here, not started.
my $node_f = PostgreSQL::Test::Cluster->new('node_f');
$node_f->init(force_initdb => 1, allows_streaming => 'logical');

# On node P
# - create two databases with unusual names
# - create test tables
# - insert a row
# - create a physical replication slot (node S's primary_slot_name)
#
# $db1's name holds ASCII characters 1 to 45 (minus BEL, LF and CR).
# Its prefix and suffix also embed double quotes and backslashes; generate_db
# strips these on Windows.  This is presumably meant to exercise quoting of
# database names.  $db2 covers ASCII characters 46 to 90.
my $db1 = generate_db($node_p, 'regression\\"\\', 1, 45, '\\\\"\\\\\\');
my $db2 = generate_db($node_p, 'regression', 46, 90, '');

$node_p->safe_psql($db1, 'CREATE TABLE tbl1 (a text)');
$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('first row')");
$node_p->safe_psql($db2, 'CREATE TABLE tbl2 (a text)');
my $slotname = 'physical_slot';
$node_p->safe_psql($db2,
	"SELECT pg_create_physical_replication_slot('$slotname')");

# Set up node S as a standby of node P.  It streams through the physical slot
# created above and sends hot standby feedback.  dbname=postgres is part of
# primary_conninfo because slot synchronization (pg_sync_replication_slots(),
# used later) needs a database to connect to.
$node_p->backup('backup_1');
my $node_s = PostgreSQL::Test::Cluster->new('node_s');
$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
$node_s->append_conf(
	'postgresql.conf', qq[
primary_slot_name = '$slotname'
primary_conninfo = '$pconnstr dbname=postgres'
hot_standby_feedback = on
]);
$node_s->set_standby_mode();
$node_s->start;

# Set up node T as another standby of node P, then promote it and stop it.
# It serves as a target server that is no longer in recovery.
my $node_t = PostgreSQL::Test::Cluster->new('node_t');
$node_t->init_from_backup($node_p, 'backup_1', has_streaming => 1);
$node_t->set_standby_mode();
$node_t->start;
$node_t->promote;
$node_t->stop;

# pg_createsubscriber must refuse each of these targets (node P's $db1 is
# always the publisher):
# - node T: promoted, so it is no longer in recovery;
# - node S: a standby that is still up and running;
# - node F: a separately initdb'd cluster, not a copy of node P (this run
#   omits --dry-run).
{
	my $expect_rejection = sub {
		my ($target, $dry_run, $description) = @_;

		command_fails(
			[
				'pg_createsubscriber',
				'--verbose',
				($dry_run ? ('--dry-run') : ()),
				'--pgdata' => $target->data_dir,
				'--publisher-server' => $node_p->connstr($db1),
				'--socketdir' => $target->host,
				'--subscriber-port' => $target->port,
				'--database' => $db1,
				'--database' => $db2,
			],
			$description);
	};

	$expect_rejection->($node_t, 1, 'target server is not in recovery');
	$expect_rejection->($node_s, 1, 'standby is up and running');
	$expect_rejection->(
		$node_f, 0,
		'subscriber data directory is not a copy of the source database cluster'
	);
}

# Set up node C as a cascading standby of node S (P -> S -> C).  Node C is
# configured from node S's backup but never started.
$node_s->backup('backup_2');
my $node_c = PostgreSQL::Test::Cluster->new('node_c');
$node_c->init_from_backup($node_s, 'backup_2', has_streaming => 1);
# Drop the primary_slot_name setting inherited from node S's configuration.
$node_c->adjust_conf('postgresql.conf', 'primary_slot_name', undef);
$node_c->set_standby_mode();

# Run pg_createsubscriber on node C (P -> S -> C) with node S as the
# publisher.  This must fail because the publisher is itself a server in
# recovery.
command_fails(
	[
		'pg_createsubscriber',
		'--verbose',
		'--dry-run',
		'--pgdata' => $node_c->data_dir,
		'--publisher-server' => $node_s->connstr($db1),
		'--socketdir' => $node_c->host,
		'--subscriber-port' => $node_c->port,
		'--database' => $db1,
		'--database' => $db2,
	],
	'primary server is in recovery');

# Check some unmet conditions on node P.
# wal_level is lowered to replica, and the replication slot, WAL sender and
# worker process limits are reduced; the reduced limits are presumably below
# what two databases need.  Node S is stopped first, since a running standby
# target is rejected.
$node_p->append_conf(
	'postgresql.conf', q{
wal_level = replica
max_replication_slots = 1
max_wal_senders = 1
max_worker_processes = 2
});
$node_p->restart;
$node_s->stop;
command_fails(
	[
		'pg_createsubscriber',
		'--verbose',
		'--dry-run',
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr($db1),
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--database' => $db1,
		'--database' => $db2,

	],
	'primary contains unmet conditions on node P');
# Restore the default settings here, but only apply them (restart) after
# testing the standby.  Some standby settings must not be lower than the
# corresponding settings on the primary.
$node_p->append_conf(
	'postgresql.conf', q{
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
max_worker_processes = 8
});

# Check some unmet conditions on node S by lowering its replication origin,
# logical replication worker and worker process limits.
$node_s->append_conf(
	'postgresql.conf', q{
max_active_replication_origins = 1
max_logical_replication_workers = 1
max_worker_processes = 2
});
command_fails(
	[
		'pg_createsubscriber',
		'--verbose',
		'--dry-run',
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr($db1),
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--database' => $db1,
		'--database' => $db2,
	],
	'standby contains unmet conditions on node S');
# Restore node S's settings; node S is still stopped, so they take effect the
# next time it starts.
$node_s->append_conf(
	'postgresql.conf', q{
max_active_replication_origins = 10
max_logical_replication_workers = 4
max_worker_processes = 8
});
# Restore default settings on both servers: apply node P's restored settings
# now.
$node_p->restart;

# Create a failover-enabled logical slot on node P, in $db1, to test that it
# is removed from the converted standby.  The arguments are: plugin
# pgoutput, not temporary, no two-phase, failover enabled.
my $fslotname = 'failover_slot';
$node_p->safe_psql($db1,
	"SELECT pg_create_logical_replication_slot('$fslotname', 'pgoutput', false, false, true)"
);
$node_s->start;
# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slot.
$node_p->wait_for_replay_catchup($node_s);
# Synchronize slots on node S and check that the failover slot now exists
# there as a synced, non-temporary slot.
$node_s->safe_psql('postgres', "SELECT pg_sync_replication_slots()");
my $result = $node_s->safe_psql('postgres',
	"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
);
is($result, 'failover_slot', 'failover slot is synced');

# Insert another row on node P and wait for node S to catch up.  This insert
# is intentionally done after syncing the logical slot.  Otherwise the xmin
# of the local slot (created on the standby while synchronizing the slot)
# could be ahead of the remote slot, making synchronization fail.
$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('second row')");
$node_p->wait_for_replay_catchup($node_s);

# Create a subscription (without connecting) to test that pg_createsubscriber
# removes pre-existing subscriptions from the converted standby.
# pg_subscription is a shared catalog, which is why the later check
# queries it from the 'postgres' database.
my $dummy_sub = 'regress_sub_dummy';
$node_p->safe_psql($db1,
	"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
);
$node_p->wait_for_replay_catchup($node_s);

# Create user-defined publications and wait for streaming replication to sync
# them to the standby.  The '--clean' run later must remove them.
$node_p->safe_psql(
	$db1, qq(
	CREATE PUBLICATION test_pub1 FOR ALL TABLES;
	CREATE PUBLICATION test_pub2 FOR ALL TABLES;
));

$node_p->wait_for_replay_catchup($node_s);

# safe_psql returns psql's text output, so the boolean comes back as 't' or
# 'f'.  Both strings are true in Perl, so compare against 't' explicitly;
# passing the result to ok() would accept 'f' as well.
is($node_s->safe_psql($db1, "SELECT COUNT(*) = 2 FROM pg_publication"),
	't', 'two pre-existing publications on subscriber');

# pg_createsubscriber rejects a running standby, so stop node S first.
$node_s->stop;

# Dry run mode on node S, with explicit publication and subscription names.
# It must succeed, and node S must still be a standby afterwards.
command_ok(
	[
		'pg_createsubscriber',
		'--verbose',
		'--dry-run',
		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr($db1),
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--publication' => 'pub1',
		'--publication' => 'pub2',
		'--subscription' => 'sub1',
		'--subscription' => 'sub2',
		'--database' => $db1,
		'--database' => $db2,
	],
	'run pg_createsubscriber --dry-run on node S');

# Check that node S is still a standby, then stop it again for the next runs.
$node_s->start;
is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
	't', 'standby is in recovery');
$node_s->stop;

# pg_createsubscriber can run without any --database option.  According to
# pg_createsubscriber's documentation, the database is then taken from the
# --publisher-server connection string (here $db1).
command_ok(
	[
		'pg_createsubscriber',
		'--verbose',
		'--dry-run',
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr($db1),
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--replication-slot' => 'replslot1',
	],
	'run pg_createsubscriber without --databases');

# Run pg_createsubscriber with '--database' and '--all', without '--dry-run',
# and verify the failure.
command_fails_like(
	[
		'pg_createsubscriber',
		'--verbose',
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr($db1),
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--database' => $db1,
		'--all',
	],
	qr/options --database and -a\/--all cannot be used together/,
	'fail if --database is used with --all');

# Run pg_createsubscriber with '--publication' and '--all' and verify the
# failure.
command_fails_like(
	[
		'pg_createsubscriber',
		'--verbose',
		'--dry-run',
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr($db1),
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--all',
		'--publication' => 'pub1',
	],
	qr/options --publication and -a\/--all cannot be used together/,
	'fail if --publication is used with --all');

# Run pg_createsubscriber in dry-run mode with the '--all' option.
# run_command() only returns the captured stdout and stderr, which would leave
# the exit status unchecked.  So run the command through IPC::Run directly and
# assert success before counting the objects reported on stderr.  IPC::Run is
# already loaded by PostgreSQL::Test::Utils, whose command helpers are built
# on it.  The buffers start out empty so the counts below never operate on
# undef.
my ($stdout, $stderr) = ('', '');
my $all_succeeded = IPC::Run::run(
	[
		'pg_createsubscriber',
		'--verbose',
		'--dry-run',
		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr,
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--all',
	],
	'>' => \$stdout,
	'2>' => \$stderr);
ok($all_succeeded, 'run pg_createsubscriber with --all')
  or diag($stderr);

# Verify that the required logical replication objects are output.
# The expected count 3 refers to postgres, $db1 and $db2 databases.
is(scalar(() = $stderr =~ /creating publication/g),
	3, "verify publications are created for all databases");
is(scalar(() = $stderr =~ /creating the replication slot/g),
	3, "verify replication slots are created for all databases");
is(scalar(() = $stderr =~ /creating subscription/g),
	3, "verify subscriptions are created for all databases");

# Run pg_createsubscriber on node S for real.  --verbose is used twice
# to show more information.
# In passing, also test the --enable-two-phase option and the
# '--clean publications' option.  The latter is expected to drop the
# pre-existing publications on the subscriber (checked after node S starts).
command_ok(
	[
		'pg_createsubscriber',
		'--verbose', '--verbose',
		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
		'--pgdata' => $node_s->data_dir,
		'--publisher-server' => $node_p->connstr($db1),
		'--socketdir' => $node_s->host,
		'--subscriber-port' => $node_s->port,
		'--publication' => 'pub1',
		'--publication' => 'pub2',
		'--replication-slot' => 'replslot1',
		'--replication-slot' => 'replslot2',
		'--database' => $db1,
		'--database' => $db2,
		'--enable-two-phase',
		'--clean' => 'publications',
	],
	'run pg_createsubscriber on node S');

# Confirm that the physical replication slot node S used as
# primary_slot_name has been removed from node P.
$result = $node_p->safe_psql($db1,
	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slotname'"
);
is($result, qq(0),
	'the physical replication slot used as primary_slot_name has been removed'
);

# Insert rows on P after the conversion; they must reach node S through
# logical replication (checked further down).
$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");

# Start the new subscriber.
$node_s->start;

# Confirm that '--clean publications' removed the publications in $db1 on
# the subscriber node.
is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
	'0', 'all publications on subscriber have been removed');

# Verify that all subtwophasestate values are pending or enabled,
# i.e. there are no subscriptions where two-phase is disabled ('d').
is( $node_s->safe_psql(
		'postgres',
		"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
	),
	't',
	'subscriptions are created with the two-phase option enabled');

# Confirm that the pre-existing subscription replicated from node P has been
# removed.
$result = $node_s->safe_psql(
	'postgres', qq(
	SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
));
is($result, qq(0), 'pre-existing subscription was dropped');

# Get the names of the subscriptions created by pg_createsubscriber.  The run
# above passed no --subscription options, so they are found by their
# "pg_createsubscriber_" prefix.
$result = $node_s->safe_psql(
	'postgres', qq(
	SELECT subname FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
));
my @subnames = split("\n", $result);

# One subscription is expected for each --database given ($db1 and $db2).
# Check the count explicitly, so that a missing subscription shows up as a
# clear test failure rather than as an undefined name passed to
# wait_for_subscription_sync().
is(scalar(@subnames), 2, 'one subscription was created per database');

# Wait for the subscriber to catch up on every subscription.
for my $subname (@subnames)
{
	$node_s->wait_for_subscription_sync($node_p, $subname);
}

# The failover slot that was synced to node S must be gone after the
# conversion.
$result = $node_s->safe_psql($db1,
	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$fslotname'"
);
is($result, '0', 'failover slot was removed');

# Both databases must contain every row inserted on node P, including the
# rows inserted after pg_createsubscriber ran.
$result = $node_s->safe_psql($db1, 'SELECT * FROM tbl1');
is($result, join("\n", 'first row', 'second row', 'third row'),
	"logical replication works in database $db1");

$result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
is($result, 'row 1', "logical replication works in database $db2");

# Different system identifier?  The converted standby must have a system
# identifier of its own.  The identifiers are unsigned 64-bit values, so
# compare them as strings.  A numeric != could lose precision on Perl builds
# without 64-bit integers, and isnt() also reports both values on failure.
my $sysid_p = $node_p->safe_psql('postgres',
	'SELECT system_identifier FROM pg_control_system()');
my $sysid_s = $node_s->safe_psql('postgres',
	'SELECT system_identifier FROM pg_control_system()');
isnt($sysid_s, $sysid_p, 'system identifier was changed');

# clean up: tear down nodes P, S, T and F, in that order.
for my $node ($node_p, $node_s, $node_t, $node_f)
{
	$node->teardown_node;
}

done_testing();
