#!/usr/bin/env crystal
# Publishes the DHCP leases reported by `networkctl status <interface>` as
# Unbound `local-data` / `local-data-ptr` records, and reloads Unbound only
# when the generated file actually changed.

require "json"
require "file_utils"
require "option_parser"

# One entry of the "Leases" array in the networkctl JSON output.
struct Lease
  include JSON::Serializable

  # Address as a list of byte values (one Int32 per octet).
  @[JSON::Field(key: "Address")]
  property address_bytes : Array(Int32)

  @[JSON::Field(key: "HardwareAddress")]
  property hardware_address : Array(Int32)?

  @[JSON::Field(key: "Hostname")]
  property hostname : String?

  @[JSON::Field(key: "ExpirationUSec")]
  property expiration_usec : Int64?

  # Returns the address bytes joined with dots (dotted-decimal form).
  def address : String
    address_bytes.join('.')
  end
end

# The "DHCPServer" object of the networkctl JSON output.
struct DHCPServer
  include JSON::Serializable

  @[JSON::Field(key: "Leases")]
  property leases : Array(Lease)?
end

# Top-level networkctl status document; only the DHCP server part is read.
struct NetworkStatus
  include JSON::Serializable

  @[JSON::Field(key: "DHCPServer")]
  property dhcp_server : DHCPServer?
end

# Turns a client-supplied hostname into a single DNS label.
#
# The name is lowercased, every run of characters outside `a-z0-9` (dashes
# included) collapses to one dash, and leading/trailing dashes are removed.
# Returns nil when nothing usable remains or the label exceeds 63 characters.
def sanitize_hostname(hostname : String) : String?
  label = hostname.downcase.gsub(/[^a-z0-9]+/, "-").strip('-')
  return nil if label.empty? || label.size > 63
  label
end

# Renders the Unbound configuration snippet for *leases* under *domain*.
#
# Leases without a hostname, or whose hostname cannot be sanitized, are
# skipped. Each remaining lease yields one A record and one PTR record.
#
# The output deliberately contains no timestamp: it must be identical for
# identical leases, otherwise `write_if_changed` would see a change on every
# run and Unbound would be reloaded each time.
def generate_unbound_config(leases : Array(Lease), domain : String) : String
  # Tolerate a domain given with a trailing dot so the FQDN never ends in "..".
  zone = domain.rstrip('.')

  lines = [] of String
  lines << "# Auto-generated DHCP leases for #{zone}"
  lines << ""

  leases.each do |lease|
    next unless hostname = lease.hostname
    next unless label = sanitize_hostname(hostname)

    fqdn = "#{label}.#{zone}."
    # A record
    lines << %{local-data: "#{fqdn} IN A #{lease.address}"}
    # PTR record - local-data-ptr expects IP in normal form, unbound reverses it
    lines << %{local-data-ptr: "#{lease.address} #{fqdn}"}
  end

  lines.join("\n") + "\n"
end

# Runs `networkctl status <interface> --json=short` and returns its DHCP
# server leases (an empty array when the JSON has no DHCPServer/Leases).
#
# *networkctl_path* overrides the binary; by default `networkctl` is looked
# up on PATH. Raises when the command exits unsuccessfully or when the JSON
# cannot be parsed.
def get_leases(interface : String, networkctl_path : String? = nil) : Array(Lease)
  cmd = networkctl_path || "networkctl"
  args = ["status", interface, "--json=short"]

  # Capture into memory buffers: Process.run drains both pipes while the
  # child runs, so the output is complete once the status is returned.
  stdout = IO::Memory.new
  stderr = IO::Memory.new
  status = Process.run(cmd, args, output: stdout, error: stderr)

  unless status.success?
    err_text = stderr.to_s
    detail = err_text.empty? ? stdout.to_s : err_text
    raise "networkctl failed (exit code #{status.exit_code}): #{detail}"
  end

  NetworkStatus.from_json(stdout.to_s).dhcp_server.try(&.leases) || [] of Lease
end

# Writes *content* to *path* unless the file already holds exactly that
# content. Returns true when the file was (re)written, false otherwise.
def write_if_changed(content : String, path : String) : Bool
  return false if File.exists?(path) && File.read(path) == content

  # Write next to the target and move into place so readers never observe
  # a partially written file.
  temp_path = "#{path}.tmp"
  File.write(temp_path, content)
  FileUtils.mv(temp_path, path)
  true
end

# Asks Unbound to reload via `unbound-control reload`.
#
# *unbound_control_path* overrides the binary; by default `unbound-control`
# is looked up on PATH. Raises, including the command's stderr, when the
# command exits unsuccessfully.
def reload_unbound(unbound_control_path : String?)
  cmd = unbound_control_path || "unbound-control"
  puts "Reloading Unbound..."

  stderr = IO::Memory.new
  status = Process.run(cmd, ["reload"], error: stderr)
  unless status.success?
    raise "unbound reload failed (exit code #{status.exit_code}): #{stderr}"
  end

  puts "Unbound reloaded successfully."
end

# Configuration (defaults, overridable from the command line)
interface = "koti"
domain = "home.arpa"
output_path = "/var/lib/unbound/dhcp-hosts.conf"
networkctl_path : String? = nil
unbound_control_path : String? = nil

OptionParser.parse do |parser|
  parser.banner = "Usage: dhcp-leases-to-unbound [options]"

  parser.on("-i INTERFACE", "Network interface to monitor (default: koti)") do |i|
    interface = i
  end

  parser.on("-d DOMAIN", "Domain suffix (default: home.arpa)") do |d|
    domain = d
  end

  parser.on("-o PATH", "Output path for unbound config (default: /var/lib/unbound/dhcp-hosts.conf)") do |o|
    output_path = o
  end

  parser.on("--networkctl PATH", "Path to networkctl binary (default: networkctl from PATH)") do |path|
    networkctl_path = path
  end

  parser.on("--unbound-control PATH", "Path to unbound-control binary (default: unbound-control from PATH)") do |path|
    unbound_control_path = path
  end

  parser.on("-h", "--help", "Show this help") do
    puts parser
    exit
  end
end

begin
  # Get leases from networkd
  leases = get_leases(interface, networkctl_path)

  # Generate Unbound config
  config = generate_unbound_config(leases, domain)

  # Reload Unbound only if the config file was rewritten
  if write_if_changed(config, output_path)
    reload_unbound(unbound_control_path)
  else
    puts "No DHCP lease changes detected."
  end
rescue ex : Exception
  STDERR.puts "Error: #{ex.message}"
  exit 1
end