最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

RPC接口測試技術(shù)-websocket 自動化測試實踐

2022-11-01 14:27 作者:愛測軟件測試  | 我要投稿

WebSocket 是一種在單個 TCP 連接上進行全雙工通信(Full Duplex 是通訊傳輸?shù)囊粋€術(shù)語。通信允許數(shù)據(jù)在兩個方向上同時傳輸,它在能力上相當(dāng)于兩個單工通信方式的結(jié)合。全雙工指可以同時(瞬時)進行信號的雙向傳輸( A→B 且 B→A )。指 A→B 的同時 B→A,是瞬時同步的)的協(xié)議。

WebSocket 通信協(xié)議于 2011 年被 IETF 定為標(biāo)準(zhǔn) RFC 6455,并由 RFC7936 補充規(guī)范。WebSocket API (WebSocket API 是一個使用WebSocket 協(xié)議的接口,通過它來建立全雙工通道來收發(fā)消息) 也被 W3C 定為標(biāo)準(zhǔn)。

而 HTTP 協(xié)議就不支持持久連接,雖然在 HTTP1.1 中進行了改進,使得有一個 keep-alive,在一個 HTTP 連接中,可以發(fā)送多個 Request,接收多個 Response。

但是在 HTTP 中 Request = Response 永遠是成立的,也就是說一個 request 只能有一個response。而且這個response也是被動的,不能主動發(fā)起。

websocket 常用于社交/訂閱、多玩家游戲、協(xié)同辦公/編輯、股市基金報價、體育實況播放、音視頻聊天/視頻會議/在線教育、智能家居與基于位置的應(yīng)用。

websocket 接口不能使用 requests 直接進行接口的調(diào)用,可以依賴第三方庫的方式來實現(xiàn)調(diào)用,以下內(nèi)容介紹如何調(diào)用第三方庫實現(xiàn) websocket 的接口自動化測試。

實戰(zhàn)

使用 python 語言實現(xiàn) websocket 的接口自動化

環(huán)境準(zhǔn)備

1.安裝 pyhton3 環(huán)境下載需要的運行庫
2.下載需要的運行庫
pip install websocket-client

實戰(zhàn)演示

  • 連接 websoket 服務(wù)器

import logging

from websocket import create_connection

logger = logging.getLogger(__name__)?

url = 'ws://echo.websocket.org/' #一個在線的回環(huán)websocket接口,必須以websocket的方式連接后訪問,無法直接在網(wǎng)頁端輸入該地址訪問?

wss = create_connection(url, timeout=timeout)

  • 發(fā)送 websocket 消息

wss.send('Hello World')

  • 接收 websocket 消息

res = wss.recv() logger.info(res)

  • 關(guān)閉 websocket 連接

wss.close()

  • websocket 第三方庫的調(diào)用不支持直接發(fā)送除字符串外的其他數(shù)據(jù)類型,所以在發(fā)送請求之前需要將 Python 結(jié)構(gòu)化的格式,轉(zhuǎn)換為成為字符串類型或者 json 字符串后,再發(fā)起 websocket 的接口請求

#待發(fā)送的數(shù)據(jù)體格式為:

data= { ??

?"a" : "abcd", ?

??"b" : 123 ?

??}

# 發(fā)送前需要把數(shù)據(jù)處理成 json 字符串 new_data=json.dumps(data,ensure_ascii=False)

wss.send(new_data)

  • 接收的數(shù)據(jù)體的處理:如果接口定義為 json 的話,由于數(shù)據(jù)的傳輸都是字符串格式的,需要對接收的數(shù)據(jù)體進行轉(zhuǎn)換操作

# ? ?接收的數(shù)據(jù)體的格式也為字符串

?logger.info(type(res)) # <class 'str'>

對于響應(yīng)內(nèi)容進行格式轉(zhuǎn)換處理:

def load_json(base_str): ?

?if isinstance(base_str, str): ? ? ?

?try: ? ? ??

? ? ?res = json.loads(base_str) ? ??

? ? ? ?return load_json(res) ? ?

? ?except JSONDecodeError: ? ?

?? ? ? ?return base_str ?

??elif isinstance(base_str, list): ?

?? ? ?res = [] ? ??

? ?for i in base_str: ?

?? ? ? ? ?res.append(load_json(i)) ? ?

?? ?return res ??

?elif isinstance(base_str, dict): ? ?

?? ?for key, value in base_str.items(): ? ?

?? ? ? ?base_str[key] = load_json(value) ?

?? ? ?return base_str?

?? ?return base_str

  • websocket 接口自動化測試,二次封裝 demo 展示
    web_socket_util.py 封裝 websocket 接口通用操作:

import logging

?import json

from websocket import create_connection?

logger = logging.getLogger(__name__)

class WebsocketUtil(): ??

?def conn(self, uri, timeout=3): ??

? ? ?''' ??

? ? ?連接web服務(wù)器 ? ??

? ?:param uri: 服務(wù)的url ? ??

? ?:param timeout: 超時時間 ? ?

?? ?:return: ? ? ??

?''' ? ?

? ?self.wss = create_connection(uri, timeout=timeout)?

? ?def send(self, message): ?

?? ? ?''' ? ??

? ?發(fā)送請求數(shù)據(jù)體 ??

? ? ?:param message: 待發(fā)送的數(shù)據(jù)信息 ?

?? ? ?:return: ? ?

? ?''' ? ??

? ?if not isinstance(message, str): ??

? ? ? ? ?message = json.dumps(message)?

?? ? ? ?return self.wss.send(message)?

?? ?def load_json(self, base_str): ?

