首先要准备一个类方便每次初始化的时候能够得到唯一的 VDevice, 同时也方便最后全部用完以后释放这个VDevice
class HailoTarget:
__instance: Optional[VDevice] = None
@staticmethod
def get_instance() -> VDevice:
if HailoTarget.__instance is None:
devices = Device.scan()
params = VDevice.create_params()
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
HailoTarget.__instance = VDevice(params)
return HailoTarget.__instance
@staticmethod
def release():
if HailoTarget.__instance is not None:
HailoTarget.__instance.release()
HailoTarget.__instance = None
创建一个 HailoExecutor, 里面只有 初始化, 运行,以及释放资源三个基本的函数,以输入的hef路径为输入参数
class HailoExecutor():
def __init__(self, model, batch_size):
file_name = os.path.basename(model)
self.model_name = os.path.splitext(file_name)[0]
self.hef, self.network_group, self.input_vstreams_params, self.output_vstreams_params = initNN(model, 0, batch_size);
self.infer_pipeline = InferVStreams(self.network_group, self.input_vstreams_params, self.output_vstreams_params)
self.infer_pipeline.__enter__()
def run(self, frame, postprocess):
start = int(round(time.time() * 1000))
raw_result = self.infer_pipeline.infer(frame)
end = int(round(time.time() * 1000))
print("NPU Inference Elapsed time for ", self.model_name, ": ", end-start, "milliseconds")
postprocess(raw_result);
def release(self):
print("release ", self.model_name)
self.infer_pipeline.__exit__(self.network_group, self.input_vstreams_params, self.output_vstreams_params)
initNN的细节为:
def initNN(nnModel="./models/carDet.hef", num=0, batch_size=1):
hef = HEF(nnModel)
inputs = hef.get_input_vstream_infos()
outputs = hef.get_output_vstream_infos()
configure_params = ConfigureParams.create_from_hef(hef, interface=HailoStreamInterface.PCIe)
network_name = hef.get_network_group_names()[0]
configure_params[network_name].batch_size = batch_size
target = HailoTarget.get_instance()
network_group = target.configure(hef, configure_params)[0]
network_group_params = network_group.create_params()
[log.info('Input layer: {} {}'.format(layer_info.name, layer_info.shape)) for layer_info in inputs]
[log.info('Output layer: {} {}'.format(layer_info.name, layer_info.shape)) for layer_info in outputs]
print(nnModel, "\t\tdone")
input_vstreams_params = InputVStreamParams.make_from_network_group(network_group, quantized=True, format_type=FormatType.UINT8)
output_vstreams_params = OutputVStreamParams.make_from_network_group(network_group, quantized=False, format_type=FormatType.FLOAT32)
#return network_group, input_vstreams_params, output_vstreams_params
return hef, network_group, input_vstreams_params, output_vstreams_params;
调用方式也及其的简单:
- 初始化executor:
poolCarDet = HailoExecutor(model=carDetModelPath, batch_size=1) - 在视频流中循环调用运行函数:
frame = poolCarDet.run(norm_image, my_callback), 如果后处理需要增加一些参数的话,可能还需要详细设计下这个run函数的架构 - 全部运行完成后释放模型资源以及设备资源
poolCarDet.release()HailoTarget.release()
注:如果要上面代码运行OK,需要开启hailort的service功能(安装hailort的时候,选择开启 hailort service):

目前还不确认如果不开启是否有其他封装方式。如果有,后期后增加专门的章节加入。
回复 alex 取消回复