diff --git a/debian/control b/debian/control index 5e6ec45c..6c7eb7b7 100644 --- a/debian/control +++ b/debian/control @@ -8,7 +8,7 @@ X-Python3-Version: >= 3.2 Package: pvc-daemon Architecture: all -Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-mysqldb, python3-dnspython, python3-yaml, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-mysql +Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-mysql Suggests: pvc-client-cli Description: Parallel Virtual Cluster virtualization daemon (Python 3) The Parallel Virtual Cluster provides a management solution for QEMU/KVM virtual clusters. diff --git a/node-daemon/pvcd.sample.yaml b/node-daemon/pvcd.sample.yaml index e85e66d3..a06792e2 100644 --- a/node-daemon/pvcd.sample.yaml +++ b/node-daemon/pvcd.sample.yaml @@ -61,17 +61,17 @@ pvc: coordinator: # dns: DNS aggregator subsystem dns: - # database: MySQL Galera database configuration + # database: Patroni PostgreSQL database configuration database: - # host: MySQL hostname, invariably 'localhost' + # host: PostgreSQL hostname, invariably 'localhost' host: localhost - # port: MySQL port, invariably 'localhost' - port: 3306 - # name: MySQL database name, invariably 'pvcdns' + # port: PostgreSQL port, invariably 'localhost' + port: 5432 + # name: PostgreSQL database name, invariably 'pvcdns' name: pvcdns - # user: MySQL username, invariable 'pvcdns' + # user: PostgreSQL username, invariable 'pvcdns' user: pvcdns - # pass: MySQL user password, randomly generated + # pass: PostgreSQL user password, randomly generated pass: pvcdns # system: Local PVC instance configuration system: diff --git a/node-daemon/pvcd/DNSAggregatorInstance.py b/node-daemon/pvcd/DNSAggregatorInstance.py index 88fbaa9a..681d8cbc 100644 --- a/node-daemon/pvcd/DNSAggregatorInstance.py +++ b/node-daemon/pvcd/DNSAggregatorInstance.py @@ -26,7 +26,7 @@ import time import threading import dns.zone import dns.query -import MySQLdb as mysqldb +import psycopg2 import pvcd.log as log import pvcd.zkhandler as zkhandler @@ -41,28 +41,24 @@ class DNSAggregatorInstance(object): self.dns_networks = dict() self.is_active = False - self.mysql_conn = mysqldb.connect( - host=self.config['pdns_mysql_host'], - port=int(self.config['pdns_mysql_port']), - user=self.config['pdns_mysql_user'], - passwd=self.config['pdns_mysql_password'], - db=self.config['pdns_mysql_dbname'] - ) - self.dns_server_daemon = PowerDNSInstance(self.config, self.logger, self.dns_networks) - self.dns_axfr_daemon = AXFRDaemonInstance(self.config, self.logger, self.mysql_conn, self.dns_networks) + self.dns_server_daemon = PowerDNSInstance(self) + self.dns_axfr_daemon = AXFRDaemonInstance(self) # Start up the PowerDNS instance def start_aggregator(self): + # Restart the SQL connection self.dns_server_daemon.start() self.dns_axfr_daemon.start() + self.is_active = True # Stop the PowerDNS instance def stop_aggregator(self): + self.is_active = False self.dns_axfr_daemon.stop() self.dns_server_daemon.stop() def add_network(self, network): - self.dns_networks[network] = DNSNetworkInstance(self.config, self.logger, self.mysql_conn, network) + self.dns_networks[network] = DNSNetworkInstance(self, network) self.dns_networks[network].add_network() self.dns_axfr_daemon.update_networks(self.dns_networks) @@ -74,9 +70,10 @@ class DNSAggregatorInstance(object): class PowerDNSInstance(object): # Initialization function - def __init__(self, config, logger, dns_networks): - self.config = config - self.logger = logger + def __init__(self, aggregator): + self.aggregator = aggregator + self.config = self.aggregator.config + self.logger = self.aggregator.logger self.dns_server_daemon = None # Floating upstreams @@ -88,7 +85,7 @@ class PowerDNSInstance(object): def start(self): self.logger.out( 'Starting PowerDNS zone aggregator', - state='o' + state='i' ) # Define the PowerDNS config dns_configuration = [ @@ -112,18 +109,18 @@ class PowerDNSInstance(object): '--default-soa-name=dns.pvc.local', # Override dnsmasq's invalid name '--socket-dir={}'.format(self.config['pdns_dynamic_directory']), # Standard socket directory - '--launch=gmysql', # Use the MySQL backend - '--gmysql-host={}'.format(self.config['pdns_mysql_host']), - # MySQL instance - '--gmysql-port={}'.format(self.config['pdns_mysql_port']), + '--launch=gpgsql', # Use the PostgreSQL backend + '--gpgsql-host={}'.format(self.config['pdns_postgresql_host']), + # PostgreSQL instance + '--gpgsql-port={}'.format(self.config['pdns_postgresql_port']), # Default port - '--gmysql-dbname={}'.format(self.config['pdns_mysql_dbname']), + '--gpgsql-dbname={}'.format(self.config['pdns_postgresql_dbname']), # Database name - '--gmysql-user={}'.format(self.config['pdns_mysql_user']), + '--gpgsql-user={}'.format(self.config['pdns_postgresql_user']), # User name - '--gmysql-password={}'.format(self.config['pdns_mysql_password']), + '--gpgsql-password={}'.format(self.config['pdns_postgresql_password']), # User password - '--gmysql-dnssec=no', # Do DNSSEC elsewhere + '--gpgsql-dnssec=no', # Do DNSSEC elsewhere ] # Start the pdns process in a thread self.dns_server_daemon = common.run_os_daemon( @@ -133,24 +130,35 @@ class PowerDNSInstance(object): environment=None, logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory']) ) + if self.dns_server_daemon: + self.logger.out( + 'Successfully started PowerDNS zone aggregator', + state='o' + ) + def stop(self): if self.dns_server_daemon: self.logger.out( 'Stopping PowerDNS zone aggregator', - state='o' + state='i' ) # Terminate, then kill self.dns_server_daemon.signal('term') time.sleep(0.2) self.dns_server_daemon.signal('kill') + self.logger.out( + 'Successfully stopped PowerDNS zone aggregator', + state='o' + ) class DNSNetworkInstance(object): # Initialization function - def __init__(self, config, logger, mysql_conn, network): - self.config = config - self.logger = logger - self.mysql_conn = mysql_conn + def __init__(self, aggregator, network): + self.aggregator = aggregator + self.config = self.aggregator.config + self.logger = self.aggregator.logger + self.sql_conn = None self.network = network # Add a new network to the aggregator database @@ -170,13 +178,22 @@ class DNSNetworkInstance(object): ) # Connect to the database - mysql_curs = self.mysql_conn.cursor() + self.sql_conn = psycopg2.connect( + "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format( + self.config['pdns_postgresql_host'], + self.config['pdns_postgresql_port'], + self.config['pdns_postgresql_dbname'], + self.config['pdns_postgresql_user'], + self.config['pdns_postgresql_password'] + ) + ) + sql_curs = self.sql_conn.cursor() # Try to access the domains entry - mysql_curs.execute( - 'SELECT * FROM domains WHERE name=%s', + sql_curs.execute( + "SELECT * FROM domains WHERE name=%s", (network_domain,) ) - results = mysql_curs.fetchone() + results = sql_curs.fetchone() # If we got back a result, don't try to add the domain to the DB if results: @@ -184,21 +201,21 @@ class DNSNetworkInstance(object): else: write_domain = True - # Write the domain to the database - if write_domain: - mysql_curs.execute( - 'INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, "MASTER", "internal", 0)', + # Write the domain to the database if we're active + if self.aggregator.is_active and write_domain: + sql_curs.execute( + "INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, 'MASTER', 'internal', 0)", (network_domain,) ) - self.mysql_conn.commit() + self.sql_conn.commit() - mysql_curs.execute( - 'SELECT id FROM domains WHERE name=%s', + sql_curs.execute( + "SELECT id FROM domains WHERE name=%s", (network_domain,) ) - domain_id = mysql_curs.fetchone() + domain_id = sql_curs.fetchone() - mysql_curs.execute( + sql_curs.execute( """ INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES (%s, %s, %s, %s, %s, %s) @@ -208,7 +225,7 @@ class DNSNetworkInstance(object): ns_servers = [network_gateway, 'pvc-ns1.{}'.format(self.config['cluster_domain']), 'pvc-ns2.{}'.format(self.config['cluster_domain'])] for ns_server in ns_servers: - mysql_curs.execute( + sql_curs.execute( """ INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES (%s, %s, %s, %s, %s, %s) @@ -216,7 +233,9 @@ class DNSNetworkInstance(object): (domain_id, network_domain, ns_server, 'NS', 86400, 0) ) - self.mysql_conn.commit() + self.sql_conn.commit() + self.sql_conn.close() + self.sql_conn = None # Remove a deleted network from the aggregator database def remove_network(self): @@ -229,50 +248,86 @@ class DNSNetworkInstance(object): prefix='DNS aggregator', state='o' ) + # Connect to the database - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'SELECT id FROM domains WHERE name=%s', + self.sql_conn = psycopg2.connect( + "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format( + self.config['pdns_postgresql_host'], + self.config['pdns_postgresql_port'], + self.config['pdns_postgresql_dbname'], + self.config['pdns_postgresql_user'], + self.config['pdns_postgresql_password'] + ) + ) + sql_curs = self.sql_conn.cursor() + + # Get the domain ID + sql_curs.execute( + "SELECT id FROM domains WHERE name=%s", (network_domain,) ) - domain_id = mysql_curs.fetchone() + domain_id = sql_curs.fetchone() - mysql_curs.execute( - 'DELETE FROM domains WHERE id=%s', - (domain_id,) - ) - mysql_curs.execute( - 'DELETE FROM records WHERE domain_id=%s', - (domain_id,) - ) + # Delete the domain from the database if we're active + if self.aggregator.is_active and domain_id: + sql_curs.execute( + "DELETE FROM domains WHERE id=%s", + (domain_id,) + ) + sql_curs.execute( + "DELETE FROM records WHERE domain_id=%s", + (domain_id,) + ) - self.mysql_conn.commit() + self.sql_conn.commit() + self.sql_conn.close() + self.sql_conn = None class AXFRDaemonInstance(object): # Initialization function - def __init__(self, config, logger, mysql_conn, dns_networks): - self.config = config - self.logger = logger - self.mysql_conn = mysql_conn - self.dns_networks = dns_networks + def __init__(self, aggregator): + self.aggregator = aggregator + self.config = self.aggregator.config + self.logger = self.aggregator.logger + self.dns_networks = self.aggregator.dns_networks self.thread_stopper = threading.Event() self.thread = None + self.sql_conn = None def update_networks(self, dns_networks): self.dns_networks = dns_networks def start(self): + # Create the thread self.thread_stopper.clear() self.thread = threading.Thread(target=self.run, args=(), kwargs={}) + + # Start a local instance of the SQL connection + # Trying to use the instance from the main DNS Aggregator can result in connection failures + # after the leader transitions + self.sql_conn = psycopg2.connect( + "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format( + self.config['pdns_postgresql_host'], + self.config['pdns_postgresql_port'], + self.config['pdns_postgresql_dbname'], + self.config['pdns_postgresql_user'], + self.config['pdns_postgresql_password'] + ) + ) + + # Start the thread self.thread.start() def stop(self): self.thread_stopper.set() + if self.sql_conn: + self.sql_conn.close() + self.sql_conn = None def run(self): # Wait for all the DNSMASQ instances to actually start - time.sleep(2) + time.sleep(5) while not self.thread_stopper.is_set(): # We do this for each network @@ -317,17 +372,17 @@ class AXFRDaemonInstance(object): # # Get the current zone from the database # - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'SELECT id FROM domains WHERE name=%s', + sql_curs = self.sql_conn.cursor() + sql_curs.execute( + "SELECT id FROM domains WHERE name=%s", (domain,) ) - domain_id = mysql_curs.fetchone() - mysql_curs.execute( - 'SELECT * FROM records WHERE domain_id=%s', + domain_id = sql_curs.fetchone() + sql_curs.execute( + "SELECT * FROM records WHERE domain_id=%s", (domain_id,) ) - results = list(mysql_curs.fetchall()) + results = list(sql_curs.fetchall()) # Fix the formatting because it's useless for comparison # reference: ((10, 28, 'testnet01.i.bonilan.net', 'SOA', 'nsX.pvc.local root.pvc.local 1 10800 1800 86400 86400', 86400, 0, None, 0, None, 1), etc.) @@ -376,9 +431,9 @@ class AXFRDaemonInstance(object): if len(remove_records) > 0: # Remove the invalid old records for record_id in remove_records: - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'DELETE FROM records WHERE id=%s', + sql_curs = self.sql_conn.cursor() + sql_curs.execute( + "DELETE FROM records WHERE id=%s", (record_id,) ) changed = True @@ -392,30 +447,30 @@ class AXFRDaemonInstance(object): rttl = record[1] rtype = record[3] rdata = record[4] - mysql_curs = self.mysql_conn.cursor() - mysql_curs.execute( - 'INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)', + sql_curs = self.sql_conn.cursor() + sql_curs.execute( + "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)", (domain_id, rname, rttl, rtype, 0, rdata) ) changed = True if changed: # Increase SOA serial - mysql_curs.execute( - 'SELECT content FROM records WHERE domain_id=%s AND type="SOA"', + sql_curs.execute( + "SELECT content FROM records WHERE domain_id=%s AND type='SOA'", (domain_id,) ) - soa_record = list(mysql_curs.fetchone())[0].split() + soa_record = list(sql_curs.fetchone())[0].split() current_serial = int(soa_record[2]) new_serial = current_serial + 1 soa_record[2] = str(new_serial) - mysql_curs.execute( - 'UPDATE records SET content=%s WHERE domain_id=%s AND type="SOA"', + sql_curs.execute( + "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'", (' '.join(soa_record), domain_id) ) # Commit all the previous changes - self.mysql_conn.commit() + self.sql_conn.commit() # Reload the domain common.run_os_command( diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index a0763d1b..050876f9 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -174,11 +174,11 @@ def readConfig(pvcd_config_file, myhostname): 'upstream_floating_ip': o_config['pvc']['cluster']['networks']['upstream']['floating_ip'], 'upstream_network': o_config['pvc']['cluster']['networks']['upstream']['network'], 'upstream_gateway': o_config['pvc']['cluster']['networks']['upstream']['gateway'], - 'pdns_mysql_host': o_config['pvc']['coordinator']['dns']['database']['host'], - 'pdns_mysql_port': o_config['pvc']['coordinator']['dns']['database']['port'], - 'pdns_mysql_dbname': o_config['pvc']['coordinator']['dns']['database']['name'], - 'pdns_mysql_user': o_config['pvc']['coordinator']['dns']['database']['user'], - 'pdns_mysql_password': o_config['pvc']['coordinator']['dns']['database']['pass'], + 'pdns_postgresql_host': o_config['pvc']['coordinator']['dns']['database']['host'], + 'pdns_postgresql_port': o_config['pvc']['coordinator']['dns']['database']['port'], + 'pdns_postgresql_dbname': o_config['pvc']['coordinator']['dns']['database']['name'], + 'pdns_postgresql_user': o_config['pvc']['coordinator']['dns']['database']['user'], + 'pdns_postgresql_password': o_config['pvc']['coordinator']['dns']['database']['pass'], 'vni_dev': o_config['pvc']['system']['configuration']['networking']['devices']['cluster'], 'vni_dev_ip': o_config['pvc']['system']['configuration']['networking']['addresses']['cluster'], 'storage_dev': o_config['pvc']['system']['configuration']['networking']['devices']['storage'], @@ -399,8 +399,8 @@ if enable_hypervisor: if enable_networking: if config['daemon_mode'] == 'coordinator': - logger.out('Starting MariaDB daemon', state='i') - common.run_os_command('systemctl start mariadb.service') + logger.out('Starting Patroni daemon', state='i') + common.run_os_command('systemctl start patroni.service') logger.out('Starting FRRouting daemon', state='i') common.run_os_command('systemctl start frr.service') diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py index 98fa645c..16f6d15d 100644 --- a/node-daemon/pvcd/NodeInstance.py +++ b/node-daemon/pvcd/NodeInstance.py @@ -269,7 +269,26 @@ class NodeInstance(object): for network in self.d_network: self.d_network[network].createGateways() self.d_network[network].startDHCPServer() - time.sleep(0.5) + time.sleep(1) + # Force Patroni to switch to the local instance + self.logger.out('Setting Patroni leader to this node', state='i') + retcode, stdout, stderr = common.run_os_command( + """ + patronictl + -c /etc/patroni/config.yml + -d zookeeper://localhost:2181 + switchover + --candidate {} + --force + pvcdns + """.format(self.name) + ) + if stdout: + self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o') + else: + self.logger.out('Failed to switch Patroni leader\n{}'.format(stderr), state='e') + time.sleep(1) + # Start the DNS aggregator instance self.dns_aggregator.start_aggregator() def createFloatingAddresses(self):