java批量操作數(shù)據(jù)庫優(yōu)化
一、背景
最近收到數(shù)據(jù)庫CPU告警,跟蹤后發(fā)現(xiàn)是有小伙伴在循環(huán)操作數(shù)據(jù)庫,每個數(shù)據(jù)都要查詢+更新,看著血壓有點高~
偽代碼:
java復(fù)制代碼Map<String, GetTaskListResponse> taskList = ; // 接口查詢一個列表 ??taskList.forEach((taskId, task) -> { ??Task originalTask = taskMapper ?? ?.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, taskId)); ??... // 存在判斷+計算 ??taskMapper.updateById(originalTask); });
分析cpu飆升過程:如果循環(huán)操作10萬次 【查詢+更新】,會有大量請求數(shù)據(jù)庫的操作。其次,請求總時間會很久,在調(diào)用日志里面看到一次請求大于60s(這個慢請求是定時器發(fā)出的,所以一開始沒有重視),觸發(fā)一次超時重試,會再次發(fā)起請求,最終會導(dǎo)致數(shù)據(jù)庫頻繁請求,cpu升高。
與數(shù)據(jù)庫的交互
應(yīng)用連接到數(shù)據(jù)庫之后,與數(shù)據(jù)庫之間的交互基本時間開銷有:連接、網(wǎng)絡(luò)、io,如果操作的數(shù)據(jù)量很大,還會有數(shù)據(jù)庫的計算開銷。上面案例中的數(shù)據(jù)表數(shù)據(jù)不多,而連接是有池化的,所以開銷主要是網(wǎng)絡(luò)和io,減少交互的次數(shù),是最簡單的優(yōu)化方式。
考慮分治:改成分批次更新,比如操作10萬個數(shù)據(jù),一次查1000個數(shù)據(jù)出來,計算好,一次提交1000個更新sql,循環(huán)100次即可。如果數(shù)據(jù)是沒有依賴關(guān)系的,代碼里面可以用多線程,異步查詢、計算及更新。當(dāng)然,異步發(fā)送請求太多太快,小心數(shù)據(jù)庫會撐不住,這個要觀察。
sql 批量操作方法
常見的數(shù)據(jù)庫,比如mysql,可以使用的批量技術(shù)有2種,ORM框架用的機(jī)制也是基于這2個:
多個sql一次提交;
多個sql合成單個sql。
(1)多個sql一次提交
用";"分割的多個sql:將多次提交變成一次提交。比如下面的sql,數(shù)據(jù)表為user:
sql復(fù)制代碼CREATE TABLE `user` ( ??`id` int(11) NOT NULL, ??`name` varchar(255) DEFAULT "", ??`age` int(11) DEFAULT 0, ??PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
批量sql:
sql復(fù)制代碼-- 插入 INSERT INTO user(name, id) VALUES('hello',1); INSERT INTO user(name, id) VALUES('world',2); ?-- 更新 update user set name='hello1' where id=1; update user set name='world1' where id=2;
(2)合成sql
多個sql合成一個提交:
sql復(fù)制代碼-- 插入: 這里用自增id做主鍵 INSERT INTO user(name, age) VALUES ('hello',10), ('world',20); ?-- 更新: 利用 case-when-then 合成 ?update user set ? ?name= case ? ? ?when id=1 then 'hello1' ? ? ?when id=2 then 'world1' ??end ??where id in (1,2); -- 這里也可以用or,但是可能會索引失效。
二、常見ORM的批量操作
1.JPA
批量更新、批量插入都是saveAll,demo:
java復(fù)制代碼List<User> userList = // 獲取用戶列表 userRepository.saveAll(userList);
源碼:
java復(fù)制代碼// org.springframework.data.jpa.repository.support.SimpleJpaRepository @Transactional public <S extends T> List<S> saveAll(Iterable<S> entities) { ?? ?Assert.notNull(entities, "Entities must not be null!"); ?? ?List<S> result = new ArrayList(); ?? ?Iterator var3 = entities.iterator(); ? ? ?while(var3.hasNext()) { ?? ? ? ?S entity = var3.next(); ?? ? ? ?result.add(this.save(entity)); ?? ?} ? ? ?return result; }
這里可以看到這個saveAll是"假"的,實現(xiàn)方式是循環(huán)調(diào)用save(),大量數(shù)據(jù)提交的話,會有性能問題。
2.mybatis-plus 批量更新
(1)手動批量flush
java復(fù)制代碼List<Task> taskList = // 獲取任務(wù)列表 ?try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) { ??TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class); ? ?for (Task task : taskList) { ?? ?taskMapper.updateById(task); ??} ? ?sqlSession.flushStatements();// 提交一批sql ??sqlSession.commit(); }
處理方式是多個sql一次提交。
(2)updateBatchById
來自mybatis-plus-extension-3.2.0.jar,下是調(diào)用demo:
java復(fù)制代碼public interface UserService extends IService<User> { ?? ?void updateBatchUserById(List<User> list); } ?@Service public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService { ? ? ?@Override ?? ?public void updateBatchUserById(List<User> list) { ?? ? ? ?this.updateBatchById(list); ?? ?} }
ServiceImpl 里的 this.updateBatchById源碼如下:
java復(fù)制代碼@Transactional( ?? ?rollbackFor = {Exception.class} ) public boolean updateBatchById(Collection<T> entityList, int batchSize) { ?? ?... ?? ?String sqlStatement = this.sqlStatement(SqlMethod.UPDATE_BY_ID); ?? ?// UPDATE_BY_ID("updateById", "根據(jù)ID 選擇修改數(shù)據(jù)", "<script>\nUPDATE %s %s WHERE %s=#{%s} %s\n</script>"), ? ? ?? ?SqlSession batchSqlSession = this.sqlSessionBatch(); ? ? ?try { ?? ? ? ?int i = 0; ?? ? ? ?// 分批 ?? ? ? ?for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) { ?? ? ? ? ? ?T anEntityList = var7.next(); ?? ? ? ? ? ?ParamMap<T> param = new ParamMap(); ?? ? ? ? ? ?param.put("et", anEntityList); ?// 這里有坑 anEntityList不能為null ?? ? ? ? ? ?batchSqlSession.update(sqlStatement, param); ?? ? ? ? ? ?if (i >= 1 && i % batchSize == 0) { ?? ? ? ? ? ? ? ?batchSqlSession.flushStatements(); //提交sql ?? ? ? ? ? ?} ?? ? ? ?} ?? ? ? ?batchSqlSession.flushStatements();//提交最后一批sql ?? ? ? ?return true; ?? ?} catch (Throwable var17) { ?? ? ? ?// 異常轉(zhuǎn)換 ?? ?} finally { ?? ? ? ?// 關(guān)閉會話 ?? ?} } ?
思路其實和上面(1)的批量flush是一樣的。也可以自定義sql:
xml復(fù)制代碼<update id="batchUpdate" parameterType="java.util.List"> ?? ?<foreach collection="list" item="item" separator=";"> ?? ? ? ?update user set name = #{item.name} where id = #{item.id} ?? ?</foreach> </update>
(3)更進(jìn)一步:合成一個sql
提交的一批sql還能合并嗎?使用case-when-then
xml復(fù)制代碼<update id="updateBatch" parameterType="java.util.List" > ?? ?update user ?? ?<trim prefix="set" suffixOverrides=","> ?? ? ? ?<trim prefix="name=case" suffix="end,"> ?? ? ? ? ? ?<foreach collection="list" item="i" index="index"> ?? ? ? ? ? ? ? ?<if test="i.name != null and i.name != ''"> ?? ? ? ? ? ? ? ? ? ?when id=#{i.id} then #{i.name} ?? ? ? ? ? ? ? ?</if> ?? ? ? ? ? ?</foreach> ?? ? ? ?</trim> ?? ?</trim> ?? ?where ? ? ?<foreach collection="list" separator="or" item="i" index="index" > ?? ? ? ?id = #{i.id} ?? ?</foreach> </update>
如果直接提交10萬個更新的sql,也是可以的。有可能出的問題:
(1)jvm 字符串太大,OOM。
(2)數(shù)據(jù)庫限制條數(shù)或者提交sql的數(shù)據(jù)大小。
參考:Mybatis中進(jìn)行批量更新(updateBatch)
3.mybatis-plus 批量插入
(1)對象操作saveBatch
demo:
sql復(fù)制代碼List<User> userList = new ArrayList<>(); userList.add(new User(1L, "Tom")); userList.add(new User(2L, "Jerry")); userList.add(new User(3L, "Mike")); int result = userMapper.saveBatch(userList);
源碼:
java復(fù)制代碼@Transactional( ?? ?rollbackFor = {Exception.class} ) public boolean saveBatch(Collection<T> entityList, int batchSize) { ?? ?String sqlStatement = this.sqlStatement(SqlMethod.INSERT_ONE);//INSERT_ONE("insert", "插入一條數(shù)據(jù)(選擇字段插入)", "<script>\nINSERT INTO %s %s VALUES %s\n</script>"), ? ? ?? ?SqlSession batchSqlSession = this.sqlSessionBatch(); ?? ?Throwable var5 = null; ?? ?try { ?? ? ? ?int i = 0; ?? ? ? ?// 分批 ?? ? ? ?for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) { ?? ? ? ? ? ?T anEntityList = var7.next(); ?? ? ? ? ? ?batchSqlSession.insert(sqlStatement, anEntityList); ?? ? ? ? ? ?if (i >= 1 && i % batchSize == 0) { ?? ? ? ? ? ? ? ?batchSqlSession.flushStatements();// 提交sql ?? ? ? ? ? ?} ?? ? ? ?} ?? ? ? ?batchSqlSession.flushStatements();// //提交最后一批sql ?? ? ? ?return true; ?? ?} catch (Throwable var16) { ?? ? ? ?// 異常處理 ?? ?} finally { ?? ? ? ?// 關(guān)閉會話 ?? ?} }
這里比較遺憾,mybatis-plus的插入不會合成一個sql,只是一次提交多個插入的sql。
#####(2)合成一個sql 腳本如下:
xml復(fù)制代碼<insert id="batchInsert" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id" flushCache="true"> ? ? ?? ?insert into user (name) values ?? ?<foreach item="item" index="index" collection="list" separator=","> ?? ? ? ?(#{item.name}) ?? ?</foreach> </insert>
參考: 數(shù)據(jù)庫批量插入這么講究的么? MyBatis批量插入的五種方式,哪種最強
4.jdbcTemplate
(1)手動拼接
這種方式比較原始,需要把每個更新的insert/update sql語句寫好,拼成一個大的sql提交。
(2)批量操作
demo如下:
java復(fù)制代碼List<User> userList = // 獲取用戶列表 ?String sql = "update user set age=? where id=?"; jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { ?? ?@Override ?? ?public void setValues(PreparedStatement ps, int i) throws SQLException { ?? ? ? ?User user = userList.get(i); ?? ? ? ?ps.setInt(1, user.gee()); ?? ? ? ?ps.setLong(2, user.getId()); ?? ?} ? ? ?@Override ?? ?public int getBatchSize() { ?? ? ? ?return userList.size(); ?? ?} });
源碼:
ini復(fù)制代碼public int[] batchUpdate(String sql, BatchPreparedStatementSetter pss) throws DataAccessException { ?? ?int[] result = (int[])this.execute(sql, (ps) -> { // execute執(zhí)行sql ?? ? ? ?try { ?? ? ? ? ? ?... ?? ? ? ? ? ?if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) { ?? ? ? ? ? ? ? ?int ixx = 0; ?? ? ? ? ? ? ? ?while(true) { ?? ? ? ? ? ? ? ? ? ?if (ixx < batchSize) { ?? ? ? ? ? ? ? ? ? ? ? ?pss.setValues(ps, ixx); ?? ? ? ? ? ? ? ? ? ? ? ?if (ipss == null || !ipss.isBatchExhausted(ixx)) { ?? ? ? ? ? ? ? ? ? ? ? ? ? ?ps.addBatch(); // 添加 ?? ? ? ? ? ? ? ? ? ? ? ? ? ?++ixx; ?? ? ? ? ? ? ? ? ? ? ? ? ? ?continue; ?? ? ? ? ? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ? ? ?// 獲取預(yù)編譯參數(shù) ?? ? ? ? ? ? ? ? ? ?int[] var11 = ps.executeBatch(); ?? ? ? ? ? ? ? ? ? ?return var11; ?? ? ? ? ? ? ? ?} ?? ? ? ? ? ?} else { ?? ? ? ? ? ? ? ?List<Integer> rowsAffected = new ArrayList(); ?? ? ? ? ? ? ? ?for(int i = 0; i < batchSize; ++i) { ?? ? ? ? ? ? ? ? ? ?pss.setValues(ps, i); ?? ? ? ? ? ? ? ? ? ?if (ipss != null && ipss.isBatchExhausted(i)) { ?? ? ? ? ? ? ? ? ? ? ? ?break; ?? ? ? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ? ? ?rowsAffected.add(ps.executeUpdate()); // 添加 ?? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ?int[] rowsAffectedArray = new int[rowsAffected.size()]; ?? ? ? ? ? ? ? ?for(int ix = 0; ix < rowsAffectedArray.length; ++ix) { ?? ? ? ? ? ? ? ? ? ?rowsAffectedArray[ix] = (Integer)rowsAffected.get(ix); ?? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ?// 獲取預(yù)編譯參數(shù) ?? ? ? ? ? ? ? ?int[] var13 = rowsAffectedArray; ?? ? ? ? ? ? ? ?return var13; ?? ? ? ? ? ?} ?? ? ? ?} finally { ?? ? ? ? ? ?// 清理 ?? ? ? ?} ?? ?}); ?? ?Assert.state(result != null, "No result array"); ?? ?return result; } ?
看源碼,原理一樣的~最后都是拼接一批sql,JdbcTemplate只是提供了輔助拼接的工具。
三、總結(jié)
時刻記住,盡量不要循環(huán)操作數(shù)據(jù)庫。大量操作數(shù)據(jù)庫,要分批處理。
功能上線后要關(guān)注接口的性能,考慮把時間過長的請求納入告警。
要加強代碼review,代碼靜態(tài)分析工作。