最近在做一个项目的demo,需要使用到NPU的检测模型检测出来结果,然后把结果再利用NPU做特征提取,利用deepsort去做追踪任务,最后显示最终结果到画面。
因为客户要求的FPS需要达到实时显示的条件,而检测以及特征提取需要的算力比较高,从latency的角度来说,串行运行这些是不可能达到实时的效果的。
那么在Python的框架下,我们可以利用多进程以及对应的共享内存和queue的方式来进行加速,从而实现了实时监测与跟踪的效果。
具体实现过程
- images_queue、terminate_event、det_ready_event 和 draw_ready_event 用于进程间通信和同步。
- 利用det_queue传递检测结果以及tracking_queue传递最终追踪结果。
- 考虑到不管是检测还是追踪里面的特征提取都需要使用的图像,我们使用frames_shared作为图像的共享内存数组,用于存储帧数据,以便多个进程进行处理。
- 一共创建并启动了三个进程:
- p_npu 用于检测处理。
- p_tracking 用于跟踪(包含特征提取)。
- p_frames 用于帧处理。
- 主循环从视频文件中读取帧,将其复制到共享内存数组中,并在视频结束时设置terminate_event。
- 最后,代码释放资源,加入进程,并关闭任何打开的窗口。
代码:
from multiprocessing import Process, Queue, Lock, Event
import cv2
import queue
import multiprocessing
video_path = “VID.mp4”
#used for display.
images_queue = Queue()
terminate_event = Event()
det_ready_event = Event()
draw_ready_event = Event()
det_queue = multiprocessing.Queue()
tracking_queue = multiprocessing.Queue()
cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frames_shared = multiprocessing.Array(‘B’, width * height * 3)
p_npu = multiprocessing.Process(target=npu_process, args=(frames_shared, width, height, det_queue, terminate_event))
p_tracking = multiprocessing.Process(target=tracking_process, args=(frames_shared, width, height,
tracking_queue, terminate_event, draw_ready_event))
p_frames = multiprocessing.Process(target=process_frames, args=(frames_shared, width, height,
tracking_queue, terminate_event, draw_ready_event))
p_npu.start()
p_tracking.start()
p_frames.start()
while cap.isOpened():
start_time = time.time()
ret, frame = cap.read()
if not ret:
break
frame_shared = np.frombuffer(frames_shared.get_obj(), dtype=np.uint8).reshape(-1, height, width, 3)[0]
np.copyto(frame_shared, frame)
terminate_event.set()
cap.release()
p_npu.join()
p_tracking.join()
p_frames.join()
cv2.destroyAllWindows()
备注: 此方案实现了在x86上面的实时性,但是当转移到RK3588上面的时候,整个过程多进程混乱,整体效果很差,目前不确定优化方向。
发表回复