基于Kafka和Elasticsearch構(gòu)建實時站內(nèi)搜索功能的實踐
目前我們在構(gòu)建一個多租戶多產(chǎn)品類網(wǎng)站,為了讓用戶更好的找到他們所需要的產(chǎn)品,我們需要構(gòu)建站內(nèi)搜索功能,并且它應(yīng)該是實時更新的。本文將會討論構(gòu)建這一功能的核心基礎(chǔ)設(shè)施,以及支持此搜索能力的技術(shù)棧。
問題的定義與決策
為了構(gòu)建一個快速、實時的搜索引擎,我們必須做出某些設(shè)計決策。我們使用 MySQL 作為主數(shù)據(jù)庫存儲,因此有以下選擇:
直接在 MySQL 數(shù)據(jù)庫中查詢用戶在搜索框中輸入的每個關(guān)鍵詞,就像?
%#{word1}%#{word2}%...
?這樣。 ??使用一個高效的搜索數(shù)據(jù)庫,如 Elasticsearch。??
考慮到我們是一個多租戶應(yīng)用程序,同時被搜索的實體可能需要大量的關(guān)聯(lián)操作(如果我們使用的是 MySQL 一類的關(guān)系型數(shù)據(jù)庫),因為不同類型的產(chǎn)品有不同的數(shù)據(jù)結(jié)構(gòu),所以我們還可以能需要同時遍歷多個數(shù)據(jù)表來查詢用戶輸入的關(guān)鍵詞。所以我們決定不使用直接在 MySQL 中查詢關(guān)鍵詞的方案。??
因此,我們必須決定一種高效、可靠的方式,將數(shù)據(jù)實時地從 MySQL 遷移到 Elasticsearch 中。接下來需要做出如下的決定:
使用 Worker 定期查詢 MySQL 數(shù)據(jù)庫,并將所有變化的數(shù)據(jù)發(fā)送到 Elasticsearch。??
在應(yīng)用程序中使用 Elasticsearch 客戶端,將數(shù)據(jù)同時寫入到 MySQL 和 Elasticsearch 中。??
使用基于事件的流引擎,將 MySQL 數(shù)據(jù)庫中的數(shù)據(jù)更改作為事件,發(fā)送到流處理服務(wù)器上,經(jīng)過處理后將其轉(zhuǎn)發(fā)到 Elasticsearch。??
選項 1 并不是實時的,所以可以直接排除,而且即使我們縮短輪詢間隔,也會造成全表掃描給數(shù)據(jù)庫造成查詢壓力。除了不是實時的之外,選項 1 無法支持對數(shù)據(jù)的刪除操作,如果對數(shù)據(jù)進行了刪除,那么我們需要額外的表記錄之前存在過的數(shù)據(jù),這樣才能保證用戶不會搜索到已經(jīng)刪除了的臟數(shù)據(jù)。對于其他兩種選擇,不同的應(yīng)用場景做出的決定可能會有所不同。在我們的場景中,如果選擇選項 2,那么我們可以預(yù)見一些問題:如過 Elasticsearch 建立網(wǎng)絡(luò)連接并確認更新時速度很慢,那么這可能會降低我們應(yīng)用程序的速度;或者在寫入 Elasticsearch 時發(fā)生了未知異常,我們該如何對這一操作進行重試來保證數(shù)據(jù)完整性;不可否認開發(fā)團隊中不是所有開發(fā)人員都能了解所有的功能,如果有開發(fā)人員在開發(fā)新的與產(chǎn)品有關(guān)的業(yè)務(wù)邏輯時沒有引入 Elasticsearch 客戶端,那么我們將在 Elasticsearch 中更新這次數(shù)據(jù)的更改,無法保證 MySQL 與 Elasticsearch 間的數(shù)據(jù)一致性。
接下來我們該考慮如何將 MySQL 數(shù)據(jù)庫中的數(shù)據(jù)更改作為事件,發(fā)送到流處理服務(wù)器上。我們可以在數(shù)據(jù)庫變更后,在應(yīng)用程序中使用消息管道的客戶端同步地將事件發(fā)送到消息管道,但是這并沒有解決上面提到的使用 Elasticsearch 客戶端帶來的問題,只不過是將風(fēng)險從 Elasticsearch 轉(zhuǎn)移到了消息管道。最終我們決定通過采集 MySQL Binlog,將 MySQL Binlog 作為事件發(fā)送到消息管道中的方式來實現(xiàn)基于事件的流引擎。關(guān)于 binlog 的內(nèi)容可以點擊鏈接,在這里不再贅述。
服務(wù)簡介

