当前位置:   article > 正文

c印记(十四):跨平台线程封装_c++跨平台线程封装

c++跨平台线程封装

一、缘起

不管是pc应用,服务端应用以及移动应用在编程过程当中几乎都会使用到线程相关的特性,而不同的操作系统可能会有不同的线程相关的API。比如类Unix系统上会有POSIX标准的pthread,windows上也有windows版的Pthread,但windows上并不是原生支持pthread,这样在使用的时候或多或少都会有一些性能,安全性上面的影响。再有就是一些rtos系统基本上都是task级别的API,因此封装一个跨平台的通用线程API是很有必要的。(不直接封装成pthread API,是因为pthread的API有很多,但我们常用的却很少)

二、跨平台通用线程API定义

为了简单易用,这里只抽象了三个API,具体如下:

/**
*@brief  get current thread ID
*
*@param  None
*
*@return return thread ID
*@see 
*/
G_API vos_thread_id vosThreadGetId(void);
/**
*@brief  create a thread instance
*
*@param  threadId  [io] thread id output
*@param  loopFunc  [in] thread main loop function.
*@param  loopParam [in] thread loop function's parameter.
*@param  attr      [in] thread attribute. 
*
*@return if success return G_OK, else return G_FAIL.
*@see 
*@note   thread default state(the param attr is NULL) is joinable(mean that, 
usr 
*        need call function vosThreadjoin to release thread resource)
*        
*/
G_API GS32 vosThreadCreate(vos_thread_id *threadId, vosThreadFunc_t loopFunc, 
void* loopParam, vos_thread_attr_t* attr);

/**
*@brief  wait for thread to release resource
*
*@param  threadId [in] the id of thread.
*
*@return if sucess return G_OK(0),else return error code
*@see
*@note  if thread not in joinable state,then will be return '
VOS_INVALID_OPERATION'
*/
G_API GS32 vosThreadJoin(vos_thread_id threadId);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • vosThreadGetId() : 获取当前线程的id(tid)
  • vosThreadCreate():创建一个线程
  • vosThreadJoin():如果不是detach模式的话,就需要调用此API来等待线程结束并释放资源
  • vos_thread_id: 线程的ID,其定义如下:
//thread id type define
typedef void* vos_thread_id;
  • 1
  • 2
  • vosThreadFunc_t: 线程循环的回调函数,其定义如下:
/**
*@brief  thread loop function type
*
*@param  
*
*@return if success exit return G_OK , else return error code.
*@see 
*/
typedef GS32 (*vosThreadFunc_t)(void *); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • vos_thread_attr_t: 线程属性,其定义如下:
//thread attribute define
typedef struct vos_thread_attr_s 
{
    GS32 priority; /** thread priority(range: lowest 0 - 100 highest) */ 

    /**
     * GTRUE:  need call vosThreadJoin to wait thread exit and release thread 
     *         resource
     *         
     * GFALSE: need call nothing,thread resource will be auto release.     
     */
    GBOL joinable;

    /**
     * thread name, if NULL,then thread name will be set as "unknown"
     */
    const GS08* name;
}vos_thread_attr_t;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

目前的都只是简单的几个常用属性,其实也不需要定义太多的属性,如果必须使用非常复杂的属性,可以直接使用系统的API,作为特例处理,而这里没有必要为某些特例将整个API复杂化,那样即增加了实现的复杂度,也增加了不稳定的风险。

可以看到这组API和pthread的API长得几乎一模一样,没错,他们就是来源于pthread的。接下来就看看具体的API实现。

三、各平台对API的实现

虽然这里说是各平台的实现,但其实主要就只是POSIX(Linux和Android)实现以及windows实现,其他平台还暂未实现。

3.1 POSIX(Linux和Android)实现

  • 首先是 vosThreadGetId的实现,这个比较简单,直接调用pthread的相对应接口就可以了:
vos_thread_id vosThreadGetId(void)
{
    return (vos_thread_id)pthread_self();
}
  • 1
  • 2
  • 3
  • 4
  • 其次是这组API的主角,创建一个线程的API,vosThreadCreate:
GS32 vosThreadCreate(vos_thread_id *threadId, vosThreadFunc_t loopFunc, void* 
loopParam, vos_thread_attr_t* attr)
{
#if defined(_WIN32) /** for windows */
    GS32 policy = SCHED_OTHER; /**这里包含了使用windows版本的POSIX实现的情况 */
#else /** other */
    GS32 policy = SCHED_RR;
#endif
    GS32 ret = 0;
    GS32 maxPri = 0;
    GS32 minPri = 0;
    GS32 rangePri = 0;
    struct sched_param schParam = {0};
    pthread_attr_t thdattr;
    posixThread_t* posixThd = NULL;
    GS32 priority = attr->priority;
    

    GCHECK(threadId);
    *threadId = NULL;
    /** 分配线程结构的内存 */
    posixThd = (posixThread_t*)memAlloc(sizeof(posixThread_t));

    if (!posixThd)
    {
        THD_PRINT("allocate win thread instance failed\n");
        goto ERROR;
    }

    memset(posixThd, 0, sizeof(posixThread_t));
    if (attr && attr->name)/**处理线程名属性 */
    {
        GS32 len = strlen(attr->name);

        if (len > THD_NAME_MAX_LEN)/**线程名最初只能31个字节,否则会被截断*/
        {
            printf("thread name too long\n");
            memcpy(posixThd->name, attr->name, THD_NAME_MAX_LEN);
        }
        else
        {
            memcpy(posixThd->name, attr->name, len);
        }
    }
    else
    {        
        memcpy(posixThd->name, "unkown", 6);
    }
    /** 防止线程在设置好参数之前就启动的条件变量 */
    ret = pthread_cond_init(&(posixThd->cond), NULL);
    if (0 != ret)
    {
        THD_PRINT("init condition lock failed,ret(%d),err(%d,%s)\n",
                 ret, errno, strerror(errno));
        goto ERROR;
    }

	/** 条件变量相关的互斥锁 */
    ret = pthread_mutex_init(&(posixThd->lock), NULL);
    if (0 != ret)
    {
        THD_PRINT("init mutex lock failed,ret(%d),err(%d,%s)\n", 
                 ret, errno, strerror(errno));
        goto ERROR;
    }

    /** 记录API传入的参数 */
    posixThd->param          = loopParam;
    posixThd->loopFunc       = loopFunc;
    posixThd->initIsComplete = GFALSE;

    /**初始化pthread属性结构*/
    pthread_attr_init(&thdattr);
    pthread_attr_getschedparam(&thdattr, &schParam);
    
    pthread_attr_setschedpolicy(&thdattr, policy);
    minPri = sched_get_priority_min(policy);

    THD_PRINT("thread [%s], policy(%d), min-max[%d-%d], pri(%d)\n",
              posixThd->name, policy, minPri, maxPri, prior44dity);
              
    rangePri = maxPri - minPri;/**获取线程优先级取值区间*/
    schParam.sched_priority = priority * rangePri / 100;
    pthread_attr_setschedparam(&thdattr, &schParam);
    //pthread_attr_setscope(&attr, NULL);

    if (attr->joinable == GFALSE)
    {/** 设置pthread为detach模式 */
        pthread_attr_setdetachstate(&thdattr, PTHREAD_CREATE_DETACHED);
    }

	/** 创建pthread线程 */
    ret = pthread_create(&(posixThd->thd), &thdattr, threadFunc, posixThd);

    pthread_attr_destroy (&thdattr);
    
    if (0 != ret)
    {
        THD_PRINT("===>:pthread create failed,ret(%d),err(%d, %s)\n", 
	             ret, errno, strerror(errno));        
        goto ERROR;
    }
    
    *threadId= (vos_thread_id)posixThd->thd; /** 输出线程id */

    pthread_mutex_lock(&posixThd->lock);
    posixThd->initIsComplete = GTRUE;    
    pthread_cond_broadcast(&(posixThd->cond)); /**完成创建以及初始化,启动线程*/
    pthread_mutex_unlock(&posixThd->lock);

    return G_OK;

ERROR:
            
    if (posixThd)
    {
        pthread_cond_destroy(&(posixThd->cond));
        pthread_mutex_destroy(&(posixThd->lock));
        memFree(posixThd);
    }
   

    return G_FAIL;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • vosThreadJoin: 等待线程结束并释放资源
GS32 vosThreadJoin(vos_thread_id threadId)
{
    if (threadId != VTHD_INVALID_TID) 
    {
		/**判断要等待的线程是否为当前线程,预防死等的情况发生 */
        if (threadId != vosThreadGetId()) 
        {
            void* retVal = NULL;
            pthread_t thd = (pthread_t)(threadId);
			/**调用pthread API等待线程结束并释放资源*/
            GS32 ret = pthread_join(thd, &retVal);

            if (ret == 0)
            {
                return G_OK;
            }
            else
            {
                THD_PRINT("===>:pthread join failed,ret(%d),err(%d, %s)\n", 
                         ret, errno, strerror(errno));
                
                if (ret == EINVAL)
                {
                    return G_ErrorInvalidOperation;
                }
                else if (ret == EDEADLK)
                {
                    return G_ErrorWouldBlock;
                }
            }
        }
        else
        {
            THD_PRINT("can not call(%s) in thread self\n", __FUNCTION__);
            return G_ErrorWouldBlock;
        }

    }

    return G_FAIL;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

3.2 windows平台上的实现

这里使用windows的系统API,_beginthreadex来实现创建线程的API,因为windows上要操作线程只能使用线程的handle,而不是tid,所以这里需要先实现一个线程tid和handle映射的关系,并使用一个全局的单例类来记录当前进程中创建的线程的handle和tid。

首先是封装了两个简单的互斥锁的类:

class myTinyLocker /** 互斥锁 */
{
public:
    inline myTinyLocker() { m_hMutex = ::CreateMutex(NULL, FALSE, NULL); }
    inline ~myTinyLocker() { ::CloseHandle(m_hMutex); }
    inline void lock() const { ::WaitForSingleObject(m_hMutex, INFINITE); }
    inline void unlock() const { ::ReleaseMutex(m_hMutex); }
private:
    HANDLE m_hMutex;
};

class myTinyAutoLock /** 利用c++的改造函数和析构函数的特性实现的自动互斥锁 */
{
public:
    myTinyAutoLock(const myTinyLocker& lock):mLock(lock)
    {
        mLock.lock();
    }

    ~myTinyAutoLock()
    {
        mLock.unlock();
    }
private:
    const myTinyLocker& mLock;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

然后是注册线程handle和tid的单例类:

struct myThdRegItem /** 线程的基本信息 */
{
public:
    HANDLE thd;
    GU32 tid;
    GBOL joinable;
};

class myThreadHandleReg
{
private:
    ~myThreadHandleReg()
    {
        GS32 ret = 0;
        map<GU32, myThdRegItem>::const_iterator it = mReg.begin();

        for (; it != mReg.end(); it++)
        {
            myThdRegItem* item = (myThdRegItem*)&(it->second);
            CloseHandle(item->thd);
            //ret = GetLastError();
        }

        mReg.clear();
    }
public:
    static myThreadHandleReg* getInstance()
    {
        myTinyAutoLock autoLock(mLock);
        static myThreadHandleReg gThdReg;

        return &gThdReg;
    }

    void insert(GU32 key, myThdRegItem item)
    {
        myTinyAutoLock autoLock(mLock);
        mReg.insert(make_pair(key, item));
    }

    myThdRegItem* find(GU32 key)
    {
        myTinyAutoLock autoLock(mLock);
        map<GU32, myThdRegItem>::const_iterator it = mReg.find(key);

        if (it == mReg.end()) {

            return NULL;
        }

        return (myThdRegItem*)&(it->second);
    }

    GS32 destroyItem(GU32 key)
    {
        myTinyAutoLock autoLock(mLock);
        map<GU32, myThdRegItem>::const_iterator it = mReg.find(key);

        if (it == mReg.end())
        {
            return G_FAIL;
        }

        myThdRegItem* item = (myThdRegItem*)&(it->second);

        CloseHandle(item->thd);
        mReg.erase(it);

        return G_OK;

    }
private:
    static myTinyLocker mLock;

    map<GU32, myThdRegItem> mReg;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

接下来依然是按次序介绍三个线程API的实现。

  • vosThreadGetId
vos_thread_id vosThreadGetId(void)
{
	/** 直接调用windows的API来获取当前线程的tid */
    return (vos_thread_id)GetCurrentThreadId();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • vosThreadCreate
GS32 vosThreadCreate(vos_thread_id *threadId, vosThreadFunc_t loopFunc, void* 
loopParam, vos_thread_attr_t* attr)
{
    winThread_t* winThd = NULL;
    GU32 tid = 0;
    GCHECK(threadId);
    *threadId = VTHD_INVALID_TID;
    winThd = (winThread_t*)memAlloc(sizeof(winThread_t));

    if (!winThd)
    {
        printf("allocate win thread instance failed\n");
        goto EXIT;
    }

    memset(winThd, 0, sizeof(winThread_t));

    if (attr &&  attr->name)
    {
        GS32 len = strlen((const char*)attr->name);

        if (len > THD_NAME_MAX_LEN)
        {
            printf("thread name too long\n");
            memcpy(winThd->name, attr->name, THD_NAME_MAX_LEN);
        }
        else
        {
            memcpy(winThd->name, attr->name, len);
        }
    }
    else
    {        
        memcpy(winThd->name, "unknown", 6);
    }
    winThd->loopFunc = loopFunc;   
    winThd->param    = loopParam;
	/** 调用_beginthreadex API来创建一个windows线程,默认状态为:暂停 */
    winThd->thd = _beginthreadex(NULL, 0, threadFunc, winThd, CREATE_SUSPENDED,
            &tid);//or CREATE_SUSPENDED(start with pause state),0(with run state)

    if (!winThd->thd)
    {
        memFree(winThd);
        winThd = NULL;
        goto EXIT;
    }

    *threadId = (vos_thread_id)tid; //create success, here output thread 
instance handle.

    myThreadHandleReg* thdReg = myThreadHandleReg::getInstance();
    myThdRegItem item;
    item.tid = tid;
    item.thd = (HANDLE)winThd->thd;    

    if (attr)
    {
        winThd->attr = *attr;
        item.joinable = attr->joinable;
    }
    else
    {
        item.joinable = GFALSE;
    }

    thdReg->insert(tid, item); /** 注册线程基本信息 */

    winThd->tid = tid;
	/** 恢复线程为运行状态 */
    ResumeThread((HANDLE)winThd->thd); //here resum thread
    
EXIT:

    if (!winThd)
    {
        return G_FAIL;
    }

    return G_OK;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • vosThreadJoin
GS32 vosThreadJoin(vos_thread_id threadId)
{
    if (threadId != VTHD_INVALID_TID)/** 判断线程id是否无效 */
    {
        GU32 tid = *((GU32*)threadId);

        if (threadId != vosThreadGetId())/** 预防死等情况 */
        {
            myThreadHandleReg* thdReg = myThreadHandleReg::getInstance();
            myThdRegItem* item = thdReg->find(tid);/** 查询此线程id是否存在?*/

            if (item)
            {
                if (item->joinable == GTRUE)
                {/** 如果是joinable模式就等待线程循环退出,并释放线程资源 */
                    DWORD ret = 0;
                    WaitForSingleObject(item->thd, INFINITE);
                    GetExitCodeThread(item->thd, &ret);
                    thdReg->destroyItem(tid);
                    return G_OK; //todo process return value.
                }
                else
                {
                    return G_ErrorInvalidOperation;
                }
            }
        }
        else
        {
            printf("can not call(%s) in thread self\n", __FUNCTION__);
            return G_ErrorWouldBlock;
        }
    }

    return G_FAIL;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

四、更高层进一步的封装

前面的封装只是对线程API进行了简单的封装,但仍然保持在相当原始和简洁的状态。这一章就是在此前的基础上进一步进行封装,加入线程控制部分,如暂停,恢复等操作。

4.1 API定义

其API定义如下:

/**
*@brief  create a thread instance
*
*@param  name   [in] the name of thread.
*@param  timeout_ms  [in] the duration of thread timeout, in milli-second(ms),
set 0 to disable this feature
*@param  priority [in] the priority of current thread, @see also 
tf_thread_priority_t.
*@param  looper   [in] thread looper callback function.
*@param  opaque   [in] the parameter for callback function looper().
*
*@return success: thread instance, fail: NULL .
*@see
*@note   1. thread will not run looper() func until tfThreadResume() be called.
*        2. the default state is paused.
*
*/
G_API GPHD tfThreadCreate(const char* name, GU32 timeout_ms, GU32 priority, 
tfThreadFunc_t looper, void* opaque);

/**
*@brief  destroy a thread instance
*
*@param  tthd  [in] the handle of thread
*
*@return none.
*@see
*@note  thread must destroy after thread.
*
*/
G_API void  tfThreadDestroy(GPHD tthd);


/**
*@brief  resume current thread
*
*@param  tthd  [in] the handle of thread
*
*@return success: G_OK. fail: error code.
*@see
*@note this operation is async
*
*/
G_API GS32 tfThreadResume(GPHD tthd);


/**
*@brief  pause current thread
*
*@param  tthd   [in] the handle of thread
*@param  wait [in] how wait(see also: tf_thread_wait_t)
*
*@return success: G_OK. fail: error code.
*@see
*
*/
G_API GS32 tfThreadPause(GPHD tthd, tf_thread_wait_t wait);

/**
*@brief  stop current thread
*
*@param  tthd [in] the handle of thread
*
*@return success: G_OK. fail: error code.
*@see
*@note 1. this operation is sync mode
*      2. must call this API ,before destroy thread.
*      3. atfer this operation, the tfThreadCheckTerminated() will return 
GTRUE.
*
*/
G_API GS32 tfThreadStop(GPHD tthd);

/**
*@brief  check is need terminate current thread
*
*@param  tthd [in] the handle of thread
*
*@return need: GTRUE. needn't: GFALSE.
*@see
*@note void thread_looper(* opaque)
*      {
*		  GPHD thd = ((xxx)(opaque))->thd;
*
*		  while(tfThreadCheckTerminated(thd) != GTRUE)
*		  {
*		    ....
*		  }
*	   }
*
*/
G_API GBOL tfThreadCheckTerminated(GPHD tthd);


/**
*@brief  check current thread is timeout
*
*@param  tthd [in] the handle of thread
*
*@return timeout: GTRUE. doesn't: GFALSE.
*@see
*
*/
G_API GBOL tfThreadIsTimeout(GPHD tthd);


/**
*@brief  get thread id by thread handle
*
*@param  tthd [in] the handle of thread
*
*@return success: thread id, fail: TF_INVALID_TID 
*@see
*
*/
G_API tf_thread_id tfThreadGetTid(GPHD tthd);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116

4.2 API实现

接下来就是依次说明每个API的功能以及具体实现:

  • tfThreadCreate:创建一个线程,其参数较简单,就是,线程名字, 超时时间(如果一次线程循环超过这个时间,就可以根据相应的API来判断这个线程是否超时了,如果超时,就可能意味着程序没有按照我们预想的逻辑在运行,然后就是进一步追查问题所在位置以及原因)
GPHD tfThreadCreate(const char* name, GU32 timeout_ms, GU32 priority, 
tfThreadFunc_t looper, void* opaque)
{
      
	my_thread_t* mthd = NULL;
	
	if (looper && opaque)/** 判断参数是否有效 */
	{
		mthd = (my_thread_t*)memAlloc(sizeof(my_thread_t));

		if (mthd)
		{
			memset(mthd, 0, sizeof(*mthd));
			mthd->lock = vosMutexCreate();/**创建互斥锁 */
			GCHECK(mthd->lock);
			mthd->thread_cond = vosCondCreate();
			GCHECK(mthd->thread_cond);
			mthd->thread_request_cond = vosCondCreate();/**创建条件变量*/
			GCHECK(mthd->thread_request_cond);
			mthd->looper = looper;
			mthd->state = TTHD_StatePause;
			mthd->request_state = TTHD_StatePause;
			mthd->priority = priority;
			mthd->opaque = opaque;
			mthd->thd_name = name ? strdup(name) : name;
			mthd->tid = TF_INVALID_TID;

			if (timeout_ms > 0)/**只有当超时时间设置为大于0时,才会启动超时机制 */
			{/**initialize watchdog, only when timeout > 0 */
				mthd->watchdog = &(mthd->storge_watchdog);
				if (vosWatchdogLiteInit(mthd->watchdog) != G_OK)
				{/**初始化软件看门狗(模拟的,并非真实的看门狗) */
					mthd->watchdog = NULL;
					LOGW("initialize watchdog failed, the function of thread is timeout"
						"will lose efficacy\n");
				}
				else
				{/**如果初始化程序,就复位看门狗 */
					/** start watchdog */
					vosWatchdogLiteReset(mthd->watchdog, timeout_ms);
				}
			}
		}
		else
		{
			LOGE("alloc thread controller instance failed\n");
		}
	}
	else
	{
		LOGE("bad parameter, get_tid is NULL\n");
	}
	

	return (GPHD)mthd;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • tfThreadDestroy:销毁一个线程
void  tfThreadDestroy(GPHD tthd)
{
	if (tthd)
	{
		my_thread_t* mthd = (my_thread_t*)tthd;

		tfThreadStop(tthd);/**销毁之前必须保证线程处于stop状态 */

		if (mthd->lock)
		{/**销毁互斥锁 */
			vosMutexFree(mthd->lock);
		}

		if (mthd->thread_cond)
		{/**销毁条件变量*/
			vosCondFree(mthd->thread_cond);
		}

		if (mthd->thread_request_cond)
		{/**销毁请求改变线程状态的条件变量*/
			vosCondFree(mthd->thread_request_cond);
		}

		if (mthd->watchdog)
		{/**销毁软件看门狗*/
			vosWatchdogLiteDeinit(mthd->watchdog);
		}

		if (mthd->thd_name)
		{/**释放存储线程名的内存*/
			free((void*)mthd->thd_name);
		}

		if (mthd->tid != TF_INVALID_TID)
		{/**等待线程结束并释放资源*/
			vosThreadJoin(mthd->tid);
		}

		memFree(tthd);/**释放线程实例的内存*/
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • tfThreadResume:恢复线程到运行状态
GS32 tfThreadResume(GPHD tthd)
{
	GS32 ret = G_ErrorBadParameter;

	if (tthd)
	{
		my_thread_t* mthd = (my_thread_t*)tthd;
		vosMutexLock(mthd->lock);/**为操作加锁*/

		if (mthd->state != TTHD_StateRun)
		{/**判断线程是否已经处于运行状态*/
			if (mthd->tid == TF_INVALID_TID)
			{/**判断线程是否已经创建*/
				vos_thread_id tid;
				vos_thread_attr_t attr;
				memset(&attr, 0, sizeof(attr));
				attr.name = mthd->thd_name;
				attr.priority = mthd->priority;
				attr.joinable = GTRUE;
				/**调用vos封装的线程API来创建线程*/
				ret = vosThreadCreate(&tid, tfThreadLooper, mthd, &attr);

				if (G_OK == ret)
				{
					mthd->state = TTHD_StateRun;/**initialize state to running*/
					mthd->tid = tid;
				}
			}
			else
			{
				ret = G_OK;
			}

			if (ret == G_OK)
			{/**改变线程状态为运行状态,并广播通知线程等待的条件变量 */
				mthd->request_state = TTHD_StateRun; /**todo ,something */
				vosCondBroadcast(mthd->thread_cond);		
			}
		}
		else
		{
			ret = G_ErrorInvalidOperation;
		}
		/**解除加锁状态*/
		vosMutexUnlock(mthd->lock);
	}

	return ret;
}
	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • tfThreadPause: 暂停线程(注:这里必须要等到完成一个线程循环才能进入暂停状态)
GS32 tfThreadPause(GPHD tthd, tf_thread_wait_t wait)
{
	GS32 ret = G_ErrorBadParameter;

	if (tthd)
	{
		my_thread_t* mthd = (my_thread_t*)tthd;
		vosMutexLock(mthd->lock);

		if (mthd->tid != TF_INVALID_TID)
		{
			if (mthd->state == TTHD_StateRun)
			{/**只有当线程处于运行状态时才能进行暂停操作 */				
				mthd->request_state = TTHD_StatePause; /**表示现在请求线程进入暂停状态*/

				if (wait == TTHD_WaitStateChange)
				{/**如果传入的参数为等待状态变化的话(同步操作),就进行等待 */
					if (mthd->tid == vosThreadGetId())
					{/**判断调用本API的地方是否为线程循环体,以避免死循环 */
						LOGE("%s called in thread's loop function and wait state\n"
							"change to pause, this will be lead to endless loop, "
							"it's a Bug\n", __FUNCTION__);
					}

					while (mthd->state != TTHD_StatePause)
					{/** wait thread state really change to pause */
					/**使用条件变量进行等待,直到状态变化完成 */
						vosCondWait(mthd->thread_request_cond, mthd->lock);
					}
				}
				
				ret = G_OK;
			}
			else
			{
				LOGE("thread state(%s) not in running,so can't do pause\n", 
				tfGetStateName(mthd->state));	
				ret = G_ErrorInvalidOperation;
			}
		}
		else
		{
			LOGE("thread haven't initialized\n");
			ret = G_ErrorNoInit;
		}

		vosMutexUnlock(mthd->lock);

	}

	return ret;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • tfThreadStop:停止一个线程,机实现机制和tfThreadPause基本相同,只是本API
    必须等待状态改变(也即是同步操作),就不在多做解释
GS32 tfThreadStop(GPHD tthd)
{
	GS32 ret = G_ErrorBadParameter;

	if (tthd)
	{
		my_thread_t* mthd = (my_thread_t*)tthd;
		vosMutexLock(mthd->lock);

		if (mthd->tid != TF_INVALID_TID)
		{
			if (mthd->state != TTHD_StateStop)
			{
				mthd->request_state = TTHD_StateStop;
				vosCondBroadcast(mthd->thread_cond);/**如果是暂停状态就需要唤醒条件变量*/
				if (mthd->tid == vosThreadGetId())
				{
					LOGE("%s called in thread's loop function and wait state\n"
						"change to pause, this will be lead to endless loop, "
						"it's a Bug\n", __FUNCTION__);
				}

				while (mthd->state != TTHD_StateStop)
				{/** wait thread state really change to stop */
					vosCondWait(mthd->thread_request_cond, mthd->lock);
				}
			}
			else
			{

				LOGE("thread already stopped,this is invalid operation\n");
				ret = G_ErrorInvalidOperation;
			}
		}
		else
		{
			LOGE("thread haven't initialized \n");
			ret = G_ErrorNoInit;
		}
		
		vosMutexUnlock(mthd->lock);
	}

	return ret;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • tfThreadCheckTerminated:判断线程是否结束以及具体实现线程的暂停等功能
GBOL tfThreadCheckTerminated(GPHD tthd)
{
	GBOL ret = GFALSE;

	if (tthd)
	{
		my_thread_t* mthd = (my_thread_t*)tthd;
		vosWatchdogLite_t* watchdog = mthd->watchdog;
		tf_thread_state_t request_state = mthd->request_state;
		if (watchdog)
		{/**如果启动的超时机制,此处就需要进行“喂狗” 操作 */
			vosWatchdogLiteReset(watchdog, mthd->timeout_ms);
		}

		if (TTHD_StatePause == request_state)
		{/**如果有请求暂停的操作,就进行如下操作*/

			vosMutexLock(mthd->lock);
			if (watchdog)
			{/**禁止/关断看门狗*/
				vosWatchdogLiteDisable(watchdog);
			}

			mthd->state = TTHD_StatePause;/**将线程状态变为:暂停*/

			vosCondBroadcast(mthd->thread_request_cond);
			/**广播通知等待状态变化的条件变量*/

			while (TTHD_StatePause == mthd->request_state)
			{/**线程进入暂停*/
				vosCondWait(mthd->thread_cond, mthd->lock);
			}
			
			/**如果线程状态变为非暂停(运行或停止)状态时,退出等待,
			 * 并将线程状态置为新的请求状态
			 */
			mthd->state = mthd->request_state;
			
			if (watchdog)
			{/**使能看门狗*/
				vosWatchdogLiteEnable(watchdog);
			}

			vosMutexUnlock(mthd->lock);

		}

		if (TTHD_StateStop == mthd->state)
		{/**如果线程状态变为了stop,返回GTRUE,表示结束当前线程,退出线程循环 */
		    /** when this function return ,current thread loop will be exit*/
			ret = GTRUE;
		}

	}
	else
	{
		ret = GTRUE;
	}

	return ret;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

-tfThreadIsTimeout:判断线程是否已经超时

GBOL tfThreadIsTimeout(GPHD tthd)
{
	GBOL ret = GFALSE; /** default as doesn't timeout */

	if (tthd)
	{
		my_thread_t* mthd = (my_thread_t*)tthd;

		vosMutexLock(mthd->lock);

		if (mthd->watchdog)
		{/**通过软件看门狗获取是否已经发生超时*/
			ret = vosWatchdogLiteIsTimeout(mthd->watchdog);			
		}

		vosMutexUnlock(mthd->lock); /** unlock */
	}

	return ret;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • tfThreadGetTid:获取当前线程实例所对应的的线程id
tf_thread_id tfThreadGetTid(GPHD tthd)
{
	return tthd ? ((my_thread_t*)tthd)->tid : TF_INVALID_TID;
}
  • 1
  • 2
  • 3
  • 4

###4.3 简单例子

#include "tiny_thread.h"
#include "vos_sys.h"

typedef struct myType_s
{
	GPHD myThd;
}myType_t;/**用户数据结构,包含有线程的handle*/

static void myThreadLoop(void* opaque)/**线程循环函数*/
{
	myType_t* mt = (myType_t*)opaque;
	GS64 st = vosSysTimeMs();
	while (tfThreadCheckTerminated(mt->myThd)!=GTRUE)/**判断线程是否结束以及控制线程状态*/
	{
		printf("hello my thread!!, the time: %lld (ms)\n", vosSysTimeMs() - st);
		vosSleepMs(100); /** sleep 100 ms */
	}

	printf("my thread exit..., the time: %lld (ms)\n", vosSysTimeMs() - st);
}


void threadTest()
{
	myType_t mt;
	/**创建线程*/
	mt.myThd = tfThreadCreate("myThread", 500,TTHD_PriorityDefault,myThreadLoop,&mt);

	if (mt.myThd)
	{
		tfThreadResume(mt.myThd);/**运行线程*/
		vosSleepSec(1); /** 睡眠1秒 */
		tfThreadPause(mt.myThd, TTHD_WaitStateChange);/**暂停线程,并等待线程状态变化完成 */
		vosSleepSec(1); /** 睡眠1秒*/

		tfThreadDestroy(mt.myThd); /**销毁线程*/
	}
	else
	{
		printf("create thread failed\n");
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

##五、完整代码

完整的代码在开源中国的git(码云)上面,是tiny framework(一个简易的c语言框架)的一部分,其地址如下:

https://gitee.com/xuanwolanxue/tiny_framework

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/418471
推荐阅读
相关标签
  

闽ICP备14008679号