2018-10-15 21:09:40 -04:00
#!/usr/bin/env python3
# DNSAggregatorInstance.py - Class implementing a DNS aggregator and run by pvcd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import time
2018-11-18 15:36:54 -05:00
import threading
import dns . zone
import dns . query
2019-05-20 22:40:07 -04:00
import psycopg2
2018-10-15 21:09:40 -04:00
import pvcd . log as log
import pvcd . zkhandler as zkhandler
import pvcd . common as common
class DNSAggregatorInstance(object):
    """Top-level DNS aggregator: owns the PowerDNS server, the AXFR poller
    and the per-network database entries.

    Attributes:
        zk_conn: Connection object handed in by the caller; stored as-is.
        config: Configuration mapping shared with the helper instances.
        logger: Logger shared with the helper instances.
        dns_networks: Mapping of network object -> DNSNetworkInstance.
        is_active: True between start_aggregator() and stop_aggregator();
            DNSNetworkInstance only writes to the database while it is set.
    """

    # Initialization function
    def __init__(self, zk_conn, config, logger):
        self.zk_conn = zk_conn
        self.config = config
        self.logger = logger
        self.dns_networks = dict()
        self.is_active = False

        # Both helpers read config/logger (and dns_networks) back from this
        # object, so they must be created after the attributes above.
        self.dns_server_daemon = PowerDNSInstance(self)
        self.dns_axfr_daemon = AXFRDaemonInstance(self)

    # Start up the PowerDNS instance and the AXFR poller
    def start_aggregator(self):
        """Start PowerDNS, then the AXFR poller, then mark the aggregator active."""
        self.dns_server_daemon.start()
        self.dns_axfr_daemon.start()
        self.is_active = True

    # Stop the AXFR poller and the PowerDNS instance
    def stop_aggregator(self):
        """Mark the aggregator inactive, then stop the poller and PowerDNS."""
        self.is_active = False
        self.dns_axfr_daemon.stop()
        self.dns_server_daemon.stop()

    def add_network(self, network):
        """Register *network* and hand the updated mapping to the poller.

        An existing entry for the same network object is replaced.
        """
        self.dns_networks[network] = DNSNetworkInstance(self, network)
        self.dns_networks[network].add_network()
        self.dns_axfr_daemon.update_networks(self.dns_networks)

    def remove_network(self, network):
        """Unregister *network*; unknown networks are ignored.

        The original indexed the mapping directly and raised KeyError for a
        network that had never been added (or was already removed).
        """
        instance = self.dns_networks.get(network)
        if instance:
            instance.remove_network()
            del self.dns_networks[network]
            self.dns_axfr_daemon.update_networks(self.dns_networks)
