python scapy庫(kù)可選擇網(wǎng)卡抓包分析數(shù)據(jù)包
注: python scapy庫(kù)提供了show_interface()函數(shù)來輸出本機(jī)網(wǎng)卡信息,但僅僅是輸出,網(wǎng)卡信息不能用,所以就需要對(duì)該函數(shù)做一些修改,具體就是在scapy安裝目錄下的interface.py文件中,搜索show()函數(shù),其中有一個(gè)res[]變量,中括號(hào)里面是一個(gè)pr開頭的什么單詞沒記下來,需要函數(shù)直接return這個(gè)變量,然后程序就能正常用了。 在pycharm中更容易,按住ctrl,點(diǎn)擊show_interface()函數(shù),然后再點(diǎn)擊show_interface()函數(shù)中的show()函數(shù),選擇第一個(gè)show()函數(shù),就直接跳轉(zhuǎn)到目的函數(shù)了。 注:計(jì)算機(jī)需安裝npcap供python調(diào)用。 具體實(shí)現(xiàn)效果是:每1秒存放數(shù)據(jù)包數(shù)量的字典會(huì)清零一次,然后把這一秒內(nèi)的數(shù)據(jù)存到當(dāng)前目錄下的data.txt文件中,每個(gè)tcp udp arp icmp數(shù)據(jù)包的內(nèi)容存放到當(dāng)前目錄下以協(xié)議名命名的txt文件中。 以下是完整代碼,雖然長(zhǎng),但大部分都是if判斷數(shù)據(jù)包分類。 from scapy.all import * import time import json global dict1 dict1={"TCP":0,"UDP":0,"ICMP":0,"ARP":0} def save(packet,name): ??dict1[name] = dict1.get(name, 0) + 1 ??trans=packet[f'{name}'].payload ??with open(f'./message/{name}.txt','a') as f: ????if name!="ARP": ??????src=packet["IP"].src ??????dst=packet["IP"].dst ??????if name=="TCP": ????????sport=packet[f'{name}'].sport ????????dport=packet[f'{name}'].dport ????????options=packet[f'{name}'].options ????????flags=packet[f'{name}'].flags ????????f.writelines(f"{src}/{sport}-->{dst}/{dport} {str(packet['TCP'].payload)} {str(packet['TCP'].load)}"+'\n') ??????elif name=="UDP": ????????sport = packet["UDP"].sport ????????dport = packet["UDP"].dport ????????f.writelines(f"{src}/{sport}-->{dst}/{dport}?{str(packet['UDP'].payload)} {str(packet['UDP'].load)}" + '\n') ??????elif name=="ICMP": ????????type = packet["ICMP"].type ????????code = packet["ICMP"].code ????????f.writelines(f"{type}/{code} {src}-->{dst} {str(packet['ICMP'].load)}" + '\n') ????elif name=="ARP": ??????op=packet["ARP"].op ??????hwsrc = packet['ARP'].hwsrc ??????psrc =?packet['ARP'].psrc ??????hwdst =?packet['ARP'].hwdst ??????pdst =?packet['ARP'].pdst ??????if str(op)=="1": ????????f.writelines(f"request {hwsrc}/{psrc}-->{pdst}"+'\n') ??????elif str(op)=="2": ????????f.writelines(f"respond {hwsrc}/{psrc}-->{hwdst}/{pdst}"+'\n') ??print('\r',dict1,end="") def tcp(packet): ??save(packet, "TCP") def udp(packet): ??save(packet,"UDP") def icmp(packet): ??save(packet,"ICMP") def ip4(packet): ??content=packet['IP'].payload ??proto=packet["IP"].proto ??if proto==6: ????tcp(packet) ??elif proto==17: ????udp(packet) ??elif proto==1: ????icmp(packet) def arp(packet): ??save(packet,"ARP") ??# print('\r',dict1,end="") def pack_callback(packet): ??ltime=time.time() ??global ftime,dict1 ??if ltime-ftime>=1: ????ftime=time.time() ????with open("./data.txt",'a') as f: ??????f.writelines(json.dumps(dict1)+'\n') ????del dict1 ????dict1={"TCP":0,"UDP":0,"ICMP":0,"ARP":0} ??try: ????proto=str(hex(packet["Ethernet"].type)) ????if proto=="0x800": ??????ip4(packet) ????elif proto=='0x806': ??????arp(packet) ????elif proto=="0x86dd": ??????pass ??????# print("IP6") ????elif proto=="0x8864": ??????print("PPPoE") ????elif proto=="0x8100": ??????print('802.1 q tag') ????elif proto=="0x8847": ??????print('MPLS lable') ????else: ??????pass ??except: ????pass ????# print("非以太網(wǎng)幀") ??# time.sleep(1) def sniff_1(netcart): ??with open("./data.txt", 'w') as f: ????f.writelines('') ??sniff(filter = "",prn=pack_callback, iface='{}'.format(netcart)) def netcart(inp): ??for i in list1: ????if inp in i: ??????print(f"已選擇{i[2]}") ??????return i[2] #show_interfaces()函數(shù)被更改過 list1=sorted(show_interfaces(),key=lambda x:int(x[1])) for i in list1: ??if "None" in i: ????continue ??print(' '.join(i[1:])) print("輸入pcap文件路徑或輸入序號(hào)選擇網(wǎng)卡:",end="") while True: ??inp=input() ??global ftime ??ftime = time.time() ??if '.pcap' in inp : ??????try: ????????packets = rdpcap(f'{inp}') ????????print('文件'+inp+'已選擇') ????????for i in packets: ??????????pack_callback(i) ????????break ??????except: ????????print("文件未找到,請(qǐng)檢查文件名以及文件路徑是否有誤,并重新輸入;或者再次通過序號(hào)選擇網(wǎng)卡:",end="") ????????pass ??elif int(inp): ????sniff_1(netcart(inp)) ????break