当前位置:   article > 正文

鸿蒙源码分析(十三)

鸿蒙源码

软总线模块中trans_service目录分析总结

本文主要总结软总线模块下trans_service目录中个文件的作用和功能,宏观上把握trans_service在鸿蒙系统中发挥的作用。
trans_service模块依赖于系统OS提供的网络socket服务,向认证模块提供认证通道管理和认证数据的收发;向业务模块提供session管理和基于session的数据收发功能,并且通过GCM模块的加密功能提供收发报文的加解密保护。

相关文章

这里分享队友对authmanager模块的总结分析,文中部分流程图出自队友绘制。
分布式软总线/authmanager模块总结

一、分布式软总线介绍

1.1 总线

总线(Bus)是一个非常广泛的概念,在传统计算机硬件体系中应用的非常广泛。它是一种内部结构,是cpu、内存、输入、输出设备传递信息的公用通道,主机的各个部件通过总线相连接,外部设备通过相应的接口电路再与总线相连接,从而形成了计算机硬件系统。在计算机系统中,各个部件之间传送信息的公共通路叫总线,微型计算机是以总线结构来连接各个功能部件的。按照计算机所传输的信息种类,计算机的总线可以划分为数据总线、地址总线和控制总线,分别用来传输数据、数据地址和控制信号。
在这里插入图片描述

2.2 分布式软总线定义

分布式软总线技术是基于华为多年的通信技术积累,参考计算机硬件总线,在1+8+N(1-手机、8-车机/音箱/耳机/手表/)设备间搭建一条“无形”的总线,具备自发现、自组网、高带宽低时延的特点。
在这里插入图片描述
全场景设备间可以基于软总线完成设备虚拟化、跨设备服务调用、多屏协同、文件分享等分布式业务。
分布式软总线的典型特征:

  • 自动发现/即连即用
  • 高带宽
  • 低时延
  • 高可靠
  • 开放/标准

2.3分布式软总线的功能和原理

2.3.1 分布式软总线的架构

在这里插入图片描述
通过协议货架和软硬协同层屏蔽各种设备的协议差别,总线中枢模块负责解析命令完成设备间发现和连接,通过任务和数据两条总线实现设备间文件传输、消息传输等功能。

分布式总线的总体目标是实现设备间无感发现,零等待传输。实现这个目标需要解决三个问题:

(1)设备间的发现和连接:从手动发现,进化成自发现
比如手机上有很多照片需要传到个人PC上,我们可以采用蓝牙传输,首先要打开手机和PC的蓝牙发现功能,手机或者PC点击搜索设备,然后互相配对授权即可连接上,成功连上后就可以发送照片了。

在分享照片这个场景中有很多人为的动作:开启蓝牙发现功能、搜索设备、配对授权,这确实有点麻烦,耗费了很多时间,可能会降低分享的意愿。

软总线提出了自动发现的概念,实现用户零等待的自发现体验,附近同账号的设备自动发现无需等待。

在这里插入图片描述
(2)多设备互联后的组网技术:软总线组网-异构网络组网
上面的例子中手机传照片是通过蓝牙,假如PC没有蓝牙功能只有WIFI,在传统的场景中这种可能就不能实现分享传输了。因为不同的组网方式之间是隔离的,所以我们要解决很多异构网络之间的组网问题
在这里插入图片描述
软总线提出的异构网络组网可以很好解决设备间不同协议的交互问题,就解决了手机通过蓝牙传输,而PC通过WIFI接收照片。解决方案如下图所示。
在这里插入图片描述

设备上线后会向网络层注册,同时网络层会与设备建立通道连接,实时检测设备的变换。网络层负责管理设备的上线下线变换,设备间可以监听自己感兴趣的设备,设备上线后可以立即与其建立连接,实现零等待体验。软总线可以自动构建一个逻辑全连接网络,用户或者业务开发者无需关心组网方式与物理协议。对于软件开发者来说软总线异构组网可以大大降低其开发成本。

传统开发模式:在传统开发模式中开发者需要适配不同网络协议和标准规范。

分布式开发模式:在HarmonyOS分布式开发模式中开发不再需要关心网络协议差异,业务开发与设备组网解耦,业务仅需监听设备上下线,开发成本大大降低。
(3)多设备多协议间的传输技术
传统协议的传输速率差异非常大,时延也难以得到保证。所以软总线传输要实现的目标有:高带宽(High Speed)、低时延(Low Latency)、高可靠(High Reliability)

