Switch DNS aggregator to PostgreSQL

MariaDB+Galera was terribly unstable, with the cluster failing to
start or dying randomly, and generally seemed incredibly unsuitable
for an HA solution. This commit switches the DNS aggregator SQL
backend to PostgreSQL, implemented via Patroni HA.

It also manages the Patroni state, forcing the primary instance to
follow the PVC coordinator, such that the active DNS Aggregator
instance is always able to communicate read+write with the local
system.

This required some logic changes to how the DNS Aggregator worked,
specifically ensuring that database changes aren't attempted while
the instance isn't actively running - to be honest this was a bug
anyways that had just never been noticed.

Closes #34
This commit is contained in:
Joshua Boniface 2019-05-20 22:40:07 -04:00
parent 73443ecbaf
commit 595cf1782c
5 changed files with 171 additions and 97 deletions

2
debian/control vendored
View File

@ -8,7 +8,7 @@ X-Python3-Version: >= 3.2
Package: pvc-daemon
Architecture: all
Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-mysqldb, python3-dnspython, python3-yaml, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-mysql
Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-mysql
Suggests: pvc-client-cli
Description: Parallel Virtual Cluster virtualization daemon (Python 3)
The Parallel Virtual Cluster provides a management solution for QEMU/KVM virtual clusters.

View File

@ -61,17 +61,17 @@ pvc:
coordinator:
# dns: DNS aggregator subsystem
dns:
# database: MySQL Galera database configuration
# database: Patroni PostgreSQL database configuration
database:
# host: MySQL hostname, invariably 'localhost'
# host: PostgreSQL hostname, invariably 'localhost'
host: localhost
# port: MySQL port, invariably 'localhost'
port: 3306
# name: MySQL database name, invariably 'pvcdns'
# port: PostgreSQL port, invariably 'localhost'
port: 5432
# name: PostgreSQL database name, invariably 'pvcdns'
name: pvcdns
# user: MySQL username, invariable 'pvcdns'
# user: PostgreSQL username, invariable 'pvcdns'
user: pvcdns
# pass: MySQL user password, randomly generated
# pass: PostgreSQL user password, randomly generated
pass: pvcdns
# system: Local PVC instance configuration
system:

View File

