【python】并發(fā)編程

1、基礎(chǔ)了解
python中可以使用threading模塊來(lái)實(shí)現(xiàn)多線程的開(kāi)發(fā)。
使用multiprocessing模塊實(shí)現(xiàn)多核多CPU的并行執(zhí)行。
對(duì)大數(shù)據(jù)可以使用hadoop、hive、spark實(shí)現(xiàn)多個(gè)機(jī)器甚至集群的并行。
多線程:threading,利用CPU和IO可以同時(shí)執(zhí)行的原理,讓CPU不會(huì)干巴巴等待IO完成。
多進(jìn)程:multiprocessing,利用多核CPU的能力,真正的并行執(zhí)行任務(wù)。
異步IO:asyncio,在單線程利用CPU和IO同時(shí)執(zhí)行的原理,實(shí)現(xiàn)函數(shù)異步執(zhí)行。
使用Lock對(duì)資源加鎖,防止沖突訪問(wèn)。
使用Queue實(shí)現(xiàn)不同線程/進(jìn)程之間的數(shù)據(jù)通信,實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者【邊爬取邊解析】模式。
使用線程池Pool/進(jìn)程池Pool,簡(jiǎn)化線程/進(jìn)程的任務(wù)提交、等待結(jié)束、獲取結(jié)果。
使用subprocess啟動(dòng)外部程序的進(jìn)程,并進(jìn)行輸入輸出交互。
2、Python并發(fā)編程有三種方式:多線程Thread、多進(jìn)程Process、多協(xié)程Coroutine。
CPU密集型計(jì)算(CPU-bound):也叫計(jì)算密集型,是指I/O在很短時(shí)間就可以完成,CPU需要大量的計(jì)算和處理,特點(diǎn)是CPU占用率相當(dāng)高。
例如:壓縮解壓縮、加密解密、正則表達(dá)式搜索【為什么這些操作在CPU上進(jìn)行大量的計(jì)算呢】
IO密集型計(jì)算(I/O bound):IO密集型指的是系統(tǒng)運(yùn)作大部分的狀況是CPU在等待I/O(硬盤(pán)/內(nèi)存)的讀寫(xiě)操作,CPU占用率仍然較低。
例如:文件處理程序、網(wǎng)絡(luò)爬蟲(chóng)程序、讀寫(xiě)數(shù)據(jù)庫(kù)程序。運(yùn)維時(shí),拷貝慢就是典型的IO瓶頸。
多進(jìn)程 Process (multiprocessing): 一個(gè)進(jìn)程可以啟動(dòng)N個(gè)線程
有點(diǎn):可以利用多核CPU并行運(yùn)算
缺點(diǎn):占用資源最多,可啟動(dòng)數(shù)目比線程少
適用于:CPU密集型計(jì)算
多線程Thread (threading): 一個(gè)線程可以啟動(dòng)N個(gè)協(xié)程
優(yōu)點(diǎn):相比進(jìn)程,更輕量級(jí)、占用資源(內(nèi)存)少
缺點(diǎn): # 相比進(jìn)程:多線程只能并發(fā)執(zhí)行,不能利用多CPU(GIL)
? ? ? ? ? # 相比協(xié)程:?jiǎn)?dòng)數(shù)目有限,占用內(nèi)存資源,有線程切換開(kāi)銷(xiāo)
適用于:IO密集型計(jì)算、同時(shí)運(yùn)行的任務(wù)數(shù)目要求不多。
多協(xié)程 Coroutine (asyncio):?
優(yōu)點(diǎn):內(nèi)存開(kāi)銷(xiāo)最少,啟動(dòng)協(xié)程數(shù)量最多(可達(dá)幾萬(wàn)個(gè))
缺點(diǎn):支持庫(kù)有限制(爬蟲(chóng)不支持requests,但是有個(gè)庫(kù)aiohttp可以使用)、代碼實(shí)現(xiàn)復(fù)雜
適用于:IO密集型計(jì)算、需要超多任務(wù)運(yùn)行,但有現(xiàn)成庫(kù)支持的場(chǎng)景
3、全局解釋器鎖GIL
Python速度慢的原因:
原因一:動(dòng)態(tài)類(lèi)型語(yǔ)言,邊解釋邊執(zhí)行。例如要不斷檢測(cè)數(shù)據(jù)類(lèi)型需要花銷(xiāo)時(shí)間,還有python從源碼到機(jī)器語(yǔ)言的轉(zhuǎn)換也需要花銷(xiāo)。
原因二:GIL,無(wú)法利用多核CPU并發(fā)執(zhí)行。
全局解釋器鎖(Global Interpreter Lock?, GIL)
是計(jì)算機(jī)程序設(shè)計(jì)語(yǔ)言解釋器用于同步線程的一種機(jī)制,它使得任何時(shí)刻僅有一個(gè)線程在執(zhí)行。
即便在多核心處理器上,使用GIL的解釋器也只允許同一時(shí)間執(zhí)行一個(gè)線程。
多線程在運(yùn)行時(shí),會(huì)持有GIL鎖,遇到IO操作,會(huì)釋放GIL鎖,讓別的線程運(yùn)行。

