Skip to content

Instantly share code, notes, and snippets.

@dkam
Created December 18, 2025 10:03
Show Gist options
  • Select an option

  • Save dkam/fae0477cc6052a2561b60fd27f56e2c5 to your computer and use it in GitHub Desktop.

Select an option

Save dkam/fae0477cc6052a2561b60fd27f56e2c5 to your computer and use it in GitHub Desktop.
Storing IP addresses as unsigned 64bit
require "sqlite3"
require "ipaddr"
# ============================================================================
# IPNetwork64Bit Module - Mathematical operations for IP addresses
#
# The purpose of this module is to allow storing IPV4 and IPV6 in systems
# with signed, 64bit integers, and to allow inclusion queries such as:
# "Is ip address X enclosed in any recorded network?"
# ============================================================================
module IPNetwork64Bit
# SQLite only supports signed 64-bit integers, but we need unsigned 64-bit range
# To preserve comparison order, we shift values by 2^63 (subtract from unsigned)
# This maps: 0 -> -2^63, 2^63-1 -> -1, 2^63 -> 0, 2^64-1 -> 2^63-1
# Preserves ordering: all comparisons work correctly in signed space
def unsigned_to_signed(value) = value - 0x8000000000000000
# Convert signed back to unsigned
def signed_to_unsigned(value) = value + 0x8000000000000000
# Convert host IP address to two 64-bit integers (high, low)
def host_to_ints(ip)
ipa = IPAddr.new(ip)
ipa = ipa.ipv4_mapped if ipa.ipv4?
int = ipa.to_i
ip_high = int >> 64
ip_low = int & 0xFFFFFFFFFFFFFFFF
[ip_high, ip_low]
end
# Convert two 64-bit integers back to host IP address string
def ints_to_host(ip_high, ip_low)
int = (ip_high << 64) | ip_low
ipa = IPAddr.new(int, Socket::AF_INET6)
ipa.ipv4_mapped? ? ipa.native.to_s : ipa.to_s
end
# Convert network CIDR to start/end integers (high, low for each)
def network_to_ints(network)
# example: "2001:db8::/32" or "192.168.0.0/16"
ipnet = IPAddr.new(network)
# Convert IPv4 network to IPv4-mapped IPv6
if ipnet.ipv4?
first = ipnet.to_range.first.ipv4_mapped
last = ipnet.to_range.last.ipv4_mapped
start_int = first.to_i
end_int = last.to_i
else
start_int = ipnet.to_range.first.to_i
end_int = ipnet.to_range.last.to_i
end
start_high = start_int >> 64
start_low = start_int & 0xFFFFFFFFFFFFFFFF
end_high = end_int >> 64
end_low = end_int & 0xFFFFFFFFFFFFFFFF
[start_high, start_low, end_high, end_low]
end
# Convert start/end integers back to network CIDR notation
def ints_to_network(start_high, start_low, end_high, end_low)
start_int = (start_high << 64) | start_low
end_int = (end_high << 64) | end_low
start_ip = IPAddr.new(start_int, Socket::AF_INET6)
end_ip = IPAddr.new(end_int, Socket::AF_INET6)
# Convert back to IPv4 if it's IPv4-mapped
if start_ip.ipv4_mapped?
start_ip = start_ip.native
end_ip = end_ip.native
prefix = 32 - Math.log2(end_int - start_int + 1).to_i
"#{start_ip}/#{prefix}"
else
prefix = 128 - Math.log2(end_int - start_int + 1).to_i
"#{start_ip}/#{prefix}"
end
end
# Module method for easy access
module_function :unsigned_to_signed, :signed_to_unsigned, :host_to_ints, :ints_to_host,
:network_to_ints, :ints_to_network
end
# ============================================================================
# IPNetworkStore Class - Database operations for IP networks
# ============================================================================
class IPNetworkStore
include IPNetwork64Bit
def initialize(db)
@db = db
end
def create_table
@db.execute <<~SQL
CREATE TABLE IF NOT EXISTS ip_ranges (
id INTEGER PRIMARY KEY,
start_ip_high INTEGER, -- Upper 64 bits of start address
start_ip_low INTEGER, -- Lower 64 bits of start address
end_ip_high INTEGER, -- Upper 64 bits of end address
end_ip_low INTEGER -- Lower 64 bits of end address
);
SQL
end
def insert_network(network)
start_high, start_low, end_high, end_low = network_to_ints(network)
# Convert to signed for SQLite storage
start_high = unsigned_to_signed(start_high)
start_low = unsigned_to_signed(start_low)
end_high = unsigned_to_signed(end_high)
end_low = unsigned_to_signed(end_low)
@db.execute(
"INSERT INTO ip_ranges (start_ip_high, start_ip_low, end_ip_high, end_ip_low) VALUES (?, ?, ?, ?)",
[start_high, start_low, end_high, end_low]
)
end
def find_exact(ip)
ip_high, ip_low = host_to_ints(ip)
# Convert to signed for SQLite comparison
signed_high = unsigned_to_signed(ip_high)
signed_low = unsigned_to_signed(ip_low)
@db.execute(
"SELECT * FROM ip_ranges WHERE start_ip_high = ? AND start_ip_low = ? AND end_ip_high = ? AND end_ip_low = ?",
[signed_high, signed_low, signed_high, signed_low]
)
end
def find_all_enclosing(ip)
ip_high, ip_low = host_to_ints(ip)
# Convert to signed for SQLite comparison
signed_high = unsigned_to_signed(ip_high)
signed_low = unsigned_to_signed(ip_low)
@db.execute(<<~SQL, [signed_high, signed_high, signed_low, signed_high, signed_high, signed_low])
SELECT *,
((end_ip_high - start_ip_high) * 18446744073709551616.0 + (end_ip_low - start_ip_low)) as range_size
FROM ip_ranges
WHERE (? > start_ip_high OR
(? = start_ip_high AND ? >= start_ip_low))
AND (? < end_ip_high OR
(? = end_ip_high AND ? <= end_ip_low))
ORDER BY range_size ASC
SQL
end
def find_most_specific(ip)
ip_high, ip_low = host_to_ints(ip)
# Convert to signed for SQLite comparison
signed_high = unsigned_to_signed(ip_high)
signed_low = unsigned_to_signed(ip_low)
@db.execute(<<~SQL, [signed_high, signed_high, signed_low, signed_high, signed_high, signed_low])
SELECT * FROM ip_ranges
WHERE (? > start_ip_high OR
(? = start_ip_high AND ? >= start_ip_low))
AND (? < end_ip_high OR
(? = end_ip_high AND ? <= end_ip_low))
ORDER BY ((end_ip_high - start_ip_high) * 18446744073709551616.0 + (end_ip_low - start_ip_low)) ASC
LIMIT 1
SQL
end
# For backward compatibility
alias_method :find_enclosing, :find_all_enclosing
end
# ============================================================================
# Module-level convenience functions for backward compatibility
# These functions maintain the original procedural interface
# ============================================================================
# Global store instance for simple use cases
@_default_store = nil
def self.create_default_store(db = SQLite3::Database.new(":memory:"))
@_default_store = IPNetworkStore.new(db)
@_default_store.create_table
@_default_store
end
def self.default_store
@_default_store ||= create_default_store
end
# Delegates to maintain backward compatibility
%w[create_table insert_network find_exact find_all_enclosing find_most_specific find_enclosing].each do |method|
define_method(method) do |*args|
default_store.send(method, *args)
end
end
# Make math functions available at top level too
include IPNetwork64Bit
# ============================================================================
# Test Suite
# ============================================================================
def test_signed_unsigned_conversion
test_values = [
0,
1,
0x7FFFFFFFFFFFFFFF, # Max positive signed
0x8000000000000000, # First value that goes negative
0xFFFFFFFFFFFFFFFF # Max unsigned
]
test_values.each do |val|
signed = unsigned_to_signed(val)
unsigned = signed_to_unsigned(signed)
raise "Conversion failed for #{val}" unless val == unsigned
end
puts "All signed/unsigned conversion tests passed!"
end
def test_cases
# Test IPv6 host conversion
[
"fd7a:115c:a1e0::8201:ac4c",
"2001:db8:cafe:babe::1"
].each do |ip|
raise("Host test failed for #{ip}") unless ip == ints_to_host(*host_to_ints(ip))
end
# Test IPv4 host conversion (should round-trip back to IPv4)
[
"192.168.1.1",
"10.0.0.1",
"8.8.8.8"
].each do |ip|
result = ints_to_host(*host_to_ints(ip))
raise("IPv4 host test failed for #{ip}, got #{result}") unless result == ip
end
# Test IPv6 network conversion
[
"2001:db8::/32",
"fd7a:115c:a1e0::/48",
"2001:db8:cafe::/64",
"fd00::/8"
].each do |network|
result = ints_to_network(*network_to_ints(network))
raise("Network test failed for #{network}, got #{result}") unless network == result
end
# Test IPv4 network conversion (should round-trip back to IPv4)
[
"192.168.0.0/16",
"10.0.0.0/8",
"172.16.0.0/12"
].each do |ipv4_net|
result = ints_to_network(*network_to_ints(ipv4_net))
raise("IPv4 network test failed for #{ipv4_net}, got #{result}") unless result == ipv4_net
end
puts "All conversion tests passed!"
end
def debug_network_storage
puts "=" * 60
puts "Debugging ::/0 Network Storage"
puts "=" * 60
puts
# Show what ::/0 converts to
network = "::/0"
start_high, start_low, end_high, end_low = network_to_ints(network)
puts "Network: #{network}"
puts " Start: high=#{start_high} (0x#{start_high.to_s(16)}), low=#{start_low} (0x#{start_low.to_s(16)})"
puts " End: high=#{end_high} (0x#{end_high.to_s(16)}), low=#{end_low} (0x#{end_low.to_s(16)})"
puts
puts "After unsigned_to_signed conversion (for SQLite storage):"
puts " Start: high=#{unsigned_to_signed(start_high)}, low=#{unsigned_to_signed(start_low)}"
puts " End: high=#{unsigned_to_signed(end_high)}, low=#{unsigned_to_signed(end_low)}"
puts
# Show what 8.8.8.8 converts to
ip = "8.8.8.8"
ip_high, ip_low = host_to_ints(ip)
puts "IP: #{ip}"
puts " As IPv4-mapped IPv6: #{IPAddr.new(ip).ipv4_mapped}"
puts " Values: high=#{ip_high} (0x#{ip_high.to_s(16)}), low=#{ip_low} (0x#{ip_low.to_s(16)})"
puts
puts "After unsigned_to_signed conversion (for SQLite query):"
signed_high = unsigned_to_signed(ip_high)
signed_low = unsigned_to_signed(ip_low)
puts " Values: high=#{signed_high}, low=#{signed_low}"
puts
# Now test the conditions manually
puts "Testing range conditions:"
puts " Condition 1: (#{signed_high} > #{unsigned_to_signed(start_high)} OR"
puts " (#{signed_high} = #{unsigned_to_signed(start_high)} AND #{signed_low} >= #{unsigned_to_signed(start_low)}))"
cond1_a = signed_high > unsigned_to_signed(start_high)
cond1_b = signed_high == unsigned_to_signed(start_high) && signed_low >= unsigned_to_signed(start_low)
cond1 = cond1_a || cond1_b
puts " Result: #{cond1} (#{cond1_a} OR #{cond1_b})"
puts
puts " Condition 2: (#{signed_high} < #{unsigned_to_signed(end_high)} OR"
puts " (#{signed_high} = #{unsigned_to_signed(end_high)} AND #{signed_low} <= #{unsigned_to_signed(end_low)}))"
cond2_a = signed_high < unsigned_to_signed(end_high)
cond2_b = signed_high == unsigned_to_signed(end_high) && signed_low <= unsigned_to_signed(end_low)
cond2 = cond2_a || cond2_b
puts " Result: #{cond2} (#{cond2_a} OR #{cond2_b})"
puts
puts " Overall match: #{cond1 && cond2}"
puts
end
def test_database
store = IPNetworkStore.new(SQLite3::Database.new(":memory:"))
store.create_table
puts "=== Testing Database Operations ==="
puts
# Insert test networks
puts "Inserting test networks..."
test_networks = [
"2001:db8::/32", # Large IPv6 block
"2001:db8:cafe::/48", # Medium IPv6 block (inside /32)
"2001:db8:cafe::/64", # Small IPv6 block (inside /48)
"192.168.0.0/16", # IPv4 private network
"192.168.1.0/24", # IPv4 subnet (inside /16)
"10.0.0.0/8", # IPv4 large private network
"fd7a:115c:a1e0::/48", # Separate IPv6 network
"ffff::/16", # High IPv6 address (tests high values with sign bit)
"::/0" # Entire IPv6 space (edge case: 0 to max)
]
test_networks.each do |network|
store.insert_network(network)
puts " Inserted: #{network}"
end
puts
# Test find_enclosing with hierarchical networks
puts "=== Testing find_enclosing ==="
puts
test_ips = [
"2001:db8:cafe::1", # Should match all three 2001:db8 networks + ::/0
"192.168.1.100", # Should match both 192.168 networks + ::/0
"10.1.2.3", # Should match only 10.0.0.0/8 + ::/0
"8.8.8.8", # Should match ::/0 (IPv4-mapped)
"fd7a:115c:a1e0::1", # Should match only fd7a network + ::/0
"ffff::1", # Edge case: high IPv6 address
"::" # Edge case: zero address
]
test_ips.each do |ip|
puts "Searching for networks containing: #{ip}"
results = store.find_enclosing(ip)
if results.empty?
puts " No networks found"
else
puts " Found #{results.length} enclosing network(s) (ordered most specific to least specific):"
results.each do |row|
id, start_h, start_l, end_h, end_l, range_size = row
# Convert from signed (SQLite) back to unsigned
start_h = signed_to_unsigned(start_h)
start_l = signed_to_unsigned(start_l)
end_h = signed_to_unsigned(end_h)
end_l = signed_to_unsigned(end_l)
network = ints_to_network(start_h, start_l, end_h, end_l)
puts " #{network} (id: #{id}, range_size: #{range_size.to_i})"
end
end
puts
end
# Test find_exact
puts "=== Testing find_exact ==="
puts
puts "Searching for exact match: 192.168.1.100"
results = store.find_exact("192.168.1.100")
puts results.empty? ? " No exact matches (expected)" : " Found: #{results.inspect}"
puts
puts "=== All tests complete ==="
end
def test_edge_cases
puts "=" * 60
puts "Testing Edge Cases"
puts "=" * 60
puts
store = IPNetworkStore.new(SQLite3::Database.new(":memory:"))
store.create_table
# Test maximum IPv6 address
puts "=== Testing Maximum IPv6 Address ==="
max_ipv6 = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
store.insert_network("#{max_ipv6}/128")
puts " Inserted: #{max_ipv6}/128"
# Test prefix length extremes
puts "\n=== Testing Prefix Length Extremes ==="
extreme_networks = [
"::/1", # Very large network
"2001::/63", # Odd prefix length
"2001::/64", # Common prefix length
"2001::/65", # Just past 64-bit boundary
"2001:db8::/127", # Almost single host
"#{max_ipv6}/128" # Single host at max address
]
extreme_networks.each do |network|
store.insert_network(network)
puts " Inserted: #{network}"
end
# Test boundary addresses
puts "\n=== Testing Network Boundaries ==="
test_boundary_network = "2001:db8::/48"
store.insert_network(test_boundary_network)
puts " Inserted: #{test_boundary_network}"
# Calculate boundary addresses
start_h, start_l, end_h, end_l = network_to_ints(test_boundary_network)
start_addr = ints_to_host(start_h, start_l)
end_addr = ints_to_host(end_h, end_l)
puts " Network range: #{start_addr} to #{end_addr}"
# Test addresses at boundaries
boundary_tests = [
start_addr, # First address in network
end_addr, # Last address in network
"2001:db8::1", # Second address
"2001:db8::ffff:ffff:ffff:ffff" # Address before end
]
boundary_tests.each do |ip|
results = store.find_all_enclosing(ip)
puts "\n Testing boundary IP: #{ip}"
puts " Found #{results.length} matching network(s)"
results.first(3).each do |row|
id, start_h, start_l, end_h, end_l, range_size = row
network = ints_to_network(
signed_to_unsigned(start_h), signed_to_unsigned(start_l),
signed_to_unsigned(end_h), signed_to_unsigned(end_l)
)
puts " - #{network}"
end
end
end
def test_special_ipv6_formats
puts "=" * 60
puts "Testing Special IPv6 Formats"
puts "=" * 60
puts
# Test various IPv6 format representations
format_tests = [
# Compressed zeros
["2001:db8::1", "2001:0db8:0:0:0:0:0:1"],
["2001:0db8::", "2001:db8:0:0:0:0:0:0"],
["::1", "0:0:0:0:0:0:0:1"],
["::", "0:0:0:0:0:0:0:0"],
# Leading zeros
["0000:0000:0000:0000:0000:0000:0000:0001", "::1"],
# Documentation prefixes
["2001:db8::/32"],
["2001:0db8::/32"],
# ULA (Unique Local Addresses)
["fd00::/8"],
["fd7a:115c:a1e0::/48"],
# Link-local
["fe80::/10"],
["fe80::1/64"]
]
format_tests.each do |test_case|
if test_case.length == 2
ip1, ip2 = test_case
# Test that different formats for same address work
ip1_high, ip1_low = host_to_ints(ip1)
ip2_high, ip2_low = host_to_ints(ip2)
if ip1_high == ip2_high && ip1_low == ip2_low
puts " ✓ #{ip1} == #{ip2}"
else
puts " ✗ Mismatch: #{ip1} != #{ip2}"
end
else
# Test network conversion
network = test_case[0]
start_h, start_l, end_h, end_l = network_to_ints(network)
result = ints_to_network(start_h, start_l, end_h, end_l)
puts " ✓ #{network} -> #{result}"
end
end
puts "\n=== Testing IPv6 Format Consistency ==="
store = IPNetworkStore.new(SQLite3::Database.new(":memory:"))
store.create_table
# Insert networks in different formats, test they match the same IPs
networks_to_test = [
"2001:db8::/32",
"2001:0db8:0000:0000:0000:0000:0000:0000/32"
]
networks_to_test.each_with_index do |network, i|
store.insert_network(network)
puts " Inserted #{i+1}: #{network}"
end
# Test that an IP matches both representations
test_ip = "2001:db8::1"
results = store.find_all_enclosing(test_ip)
puts "\n IP #{test_ip} matches #{results.length} networks (expected: 2)"
end
def test_nested_networks
puts "=" * 60
puts "Testing Nested/Hierarchical Networks"
puts "=" * 60
puts
store = IPNetworkStore.new(SQLite3::Database.new(":memory:"))
store.create_table
# Create deeply nested networks
nested_networks = [
"::/0", # Everything
"2000::/3", # Current IPv6 allocations
"2001::/32", # IETF protocol allocations
"2001:db8::/32", # Documentation
"2001:db8:1234::/48", # Specific /48
"2001:db8:1234:5678::/64", # Specific /64
"2001:db8:1234:5678::1/128" # Single host
]
puts "=== Creating Nested Network Hierarchy ==="
nested_networks.each do |network|
store.insert_network(network)
puts " Inserted: #{network}"
end
# Test find_most_specific vs find_all_enclosing
test_ip = "2001:db8:1234:5678::1"
puts "\n=== Testing IP: #{test_ip} ==="
puts "\nUsing find_most_specific:"
result = store.find_most_specific(test_ip)
if result && !result.empty?
row = result[0]
id, start_h, start_l, end_h, end_l = row
network = ints_to_network(
signed_to_unsigned(start_h), signed_to_unsigned(start_l),
signed_to_unsigned(end_h), signed_to_unsigned(end_l)
)
puts " Most specific: #{network}"
end
puts "\nUsing find_all_enclosing:"
results = store.find_all_enclosing(test_ip)
puts " Found #{results.length} matching networks:"
results.each_with_index do |row, i|
id, start_h, start_l, end_h, end_l, range_size = row
network = ints_to_network(
signed_to_unsigned(start_h), signed_to_unsigned(start_l),
signed_to_unsigned(end_h), signed_to_unsigned(end_l)
)
# Calculate approximate prefix length for display
approx_prefix = 128 - Math.log2([range_size, 1].max).floor
puts " #{i+1}. #{network} (~/#{approx_prefix})"
end
end
def test_ipv4_edge_cases
puts "=" * 60
puts "Testing IPv4 Edge Cases"
puts "=" * 60
puts
store = IPNetworkStore.new(SQLite3::Database.new(":memory:"))
store.create_table
# IPv4 edge case networks
ipv4_networks = [
"0.0.0.0/0", # Special case: might be interpreted as IPv4 ::/0
"0.0.0.0/8", # Reserved
"127.0.0.0/8", # Loopback
"169.254.0.0/16", # Link-local
"192.168.0.0/16", # Private
"224.0.0.0/4", # Multicast
"255.255.255.255/32" # Broadcast
]
puts "=== Inserting IPv4 Edge Case Networks ==="
ipv4_networks.each do |network|
store.insert_network(network)
puts " Inserted: #{network}"
end
# Test edge case IPv4 addresses
ipv4_test_addresses = [
"0.0.0.0", # Zero address
"0.0.0.1", # First address
"127.0.0.1", # Loopback
"169.254.169.254", # Common AWS metadata IP
"192.168.255.255", # End of private range
"224.0.0.1", # All hosts multicast
"255.255.255.255" # Broadcast
]
puts "\n=== Testing IPv4 Edge Case Addresses ==="
ipv4_test_addresses.each do |ip|
results = store.find_all_enclosing(ip)
puts "\n IP: #{ip}"
puts " Matches: #{results.length} network(s)"
results.each do |row|
id, start_h, start_l, end_h, end_l, range_size = row
network = ints_to_network(
signed_to_unsigned(start_h), signed_to_unsigned(start_l),
signed_to_unsigned(end_h), signed_to_unsigned(end_l)
)
puts " - #{network} (range_size: #{range_size.to_i})"
end
end
end
def run_all_tests
puts "=" * 60
puts "IP Network Storage in 64-bit - Comprehensive Test Suite"
puts "=" * 60
puts
# Test basic conversions
test_cases
puts
# Test signed/unsigned conversion edge cases
test_signed_unsigned_conversion
puts
# Debug ::/0 issue
debug_network_storage
puts
# Test database operations
test_database
puts
# NEW COMPREHENSIVE TESTS
test_edge_cases
puts
test_special_ipv6_formats
puts
test_nested_networks
puts
test_ipv4_edge_cases
puts
puts
puts "=" * 60
puts "All tests passed! ✓"
puts "=" * 60
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment