最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

java實現(xiàn)Socket群聊

2022-09-11 13:51 作者:虛云幻仙  | 我要投稿

/**
* 實現(xiàn)群聊
* 服務(wù)端向每個客戶端一對一通信,將一個客戶端的消息發(fā)送給其他所有人,客戶端與客戶端之間沒有直接聯(lián)系
*/

public class GroupChatServer {
? ?public static String msg;
? ?//Sender線程和Receiver線程的緩沖區(qū),Receiver線程收到某個客戶端的消息,放入msg,喚醒Sender線程群發(fā)消息
? ?public static void main(String[] args) {
? ? ? ?try(ServerSocket ss = new ServerSocket(8888)) {
? ? ? ? ? ?System.out.println("群聊服務(wù)器已啟動,等待客戶端連接");
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?Socket socket = ss.accept();
? ? ? ? ? ? ? ?System.out.println("有新的客戶端連接: "+socket.getInetAddress());
? ? ? ? ? ? ? ?new Receiver2(socket).start();
? ? ? ? ? ? ? ?new Sender2(socket).start();
? ? ? ? ? ?}
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}

? ?}
}
class Sender2 extends Thread{
? ?//向客戶端一對一發(fā)送消息
? ?final Socket socket;

? ?public Sender2(Socket socket) {
? ? ? ?this.socket = socket;
? ?}

? ?@Override
? ?public void run() {
? ? ? ?try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?synchronized(""){
? ? ? ? ? ? ? ? ? ?//所有收發(fā)線程同步""
? ? ? ? ? ? ? ? ? ?"".wait();
? ? ? ? ? ? ? ? ? ?//每次先等待,當(dāng)有人說話了再喚醒,發(fā)送消息
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?if (socket.isClosed()||socket.isOutputShutdown())break;
? ? ? ? ? ? ? ?//如果Receiver線程.readLine返回null即對方Socket對象已關(guān)閉,則關(guān)閉己方Socket對象,Sender線程檢測到Socket關(guān)閉退出循環(huán)結(jié)束線程
? ? ? ? ? ? ? ?pw.println(GroupChatServer.msg);
? ? ? ? ? ? ? ?//服務(wù)器對每個客戶端一對一發(fā)送,將這一個消息群發(fā)給所有客戶端,因為只是讀取msg而不會改變它所以不需要同步
? ? ? ? ? ?}
? ? ? ?} catch (IOException | InterruptedException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}finally {
? ? ? ? ? ?if (socket != null) {
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?socket.close();
? ? ? ? ? ? ? ? ? ?//在Receiver線程也會finally socket.close() close方法可以多次執(zhí)行,方法內(nèi)判斷如果isClosed直接return,不會產(chǎn)生異常
? ? ? ? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?}

? ?}
}
class Receiver2 extends Thread{
? ?//與客戶端一對一接收消息
? ?final Socket socket;

? ?public Receiver2(Socket socket) {
? ? ? ?this.socket = socket;
? ?}

? ?@Override
? ?public void run() {
? ? ? ?try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
? ? ? ? ? ?synchronized (""){
? ? ? ? ? ? ? ?GroupChatServer.msg = "用戶"+socket.getInetAddress()+"加入群聊";
? ? ? ? ? ? ? ?//每當(dāng)有新用戶加入時在群內(nèi)提示
? ? ? ? ? ? ? ?"".notifyAll();
? ? ? ? ? ?}
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?String newMsg = br.readLine();
? ? ? ? ? ? ? ?if (newMsg == null){
? ? ? ? ? ? ? ? ? ?System.out.println("用戶"+socket.getInetAddress()+"結(jié)束通信");
? ? ? ? ? ? ? ? ? ?synchronized (""){
? ? ? ? ? ? ? ? ? ? ? ?GroupChatServer.msg = "用戶"+socket.getInetAddress()+"退出群聊";
? ? ? ? ? ? ? ? ? ? ? ?"".notifyAll();
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?synchronized (""){
? ? ? ? ? ? ? ? ? ?//同步""每次只能一個Receiver線程存msg
? ? ? ? ? ? ? ? ? ?GroupChatServer.msg = "用戶"+socket.getInetAddress()+"說: "+newMsg;
? ? ? ? ? ? ? ? ? ?"".notifyAll();
? ? ? ? ? ? ? ? ? ?//.notifyAll()喚醒所有"".wait()阻塞的Sender線程,每個Sender線程將msg發(fā)送給客戶端后再次wait
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}finally {
? ? ? ? ? ?if (socket!=null){
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?socket.close();
? ? ? ? ? ? ? ? ? ?//Receiver線程先發(fā)現(xiàn)用戶結(jié)束通信,喚醒Sender線程后break執(zhí)行close,這時Sender還在就緒狀態(tài)排隊,所以Receiver線程會更早關(guān)閉socket
? ? ? ? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?}
? ?}
}

class GCS2{
? ?//解決:當(dāng)多個Receiver同時收到客戶端的消息,在Sender喚醒但還沒有得到CPU資源或部分線程沒得到CPU資源發(fā)送消息時,下一個Receiver將msg更改,部分用戶漏收消息的可能性
? ?static String msg;
? ?static Socket microphone;
? ?//緩存發(fā)送msg的Socket
? ?final static ArrayList<Socket> group = new ArrayList<>();
? ?//組員容器
? ?final static ArrayList<PrintWriter> pws = new ArrayList<>();
? ?//輸出流容器

? ?public static void add(Socket socket){
? ? ? ?group.add(socket);
? ? ? ?try {
? ? ? ? ? ?pws.add(new PrintWriter(socket.getOutputStream(),true));
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ?}

? ?public static void main(String[] args) {
? ? ? ?try(ServerSocket ss = new ServerSocket(8888)) {
? ? ? ? ? ?new Sender3().start();
? ? ? ? ? ?//用一個Sender線程處理所有輸出流,每次將msg發(fā)送給所有人之后再讓Receiver傳新的msg
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?Socket socket = ss.accept();
? ? ? ? ? ? ? ?add(socket);
? ? ? ? ? ? ? ?new Receiver3(socket).start();
? ? ? ? ? ?}
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ?}
}
class Receiver3 extends Thread{
? ?final Socket socket;

? ?public Receiver3(Socket socket) {
? ? ? ?this.socket = socket;
? ?}

? ?@Override
? ?public void run() {
? ? ? ?try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
? ? ? ? ? ?push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"加入群聊");
? ? ? ? ? ?//push()方法用于更新msg,單獨寫方法體便于復(fù)用
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?String nMsg = br.readLine();
? ? ? ? ? ? ? ?if (nMsg == null) {
? ? ? ? ? ? ? ? ? ?push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"退出群聊");
? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"說: "+ nMsg);
? ? ? ? ? ? ? ?//客戶端發(fā)送給服務(wù)端的信息已經(jīng)被緩存在Socket中了,每次.readLine()是讀取本地緩存中的內(nèi)容,也就是說即便Receiver線程因為排隊更新msg而阻塞,也不會漏掉客戶端的某段msg
? ? ? ? ? ?}

? ? ? ?} catch (IOException | InterruptedException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?} finally {
? ? ? ? ? ?if (socket != null) {
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?socket.close();
? ? ? ? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?}
? ?}

? ?private void push(String msg) throws InterruptedException {
? ? ? ?synchronized ("r"){
? ? ? ? ? ?if (GCS2.msg!=null){
? ? ? ? ? ? ? ?"r".wait();
? ? ? ? ? ? ? ?//當(dāng)Sender沒有取走msg時.wait
? ? ? ? ? ?}
? ? ? ? ? ?GCS2.msg = msg;
? ? ? ? ? ?GCS2.microphone = socket;
? ? ? ?}
? ? ? ?synchronized ("s"){
? ? ? ? ? ?"s".notify();
? ? ? ? ? ?//喚醒Sender線程,Receiver線程在"r".wait()阻塞,Sender線程在"s".wait()阻塞,如果是同一個對象鎖的話使用notify()無法保證喚醒的是Sender
? ? ? ?}
? ?}

}
class Sender3 extends Thread{
? ?@Override
? ?public void run() {
? ? ? ?while (true){
? ? ? ? ? ?synchronized ("s"){
? ? ? ? ? ? ? ?if (GCS2.msg==null) {
? ? ? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ? ? ?"s".wait();
? ? ? ? ? ? ? ? ? ?} catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ? ? ?checkConnection();
? ? ? ? ? ?//將已經(jīng)關(guān)閉的Socket從group移除并關(guān)閉PW輸出流
? ? ? ? ? ?for (int i = 0;i<GCS2.group.size();i++){
? ? ? ? ? ? ? ?if (GCS2.group.get(i)!=GCS2.microphone){
? ? ? ? ? ? ? ? ? ?GCS2.pws.get(i).println(GCS2.msg);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ? ? ?synchronized ("r"){
? ? ? ? ? ? ? ?GCS2.msg=null;
? ? ? ? ? ? ? ?GCS2.microphone = null;
? ? ? ? ? ? ? ?"r".notify();
? ? ? ? ? ? ? ?//喚醒一個因為"r".wait()阻塞的Receiver線程更新信息
? ? ? ? ? ?}


? ? ? ?}
? ?}

? ?private void checkConnection(){
? ? ? ?ArrayList<Integer> list = new ArrayList<>();
? ? ? ?for (int i = GCS2.group.size()-1;i>=0;i--){
? ? ? ? ? ?if (GCS2.group.get(i).isClosed()||GCS2.group.get(i).isOutputShutdown()){
? ? ? ? ? ? ? ?list.add(i);
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?for (int i :
? ? ? ? ? ? ? ?list) {
? ? ? ? ? ?GCS2.group.remove(i);
? ? ? ? ? ?GCS2.pws.get(i).close();
? ? ? ? ? ?GCS2.pws.remove(i);
? ? ? ? ? ?//從后往前刪,每次刪除不會影響下一個要刪的元素的索引
? ? ? ?}
? ?}
}

class GCS3{
? ?final static String[] history = new String[99];
? ?//使用數(shù)組緩存歷史記錄
? ?static int cursor;
? ?public static void push(String msg){
? ? ? ?synchronized (history){
? ? ? ? ? ?//存的時候串行
? ? ? ? ? ?history[cursor++]=msg;
? ? ? ? ? ?if (cursor==99){
? ? ? ? ? ? ? ?cursor=0;
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?synchronized ("s"){
? ? ? ? ? ?"s".notifyAll();
? ? ? ?}
? ?}

? ?public static void main(String[] args) {
? ? ? ?try(ServerSocket ss = new ServerSocket(8888)) {
? ? ? ? ? ?System.out.println("服務(wù)器啟動");
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?Socket socket = ss.accept();
? ? ? ? ? ? ? ?new Receiver4(socket).start();
? ? ? ? ? ? ? ?new Sender4(socket).start();
? ? ? ? ? ?}
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ?}

}

class Sender4 extends Thread{
? ?final Socket socket;
? ?int index;

? ?public Sender4(Socket socket) {
? ? ? ?this.socket = socket;
? ? ? ?this.index = GCS3.cursor;
? ?}

? ?@Override
? ?public void run() {
? ? ? ?try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?synchronized ("s"){
? ? ? ? ? ? ? ? ? ?if (index==GCS3.cursor){
? ? ? ? ? ? ? ? ? ? ? ?"s".wait();
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?//取的時候并行
? ? ? ? ? ? ? ?if (socket.isClosed()||socket.isOutputShutdown())break;
? ? ? ? ? ? ? ?for (int c = GCS3.cursor;index!=c;index=(index+1)%99){
? ? ? ? ? ? ? ? ? ?pw.println(GCS3.history[index]);
? ? ? ? ? ? ? ?}

? ? ? ? ? ?}
? ? ? ?} catch (IOException | InterruptedException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ?}
}
class Receiver4 extends Thread{
? ?final Socket socket;

? ?public Receiver4(Socket socket) {
? ? ? ?this.socket = socket;
? ?}

? ?@Override
? ?public void run() {
? ? ? ?try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
? ? ? ? ? ?GCS3.push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"加入群聊");
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?String msg = br.readLine();
? ? ? ? ? ? ? ?if (msg == null) {
? ? ? ? ? ? ? ? ? ?GCS3.push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"退出群聊");
? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?GCS3.push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"說: "+msg);

? ? ? ? ? ?}
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}

? ? ? ?try {
? ? ? ? ? ?socket.close();
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ?}
}

class GCS5 {
? ?static int total;
? ?//當(dāng)前通信的連接總數(shù)
? ?static int count;
? ?//對一段msg完成發(fā)送的Sender數(shù)
? ?static String msg = "加入群聊";
? ?static Socket microphone;

? ?public static void totalUp(){
? ? ? ?synchronized("total"){
? ? ? ? ? ?total++;
? ? ? ?}
? ?}
? ?public static void totalDown(){
? ? ? ?synchronized ("total"){
? ? ? ? ? ?total--;
? ? ? ? ? ?//修改同一數(shù)據(jù)時需要串行
? ? ? ?}
? ?}
? ?public static void countUp(){
? ? ? ?synchronized ("count"){
? ? ? ? ? ?count++;
? ? ? ?}
? ?}
? ?public static void push(String s,Socket socket){
? ? ? ?if (GCS5.count<GCS5.total) {
? ? ? ? ? ?synchronized ("r") {
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?"r".wait();
? ? ? ? ? ? ? ? ? ?//每次放入新的消息前先等待,等著前一句全部發(fā)送完畢后被喚醒
? ? ? ? ? ? ? ? ? ?boolean b;
? ? ? ? ? ? ? ?} catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?synchronized ("count"){
? ? ? ? ? ?count=0;
? ? ? ? ? ?msg = s;
? ? ? ? ? ?microphone = socket;
? ? ? ?}
? ? ? ?synchronized ("s"){
? ? ? ? ? ?"s".notifyAll();
? ? ? ?}
? ?}

? ?public static void main(String[] args) {
? ? ? ?try(ServerSocket ss = new ServerSocket(8888)) {
? ? ? ? ? ?System.out.println("已啟動");
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?Socket socket = ss.accept();
? ? ? ? ? ? ? ?System.out.println("新的連接");
? ? ? ? ? ? ? ?new Receiver5(socket).start();
? ? ? ? ? ? ? ?new Sender5(socket).start();
? ? ? ? ? ?}
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ?}


}

class Receiver5 extends Thread{
? ?final Socket socket;

? ?public Receiver5(Socket socket) {
? ? ? ?this.socket = socket;
? ?}

? ?@Override
? ?public void run() {
? ? ? ?try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
? ? ? ? ? ?GCS5.push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"加入群聊",socket);
? ? ? ? ? ?while (true){
? ? ? ? ? ? ? ?String msg = br.readLine();
? ? ? ? ? ? ? ?if (msg == null) {
? ? ? ? ? ? ? ? ? ?GCS5.push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"退出群聊",socket);
? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?GCS5.push("用戶"+socket.getInetAddress()+":"+socket.getPort()+"說: "+msg,socket);
? ? ? ? ? ?}
? ? ? ?} catch (IOException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}finally {
? ? ? ? ? ?if (socket != null) {
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?socket.close();
? ? ? ? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?GCS5.totalDown();
? ?}


}
class Sender5 extends Thread{
? ?final Socket socket;

? ?public Sender5(Socket socket) {
? ? ? ?this.socket = socket;
? ?}

? ?@Override
? ?public void run() {
? ? ? ?try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
? ? ? ? ? ?pw.println("加入群聊");
? ? ? ? ? ?GCS5.totalUp();
? ? ? ? ? ?//Sender線程運行后再total++,使創(chuàng)建到運行這期間count不會卡在total-1
? ? ? ? ? ?while (!socket.isClosed()){
? ? ? ? ? ? ? ?GCS5.countUp();
? ? ? ? ? ? ? ?if (socket!=GCS5.microphone){
? ? ? ? ? ? ? ? ? ?pw.println(GCS5.msg);
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?//先count++,如果Receiver線程正在更新msg,更新后count增加的1是新的msg的發(fā)送量
? ? ? ? ? ? ? ?if (GCS5.count>=GCS5.total){
? ? ? ? ? ? ? ? ? ?synchronized("r"){
? ? ? ? ? ? ? ? ? ? ? ?"r".notify();
? ? ? ? ? ? ? ? ? ? ? ?//當(dāng)Sender都發(fā)送完之后喚醒一個.wait()的Receiver。但如果這時沒有.wait()的Receiver,所有Sender全部.wait(),再收到新的msg時Receiver需要自己判斷count>=total不執(zhí)行.wait()
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?synchronized ("s"){
? ? ? ? ? ? ? ? ? ?"s".wait();
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?} catch (IOException | InterruptedException e) {
? ? ? ? ? ?throw new RuntimeException(e);
? ? ? ?}
? ?}
}

java實現(xiàn)Socket群聊的評論 (共 條)

分享到微博請遵守國家法律
新干县| 丰顺县| 天气| 辽源市| 进贤县| 兴仁县| 泸西县| 密云县| 黄平县| 潞西市| 武威市| 安乡县| 彭州市| 施秉县| 延边| 塔城市| 哈巴河县| 保亭| 临洮县| 扶风县| 瓮安县| 潮安县| 固始县| 连云港市| 多伦县| 凤冈县| 荔浦县| 莒南县| 双辽市| 霍林郭勒市| 云梦县| 阿城市| 都兰县| 湖北省| 永吉县| 定州市| 逊克县| 简阳市| 大连市| 吉林省| 宝应县|