JAVA和Nginx 教程大全

网站首页 > 精选教程 正文

Websocket入门与实例介绍 websocket入门教程

wys521 2024-10-16 14:51:17 精选教程 27 ℃ 0 评论

时光闹钟app开发者,请关注我,后续分享更精彩!

坚持原创,共同进步!

背景

工作中需要服务端监听远程终端程序的健康状态,管理端可以向终端进行定点或群发操作指令。这样的场景比较适合使用websocket技术,websocket具有双通道通信功能,客户端可以实时感知服务端推送消息。最近研究了下这门技术,记录下来,希望对有类似需要的小伙伴们有所帮助和借鉴。

Web socket介绍

什么是web socket

  • websocket协议本质上是一个基于tcp的协议之上的应用层协议
  • 实现了客户端与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯的目的

Web socket与http比较

相同点

  • 都是基于tcp的,都是可靠性传输协议
  • 都是应用层协议
  • WebSocket在建立握手时,数据是通过HTTP传输的。但是建立之后,在真正传输时候是不需要HTTP协议的

不同点

  • WebSocket是双向通信协议,模拟Socket协议,可以双向发送或接受信息。HTTP是单向的。
  • WebSocket通信需要客户端先和服务器握手建立连接,再进行数据交互。Http每次通信浏览器都要发起向服务器的连接

Web socket交互过程

  1. 首先,客户端发起http请求,经过3次握手后,建立起TCP连接;http请求里存放WebSocket支持的版本号等信息,如:Upgrade、Connection、WebSocket-Version等;
  1. 然后,服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据;
  1. 最后,客户端收到连接成功的消息后,开始借助于TCP传输信道进行全双工通信。

Java-WebSocket方案

maven依赖

<dependency>
  <groupId>org.java-websocket</groupId>
  <artifactId>Java-WebSocket</artifactId>
  <version>1.5.3</version>
</dependency>

Server example

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;

public class SimpleServer extends WebSocketServer {

        public SimpleServer(InetSocketAddress address) {
                super(address);
        }

        @Override
        public void onOpen(WebSocket conn, ClientHandshake handshake) {
                conn.send("Welcome to the server!"); //This method sends a message to the new client
                broadcast( "new connection: " + handshake.getResourceDescriptor() ); //This method sends a message to all clients connected
                System.out.println("new connection to " + conn.getRemoteSocketAddress());
        }

        @Override
        public void onClose(WebSocket conn, int code, String reason, boolean remote) {
                System.out.println("closed " + conn.getRemoteSocketAddress() + " with exit code " + code + " additional info: " + reason);
        }

        @Override
        public void onMessage(WebSocket conn, String message) {
                System.out.println("received message from "        + conn.getRemoteSocketAddress() + ": " + message);
        }

        @Override
        public void onMessage( WebSocket conn, ByteBuffer message ) {
                System.out.println("received ByteBuffer from "        + conn.getRemoteSocketAddress());
        }

        @Override
        public void onError(WebSocket conn, Exception ex) {
                System.err.println("an error occurred on connection " + conn.getRemoteSocketAddress()  + ":" + ex);
        }
        
        @Override
        public void onStart() {
                System.out.println("server started successfully");
        }


        public static void main(String[] args) {
                String host = "localhost";
                int port = 8887;

                WebSocketServer server = new SimpleServer(new InetSocketAddress(host, port));
                server.run();
        }
}

Client example

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;

public class EmptyClient extends WebSocketClient {

        public EmptyClient(URI serverUri, Draft draft) {
                super(serverUri, draft);
        }

        public EmptyClient(URI serverURI) {
                super(serverURI);
        }

        @Override
        public void onOpen(ServerHandshake handshakedata) {
                send("Hello, it is me. Mario :)");
                System.out.println("new connection opened");
        }

        @Override
        public void onClose(int code, String reason, boolean remote) {
                System.out.println("closed with exit code " + code + " additional info: " + reason);
        }

        @Override
        public void onMessage(String message) {
                System.out.println("received message: " + message);
        }

        @Override
        public void onMessage(ByteBuffer message) {
                System.out.println("received ByteBuffer");
        }

        @Override
        public void onError(Exception ex) {
                System.err.println("an error occurred:" + ex);
        }

        public static void main(String[] args) throws URISyntaxException {                
                WebSocketClient client = new EmptyClient(new URI("ws://localhost:8887"));
                //签名证书连接
                //WebSocketClient client = new EmptyClient(new URI("wss://echo.websocket.org"));
                client.connect();
        }
}

Spring websocket方案

Maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Server example

继承TextWebSocketHandler,自定义webSocket消息处理的ChatServerMessageHandler类。类中覆盖TextWebSocketHandler中的连接建立/关闭、消息监听等方法。

