Files
nixos/modules/services/dhcp-dns-sync/dhcp-leases-to-unbound.cr
2026-03-07 13:33:51 +02:00

188 lines
4.7 KiB
Crystal

#!/usr/bin/env crystal
require "json"
require "file_utils"
require "option_parser"
# One entry of a DHCP server's lease list, deserialized from JSON.
#
# Only `Address` is mandatory; every other key may be absent from the input.
struct Lease
  include JSON::Serializable

  # Leased address as a list of integers (JSON key `Address`).
  @[JSON::Field(key: "Address")]
  property address_bytes : Array(Int32)

  # Client hardware address as a list of integers (JSON key `HardwareAddress`).
  @[JSON::Field(key: "HardwareAddress")]
  property hardware_address : Array(Int32)?

  # Hostname reported for the lease, if any (JSON key `Hostname`).
  @[JSON::Field(key: "Hostname")]
  property hostname : String?

  # Lease expiration (JSON key `ExpirationUSec`).
  # NOTE(review): presumably microseconds, judging by the key name; the clock
  # base is not established by the code visible here.
  @[JSON::Field(key: "ExpirationUSec")]
  property expiration_usec : Int64?

  # Renders `address_bytes` as a dot-separated string, e.g. `[192, 168, 1, 10]`
  # becomes `"192.168.1.10"`.
  def address : String
    address_bytes.map(&.to_s).join(".")
  end
end
# The `DHCPServer` object of the parsed status JSON.
struct DHCPServer
  include JSON::Serializable

  # Leases listed under the JSON key `Leases`; nil when the key is absent.
  @[JSON::Field(key: "Leases")]
  property leases : Array(Lease)?
end
# Top-level object of the parsed status JSON. Only the `DHCPServer` key is
# mapped here.
struct NetworkStatus
  include JSON::Serializable

  # DHCP server section (JSON key `DHCPServer`); nil when the key is absent.
  @[JSON::Field(key: "DHCPServer")]
  property dhcp_server : DHCPServer?
end
# Converts a client-supplied hostname into a single DNS label.
#
# The name is lowercased, every run of characters outside `a-z0-9` (dashes
# included) is replaced by one dash, and leading/trailing dashes are removed.
# Because the result is stripped of dashes at both ends, it always starts and
# ends with a letter or digit.
#
# Returns nil when nothing usable remains or when the label would exceed the
# 63-character limit for a DNS label.
def sanitize_hostname(hostname : String) : String?
  label = hostname.downcase.gsub(/[^a-z0-9]+/, "-").strip('-')
  return nil if label.empty? || label.size > 63
  label
end
# Renders Unbound `local-data` / `local-data-ptr` directives for every lease
# that has a hostname which survives `sanitize_hostname`.
#
# *leases* are the leases to publish; *domain* is the suffix appended to each
# sanitized hostname (a trailing dot is added to form the FQDN).
#
# Returns the complete config text, terminated by a newline.
#
# The output is a pure function of its inputs: it carries no timestamp and the
# records are sorted. A changing header or a different lease order would make
# the file differ on every run even when no lease changed, which defeats any
# "only rewrite/reload when the content changed" check done by the caller.
def generate_unbound_config(leases : Array(Lease), domain : String) : String
  records = leases.compact_map do |lease|
    raw_name = lease.hostname
    next unless raw_name
    label = sanitize_hostname(raw_name)
    next unless label
    {"#{label}.#{domain}.", lease.address}
  end
  # Sort so the result does not depend on the order the leases were listed in.
  records.sort!

  lines = ["# Auto-generated DHCP leases for #{domain}", ""]
  records.each do |fqdn, address|
    # A record
    lines << %{local-data: "#{fqdn} IN A #{address}"}
    # PTR record - local-data-ptr expects IP in normal form, unbound reverses it
    lines << %{local-data-ptr: "#{address} #{fqdn}"}
  end
  lines.join("\n") + "\n"
