Android 一套完整的 Socket 解决方案,androidsocket,当然了文中最后也提到了,
Android 一套完整的 Socket 解决方案,androidsocket,当然了文中最后也提到了,
作者 | MeloDev 地址 | http://www.jianshu.com/p/61de9478c9aa 声明 | 本文是 MeloDev 原创,已获授权发布,未经原作者允许请勿转载 写在前面 项目地址,喜欢点一个 star: https://github.com/itsMelo/AndroidSocket 在上上周的时候,写了一篇文章: 在 Android 上,一个完整的 UDP 通信模块应该是怎样的? 文中介绍了在 Android 端,一个完整的 UDP 模块应该考虑哪些方面。当然了文中最后也提到了,UDP 的使用本身就有一些局限性,比如发送数据的大小有限制,属于不可靠协议,可能丢包。而且它是一对多发送的协议等等...如果能将这个模块能加入 TCP Socket 补充,那就比较完美解决了 Android 上端到端的通信。下面就来看看怎么去做。 整体步骤流程 先来说一下整体的步骤思路吧: 发送 UDP 广播,大家都知道 UDP 广播的特性是整个网段的设备都可以收到这个消息。 接收方收到了 UDP 的广播,将自己的 ip 地址,和双方约定的端口号,回复给 UDP 的发送方。 发送方拿到了对方的 ip 地址以及端口号,就可以发起 TCP 请求了,建立 TCP 连接。 保持一个 TCP 心跳,如果发现对方不在了,超时重复 1 步骤,重新建立联系。 整体的步骤就和上述的一样,下面用代码展开: 搭建 UDP 模块 首先进行一些初始化操作,准备线程池,记录对象初始的时间等等。 紧接着就创建了真正的一个 UDP Socket 端,DatagramSocket,注意这里传入的端口号 CLIENT_PORT 的意思是这个 DatagramSocket 在此端口号接收消息。 我们都知道 Socket 中要处理数据的发送和接收,并且发送和接收都是阻塞的,应该放在子线程中,这里就开启了一个线程,来处理接收到的 UDP 消息(UDP 模块上一篇文章讲得比较详细了,所以这里就不详细展开了) 在子线程接收 UDP 数据,并且 notifyMessageReceive 方法通过接口来向外通知消息。 接着 startHeartbeatTimer 开启一个心跳线程,每间隔五秒,就去广播一个 UDP 消息。注意这里 getBroadcastAddress 是获取的网段 ip,发送这个 UDP 消息的时候,整个网段的所有设备都可以接收到。 到此为止,我们发送端的 UDP 算是搭建完成了。 搭建 TCP 模块 接下来 TCP 模块该出场了,UDP 发送心跳广播的目的就是找到对应设备的 ip 地址和约定好的端口,所以在 UDP 数据的接收方法里: 这个方法的目的就是取到对方 UDPServer 端,发给我的 UDP 消息,将它的 ip 地址告诉了我,以及我们提前约定好的端口号。 怎么获得一个设备的 ip 呢? 现在拿到了对方的 ip,以及约定好的端口号,终于可以开启一个 TCP 客户端了。 当 TCP 客户端成功建立的时候,我们就可以通过 TCP Socket 来发送和接收消息了。 细节处理 接下来就是一些细节处理了,比如我们的 UDP 心跳,当 TCP 建立成功之时,我们要停止 UDP 的心跳: 对 TCP 连接进行心跳保护: 首先会每隔两秒,就给对方发送一个 ping 包,看看对面在不在,如果超过 15 秒还没有回复我,那就说明对方掉线了,关闭我这边的 TCP 端。进入 onFailed 方法。 当 TCP 连接超时,我就会重新启动 UDP 的广播心跳,寻找等待连接的设备。进入下一个步骤循环。 对于数据传输的格式啊等等细节,这个和业务相关。自己来定就好。 还可以根据自己业务的模式,是 CPU 密集型啊,还是 IO 密集型啊,来开启不同的线程通道。这个就涉及线程的知识了。 项目地址,喜欢点一个 star: https://github.com/itsMelo/AndroidSocketpublic UDPSocket(Context context) {
this.mContext = context;
int cpuNumbers = Runtime.getRuntime().availableProcessors();
// 根据CPU数目初始化线程池
mThreadPool = Executors.newFixedThreadPool(cpuNumbers * Config.POOL_SIZE);
// 记录创建对象时的时间
lastReceiveTime = System.currentTimeMillis();
messageReceiveList = new ArrayList<>();
Log.d(TAG, "创建 UDP 对象");
// createUser();
}
public void startUDPSocket() {
if (client != null) return;
try {
// 表明这个 Socket 在设置的端口上监听数据。
client = new DatagramSocket(CLIENT_PORT);
client.setReuseAddress(true);
if (receivePacket == null) {
// 创建接受数据的 packet
receivePacket = new DatagramPacket(receiveByte, BUFFER_LENGTH);
}
startSocketThread();
} catch (SocketException e) {
e.printStackTrace();
}
}
/**
* 开启发送数据的线程
*/
private void startSocketThread() {
clientThread = new Thread(new Runnable() {
@Override
public void run() {
receiveMessage();
}
});
isThreadRunning = true;
clientThread.start();
Log.d(TAG, "开启 UDP 数据接收线程");
startHeartbeatTimer();
}
/**
* 处理接受到的消息
*/
private void receiveMessage() {
while (isThreadRunning) {
try {
if (client != null) {
client.receive(receivePacket);
}
lastReceiveTime = System.currentTimeMillis();
Log.d(TAG, "receive packet success...");
} catch (IOException e) {
Log.e(TAG, "UDP数据包接收失败!线程停止");
stopUDPSocket();
e.printStackTrace();
return;
}
if (receivePacket == null || receivePacket.getLength() == 0) {
Log.e(TAG, "无法接收UDP数据或者接收到的UDP数据为空");
continue;
}
String strReceive = new String(receivePacket.getData(), receivePacket.getOffset(), receivePacket.getLength());
Log.d(TAG, strReceive + " from " + receivePacket.getAddress().getHostAddress() + ":" + receivePacket.getPort());
//解析接收到的 json 信息
notifyMessageReceive(strReceive);
// 每次接收完UDP数据后,重置长度。否则可能会导致下次收到数据包被截断。
if (receivePacket != null) {
receivePacket.setLength(BUFFER_LENGTH);
}
}
}
/**
* 发送心跳包
*
* @param message
*/
public void sendMessage(final String message) {
mThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
BROADCAST_IP = WifiUtil.getBroadcastAddress();
Log.d(TAG, "BROADCAST_IP:" + BROADCAST_IP);
InetAddress targetAddress = InetAddress.getByName(BROADCAST_IP);
DatagramPacket packet = new DatagramPacket(message.getBytes(), message.length(), targetAddress, CLIENT_PORT);
client.send(packet);
// 数据发送事件
Log.d(TAG, "数据发送成功");
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
/**
* 处理 udp 收到的消息
*
* @param message
*/
private void handleUdpMessage(String message) {
try {
JSONObject jsonObject = new JSONObject(message);
String ip = jsonObject.optString(Config.TCP_IP);
String port = jsonObject.optString(Config.TCP_PORT);
if (!TextUtils.isEmpty(ip) && !TextUtils.isEmpty(port)) {
startTcpConnection(ip, port);
}
} catch (JSONException e) {
e.printStackTrace();
}
}
public String getLocalIPAddress() {
WifiInfo wifiInfo = mWifiManager.getConnectionInfo();
return intToIp(wifiInfo.getIpAddress());
}
private static String intToIp(int i) {
return (i & 0xFF) + "." + ((i >> 8) & 0xFF) + "." + ((i >> 16) & 0xFF) + "."
+ ((i >> 24) & 0xFF);
}
private boolean startTcpConnection(final String ip, final int port) {
try {
if (mSocket == null) {
mSocket = new Socket(ip, port);
mSocket.setKeepAlive(true);
mSocket.setTcpNoDelay(true);
mSocket.setReuseAddress(true);
}
InputStream is = mSocket.getInputStream();
br = new BufferedReader(new InputStreamReader(is));
OutputStream os = mSocket.getOutputStream();
pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(os)), true);
Log.d(TAG, "tcp 创建成功...");
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
if (startTcpConnection(ip, Integer.valueOf(port))) {// 尝试建立 TCP 连接
if (mListener != null) {
mListener.onSuccess();
}
startReceiveTcpThread();
startHeartbeatTimer();
} else {
if (mListener != null) {
mListener.onFailed(Config.ErrorCode.CREATE_TCP_ERROR);
}
}
// TCP已经成功建立连接,停止 UDP 的心跳包。
public void stopHeartbeatTimer() {
if (timer != null) {
timer.exit();
timer = null;
}
}
/**
* 启动心跳
*/
private void startHeartbeatTimer() {
if (timer == null) {
timer = new HeartbeatTimer();
}
timer.setOnScheduleListener(new HeartbeatTimer.OnScheduleListener() {
@Override
public void onSchedule() {
Log.d(TAG, "timer is onSchedule...");
long duration = System.currentTimeMillis() - lastReceiveTime;
Log.d(TAG, "duration:" + duration);
if (duration > TIME_OUT) {//若超过十五秒都没收到我的心跳包,则认为对方不在线。
Log.d(TAG, "tcp ping 超时,对方已经下线");
stopTcpConnection();
if (mListener != null) {
mListener.onFailed(Config.ErrorCode.PING_TCP_TIMEOUT);
}
} else if (duration > HEARTBEAT_MESSAGE_DURATION) {//若超过两秒他没收到我的心跳包,则重新发一个。
JSONObject jsonObject = new JSONObject();
try {
jsonObject.put(Config.MSG, Config.PING);
} catch (JSONException e) {
e.printStackTrace();
}
sendTcpMessage(jsonObject.toString());
}
}
});
timer.startTimer(0, 1000 * 2);
}
@Override
public void onFailed(int errorCode) {// tcp 异常处理
switch (errorCode) {
case Config.ErrorCode.CREATE_TCP_ERROR:
break;
case Config.ErrorCode.PING_TCP_TIMEOUT:
udpSocket.startHeartbeatTimer();
tcpSocket = null;
break;
}
}
用户评论