public class ChatServerMessageHandler extends TextWebSocketHandler {

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        //建立connector连接
        super.afterConnectionEstablished(session);
        log.info("new connection is created, id={}.",session.getId());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        //connector连接关闭
        super.afterConnectionClosed(session, status);
        log.info("connection is close, id={}.",session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
        //监听业务消息处理...
        //handleServerBizMsg(message,session);
    }
}

注册自定义handler

@Configuration
@EnableWebSocket
public class WebSocketServerConfig implements WebSocketConfigurer {
    @Autowired
    ChatServerMessageHandler chatServerMessageHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        //绑定handler消息处理类,绑定path "/chat"
        webSocketHandlerRegistry.addHandler(chatServerMessageHandler, "/chat");
    }
}

Client example

和sever端自定义handler类似,继承TextWebSocketHandler,扩展覆盖对应方法

public class ChatClientMessageHandler extends TextWebSocketHandler {

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        //建立connector连接
        super.afterConnectionEstablished(session);
        log.info("new connection is created, id={}.",session.getId());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        //connector连接关闭
        super.afterConnectionClosed(session, status);
        //TODO: 连接断开,自定重连实现...
        log.info("connection is close, id={}.",session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
        //监听业务消息处理...
        //handleServerBizMsg(message,session);
    }
}

client端发起ws连接

@Component
@Slf4j
public class WebSocketClientConnection {
    private WebSocketSession webSocketSession;
    
    //自定义handler
    @Autowired
    ChatClientMessageHandler clientMessageHandler;
    
    @PostConstruct
    public void start() throws ExecutionException, InterruptedException {
        WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
        //ws server端启动ip+port
        URI uri = URI.create("ws://localhost:8080/chat");
        //签名证书连接
        //URI uri = URI.create("wss://localhost/chat");
        
        //采用同步方式.get()拿webSocketSession对象,缓存起来,便于后续业务发送消息使用
        webSocketSession = this.wsClient.doHandshake(clientMessageHandler, headers, uri).get();
        log.info("---> start to connect webSocket server...");
    }
}

Nginx ws 代理

Nginx.conf 配置

# http模块下添加/修改以下配置
upstream wsbackend{
     server 192.168.56.11:8081;
     server 192.168.56.12:8081;
}

server {
      listen   80;
      server_name localhost;
      location /chat {
            proxy_pass   http://wsbackend; #代理转发地址
            proxy_http_version 1.1;
            proxy_read_timeout   3600s; #超时设置
            #启用支持websocket连接
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
      }
}

Nginx docker

docker run \
    --name nginx \
    -u root \
    -p 80:80 \
    --restart=always \
    -d \
    -v /home/nginx/content/:/home/nginx/content/ \
    -v /home/nginx/log/:/var/log/nginx/ \
    -v /home/nginx/conf/nginx.conf:/etc/nginx/nginx.conf \
    -v /home/nginx/conf/conf.d:/etc/nginx/conf.d/ \
   nginx


ws集群方案

websocket建立连接后需要做会话保持,用于后续数据的双向通信。由于session会话与建立连接的server是绑定的,两者相互对应。但在企业应用中,为保证业务的高可用,服务的集群多节点部署几乎是一个必选项。如何实现ws server端的多节点部署,同时业务上又满足服务端到客户端的群发或定点推送消息要求呢?为了满足这一目的,需要满足以下几个关键点:

  • 通过redis实现消息的发布/订阅通知。所有ws server实例监听订阅一个渠道。需要发布一个通知时(群发/定点推送),往这个渠道推送消息。订阅方拿到这个消息,业务上根据条件自行筛选触发消息推送。
  • Ws client/server需实时监听ws连接的session状态,当出现open/close等事件时,实时的缓存/清理相关上下文数据。

ws关键功能点

  • Client -> server 推送消息
  • Client连接关闭自动重连

监听connection close事件,客户端基于策略自动重连。

  • Server -> client 定点推送消息

Client与server握手成功建立连接,在服务端缓存Client业务clientId和session的关联关系。业务推送消息时,通过clientId查找对应session连接,将消息推送到具体client端。

  • Server -> client 群发消息

群发消息就是定点消息推送的批量处理。服务端维护已建立连接的session列表,通过列表查找对应session,将消息推送到相应端。

  • Server集群模式部署

参见ws集群方案章节

  • Server端前置代理(Nginx)ws协议转发
  • Server端http/ws端口共享和服务共存

Java-WebSocket库:轻量级,依赖少。server端只支持ws服务,http和ws不能共享一个server实例。官方短期无支持计划。

Spring webSocket库:相对Java-WebSocket更重一些。功能支持更强。server端ws和http可共享一个端口,一个server实例上既支持http服务,也支持ws服务。

  • Server端ws ssl加密通信

可以在server前置代理端统一设置,如slb/Nginx。ws的加密通信在3次握手建立连接时实现,配置方式和http ssl类似,网上资料很多,这里不再赘述。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表