Python中對(duì)象的管理,是使用引用計(jì)數(shù)器進(jìn)行的,引用數(shù)為0則釋放對(duì)象。
Python設(shè)計(jì)初期,為了規(guī)避并發(fā)問(wèn)題引入了GIL,解決了多線程之間數(shù)據(jù)完整性和狀態(tài)同步問(wèn)題,簡(jiǎn)化了Python對(duì)共享資源的管理,現(xiàn)在想去除卻去不掉了。
上一句中的并發(fā)問(wèn)題舉個(gè)例子:
兩個(gè)線程A,B都引用了一個(gè)對(duì)象obj,此時(shí)引用計(jì)數(shù)器的值是2
線程A撤銷(xiāo)了對(duì)象obj的引用,引用計(jì)數(shù)器的值變?yōu)榱?
此時(shí)多線程調(diào)度切換,切換到了線程B,線程B也撤銷(xiāo)了obj的引用,引用計(jì)數(shù)器的值變?yōu)榱?,就把obj給釋放掉了
此時(shí)又發(fā)生多線程調(diào)度切換,切換到了線程A,線程A判斷引用計(jì)數(shù)值變成了0,又釋放了一把obj,此時(shí)obj已經(jīng)不存在了,線程A再次釋放就可能破環(huán)內(nèi)存了。
為了解決上述問(wèn)題,引入了GIL。

