测试so接口函数的脚本 [python]

下面是一个测试solib库中调用函数的测试脚本,但该脚本还存在这一些问题,我目前无法理解和解决;

问题:

1.我定义了logging采用日志滚动的方式,写日志,并且每个日志的大小是20M,但测试结果发现日志连1M都没到就开始轮转了,并且在轮转过程中,还出现logging写日志,却发现,日志轮转了,结果竟然报了,轮转日志不存在。

Traceback (most recent call last):

 File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit

 File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit

self.doRollover()

Traceback (most recent call last):

 File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit

self.doRollover()

 File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 135, in doRollover

os.remove(dfn)

OSError: [Errno 2] No such file or directory: '/tmp/mysofuns_test.log.5'

2.在进行多线程测试时,发现写到文件中的日志和直接打到屏幕上的日志竟然不一样。但经过分析,发现写到文件中的日志,出现了重复,而打到屏幕上的日志却是正常的。不能理解,暂时我也不知道如何避免。

#!/usr/bin/python27
#coding:utf-8
import threading
from os.path import exists,basename
from os import mkdir,getcwd,sep
import sys
import time
import logging
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-2s) %(message)s',)

class fileso():
    conf_dir = getcwd() + '/conf'
    conf_filename = "config.py"
    
    def init(self,conf_dir=None,conf_filename=None):
        if conf_dir:
            self.conf_dir = conf_dir
        conf_dir = self.conf_dir
        sys.path.append(conf_dir)
        
        if conf_filename:
            self.conf_filename = conf_filename
        conf_filename = conf_dir + sep + self.conf_filename
        
        if exists(conf_dir):
            if exists(conf_filename):
                return True
            else:
                print "没有在",conf_dir,"中找到测试主配置文件:",conf_filename
        else:
            mkdir(conf_dir)
            
        with open(conf_filename,'w+') as conf_file:
            print "开始初始化主配置文件...."
            conf_contant = """
#coding:utf-8
# 设置输出日志的格式,默认只输出消息.
# 如: -f fn 或 -f f lineno,dt,tn,pid
# 可使用的格式符号:
#    ln  显示日志级别名称.
#    fp  显示此程序的绝对路径.
#    fn  显示当前执行程序名.
#    funcn  显示打印日志的函数名.
#    lineno  显示日志的行号.
#    dt  显示日期时间.
#    tn  显示进程名.
#    pid  显示进程pid.
logformat = ""
# 设置输出日志的文件名:
logfilename = "/tmp/mytest.log"
# so库函数的位置:
so_file = "/tmp/libPSBCSM3JDLL.so"
# C语言类型与测试工具需要的类型的对应表
#
#--------------------------------------------------
# C type =============== TEST type
#--------------------------------------------------
#
# char  ---------------- strchar (1-character string)
# char* ---------------- strchar*
#
# char ----------------- char (byte char)
# unsigned char -------- unsigned char (byte char)
# unsigned char* ------- unsigned char* (byte char)
# wchar_t -------------- c_wchar (1-character unicode string)
# wchar_t* ------------- c_wchar_p (unicode or None)
#
# short ---------------- short
# unsigned short ------- unsigned short
# int ------------------ int
# int8 ----------------- int8 (8bit int)
# int16 ---------------- int16 (16bit int)
# int32 ---------------- int32 (32bit int)
# int64 or __int64 ----- int64 (64bit int)
# unsigned int8 -------- unsigned int8 (8bit unsigned int)
# unsigned int16 ------- unsigned int16 (16bit unsigned int)
# unsigned int32 ------- unsigned int32 (32bit unsigned int)
# unsigned int64 ------- unsigned int64 (64bit unsigned int)
# long ----------------- long
# unsigned long -------- unsigned long
# long long ------------ long long
#
# float ---------------- float
# double --------------- double
#
# bool ----------------- bool
# void* ---------------- void*
#
#--------------------------------------------------
# 
# so库函数所有的参数类型
so_args = (
    #['FunctionsName','Func_return_type','Func_type1','Func_type2',...]
    ["TestFuncsName1",
     "int",
     "unsigned char*",
     "unsigned char*",
     "unsigned char*",
     "unsigned char*",
     "int",
     "int",
     "int",
     "__int64",
     "int"], # <--- Note:","
    ["TestFuncsName2",
     "int",
     "unsigned char*",
     "unsigned char*",
     "unsigned char*",
     "unsigned char*",
     "int",
     "int",
     "int",
     "__int64",
     "int"], # <--- Note:","
)
# 给so库函数提供的所有测试数据.
# 下面是为了更清晰而分行写了,它们是可以写成一行的.
# 写成一行的格式:
#  so_value = [ (["函数名",'第一个参数','第二个参数','第三个参数',....]), ] 
#  请注意书写格式: 特别是逗号; 如-------------------------------------  ^  ,这个。
so_value = [
    #(["FunctionName",'Func_parameter1','Func_parameter2','Func_parameter3',....]),
    ("TestFuncsName1",
     ['0x1234567812345678123456781234567812345678123456781234567812345678',
      '12345678901',
      '123456',
      '123456',
      0,  #<---- 注:若整数类型的,可直接写.
      2,
      6,
      20160101000000,
      0
     ], # <--- 注意:","
     ['0x1234567812345678123456781234567812345678123456781234567812345678',
      '12345678901',
      '123456',
      '123456',
      0,
      2,
      6,
      20160101000000,
      0
     ], # <--- Note:","
    ),  # <--- Note:","
    ("TestFuncsName2",
     ['0x1234567812345678123456781234567812345678123456781234567812345678',
      '12345678901',
      '123456',
      '123456',
      0,
      2,
      6,
      20160101000000,
      0
     ], # <--- Note:","
     ['0x1234567812345678123456781234567812345678123456781234567812345678',
      '12345678901',
      '123456',
      '123456',
      0,
      2,
      6,
      20160101000000,
      0
     ],
    ), # <--- Note:","
]
"""
            conf_file.write(conf_contant)
            conf_file.flush()
            conf_file.close()
            print "初始化完成,现在可以修改主配置文件:",conf_filename, \
                 '了\n修改完成后,直接执行',sys.argv[0],'即可.\n查看帮助可输入:',sys.argv[0],' -h' \
                 '若需要重新初始化,则直接删除:',conf_filename
            sys.exit(0)
    
    osFuncs = {} 
    conf_fobj = None
    logformat = None
    logfilename = None
    multiThread = False
    # 注: 我对C语言不是太懂,以前学了皮毛,都还老师了,
    #故:这里的C类型对应,是按PythonDOC中的说明写的。其中char存在歧义,故做了一点修改,
    #还有int64不是太确定以下定义是否正确。
    dict_ctype = {"strchar":'c_char',"strchar*":'c_char_p',
          "wchar_t":'c_wchar',"wchar_t*":'c_wchar_p',
          "char":'c_byte',
          "unsigned char":'c_ubyte',"unsigned char*":'c_char_p',
          "short":'c_short',
          "unsigned short":'c_ushort',
          "int":'c_int',"int8":'c_uint8',"int16":'c_uint16',
          "int32":'c_uint32',"int64":'c_uint64',
          "unsigned int8":'c_uint8',"unsigned int16":'c_uint16',
          "unsigned int32":'c_uint32',"unsigned int64":'c_uint64',
          "long":'c_long',
          "unsigned long":'c_ulong',
          "__int64":'c_longlong',"long long":'c_longlong',
          "unsigned __int64":'c_ulonglong',"unsigned long long":'c_ulonglong',
          "float":'c_float',
          "double":'c_double',
          "bool":'c_bool',
          "void*":'c_void_p',
        }
        
    def __init__(self,conf_dir=None,conf_filename=None):
        self.init(conf_dir,conf_filename)
        self.conf_fobj = __import__(self.conf_filename.rstrip('.py'))
        self.logfilename = self.conf_fobj.logfilename
        self.logformat = self.conf_fobj.logformat.split(',')
        
    def setMultiThread(self,mBool):
        self.multiThread = mBool
    
    def convertType_c2t(self):
        """The function is c type convert test type. """
        fargs = self.conf_fobj.so_args
        funcs = {}
        paras = ""
        retargs = {} 
        for l in fargs:
            # Second paras: return type of the function
            retargs[l[0]]=(self.dict_ctype[l[1]])
            for i in range(len(l[2:])):
                paras = paras + "," + self.dict_ctype[l[i+2]]
            # First paras: function name of os file in
            funcs[l[0]] = paras.lstrip(',')
            paras = ""
        return funcs,retargs
        
    def handleValue_t2c(self,values,paras):
        """The function is test value convert c value."""
        count = 0
        vStr = ""
        intlong = ("c_byte","c_ubyte","c_short","c_ushort","c_int","c_long", \
                   "c_ulong","c_longlong","c_ulonglong","c_uint8","c_uint16", \
                   "c_uint32","c_uint64",)
        for p in paras.split(','):
            if p == "c_bool":
                # c type:_Bool = py type:bool(1)
                vStr = vStr + ',' + str(values[count])
            elif p == "c_char":
                # c type:char = py type: 1-character string
                vStr = vStr + ',"' + values[count] + '"'
            elif p == "c_wchar":
                # c type:wchar_t = py type: 1-character unicode string
                vStr = vStr + ',"' + values[count] + '"'
            elif p in intlong:
                # c type:intlong = py type: int/long
                vStr = vStr + ',' + str(values[count])
            elif p in ('c_float','c_double','c_longdouble'):
                # c type:() = py type: float
                vStr = vStr + ',' + str(values[count])
            elif p == 'c_char_p':
                # c type:char* = py type:string or None
                vStr = vStr + ',"' + values[count] + '"'
            elif p == 'c_wchar_p':
                # c type:wchar_t* = py type:unicode or None
                vStr = vStr + ',"' + values[count] + '"'
            elif p == 'c_void_p':
                # c type:void* = py type:int/long or None
                vStr = vStr + ',"' + str(values[count]) + '"'
            count += 1
        return vStr.lstrip(',')
    
# 注:这里不是注释,预览看这里变成注释了;
    def content_join(self,mainContent):
        imp_section_content = """
#coding:utf-8
from ctypes import *
import logging
from logging.handlers import RotatingFileHandler
"""
        format = ""
        for f in self.logformat:
            if f == 'ln':
                format = format + ' %(levelname)s'
            elif f == 'fp':
                format = format + ' %(pathname)s'
            elif f == 'fn':
                format = format + ' %(filename)s'
            elif f == 'funcn':
                format = format + ' %(funcName)s'
            elif f == 'lineno':
                # output: log row number.
                format = format + ' %(lineno)d'
            elif f == 'dt':
                # output: data time.
                format = format + ' %(asctime)s'
            elif f == 'tn':
                format = format + ' %(threadName)s'
            elif f == 'pid':
                format = format + ' %(process)d'
        format = format + ' %(message)s'
        log_section_content = 'format="' + format + '"\n' + 'logfilename="' + self.logfilename + '"' + """
log = logging.getLogger()
Rthandler = RotatingFileHandler(logfilename,maxBytes=20*1024*1024,backupCount=5)
Rthandler.setFormatter(logging.Formatter(format))
log.addHandler(Rthandler)
log.setLevel(logging.INFO)
"""
        main_section_content = "fso='" + self.conf_fobj.so_file + "'" + '''
cdll.LoadLibrary(fso)
openFso = CDLL(fso)
''' + mainContent
        content = imp_section_content + '\n' + log_section_content + '\n' + main_section_content
        return content
    def constructor(self):
        '''功能: 合成调用so库函数的语句;
                 语句有三种: 
                    1.指定so库函数的参数都是什么类型;
                    2.指定so库函数的返回值是什么类型;
                    3.指定so库函数需要的所有参数值.
        '''
        # convertType_c2t return 2 value:
        #     funcs={"funcName":"funcParaTypes"}
        #     retargs={funcName:returnParaType}
        funcs,retargs = self.convertType_c2t()
        mainContentAll = ""
        if self.multiThread:
            compile_main = {}
            compile_common = compile(self.content_join(""),'','exec')
            self.osFuncs[compile_common] = compile_main
        # paraValues=('funcName',['paras1','paras2',...],[...],...)
        # so_value=[paraValues,paraValues,...]
        for paraValues in self.conf_fobj.so_value:
            for k_funcName,v_funcStrParaTypes in funcs.items():
                if paraValues[0] == k_funcName:
                    count = 0
                    for groupval in paraValues[1:]:
                        funcStrParaValues = self.handleValue_t2c(groupval,v_funcStrParaTypes)
                        #指定so库函数的参数都是什么类型;
                        funcName_type_join = 'openFso.' + k_funcName + '.argtpes=[' + v_funcStrParaTypes + ']'
                        #指定so库函数的返回值是什么类型;
                        funcName_retType_join = 'openFso.' + k_funcName + '.restype=' + retargs[k_funcName]
                        #指定so库函数需要的所有参数值.
                        call_func_join = 'openFso.' + k_funcName + '(' + funcStrParaValues + ')'
                        output_msg_c1 = "log.info('测试:" + str(count) + "')"
                        output_msg_c2 = "log.info('测试内容:" + call_func_join + "')"
                        exec_func_join = "exec_func = " + call_func_join
                        output_msg_c3 = "log.info('测试结果:'+str(exec_func))"
                        mainContent = funcName_type_join + '\n' + \
                                      funcName_retType_join + '\n' + \
                                      output_msg_c1 + '\n' + \
                                      exec_func_join + '\n' + \
                                      output_msg_c2 + '\n' + \
                                      output_msg_c3 + '\n'
                        # 生成一个文件中包含so库的所有函数
                        mainContentAll = mainContentAll + '\n' + mainContent
                        if self.multiThread:
                            # 每个元素是一个文件,一个文件中只测试一个so库函数
                            if count == 0:
                                compile_custom_test = []
                            #print 'k_funcName=',k_funcName,'\nmainContent=',mainContent
                            compile_custom_test.append(compile(mainContent,'','exec'))
                            compile_main[k_funcName]=compile_custom_test
                        count += 1
        return self.content_join(mainContentAll)
        
    def starttest(self):
        ''' 单线程测试. '''
        exec self.constructor()
        print "提示:请查看输出日志文件:",self.logfilename
        
    def mstarttest(self):
        ''' 根据测试函数的个数启动相应数量的线程. '''
        self.setMultiThread(True)
        self.constructor()
        self.run()
        
    def num_starttest(self,num):
        """启动指定倍数的线程"""
        self.setMultiThread(True)
        self.constructor()
        for i in range(num):
            self.run('--'+str(i)+"轮")
            
    def run(self,mark=""):
        if self.osFuncs:
            # self.osFuncs=[compile_common_code1,{"funcName":[compile_custom_test_para1,...]},{..},..]
            k_common_code, = self.osFuncs
            all_paras_code = self.osFuncs[k_common_code]
            # condition: 它必须是全局条件锁,否则它将失去它的作用.
            condition = threading.Condition()
            worker = []
            i = 0
            for k_funcName,v_execcode in all_paras_code.items():
                funcCode = (k_common_code,v_execcode,)
                w = threading.Thread(name='Worker'+str(i)+mark,target=self.worker,args=(condition,funcCode,))
                worker.append(w)
                i += 1
            b = threading.Thread(name='Boss',target=self.boss,args=(condition,))
            for t in worker:
                t.start()
            b.start()
            
    def boss(self,condition): 
        logging.debug('多线程测试开始...')
        # 这里用with语句写的,但其实,我对它并不是很了解,只是照书上写的。如果有了解的,也可以多交流,呵呵。。
        with condition:
            #这里调用notifyAll同时通知所有已经处于wait状态的子线程,使它们同时启动起来。实现一个简单的并行压测。
            condition.notifyAll()
    def worker(self,condition,codefile):
        #with condition就类似于if condition.acquire():,来判断是否获取了锁;
        #with语句的优势: 似乎是可以通过__exit__()来释放锁;这个我不确定,可能理解错误。
        #这篇文章,介绍with语句的用法:http://blog.csdn.net/suwei19870312/article/details/23258495
        with condition:
            #线程执行wait()方法时,会处于等待状态,它只有接受到notify通知后,才能获得锁。
            condition.wait()
            exec codefile[0]
            for codeObj in codefile[1]:
                #print 'codeObj=',codeObj
                exec codeObj
                
help_msg = '使用格式:' + sys.argv[0] + ' [-h | -f | -mf | -mn]\n' + \
           '参数:\n' + \
           '\t-h   显示帮助信息.\n\t-mf   启动线程数=测试函数的个数\n' + \
           '\t-mn   启动线程数=测试函数的个数 x 指定的数.如: -mn 3 就是启动3倍的函数个数.\n'
           
def handle_paras():
    fso = fileso() 
    count = 1
    for args in sys.argv[1:]:
        if args in ('/h','-h','--help'):
            print help_msg
            sys.exit(0)
        elif args == '-mf':
            fso.mstarttest()
            return
        elif args == '-mn':
            Num = int(sys.argv[count+1])
            fso.num_starttest(Num)
            return
    fso.starttest()
    
if __name__ == '__main__':
    handle_paras()

原创文章,作者:Wn1m,如若转载,请注明出处:http://www.178linux.com/10758