Java与Flex通信

Java与Flex通信提到通信就得面临两个问题,一是通信协议的选择,二是数据协议的定义。通信协议耳熟能详的就有好几种,TCP,UDP,HTTP,FTP等等。数据协议是一种数据交换的格式,像jason,xml,amf3,googleprotocol都可以用作数据协议,你也可以自己根据通信的效率,安全等因素来定义自己的数据

大家好,欢迎来到IT知识分享网。

  提到通信就得面临两个问题,一是通信协议的选择,二是数据协议的定义。通信协议耳熟能详的就有好几种,TCPUDPHTTPFTP等等。数据协议是一种数据交换的格式,像jason,xml,amf3,google protocol都可以用作数据协议,你也可以自己根据通信的效率,安全等因素来定义自己的数据协议。

  通信系统的开发是一项很复杂的工作,不要以为往发服务端发一个Hello World!就认为完全掌握了通信系统的开发。概括来说要开发一个健壮的通信系统,必须从这几个方面来着手。

  一,通信粘包的处理

  这里包的概念是逻辑上的数据包,也就是我们发送的一个完整业务消息包,粘包情况有两种,一种是粘在一起的包都是完整的数据包,另一种情况是粘在一起的包有不完整的包。不是所有的粘包现象都需要处理,若传输的数据为不带结构的连续流数据(如文件传输),则不必把粘连的包分开(简称分包)。但在实际工程应用中,传输的数据一般为带结构的数据,这时就需要做分包处理。

  为了避免粘包现象,可采取以下几种措施。一是对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令pushTCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;二是对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;三是由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。

  以上提到的三种措施,都有其不足之处。总的来说降低了通信系统的吞吐量。我们可以自己设计一个分包算法来处理粘包的问题,该算法的实现是这样的:

  1. 当有数据到达时,将数据压入程序缓冲区。
  2. 循环处理缓冲区,如果缓冲区长度大于包头长度,则取出长度信息n,否则跳出循环,如果缓冲区的长度大于n,则从缓冲区取出一个完整包进行处理,否则跳出循环。

如果你是Java的爱好都可以参考一下Mina和netty2的实现,像Mina和Netty2都提供了粘包处理类可供使用,像Mina的CumulativeProtocolDecoder类,Netty2的LengthFieldBasedFrameDecoder。

  二,数据协议选择

  现在已经有很多数据协议可供我们选择,像jason,xml,amf3,google protocol等等,这些协议相应的语言都有API来对自身数据做协议处理,我们选择协标准无非就是效率和大小,这里每个人可以根据实际的应用环境选择适合的数据协议。

  三,网络系统的安全性

  网络安全是一个永远的话题,对通信数据加密一般常RSA对byte流加密,FLOOD验证,IP黑名单验证都是必须考虑到的。

  以上是做网络开发必须了解的一些基础知识,在这里我们使用一个具体的实例来加深一下理解,Java与Flex使用AMF3数据协议通信。做过网络开发的一般都会知道套接字(SOCKET),很多语言都会通SOCKET来提供对网络操作的API,Java的提供的NIO SOCKET是一个高效的异步通信API,当然可以在这个基础上来开发我们的网络应用,但这种Native API需要我们花很多精力来处理网络通信的细节,消弱了我们对业务的关心。为我们开发带来很多不便性,幸好Java有很多现成的NIO SOCKET框架可供使用,MinaNetty2xSocket等等,这些框架处理了很多底层的通信问题,提供了一些易用的API以供使用。在这个实例中我们使用Netty2来做通信框架。

  定义消息包,消息包有定长包和不定长包,不定长包无非就是要在消息包中加入长度信息,以对收到的网络字节流进行分界。消息包的定义如下

Java与Flex通信

  定义AMF3数据协议的编码和解码器 

Java与Flex通信
Java与Flex通信
AMF3Encoder

 1 
/*


 2 
 * @(#)AMF3Encoder.java    0.1 05/11/17

 3 
 *

 4 
 * Copyright 2010 QISI, Inc. All rights reserved.

 5 
 * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

 6 
 
*/


 7 

package
 com.qidea.pushserver.codec;

 8 

import
 java.io.ByteArrayOutputStream;

 9 

import
 org.jboss.netty.buffer.ChannelBuffer;

