scripts/pingnet.py

116 lines
3.1 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import ipaddr
import os
import signal
import subprocess
import sys
import threading
import time
try:
import queue
except ImportError: # Python 2
import Queue as queue
class WaitQueue(queue.Queue):
def __init__(self, *args, **kwargs):
self.event = threading.Event()
queue.Queue.__init__(self, *args, **kwargs)
class Queuer(threading.Thread):
def __init__(self, iterable):
self.iterable = iterable
self.queue = WaitQueue()
self.go = True
super(Queuer, self).__init__()
def stop(self):
self.go = False
def run(self):
for item in self.iterable:
if not self.go:
break
self.queue.put(item)
self.queue.event.set()
class Pinger(threading.Thread):
def __init__(self, q):
self.queue = q
self.go = True
self.quiet = False
super(Pinger, self).__init__()
def stop(self):
self.go = False
def run(self):
self.queue.event.wait()
while self.go:
try:
ip = self.queue.get(False)
except queue.Empty:
break
try:
self.ping(str(ip))
except KeyboardInterrupt:
break
def ping(self, host):
with open(os.devnull, 'r+') as nul:
p = subprocess.Popen(['ping', '-c', '1', '-W', '1', host],
stdin=nul,
stdout=nul,
stderr=subprocess.STDOUT)
p.wait()
if p.returncode == 0:
if not self.quiet:
sys.stdout.write('{0} is in use\n'.format(host))
else:
sys.stdout.write('{0} is available\n'.format(host))
class main(object):
def _parse_args(self):
parser = argparse.ArgumentParser()
parser.add_argument('--quiet', '-q', action='store_true',
help='Only print available addresses')
parser.add_argument('--num-threads', type=int, default=10,
metavar='COUNT',
help='Number of simultaneous pings to run')
parser.add_argument('networks', nargs='+', metavar='network',
help='Network to ping, in CIDR notation')
return parser.parse_args()
def __init__(self):
args = self._parse_args()
self.queuer = Queuer(h for n in args.networks
for h in ipaddr.IPv4Network(n).iterhosts())
self.pingers = []
signal.signal(signal.SIGINT, self.quit)
self.queuer.start()
for __ in range(0, args.num_threads):
pinger = Pinger(self.queuer.queue)
pinger.quiet = args.quiet
self.pingers.append(pinger)
pinger.start()
while len(threading.enumerate()) > 1:
time.sleep(1)
def quit(self, signum, frame):
self.queuer.stop()
for pinger in self.pingers:
pinger.stop()
if __name__ == '__main__':
main()