2018-10-15 21:09:40 -04:00
2018-11-18 15:36:54 -05:00
class PowerDNSInstance(object):
    """Run and stop the pdns_server process used as the zone aggregator.

    Attributes:
        aggregator: Owning aggregator; config and logger are read from it.
        dns_server_daemon: Handle returned by common.run_os_daemon(), or None
            while no process is running.
        vni_dev / upstream_dev: Device names read from the configuration.
        vni_ipaddr / vni_cidrnetmask: The two halves of config['vni_floating_ip'].
        upstream_ipaddr / upstream_cidrnetmask: The two halves of
            config['upstream_floating_ip'].
    """

    # Initialization function
    def __init__(self, aggregator):
        self.aggregator = aggregator
        self.config = self.aggregator.config
        self.logger = self.aggregator.logger
        self.dns_server_daemon = None

        # Floating upstreams; both addresses are expected in "address/prefix" form
        self.vni_dev = self.config['vni_dev']
        self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
        self.upstream_dev = self.config['upstream_dev']
        self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')

    def start(self):
        """Launch pdns_server with a command-line-only configuration."""
        self.logger.out(
            'Starting PowerDNS zone aggregator',
            state='i'
        )
        # Define the PowerDNS config
        # NOTE(review): the database password is passed on the command line
        # and is therefore visible in the process list; a password containing
        # whitespace would also split the argument string. Confirm whether
        # common.run_os_daemon offers a safer channel.
        dns_configuration = [
            # Option                            # Explanation
            '--no-config',
            '--daemon=no',                      # Start directly
            '--guardian=yes',                   # Use a guardian
            '--disable-syslog=yes',             # Log only to stdout (which is then captured)
            '--disable-axfr=no',                # Allow AXFRs
            '--allow-axfr-ips=0.0.0.0/0',       # Allow AXFRs to anywhere
            # '--also-notify=10.101.0.60',      # Notify upstreams
            '--local-address={},{}'.format(self.vni_ipaddr, self.upstream_ipaddr),
                                                # Listen on floating IPs
            '--local-port=10053',               # On port 10053
            '--log-dns-details=on',             # Log details
            '--loglevel=3',                     # Log info
            '--master=yes',                     # Enable master mode
            '--slave=yes',                      # Enable slave mode
            '--slave-renotify=yes',             # Renotify out for our slaved zones
            '--version-string=powerdns',        # Set the version string
            '--default-soa-name=dns.pvc.local', # Override dnsmasq's invalid name
            '--socket-dir={}'.format(self.config['pdns_dynamic_directory']),
                                                # Standard socket directory
            '--launch=gpgsql',                  # Use the PostgreSQL backend
            '--gpgsql-host={}'.format(self.config['pdns_postgresql_host']),
                                                # PostgreSQL instance
            '--gpgsql-port={}'.format(self.config['pdns_postgresql_port']),
                                                # Default port
            '--gpgsql-dbname={}'.format(self.config['pdns_postgresql_dbname']),
                                                # Database name
            '--gpgsql-user={}'.format(self.config['pdns_postgresql_user']),
                                                # User name
            '--gpgsql-password={}'.format(self.config['pdns_postgresql_password']),
                                                # User password
            '--gpgsql-dnssec=no',               # Do DNSSEC elsewhere
        ]
        # Start the pdns process in a thread
        self.dns_server_daemon = common.run_os_daemon(
            '/usr/sbin/pdns_server {}'.format(
                ' '.join(dns_configuration)
            ),
            environment=None,
            logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory'])
        )
        if self.dns_server_daemon:
            self.logger.out(
                'Successfully started PowerDNS zone aggregator',
                state='o'
            )

    def stop(self):
        """Terminate the pdns_server process if one was started.

        Does nothing when no process handle is held, so calling stop() twice
        (or before start()) is harmless.
        """
        if self.dns_server_daemon:
            self.logger.out(
                'Stopping PowerDNS zone aggregator',
                state='i'
            )
            # Terminate, then kill
            self.dns_server_daemon.signal('term')
            time.sleep(0.2)
            self.dns_server_daemon.signal('kill')
            # Drop the handle so a later stop() cannot signal a dead process
            self.dns_server_daemon = None
            self.logger.out(
                'Successfully stopped PowerDNS zone aggregator',
                state='o'
            )
2018-11-18 15:36:54 -05:00
class DNSNetworkInstance(object):
    """Create and delete the aggregator database rows for one client network.

    Attributes:
        aggregator: Owning aggregator; config, logger and is_active are read
            from it.
        network: Network object; its domain, ip4_gateway and ip6_gateway
            attributes are read.
        sql_conn: Database connection, held only for the duration of
            add_network() / remove_network() and None otherwise.
    """

    _INSERT_RECORD = (
        "INSERT INTO records (domain_id, name, content, type, ttl, prio) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )

    # Initialization function
    def __init__(self, aggregator, network):
        self.aggregator = aggregator
        self.config = self.aggregator.config
        self.logger = self.aggregator.logger
        self.sql_conn = None
        self.network = network

    def _connect(self):
        """Open a new connection to the PowerDNS PostgreSQL database.

        Keyword arguments are used instead of a formatted DSN string so that
        quotes or backslashes in the credentials cannot corrupt the DSN.
        """
        return psycopg2.connect(
            host=self.config['pdns_postgresql_host'],
            port=self.config['pdns_postgresql_port'],
            dbname=self.config['pdns_postgresql_dbname'],
            user=self.config['pdns_postgresql_user'],
            password=self.config['pdns_postgresql_password'],
            sslmode='disable'
        )

    def _disconnect(self):
        """Close the held connection, discarding any uncommitted work."""
        if self.sql_conn is not None:
            self.sql_conn.close()
            self.sql_conn = None

    # Add a new network to the aggregator database
    def add_network(self):
        """Insert the domain plus its SOA and NS records if they are missing.

        Nothing is written unless the aggregator is active and the domain is
        not present yet. The domain and its records are committed together,
        so a failure part-way leaves no half-created domain behind.
        """
        network_domain = self.network.domain
        # The gateways carry the literal string 'None' when unset
        if self.network.ip4_gateway != 'None':
            network_gateway = self.network.ip4_gateway
        else:
            network_gateway = self.network.ip6_gateway

        self.logger.out(
            'Adding entry for client domain {}'.format(
                network_domain
            ),
            prefix='DNS aggregator',
            state='o'
        )

        # Connect to the database
        self.sql_conn = self._connect()
        try:
            sql_curs = self.sql_conn.cursor()

            # Try to access the domains entry
            sql_curs.execute(
                "SELECT * FROM domains WHERE name=%s",
                (network_domain,)
            )
            # If we got back a result, don't try to add the domain to the DB
            write_domain = not sql_curs.fetchone()

            # Write the domain to the database if we're active
            if self.aggregator.is_active and write_domain:
                sql_curs.execute(
                    "INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, 'MASTER', 'internal', 0)",
                    (network_domain,)
                )

                sql_curs.execute(
                    "SELECT id FROM domains WHERE name=%s",
                    (network_domain,)
                )
                # fetchone() returns a row tuple; the id is its only column
                domain_id = sql_curs.fetchone()[0]

                cluster_domain = self.config['cluster_domain']
                sql_curs.execute(
                    self._INSERT_RECORD,
                    (
                        domain_id,
                        network_domain,
                        'nsX.{d} root.{d} 1 10800 1800 86400 86400'.format(d=cluster_domain),
                        'SOA',
                        86400,
                        0
                    )
                )

                ns_servers = [
                    network_gateway,
                    'pvc-ns1.{}'.format(cluster_domain),
                    'pvc-ns2.{}'.format(cluster_domain)
                ]
                for ns_server in ns_servers:
                    sql_curs.execute(
                        self._INSERT_RECORD,
                        (domain_id, network_domain, ns_server, 'NS', 86400, 0)
                    )

            self.sql_conn.commit()
        finally:
            # Always release the connection, even if a statement failed
            self._disconnect()

    # Remove a deleted network from the aggregator database
    def remove_network(self):
        """Delete the domain and all of its records if the aggregator is active."""
        network_domain = self.network.domain

        self.logger.out(
            'Removing entry for client domain {}'.format(
                network_domain
            ),
            prefix='DNS aggregator',
            state='o'
        )

        # Connect to the database
        self.sql_conn = self._connect()
        try:
            sql_curs = self.sql_conn.cursor()

            # Get the domain ID
            sql_curs.execute(
                "SELECT id FROM domains WHERE name=%s",
                (network_domain,)
            )
            domain_row = sql_curs.fetchone()

            # Delete the domain from the database if we're active
            if self.aggregator.is_active and domain_row:
                domain_id = domain_row[0]
                # Records first, so the domain row is never removed while
                # rows still reference it
                sql_curs.execute(
                    "DELETE FROM records WHERE domain_id=%s",
                    (domain_id,)
                )
                sql_curs.execute(
                    "DELETE FROM domains WHERE id=%s",
                    (domain_id,)
                )

            self.sql_conn.commit()
        finally:
            self._disconnect()
2018-11-18 15:36:54 -05:00
class AXFRDaemonInstance(object):
    """Background poller that mirrors dnsmasq zones into the aggregator database.

    For every registered network the poller requests an AXFR from the
    network's gateway address, compares the transferred A/AAAA records with
    the rows stored for the domain, applies the difference, bumps the SOA
    serial and asks PowerDNS to reload the domain.

    Attributes:
        aggregator: Owning aggregator; config, logger and dns_networks are
            read from it.
        dns_networks: Mapping whose keys are the network objects to poll.
        thread_stopper: Event set by stop() to end the polling loop.
        thread: The polling thread, or None before the first start().
        sql_conn: Database connection private to the poller, or None while
            stopped.
    """

    # Seconds to wait before the first pass so the dnsmasq instances can start
    startup_delay = 5
    # Seconds to wait between passes
    poll_interval = 10
    # Longest time stop() waits for the poller to finish its current network
    stop_timeout = 15
    # Record types mirrored into the database; SOA/NS rows are managed elsewhere
    synced_types = ('A', 'AAAA')

    # Initialization function
    def __init__(self, aggregator):
        self.aggregator = aggregator
        self.config = self.aggregator.config
        self.logger = self.aggregator.logger
        self.dns_networks = self.aggregator.dns_networks
        self.thread_stopper = threading.Event()
        self.thread = None
        self.sql_conn = None

    def update_networks(self, dns_networks):
        """Replace the mapping of networks to poll."""
        self.dns_networks = dns_networks

    def start(self):
        """Open the poller's own database connection and start the thread."""
        # Create the thread
        self.thread_stopper.clear()
        self.thread = threading.Thread(target=self.run, args=(), kwargs={})

        # Start a local instance of the SQL connection
        # Trying to use the instance from the main DNS Aggregator can result in connection failures
        # after the leader transitions
        self.sql_conn = psycopg2.connect(
            host=self.config['pdns_postgresql_host'],
            port=self.config['pdns_postgresql_port'],
            dbname=self.config['pdns_postgresql_dbname'],
            user=self.config['pdns_postgresql_user'],
            password=self.config['pdns_postgresql_password'],
            sslmode='disable'
        )

        # Start the thread
        self.thread.start()

    def stop(self):
        """Signal the poller to exit, wait for it, then close the connection.

        Waiting before closing keeps the connection valid for the pass that
        is in flight, and guarantees the old thread is gone before a later
        start() clears the stop event again.
        """
        self.thread_stopper.set()

        worker = self.thread
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            worker.join(self.stop_timeout)

        if self.sql_conn:
            self.sql_conn.close()
            self.sql_conn = None

    def run(self):
        """Polling loop executed in the worker thread."""
        # Wait for all the DNSMASQ instances to actually start; Event.wait()
        # returns early (True) if stop() is called in the meantime
        if self.thread_stopper.wait(self.startup_delay):
            return

        while not self.thread_stopper.is_set():
            # Iterate over a snapshot: networks may be added or removed by
            # another thread while a pass is running
            for network in list(self.dns_networks):
                if self.thread_stopper.is_set():
                    break
                self._sync_network(network)

            # Wait for the next pass, or until stop() is called
            self.thread_stopper.wait(self.poll_interval)

    def _sync_network(self, network):
        """Bring the database rows for one network in line with its dnsmasq zone."""
        # Set up our basic variables
        domain = network.domain
        # The gateways carry the literal string 'None' when unset
        if network.ip4_gateway != 'None':
            dnsmasq_ip = network.ip4_gateway
        else:
            dnsmasq_ip = network.ip6_gateway

        #
        # Get an AXFR from the dnsmasq instance and list of records
        #
        try:
            axfr = dns.query.xfr(dnsmasq_ip, domain, lifetime=5.0)
            zone = dns.zone.from_xfr(axfr)
            records_raw = [zone[node].to_text(node) for node in zone.nodes.keys()]
        except Exception as e:
            # Best effort: an unreachable dnsmasq must not end the poller
            print('{} {} ({})'.format(e, dnsmasq_ip, domain))
            return

        records_new = self._parse_axfr_records(records_raw, domain)

        #
        # Get the current zone from the database
        #
        sql_conn = self.sql_conn
        if sql_conn is None:
            # stop() already released the connection
            return

        sql_curs = sql_conn.cursor()
        try:
            sql_curs.execute(
                "SELECT id FROM domains WHERE name=%s",
                (domain,)
            )
            domain_row = sql_curs.fetchone()
            if domain_row is None:
                # The domain has not been written to the database (yet)
                return
            domain_id = domain_row[0]

            # Name the columns explicitly instead of relying on table layout
            sql_curs.execute(
                "SELECT id, name, type, content, ttl FROM records WHERE domain_id=%s",
                (domain_id,)
            )
            records_old = self._format_db_records(sql_curs.fetchall())

            # Find the differences between the lists
            remove_ids, add_entries = self._diff_records(records_new, records_old)
            if not remove_ids and not add_entries:
                return

            # Remove the invalid old records
            for record_id in remove_ids:
                sql_curs.execute(
                    "DELETE FROM records WHERE id=%s",
                    (record_id,)
                )

            # Add the new records
            for entry in add_entries:
                # [NAME, TTL, 'IN', TYPE, DATA]; DATA keeps any embedded spaces
                rname, rttl, _, rtype, rdata = entry.split(None, 4)
                sql_curs.execute(
                    "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
                    (domain_id, rname, rttl, rtype, 0, rdata)
                )

            # Increase SOA serial
            sql_curs.execute(
                "SELECT content FROM records WHERE domain_id=%s AND type='SOA'",
                (domain_id,)
            )
            soa_row = sql_curs.fetchone()
            if soa_row is not None:
                sql_curs.execute(
                    "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
                    (self._bump_soa_serial(soa_row[0]), domain_id)
                )

            # Commit all the previous changes
            sql_conn.commit()
        finally:
            sql_curs.close()

        # Reload the domain
        common.run_os_command(
            '/usr/bin/pdns_control --socket-dir={} reload {}'.format(
                self.config['pdns_dynamic_directory'],
                domain
            ),
            background=False
        )

    @classmethod
    def _parse_axfr_records(cls, records_raw, domain):
        """Turn per-node zone text into sorted 'NAME TTL IN TYPE DATA' entries.

        reference input: ['@ 600 IN SOA . . 4 1200 180 1209600 600\\n@ 600 IN NS .',
        'test3 600 IN A 10.1.1.203\\ntest3 600 IN AAAA 2001:b23e:1113:0:5054:ff:fe5c:f131', etc.]

        The first element holds dnsmasq's own SOA/NS records and is skipped.
        Only the record types in synced_types are kept, matching the filter
        applied to the database rows, so both sides of the comparison cover
        the same record types.
        """
        entries = []
        for node_text in records_raw[1:]:
            for line in node_text.split('\n'):
                fields = line.split()
                # Need NAME, TTL, CLASS, TYPE and at least one DATA token
                if len(fields) < 5 or fields[3] not in cls.synced_types:
                    continue
                if domain not in fields[0]:
                    name = '{}.{}'.format(fields[0], domain)
                else:
                    name = fields[0]
                # Handle space-containing data elements
                entries.append(
                    '{} {} IN {} {}'.format(name, fields[1], fields[3], ' '.join(fields[4:]))
                )
        entries.sort()
        return entries

    @classmethod
    def _format_db_records(cls, rows):
        """Convert (id, name, type, content, ttl) rows into (entry, id) pairs.

        Rows whose type is not in synced_types are dropped. Keeping the id
        paired with its entry means reordering can never separate the two.
        """
        return [
            ('{} {} IN {} {}'.format(name, ttl, rtype, content), record_id)
            for record_id, name, rtype, content, ttl in rows
            if rtype in cls.synced_types
        ]

    @staticmethod
    def _diff_records(records_new, records_old):
        """Return (ids to delete, entries to insert).

        records_new is a list of entry strings; records_old is a list of
        (entry, id) pairs. A stored record is deleted only when its exact
        entry is absent from the new zone, so unchanged records of a name
        with several addresses are left alone.
        """
        in_new = set(records_new)
        in_old = {entry for entry, _ in records_old}
        remove_ids = [record_id for entry, record_id in records_old if entry not in in_new]
        add_entries = sorted(in_new - in_old)
        return remove_ids, add_entries

    @staticmethod
    def _bump_soa_serial(content):
        """Return SOA content with its serial (the third field) incremented by one."""
        fields = content.split()
        fields[2] = str(int(fields[2]) + 1)
        return ' '.join(fields)