10 

import
 org.jboss.netty.buffer.ChannelBuffers;

11 

import
 org.jboss.netty.channel.Channel;

12 

import
 org.jboss.netty.channel.ChannelHandlerContext;

13 

import
 org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

14 

import
 com.qidea.pushserver.Constants;

15 

import
 com.qidea.pushserver.message.CommandMessage;

16 

import
 com.qidea.pushserver.message.PushMessage;

17 

import
 flex.messaging.io.SerializationContext;

18 

import
 flex.messaging.io.amf.Amf3Output;

19 

/**


20 
 * 

21 
 * 
@author
 sunwei

22 
 * 
@version
 2010-7-21

23 
 * 
@since
 JDK1.5

24 
 
*/


25 

public
 
class
 AMF3Encoder 
extends
 OneToOneEncoder

26 
{

27 
    
/**


28 
     * 

29 
     
*/


30 
    @Override

31 
    
protected
 Object encode(ChannelHandlerContext arg0, Channel arg1,

32 
            Object arg2) 
throws
 Exception

33 
    {

34 
        ByteArrayOutputStream stream 
=
 
new
 ByteArrayOutputStream();

35 
        SerializationContext serializationContext 
=
 
new
 SerializationContext();

36 
        Amf3Output amf3Output 
=
 
new
 Amf3Output(serializationContext);

37 
        amf3Output.setOutputStream(stream);

38 
        amf3Output.writeObject(arg2);

39 
        
byte
[] objSe 
=
 stream.toByteArray();

40 
        
if
 (objSe 
!=
 
null
 
&&
 objSe.length 
>
 
0
)

41 
        {

42 
            ChannelBuffer buffer 
=
 ChannelBuffers.buffer(objSe.length 
+
 
8
);

43 
            
if
 (arg2 
instanceof
 PushMessage)

44 
                buffer.writeInt(Constants.MAGIC_NUM_PUSH_MSG);

45 
            
else
 
if
 (arg2 
instanceof
 CommandMessage)

46 
                buffer.writeInt(Constants.MAGIC_NUM_COMMAND_MSG);

47 
            buffer.writeInt(objSe.length);

48 
            buffer.writeBytes(objSe);

49 
            
return
 buffer;

50 
        }

51 
        
return
 
null
;

52 
    }

53 
}

Java与Flex通信
Java与Flex通信
AMF3Decoder

 1 
/*


 2 
 * @(#)AMF3Decoder.java    0.1 05/11/17

 3 
 *

 4 
 * Copyright 2010 QISI, Inc. All rights reserved.

 5 
 * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

 6 
 
*/


 7 

package
 com.qidea.pushserver.codec;

 8 

import
 java.io.ByteArrayInputStream;

 9 

import
 org.jboss.netty.buffer.ChannelBuffer;

10 

import
 org.jboss.netty.channel.Channel;

11 

import
 org.jboss.netty.channel.ChannelHandlerContext;

12 

import
 org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;

13 

import
 org.slf4j.Logger;

14 

import
 org.slf4j.LoggerFactory;

15 

import
 flex.messaging.io.SerializationContext;

16 

import
 flex.messaging.io.amf.Amf3Input;

17 

/**


18 
 * amf3协议解码类

19 
 * 

20 
 * 
@author
 sunwei

21 
 * 
@version
 2010-7-21

22 
 * 
@since
 JDK1.5

23 
 
*/


24 

public
 
class
 AMF3Decoder 
extends
 LengthFieldBasedFrameDecoder