怎樣規(guī)避GIlbert帶來(lái)的限制:
* 多線程threading機(jī)制依然是有用的,用于IO密集型計(jì)算
因?yàn)镮O期間,線程會(huì)釋放GIL,在這期間,實(shí)現(xiàn)CPU和IO的并行。因此多線程用于IO密集型計(jì)算依然可以大幅提升速度。
但是多線程用于CPU密集型計(jì)算時(shí),只會(huì)更加拖慢速度。
* 使用multiprocessing的多進(jìn)程機(jī)制實(shí)現(xiàn)并行計(jì)算、利用多核CPU優(yōu)勢(shì)。
為了應(yīng)對(duì)GIL的問(wèn)題,Python提供了multiprocessing
4、多線程例子--爬蟲(chóng)
import threading
import requests
import time
# def my_func(a, b):
# ? ? do_craw(a, b)
#
#
# t = threading.Thread(target=my_func, args=(100, 200))
#
# t.start() ? ?# 啟動(dòng)線程
# t.join() ? ?# 等待結(jié)束
# 列表解析
urls = [
? ?f"https://www.cnblogs.com/#p{page}"
? ?for page in range(1, 50+1)
]
def craw(url):
? ?r = requests.get(url)
? ?print(url, len(r.text))
def single_thread():
? ?print("single thread begin")
? ?for url in urls:
? ? ? ?craw(url)
? ?print("single thread end")
def multi_thread():
? ?print("multi thread begin")
? ?threads = []
? ?for url in urls:
? ? ? ?threads.append(
? ? ? ? ? ?threading.Thread(target=craw, args=(url, ))
? ? ? ?)
? ?for thread in threads:
? ? ? ?thread.start()
? ?for thread in threads:
? ? ? ?thread.join() ? ?# 等待線程結(jié)束
? ?print("multi thread end")
if __name__ == "__main__":
? ?start = time.time()
? ?single_thread()
? ?end = time.time()
? ?print("single thread cost:", end - start, "seconds")
? ?start = time.time()
? ?multi_thread()
? ?end = time.time()
? ?print("multi thread cost:", end - start, "seconds")
5、python實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式多線程爬蟲(chóng)
5.1 多組件的Pipeline技術(shù)架構(gòu)
輸入數(shù)據(jù)-》處理器1-》中間數(shù)據(jù)1-》處理器m->中間數(shù)據(jù)m-》處理器N-》輸出數(shù)據(jù)
生產(chǎn)者產(chǎn)生的數(shù)據(jù)會(huì)提供給消費(fèi)者消費(fèi)。例如處理器1當(dāng)作生產(chǎn)者的話,它產(chǎn)生的數(shù)據(jù)會(huì)供消費(fèi)者處理器m使用,最終傳遞給消費(fèi)者處理器N使用。
5.2 生產(chǎn)者消費(fèi)者爬蟲(chóng)的架構(gòu)
待爬取的URL列表 ——》線程組1網(wǎng)頁(yè)下載 ——》下載好的網(wǎng)頁(yè)隊(duì)列 ——》線程組2解析存儲(chǔ)——》解析結(jié)果存儲(chǔ)到數(shù)據(jù)庫(kù)
5.3 多線程數(shù)據(jù)通信的queue.Queue
queue.Queue用于多線程之間的、線程安全的數(shù)據(jù)通信。
線程安全指的是:多個(gè)線程并發(fā)同時(shí)訪問(wèn)數(shù)據(jù)不會(huì)發(fā)生沖突。
# 導(dǎo)入類(lèi)庫(kù)
import qeue
# 創(chuàng)建Queue
q = queue.Queue()
# 添加元素
q.put(item) ? # 阻塞型添加,隊(duì)列滿了,需要等待隊(duì)列騰出位置再添加
# 獲取元素
item = q.get() ? ?# 阻塞型獲取,隊(duì)列空了,需要等待隊(duì)列中添加了元素再獲取
# 查看元素的多少
q.qsize()
# 判斷是否為空
q.empty()
# 判斷是否已滿
q.full()
5.4 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者爬蟲(chóng)
blog_spider.py
import threading
import requests
import time
from bs4 import BeautifulSoup
# 列表解析
urls = [
? ?f"https://www.cnblogs.com/#p{page}"
? ?for page in range(1, 50+1)
]
def craw(url):
? ?r = requests.get(url)
? ?print(url, len(r.text))
? ?return r.text
?
def parse(html):
? ?soup = BeautifulSoup(html, "html.parser")
? ?links = soup.find_all("a", class_="post-item-title")
? ?return [(link["href"], link.get_text()) for link in links]
producer_consumer_spider.py
sleep()語(yǔ)句會(huì)導(dǎo)致當(dāng)前線程的阻塞,進(jìn)行線程的切換。
import queue import blog_spider import time import random import threading def do_craw(url_queue: queue.Queue, html_queue: queue.Queue): ? ?while True: ? ? ? ?url = url_queue.get() ? ? ? ?html = blog_spider.craw(url) ? ? ? ?html_queue.put(html) ? ? ? ?print(threading.current_thread().name, f"craw {url}", ? ? ? ? ? ? ?"url_queue.size=", url_queue.qsize()) ? ? ? ?time.sleep(random.randint(1, 2)) def do_parse(html_queue: queue.Queue, fout): ? ?while True: ? ? ? ?html = html_queue.get() ? ? ? ?results = blog_spider.parse(html) ? ? ? ?for result in results: ? ? ? ? ? ?fout.write(str(result) + "\n") ? ? ? ?print(threading.current_thread().name, f"results.size", len(results), ? ? ? ? ? ? ?"html_queue.size=", html_queue.qsize()) ? ? ? ?time.sleep(random.randint(1, 2)) if __name__ == "__main__": ? ?url_queue = queue.Queue() ? ?html_queue = queue.Queue() ? ?for url in blog_spider.urls: ? ? ? ?url_queue.put(url) ? ?for idx in range(3): ? ? ? ?t = threading.Thread(target=do_craw, args=(url_queue, html_queue), ? ? ? ? ? ? ? ? ? ? ? ? ? ? name=f"craw{idx}") ? ? ? ?t.start() ? ?fout = open("data.txt", "w") ? ?for idx in range(2): ? ? ? ?t = threading.Thread(target=do_parse(), args=(html_queue, fout), ? ? ? ? ? ? ? ? ? ? ? ? ? ? name=f"parse{idx}") ? ? ? ?t.start()
6、Python線程安全問(wèn)題以及解決方案
6.1 線程安全概念介紹
線程安全:指某個(gè)函數(shù)、函數(shù)庫(kù)在多線程環(huán)境中被調(diào)用時(shí),能夠正確地處理多個(gè)線程之間的共享變量,使程序功能正確完成。
由于線程的執(zhí)行隨時(shí)會(huì)發(fā)生切換,就造成了不可預(yù)料的結(jié)果,出現(xiàn)線程不安全。
6.2 Lock用于解決線程安全問(wèn)題
# Lock的兩種用法
# 第一種方法
import threading
lock = threading.Lock()
lock.acquire()
try:
?# do something
finally:
?lock.release()
?
# 第二種方法
import threading
lock = threading.Lock()
with lock:
?# do something
例子:
import threading
import time
lock = threading.Lock()
class Account:
? ?def __init__(self, balance):
? ? ? ?self.balance = balance
def draw(account, amount):
? ?with lock:
? ? ? ?if account.balance >= amount:
? ? ? ? ? ?time.sleep(0.1) ? ?# 切換線程
? ? ? ? ? ?print(threading.current_thread().name, "取錢(qián)成功")
? ? ? ? ? ?account.balance -= amount
? ? ? ? ? ?print(threading.current_thread().name, "當(dāng)前余額", account.balance)
? ? ? ?else:
? ? ? ? ? ?print(threading.current_thread().name, "取錢(qián)失敗,余額不足")
if __name__ == "__main__":
? ?account = Account(1000)
? ?ta = threading.Thread(name="ta", target=draw, args=(account, 800))
? ?tb = threading.Thread(name="tb", target=draw, args=(account, 800))
? ?ta.start()
? ?tb.start()
? ?
? ?
# ?輸出結(jié)果:
ta 取錢(qián)成功
ta 當(dāng)前余額 200
tb 取錢(qián)失敗,余額不足
Process finished with exit code 0
7、線程池:ThreadPoolExecutor
7.1 線程池的原理
線程運(yùn)行狀態(tài):就緒態(tài)、阻塞態(tài)、運(yùn)行態(tài)。
新建線程系統(tǒng)需要分配資源,終止線程系統(tǒng)需要回收資源。如果可以重用線程,則可以減去新建/終止的開(kāi)銷(xiāo)。

7.2 線程池的好處
# 提升性能:因?yàn)闇p去了大量新建、終止線程的開(kāi)銷(xiāo),重用了線程資源
# 適用場(chǎng)景:適合處理突發(fā)性大量請(qǐng)求或需要大量線程完成任務(wù)、但實(shí)際任務(wù)處理時(shí)間較短
# 防御功能:能有效避免系統(tǒng)因?yàn)閯?chuàng)建線程過(guò)多,而導(dǎo)致系統(tǒng)負(fù)荷過(guò)大相應(yīng)變慢等問(wèn)題
# 代碼優(yōu)勢(shì):適用線程池的語(yǔ)法比自己新建線程執(zhí)行線程更加簡(jiǎn)潔
7.3 ThreadPoolExecutor的使用語(yǔ)法
# 方法一
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor() as pool:
?results = pool.map(craw, urls)
?for result in results:
? ?print(result)
? ?
# 方法二
with ThreadPoolExecutor() as pool:
?futures = [pool.submit(craw, url) for url in urls]
?for future in futures:
? ?print(future.result())
?for future in as_completed(futures):
? ?print(future.result())
7.4 使用線程池改造爬蟲(chóng)程序