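"""TCP connect scan of every usable host in a subnet for a fixed list of ports.

This is an active scanning tool: run it only against networks you are
authorized to test. Every (host, port) pair is probed with a plain TCP
connect (``connect_ex``) from a bounded thread pool, and each open endpoint
is printed as ``ip:port``.
"""
import concurrent.futures
import ipaddress
import socket

# Subnet to scan, in CIDR notation.
subnet = "192.168.112.0/20"
# Ports probed on every host.
port_range = [135]
# Upper bound on concurrent probes.
thread_pool_size = 200


def scan_port(ip, port, timeout=1):
    """Return ``"ip:port"`` if a TCP connect to (ip, port) succeeds, else None.

    ``connect_ex`` reports a closed or filtered port through its return value
    (non-zero errno, including the timeout case), so it does not raise; only
    socket creation and address-resolution problems raise, and those are
    treated as "not open" as well. Only OSError is swallowed -- unlike the
    bare ``except:`` this replaces, a KeyboardInterrupt still propagates.
    ``timeout`` is the per-probe connect timeout in seconds.
    """
    try:
        # The context manager closes the socket on every path. The previous
        # version referenced ``sock`` in ``finally`` even when socket.socket()
        # itself had raised, which turned that error into an UnboundLocalError.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            if sock.connect_ex((ip, port)) == 0:
                return f"{ip}:{port}"
    except OSError:
        pass
    return None


def scan_subnet(subnet, ports=None, workers=None):
    """Scan every usable host address in ``subnet`` (a CIDR string) for open ports.

    ``ports`` defaults to the module-level ``port_range`` and ``workers`` to
    ``thread_pool_size``. Each open endpoint is printed as soon as its probe
    finishes, and the full list of ``"ip:port"`` strings is returned (in
    completion order) so callers can consume the result programmatically.
    Raises ValueError (from ``ipaddress``) if ``subnet`` is not a valid
    network prefix.
    """
    ports = port_range if ports is None else list(ports)
    workers = thread_pool_size if workers is None else workers
    hosts = [str(ip) for ip in ipaddress.IPv4Network(subnet).hosts()]
    open_endpoints = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in hosts for port in ports]
        # as_completed already blocks until every future is done; a separate
        # wait() beforehand only delays the first line of output.
        for future in concurrent.futures.as_completed(futures):
            endpoint = future.result()
            if endpoint is not None:
                print(endpoint)
                open_endpoints.append(endpoint)
    return open_endpoints


if __name__ == "__main__":
    scan_subnet(subnet)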
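"""Ping sweep via the system ``ping`` binary, followed by a TCP connect scan of the live hosts.

This is an active scanning tool: run it only against networks you are
authorized to test. Hosts that answer one ICMP echo request are collected
and then probed on ``port_range`` from a bounded thread pool.
"""
import concurrent.futures
import ipaddress
import os
import re
import socket
import subprocess
import sys

# Subnet to sweep, in CIDR notation.
subnet = "192.168.112.0/20"
# Upper bound on concurrent pings / probes.
thread_pool_size = 200
# Ports probed on every live host.
port_range = [135]


def scan_port(ip, port, timeout=1):
    """Return ``"ip:port"`` if a TCP connect to (ip, port) succeeds, else None.

    ``connect_ex`` reports a closed or filtered port through its return value
    (non-zero errno, including the timeout case), so it does not raise; only
    socket creation and address-resolution problems raise, and those are
    treated as "not open" as well. Only OSError is swallowed -- unlike the
    bare ``except:`` this replaces, a KeyboardInterrupt still propagates.
    ``timeout`` is the per-probe connect timeout in seconds.
    """
    try:
        # The context manager closes the socket on every path. The previous
        # version referenced ``sock`` in ``finally`` even when socket.socket()
        # itself had raised, which turned that error into an UnboundLocalError.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            if sock.connect_ex((ip, port)) == 0:
                return f"{ip}:{port}"
    except OSError:
        pass
    return None


def scan_subnet(subnet, ports=None, workers=None):
    """TCP-connect scan every address in ``subnet`` (an iterable of IP strings).

    Despite its name this takes the already-discovered host list, not a CIDR
    prefix. ``ports`` defaults to the module-level ``port_range`` and
    ``workers`` to ``thread_pool_size``. Each open endpoint is printed as soon
    as its probe finishes, and the full list of ``"ip:port"`` strings is
    returned (in completion order) for programmatic use.
    """
    ports = port_range if ports is None else list(ports)
    workers = thread_pool_size if workers is None else workers
    open_endpoints = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in ports]
        # as_completed already blocks until every future is done; a separate
        # wait() beforehand only delays the first line of output.
        for future in concurrent.futures.as_completed(futures):
            endpoint = future.result()
            if endpoint is not None:
                print(endpoint)
                open_endpoints.append(endpoint)
    return open_endpoints


def PingIP(ip, timeout=5):
    """Return ``ip`` if a single ICMP echo request to it is answered, else None.

    Liveness is decided by the presence of ``TTL`` (case-insensitively) in the
    ping output: a reply line carrying ``TTL=`` (Windows) or ``ttl=`` (iputils /
    BSD ping) is printed only when the target itself answered, whereas
    "destination host unreachable" notifications lack it. ``timeout`` bounds
    the whole ping process in seconds; a ping that is missing, not executable
    or hung counts as "host down".
    """
    if sys.platform.startswith("win"):
        # Windows ping: -n <count>, -w <reply timeout in milliseconds>.
        cmd = ["ping", "-n", "1", "-w", "20", ip]
    else:
        # iputils / BSD ping take -c <count>; the original flags were
        # Windows-only and would be misparsed here. The subprocess timeout
        # bounds how long a silent host can hold the worker.
        cmd = ["ping", "-c", "1", ip]
    try:
        # shell=False: argv goes straight to the ping binary, so nothing that
        # ends up in ``ip`` is ever interpreted by a shell. stdin is closed so
        # the child can never block on it, and run() reaps the process
        # (the previous Popen + stdout.read() never waited on it).
        result = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    # Windows ping writes in the console code page (GBK on zh-CN systems).
    # GBK is ASCII-compatible, so the "TTL" marker survives on every platform;
    # errors="replace" keeps undecodable bytes from aborting the check.
    output = result.stdout.decode("gbk", errors="replace").upper()
    return ip if "TTL" in output else None


def checkLive(subnet, workers=None):
    """Ping-sweep ``subnet`` (a CIDR string), print the live hosts, then port-scan them.

    ``workers`` defaults to ``thread_pool_size``. Returns the list of open
    ``"ip:port"`` endpoints reported by :func:`scan_subnet`. Raises ValueError
    (from ``ipaddress``) if ``subnet`` is not a valid network prefix.
    """
    workers = thread_pool_size if workers is None else workers
    hosts = [str(ip) for ip in ipaddress.IPv4Network(subnet).hosts()]
    iplist = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(PingIP, ip) for ip in hosts]
        for future in concurrent.futures.as_completed(futures):
            ip = future.result()
            if ip is not None:
                iplist.append(ip)
    # Completion order is arbitrary; sort numerically so the report is stable.
    iplist.sort(key=ipaddress.IPv4Address)
    print(iplist)
    return scan_subnet(iplist)


if __name__ == "__main__":
    # The previous entry point called checkLive(subnets), a name that was
    # never defined, so the script died with NameError before scanning.
    checkLive(subnet)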
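"""ARP-based host discovery on the local segment, followed by a TCP connect scan of the responders.

This is an active scanning tool: run it only against networks you are
authorized to test. One ARP who-has is broadcast for the whole prefix;
every address that answers is then probed on ``port_range`` from a bounded
thread pool.
"""
import concurrent.futures
import os
import socket
import sys
import time

from scapy.all import ARP, Ether, srp

# Subnet to discover, in CIDR notation (scapy expands it into individual targets).
subnet = "192.168.118.0/24"
# Ports probed on every live host.
port_range = [135, 445, 3306, 3389, 6379, 22]
# Upper bound on concurrent probes.
thread_pool_size = 200


def scan_port(ip, port, timeout=1):
    """Return ``"ip:port"`` if a TCP connect to (ip, port) succeeds, else None.

    ``connect_ex`` reports a closed or filtered port through its return value
    (non-zero errno, including the timeout case), so it does not raise; only
    socket creation and address-resolution problems raise, and those are
    treated as "not open" as well. Only OSError is swallowed -- unlike the
    bare ``except:`` this replaces, a KeyboardInterrupt still propagates.
    ``timeout`` is the per-probe connect timeout in seconds.
    """
    try:
        # The context manager closes the socket on every path. The previous
        # version referenced ``sock`` in ``finally`` even when socket.socket()
        # itself had raised, which turned that error into an UnboundLocalError.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            if sock.connect_ex((ip, port)) == 0:
                return f"{ip}:{port}"
    except OSError:
        pass
    return None


def scan_subnet(subnet, ports=None, workers=None):
    """TCP-connect scan every address in ``subnet`` (an iterable of IP strings).

    Despite its name this takes the already-discovered host list, not a CIDR
    prefix. ``ports`` defaults to the module-level ``port_range`` and
    ``workers`` to ``thread_pool_size``. Each open endpoint is printed as soon
    as its probe finishes, and the full list of ``"ip:port"`` strings is
    returned (in completion order) for programmatic use.
    """
    ports = port_range if ports is None else list(ports)
    workers = thread_pool_size if workers is None else workers
    open_endpoints = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in ports]
        # as_completed already blocks until every future is done; a separate
        # wait() beforehand only delays the first line of output.
        for future in concurrent.futures.as_completed(futures):
            endpoint = future.result()
            if endpoint is not None:
                print(endpoint)
                open_endpoints.append(endpoint)
    return open_endpoints


def arpscan(subnet, timeout=1):
    """Discover live hosts in ``subnet`` with an ARP broadcast, then port-scan them.

    An ARP who-has for ``subnet`` (a CIDR string) is sent to
    ff:ff:ff:ff:ff:ff and every answering source address (``psrc``) is treated
    as live. ARP does not cross routers, so only hosts on the directly attached
    layer-2 segment can be found this way. Sending raw frames requires
    root / Administrator privilege; ``srp`` raises when run unprivileged.
    ``timeout`` is how long (seconds) to collect answers. Returns the open
    ``"ip:port"`` endpoints reported by :func:`scan_subnet`.
    """
    arp_request = ARP(pdst=subnet)
    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    answered_list = srp(ether / arp_request, timeout=timeout, verbose=False)[0]
    # A host may answer more than once; dict.fromkeys dedupes while keeping
    # first-seen order, so each live address is port-scanned exactly once.
    clients = list(dict.fromkeys(reply.psrc for _sent, reply in answered_list))
    return scan_subnet(clients)


if __name__ == "__main__":
    arpscan(subnet)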
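# encoding:utf-8
"""ICMP ping sweep using hand-built echo requests on a raw socket, followed by a TCP connect scan.

This is an active scanning tool: run it only against networks you are
authorized to test. Raw ICMP sockets need root / CAP_NET_RAW on Linux and
Administrator on Windows; without that privilege the socket() call in
:func:`raw_socket` raises PermissionError.

The ping implementation is adapted from
https://blog.csdn.net/Small_Teenager/article/details/122123299
"""
import concurrent.futures
import ipaddress
import select
import socket
import struct
import time

# Subnet to sweep, in CIDR notation.
subnet = "192.168.112.0/20"
# Upper bound on concurrent pings / probes.
thread_pool_size = 200
# Ports probed on every live host.
port_range = [135, 445, 3306, 3389, 6379, 22]


def scan_port(ip, port, timeout=1):
    """Return ``"ip:port"`` if a TCP connect to (ip, port) succeeds, else None.

    ``connect_ex`` reports a closed or filtered port through its return value
    (non-zero errno, including the timeout case), so it does not raise; only
    socket creation and address-resolution problems raise, and those are
    treated as "not open" as well. Only OSError is swallowed -- unlike the
    bare ``except:`` this replaces, a KeyboardInterrupt still propagates.
    ``timeout`` is the per-probe connect timeout in seconds.
    """
    try:
        # The context manager closes the socket on every path. The previous
        # version referenced ``sock`` in ``finally`` even when socket.socket()
        # itself had raised, which turned that error into an UnboundLocalError.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            if sock.connect_ex((ip, port)) == 0:
                return f"{ip}:{port}"
    except OSError:
        pass
    return None


def scan_subnet(subnet, ports=None, workers=None):
    """TCP-connect scan every address in ``subnet`` (an iterable of IP strings).

    Despite its name this takes the already-discovered host list, not a CIDR
    prefix. ``ports`` defaults to the module-level ``port_range`` and
    ``workers`` to ``thread_pool_size``. Each open endpoint is printed as soon
    as its probe finishes, and the full list of ``"ip:port"`` strings is
    returned (in completion order) for programmatic use.
    """
    ports = port_range if ports is None else list(ports)
    workers = thread_pool_size if workers is None else workers
    open_endpoints = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in ports]
        # as_completed already blocks until every future is done; a separate
        # wait() beforehand only delays the first line of output.
        for future in concurrent.futures.as_completed(futures):
            endpoint = future.result()
            if endpoint is not None:
                print(endpoint)
                open_endpoints.append(endpoint)
    return open_endpoints


def chesksum(data):
    """Return the RFC 1071 one's-complement checksum of ``data`` as a 16-bit int.

    (The misspelt name is kept because the file's callers use it.) The bytes
    are summed as little-endian 16-bit words -- first byte in the low half,
    second byte in the high half -- with a trailing odd byte added on its own;
    the carries above bit 16 are folded back in and the sum is complemented.
    The final byte swap converts that little-endian word into the value which,
    once packed with ``>H`` (network order) by :func:`request_ping`, makes the
    receiver's big-endian sum over the whole message come out to 0xffff.
    """
    total = 0
    even_len = len(data) - (len(data) % 2)
    for i in range(0, even_len, 2):
        total += data[i] | (data[i + 1] << 8)
    if len(data) % 2:
        total += data[-1]
    # Fold the carries back into 16 bits; the second fold catches the carry
    # the first fold can itself produce.
    total = (total >> 16) + (total & 0xffff)
    total += total >> 16
    answer = ~total & 0xffff
    # Swap to the byte order the '>H' pack in request_ping expects.
    return (answer >> 8) | ((answer << 8) & 0xff00)


def request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body):
    """Build a 40-byte ICMP message (8-byte header + 32-byte payload) with a valid checksum.

    ``data_checksum`` is the placeholder written into the first pack (the
    field must be zero while the checksum is computed, per RFC 792); the
    message is then re-packed with the computed value. ``payload_body`` must
    be bytes; the ``32s`` field pads or truncates it to 32 bytes.
    """
    fmt = '>BBHHH32s'
    draft = struct.pack(fmt, data_type, data_code, data_checksum,
                        data_ID, data_Sequence, payload_body)
    icmp_chesksum = chesksum(draft)
    return struct.pack(fmt, data_type, data_code, icmp_chesksum,
                       data_ID, data_Sequence, payload_body)


def raw_socket(dst_addr, icmp_packet):
    """Open a raw ICMP socket, send ``icmp_packet`` to ``dst_addr`` and return (send_time, socket, dst_addr).

    The caller owns the returned socket and must close it. The port in the
    sendto address is meaningless for raw sockets and ignored by the kernel.
    Raises PermissionError when the process lacks raw-socket privilege, and
    OSError (e.g. "network is unreachable") if the send fails -- in that case
    the socket is closed before the error propagates.
    """
    # IPPROTO_ICMP is the same protocol number getprotobyname("icmp") looks up,
    # without depending on /etc/protocols being present.
    rawsocket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        send_request_ping_time = time.time()
        rawsocket.sendto(icmp_packet, (dst_addr, 80))
    except OSError:
        rawsocket.close()
        raise
    return send_request_ping_time, rawsocket, dst_addr


def reply_ping(send_request_ping_time, rawsocket, data_Sequence, timeout=2, dst_addr=None):
    """Wait up to ``timeout`` seconds for a matching echo reply; return the RTT in seconds, or -1 on timeout.

    A raw ICMP socket receives every ICMP message that reaches the host, not
    just replies to its own request. Matching on type and sequence alone (as
    the original did) therefore let a reply from *any* host satisfy *any*
    pending ping once many ran concurrently with the same sequence number,
    reporting dead hosts as alive. When ``dst_addr`` is given, replies whose
    source address differs are ignored. Datagrams too short to hold an IPv4
    header plus the 8-byte ICMP header, or whose IHL is below 5, are skipped;
    the ICMP header is located through the IHL field rather than a fixed
    20-byte offset so IP options do not break parsing.
    """
    remaining = timeout
    while remaining > 0:
        started_select = time.time()
        readable, _, _ = select.select([rawsocket], [], [], remaining)
        waited = time.time() - started_select
        if not readable:
            return -1
        time_received = time.time()
        received_packet, addr = rawsocket.recvfrom(1024)
        remaining -= waited
        if dst_addr is not None and addr[0] != dst_addr:
            continue
        if len(received_packet) < 20:
            continue
        ihl = (received_packet[0] & 0x0f) * 4
        if ihl < 20 or len(received_packet) < ihl + 8:
            continue
        icmp_type, _code, _checksum, _packet_id, sequence = struct.unpack(
            ">BBHHH", received_packet[ihl:ihl + 8])
        if icmp_type == 0 and sequence == data_Sequence:
            return time_received - send_request_ping_time
    return -1


def ping(host):
    """Send one ICMP echo request to ``host`` (an IPv4 address string); return it if it replied, else None.

    The raw socket is closed on every path. An OSError from the send (e.g.
    "network is unreachable") counts as "no reply"; PermissionError, which
    means the whole sweep cannot work, is re-raised so the caller sees it.
    """
    data_type = 8      # ICMP Echo Request
    data_code = 0      # must be zero
    data_checksum = 0  # placeholder; request_ping fills in the real value
    data_ID = 0
    data_Sequence = 1
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi'  # 32 bytes of data
    icmp_packet = request_ping(data_type, data_code, data_checksum,
                               data_ID, data_Sequence, payload_body)
    try:
        send_request_ping_time, rawsocket, addr = raw_socket(host, icmp_packet)
    except PermissionError:
        raise
    except OSError:
        return None
    try:
        # Pass the target address so replies from other hosts on the same raw
        # socket cannot be mistaken for this one (see reply_ping).
        times = reply_ping(send_request_ping_time, rawsocket, data_Sequence, dst_addr=addr)
    finally:
        rawsocket.close()
    # reply_ping signals timeout with -1; an RTT may legitimately read 0.0
    # where time.time() is coarse, so ">= 0" rather than "> 0" is the test.
    return host if times >= 0 else None


def StartPing(subnet, workers=None):
    """Ping every usable host in ``subnet`` (a CIDR string), then port-scan the live ones.

    ``workers`` defaults to ``thread_pool_size``. Returns the open
    ``"ip:port"`` endpoints reported by :func:`scan_subnet`. Raises
    PermissionError (surfaced from the first ping) when raw sockets are not
    permitted, and ValueError (from ``ipaddress``) for an invalid prefix.
    """
    workers = thread_pool_size if workers is None else workers
    hosts = [str(ip) for ip in ipaddress.IPv4Network(subnet).hosts()]
    iplist = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(ping, ip) for ip in hosts]
        for future in concurrent.futures.as_completed(futures):
            ip = future.result()
            if ip is not None:
                iplist.append(ip)
    # Completion order is arbitrary; sort numerically so the scan is stable.
    iplist.sort(key=ipaddress.IPv4Address)
    return scan_subnet(iplist)


if __name__ == "__main__":
    StartPing(subnet)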