為了對外提供統(tǒng)一的搜索接口,我們首先需要定義用于搜索的數(shù)據(jù)結(jié)構(gòu)。對于大部分的搜索系統(tǒng)而言,對用戶展示的搜索結(jié)果通常包括為標題
和內(nèi)容
,這部分內(nèi)容我們稱之可搜索內(nèi)容(Searchable Content)。在多租戶系統(tǒng)中我們還需要在搜索結(jié)果中標示出該搜索結(jié)果屬于哪個租戶,或用來過濾當前租戶下可搜索的內(nèi)容,我們還需要額外的信息來幫助用戶篩選自己想要搜索的產(chǎn)品類別,我們將這部分通用的但不用來進行搜索的內(nèi)容稱為元數(shù)據(jù)(Metadata)。最后,在我們展示搜索結(jié)果時可能希望根據(jù)不同類型的產(chǎn)品提供不同的展示效果,我們需要在搜索結(jié)果中返回這些個性化展示所需要的原始內(nèi)容(Raw Content)。到此為止我們可以定義出了存儲到 Elasticsearch 中的通用數(shù)據(jù)結(jié)構(gòu):
{ "searchable": { "title": "string", "content": "string"
}, "metadata": { "tenant_id": "long", "type": "long", "created_at": "date", "created_by": "string", "updated_at": "date", "updated_by": "string"
}, "raw": {}
}
基礎(chǔ)設(shè)施
Apache Kafka:?Apache?Kafka 是開源的分布式事件流平臺。我們使用 Apache kafka 作為數(shù)據(jù)庫事件(插入、修改和刪除)的持久化存儲。
mysql-binlog-connector-java:?我們使用?mysql-binlog-connector-java?從 MySQL Binlog 中獲取數(shù)據(jù)庫事件,并將它發(fā)送到 Apache Kafka 中。我們將單獨啟動一個服務(wù)來完成這個過程。
在接收端我們也將單獨啟動一個服務(wù)來消費 Kafka 中的事件,并對數(shù)據(jù)進行處理然后發(fā)送到 Elasticsearch 中。
Q:為什么不使用Elasticsearch connector之類的連接器對數(shù)據(jù)進行處理并發(fā)送到Elasticsearch中?
A:在我們的系統(tǒng)中是不允許將大文本存入到MySQL中的,所以我們使用了額外的對象存儲服務(wù)來存放我們的產(chǎn)品文檔,所以我們無法直接使用連接器將數(shù)據(jù)發(fā)送到Elasticsearch中。
Q:為什么不在發(fā)送到Kafka前就將數(shù)據(jù)進行處理?
A:這樣會有大量的數(shù)據(jù)被持久化到Kafka中,占用Kafka的磁盤空間,而這部分數(shù)據(jù)實際上也被存儲到了Elasticsearch。
Q:為什么要用單獨的服務(wù)來采集binlog,而不是使用Filebeat之類的agent?
A:當然可以直接在MySQL數(shù)據(jù)庫中安裝agent來直接采集binlog并發(fā)送到Kafka中。但是在部分情況下開發(fā)者使用的是云服務(wù)商或其他基礎(chǔ)設(shè)施部門提供的MySQL服務(wù)器,這種情況下我們無法直接進入服務(wù)器安裝agent,所以使用更加通用的、無侵入性的C/S結(jié)構(gòu)來消費MySQL的binlog。
配置技術(shù)棧
我們使用 docker 和 docker-compose 來配置和部署服務(wù)。為了簡單起見,MySQL 直接使用了 root 作為用戶名和密碼,Kafka 和 Elasticsearch 使用的是單節(jié)點集群,且沒有設(shè)置任何鑒權(quán)方式,僅供開發(fā)環(huán)境使用,請勿直接用于生產(chǎn)環(huán)境。
version: "3"services:
?mysql:
? ?image: mysql:5.7
? ?container_name: mysql
? ?environment:
? ? ?MYSQL_ROOT_PASSWORD: root
? ? ?MYSQL_DATABASE: app
? ?ports:
? ? ?- 3306:3306
? ?volumes:
? ? ?- mysql:/var/lib/mysql
?zookeeper:
? ?image: bitnami/zookeeper:3.6.2
? ?container_name: zookeeper
? ?ports:
? ? ?- 2181:2181
? ?volumes:
? ? ?- zookeeper:/bitnami
? ?environment:
? ? ?- ALLOW_ANONYMOUS_LOGIN=yes
?kafka:
? ?image: bitnami/kafka:2.7.0
? ?container_name: kafka
? ?ports:
? ? ?- 9092:9092
? ?volumes:
? ? ?- kafka:/bitnami
? ?environment:
? ? ?- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
? ? ?- ALLOW_PLAINTEXT_LISTENER=yes
? ?depends_on:
? ? ?- zookeeper
?elasticsearch:
? ?image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
? ?container_name: elasticsearch
? ?environment:
? ? ?- discovery.type=single-node
? ?volumes:
? ? ?- elasticsearch:/usr/share/elasticsearch/data
? ?ports:
? ? ?- 9200:9200volumes:
?mysql:
? ?driver: local
?zookeeper:
? ?driver: local
?kafka:
? ?driver: local
?elasticsearch:
? ?driver: local
在服務(wù)啟動成功后我們需要為 Elasticsearch 創(chuàng)建索引,在這里我們直接使用 curl 調(diào)用 Elasticsearch 的 RESTful API,也可以使用 busybox 基礎(chǔ)鏡像創(chuàng)建服務(wù)來完成這個步驟。
# Elasticsearchcurl "http://localhost:9200/search" -XPUT -d '
{ ?"mappings": { ? ?"properties": { ? ? ?"searchable": { ? ? ? ?"type": "nested", ? ? ? ?"properties": { ? ? ? ? ?"title": { ? ? ? ? ? ?"type": "text"
? ? ? ? ?}, ? ? ? ? ?"content": { ? ? ? ? ? ?"type": "text"
? ? ? ? ?}
? ? ? ?}
? ? ?}, ? ? ?"metadata": { ? ? ? ?"type": "nested", ? ? ? ?"properties": { ? ? ? ? ?"tenant_id": { ? ? ? ? ? ?"type": "long"
? ? ? ? ?}, ? ? ? ? ?"type": { ? ? ? ? ? ?"type": "integer"
? ? ? ? ?}, ? ? ? ? ?"created_at": { ? ? ? ? ? ?"type": "date"
? ? ? ? ?}, ? ? ? ? ?"created_by": { ? ? ? ? ? ?"type": "keyword"
? ? ? ? ?}, ? ? ? ? ?"updated_at": { ? ? ? ? ? ?"type": "date"
? ? ? ? ?}, ? ? ? ? ?"updated_by": { ? ? ? ? ? ?"type": "keyword"
? ? ? ? ?}
? ? ? ?}
? ? ?}, ? ? ?"raw": { ? ? ? ?"type": "nested"
? ? ?}
? ?}
?}
}'
核心代碼實現(xiàn)(SpringBoot + Kotlin)
Binlog 采集端:
? ?override fun run() {
? ? ? ?client.serverId = properties.serverId ? ? ? ?val eventDeserializer = EventDeserializer()
? ? ? ?eventDeserializer.setCompatibilityMode(
? ? ? ? ? ?EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
? ? ? ?)
? ? ? ?client.setEventDeserializer(eventDeserializer)
? ? ? ?client.registerEventListener { ? ? ? ? ? ?val header = it.getHeader<EventHeader>() ? ? ? ? ? ?val data = it.getData<EventData>() ? ? ? ? ? ?if (header.eventType == EventType.TABLE_MAP) {
? ? ? ? ? ? ? ?tableRepository.updateTable(Table.of(data as TableMapEventData))
? ? ? ? ? ?} else if (EventType.isRowMutation(header.eventType)) { ? ? ? ? ? ? ? ?val events = when {
? ? ? ? ? ? ? ? ? ?EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
? ? ? ? ? ? ? ? ? ?EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
? ? ? ? ? ? ? ? ? ?EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData) ? ? ? ? ? ? ? ? ? ?else -> emptyList()
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?logger.info("Mutation events: {}", events) ? ? ? ? ? ? ? ?for (event in events) {
? ? ? ? ? ? ? ? ? ?kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?client.connect()
? ?}
在這段代碼里面,我們首先是對 binlog 客戶端進行了初始化,隨后開始監(jiān)聽 binlog 事件。binlog 事件類型有很多,大部分都是我們不需要關(guān)心的事件,我們只需要關(guān)注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以。當我們接收到 TABLE_MAP 事件,我們會對內(nèi)存中的數(shù)據(jù)庫表結(jié)構(gòu)進行更新,在后續(xù)的 WRITE/UPDATE/DELETE 事件中,我們會使用內(nèi)存緩存的數(shù)據(jù)庫結(jié)構(gòu)進行映射。整個過程大概如下所示:
Table: ["id", "title", "content",...]Row: [1, "Foo", "Bar",...]
=>
{ "id": 1, "title": "Foo", "content": "Bar"}
隨后我們將收集到的事件發(fā)送到 Kafka 中,并由 Event Processor 進行消費處理。
事件處理器
@Componentclass KafkaBinlogTopicListener( ? ?val binlogEventHandler: BinlogEventHandler
) { ? ?companion object { ? ? ? ?private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
? ?} ? ?private val objectMapper = jacksonObjectMapper() ? ?
? ?fun process(message: String) { ? ? ? ?val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
? ? ? ?logger.info("Consume binlog event: {}", binlogEvent)
? ? ? ?binlogEventHandler.handle(binlogEvent)
? ?}
}
首先使用?SpringBoot Message Kafka?提供的注解對事件進行消費,接下來將事件委托到?binlogEventHandler
?去進行處理。實際上?BinlogEventHandler
?是個自定義的函數(shù)式接口,我們自定義事件處理器實現(xiàn)該接口后通過 Spring Bean 的方式注入到?KafkaBinlogTopicListener
?中。
@Componentclass ElasticsearchIndexerBinlogEventHandler( ? ?val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler { ? ?override fun handle(binlogEvent: BinlogEvent) { ? ? ? ?val payload = binlogEvent.payload as Map<*, *> ? ? ? ?val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
? ? ? ?// Should delete from Elasticsearch
? ? ? ?if (binlogEvent.eventType == EVENT_TYPE_DELETE) { ? ? ? ? ? ?val deleteRequest = DeleteRequest()
? ? ? ? ? ?deleteRequest
? ? ? ? ? ? ? ?.index("search")
? ? ? ? ? ? ? ?.id(documentId)
? ? ? ? ? ?restHighLevelClient.delete(deleteRequest, DEFAULT)
? ? ? ?} else { ? ? ? ? ? ?// Not ever WRITE or UPDATE, just reindex
? ? ? ? ? ?val indexRequest = IndexRequest()
? ? ? ? ? ?indexRequest
? ? ? ? ? ? ? ?.index("search")
? ? ? ? ? ? ? ?.id(documentId)
? ? ? ? ? ? ? ?.source(
? ? ? ? ? ? ? ? ? ?mapOf<String, Any>( ? ? ? ? ? ? ? ? ? ? ? ?"searchable" to mapOf( ? ? ? ? ? ? ? ? ? ? ? ? ? ?"title" to payload["title"], ? ? ? ? ? ? ? ? ? ? ? ? ? ?"content" to payload["content"]
? ? ? ? ? ? ? ? ? ? ? ?), ? ? ? ? ? ? ? ? ? ? ? ?"metadata" to mapOf( ? ? ? ? ? ? ? ? ? ? ? ? ? ?"tenantId" to payload["tenantId"], ? ? ? ? ? ? ? ? ? ? ? ? ? ?"type" to payload["type"], ? ? ? ? ? ? ? ? ? ? ? ? ? ?"createdAt" to payload["createdAt"], ? ? ? ? ? ? ? ? ? ? ? ? ? ?"createdBy" to payload["createdBy"], ? ? ? ? ? ? ? ? ? ? ? ? ? ?"updatedAt" to payload["updatedAt"], ? ? ? ? ? ? ? ? ? ? ? ? ? ?"updatedBy" to payload["updatedBy"]
? ? ? ? ? ? ? ? ? ? ? ?)
? ? ? ? ? ? ? ? ? ?)
? ? ? ? ? ? ? ?)
? ? ? ? ? ?restHighLevelClient.index(indexRequest, DEFAULT)
? ? ? ?}
? ?}
}
在這里我們只需要簡單地判斷是否為刪除操作就可以,如果是刪除操作需要在 Elasticsearch 中將數(shù)據(jù)刪除,而如果是非刪除操作只需要在 Elasticsearch 重新按照為文檔建立索引即可。這段代碼簡單地使用了 Kotlin 中提供的 mapOf 方法對數(shù)據(jù)進行映射,如果需要其他復(fù)雜的處理只需要按照 Java 代碼的方式編寫處理器即可。
總結(jié)
其實 Binlog 的處理部分有很多開源的處理引擎,包括 Alibaba Canal,本文使用手動處理的方式也是為其他使用非 MySQL 數(shù)據(jù)源的同學(xué)類似的解決方案。大家可以按需所取,因地制宜,為自己的網(wǎng)站設(shè)計屬于自己的實時站內(nèi)搜索引擎!
更多學(xué)習(xí)資料盡在有軟官網(wǎng)
https://www.jiaruvip.com
https://www.jiaruvip.com/software
https://www.jiaruvip.com/hacker
https://www.jiaruvip.com/phone
https://www.jiaruvip.com/game
https://www.jiaruvip.com/technical
https://www.jiaruvip.com/news
https://www.jiaruvip.com/author