软总线要实现的这三大目标的尖刀武器是:极简协议。
在这里插入图片描述
极简协议将中间的四层协议栈精简为一层提升有效载荷,有效传输带宽提升20%,并且在传统网络协议的基础上进行增强:

  • 流式传输:基于UDP实现数据的保序和可靠传输;
  • 双轮驱动:颠覆传统TCP每包确认机制;
  • 不惧网损:摒弃传统滑动窗口机制,丢包快速恢复,避免阻塞;
  • 不惧抖动:智能感知网络变化,自适应流量控制和拥塞控制;

二、代码分析

在源代码目录,我们可以通过ls看到主要分为四个目录,分别是authmanager、discovery、trans_service、 和为兼容系统差别而生的os_adapter。在这里插入图片描述

  • discover:提供基于 COAP 协议的设备发现机制;
  • authmanager:提供设备认证机制和知识库管理功能;
  • trans_service:提供身份验证和数据传输通道;
  • os_adapter:检测运行设备性能,决定部分功能是否执行。

本篇主要分析trans_service目录
trans_service大致分析

trans_service目录中的代码提供身份验证和传输通道。它主要封装了socket、cJSON、线程锁接口,实现了用户的创建、监听、会话管理,以及设备、指令、数据等信息的获取,最终提供加密和解密传输两种传输通道。 trans_service模块依赖于系统OS提供的网络socket服务,向认证模块提供认证通道管理和认证数据的收发;向业务模块提供session管理和基于session的数据收发功能,并且通过GCM模块的加密功能提供收发报文的加解密保护。

各部分代码文件概述

  • auth_conn_manager.c
    用户创建,监听,连接等服务管理;
  • tcp_session_manager.c
    会话管理;
  • trans_lock.c
    互斥锁初始化以及互斥锁资源获取与释放;
  • aes_gcm.c
    提供加密传输和解密传输接口;
  • messages.c
    用于获取以cJSON格式管理的设备(包括设备名、设备类型、设备ID等)、指令、数据、会话(包括用户端口、会话端口等)等信息;
  • tcp_socket.c
    端口号管理以及数据传输管理。

trans_service详解
1. 初始化
在分布式软总线的设计中,trans_service模块是在authmanager模块中被初始化的,而authmanager模块又被discovery模块初始化,因此设备在向外发布本设备信息的过程中,即完成了这三个相互关联模块的初始化动作。
authmanager模块中存在StartBus()函数,其中,StartListener()函数负责为认证模块提供通道完成初始化,StartSession()函数负责初始化业务的session管理:

