#!/usr/bin/env python
#
# Jon Oberheide
# jon@oberheide.org
# http://jon.oberheide.org/pdpt/
#
# The Passive DNS Port Test (PDPT) tool acts as a passive DNS monitor to flag
# resolvers that may be vulnerable to the cache poisoning issue described in
# CERT VU #800113. Similar to OARC's porttest, this monitor will judge the
# source port behavior of resolvers based on the standard deviation of observed
# source ports.
#
# Active probing may be insufficient in discovering vulnerable servers within
# an organization as resolvers are often configured to only provide recursive
# service to a restricted address range. By passively monitoring near an
# organization's egress point, these resolvers can be flagged and audited.
#

import bisect
import optparse
import socket
import time

import dpkt
import pcap

# Capture filter: only UDP datagrams addressed to the DNS port.
BPF_FILTER = 'udp and dst port 53'

# Ratings indexed by bisect.bisect(STDDEV_THRESH, stddev):
#   stddev < 3000           -> 'BAD'
#   3000 <= stddev < 10000  -> 'FAIR'
#   stddev >= 10000         -> 'GOOD'
STDDEV_RATING = ['BAD', 'FAIR', 'GOOD']
STDDEV_THRESH = [3000, 10000]

# A sample standard deviation divides by (n - 1), so it needs two observations.
MIN_SAMPLES = 2


def _format_addr(addr):
    """Return the printable form of a packed source address.

    Four-byte addresses are formatted with socket.inet_ntoa, exactly as
    before. Sixteen-byte addresses are assumed to be packed IPv6 (the BPF
    filter does not exclude IPv6 traffic) and go through socket.inet_ntop.
    """
    if len(addr) == 16:
        return socket.inet_ntop(socket.AF_INET6, addr)
    return socket.inet_ntoa(addr)


class DNSMonitor(object):
    """Passively rate DNS query sources by the spread of their source ports.

    self.stats maps a packed source address to the list of UDP source ports
    seen from that address since its last report.
    """

    def __init__(self):
        self.stats = {}

    def main(self):
        """Parse command-line options and process captured packets.

        Runs until interrupted with Ctrl-C, then prints the capture
        statistics reported by the pcap handle.

        Raises:
            KeyError: if the capture device's datalink type is not one of
                DLT_LOOP, DLT_NULL or DLT_EN10MB.
        """
        opt = optparse.OptionParser()
        opt.add_option('-i', dest='device', default=None, type='string',
                       help='interface to listen for queries on')
        opt.add_option('-c', dest='count', default=100, type='int',
                       help='queries to observe before judging a resolver')
        self.opts, _args = opt.parse_args()

        pc = pcap.pcap(self.opts.device)
        pc.setfilter(BPF_FILTER)
        # Pick the link-layer decoder that matches the capture device.
        decode = {
            pcap.DLT_LOOP: dpkt.loopback.Loopback,
            pcap.DLT_NULL: dpkt.loopback.Loopback,
            pcap.DLT_EN10MB: dpkt.ethernet.Ethernet,
        }[pc.datalink()]

        try:
            print('listening on %s: %s' % (pc.name, pc.filter))
            for _ts, pkt in pc:
                try:
                    self.process(decode(pkt))
                except Exception:
                    # Best effort: skip packets that cannot be decoded or
                    # processed. Catching Exception rather than using a bare
                    # except lets KeyboardInterrupt reach the handler below
                    # even when it fires in the middle of process().
                    pass
        except KeyboardInterrupt:
            nrecv, ndrop, _nifdrop = pc.stats()
            print('\n%d packets received by filter' % nrecv)
            print('%d packets dropped by kernel' % ndrop)

    def process(self, pkt):
        """Record the source port of one DNS query and report when due.

        Args:
            pkt: a decoded link-layer frame whose payload chain is
                IP -> UDP -> DNS.

        Once the configured number of queries (never fewer than
        MIN_SAMPLES) has been seen from a source, its rating is printed and
        its port list is cleared.
        """
        ip = pkt.data
        udp = ip.data
        dns = dpkt.dns.DNS(udp.data)

        # Only queries are of interest, never responses.
        if dns.qr != dpkt.dns.DNS_Q:
            return
        # Ignore queries with the recursion-desired bit set.
        # NOTE(review): presumably these come from stub clients rather than
        # from the recursive resolvers being audited - confirm.
        if (dns.op & dpkt.dns.DNS_RD) != 0:
            return

        ports = self.stats.setdefault(ip.src, [])
        ports.append(udp.sport)

        # Clamp the threshold so stddev() always gets at least two samples.
        if len(ports) < max(self.opts.count, MIN_SAMPLES):
            return

        stddev = self.stddev(ports)
        rating = STDDEV_RATING[bisect.bisect(STDDEV_THRESH, stddev)]

        # Start a fresh window before formatting the report, so a failure
        # while printing cannot leave this source's list growing forever.
        self.stats[ip.src] = []

        print('%s: %s is %s: %d queries from %d ports with std dev %f' %
              (time.ctime(), _format_addr(ip.src), rating,
               len(ports), len(set(ports)), stddev))

    def stddev(self, ports):
        """Return the sample standard deviation of the given port numbers.

        Args:
            ports: a sequence of numeric port values.

        Returns:
            The standard deviation using the (n - 1) denominator, or 0.0
            when fewer than two values are supplied.
        """
        n = len(ports)
        if n < MIN_SAMPLES:
            return 0.0
        avg = sum(ports) / float(n)
        sdsq = sum((p - avg) ** 2 for p in ports)
        return (sdsq / (n - 1)) ** .5


if __name__ == '__main__':
    DNSMonitor().main()