From 595cf1782c0a0a4ba92ba8dbfe6ad528324b420e Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Mon, 20 May 2019 22:40:07 -0400 Subject: [PATCH] Switch DNS aggregator to PostgreSQL MariaDB+Galera was terribly unstable, with the cluster failing to start or dying randomly, and generally seemed incredibly unsuitable for an HA solution. This commit switches the DNS aggregator SQL backend to PostgreSQL, implemented via Patroni HA. It also manages the Patroni state, forcing the primary instance to follow the PVC coordinator, such that the active DNS Aggregator instance is always able to communicate read+write with the local system. This required some logic changes to how the DNS Aggregator worked, specifically ensuring that database changes aren't attempted while the instance isn't actively running - to be honest this was a bug anyways that had just never been noticed. Closes #34 --- debian/control | 2 +- node-daemon/pvcd.sample.yaml | 14 +- node-daemon/pvcd/DNSAggregatorInstance.py | 217 ++++++++++++++-------- node-daemon/pvcd/Daemon.py | 14 +- node-daemon/pvcd/NodeInstance.py | 21 ++- 5 files changed, 171 insertions(+), 97 deletions(-) diff --git a/debian/control b/debian/control index 5e6ec45c..6c7eb7b7 100644 --- a/debian/control +++ b/debian/control @@ -8,7 +8,7 @@ X-Python3-Version: >= 3.2 Package: pvc-daemon Architecture: all -Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-mysqldb, python3-dnspython, python3-yaml, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-mysql +Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-mysql Suggests: pvc-client-cli Description: Parallel Virtual Cluster virtualization daemon (Python 3) The Parallel Virtual Cluster provides a management solution for 
QEMU/KVM virtual clusters. diff --git a/node-daemon/pvcd.sample.yaml b/node-daemon/pvcd.sample.yaml index e85e66d3..a06792e2 100644 --- a/node-daemon/pvcd.sample.yaml +++ b/node-daemon/pvcd.sample.yaml @@ -61,17 +61,17 @@ pvc: coordinator: # dns: DNS aggregator subsystem dns: - # database: MySQL Galera database configuration + # database: Patroni PostgreSQL database configuration database: - # host: MySQL hostname, invariably 'localhost' + # host: PostgreSQL hostname, invariably 'localhost' host: localhost - # port: MySQL port, invariably 'localhost' - port: 3306 - # name: MySQL database name, invariably 'pvcdns' + # port: PostgreSQL port, invariably '5432' + port: 5432 + # name: PostgreSQL database name, invariably 'pvcdns' name: pvcdns - # user: MySQL username, invariable 'pvcdns' + # user: PostgreSQL username, invariably 'pvcdns' user: pvcdns - # pass: MySQL user password, randomly generated + # pass: PostgreSQL user password, randomly generated pass: pvcdns # system: Local PVC instance configuration system: diff --git a/node-daemon/pvcd/DNSAggregatorInstance.py b/node-daemon/pvcd/DNSAggregatorInstance.py index 88fbaa9a..681d8cbc 100644 --- a/node-daemon/pvcd/DNSAggregatorInstance.py +++ b/node-daemon/pvcd/DNSAggregatorInstance.py @@ -26,7 +26,7 @@ import time import threading import dns.zone import dns.query -import MySQLdb as mysqldb +import psycopg2 import pvcd.log as log import pvcd.zkhandler as zkhandler @@ -41,28 +41,24 @@ class DNSAggregatorInstance(object): self.dns_networks = dict() self.is_active = False - self.mysql_conn = mysqldb.connect( - host=self.config['pdns_mysql_host'], - port=int(self.config['pdns_mysql_port']), - user=self.config['pdns_mysql_user'], - passwd=self.config['pdns_mysql_password'], - db=self.config['pdns_mysql_dbname'] - ) - self.dns_server_daemon = PowerDNSInstance(self.config, self.logger, self.dns_networks) - self.dns_axfr_daemon = AXFRDaemonInstance(self.config, self.logger, self.mysql_conn, self.dns_networks) + 
self.dns_server_daemon = PowerDNSInstance(self) + self.dns_axfr_daemon = AXFRDaemonInstance(self) # Start up the PowerDNS instance def start_aggregator(self): + # Restart the SQL connection self.dns_server_daemon.start() self.dns_axfr_daemon.start() + self.is_active = True # Stop the PowerDNS instance def stop_aggregator(self): + self.is_active = False self.dns_axfr_daemon.stop() self.dns_server_daemon.stop() def add_network(self, network): - self.dns_networks[network] = DNSNetworkInstance(self.config, self.logger, self.mysql_conn, network) + self.dns_networks[network] = DNSNetworkInstance(self, network) self.dns_networks[network].add_network() self.dns_axfr_daemon.update_networks(self.dns_networks) @@ -74,9 +70,10 @@ class DNSAggregatorInstance(object): class PowerDNSInstance(object): # Initialization function - def __init__(self, config, logger, dns_networks): - self.config = config - self.logger = logger + def __init__(self, aggregator): + self.aggregator = aggregator + self.config = self.aggregator.config + self.logger = self.aggregator.logger self.dns_server_daemon = None # Floating upstreams @@ -88,7 +85,7 @@ class PowerDNSInstance(object): def start(self): self.logger.out( 'Starting PowerDNS zone aggregator', - state='o' + state='i' ) # Define the PowerDNS config dns_configuration = [ @@ -112,18 +109,18 @@ class PowerDNSInstance(object): '--default-soa-name=dns.pvc.local', # Override dnsmasq's invalid name '--socket-dir={}'.format(self.config['pdns_dynamic_directory']), # Standard socket directory - '--launch=gmysql', # Use the MySQL backend - '--gmysql-host={}'.format(self.config['pdns_mysql_host']), - # MySQL instance - '--gmysql-port={}'.format(self.config['pdns_mysql_port']), + '--launch=gpgsql', # Use the PostgreSQL backend + '--gpgsql-host={}'.format(self.config['pdns_postgresql_host']), + # PostgreSQL instance + '--gpgsql-port={}'.format(self.config['pdns_postgresql_port']), # Default port - 
'--gmysql-dbname={}'.format(self.config['pdns_mysql_dbname']), + '--gpgsql-dbname={}'.format(self.config['pdns_postgresql_dbname']), # Database name - '--gmysql-user={}'.format(self.config['pdns_mysql_user']), + '--gpgsql-user={}'.format(self.config['pdns_postgresql_user']), # User name - '--gmysql-password={}'.format(self.config['pdns_mysql_password']), + '--gpgsql-password={}'.format(self.config['pdns_postgresql_password']), # User password - '--gmysql-dnssec=no', # Do DNSSEC elsewhere + '--gpgsql-dnssec=no', # Do DNSSEC elsewhere ] # Start the pdns process in a thread self.dns_server_daemon = common.run_os_daemon( @@ -133,24 +130,35 @@ class PowerDNSInstance(object): environment=None, logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory']) ) + if self.dns_server_daemon: + self.logger.out( + 'Successfully started PowerDNS zone aggregator', + state='o' + ) + def stop(self): if self.dns_server_daemon: self.logger.out( 'Stopping PowerDNS zone aggregator', - state='o' + state='i' ) # Terminate, then kill self.dns_server_daemon.signal('term') time.sleep(0.2) self.dns_server_daemon.signal('kill') + self.logger.out( + 'Successfully stopped PowerDNS zone aggregator', + state='o' + ) class DNSNetworkInstance(object): # Initialization function - def __init__(self, config, logger, mysql_conn, network): - self.config = config - self.logger = logger - self.mysql_conn = mysql_conn + def __init__(self, aggregator, network): + self.aggregator = aggregator + self.config = self.aggregator.config + self.logger = self.aggregator.logger + self.sql_conn = None self.network = network # Add a new network to the aggregator database @@ -170,13 +178,22 @@ class DNSNetworkInstance(object): ) # Connect to the database - mysql_curs = self.mysql_conn.cursor() + self.sql_conn = psycopg2.connect( + "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format( + self.config['pdns_postgresql_host'], + self.config['pdns_postgresql_port'], + 
self.config['pdns_postgresql_dbname'], + self.config['pdns_postgresql_user'], + self.config['pdns_postgresql_password'] + ) + ) + sql_curs = self.sql_conn.cursor() # Try to access the domains entry - mysql_curs.execute( - 'SELECT * FROM domains WHERE name=%s', + sql_curs.execute( + "SELECT * FROM domains WHERE name=%s", (network_domain,) ) - results = mysql_curs.fetchone() + results = sql_curs.fetchone() # If we got back a result, don't try to add the domain to the DB if results: @@ -184,21 +201,21 @@ class DNSNetworkInstance(object): else: write_domain = True - # Write the domain to the database - if write_domain: - mysql_curs.execute( - 'INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, "MASTER", "internal", 0)', + # Write the domain to the database if we're active + if self.aggregator.is_active and write_domain: + sql_curs.execute( + "INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, 'MASTER', 'internal', 0)", (network_domain,) ) - self.mysql_conn.commit() + self.sql_conn.commit() - mysql_curs.execute( - 'SELECT id FROM domains WHERE name=%s', + sql_curs.execute( + "SELECT id FROM domains WHERE name=%s", (network_domain,) ) - domain_id = mysql_curs.fetchone() + domain_id = sql_curs.fetchone() - mysql_curs.execute( + sql_curs.execute( """ INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES (%s, %s, %s, %s, %s, %s) @@ -208,7 +225,7 @@ class DNSNetworkInstance(object): ns_servers = [network_gateway, 'pvc-ns1.{}'.format(self.config['cluster_domain']), 'pvc-ns2.{}'.format(self.config['cluster_domain'])] for ns_server in ns_servers: - mysql_curs.execute( + sql_curs.execute( """ INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES (%s, %s, %s, %s, %s, %s) @@ -216,7 +233,9 @@ class DNSNetworkInstance(object): (domain_id, network_domain, ns_server, 'NS', 86400, 0) ) - self.mysql_conn.commit() + self.sql_conn.commit() + self.sql_conn.close() + self.sql_conn = None # Remove a deleted network 
from the aggregator database def remove_network(self): @@ -229,50 +248,86 @@ class DNSNetworkInstance(object): prefix='DNS aggregator', state='o' ) + # Connect to the database - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'SELECT id FROM domains WHERE name=%s', + self.sql_conn = psycopg2.connect( + "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format( + self.config['pdns_postgresql_host'], + self.config['pdns_postgresql_port'], + self.config['pdns_postgresql_dbname'], + self.config['pdns_postgresql_user'], + self.config['pdns_postgresql_password'] + ) + ) + sql_curs = self.sql_conn.cursor() + + # Get the domain ID + sql_curs.execute( + "SELECT id FROM domains WHERE name=%s", (network_domain,) ) - domain_id = mysql_curs.fetchone() + domain_id = sql_curs.fetchone() - mysql_curs.execute( - 'DELETE FROM domains WHERE id=%s', - (domain_id,) - ) - mysql_curs.execute( - 'DELETE FROM records WHERE domain_id=%s', - (domain_id,) - ) + # Delete the domain from the database if we're active + if self.aggregator.is_active and domain_id: + sql_curs.execute( + "DELETE FROM domains WHERE id=%s", + (domain_id,) + ) + sql_curs.execute( + "DELETE FROM records WHERE domain_id=%s", + (domain_id,) + ) - self.mysql_conn.commit() + self.sql_conn.commit() + self.sql_conn.close() + self.sql_conn = None class AXFRDaemonInstance(object): # Initialization function - def __init__(self, config, logger, mysql_conn, dns_networks): - self.config = config - self.logger = logger - self.mysql_conn = mysql_conn - self.dns_networks = dns_networks + def __init__(self, aggregator): + self.aggregator = aggregator + self.config = self.aggregator.config + self.logger = self.aggregator.logger + self.dns_networks = self.aggregator.dns_networks self.thread_stopper = threading.Event() self.thread = None + self.sql_conn = None def update_networks(self, dns_networks): self.dns_networks = dns_networks def start(self): + # Create the thread self.thread_stopper.clear() 
self.thread = threading.Thread(target=self.run, args=(), kwargs={}) + + # Start a local instance of the SQL connection + # Trying to use the instance from the main DNS Aggregator can result in connection failures + # after the leader transitions + self.sql_conn = psycopg2.connect( + "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format( + self.config['pdns_postgresql_host'], + self.config['pdns_postgresql_port'], + self.config['pdns_postgresql_dbname'], + self.config['pdns_postgresql_user'], + self.config['pdns_postgresql_password'] + ) + ) + + # Start the thread self.thread.start() def stop(self): self.thread_stopper.set() + if self.sql_conn: + self.sql_conn.close() + self.sql_conn = None def run(self): # Wait for all the DNSMASQ instances to actually start - time.sleep(2) + time.sleep(5) while not self.thread_stopper.is_set(): # We do this for each network @@ -317,17 +372,17 @@ class AXFRDaemonInstance(object): # # Get the current zone from the database # - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'SELECT id FROM domains WHERE name=%s', + sql_curs = self.sql_conn.cursor() + sql_curs.execute( + "SELECT id FROM domains WHERE name=%s", (domain,) ) - domain_id = mysql_curs.fetchone() - mysql_curs.execute( - 'SELECT * FROM records WHERE domain_id=%s', + domain_id = sql_curs.fetchone() + sql_curs.execute( + "SELECT * FROM records WHERE domain_id=%s", (domain_id,) ) - results = list(mysql_curs.fetchall()) + results = list(sql_curs.fetchall()) # Fix the formatting because it's useless for comparison # reference: ((10, 28, 'testnet01.i.bonilan.net', 'SOA', 'nsX.pvc.local root.pvc.local 1 10800 1800 86400 86400', 86400, 0, None, 0, None, 1), etc.) 
@@ -376,9 +431,9 @@ class AXFRDaemonInstance(object): if len(remove_records) > 0: # Remove the invalid old records for record_id in remove_records: - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'DELETE FROM records WHERE id=%s', + sql_curs = self.sql_conn.cursor() + sql_curs.execute( + "DELETE FROM records WHERE id=%s", (record_id,) ) changed = True @@ -392,30 +447,30 @@ class AXFRDaemonInstance(object): rttl = record[1] rtype = record[3] rdata = record[4] - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)', + sql_curs = self.sql_conn.cursor() + sql_curs.execute( + "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)", (domain_id, rname, rttl, rtype, 0, rdata) ) changed = True if changed: # Increase SOA serial - mysql_curs.execute( - 'SELECT content FROM records WHERE domain_id=%s AND type="SOA"', + sql_curs.execute( + "SELECT content FROM records WHERE domain_id=%s AND type='SOA'", (domain_id,) ) - soa_record = list(mysql_curs.fetchone())[0].split() + soa_record = list(sql_curs.fetchone())[0].split() current_serial = int(soa_record[2]) new_serial = current_serial + 1 soa_record[2] = str(new_serial) - mysql_curs.execute( - 'UPDATE records SET content=%s WHERE domain_id=%s AND type="SOA"', + sql_curs.execute( + "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'", (' '.join(soa_record), domain_id) ) # Commit all the previous changes - self.mysql_conn.commit() + self.sql_conn.commit() # Reload the domain common.run_os_command( diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index a0763d1b..050876f9 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -174,11 +174,11 @@ def readConfig(pvcd_config_file, myhostname): 'upstream_floating_ip': o_config['pvc']['cluster']['networks']['upstream']['floating_ip'], 'upstream_network': 
o_config['pvc']['cluster']['networks']['upstream']['network'], 'upstream_gateway': o_config['pvc']['cluster']['networks']['upstream']['gateway'], - 'pdns_mysql_host': o_config['pvc']['coordinator']['dns']['database']['host'], - 'pdns_mysql_port': o_config['pvc']['coordinator']['dns']['database']['port'], - 'pdns_mysql_dbname': o_config['pvc']['coordinator']['dns']['database']['name'], - 'pdns_mysql_user': o_config['pvc']['coordinator']['dns']['database']['user'], - 'pdns_mysql_password': o_config['pvc']['coordinator']['dns']['database']['pass'], + 'pdns_postgresql_host': o_config['pvc']['coordinator']['dns']['database']['host'], + 'pdns_postgresql_port': o_config['pvc']['coordinator']['dns']['database']['port'], + 'pdns_postgresql_dbname': o_config['pvc']['coordinator']['dns']['database']['name'], + 'pdns_postgresql_user': o_config['pvc']['coordinator']['dns']['database']['user'], + 'pdns_postgresql_password': o_config['pvc']['coordinator']['dns']['database']['pass'], 'vni_dev': o_config['pvc']['system']['configuration']['networking']['devices']['cluster'], 'vni_dev_ip': o_config['pvc']['system']['configuration']['networking']['addresses']['cluster'], 'storage_dev': o_config['pvc']['system']['configuration']['networking']['devices']['storage'], @@ -399,8 +399,8 @@ if enable_hypervisor: if enable_networking: if config['daemon_mode'] == 'coordinator': - logger.out('Starting MariaDB daemon', state='i') - common.run_os_command('systemctl start mariadb.service') + logger.out('Starting Patroni daemon', state='i') + common.run_os_command('systemctl start patroni.service') logger.out('Starting FRRouting daemon', state='i') common.run_os_command('systemctl start frr.service') diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py index 98fa645c..16f6d15d 100644 --- a/node-daemon/pvcd/NodeInstance.py +++ b/node-daemon/pvcd/NodeInstance.py @@ -269,7 +269,26 @@ class NodeInstance(object): for network in self.d_network: 
self.d_network[network].createGateways() self.d_network[network].startDHCPServer() - time.sleep(0.5) + time.sleep(1) + # Force Patroni to switch to the local instance + self.logger.out('Setting Patroni leader to this node', state='i') + retcode, stdout, stderr = common.run_os_command( + """ + patronictl + -c /etc/patroni/config.yml + -d zookeeper://localhost:2181 + switchover + --candidate {} + --force + pvcdns + """.format(self.name) + ) + if stdout: + self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o') + else: + self.logger.out('Failed to switch Patroni leader\n{}'.format(stderr), state='e') + time.sleep(1) + # Start the DNS aggregator instance self.dns_aggregator.start_aggregator() def createFloatingAddresses(self):