在13.1章节中,所有参与聊天的客户端必须连接到同一个聊天服务器上面,而且这个聊天服务只能启动一个实例,显然,它同时支持的在线客户端数量是有限的。如果客户端同时在线增多,只能通过提高服务器物理配置增加承载量,这样也只是治标不治本。要解决这个问题,只能使用集群部署,实现负载均衡,实现分而治之的策略
13.2.1 分布式世界聊天系统设计
分布式世界聊天系统最主要的问题是,所有的客户端目标服务并不是在同一个聊天服务实例上面。假如聊天服务的实例有两个或两个以上,根据负载均衡的算法,不同的客户端发送的消息会到达不同的聊天服务上面,在这个聊天服务上面转发的聊天信息,只能被这个服务上面的其它客户端接收。如图13.2所示。

图13.2
单服转发聊天信息 客户端A发送一条聊天信息,被网关负载到聊天服务A上面,由聊天服务A转发,那么客户端B和客户端C会收到消息,而客户端D和客户端E就收不到消息了。
要解决这个问题有两种方案,一是,改造网关,当收到聊天消息的时候,将此消息负载到所有的聊天服务实例上面,然后每个聊天服务实例再将消息转发到各自服务的客户端上面。二是,在聊天服务上面处理,某一个聊天服务收到客户端消息时,先不立即转发消息,而是将消息发布到所有的聊天服务上面,聊天服务收到发布的消息之后,再将消息转发到各自服务的客户端上面。
先看第一种方案,网关是一个公共的组件,它应该只负责自己的职责而不需要关心业务逻辑的实现。业务是千差万别的,不能因为业务的变更而影响到网关的功能。所以为了实现聊天功能而去修改网关的代码是不合适。第二种方案就比较灵活一些了。聊天服务的业务实现只在聊天服务代码中。某个聊天服务收到一个客户端的聊天信息时,向消息总线中固定的一个聊天消息发布Topic发布这个消息,所有的聊天服务都会监听这个Topic,收到消息之后,再将消息转发到服务的客户端上面,如图13.3所示。

