diff --git a/debian/control b/debian/control index 540d7227..5901ee07 100644 --- a/debian/control +++ b/debian/control @@ -8,7 +8,7 @@ X-Python3-Version: >= 3.2 Package: pvc-daemon Architecture: all -Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, ipmitool, libvirt-daemon-system, arping, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-sqlite3 +Depends: python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-mysqldb, python3-dnspython, ipmitool, libvirt-daemon-system, arping, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-mysql Suggests: pvc-client-cli Description: Parallel Virtual Cluster virtualization daemon (Python 3) The Parallel Virtual Cluster provides a management solution for QEMU/KVM virtual clusters. diff --git a/node-daemon/pvcd.conf.sample b/node-daemon/pvcd.conf.sample index d428db17..d6646ddb 100644 --- a/node-daemon/pvcd.conf.sample +++ b/node-daemon/pvcd.conf.sample @@ -7,14 +7,16 @@ # nodename section and coordinators list, can be used as-is on a Debian system. 
# # The following values are required for each node or in a default section: -# coordinators: A CSV list of the short hostnames of the coordinator nodes; these nodes become +# coordinators: a CSV list of the short hostnames of the coordinator nodes; these nodes become # members of the Zookeeper cluster, can act as routers, and perform additional # special functions in a cluster; ideally there are 3 coordinators, though 5 # coordinators are supported -# dynamic_directory: The ramdisk directory for PVC to store its dynamic configurations, +# cluster_domain: the node cluster domain, set during bootstrap +# storage_domain: the node storage domain, set during bootstrap +# dynamic_directory: the ramdisk directory for PVC to store its dynamic configurations, # usually under /run or /var/run -# log_directory: The logging directory, usually under /var/log -# file_logging = Whether to log daemon to a file (pvc.log under log_directory) in addition to +# log_directory: the logging directory, usually under /var/log +# file_logging: whether to log daemon to a file (pvc.log under log_directory) in addition to # normal stdout printing # keepalive_interval: the interval between keepalives and for dead node timeout (defaults to 5) # fence_intervals: the number of keepalive_intervals without Zookeeper contact before this node @@ -31,6 +33,15 @@ # flush action; can be "mem", "load", "vcpus", or "vms" (defaults # to "mem"); the best choice based on this field is selected for # each VM to be migrated +# pdns_mysql_host: the host address (usually "localhost") of the PowerDNS zone aggregator +# backend database +# pdns_mysql_port: the port (usually "3306") of the PowerDNS zone aggregator backend database +# pdns_mysql_dbname: the database name (usually "pvcdns") of the PowerDNS zone aggregator +# backend database +# pdns_mysql_user: the client username (usually "pvcdns") of the PowerDNS zone aggregator +# backend database +# pdns_mysql_password: the client user password (randomly 
generated at cluster bootstrap) +# of the PowerDNS zone aggregator backend database # vni_floating_ip: the IP address (in CIDR format) for the floating IP on the VNI network, # used to provide a consistent view of the dynamic primary node to other # machines in the VNI network, e.g. for slaving DNS or sending in routes. @@ -56,6 +67,8 @@ [default] coordinators = pvc-hv1,pvc-hv2,pvc-hv3 +cluster_domain = i.bonilan.net +storage_domain = sx.bonilan.net dynamic_directory = /run/pvc log_directory = /var/log/pvc file_logging = True @@ -65,6 +78,11 @@ suicide_intervals = 0 successful_fence = migrate failed_fence = none migration_target_selector = mem +pdns_mysql_host = localhost +pdns_mysql_port = 3306 +pdns_mysql_dbname = pvcdns +pdns_mysql_user = pvcdns +pdns_mysql_password = pvcdns vni_floating_ip = 10.255.0.254/24 upstream_floating_ip = 10.101.0.30/24 diff --git a/node-daemon/pvcd/DNSAggregatorInstance.py b/node-daemon/pvcd/DNSAggregatorInstance.py index 85d31abb..85d095fb 100644 --- a/node-daemon/pvcd/DNSAggregatorInstance.py +++ b/node-daemon/pvcd/DNSAggregatorInstance.py @@ -23,22 +23,63 @@ import os import sys import time -import sqlite3 +import threading +import dns.zone +import dns.query +import MySQLdb as mysqldb import pvcd.log as log import pvcd.zkhandler as zkhandler import pvcd.common as common -# A barebones PowerDNS sqlite schema (relative to the active dir) -sql_schema_file = 'pvcd/powerdns-aggregator-schema.sql' - class DNSAggregatorInstance(object): # Initialization function - def __init__(self, zk_conn, config, logger, d_network): + def __init__(self, zk_conn, config, logger): self.zk_conn = zk_conn self.config = config self.logger = logger - self.d_network = d_network + self.dns_networks = dict() + self.is_active = False + + self.mysql_conn = mysqldb.connect( + host=self.config['pdns_mysql_host'], + port=int(self.config['pdns_mysql_port']), + user=self.config['pdns_mysql_user'], + passwd=self.config['pdns_mysql_password'], + 
db=self.config['pdns_mysql_dbname'] + ) + self.dns_server_daemon = PowerDNSInstance(self.config, self.logger, self.dns_networks) + self.dns_axfr_daemon = AXFRDaemonInstance(self.config, self.logger, self.mysql_conn, self.dns_networks) + + # Start up the PowerDNS instance + def start_aggregator(self): + self.dns_server_daemon.start() + self.dns_axfr_daemon.start() + + # Stop the PowerDNS instance + def stop_aggregator(self): + self.dns_axfr_daemon.stop() + self.dns_server_daemon.stop() + + def add_network(self, network): + print("adding {}".format(network)) + self.dns_networks[network] = DNSNetworkInstance(self.config, self.logger, self.mysql_conn, network) + self.dns_networks[network].add_network() + self.dns_axfr_daemon.update_networks(self.dns_networks) + + def remove_network(self, network): + print("removing {}".format(network)) + if self.dns_networks[network]: + self.dns_networks[network].remove_network() + del self.dns_networks[network] + self.dns_axfr_daemon.update_networks(self.dns_networks) + +class PowerDNSInstance(object): + # Initialization function + def __init__(self, config, logger, dns_networks): + self.config = config + self.logger = logger + self.dns_server_daemon = None # Floating upstreams self.vni_dev = self.config['vni_dev'] @@ -46,113 +87,7 @@ class DNSAggregatorInstance(object): self.upstream_dev = self.config['upstream_dev'] self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/') - self.database_file = self.config['pdns_dynamic_directory'] + '/pdns-aggregator.sqlite3' - - self.dns_server_daemon = None - - self.prepare_db() - - # Preparation function - def prepare_db(self): - # Connect to the database - sql_conn = sqlite3.connect(self.database_file) - sql_curs = sql_conn.cursor() - - # Try to access the domains table - try: - sql_curs.execute( - 'select * from domains' - ) - write_schema = False - # The table doesn't exist, so we should create it - except sqlite3.OperationalError: - write_schema = True 
- - if write_schema: - with open('{}/{}'.format(os.getcwd(), sql_schema_file), 'r') as schema_file: - schema = ''.join(schema_file.readlines()) - sql_curs.executescript(schema) - - sql_conn.close() - - # Add a new network to the aggregator database - def add_client_network(self, network): - network_domain = self.d_network[network].domain - if self.d_network[network].ip4_gateway != 'None': - network_gateway = self.d_network[network].ip4_gateway - else: - network_gateway = self.d_network[network].ip6_gateway - - self.logger.out( - 'Adding entry for client domain {}'.format( - network_domain - ), - prefix='DNS aggregator', - state='o' - ) - # Connect to the database - sql_conn = sqlite3.connect(self.database_file) - sql_curs = sql_conn.cursor() - # Try to access the domains entry - sql_curs.execute( - 'select * from domains where name=?', - (network_domain,) - ) - results = sql_curs.fetchall() - if results: - write_domain = False - else: - write_domain = True - - if write_domain: - sql_curs.execute( - 'insert into domains (name, master, type, account) values (?, ?, "SLAVE", "internal")', - (network_domain, network_gateway) - ) - sql_conn.commit() - - sql_conn.close() - - # Remove a deleted network from the aggregator database - def remove_client_network(self, network): - network_domain = self.d_network[network].domain - - self.logger.out( - 'Removing entry for client domain {}'.format( - network_domain - ), - prefix='DNS aggregator', - state='o' - ) - # Connect to the database - sql_conn = sqlite3.connect(self.database_file) - sql_curs = sql_conn.cursor() - sql_curs.execute( - 'delete from domains where name=?', - (network_domain,) - ) - sql_conn.commit() - sql_conn.close() - - # Force AXFR - def get_axfr(self, network): - self.logger.out( - 'Perform AXFR for {}'.format( - self.d_network[network].domain - ), - prefix='DNS aggregator', - state='o' - ) - common.run_os_command( - '/usr/bin/pdns_control --socket-dir={} retrieve {}'.format( - 
self.config['pdns_dynamic_directory'], - self.d_network[network].domain - ), - background=True - ) - - # Start up the PowerDNS instance - def start_aggregator(self): + def start(self): self.logger.out( 'Starting PowerDNS zone aggregator', state='o' @@ -166,10 +101,10 @@ class DNSAggregatorInstance(object): '--disable-syslog=yes', # Log only to stdout (which is then captured) '--disable-axfr=no', # Allow AXFRs '--allow-axfr-ips=0.0.0.0/0', # Allow AXFRs to anywhere - '--also-notify=10.101.0.60', # Notify upstreams +# '--also-notify=10.101.0.60', # Notify upstreams '--local-address={},{}'.format(self.vni_ipaddr, self.upstream_ipaddr), # Listen on floating IPs - '--local-port=10053', # On port 10053 + '--local-port=10053', # On port 10053 '--log-dns-details=on', # Log details '--loglevel=3', # Log info '--master=yes', # Enable master mode @@ -179,10 +114,18 @@ class DNSAggregatorInstance(object): '--default-soa-name=dns.pvc.local', # Override dnsmasq's invalid name '--socket-dir={}'.format(self.config['pdns_dynamic_directory']), # Standard socket directory - '--launch=gsqlite3', # Use the sqlite3 backend - '--gsqlite3-database={}'.format(self.database_file), - # Database file - '--gsqlite3-dnssec=no' # Don't do DNSSEC here + '--launch=gmysql', # Use the MySQL backend + '--gmysql-host={}'.format(self.config['pdns_mysql_host']), + # MySQL instance + '--gmysql-port={}'.format(self.config['pdns_mysql_port']), + # Default port + '--gmysql-dbname={}'.format(self.config['pdns_mysql_dbname']), + # Database name + '--gmysql-user={}'.format(self.config['pdns_mysql_user']), + # User name + '--gmysql-password={}'.format(self.config['pdns_mysql_password']), + # User password + '--gmysql-dnssec=no', # Do DNSSEC elsewhere ] # Start the pdns process in a thread self.dns_server_daemon = common.run_os_daemon( @@ -193,8 +136,7 @@ class DNSAggregatorInstance(object): logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory']) ) - # Stop the PowerDNS instance - def 
stop_aggregator(self): + def stop(self): if self.dns_server_daemon: self.logger.out( 'Stopping PowerDNS zone aggregator', @@ -204,3 +146,289 @@ class DNSAggregatorInstance(object): self.dns_server_daemon.signal('term') time.sleep(0.2) self.dns_server_daemon.signal('kill') + +class DNSNetworkInstance(object): + # Initialization function + def __init__(self, config, logger, mysql_conn, network): + self.config = config + self.logger = logger + self.mysql_conn = mysql_conn + self.network = network + + # Add a new network to the aggregator database + def add_network(self): + network_domain = self.network.domain + if self.network.ip4_gateway != 'None': + network_gateway = self.network.ip4_gateway + else: + network_gateway = self.network.ip6_gateway + + self.logger.out( + 'Adding entry for client domain {}'.format( + network_domain + ), + prefix='DNS aggregator', + state='o' + ) + + # Connect to the database + mysql_curs = self.mysql_conn.cursor() + # Try to access the domains entry + mysql_curs.execute( + 'SELECT * FROM domains WHERE name=%s', + (network_domain,) + ) + results = mysql_curs.fetchone() + + # If we got back a result, don't try to add the domain to the DB + if results: + write_domain = False + else: + write_domain = True + + # Write the domain to the database + if write_domain: + mysql_curs.execute( + 'INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, "MASTER", "internal", 0)', + (network_domain,) + ) + self.mysql_conn.commit() + + mysql_curs.execute( + 'SELECT id FROM domains WHERE name=%s', + (network_domain,) + ) + domain_id = mysql_curs.fetchone() + + mysql_curs.execute( + """ + INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES + (%s, %s, %s, %s, %s, %s) + """, + (domain_id, network_domain, 'nsX.{d} root.{d} 1 10800 1800 86400 86400'.format(d=self.config['cluster_domain']), 'SOA', 86400, 0) + ) + + ns_servers = [network_gateway, 'pvc-ns1.{}'.format(self.config['cluster_domain']), 
'pvc-ns2.{}'.format(self.config['cluster_domain'])] + for ns_server in ns_servers: + mysql_curs.execute( + """ + INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES + (%s, %s, %s, %s, %s, %s) + """, + (domain_id, network_domain, ns_server, 'NS', 86400, 0) + ) + + self.mysql_conn.commit() + + # Remove a deleted network from the aggregator database + def remove_network(self): + network_domain = self.network.domain + + self.logger.out( + 'Removing entry for client domain {}'.format( + network_domain + ), + prefix='DNS aggregator', + state='o' + ) + # Connect to the database + mysql_curs = self.mysql_conn.cursor() + mysql_curs.execute( + 'SELECT id FROM domains WHERE name=%s', + (network_domain,) + ) + domain_id = mysql_curs.fetchone() + + mysql_curs.execute( + 'DELETE FROM domains WHERE id=%s', + (domain_id,) + ) + mysql_curs.execute( + 'DELETE FROM records WHERE domain_id=%s', + (domain_id,) + ) + + self.mysql_conn.commit() + + +class AXFRDaemonInstance(object): + # Initialization function + def __init__(self, config, logger, mysql_conn, dns_networks): + self.config = config + self.logger = logger + self.mysql_conn = mysql_conn + self.dns_networks = dns_networks + self.thread_stopper = threading.Event() + self.thread = None + + def update_networks(self, dns_networks): + self.dns_networks = dns_networks + + def start(self): + self.thread_stopper.clear() + self.thread = threading.Thread(target=self.run, args=(), kwargs={}) + self.thread.start() + + def stop(self): + self.thread_stopper.set() + + def run(self): + # Wait for all the DNSMASQ instances to actually start + time.sleep(2) + + while not self.thread_stopper.is_set(): + # We do this for each network + for network, instance in self.dns_networks.items(): + zone_modified = False + + # Set up our basic variables + domain = network.domain + if network.ip4_gateway != 'None': + dnsmasq_ip = network.ip4_gateway + else: + dnsmasq_ip = network.ip6_gateway + + # Get an AXFR from the dnsmasq instance and 
list of records + print('{} {}'.format(dnsmasq_ip, domain)) + try: + z = dns.zone.from_xfr(dns.query.xfr(dnsmasq_ip, domain, lifetime=5.0)) + records_raw = [z[n].to_text(n) for n in z.nodes.keys()] + except OSError as e: + print(e) + continue + + # Fix the formatting because it's useless + # reference: ['@ 600 IN SOA . . 4 1200 180 1209600 600\n@ 600 IN NS .', 'test3 600 IN A 10.1.1.203\ntest3 600 IN AAAA 2001:b23e:1113:0:5054:ff:fe5c:f131', etc.] + # We don't really care about dnsmasq's terrible SOA or NS records which are in [0] + string_records = '\n'.join(records_raw[1:]) + # Split into individual records + records_new = list() + for element in string_records.split('\n'): + if element: + record = element.split() + # Handle space-containing data elements + if domain not in record[0]: + name = '{}.{}'.format(record[0], domain) + else: + name = record[0] + entry = '{} {} IN {} {}'.format(name, record[1], record[3], ' '.join(record[4:])) + records_new.append(entry) + + # Get the current zone from the database + mysql_curs = self.mysql_conn.cursor() + mysql_curs.execute( + 'SELECT id FROM domains WHERE name=%s', + (domain,) + ) + domain_id = mysql_curs.fetchone() + mysql_curs.execute( + 'SELECT * FROM records WHERE domain_id=%s', + (domain_id,) + ) + results = list(mysql_curs.fetchall()) + + # Fix the formatting because it's useless for comparison + # reference: ((10, 28, 'testnet01.i.bonilan.net', 'SOA', 'nsX.pvc.local root.pvc.local 1 10800 1800 86400 86400', 86400, 0, None, 0, None, 1), etc.) 
+ records_old = list() + records_old_ids = list() + for record in results: + # Skip the SOA and NS records + if record[3] == 'SOA' or record[3] == 'NS': + continue + # Assemble a list element in the same format as the AXFR data + entry = '{} {} IN {} {}'.format(record[2], record[5], record[3], record[4]) + records_old.append(entry) + records_old_ids.append(record[0]) + + records_new.sort() + records_old.sort() + + print('New: {}'.format(records_new)) + print('Old: {}'.format(records_old)) + + # Find the differences between the lists + # Basic check one: are they completely equal + if records_new != records_old: + # Get set elements + in_new = set(records_new) + in_old = set(records_old) + in_new_not_in_old = in_new - in_old + in_old_not_in_new = in_old - in_new + + # Go through the old list + remove_records = list() # list of database IDs + for i in range(len(records_old)): + record_id = records_old_ids[i] + record = records_old[i] + splitrecord = records_old[i].split() + + # If the record is not in the new list, remove it + if record in in_old_not_in_new: + remove_records.append(record_id) + + # Go through the new elements + for newrecord in in_new_not_in_old: + splitnewrecord = newrecord.split() + # If there's a name match with different content, remove the old one + if splitrecord[0] == splitnewrecord[0]: + remove_records.append(record_id) + + changed = False + if len(remove_records) > 0: + print(remove_records) + # Remove the invalid old records + for record_id in remove_records: + mysql_curs = self.mysql_conn.cursor() + mysql_curs.execute( + 'DELETE FROM records WHERE id=%s', + (record_id,) + ) + changed = True + + if len(in_new_not_in_old) > 0: + print(in_new_not_in_old) + # Add the new records + for record in in_new_not_in_old: + # [NAME, TTL, 'IN', TYPE, DATA] + record = record.split() + rname = record[0] + rttl = record[1] + rtype = record[3] + rdata = record[4] + mysql_curs = self.mysql_conn.cursor() + mysql_curs.execute( + 'INSERT INTO records 
(domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)', + (domain_id, rname, rttl, rtype, 0, rdata) + ) + changed = True + + if changed: + # Increase SOA serial + mysql_curs.execute( + 'SELECT content FROM records WHERE domain_id=%s AND type="SOA"', + (domain_id,) + ) + soa_record = list(mysql_curs.fetchone())[0].split() + print(soa_record) + current_serial = int(soa_record[2]) + new_serial = current_serial + 1 + soa_record[2] = str(new_serial) + mysql_curs.execute( + 'UPDATE records SET content=%s WHERE domain_id=%s AND type="SOA"', + (' '.join(soa_record), domain_id) + ) + + # Commit all the previous changes + self.mysql_conn.commit() + + # Reload the domain + common.run_os_command( + '/usr/bin/pdns_control --socket-dir={} reload {}'.format( + self.config['pdns_dynamic_directory'], + domain + ), + background=False + ) + + # Wait for 10 seconds + time.sleep(10) diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index 0aebbbdc..fb7d50f6 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -116,6 +116,8 @@ staticdata.append(subprocess.run(['uname', '-m'], stdout=subprocess.PIPE).stdout # Config values dictionary config_values = [ 'coordinators', + 'cluster_domain', + 'storage_domain', 'dynamic_directory', 'log_directory', 'file_logging', @@ -125,6 +127,11 @@ config_values = [ 'successful_fence', 'failed_fence', 'migration_target_selector', + 'pdns_mysql_host',# = 'localhost' + 'pdns_mysql_port',# = 3306 + 'pdns_mysql_dbname',# = 'pvcdns' + 'pdns_mysql_user',# = 'pvcdns' + 'pdns_mysql_password',# = 'pvcdns' 'vni_dev', 'vni_dev_ip', 'vni_floating_ip', @@ -531,7 +538,7 @@ pool_list = [] # Create an instance of the DNS Aggregator if we're a coordinator if config['daemon_mode'] == 'coordinator': - dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(zk_conn, config, logger, d_network) + dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(zk_conn, config, logger) else: dns_aggregator = None @@ 
-596,9 +603,10 @@ def update_networks(new_network_list): for network in new_network_list: if not network in network_list: d_network[network] = VXNetworkInstance.VXNetworkInstance(network, zk_conn, config, logger, this_node) + print(network) + dns_aggregator.add_network(d_network[network]) # Start primary functionality if this_node.router_state == 'primary': - dns_aggregator.add_client_network(network) d_network[network].createGateways() d_network[network].startDHCPServer() @@ -609,7 +617,7 @@ def update_networks(new_network_list): if this_node.router_state == 'primary': d_network[network].stopDHCPServer() d_network[network].removeGateways() - dns_aggregator.remove_client_network(network) + dns_aggregator.remove_network(d_network[network]) # Stop general functionality d_network[network].removeFirewall() d_network[network].removeNetwork() @@ -896,11 +904,6 @@ def update_zookeeper(): else: ceph_health_colour = logger.fmt_red - # DNS aggregator retransfer - if this_node.router_state == 'primary': - for network in d_network: - dns_aggregator.get_axfr(network) - # Set ceph health information in zookeeper (primary only) if this_node.router_state == 'primary': # Get status info diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py index 58fe873c..12b12d3f 100644 --- a/node-daemon/pvcd/NodeInstance.py +++ b/node-daemon/pvcd/NodeInstance.py @@ -245,30 +245,22 @@ class NodeInstance(object): self.logger.out('Setting router {} to secondary state'.format(self.name), state='i') self.logger.out('Network list: {}'.format(', '.join(self.network_list))) time.sleep(1) + self.dns_aggregator.stop_aggregator() for network in self.d_network: self.d_network[network].stopDHCPServer() - self.d_network[network].removeGateway4Address() - self.d_network[network].removeGateway6Address() - self.dns_aggregator.remove_client_network(network) - self.dns_aggregator.stop_aggregator() + self.d_network[network].removeGateways() self.removeFloatingAddresses() def 
become_primary(self): self.logger.out('Setting router {} to primary state.'.format(self.name), state='i') self.logger.out('Network list: {}'.format(', '.join(self.network_list))) self.createFloatingAddresses() - self.dns_aggregator.start_aggregator() - time.sleep(0.5) # Start up the gateways and DHCP servers for network in self.d_network: - self.dns_aggregator.add_client_network(network) - self.d_network[network].createGateway4Address() - self.d_network[network].createGateway6Address() + self.d_network[network].createGateways() self.d_network[network].startDHCPServer() time.sleep(0.5) - # Handle AXFRs after to avoid slowdowns - for network in self.d_network: - self.dns_aggregator.get_axfr(network) + self.dns_aggregator.start_aggregator() def createFloatingAddresses(self): # VNI floating IP