@ -26,7 +26,7 @@ import time
import threading
import dns.zone
import dns.query
import MySQLdb as mysqldb
import psycopg2
import pvcd.log as log
import pvcd.zkhandler as zkhandler
@ -41,28 +41,24 @@ class DNSAggregatorInstance(object):
self.dns_networks = dict()
self.is_active = False
self.mysql_conn = mysqldb.connect(
host=self.config['pdns_mysql_host'],
port=int(self.config['pdns_mysql_port']),
user=self.config['pdns_mysql_user'],
passwd=self.config['pdns_mysql_password'],
db=self.config['pdns_mysql_dbname']
)
self.dns_server_daemon = PowerDNSInstance(self.config, self.logger, self.dns_networks)
self.dns_axfr_daemon = AXFRDaemonInstance(self.config, self.logger, self.mysql_conn, self.dns_networks)
self.dns_server_daemon = PowerDNSInstance(self)
self.dns_axfr_daemon = AXFRDaemonInstance(self)
# Start up the PowerDNS instance
def start_aggregator(self):
# Restart the SQL connection
self.dns_server_daemon.start()
self.dns_axfr_daemon.start()
self.is_active = True
# Stop the PowerDNS instance
def stop_aggregator(self):
self.is_active = False
self.dns_axfr_daemon.stop()
self.dns_server_daemon.stop()
def add_network(self, network):
self.dns_networks[network] = DNSNetworkInstance(self.config, self.logger, self.mysql_conn, network)
self.dns_networks[network] = DNSNetworkInstance(self, network)
self.dns_networks[network].add_network()
self.dns_axfr_daemon.update_networks(self.dns_networks)
@ -74,9 +70,10 @@ class DNSAggregatorInstance(object):
class PowerDNSInstance(object):
# Initialization function
def __init__(self, config, logger, dns_networks):
self.config = config
self.logger = logger
def __init__(self, aggregator):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.dns_server_daemon = None
# Floating upstreams
@ -88,7 +85,7 @@ class PowerDNSInstance(object):
def start(self):
self.logger.out(
'Starting PowerDNS zone aggregator',
state='o'
state='i'
)
# Define the PowerDNS config
dns_configuration = [
@ -112,18 +109,18 @@ class PowerDNSInstance(object):
'--default-soa-name=dns.pvc.local', # Override dnsmasq's invalid name
'--socket-dir={}'.format(self.config['pdns_dynamic_directory']),
# Standard socket directory
'--launch=gmysql', # Use the MySQL backend
'--gmysql-host={}'.format(self.config['pdns_mysql_host']),
# MySQL instance
'--gmysql-port={}'.format(self.config['pdns_mysql_port']),
'--launch=gpgsql', # Use the PostgreSQL backend
'--gpgsql-host={}'.format(self.config['pdns_postgresql_host']),
# PostgreSQL instance
'--gpgsql-port={}'.format(self.config['pdns_postgresql_port']),
# Default port
'--gmysql-dbname={}'.format(self.config['pdns_mysql_dbname']),
'--gpgsql-dbname={}'.format(self.config['pdns_postgresql_dbname']),
# Database name
'--gmysql-user={}'.format(self.config['pdns_mysql_user']),
'--gpgsql-user={}'.format(self.config['pdns_postgresql_user']),
# User name
'--gmysql-password={}'.format(self.config['pdns_mysql_password']),
'--gpgsql-password={}'.format(self.config['pdns_postgresql_password']),
# User password
'--gmysql-dnssec=no', # Do DNSSEC elsewhere
'--gpgsql-dnssec=no', # Do DNSSEC elsewhere
]
# Start the pdns process in a thread
self.dns_server_daemon = common.run_os_daemon(
@ -133,24 +130,35 @@ class PowerDNSInstance(object):
environment=None,
logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory'])
)
if self.dns_server_daemon:
self.logger.out(
'Successfully started PowerDNS zone aggregator',
state='o'
)
def stop(self):
if self.dns_server_daemon:
self.logger.out(
'Stopping PowerDNS zone aggregator',
state='o'
state='i'
)
# Terminate, then kill
self.dns_server_daemon.signal('term')
time.sleep(0.2)
self.dns_server_daemon.signal('kill')
self.logger.out(
'Successfully stopped PowerDNS zone aggregator',
state='o'
)
class DNSNetworkInstance(object):
# Initialization function
def __init__(self, config, logger, mysql_conn, network):
self.config = config
self.logger = logger
self.mysql_conn = mysql_conn
def __init__(self, aggregator, network):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.sql_conn = None
self.network = network
# Add a new network to the aggregator database
@ -170,13 +178,22 @@ class DNSNetworkInstance(object):
)
# Connect to the database
mysql_curs = self.mysql_conn.cursor()
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
sql_curs = self.sql_conn.cursor()
# Try to access the domains entry
mysql_curs.execute(
'SELECT * FROM domains WHERE name=%s',
sql_curs.execute(
"SELECT * FROM domains WHERE name=%s",
(network_domain,)
)
results = mysql_curs.fetchone()
results = sql_curs.fetchone()
# If we got back a result, don't try to add the domain to the DB
if results:
@ -184,21 +201,21 @@ class DNSNetworkInstance(object):
else:
write_domain = True
# Write the domain to the database
if write_domain:
mysql_curs.execute(
'INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, "MASTER", "internal", 0)',
# Write the domain to the database if we're active
if self.aggregator.is_active and write_domain:
sql_curs.execute(
"INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, 'MASTER', 'internal', 0)",
(network_domain,)
)
self.mysql_conn.commit()
self.sql_conn.commit()
mysql_curs.execute(
'SELECT id FROM domains WHERE name=%s',
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(network_domain,)
)
domain_id = mysql_curs.fetchone()
domain_id = sql_curs.fetchone()
mysql_curs.execute(
sql_curs.execute(
"""
INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
(%s, %s, %s, %s, %s, %s)
@ -208,7 +225,7 @@ class DNSNetworkInstance(object):
ns_servers = [network_gateway, 'pvc-ns1.{}'.format(self.config['cluster_domain']), 'pvc-ns2.{}'.format(self.config['cluster_domain'])]
for ns_server in ns_servers:
mysql_curs.execute(
sql_curs.execute(
"""
INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
(%s, %s, %s, %s, %s, %s)
@ -216,7 +233,9 @@ class DNSNetworkInstance(object):
(domain_id, network_domain, ns_server, 'NS', 86400, 0)
)
self.mysql_conn.commit()
self.sql_conn.commit()
self.sql_conn.close()
self.sql_conn = None
# Remove a deleted network from the aggregator database
def remove_network(self):
@ -229,50 +248,86 @@ class DNSNetworkInstance(object):
prefix='DNS aggregator',
state='o'
)
# Connect to the database
mysql_curs = self.mysql_conn.cursor()
mysql_curs.execute(
'SELECT id FROM domains WHERE name=%s',
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
sql_curs = self.sql_conn.cursor()
# Get the domain ID
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(network_domain,)
)
domain_id = mysql_curs.fetchone()
domain_id = sql_curs.fetchone()
mysql_curs.execute(
'DELETE FROM domains WHERE id=%s',
# Delete the domain from the database if we're active
if self.aggregator.is_active and domain_id:
sql_curs.execute(
"DELETE FROM domains WHERE id=%s",
(domain_id,)
)
mysql_curs.execute(
'DELETE FROM records WHERE domain_id=%s',
sql_curs.execute(
"DELETE FROM records WHERE domain_id=%s",
(domain_id,)
)
self.mysql_conn.commit()
self.sql_conn.commit()
self.sql_conn.close()
self.sql_conn = None
class AXFRDaemonInstance(object):
# Initialization function
def __init__(self, config, logger, mysql_conn, dns_networks):
self.config = config
self.logger = logger
self.mysql_conn = mysql_conn
self.dns_networks = dns_networks
def __init__(self, aggregator):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.dns_networks = self.aggregator.dns_networks
self.thread_stopper = threading.Event()
self.thread = None
self.sql_conn = None
def update_networks(self, dns_networks):
self.dns_networks = dns_networks
def start(self):
# Create the thread
self.thread_stopper.clear()
self.thread = threading.Thread(target=self.run, args=(), kwargs={})
# Start a local instance of the SQL connection
# Trying to use the instance from the main DNS Aggregator can result in connection failures
# after the leader transitions
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
# Start the thread
self.thread.start()
def stop(self):
self.thread_stopper.set()
if self.sql_conn:
self.sql_conn.close()
self.sql_conn = None
def run(self):
# Wait for all the DNSMASQ instances to actually start
time.sleep(2)
time.sleep(5)
while not self.thread_stopper.is_set():
# We do this for each network
@ -317,17 +372,17 @@ class AXFRDaemonInstance(object):
#
# Get the current zone from the database
#
mysql_curs = self.mysql_conn.cursor()
mysql_curs.execute(
'SELECT id FROM domains WHERE name=%s',
sql_curs = self.sql_conn.cursor()
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(domain,)
)
domain_id = mysql_curs.fetchone()
mysql_curs.execute(
'SELECT * FROM records WHERE domain_id=%s',
domain_id = sql_curs.fetchone()
sql_curs.execute(
"SELECT * FROM records WHERE domain_id=%s",
(domain_id,)
)
results = list(mysql_curs.fetchall())
results = list(sql_curs.fetchall())
# Fix the formatting because it's useless for comparison
# reference: ((10, 28, 'testnet01.i.bonilan.net', 'SOA', 'nsX.pvc.local root.pvc.local 1 10800 1800 86400 86400', 86400, 0, None, 0, None, 1), etc.)
@ -376,9 +431,9 @@ class AXFRDaemonInstance(object):
if len(remove_records) > 0:
# Remove the invalid old records
for record_id in remove_records:
mysql_curs = self.mysql_conn.cursor()
mysql_curs.execute(
'DELETE FROM records WHERE id=%s',
sql_curs = self.sql_conn.cursor()
sql_curs.execute(
"DELETE FROM records WHERE id=%s",
(record_id,)
)
changed = True
@ -392,30 +447,30 @@ class AXFRDaemonInstance(object):
rttl = record[1]
rtype = record[3]
rdata = record[4]
mysql_curs = self.mysql_conn.cursor()
mysql_curs.execute(
'INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)',
sql_curs = self.sql_conn.cursor()
sql_curs.execute(
"INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
(domain_id, rname, rttl, rtype, 0, rdata)
)
changed = True
if changed:
# Increase SOA serial
mysql_curs.execute(
'SELECT content FROM records WHERE domain_id=%s AND type="SOA"',
sql_curs.execute(
"SELECT content FROM records WHERE domain_id=%s AND type='SOA'",
(domain_id,)
)
soa_record = list(mysql_curs.fetchone())[0].split()
soa_record = list(sql_curs.fetchone())[0].split()
current_serial = int(soa_record[2])
new_serial = current_serial + 1
soa_record[2] = str(new_serial)
mysql_curs.execute(
'UPDATE records SET content=%s WHERE domain_id=%s AND type="SOA"',
sql_curs.execute(
"UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
(' '.join(soa_record), domain_id)
)
# Commit all the previous changes
self.mysql_conn.commit()
self.sql_conn.commit()
# Reload the domain
common.run_os_command(

View File

@ -174,11 +174,11 @@ def readConfig(pvcd_config_file, myhostname):
'upstream_floating_ip': o_config['pvc']['cluster']['networks']['upstream']['floating_ip'],
'upstream_network': o_config['pvc']['cluster']['networks']['upstream']['network'],
'upstream_gateway': o_config['pvc']['cluster']['networks']['upstream']['gateway'],
'pdns_mysql_host': o_config['pvc']['coordinator']['dns']['database']['host'],
'pdns_mysql_port': o_config['pvc']['coordinator']['dns']['database']['port'],
'pdns_mysql_dbname': o_config['pvc']['coordinator']['dns']['database']['name'],
'pdns_mysql_user': o_config['pvc']['coordinator']['dns']['database']['user'],
'pdns_mysql_password': o_config['pvc']['coordinator']['dns']['database']['pass'],
'pdns_postgresql_host': o_config['pvc']['coordinator']['dns']['database']['host'],
'pdns_postgresql_port': o_config['pvc']['coordinator']['dns']['database']['port'],
'pdns_postgresql_dbname': o_config['pvc']['coordinator']['dns']['database']['name'],
'pdns_postgresql_user': o_config['pvc']['coordinator']['dns']['database']['user'],
'pdns_postgresql_password': o_config['pvc']['coordinator']['dns']['database']['pass'],
'vni_dev': o_config['pvc']['system']['configuration']['networking']['devices']['cluster'],
'vni_dev_ip': o_config['pvc']['system']['configuration']['networking']['addresses']['cluster'],
'storage_dev': o_config['pvc']['system']['configuration']['networking']['devices']['storage'],
@ -399,8 +399,8 @@ if enable_hypervisor:
if enable_networking:
if config['daemon_mode'] == 'coordinator':
logger.out('Starting MariaDB daemon', state='i')
common.run_os_command('systemctl start mariadb.service')
logger.out('Starting Patroni daemon', state='i')
common.run_os_command('systemctl start patroni.service')
logger.out('Starting FRRouting daemon', state='i')
common.run_os_command('systemctl start frr.service')

View File

@ -269,7 +269,26 @@ class NodeInstance(object):
for network in self.d_network:
self.d_network[network].createGateways()
self.d_network[network].startDHCPServer()
time.sleep(0.5)
time.sleep(1)
# Force Patroni to switch to the local instance
self.logger.out('Setting Patroni leader to this node', state='i')
retcode, stdout, stderr = common.run_os_command(
"""
patronictl
-c /etc/patroni/config.yml
-d zookeeper://localhost:2181
switchover
--candidate {}
--force
pvcdns
""".format(self.name)
)
if stdout:
self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
else:
self.logger.out('Failed to switch Patroni leader\n{}'.format(stderr), state='e')
time.sleep(1)
# Start the DNS aggregator instance
self.dns_aggregator.start_aggregator()
def createFloatingAddresses(self):