end
# Runs `networkctl status <interface> --json=short` and returns the DHCP
# server leases found in its output.
#
# *interface* is the link name passed to networkctl. *networkctl_path* is the
# binary to execute; when nil, `networkctl` is looked up via PATH.
#
# Returns an empty array when the JSON has no `DHCPServer` section or that
# section has no `Leases`.
#
# Raises when networkctl exits unsuccessfully (the message carries stderr, or
# stdout if stderr was empty) and lets JSON parse errors propagate.
def get_leases(interface : String, networkctl_path : String? = nil) : Array(Lease)
  cmd = networkctl_path || "networkctl"
  args = ["status", interface, "--json=short"]

  # Capture both streams into memory. Waiting on the process before reading
  # its pipes closes them, and calling `to_s` on a pipe does not read it, so
  # the streams are collected by Process.run itself while the child runs.
  stdout = IO::Memory.new
  stderr = IO::Memory.new
  result = Process.run(cmd, args, output: stdout, error: stderr)

  output = stdout.to_s
  unless result.success?
    error = stderr.to_s
    raise "networkctl failed (exit code #{result.exit_code}): #{error.empty? ? output : error}"
  end

  status = NetworkStatus.from_json(output)
  status.dhcp_server.try(&.leases) || [] of Lease
end
# Writes *content* to *path* unless the file already holds exactly that text.
#
# The new content is first written to `<path>.tmp` and then moved over *path*,
# so readers never observe a partially written file.
#
# Returns true when the file was (re)written, false when it was left alone.
def write_if_changed(content : String, path : String) : Bool
  # Nothing to do when the on-disk text already matches.
  return false if File.exists?(path) && File.read(path) == content

  staging_path = "#{path}.tmp"
  File.write(staging_path, content)
  FileUtils.mv(staging_path, path)
  true
end
# Configuration defaults; each can be overridden from the command line below.
interface = "koti"
domain = "home.arpa"
output_path = "/var/lib/unbound/dhcp-hosts.conf"
networkctl_path : String? = nil
unbound_control_path : String? = nil

OptionParser.parse do |parser|
  parser.banner = "Usage: dhcp-leases-to-unbound [options]"

  parser.on("-i INTERFACE", "Network interface to monitor (default: koti)") do |i|
    interface = i
  end
  parser.on("-d DOMAIN", "Domain suffix (default: home.arpa)") do |d|
    domain = d
  end
  parser.on("-o PATH", "Output path for unbound config (default: /var/lib/unbound/dhcp-hosts.conf)") do |o|
    output_path = o
  end
  parser.on("--networkctl PATH", "Path to networkctl binary (default: networkctl from PATH)") do |path|
    networkctl_path = path
  end
  parser.on("--unbound-control PATH", "Path to unbound-control binary (default: unbound-control from PATH)") do |path|
    unbound_control_path = path
  end
  parser.on("-h", "--help", "Show this help") do
    puts parser
    exit
  end

  # Without these handlers a bad command line raises from OptionParser.parse
  # at top level and ends the script with an unhandled-exception backtrace.
  # Report the problem with the usage text and a non-zero status instead.
  parser.invalid_option do |flag|
    STDERR.puts "Error: #{flag} is not a valid option."
    STDERR.puts parser
    exit 1
  end
  parser.missing_option do |flag|
    STDERR.puts "Error: #{flag} requires a value."
    STDERR.puts parser
    exit 1
  end
end
# Asks Unbound to reload its configuration by running `unbound-control reload`.
#
# *unbound_control_path* is the binary to execute; when nil, `unbound-control`
# is looked up via PATH.
#
# Prints progress messages to stdout. Raises when the command exits
# unsuccessfully; the message carries the command's stderr, or its stdout if
# stderr was empty.
def reload_unbound(unbound_control_path : String?)
  cmd = unbound_control_path || "unbound-control"
  puts "Reloading Unbound..."

  # Capture the streams into memory so the failure message contains what the
  # command actually printed; interpolating the pipe object itself would only
  # show the IO's object representation.
  stdout = IO::Memory.new
  stderr = IO::Memory.new
  result = Process.run(cmd, ["reload"], output: stdout, error: stderr)

  unless result.success?
    error = stderr.to_s
    raise "unbound reload failed (exit code #{result.exit_code}): #{error.empty? ? stdout.to_s : error}"
  end
  puts "Unbound reloaded successfully."
end
# Script body: fetch leases, render the Unbound config, and reload Unbound
# only when the file on disk actually changed. Any failure is reported on
# STDERR and turns into exit status 1.
begin
  leases = get_leases(interface, networkctl_path)
  config = generate_unbound_config(leases, domain)

  # write_if_changed returns true only when it rewrote the file.
  # NOTE(review): if the reload below fails, the file has already been
  # replaced, so a later run with the same leases will see no change and will
  # not retry the reload - confirm whether that is acceptable.
  if write_if_changed(config, output_path)
    reload_unbound(unbound_control_path)
  else
    puts "No DHCP lease changes detected."
  end
rescue ex
  STDERR.puts "Error: #{ex.message}"
  exit 1
end