//开始总线通讯
int StartBus(void)
{
    if (g_busStartFlag == 1) {
        return 0;
    }
    //获取设备信息
    DeviceInfo *info = GetCommonDeviceInfo();
    if (info == NULL) {
        return ERROR_FAIL;//-1
    }

    g_baseLister.onConnectEvent = OnConnectEvent;
    g_baseLister.onDataEvent = OnDataEvent;
    //通过监听函数中的函数GetSockPort(g_listenFd)获取端口
    int authPort = StartListener(&g_baseLister, info->deviceIp);
    if (authPort < 0) {
        SOFTBUS_PRINT("[AUTH] StartBus StartListener fail\n");
        return ERROR_FAIL;
    }
    //赋值给info的设备端口
    info->devicePort = authPort;
    //开始会话通过IP得到会话端口
    int sessionPort = StartSession(info->deviceIp);
    if (sessionPort < 0) {
        SOFTBUS_PRINT("[AUTH] StartBus StartSession fail\n");
        StopListener();//停止监听
        return ERROR_FAIL;
    }
    //变量名称前面加g表示该文件下的全局变量
    AuthMngInit(authPort, sessionPort);
    g_busStartFlag = 1;

    SOFTBUS_PRINT("[AUTH] StartBus ok\n");
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

认证通信与业务session的实现原理类似,本文中我们以简单一些的认证通信代码为例子,说明典型的处理流程。

初始化的入口在StartListener,StartListener()函数的底层存在对应不同版本平台的适配函数,这印证了鸿蒙OS各部分解耦的模块化设计思想,针对不同的硬件设备,组合成最适合该设备的OS。比如创建线程时采用了统一的static void WaitProcess(void)函数,而其内部封装了不同底层API的适配代码。


int StartListener(BaseListener *callback, const char *ip)
{
    if (callback == NULL || ip == NULL) {
        return -DBE_BAD_PARAM;
    }
    //如果回调变量和ip有一个为空则返回-DBE_BAD_PARAM
    g_callback = callback;

    int rc = InitListenFd(ip, SESSIONPORT);//调用函数,初始化监听listen
    if (rc != DBE_SUCCESS) {//这个if用来判断初始化函数是否执行成功
        return -DBE_BAD_PARAM;//初始化失败则返回-DBE_BAD_PARAM = -300+0
    }

    signal(SIGPIPE, SIG_IGN);//为指定的信号安装一个新的信号处理函数。
    ThreadAttr attr = {"auth", 0x800, 20, 0, 0};
    register ThreadId threadId = (ThreadId)AuthCreate((Runnable)WaitProcess, &attr);
    if (threadId == NULL) {//如果返回值为NULL,说明创建失败
        SOFTBUS_PRINT("[TRANS] StartListener AuthCreate fail\n");
        return -1;
        //返回值-1,并且输出相关信息
    }
    return GetSockPort(g_listenFd); 
    //GetSockPort将获得一个与socket相关的地址,服务器端可以通过它得到相关客户端地址,而客户端也可以得到当前已连接成功的socket的ip和端口。
    //最后返回是一个16位数由网络字节顺序转换为主机字节顺序
 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

StartListener()调用InitListenFd()函数完成监听TCP socket的创建和监听,其中IP地址和端口号由上层调用者指定。

static int InitListenFd(const char *ip, int port)//初始化listen设备
{
    if (ip == NULL || g_listenFd != -1) {
        return -DBE_BAD_PARAM;//检查参数
    }

    if (strncmp(ip, "0.0.0.0", strlen(ip)) == 0) {//比较ip和0.0.0.0,大致为ip是否有效
        return -DBE_BAD_PARAM;
    }

    int rc = OpenTcpServer(ip, port);//根据ip和port打开Tcp服务器
    if (rc < 0) {
        SOFTBUS_PRINT("[TRANS] InitListenFd OpenTcpServer fail\n");
        return rc;
    }
    g_listenFd = rc;
    RefreshMaxFd(g_listenFd);

    rc = listen(rc, DEFAULT_BACKLOG);//监听rc设备
    if (rc != 0) {//查看监听是否成功
        SOFTBUS_PRINT("[TRANS] InitListenFd listen fail\n");
        StopListener();
        return -DBE_LISTEN_FAIL;
    }

    return DBE_SUCCESS;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

如上文所述,AuthCreate()在不同平台上会有不同的实现,在LITEOS_A和Linux平台上, AuthCreate()会调用兼容POSIX的pthread_create()完成线程的创建,线程的入口函数为static void WaitProcess(void)。

ThreadId AuthCreate(Runnable run, const ThreadAttr *attr)/*创建auth权限基于线程*/
{
    pthread_attr_t threadAttr;//线程变量
    pthread_attr_init(&threadAttr);//初始化线程
    pthread_attr_setstacksize(&threadAttr, (attr->stackSize | MIN_STACK_SIZE));//设定线程变量
    struct sched_param sched = {attr->priority};
    pthread_attr_setschedparam(&threadAttr, &sched);
    pthread_key_create(&g_localKey, NULL);
    pthread_t threadId = 0;
    int errCode = pthread_create(&threadId, &threadAttr, run, NULL);//创建以恶搞线程
    if (errCode != 0) {
        return NULL;
    }
    return (ThreadId)threadId;
    //返回一个结构体变量threadId
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2. 监听

static void WaitProcess(void)/*静态函数,用来等待进程*/
{
    SOFTBUS_PRINT("[TRANS] WaitProcess begin\n");
    //开始创建进程输出
    fd_set readSet;
    fd_set exceptfds;
    //定义两个文件描述字(fd)的集合类型的变量
    while (1) {
        FD_ZERO(&readSet);
        FD_ZERO(&exceptfds);
        FD_SET(g_listenFd, &readSet);
        if (g_dataFd >= 0) {
            FD_SET(g_dataFd, &readSet);
            FD_SET(g_dataFd, &exceptfds);
        }
        int ret = select(g_maxFd + 1, &readSet, NULL, &exceptfds, NULL);//select函数用来监视文件描述符的变化
        //返回值:>0:就绪描述字的正数目  -1:出错    0 :超时
        if (ret > 0) {//ret>0说明select监视成功
            if (!ProcessAuthData(g_listenFd, &readSet)) {/*该函数用来获取当前进程所有者的信息*/
            //如果获取失败,则进行友好输出信息并且停止listener
                SOFTBUS_PRINT("[TRANS] WaitProcess ProcessAuthData fail\n");
                StopListener();
                break;
            }
        } else if (ret < 0) {//ret<0说明select监视失败
            if (errno == EINTR || (g_dataFd > 0 && FD_ISSET(g_dataFd, &exceptfds))) {
                SOFTBUS_PRINT("[TRANS] errno == EINTR or g_dataFd is in exceptfds set.\n");
                CloseAuthSessionFd(g_dataFd);
                //关闭AuthSession(用户会话),函数相关说明见下面函数定义
                continue;
            }
            SOFTBUS_PRINT("[TRANS] WaitProcess select fail, stop listener\n");
            StopListener();
            //停止listener,关闭g_listenFd和g_maxFd对应的文件
            break;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

WaitProcess()使用忙等方式,调用select()来监听listenFd和数据g_dataFd的信息,如果监听到有数据可读,则进入ProcessAuthData来处理。如果发现g_dataFd有异常信息,则将其关闭。其中g_dataFd是由listenFd监听到连接时创建的socket。
3. 处理新连接和数据

static bool ProcessAuthData(int listenFd, const fd_set *readSet)
{
    if (readSet == NULL || g_callback == NULL || g_callback->onConnectEvent == NULL ||
        g_callback->onDataEvent == NULL) {
        return false;
        //回调g_callback和该链表所关联事件为空和所连接的数据文件为空或者文件描述符集合readset为空、返回False
    }
    if (FD_ISSET(listenFd, readSet)) {//套接字结合操作宏 
        struct sockaddr_in addrClient = {0};
        socklen_t addrLen = sizeof(addrClient);
        g_dataFd = accept(listenFd, (struct sockaddr *)(&addrClient), &addrLen);//accept函数指定服务端去接受客户端的连接,接收后,返回了客户端套接字的标识,且获得了客户端套接字的“地方”
        //accept函数的第一个参数用来标识服务端套接字(也就是listen函数中设置为监听状态的套接字)
        //第二个参数是用来保存客户端套接字对应的“地方”(包括客户端IP和端口信息等)
        //第三个参数是“地方”的占地大小。返回值对应客户端套接字标识。

        //accept函数执行成功则返回非负整数,执行失败则返回一个负值
        if (g_dataFd < 0) {
            CloseAuthSessionFd(listenFd);
            return false;
        }
        //如果accept函数执行失败,关闭listenFd描述符对应文件
        RefreshMaxFd(g_dataFd);
        //传入g_dataFd刷新g_maxFd的值,保持g_maxFd为最大

        if (g_callback->onConnectEvent(g_dataFd, inet_ntoa(addrClient.sin_addr)) != 0) {
            CloseAuthSessionFd(g_dataFd);
            //如果g_callback->onConnectEvent中对应的描述符g_dataFd和ip不为零,则关闭g_dataFd对应的文件
        }
    }

    if (g_dataFd > 0 && FD_ISSET(g_dataFd, readSet)) {
        g_callback->onDataEvent(g_dataFd);
    //如果g_dataFd为正值,并且FD_ISSET检查g_dataFd在readSet这个描述符集合里面,则将g_dataFd写入g_callback->onDataEvent
    }

    return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

无论是新连接请求,还是已有连接中有数据到来,均会进入本函数。

函数通过FD_ISSET()判断是否是listenFd上存在消息,如果是,则说明当前存在新的连接,这时调用accept()完成链接创建,新创建的socket的fd被存储在g_dataFd中,同时调用g_callback->onConnectEvent通知认证模块有新的连接事件发生,并将新创建的fd和client的IP地址告知认证模块。

与此同时,创建g_dataFd时候需要刷新g_maxFd,以保证在WaitProcess()中的下一次select()操作时中,会监听到g_dataFd上的事件。

如果FD_ISSET()判断出g_dataFd上存在消息,则说明已完成握手的连接向本节点发送了数据,这时函数回调g_callback->onDataEvent(),把控制权返回给调用者,以处理接收到的数据。
4. 回调
trans_service模块的使用者设置的回调函数将在存在新连接、和新数据时被调用,比如认证模块通过以下函数完成认证动作:OnConnectEvent()函数中完成对新连接的处理, OnDataEvent()函数中完成对新数据的处理。

//通过fd标识符和IP将事件上链
//调用函数PorcessConnectEvent()
//里面又调用AddAuthConnToList(AuthConn *aconn)将conn插入链表的尾部
int OnConnectEvent(int fd, const char *ip)
{
    ProcessConnectEvent(fd, ip);
    return 0;
}

//通过标识符找到对应的链接conn
//如果不空则将其中的信息提取打包并加上信息头部
//根据模式进行IP验证还是id验证
//根据输出的文本判断是否验证成功
int OnDataEvent(int fd)
{
    ProcessDataEvent(fd);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

5. session处理
该部分代码负责业务的数据通信,节点通过名称进行通信,对外隐藏了端口信息,代码的实现原理与上面的认证通信类似,针对业务需求增加了如多路通信等方面的实现,这里不再详细分析。

整体流程

在这里插入图片描述
被发现端(轻量设备)注册、发布服务,成功后回调处理,被发现端使用CreateSessionServer来创建会话服务器并等待发现端的连接、创建会话。发现端(如:智慧屏设备)根据服务的名称和设备ID建立一个会话,就可以实现服务间的数据传输。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/124472
推荐阅读
相关标签
  

闽ICP备14008679号