25 
{

26 
    
public
 
static
 
final
 Logger logger 
=
 LoggerFactory

27 
            .getLogger(AMF3Decoder.
class
);

28 
    
/**


29 
     * 

30 
     * 
@param
 maxFrameLength

31 
     *            包的最大大小

32 
     * 
@param
 lengthFieldOffset

33 
     *            包头信息,长度的偏移位

34 
     * 
@param
 lengthFieldLength

35 
     *            包头信息,长度位数

36 
     
*/


37 
    
public
 AMF3Decoder(
int
 maxFrameLength, 
int
 lengthFieldOffset,

38 
            
int
 lengthFieldLength)

39 
    {

40 
        
super
(maxFrameLength, lengthFieldOffset, lengthFieldLength);

41 
    }

42 
    
/**


43 
     * 

44 
     * 
@param
 maxFrameLength

45 
     
*/


46 
    
public
 AMF3Decoder(
int
 maxFrameLength)

47 
    {

48 
        
super
(maxFrameLength, 
4

4

0

0
);

49 
    }

50 
    
/**


51 
     * 

52 
     
*/


53 
    @Override

54 
    
protected
 Object decode(ChannelHandlerContext ctx, Channel channel,

55 
            ChannelBuffer buffer) 
throws
 Exception

56 
    {

57 
        ChannelBuffer frame 
=
 (ChannelBuffer) 
super


58 
                .decode(ctx, channel, buffer);

59 
        
if
 (frame 
==
 
null
)

60 
        {

61 
            
return
 
null
;

62 
        }

63 
        
//

64 

        
int
 magicNum 
=
 frame.readInt();

65 
        
int
 dataLength 
=
 frame.readInt();

66 
        logger.info(

magic num={},data length={}

, magicNum, dataLength);

67 
        
//
 读AMF3字节流的内容


68 

        
byte
[] content 
=
 
new
 
byte
[frame.readableBytes()];

69 
        frame.readBytes(content);

70 
        SerializationContext serializationContext 
=
 
new
 SerializationContext();

71 
        Amf3Input amf3Input 
=
 
new
 Amf3Input(serializationContext);

72 
        amf3Input.setInputStream(
new
 ByteArrayInputStream(content));

73 
        Object message 
=
 amf3Input.readObject();

74 
        
return
 message;

75 
    }

76 
}

77 

  构建服务端

Java与Flex通信
Java与Flex通信
PushProtocolHandler

 1 
public
 
class
 PushProtocolHandler 
extends
 SimpleChannelHandler

 2 
{

 3 
    
public
 
static
 Logger log 
=
 LoggerFactory

 4 
            .getLogger(PushProtocolHandler.
class
);

 5 
    
/**


 6 
     * 

 7 
     
*/


 8 
    @Override

 9 
    
public
 
void
 messageReceived(ChannelHandlerContext ctx, MessageEvent e)

10 
    {

11 
        
if
 (e.getMessage() 
!=
 
null
)

12 
        {

13 
            ChannelManager channelManager 
=
 PushServerContext

14 
                    .getBean(ChannelManager.
class
);

15 
            
if
 (e.getMessage() 
instanceof
 CommandMessage)

16 
            {

17 
                channelManager.handleMsg((CommandMessage) e.getMessage(), e

18 
                        .getChannel());

19 
            }

20 
            
else
 
if
 (e.getMessage() 
instanceof
 PushMessage)

21 
            {

22 
                channelManager.handleMsg((PushMessage) e.getMessage(), e

23 
                        .getChannel());

24 
            }

25 
            
else


26 
            {

27 
                log.warn(

unkown message {}

, e);

28 
            }

29 
        }

30 
    }

31 }

Java与Flex通信
Java与Flex通信
PushServerPipelineFactory

 1 
import
 
static
 org.jboss.netty.channel.Channels.
*
;

 2 

/**


 3 
 * 

 4 
 * 
@author
 sunwei

 5 
 * 
@version
 2010-7-22

 6 
 * 
@since
 JDK1.5

 7 
 
*/


 8 

public
 
class
 PushServerPipelineFactory 
implements
 ChannelPipelineFactory

 9 
{

10 
    @Override

11 
    
public
 ChannelPipeline getPipeline() 
throws
 Exception

12 
    {

13 
        ChannelPipeline pipeline 
=
 pipeline();

14 
        
//
 处理日志


15 

        pipeline.addLast(

logger


new
 LoggingHandler());

16 
        
//
 处理coder


17 

        pipeline.addLast(

decoder


new
 AMF3Decoder(Constants.MAX_OBJECT_SIZE));

18 
        pipeline.addLast(

encoder


new
 AMF3Encoder());

19 
        
//

20 

        pipeline.addLast(

handler


new
 PushProtocolHandler());

21 
        
//

22 

        
return
 pipeline;

23 
    }

24 
}

Java与Flex通信
Java与Flex通信
ServerMain

 1 
public
 
