大家好,欢迎来到IT知识分享网。
提到通信就得面临两个问题,一是通信协议的选择,二是数据协议的定义。通信协议耳熟能详的就有好几种,TCP,UDP,HTTP,FTP等等。数据协议是一种数据交换的格式,像jason,xml,amf3,google protocol都可以用作数据协议,你也可以自己根据通信的效率,安全等因素来定义自己的数据协议。
通信系统的开发是一项很复杂的工作,不要以为往发服务端发一个Hello World!就认为完全掌握了通信系统的开发。概括来说要开发一个健壮的通信系统,必须从这几个方面来着手。
一,通信粘包的处理
这里包的概念是逻辑上的数据包,也就是我们发送的一个完整业务消息包,粘包情况有两种,一种是粘在一起的包都是完整的数据包,另一种情况是粘在一起的包有不完整的包。不是所有的粘包现象都需要处理,若传输的数据为不带结构的连续流数据(如文件传输),则不必把粘连的包分开(简称分包)。但在实际工程应用中,传输的数据一般为带结构的数据,这时就需要做分包处理。
为了避免粘包现象,可采取以下几种措施。一是对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令push,TCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;二是对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;三是由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。
以上提到的三种措施,都有其不足之处。总的来说降低了通信系统的吞吐量。我们可以自己设计一个分包算法来处理粘包的问题,该算法的实现是这样的:
- 当有数据到达时,将数据压入程序缓冲区。
- 循环处理缓冲区,如果缓冲区长度大于包头长度,则取出长度信息n,否则跳出循环,如果缓冲区的长度大于n,则从缓冲区取出一个完整包进行处理,否则跳出循环。
如果你是Java的爱好都可以参考一下Mina和netty2的实现,像Mina和Netty2都提供了粘包处理类可供使用,像Mina的CumulativeProtocolDecoder类,Netty2的LengthFieldBasedFrameDecoder。
二,数据协议选择
现在已经有很多数据协议可供我们选择,像jason,xml,amf3,google protocol等等,这些协议相应的语言都有API来对自身数据做协议处理,我们选择协标准无非就是效率和大小,这里每个人可以根据实际的应用环境选择适合的数据协议。
三,网络系统的安全性
网络安全是一个永远的话题,对通信数据加密一般常RSA对byte流加密,FLOOD验证,IP黑名单验证都是必须考虑到的。
以上是做网络开发必须了解的一些基础知识,在这里我们使用一个具体的实例来加深一下理解,Java与Flex使用AMF3数据协议通信。做过网络开发的一般都会知道套接字(SOCKET),很多语言都会通SOCKET来提供对网络操作的API,Java的提供的NIO SOCKET是一个高效的异步通信API,当然可以在这个基础上来开发我们的网络应用,但这种Native API需要我们花很多精力来处理网络通信的细节,消弱了我们对业务的关心。为我们开发带来很多不便性,幸好Java有很多现成的NIO SOCKET框架可供使用,像Mina,Netty2,xSocket等等,这些框架处理了很多底层的通信问题,提供了一些易用的API以供使用。在这个实例中我们使用Netty2来做通信框架。
定义消息包,消息包有定长包和不定长包,不定长包无非就是要在消息包中加入长度信息,以对收到的网络字节流进行分界。消息包的定义如下
定义AMF3数据协议的编码和解码器
AMF3Encoder
/*
2
* @(#)AMF3Encoder.java 0.1 05/11/17
3
*
4
* Copyright 2010 QISI, Inc. All rights reserved.
5
* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6
*/
7
package
com.qidea.pushserver.codec;
8
import
java.io.ByteArrayOutputStream;
9
import
org.jboss.netty.buffer.ChannelBuffer;
10
import
org.jboss.netty.buffer.ChannelBuffers;
11
import
org.jboss.netty.channel.Channel;
12
import
org.jboss.netty.channel.ChannelHandlerContext;
13
import
org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
14
import
com.qidea.pushserver.Constants;
15
import
com.qidea.pushserver.message.CommandMessage;
16
import
com.qidea.pushserver.message.PushMessage;
17
import
flex.messaging.io.SerializationContext;
18
import
flex.messaging.io.amf.Amf3Output;
19
/**
20
*
21
*
@author
sunwei
22
*
@version
2010-7-21
23
*
@since
JDK1.5
24
*/
25
public
class
AMF3Encoder
extends
OneToOneEncoder
26
{
27
/**
28
*
29
*/
30
@Override
31
protected
Object encode(ChannelHandlerContext arg0, Channel arg1,
32
Object arg2)
throws
Exception
33
{
34
ByteArrayOutputStream stream
=
new
ByteArrayOutputStream();
35
SerializationContext serializationContext
=
new
SerializationContext();
36
Amf3Output amf3Output
=
new
Amf3Output(serializationContext);
37
amf3Output.setOutputStream(stream);
38
amf3Output.writeObject(arg2);
39
byte
[] objSe
=
stream.toByteArray();
40
if
(objSe
!=
null
&&
objSe.length
>
0
)
41
{
42
ChannelBuffer buffer
=
ChannelBuffers.buffer(objSe.length
+
8
);
43
if
(arg2
instanceof
PushMessage)
44
buffer.writeInt(Constants.MAGIC_NUM_PUSH_MSG);
45
else
if
(arg2
instanceof
CommandMessage)
46
buffer.writeInt(Constants.MAGIC_NUM_COMMAND_MSG);
47
buffer.writeInt(objSe.length);
48
buffer.writeBytes(objSe);
49
return
buffer;
50
}
51
return
null
;
52
}
53
}
AMF3Decoder
/*
2
* @(#)AMF3Decoder.java 0.1 05/11/17
3
*
4
* Copyright 2010 QISI, Inc. All rights reserved.
5
* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6
*/
7
package
com.qidea.pushserver.codec;
8
import
java.io.ByteArrayInputStream;
9
import
org.jboss.netty.buffer.ChannelBuffer;
10
import
org.jboss.netty.channel.Channel;
11
import
org.jboss.netty.channel.ChannelHandlerContext;
12
import
org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
13
import
org.slf4j.Logger;
14
import
org.slf4j.LoggerFactory;
15
import
flex.messaging.io.SerializationContext;
16
import
flex.messaging.io.amf.Amf3Input;
17
/**
18
* amf3协议解码类
19
*
20
*
@author
sunwei
21
*
@version
2010-7-21
22
*
@since
JDK1.5
23
*/
24
public
class
AMF3Decoder
extends
LengthFieldBasedFrameDecoder
25
{
26
public
static
final
Logger logger
=
LoggerFactory
27
.getLogger(AMF3Decoder.
class
);
28
/**
29
*
30
*
@param
maxFrameLength
31
* 包的最大大小
32
*
@param
lengthFieldOffset
33
* 包头信息,长度的偏移位
34
*
@param
lengthFieldLength
35
* 包头信息,长度位数
36
*/
37
public
AMF3Decoder(
int
maxFrameLength,
int
lengthFieldOffset,
38
int
lengthFieldLength)
39
{
40
super
(maxFrameLength, lengthFieldOffset, lengthFieldLength);
41
}
42
/**
43
*
44
*
@param
maxFrameLength
45
*/
46
public
AMF3Decoder(
int
maxFrameLength)
47
{
48
super
(maxFrameLength,
4
,
4
,
0
,
0
);
49
}
50
/**
51
*
52
*/
53
@Override
54
protected
Object decode(ChannelHandlerContext ctx, Channel channel,
55
ChannelBuffer buffer)
throws
Exception
56
{
57
ChannelBuffer frame
=
(ChannelBuffer)
super
58
.decode(ctx, channel, buffer);
59
if
(frame
==
null
)
60
{
61
return
null
;
62
}
63
//
64
int
magicNum
=
frame.readInt();
65
int
dataLength
=
frame.readInt();
66
logger.info(
“
magic num={},data length={}
“
, magicNum, dataLength);
67
//
读AMF3字节流的内容
68
byte
[] content
=
new
byte
[frame.readableBytes()];
69
frame.readBytes(content);
70
SerializationContext serializationContext
=
new
SerializationContext();
71
Amf3Input amf3Input
=
new
Amf3Input(serializationContext);
72
amf3Input.setInputStream(
new
ByteArrayInputStream(content));
73
Object message
=
amf3Input.readObject();
74
return
message;
75
}
76
}
77
构建服务端
PushProtocolHandler
public
class
PushProtocolHandler
extends
SimpleChannelHandler
2
{
3
public
static
Logger log
=
LoggerFactory
4
.getLogger(PushProtocolHandler.
class
);
5
/**
6
*
7
*/
8
@Override
9
public
void
messageReceived(ChannelHandlerContext ctx, MessageEvent e)
10
{
11
if
(e.getMessage()
!=
null
)
12
{
13
ChannelManager channelManager
=
PushServerContext
14
.getBean(ChannelManager.
class
);
15
if
(e.getMessage()
instanceof
CommandMessage)
16
{
17
channelManager.handleMsg((CommandMessage) e.getMessage(), e
18
.getChannel());
19
}
20
else
if
(e.getMessage()
instanceof
PushMessage)
21
{
22
channelManager.handleMsg((PushMessage) e.getMessage(), e
23
.getChannel());
24
}
25
else
26
{
27
log.warn(
“
unkown message {}
“
, e);
28
}
29
}
30
}
31 }
PushServerPipelineFactory
import
static
org.jboss.netty.channel.Channels.
*
;
2
/**
3
*
4
*
@author
sunwei
5
*
@version
2010-7-22
6
*
@since
JDK1.5
7
*/
8
public
class
PushServerPipelineFactory
implements
ChannelPipelineFactory
9
{
10
@Override
11
public
ChannelPipeline getPipeline()
throws
Exception
12
{
13
ChannelPipeline pipeline
=
pipeline();
14
//
处理日志
15
pipeline.addLast(
“
logger
“
,
new
LoggingHandler());
16
//
处理coder
17
pipeline.addLast(
“
decoder
“
,
new
AMF3Decoder(Constants.MAX_OBJECT_SIZE));
18
pipeline.addLast(
“
encoder
“
,
new
AMF3Encoder());
19
//
20
pipeline.addLast(
“
handler
“
,
new
PushProtocolHandler());
21
//
22
return
pipeline;
23
}
24
}
ServerMain
public
static
main(String[] args)
2
{
3
//
开始NIO线程
4
ChannelFactory factory
=
new
NioServerSocketChannelFactory(Executors
5
.newCachedThreadPool(), Executors.newCachedThreadPool());
6
//
服务启始点
7
ServerBootstrap bootstrap
=
new
ServerBootstrap(factory);
8
//
处理过滤器
9
bootstrap.setPipelineFactory(
new
PushServerPipelineFactory());
10
//
设置相关参数
11
bootstrap.setOption(
“
child.tcpNoDelay
“
,
true
);
12
//
设置相关参数
13
bootstrap.setOption(
“
child.keepAlive
“
,
true
);
14
//
绑定相关端口
15
bootstrap.bind(
new
InetSocketAddress(getPushPort()));
16
}
Flex客户端
FlexSocket
public
class
FlexSocket
2
{
3
4
//
发送包
5
public
function send(type:
int
, obj:PushMessage):Boolean
6
{
7
if
(_socket
==
null
)
8
{
9
return
false
;
10
}
11
//
手动限制不给发送的时候用
12
if
(socketState
==
socket_state_closed
||
socketState
==
socket_state_connecting)
13
{
14
return
false
;
15
}
16
if
(
!
_socket.connected)
17
{
18
return
false
;
19
}
20
var byteArr:ByteArray
=
objToByteaArray(obj);
21
var msgHead:MsgHead
=
new
MsgHead(type, byteArr.length);
22
sendMsg(msgHead.getType(), msgHead.getSize(), byteArr);
23
return
true
;
24
}
25
26
//
接受包
27
private
function getDataHandler(e:ProgressEvent):
void
28
{
29
_timeServerDead.stop();
30
_timeServerDead.reset();
31
if
(_socket.bytesAvailable
>=
8
&&
!
_isReadHead)
32
{
33
_recvPackageType
=
_socket.readInt();
34
//
同意关闭
35
//
if(_recvPackageType == 5)
36
//
{
37
//
close();
38
//
}
39
_recvPackageSize
=
_socket.readInt();
40
_isReadHead
=
true
;
41
}
42
if
(_isReadHead
&&
_socket.bytesAvailable
>=
_recvPackageSize)
43
{
44
var
byte
:ByteArray
=
new
ByteArray();
45
_socket.readBytes(
byte
,
0
, _recvPackageSize);
46
_msgObj
=
byteArraytoObject(
byte
);
47
//
暂时用上面一种
48
if
(_recvPackageType
==
packageType.LOGIN_TYPE)
49
{
50
if
(_msgObj.ret
==
bodyType.RECEIVE_OK)
51
{
52
_timerDetectSocket.start();
53
socketState
=
socket_state_connected;
54
myEventDispatch.Instence().dispatcher(bodyType.INLINE_CURRENTSOCKETSTATE);
55
}
56
else
if
(_msgObj.ret
==
bodyType.RECEIVE_ERROR)
57
{
58
close();
59
}
60
61
}
62
else
if
(_recvPackageType
==
packageType.CHAT_TYPE)
63
{
64
myEventDispatch.Instence().dispatcher(selectEventName(_recvPackageType), _msgObj);
65
}
66
_recvPackageSize
=
0
;
67
_recvPackageType
=
0
;
68
_msgObj
=
null
;
69
_isReadHead
=
false
;
70
71
72
}
73
}
74
75
}
有关Mina的实现,你可以通过本博客向我索取相关代码。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/33731.html