实例介绍
【实例截图】

【核心代码】
private ExecutorService executorService1 = new ThreadPoolExecutor(
10,
20,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
private ExecutorService executorService2 = new ThreadPoolExecutor(
10,
20,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
private static AtomicInteger atomicInteger = new AtomicInteger(1);
private static AtomicInteger atomic = new AtomicInteger(1);
private static AtomicInteger atomic_23 = new AtomicInteger(1);
private static Lock lock_1 = new ReentrantLock();
private static Lock lock_2 = new ReentrantLock();
private static Lock lock_3 = new ReentrantLock();
private static Lock lock_4 = new ReentrantLock();
private static int count = 0;
//private final NettyMqttService nettyMqttService;
// 此处缓存一份设备的最新数据
Map<Object, Map<String, Object>> lastedEnv = new ConcurrentHashMap<Object, Map<String, Object>>();
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
@Override //数据读取完毕
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/**
* @param ctx
* @author caobing
* @DESCRIPTION: 有客户端连接服务器会触发此函数
* @return: void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
lock_1.lock();
try {
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接,就保存连接
if (CHANNEL_MAP.containsKey(channelId)) {
log.info("客户端【" channelId "】是连接状态,连接通道数量: " CHANNEL_MAP.size());
} else {
//保存连接
CHANNEL_MAP.put(channelId, ctx);
log.info("客户端【" channelId "】连接netty服务器");
log.info("连接通道数量: " CHANNEL_MAP.size());
}
} finally {
lock_1.unlock();
}
}
/**
* @param ctx
* @author caobing
* @DESCRIPTION: 有客户端终止连接服务器会触发此函数
* @return: void
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
lock_2.lock();
try {
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除
if (CHANNEL_MAP.containsKey(channelId)) {
//删除连接
CHANNEL_MAP.remove(channelId);
System.out.println();
log.info("客户端【" channelId "】退出netty服务器");
log.info("连接通道数量: " CHANNEL_MAP.size());
}
} finally {
lock_2.unlock();
}
}
/**
* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
* 2. Object msg: 就是客户端发送的数据 默认Object
* <p>
* 读取数据实际(这里我们可以读取客户端发送的消息)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lock_3.lock();
try {
//log.info("服务器读取线程 " Thread.currentThread().getName() " channle = " ctx.channel());
//log.info("进入服务端数据:" msg.toString());
// Channel channel = ctx.channel();
// //将 msg 转成一个 ByteBuf
// //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
//得到此时客户端的数据长度
int bytes_length = buf.readableBytes();
// log.info("此时客户端的数据长度: " bytes_length);
//组件新的字节数组
byte[] buffer = new byte[bytes_length];
buf.readBytes(buffer);
// final String allData = NettyByteAndStringUtils.byteToHex(buffer);
//log.info("进入服务端数据:" allData);
String s = new String(buffer);
System.out.println("s:" count);
count ;
}
相关软件
小贴士
感谢您为本站写下的评论,您的评论对其它用户来说具有重要的参考价值,所以请认真填写。
- 类似“顶”、“沙发”之类没有营养的文字,对勤劳贡献的楼主来说是令人沮丧的反馈信息。
- 相信您也不想看到一排文字/表情墙,所以请不要反馈意义不大的重复字符,也请尽量不要纯表情的回复。
- 提问之前请再仔细看一遍楼主的说明,或许是您遗漏了。
- 请勿到处挖坑绊人、招贴广告。既占空间让人厌烦,又没人会搭理,于人于己都无利。
关于好例子网
本站旨在为广大IT学习爱好者提供一个非营利性互相学习交流分享平台。本站所有资源都可以被免费获取学习研究。本站资源来自网友分享,对搜索内容的合法性不具有预见性、识别性、控制性,仅供学习研究,请务必在下载后24小时内给予删除,不得用于其他任何用途,否则后果自负。基于互联网的特殊性,平台无法对用户传输的作品、信息、内容的权属或合法性、安全性、合规性、真实性、科学性、完整权、有效性等进行实质审查;无论平台是否已进行审查,用户均应自行承担因其传输的作品、信息、内容而可能或已经产生的侵权或权属纠纷等法律责任。本站所有资源不代表本站的观点或立场,基于网友分享,根据中国法律《信息网络传播权保护条例》第二十二与二十三条之规定,若资源存在侵权或相关问题请联系本站客服人员,点此联系我们。关于更多版权及免责申明参见 版权及免责申明
网友评论
我要评论