Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ public abstract class MyBizHandler<T> extends SimpleChannelInboundHandler<T> {

protected UserService userService;

public MyBizHandler(UserService userService) {
this.userService = userService;
public MyBizHandler(){
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
channelRead(ctx.channel(), msg);
public MyBizHandler(UserService userService) {
this.userService = userService;
}

public abstract void channelRead(Channel channel, T msg);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ protected void initChannel(SocketChannel channel) throws Exception {
//对象传输处理[解码]
channel.pipeline().addLast(new ObjDecoder());
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new AddFriendHandler(userService));
channel.pipeline().addLast(new DelTalkHandler(userService));
channel.pipeline().addLast(new LoginHandler(userService));
channel.pipeline().addLast(new MsgGroupHandler(userService));
channel.pipeline().addLast(new MsgHandler(userService));
channel.pipeline().addLast(new ReconnectHandler(userService));
channel.pipeline().addLast(new SearchFriendHandler(userService));
channel.pipeline().addLast(new TalkNoticeHandler(userService));
// channel.pipeline().addLast(AddFriendHandler.getInstance(userService));
// channel.pipeline().addLast(DelTalkHandler.getInstance(userService));
// channel.pipeline().addLast(LoginHandler.getInstance(userService));
// channel.pipeline().addLast(MsgGroupHandler.getInstance(userService));
// channel.pipeline().addLast(MsgHandler.getInstance(userService));
// channel.pipeline().addLast(ReconnectHandler.getInstance(userService));
// channel.pipeline().addLast(SearchFriendHandler.getInstance(userService));
// channel.pipeline().addLast(TalkNoticeHandler.getInstance(userService));
channel.pipeline().addLast(IMHandler.getINSTANCE(userService));
//对象传输处理[编码]
channel.pipeline().addLast(new ObjEncoder());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.itstack.naive.chat.socket.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.domain.user.model.UserInfo;
import org.itstack.naive.chat.infrastructure.common.SocketChannelUtil;
Expand All @@ -17,22 +19,29 @@
* 公众号:bugstack虫洞栈 | 沉淀、分享、成长,让自己和他人都能有所收获!
* create by 小傅哥 on @2020
*/
@ChannelHandler.Sharable
public class AddFriendHandler extends MyBizHandler<AddFriendRequest> {

public AddFriendHandler(UserService userService) {
super(userService);
private static final AddFriendHandler addFriendHandler = new AddFriendHandler();

private AddFriendHandler() {
}

public static AddFriendHandler getInstance(UserService userService){
addFriendHandler.userService = userService;
return addFriendHandler;
}

@Override
public void channelRead(Channel channel, AddFriendRequest msg) {
public void channelRead0(ChannelHandlerContext ctx, AddFriendRequest msg) {
// 1. 添加好友到数据库中[A->B B->A]
List<UserFriend> userFriendList = new ArrayList<>();
userFriendList.add(new UserFriend(msg.getUserId(), msg.getFriendId()));
userFriendList.add(new UserFriend(msg.getFriendId(), msg.getUserId()));
userService.addUserFriend(userFriendList);
// 2. 推送好友添加完成 A
UserInfo userInfo = userService.queryUserInfo(msg.getFriendId());
channel.writeAndFlush(new AddFriendResponse(userInfo.getUserId(), userInfo.getUserNickName(), userInfo.getUserHead()));
ctx.channel().writeAndFlush(new AddFriendResponse(userInfo.getUserId(), userInfo.getUserNickName(), userInfo.getUserHead()));
// 3. 推送好友添加完成 B
Channel friendChannel = SocketChannelUtil.getChannel(msg.getFriendId());
if (null == friendChannel) return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.itstack.naive.chat.socket.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.protocol.talk.DelTalkRequest;
import org.itstack.naive.chat.socket.MyBizHandler;
Expand All @@ -10,14 +12,21 @@
* 公众号:bugstack虫洞栈 | 沉淀、分享、成长,让自己和他人都能有所收获!
* create by 小傅哥 on @2020
*/
@ChannelHandler.Sharable
public class DelTalkHandler extends MyBizHandler<DelTalkRequest> {

public DelTalkHandler(UserService userService) {
super(userService);
private static final DelTalkHandler delTalkHandler = new DelTalkHandler();

private DelTalkHandler() {
}

public static DelTalkHandler getInstance(UserService userService){
delTalkHandler.userService = userService;
return delTalkHandler;
}

@Override
public void channelRead(Channel channel, DelTalkRequest msg) {
public void channelRead0(ChannelHandlerContext ctx, DelTalkRequest msg) {
userService.deleteUserTalk(msg.getUserId(), msg.getTalkId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.itstack.naive.chat.socket.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.protocol.Command;
import org.itstack.naive.chat.protocol.Packet;
import org.itstack.naive.chat.socket.MyBizHandler;

import java.util.HashMap;
import java.util.Map;

@ChannelHandler.Sharable
public class IMHandler extends MyBizHandler<Packet> {

private static volatile IMHandler INSTANCE = null;

private final Map<Byte, SimpleChannelInboundHandler<? extends Packet>> handlerMap = new HashMap<>();

private IMHandler(UserService userService){
handlerMap.put(Command.AddFriendRequest, AddFriendHandler.getInstance(userService));
handlerMap.put(Command.DelTalkRequest, DelTalkHandler.getInstance(userService));
handlerMap.put(Command.LoginRequest, LoginHandler.getInstance(userService));
handlerMap.put(Command.MsgGroupRequest, MsgGroupHandler.getInstance(userService));
handlerMap.put(Command.MsgRequest, MsgHandler.getInstance(userService));
handlerMap.put(Command.ReconnectRequest, ReconnectHandler.getInstance(userService));
handlerMap.put(Command.SearchFriendRequest, SearchFriendHandler.getInstance(userService));
handlerMap.put(Command.TalkNoticeRequest, TalkNoticeHandler.getInstance(userService));
}

public static IMHandler getINSTANCE(UserService userService) {
if (INSTANCE == null){
synchronized (IMHandler.class){
if (INSTANCE == null){
INSTANCE = new IMHandler(userService);
}
}
}
return INSTANCE;
}

@Override
public void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
handlerMap.get(msg.getCommand()).channelRead(ctx,msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.domain.user.model.*;
import org.itstack.naive.chat.infrastructure.common.Constants;
Expand All @@ -24,29 +26,36 @@
* <p>
* 登陆请求处理
*/
@ChannelHandler.Sharable
public class LoginHandler extends MyBizHandler<LoginRequest> {

public LoginHandler(UserService userService) {
super(userService);
private static final LoginHandler loginHandler = new LoginHandler();

private LoginHandler() {
}

public static LoginHandler getInstance(UserService userService){
loginHandler.userService = userService;
return loginHandler;
}

@Override
public void channelRead(Channel channel, LoginRequest msg) {
public void channelRead0(ChannelHandlerContext ctx, LoginRequest msg) {
logger.info("登陆请求处理:{} ", JSON.toJSONString(msg));
// 1. 登陆失败返回false
boolean auth = userService.checkAuth(msg.getUserId(), msg.getUserPassword());
if (!auth) {
// 传输消息
channel.writeAndFlush(new LoginResponse(false));
ctx.channel().writeAndFlush(new LoginResponse(false));
return;
}
// 2. 登陆成功绑定Channel
// 2.1 绑定用户ID
SocketChannelUtil.addChannel(msg.getUserId(), channel);
SocketChannelUtil.addChannel(msg.getUserId(), ctx.channel());
// 2.2 绑定群组
List<String> groupsIdList = userService.queryUserGroupsIdList(msg.getUserId());
for (String groupId : groupsIdList) {
SocketChannelUtil.addChannelGroup(groupId, channel);
SocketChannelUtil.addChannelGroup(groupId, ctx.channel());
}
// 3. 反馈消息;用户信息、用户对话框列表、好友列表、群组列表
// 组装消息包
Expand Down Expand Up @@ -136,7 +145,7 @@ else if (Constants.TalkType.Group.getCode().equals(talkBoxInfo.getTalkType())) {
loginResponse.setUserNickName(userInfo.getUserNickName());
loginResponse.setUserHead(userInfo.getUserHead());
// 传输消息
channel.writeAndFlush(loginResponse);
ctx.channel().writeAndFlush(loginResponse);
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.itstack.naive.chat.socket.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.domain.user.model.ChatRecordInfo;
Expand All @@ -18,18 +20,25 @@
* <p>
* 群组消息发送
*/
@ChannelHandler.Sharable
public class MsgGroupHandler extends MyBizHandler<MsgGroupRequest> {

public MsgGroupHandler(UserService userService) {
super(userService);
private static final MsgGroupHandler msgGroupHandler = new MsgGroupHandler();

private MsgGroupHandler() {
}

public static MsgGroupHandler getInstance(UserService userService){
msgGroupHandler.userService = userService;
return msgGroupHandler;
}

@Override
public void channelRead(Channel channel, MsgGroupRequest msg) {
public void channelRead0(ChannelHandlerContext ctx, MsgGroupRequest msg) {
// 获取群组通信管道
ChannelGroup channelGroup = SocketChannelUtil.getChannelGroup(msg.getTalkId());
if (null == channelGroup) {
SocketChannelUtil.addChannelGroup(msg.getTalkId(), channel);
SocketChannelUtil.addChannelGroup(msg.getTalkId(), ctx.channel());
channelGroup = SocketChannelUtil.getChannelGroup(msg.getTalkId());
}
// 异步写库
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.domain.user.model.ChatRecordInfo;
import org.itstack.naive.chat.infrastructure.common.Constants;
Expand All @@ -18,14 +20,21 @@
* <p>
* 消息信息处理
*/
@ChannelHandler.Sharable
public class MsgHandler extends MyBizHandler<MsgRequest> {

public MsgHandler(UserService userService) {
super(userService);
private static final MsgHandler msgHandler = new MsgHandler();

private MsgHandler() {
}

public static MsgHandler getInstance(UserService userService){
msgHandler.userService = userService;
return msgHandler;
}

@Override
public void channelRead(Channel channel, MsgRequest msg) {
public void channelRead0(ChannelHandlerContext ctx, MsgRequest msg) {
logger.info("消息信息处理:{}", JSON.toJSONString(msg));
// 异步写库
userService.asyncAppendChatRecord(new ChatRecordInfo(msg.getUserId(), msg.getFriendId(), msg.getMsgText(), msg.getMsgType(), msg.getMsgDate()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.itstack.naive.chat.socket.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.infrastructure.common.SocketChannelUtil;
import org.itstack.naive.chat.protocol.login.ReconnectRequest;
Expand All @@ -16,22 +18,29 @@
* <p>
* 重连处理
*/
@ChannelHandler.Sharable
public class ReconnectHandler extends MyBizHandler<ReconnectRequest> {

public ReconnectHandler(UserService userService) {
super(userService);
private static final ReconnectHandler reconnectHandler = new ReconnectHandler();

private ReconnectHandler() {
}

public static ReconnectHandler getInstance(UserService userService){
reconnectHandler.userService = userService;
return reconnectHandler;
}

@Override
public void channelRead(Channel channel, ReconnectRequest msg) {
public void channelRead0(ChannelHandlerContext ctx, ReconnectRequest msg) {
logger.info("客户端断线重连处理。userId:{}", msg.getUserId());
// 添加用户Channel
SocketChannelUtil.removeUserChannelByUserId(msg.getUserId());
SocketChannelUtil.addChannel(msg.getUserId(), channel);
SocketChannelUtil.addChannel(msg.getUserId(), ctx.channel());
// 添加群组Channel
List<String> groupsIdList = userService.queryTalkBoxGroupsIdList(msg.getUserId());
for (String groupsId : groupsIdList) {
SocketChannelUtil.addChannelGroup(groupsId, channel);
SocketChannelUtil.addChannelGroup(groupsId, ctx.channel());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.itstack.naive.chat.application.UserService;
import org.itstack.naive.chat.domain.user.model.LuckUserInfo;
import org.itstack.naive.chat.protocol.friend.SearchFriendRequest;
Expand All @@ -17,14 +19,20 @@
* 公众号:bugstack虫洞栈 | 沉淀、分享、成长,让自己和他人都能有所收获!
* create by 小傅哥 on @2020
*/
@ChannelHandler.Sharable
public class SearchFriendHandler extends MyBizHandler<SearchFriendRequest> {

public SearchFriendHandler(UserService userService) {
super(userService);
private static final SearchFriendHandler searchFriendHandler = new SearchFriendHandler();

private SearchFriendHandler() {
}

public static SearchFriendHandler getInstance(UserService userService){
searchFriendHandler.userService = userService;
return searchFriendHandler;
}
@Override
public void channelRead(Channel channel, SearchFriendRequest msg) {
public void channelRead0(ChannelHandlerContext ctx, SearchFriendRequest msg) {
logger.info("搜索好友请求处理:{}", JSON.toJSONString(msg));
List<UserDto> userDtoList = new ArrayList<>();
List<LuckUserInfo> userInfoList = userService.queryFuzzyUserInfoList(msg.getUserId(), msg.getSearchKey());
Expand All @@ -38,7 +46,7 @@ public void channelRead(Channel channel, SearchFriendRequest msg) {
}
SearchFriendResponse response = new SearchFriendResponse();
response.setList(userDtoList);
channel.writeAndFlush(response);
ctx.channel().writeAndFlush(response);
}

}
Loading