基于Java信令服務(wù)器WebRTC一對(duì)多直播



package org.kurento.tutorial.one2manycall;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.kurento.client.*;
import org.kurento.jsonrpc.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
public class MyDemo extends TextWebSocketHandler {
? ?// 序列化工具
? ?private static final Gson gson = new GsonBuilder().create();
? ?// 線程安全的hashMap
? ?private final ConcurrentHashMap<String, UserSession> viewers = new ConcurrentHashMap<>();
? ?// 注入與kms通信客戶端
? ?@Autowired
? ?private KurentoClient kurento;
? ?// 媒體管線(管道)用于連接兩個(gè)媒體元素之間的流
? ?private MediaPipeline pipeline;
? ?// 主播端session會(huì)話
? ?private UserSession presenterUserSession;
? ?/**
? ? * 處理客戶端的數(shù)據(jù)包
? ? * @param session
? ? * @param message
? ? * @throws Exception
? ? */
? ?@Override
? ?protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
? ? ? ?// 序列化消息到j(luò)son對(duì)象
? ? ? ?JsonObject jsonMessage = gson.fromJson(message.getPayload(),JsonObject.class);
? ? ? ?switch (jsonMessage.get("id").getAsString()) {
? ? ? ? ? ?case "presenter":
? ? ? ? ? ? ? ?// 處理主播端發(fā)來(lái)的SdpOffer
? ? ? ? ? ? ? ?presenter(session, jsonMessage);
? ? ? ? ? ? ? ?break;
? ? ? ? ? ?case "viewer":
? ? ? ? ? ? ? ?// 處理觀眾端發(fā)來(lái)的SdpOffer
? ? ? ? ? ? ? ?viewer(session, jsonMessage);
? ? ? ? ? ? ? ?break;
? ? ? ? ? ?case "onIceCandidate":
? ? ? ? ? ? ? ?// 處理客戶端發(fā)來(lái)的候選人信息
? ? ? ? ? ? ? ?onIceCandidate(session,jsonMessage);
? ? ? ? ? ? ? ?break;
? ? ? ? ? ?default:
? ? ? ? ? ? ? ?break;
? ? ? ?}
? ?}
? ?/**
? ? * 客戶端關(guān)閉連接,釋放媒體管道
? ? * @param session
? ? * @param status
? ? * @throws Exception
? ? */
? ?@Override
? ?public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
? ? ? ?// 關(guān)閉客戶端會(huì)話
? ?}
? ?/**
? ? * 處理主播端發(fā)來(lái)的SdpOffer
? ? * @param session
? ? * @param jsonMessage
? ? */
? ?private synchronized void presenter(final WebSocketSession session, JsonObject jsonMessage) {
? ? ? ?// 如果當(dāng)前沒(méi)有主播在直播
? ? ? ?if (presenterUserSession == null) {
? ? ? ? ? ?// 實(shí)例化一個(gè)用戶會(huì)話對(duì)象
? ? ? ? ? ?presenterUserSession = new UserSession(session);
? ? ? ? ? ?// 創(chuàng)建一個(gè)流媒體管道
? ? ? ? ? ?pipeline = kurento.createMediaPipeline();
? ? ? ? ? ?// 通過(guò)流媒體管道實(shí)例化一個(gè)流媒體端點(diǎn)元素
? ? ? ? ? ?WebRtcEndpoint presenterWebRtc = new WebRtcEndpoint.Builder(pipeline).build();
? ? ? ? ? ?// 在主播端會(huì)話設(shè)置流媒體端點(diǎn)元素成員屬性
? ? ? ? ? ?presenterUserSession.setWebRtcEndpoint(presenterWebRtc);
? ? ? ? ? ?/**
? ? ? ? ? ? * 注冊(cè)IceCandidate監(jiān)聽(tīng)器 用于監(jiān)聽(tīng)生成的候選人信息 并發(fā)送給主播客戶端
? ? ? ? ? ? * 實(shí)現(xiàn)基于IceCandidate監(jiān)聽(tīng)器事件監(jiān)聽(tīng)器
? ? ? ? ? ? */
? ? ? ? ? ?presenterWebRtc.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
? ? ? ? ? ? ? ?/**
? ? ? ? ? ? ? ? * IceCandidate事件處理器
? ? ? ? ? ? ? ? *
? ? ? ? ? ? ? ? * @param event IceCandidateFoundEvent事件對(duì)象
? ? ? ? ? ? ? ? */
? ? ? ? ? ? ? ?@Override
? ? ? ? ? ? ? ?public void onEvent(IceCandidateFoundEvent event) {
? ? ? ? ? ? ? ? ? ?JsonObject response = new JsonObject();
? ? ? ? ? ? ? ? ? ?response.addProperty("id", "iceCandidate");
? ? ? ? ? ? ? ? ? ?// 從事件對(duì)象拿到事件的IceCandidate信息
? ? ? ? ? ? ? ? ? ?response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
? ? ? ? ? ? ? ? ? ?// 拿到候選信息之后發(fā)送給主播端的客戶端
? ? ? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ? ? ?synchronized (session) {
? ? ? ? ? ? ? ? ? ? ? ? ? ?session.sendMessage(new TextMessage(response.toString()));
? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?}
? ? ? ? ? ?});
? ? ? ? ? ?// 開(kāi)始處理客戶端傳過(guò)來(lái)的SdpOffer
? ? ? ? ? ?String sdpOffer = jsonMessage.getAsJsonPrimitive("sdpOffer").getAsString();
? ? ? ? ? ?// 處理SdpOffer并從KMS服務(wù)端返回Sdp應(yīng)答信息
? ? ? ? ? ?String sdpAnswer = presenterWebRtc.processOffer(sdpOffer);
? ? ? ? ? ?// 發(fā)送sdpAnswer應(yīng)答信息到主播端的客戶端處理
? ? ? ? ? ?JsonObject response = new JsonObject();
? ? ? ? ? ?response.addProperty("id", "presenterResponse");
? ? ? ? ? ?response.addProperty("response", "accepted");
? ? ? ? ? ?response.addProperty("sdpAnswer", sdpAnswer);
? ? ? ? ? ?// 同步線程發(fā)送sdpAnswer應(yīng)答
? ? ? ? ? ?synchronized (session) {
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?presenterUserSession.sendMessage(response);
? ? ? ? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ? ? ?// 開(kāi)始收集KMS的presenterWebRtc候選信息
? ? ? ? ? ?presenterWebRtc.gatherCandidates();
? ? ? ?} else {
? ? ? ? ? ?// 釋放媒體管道
? ? ? ?}
? ?}
? ?public void onIceCandidate(WebSocketSession session,JsonObject jsonMessage) {
? ? ? ?// 拿到客戶端發(fā)來(lái)候選信息
? ? ? ?JsonObject candidate = jsonMessage.get("candidate").getAsJsonObject();
? ? ? ?// 聲明一個(gè)UserSession對(duì)象
? ? ? ?UserSession user = null;
? ? ? ?// 如果主播端會(huì)話存在
? ? ? ?if (presenterUserSession != null) {
? ? ? ? ? ?// 判斷當(dāng)前的session是不是主播端session
? ? ? ? ? ?if (presenterUserSession.getSession() == session) {
? ? ? ? ? ? ? ?// 設(shè)置當(dāng)前要處理的會(huì)話對(duì)象是主播端session
? ? ? ? ? ? ? ?user = presenterUserSession;
? ? ? ? ? ?} else {
? ? ? ? ? ? ? ?// 不是主播端就是觀眾端會(huì)話了
? ? ? ? ? ? ? ?user = viewers.get(session.getId());
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?if (user != null) {
? ? ? ? ? ?/**
? ? ? ? ? ? * 為這個(gè)會(huì)話在kms添加候選信息以便進(jìn)行流媒體傳輸通信
? ? ? ? ? ? * 封裝候選信息到一個(gè)對(duì)象
? ? ? ? ? ? */
? ? ? ? ? ?IceCandidate cand = new IceCandidate(
? ? ? ? ? ? ? ? ? ?candidate.get("candidate").getAsString(),
? ? ? ? ? ? ? ? ? ?candidate.get("sdpMid").getAsString(),
? ? ? ? ? ? ? ? ? ?candidate.get("sdpMLineIndex").getAsInt());
? ? ? ? ? ?// 開(kāi)始會(huì)user添加候選人信息
? ? ? ? ? ?user.addCandidate(cand);
? ? ? ?}
? ?}
? ?/**
? ? * 處理觀眾端發(fā)來(lái)的SdpOffer
? ? * @param session
? ? * @param jsonMessage
? ? */
? ?public synchronized void viewer(final WebSocketSession session,JsonObject jsonMessage) {
? ? ? ?// 實(shí)例化一個(gè)新的觀眾對(duì)象
? ? ? ?UserSession viewer = new UserSession(session);
? ? ? ?// 把當(dāng)前的會(huì)話添加到觀眾會(huì)話容器當(dāng)中
? ? ? ?viewers.put(session.getId(), viewer);
? ? ? ?// 用當(dāng)前的流媒體管道實(shí)例化一個(gè)新的觀眾媒體元素
? ? ? ?WebRtcEndpoint nextWebRtc = new WebRtcEndpoint.Builder(pipeline).build();
? ? ? ?// 添加候選監(jiān)聽(tīng)器
? ? ? ?nextWebRtc.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
? ? ? ? ? ?@Override
? ? ? ? ? ?public void onEvent(IceCandidateFoundEvent event) {
? ? ? ? ? ? ? ?JsonObject response = new JsonObject();
? ? ? ? ? ? ? ?response.addProperty("id","iceCandidate");
? ? ? ? ? ? ? ?response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
? ? ? ? ? ? ? ?try{
? ? ? ? ? ? ? ? ? ?synchronized(session){
? ? ? ? ? ? ? ? ? ? ? ?session.sendMessage(new TextMessage(response.toString()));
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?}catch (IOException e){
? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?});
? ? ? ?viewer.setWebRtcEndpoint(nextWebRtc);
? ? ? ?// 從主播端媒體元素的src端連接到當(dāng)前觀眾端元素的sink端進(jìn)行輸,必須要媒體管道
? ? ? ?presenterUserSession.getWebRtcEndpoint().connect(nextWebRtc);
? ? ? ?// 處理sdpOffer提供
? ? ? ?String sdpOffer = jsonMessage.getAsJsonPrimitive("sdpOffer").getAsString();
? ? ? ?String sdpAnswer = nextWebRtc.processOffer(sdpOffer);
? ? ? ?// 發(fā)送kms端的媒體元素的sdpAnswer應(yīng)答
? ? ? ?JsonObject response = new JsonObject();
? ? ? ?response.addProperty("id", "viewerResponse");
? ? ? ?response.addProperty("response", "accepted");
? ? ? ?response.addProperty("sdpAnswer", sdpAnswer);
? ? ? ?// 同步發(fā)送消息
? ? ? ?synchronized (session) {
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?viewer.sendMessage(response);
? ? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?// 開(kāi)始收集候選信息
? ? ? ?nextWebRtc.gatherCandidates();
? ?}
}
var ws = new WebSocket('wss://' + location.host + '/call'); ?// 升級(jí)為WebSocket通信
var video; // 視頻組件對(duì)象
var webRtcPeer; // 點(diǎn)對(duì)點(diǎn)通信對(duì)象
window.onload = function() {
? ?video = document.getElementById('video');
? ?$('#startLive').attr('onclick', 'startLive()');
? ?$('#checkLive').attr('onclick', 'checkLive()');
}
// 窗口關(guān)閉之前,斷開(kāi)WebSocket會(huì)話連接
window.onbeforeunload = function() {
? ?ws.close();
}
/**
* 開(kāi)始實(shí)例化WebRtcPeer對(duì)象,設(shè)置候選收集監(jiān)聽(tīng)回調(diào)函數(shù),生成SDP提供,指定生成SDP回調(diào)函數(shù)
*/
function startLive() {
? ?var options = {
? ? ? ?localVideo : video,
? ? ? ?onicecandidate : onIceCandidate
? ?};
? ?webRtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(options, function (error) {
? ? ? ?if (error) {
? ? ? ? ? ?return console.log('實(shí)例化主播端webRtcPeer對(duì)象失?。?' + error);
? ? ? ?}
? ? ? ?// 生成主播端的SDP提供
? ? ? ?webRtcPeer.generateOffer(onAnchorOfferSdp);
? ?});
}
/**
* 異步監(jiān)聽(tīng)
* 回調(diào)函數(shù),得到主播端的SDPOffer并發(fā)送給后端
*/
function onAnchorOfferSdp(error, offerSdp) {
? ?if (error) {
? ? ? ?return console.log('監(jiān)聽(tīng)生成OfferSdp失敗');
? ?}
? ?console.log('監(jiān)聽(tīng)生成OfferSdp失敗');
? ?var message = {
? ? ? ?id : 'presenter', // presenter sendAnchorOfferSdp
? ? ? ?sdpOffer: offerSdp
? ?};
? ?// 發(fā)送主播端的OfferSdp
? ?sendMessage(message);
}
/**
* 處理服務(wù)端發(fā)送過(guò)來(lái)的sdpAnswer處理給主播端
* @param message
*/
function anchorProcessAnswer(message) {
? ?webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
? ? ? ?if(error)
? ? ? ? ? ?return console.log("處理服務(wù)端發(fā)送過(guò)來(lái)的sdpAnswer處理給主播端錯(cuò)誤: "+error);
? ?});
}
/**
* 異步監(jiān)聽(tīng)
* 公共回調(diào)函數(shù)
* 收集候選通信數(shù)據(jù),并發(fā)送給后端
* @param candidate
*/
function onIceCandidate(candidate) {
? ?var message = {
? ? ? ?id : 'onIceCandidate',
? ? ? ?candidate : candidate
? ?};
? ?// 發(fā)送主播端或觀眾端的候選數(shù)據(jù)到服務(wù)端
? ?sendMessage(message);
}
/**
* 公共處理服務(wù)端發(fā)送過(guò)來(lái)的候選人數(shù)據(jù)
* @param message
*/
function serverSendIceCandidate(message) {
? ?webRtcPeer.addIceCandidate(message.candidate, function (error) {
? ? ? ?if(error)
? ? ? ? ? ?return console.log('添加服務(wù)端的候選人失敗: ' + error);
? ?});
}
/**
* 發(fā)送WebSocket請(qǐng)求到服務(wù)端
*/
function sendMessage(message) {
? ?// js對(duì)象序列化到字符串
? ?var jsonMessage = JSON.stringify(message);
? ?console.log("發(fā)送WebSocket請(qǐng)求到服務(wù)端:" + jsonMessage);
? ?//調(diào)用ws對(duì)象send接口
? ?ws.send(jsonMessage);
}
/**
* 觀眾端查看直播
*/
function checkLive() {
? ?var options = {
? ? ? ?remoteVideo : video,
? ? ? ?onicecandidate: onIceCandidate //與主播共用同一個(gè)回調(diào)函數(shù)
? ?};
? ?webRtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(options, function (error) {
? ? ? ?if (error) {
? ? ? ? ? ?return console.log('實(shí)例化觀眾端webRtcPeer失敗 ' + error);
? ? ? ?}
? ? ? ?// 生成觀眾端的SDP提供,需要指定回調(diào)函數(shù)
? ? ? ?this.generateOffer(onAudienceOfferSdp);
? ?});
}
/**
* 異步監(jiān)聽(tīng)生成觀眾端的OfferSdp
* @param error
* @param offerSdp
*/
function onAudienceOfferSdp(error, offerSdp) {
? ?if (error) {
? ? ? ?return console.log('監(jiān)聽(tīng)生成OfferSdp失敗');
? ?}
? ?var message = {
? ? ? ?id : 'viewer', // viewer sendAudienceOfferSdp
? ? ? ?sdpOffer: offerSdp
? ?};
? ?sendMessage(message);
}
/**
* 處理服務(wù)端提供的sdp用于觀眾端
* @param message
*/
function audienceProcessAnswer(message) {
? ?webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
? ? ? ?if(error)
? ? ? ? ? ?return console.log("處理服務(wù)端提供的sdp用于觀眾端錯(cuò)誤: "+error);
? ?});
}
// 監(jiān)聽(tīng)WebSocket消息
ws.onmessage = function (message) {
? ?// 字符串序列化到JSON對(duì)象
? ?var parsedMessage = JSON.parse(message.data);
? ?console.log('接收到消息:' + message.data);
? ?switch (parsedMessage.id) {
? ? ? ?case 'presenterResponse': // presenterResponse anchorProcessAnswer
? ? ? ? ? ?anchorProcessAnswer(parsedMessage);
? ? ? ? ? ?break;
? ? ? ?case 'viewerResponse': // viewerResponse audienceProcessAnswer
? ? ? ? ? ?audienceProcessAnswer(parsedMessage);
? ? ? ? ? ?break;
? ? ? ?case 'iceCandidate': // iceCandidate serverSendIceCandidate
? ? ? ? ? ?serverSendIceCandidate(parsedMessage)
? ? ? ? ? ?break;
? ? ? ?default:
? ? ? ? ? ?console.log('無(wú)法處理消息: ', parsedMessage);
? ?}
};