图13.3 分布式聊天服务系统
比如客户A发送一条聊天信息,被网关负载到聊天服务A了,这个时候,A并不会立刻将聊天信息转发到客户端B和客户端C,而是先将聊天信息发布到消息总线服务,这时聊天服务A和聊天服务B会监听到发布的聊天消息,这时聊天服务A和聊天服务B才将聊天信息转发到所有的客户端上面。
13.2.2 创建单独的聊天项目
为了方便说明分布式世界聊天系统的开发与应用,这里单独创建一个聊天项目,后期可以打包为可运行的Jar包,启动多个服务实例。在my-game-server项目中创建子模块项目my-game-im,从my-game-xinyue项目中复制一份config配置,因为是一个新的项目,需要修改端口号(同一台机器端口号不能重复),spring.application.name为game-im(区别不同的项目),service-id为103(和其它服务不能重复),server-id为10301。在pom.xml中添加依赖和配置:
<dependencies>
<dependency>
<groupId>com.game</groupId>
<artifactId>my-game-gateway-message-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- 在这里配置Main方法所在的类 -->
<mainClass>com.mygame.im.GameIMMain</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
添加Build之后,就可以将my-game-im打包为可运行的Jar包。
由于新添加了聊天服务,所以客户端发送聊天的信息类也需要重新创建,为了区别之前的例子,这里使用IMSendIMMsgRequest,如下代码所示:
@GameMessageMetadata(messageId = 312, messageType = EnumMesasageType.REQUEST, serviceId = 103)
public class IMSendIMMsgRequest extends AbstractJsonGameMessage<SendIMMsgBody> {
public static class SendIMMsgBody {
private String chat;
private String sender;
public String getSender() {
return sender;
}
public void setSender(String sender) {
this.sender = sender;
}
public String getChat() {
return chat;
}
public void setChat(String chat) {
this.chat = chat;
}
}
@Override
protected Class<SendIMMsgBody> getBodyObjClass() {
return SendIMMsgBody.class;
}
}
这里需要注意的是messageId和serviceId,messageId不能和其它的messageId重复,serviceId必须是my-game-im项目配置的service-id。这样网关才能将客户端的请求消息负载到聊天服务上面。为了区别之前单服发送聊天信息的命令,在my-game-client的类IMClientCommand中添加一个新的命令,用于发送分布式聊天信息,如下代码所示:
@ShellMethod("send-chat chatmsg")
public void sendChat(@ShellOption String chatMsg) {
IMSendIMMsgRequest request = new IMSendIMMsgRequest();
request.getBodyObj().setChat(chatMsg);
request.getBodyObj().setSender(nickName);
gameClientBoot.getChannel().writeAndFlush(request);
}
然后在类EnterGameHandler添加接收分布式聊天服务转发的聊天信息,如下代码所示:
@GameMessageMapping(IMSendIMMsgeResponse.class)
public void chatMsgIM(IMSendIMMsgeResponse response,GameClientChannelContext ctx) {
logger.info("聊天信息-{}说:{}",response.getBodyObj().getSender(),response.getBodyObj().getChat());
}
13.2.3 实现聊天消息的发布与转发
添加一个新项目之后,首先是添加一个Handler,用于处理GameChannel的初始化,和收到客户端消息之后调用对应的处理方法。这里添加GameIMHandler类,继承AbstractGameMessageDispatchHandler,如下代码所示:
public class GameIMHandler extends AbstractGameMessageDispatchHandler<IMManager>{
private IMManager imManager;
public GameIMHandler(ApplicationContext applicationContext) {
super(applicationContext);
}
@Override
protected IMManager getDataManager() {
return imManager;
}
@Override
protected Future<Boolean> updateToRedis(Promise<Boolean> promise) {
promise.setSuccess(true);
return promise;
}
@Override
protected Future<Boolean> updateToDB(Promise<Boolean> promise) {
promise.setSuccess(true);
return promise;
}
@Override
protected void initData(AbstractGameChannelHandlerContext ctx, long playerId, GameChannelPromise promise) {
imManager = new IMManager();
promise.setSuccess();
}
}
因为目前聊天服务不需要初始化任何玩家的数据,所以上面的代码中的方法都返回成功,创建一个空的IMManager实例。将来如果有需要缓存的数据,可以放在IMManager类之中。然后添加IMLogicHandler类,用于接收客户端的消息。当收到客户端的消息之后,将消息封装,发布到Kafka服务之中,并另外监听发布的消息。如下代码所示:
@GameMessageHandler
public class IMLogicHandler {
@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;
private final static String IM_TOPIC = "game-im-topic";
@Autowired
private GatewayMessageConsumerService gatewayMessageConsumerService;
//发布消息Kafka服务之中
private void publishMessage(ChatMessage chatMessage) {
String json = JSON.toJSONString(chatMessage);
try {
byte[] message = json.getBytes("utf8");
ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(IM_TOPIC, "IM", message);
kafkaTemplate.send(record);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//这里需要注意的是groupId一定要不一样,因为kafka的机制是一个消息只能被同一个消费者组下的某个消费者消费一次。不同的服务实例的serverId不一样
@KafkaListener(topics = {IM_TOPIC},groupId= "IM-SERVER-" + "${game.server.config.server-id}")
public void messageListener(ConsumerRecord<String, byte[]> record) {
//监听聊天服务发布的信息,收到信息之后,将聊天信息转发到所有的客户端。
byte[] value = record.value();
try {
String json = new String(value,"utf8");
ChatMessage chatMessage = JSON.parseObject(json, ChatMessage.class);
IMSendIMMsgeResponse response = new IMSendIMMsgeResponse();
response.getBodyObj().setChat(chatMessage.getChatMessage());
response.getBodyObj().setSender(chatMessage.getNickName());
//因为这里不再GatewayMessageContext参数,所以这里使用总的GameChannel管理类,将消息广播出去 gatewayMessageConsumerService.getGameMessageEventDispatchService().broadcastMessage(response);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@GameMessageMapping(IMSendIMMsgRequest.class)//在这里接收客户端发送的聊天消息
public void chatMsg(IMSendIMMsgRequest request,GatewayMessageContext<IMManager> ctx) {
ChatMessage chatMessage = new ChatMessage();
chatMessage.setChatMessage(request.getBodyObj().getChat());
chatMessage.setNickName(request.getBodyObj().getSender());
chatMessage.setPlayerId(ctx.getPlayerId());
this.publishMessage(chatMessage);//收到客户端的聊天消息之后,把消息封装,发布到Kafka之中。
}
}
从上面的代码可以看出,不管启动多少个聊天服务的实例,客户端发送一条聊天信息,所有的聊天服务实例都可以监听到这个消息,然后将此聊天消息转发到每个聊天服务实例上面的所有客户端,这样所有的客户端都可以接收到聊天消息了。
13.2.4 分布式世界聊天服务测试
分布式世界聊天系统测试的时候比13.1.3测试中多启动一个单独的聊天服务。为了测试不同的客户端消息会被负载到不同的聊天服务实例上,需要启动两个或两个以上的聊天服务实例。在my-game-server目录下启动一个命令窗口,执行打包命令:
mvn clean install –Dmaven.test.skip=true
然后就可以在my-game-im的target目录下找到可执行的Jar包:my-game-im-0.0.1-SNAPSHOT.jar,由于此Jar运行的时候,也需要依赖config配置,所以把它们复制出来,放到同一个目录下面,如下结构所示:
├── config
│ ├── application.yml
│ └── log4j2.xml
└── my-game-im-0.0.1-SNAPSHOT.jar
然后打开命令窗口,并cd到my-game-im-0.0.1-SNAPSHOT.jar所在的目录,由于需要在同一台电脑上面启动两个聊天服务实例,所以在启动的时候需要指定不同的端口,如下面命令所示:
java -jar my-game-im-0.0.1-SNAPSHOT.jar --server.port=7001
java -jar my-game-im-0.0.1-SNAPSHOT.jar --server.port=7002
其它服务可以参考13.1.3章节启动。然后再启动两个或两个以上面的客户端,保证在不同的聊天服务实例中都会被网关负载到客户端的消息,分别执行如下命令:(1)客户端用户登陆命令:login test001 (如果服务端不存在用户test001,会自动创建,两个不同的客户端登陆名不能一样,比如另一个可以为test002)。
shell:>login test001
另一个客户端:
shell:>login test002
(2)登陆成功之后创建角色:create-player DaMao (创建角色,昵称为DaoMao,这里不要使用中文,昵称也不能相同,比如另一个客户端可以输入XiaoMing)。
shell:>create-player DaMao
另一个客户端:
shell:>create-player XiaoMing
(3)选择和连接网关:select-gateway (选择一个网关,并自动创建和认证连接)。
shell:>select-gateway
等日志中输出连接认证成功之后,使用新的命令发送聊天信息,如下所示:
shell>send-chat hello
可以看到在所有的客户端可以看到打印了聊天信息,说明消息转发成功。
本文节选自《Java游戏服务器架构实战》一书。

为了感谢读者的支持,现在抽奖送 6 本该书。
参与方式:扫描下面的微信公众号关注后回复关键字“游戏图书”参与抽奖。

读者也可以自己购买,点击下面的链接可以购买:

