下面是一个测试solib库中调用函数的测试脚本,但该脚本还存在这一些问题,我目前无法理解和解决;
问题:
1.我定义了logging采用日志滚动的方式,写日志,并且每个日志的大小是20M,但测试结果发现日志连1M都没到就开始轮转了,并且在轮转过程中,还出现logging写日志,却发现,日志轮转了,结果竟然报了,轮转日志不存在。
Traceback (most recent call last):
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
self.doRollover()
Traceback (most recent call last):
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
self.doRollover()
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 135, in doRollover
os.remove(dfn)
OSError: [Errno 2] No such file or directory: '/tmp/mysofuns_test.log.5'
2.在进行多线程测试时,发现写到文件中的日志和直接打到屏幕上的日志竟然不一样。但经过分析,发现写到文件中的日志,出现了重复,而打到屏幕上的日志却是正常的。不能理解,暂时我也不知道如何避免。
#!/usr/bin/python27
#coding:utf-8
import threading
from os.path import exists,basename
from os import mkdir,getcwd,sep
import sys
import time
import logging
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-2s) %(message)s',)
class fileso():
conf_dir = getcwd() + '/conf'
conf_filename = "config.py"
def init(self,conf_dir=None,conf_filename=None):
if conf_dir:
self.conf_dir = conf_dir
conf_dir = self.conf_dir
sys.path.append(conf_dir)
if conf_filename:
self.conf_filename = conf_filename
conf_filename = conf_dir + sep + self.conf_filename
if exists(conf_dir):
if exists(conf_filename):
return True
else:
print "没有在",conf_dir,"中找到测试主配置文件:",conf_filename
else:
mkdir(conf_dir)
with open(conf_filename,'w+') as conf_file:
print "开始初始化主配置文件...."
conf_contant = """
#coding:utf-8
# 设置输出日志的格式,默认只输出消息.
# 如: -f fn 或 -f f lineno,dt,tn,pid
# 可使用的格式符号:
# ln 显示日志级别名称.
# fp 显示此程序的绝对路径.
# fn 显示当前执行程序名.
# funcn 显示打印日志的函数名.
# lineno 显示日志的行号.
# dt 显示日期时间.
# tn 显示进程名.
# pid 显示进程pid.
logformat = ""
# 设置输出日志的文件名:
logfilename = "/tmp/mytest.log"
# so库函数的位置:
so_file = "/tmp/libPSBCSM3JDLL.so"
# C语言类型与测试工具需要的类型的对应表
#
#--------------------------------------------------
# C type =============== TEST type
#--------------------------------------------------
#
# char ---------------- strchar (1-character string)
# char* ---------------- strchar*
#
# char ----------------- char (byte char)
# unsigned char -------- unsigned char (byte char)
# unsigned char* ------- unsigned char* (byte char)
# wchar_t -------------- c_wchar (1-character unicode string)
# wchar_t* ------------- c_wchar_p (unicode or None)
#
# short ---------------- short
# unsigned short ------- unsigned short
# int ------------------ int
# int8 ----------------- int8 (8bit int)
# int16 ---------------- int16 (16bit int)
# int32 ---------------- int32 (32bit int)
# int64 or __int64 ----- int64 (64bit int)
# unsigned int8 -------- unsigned int8 (8bit unsigned int)
# unsigned int16 ------- unsigned int16 (16bit unsigned int)
# unsigned int32 ------- unsigned int32 (32bit unsigned int)
# unsigned int64 ------- unsigned int64 (64bit unsigned int)
# long ----------------- long
# unsigned long -------- unsigned long
# long long ------------ long long
#
# float ---------------- float
# double --------------- double
#
# bool ----------------- bool
# void* ---------------- void*
#
#--------------------------------------------------
#
# so库函数所有的参数类型
so_args = (
#['FunctionsName','Func_return_type','Func_type1','Func_type2',...]
["TestFuncsName1",
"int",
"unsigned char*",
"unsigned char*",
"unsigned char*",
"unsigned char*",
"int",
"int",
"int",
"__int64",
"int"], # <--- Note:","
["TestFuncsName2",
"int",
"unsigned char*",
"unsigned char*",
"unsigned char*",
"unsigned char*",
"int",
"int",
"int",
"__int64",
"int"], # <--- Note:","
)
# 给so库函数提供的所有测试数据.
# 下面是为了更清晰而分行写了,它们是可以写成一行的.
# 写成一行的格式:
# so_value = [ (["函数名",'第一个参数','第二个参数','第三个参数',....]), ]
# 请注意书写格式: 特别是逗号; 如------------------------------------- ^ ,这个。
so_value = [
#(["FunctionName",'Func_parameter1','Func_parameter2','Func_parameter3',....]),
("TestFuncsName1",
['0x1234567812345678123456781234567812345678123456781234567812345678',
'12345678901',
'123456',
'123456',
0, #<---- 注:若整数类型的,可直接写.
2,
6,
20160101000000,
0
], # <--- 注意:","
['0x1234567812345678123456781234567812345678123456781234567812345678',
'12345678901',
'123456',
'123456',
0,
2,
6,
20160101000000,
0
], # <--- Note:","
), # <--- Note:","
("TestFuncsName2",
['0x1234567812345678123456781234567812345678123456781234567812345678',
'12345678901',
'123456',
'123456',
0,
2,
6,
20160101000000,
0
], # <--- Note:","
['0x1234567812345678123456781234567812345678123456781234567812345678',
'12345678901',
'123456',
'123456',
0,
2,
6,
20160101000000,
0
],
), # <--- Note:","
]
"""
conf_file.write(conf_contant)
conf_file.flush()
conf_file.close()
print "初始化完成,现在可以修改主配置文件:",conf_filename, \
'了\n修改完成后,直接执行',sys.argv[0],'即可.\n查看帮助可输入:',sys.argv[0],' -h' \
'若需要重新初始化,则直接删除:',conf_filename
sys.exit(0)
osFuncs = {}
conf_fobj = None
logformat = None
logfilename = None
multiThread = False
# 注: 我对C语言不是太懂,以前学了皮毛,都还老师了,
#故:这里的C类型对应,是按PythonDOC中的说明写的。其中char存在歧义,故做了一点修改,
#还有int64不是太确定以下定义是否正确。
dict_ctype = {"strchar":'c_char',"strchar*":'c_char_p',
"wchar_t":'c_wchar',"wchar_t*":'c_wchar_p',
"char":'c_byte',
"unsigned char":'c_ubyte',"unsigned char*":'c_char_p',
"short":'c_short',
"unsigned short":'c_ushort',
"int":'c_int',"int8":'c_uint8',"int16":'c_uint16',
"int32":'c_uint32',"int64":'c_uint64',
"unsigned int8":'c_uint8',"unsigned int16":'c_uint16',
"unsigned int32":'c_uint32',"unsigned int64":'c_uint64',
"long":'c_long',
"unsigned long":'c_ulong',
"__int64":'c_longlong',"long long":'c_longlong',
"unsigned __int64":'c_ulonglong',"unsigned long long":'c_ulonglong',
"float":'c_float',
"double":'c_double',
"bool":'c_bool',
"void*":'c_void_p',
}
def __init__(self,conf_dir=None,conf_filename=None):
self.init(conf_dir,conf_filename)
self.conf_fobj = __import__(self.conf_filename.rstrip('.py'))
self.logfilename = self.conf_fobj.logfilename
self.logformat = self.conf_fobj.logformat.split(',')
def setMultiThread(self,mBool):
self.multiThread = mBool
def convertType_c2t(self):
"""The function is c type convert test type. """
fargs = self.conf_fobj.so_args
funcs = {}
paras = ""
retargs = {}
for l in fargs:
# Second paras: return type of the function
retargs[l[0]]=(self.dict_ctype[l[1]])
for i in range(len(l[2:])):
paras = paras + "," + self.dict_ctype[l[i+2]]
# First paras: function name of os file in
funcs[l[0]] = paras.lstrip(',')
paras = ""
return funcs,retargs
def handleValue_t2c(self,values,paras):
"""The function is test value convert c value."""
count = 0
vStr = ""
intlong = ("c_byte","c_ubyte","c_short","c_ushort","c_int","c_long", \
"c_ulong","c_longlong","c_ulonglong","c_uint8","c_uint16", \
"c_uint32","c_uint64",)
for p in paras.split(','):
if p == "c_bool":
# c type:_Bool = py type:bool(1)
vStr = vStr + ',' + str(values[count])
elif p == "c_char":
# c type:char = py type: 1-character string
vStr = vStr + ',"' + values[count] + '"'
elif p == "c_wchar":
# c type:wchar_t = py type: 1-character unicode string
vStr = vStr + ',"' + values[count] + '"'
elif p in intlong:
# c type:intlong = py type: int/long
vStr = vStr + ',' + str(values[count])
elif p in ('c_float','c_double','c_longdouble'):
# c type:() = py type: float
vStr = vStr + ',' + str(values[count])
elif p == 'c_char_p':
# c type:char* = py type:string or None
vStr = vStr + ',"' + values[count] + '"'
elif p == 'c_wchar_p':
# c type:wchar_t* = py type:unicode or None
vStr = vStr + ',"' + values[count] + '"'
elif p == 'c_void_p':
# c type:void* = py type:int/long or None
vStr = vStr + ',"' + str(values[count]) + '"'
count += 1
return vStr.lstrip(',')
# 注:这里不是注释,预览看这里变成注释了;
def content_join(self,mainContent):
imp_section_content = """
#coding:utf-8
from ctypes import *
import logging
from logging.handlers import RotatingFileHandler
"""
format = ""
for f in self.logformat:
if f == 'ln':
format = format + ' %(levelname)s'
elif f == 'fp':
format = format + ' %(pathname)s'
elif f == 'fn':
format = format + ' %(filename)s'
elif f == 'funcn':
format = format + ' %(funcName)s'
elif f == 'lineno':
# output: log row number.
format = format + ' %(lineno)d'
elif f == 'dt':
# output: data time.
format = format + ' %(asctime)s'
elif f == 'tn':
format = format + ' %(threadName)s'
elif f == 'pid':
format = format + ' %(process)d'
format = format + ' %(message)s'
log_section_content = 'format="' + format + '"\n' + 'logfilename="' + self.logfilename + '"' + """
log = logging.getLogger()
Rthandler = RotatingFileHandler(logfilename,maxBytes=20*1024*1024,backupCount=5)
Rthandler.setFormatter(logging.Formatter(format))
log.addHandler(Rthandler)
log.setLevel(logging.INFO)
"""
main_section_content = "fso='" + self.conf_fobj.so_file + "'" + '''
cdll.LoadLibrary(fso)
openFso = CDLL(fso)
''' + mainContent
content = imp_section_content + '\n' + log_section_content + '\n' + main_section_content
return content
def constructor(self):
'''功能: 合成调用so库函数的语句;
语句有三种:
1.指定so库函数的参数都是什么类型;
2.指定so库函数的返回值是什么类型;
3.指定so库函数需要的所有参数值.
'''
# convertType_c2t return 2 value:
# funcs={"funcName":"funcParaTypes"}
# retargs={funcName:returnParaType}
funcs,retargs = self.convertType_c2t()
mainContentAll = ""
if self.multiThread:
compile_main = {}
compile_common = compile(self.content_join(""),'','exec')
self.osFuncs[compile_common] = compile_main
# paraValues=('funcName',['paras1','paras2',...],[...],...)
# so_value=[paraValues,paraValues,...]
for paraValues in self.conf_fobj.so_value:
for k_funcName,v_funcStrParaTypes in funcs.items():
if paraValues[0] == k_funcName:
count = 0
for groupval in paraValues[1:]:
funcStrParaValues = self.handleValue_t2c(groupval,v_funcStrParaTypes)
#指定so库函数的参数都是什么类型;
funcName_type_join = 'openFso.' + k_funcName + '.argtpes=[' + v_funcStrParaTypes + ']'
#指定so库函数的返回值是什么类型;
funcName_retType_join = 'openFso.' + k_funcName + '.restype=' + retargs[k_funcName]
#指定so库函数需要的所有参数值.
call_func_join = 'openFso.' + k_funcName + '(' + funcStrParaValues + ')'
output_msg_c1 = "log.info('测试:" + str(count) + "')"
output_msg_c2 = "log.info('测试内容:" + call_func_join + "')"
exec_func_join = "exec_func = " + call_func_join
output_msg_c3 = "log.info('测试结果:'+str(exec_func))"
mainContent = funcName_type_join + '\n' + \
funcName_retType_join + '\n' + \
output_msg_c1 + '\n' + \
exec_func_join + '\n' + \
output_msg_c2 + '\n' + \
output_msg_c3 + '\n'
# 生成一个文件中包含so库的所有函数
mainContentAll = mainContentAll + '\n' + mainContent
if self.multiThread:
# 每个元素是一个文件,一个文件中只测试一个so库函数
if count == 0:
compile_custom_test = []
#print 'k_funcName=',k_funcName,'\nmainContent=',mainContent
compile_custom_test.append(compile(mainContent,'','exec'))
compile_main[k_funcName]=compile_custom_test
count += 1
return self.content_join(mainContentAll)
def starttest(self):
''' 单线程测试. '''
exec self.constructor()
print "提示:请查看输出日志文件:",self.logfilename
def mstarttest(self):
''' 根据测试函数的个数启动相应数量的线程. '''
self.setMultiThread(True)
self.constructor()
self.run()
def num_starttest(self,num):
"""启动指定倍数的线程"""
self.setMultiThread(True)
self.constructor()
for i in range(num):
self.run('--'+str(i)+"轮")
def run(self,mark=""):
if self.osFuncs:
# self.osFuncs=[compile_common_code1,{"funcName":[compile_custom_test_para1,...]},{..},..]
k_common_code, = self.osFuncs
all_paras_code = self.osFuncs[k_common_code]
# condition: 它必须是全局条件锁,否则它将失去它的作用.
condition = threading.Condition()
worker = []
i = 0
for k_funcName,v_execcode in all_paras_code.items():
funcCode = (k_common_code,v_execcode,)
w = threading.Thread(name='Worker'+str(i)+mark,target=self.worker,args=(condition,funcCode,))
worker.append(w)
i += 1
b = threading.Thread(name='Boss',target=self.boss,args=(condition,))
for t in worker:
t.start()
b.start()
def boss(self,condition):
logging.debug('多线程测试开始...')
# 这里用with语句写的,但其实,我对它并不是很了解,只是照书上写的。如果有了解的,也可以多交流,呵呵。。
with condition:
#这里调用notifyAll同时通知所有已经处于wait状态的子线程,使它们同时启动起来。实现一个简单的并行压测。
condition.notifyAll()
def worker(self,condition,codefile):
#with condition就类似于if condition.acquire():,来判断是否获取了锁;
#with语句的优势: 似乎是可以通过__exit__()来释放锁;这个我不确定,可能理解错误。
#这篇文章,介绍with语句的用法:http://blog.csdn.net/suwei19870312/article/details/23258495
with condition:
#线程执行wait()方法时,会处于等待状态,它只有接受到notify通知后,才能获得锁。
condition.wait()
exec codefile[0]
for codeObj in codefile[1]:
#print 'codeObj=',codeObj
exec codeObj
help_msg = '使用格式:' + sys.argv[0] + ' [-h | -f | -mf | -mn]\n' + \
'参数:\n' + \
'\t-h 显示帮助信息.\n\t-mf 启动线程数=测试函数的个数\n' + \
'\t-mn 启动线程数=测试函数的个数 x 指定的数.如: -mn 3 就是启动3倍的函数个数.\n'
def handle_paras():
fso = fileso()
count = 1
for args in sys.argv[1:]:
if args in ('/h','-h','--help'):
print help_msg
sys.exit(0)
elif args == '-mf':
fso.mstarttest()
return
elif args == '-mn':
Num = int(sys.argv[count+1])
fso.num_starttest(Num)
return
fso.starttest()
if __name__ == '__main__':
handle_paras()
原创文章,作者:Wn1m,如若转载,请注明出处:http://www.178linux.com/10758