static
 main(String[] args)

 2 
{

 3 
        
//
 开始NIO线程


 4 

         ChannelFactory factory 
=
 
new
 NioServerSocketChannelFactory(Executors

 5 
                .newCachedThreadPool(), Executors.newCachedThreadPool());

 6 
        
//
 服务启始点


 7 

    ServerBootstrap bootstrap 
=
 
new
 ServerBootstrap(factory);

 8 
    
//
 处理过滤器


 9 

    bootstrap.setPipelineFactory(
new
 PushServerPipelineFactory());

10 
    
//
 设置相关参数


11 

    bootstrap.setOption(

child.tcpNoDelay


true
);

12 
    
//
 设置相关参数


13 

    bootstrap.setOption(

child.keepAlive


true
);

14 
    
//
 绑定相关端口


15 

    bootstrap.bind(
new
 InetSocketAddress(getPushPort()));

16 
}

  Flex客户端

Java与Flex通信
Java与Flex通信
FlexSocket

 1 
public
 
class
 FlexSocket

 2 
{

 3 
 

 4 

//
发送包


 5 

        
public
 function send(type:
int
, obj:PushMessage):Boolean

 6 
        {

 7 
            
if
 (_socket 
==
 
null
)

 8 
            {

 9 
                
return
 
false
;

10 
            }

11 
            
//
手动限制不给发送的时候用


12 

            
if
 (socketState 
==
 socket_state_closed 
||
 socketState 
==
 socket_state_connecting)

13 
            {

14 
                
return
 
false
;

15 
            }

16 
            
if
 (
!
_socket.connected)

17 
            {

18 
                
return
 
false
;

19 
            }

20 
            var byteArr:ByteArray
=
objToByteaArray(obj);

21 
            var msgHead:MsgHead
=
new
 MsgHead(type, byteArr.length);

22 
            sendMsg(msgHead.getType(), msgHead.getSize(), byteArr);

23 
            
return
 
true
;

24 
        }

25 


26 

//
接受包


27 

                
private
 function getDataHandler(e:ProgressEvent):
void


28 
        {

29 
            _timeServerDead.stop();

30 
            _timeServerDead.reset();

31 
            
if
 (_socket.bytesAvailable 
>=
 
8
 
&&
 
!
_isReadHead)

32 
            {

33 
                _recvPackageType
=
_socket.readInt();

34 
                
//
同意关闭

35 

//
                if(_recvPackageType == 5)

36 

//
                {

37 

//
                    close();

38 

//
                }


39 

                _recvPackageSize
=
_socket.readInt();

40 
                _isReadHead
=
true
;

41 
            }

42 
            
if
 (_isReadHead 
&&
 _socket.bytesAvailable 
>=
 _recvPackageSize)

43 
            {

44 
                var 
byte
:ByteArray
=
new
 ByteArray();

45 
                _socket.readBytes(
byte

0
, _recvPackageSize);

46 
                _msgObj
=
byteArraytoObject(
byte
);

47 
                
//
暂时用上面一种 


48 

                
if
 (_recvPackageType 
==
 packageType.LOGIN_TYPE)

49 
                {

50 
                    
if
 (_msgObj.ret 
==
 bodyType.RECEIVE_OK)

51 
                    {

52 
                        _timerDetectSocket.start();

53 
                        socketState
=
socket_state_connected;

54 
                        myEventDispatch.Instence().dispatcher(bodyType.INLINE_CURRENTSOCKETSTATE);

55 
                    }

56 
                    
else
 
if
 (_msgObj.ret 
==
 bodyType.RECEIVE_ERROR)

57 
                    {

58 
                        close();

59 
                    }

60 


61 
                }

62 
                
else
 
if
 (_recvPackageType 
==
 packageType.CHAT_TYPE)

63 
                {

64 
                    myEventDispatch.Instence().dispatcher(selectEventName(_recvPackageType), _msgObj);

65 
                }

66 
                _recvPackageSize
=
0
;

67 
                _recvPackageType
=
0
;

68 
                _msgObj
=
null
;

69 
                _isReadHead
=
false
;

70 


71 


72 
            }

73 
        }

74 


75 
}

  有关Mina的实现,你可以通过本博客向我索取相关代码。 

          

 

 

 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/33731.html

(0)
上一篇 2023-12-22 12:33
下一篇 2023-12-22 18:15

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信