?? ? ?''' ? ? ? ?

進行數(shù)據(jù)體的處理? ? ? ?

?:param base_str: 待處理的數(shù)據(jù)體 ? ?

?? ?:return: ? ??

? ?''' ? ? ?

??if isinstance(base_str, str):?

?? ? ? ? ? ?try: ? ?

? ? ? ? ? ?res = json.loads(base_str) ??

? ? ? ? ? ? ?return self.load_json(res) ??

? ? ? ? ?except JSONDecodeError: ? ?

?? ? ? ? ? ?return base_str ? ? ?

??elif isinstance(base_str, list): ? ? ??

? ? ?res = [] ? ? ?

?? ? ?for i in base_str: ?

? ? ? ? ? ? ?res.append(self.load_json(i)) ??

? ? ? ? ?return res ? ? ?

??elif isinstance(base_str, dict): ? ??

? ? ? ?for key, value in base_str.items(): ??

? ? ? ? ? ? ?base_str[key] = self.load_json(value) ? ?

? ? ? ?return base_str ? ?

?? ?return base_str ??

?def recv(self, timeout=3): ? ?

?? ?''' ? ? ??

?接收數(shù)據(jù)體信息,并調(diào)用數(shù)據(jù)體處理方法處理響應(yīng)體 ?

?? ? ?:param timeout: 超時時間 ? ??

? ?:return:? ? ?

?? ''' ? ??

? ?if isinstance(timeout, dict): ?

?? ? ? ? ?timeout = timeout["timeout"] ? ?

?? ?try: ? ? ? ??

? ?self.settimeout(timeout) ??

? ? ? ? ?recv_json = self.wss.recv() ?

?? ? ? ? ?all_json_recv = self.load_json(recv_json) ? ? ? ? ? ?self._set_response(all_json_recv) ? ? ?

?? ? ?return all_json_recv ? ??

? ?except WebSocketTimeoutException: ??

? ? ? ? ?logger.error(f"已經(jīng)超過{timeout}秒沒有接收數(shù)據(jù)啦") ?

??def settimeout(self, timeout): ? ? ?

??''' ? ??

? ?設(shè)置超時時間 ? ? ?

??:param timeout: 超時時間 ??

? ? ?:return: ? ??

? ?'''? ? ? ?

?self.wss.settimeout(timeout) ?

??def recv_all(self, timeout=3): ? ??

? ?''' ? ??

? ?接收多個數(shù)據(jù)體信息,并調(diào)用數(shù)據(jù)體處理方法處理響應(yīng)體?

?? ? ? ?:param timeout: 超時時間 ? ??

? ?:return: ? ?

?? ?''' ? ??

? ?if isinstance(timeout, dict): ?

?? ? ? ? ?timeout = timeout["timeout"]? ? ??

? recv_list = [] ? ?

?? ?while True: ? ??

? ? ? ?try: ? ? ? ??

? ? ? ?self.settimeout(timeout) ? ??

? ? ? ? ? ?recv_json = self.wss.recv() ? ?

?? ? ? ? ? ?all_json_recv = self.load_json(recv_json) ? ? ? ? ? ? ? ?recv_list.append(all_json_recv) ?

?? ? ? ? ? ? ?logger.info(f"all::::: {all_json_recv}") ?

?? ? ? ? ?except WebSocketTimeoutException: ? ? ? ? ? ? ? ?logger.error(f"已經(jīng)超過{timeout}秒沒有接收數(shù)據(jù)啦") ? ?

? ? ? ? ? ?break ??

? ? ?self._set_response(recv_list) ? ??

? ?return recv_list ?

??def close(self): ??

? ? ?''' ? ? ?

??關(guān)閉連接 ? ? ?

??:return: ? ??

? ?''' ? ? ?

?return self.wss.close()?

?? ?def _set_response(self, response): ? ?

? ?self.response = response ?

??def _get_response(self) -> list: ? ? ?

??return self.response

test_case.py websocket 接口自動化測試用例:

class TestWsDemo: ? ?

def setup(self): ?

?? ? ?url = 'ws://echo.websocket.org/' ??

? ? ?self.wss = WebsocketUtil() ?

?? ? ?self.wss.conn(url) ??

?def teardown(self): ? ? ??

?self.wss.close() ??

?def test_demo(self):?

?? ? ? ?data = {"a": "hello", "b": "world"} ? ?

? ?self.wss.send(data) ?

?? ? ?res = self.wss.recv() ?

?? ? ?assert 'hello' == res['a']

WebSocket 使得客戶端和服務(wù)器之間的數(shù)據(jù)交換變得更加簡單,允許服務(wù)端主動向客戶端推送數(shù)據(jù)。

在 WebSocket API 中,瀏覽器和服務(wù)器只需要完成一次握手,兩者之間就直接可以創(chuàng)建持久性的連接,并進行雙向數(shù)據(jù)傳輸。


RPC接口測試技術(shù)-websocket 自動化測試實踐的評論 (共 條)

分享到微博請遵守國家法律
青冈县| 三台县| 揭西县| 衡东县| 宝坻区| 桐柏县| 射洪县| 福海县| 崇信县| 乐山市| 榆社县| 金乡县| 邯郸县| 枣强县| 额济纳旗| 福安市| 遂平县| 娄底市| 江川县| 阳泉市| 武功县| 稻城县| 宜川县| 塔河县| 蒲江县| 揭西县| 三都| 迁安市| 独山县| 科尔| 阜新市| 闸北区| 新余市| 永嘉县| 介休市| 随州市| 宝应县| 上栗县| 宜兴市| 兴隆县| 霍邱县|