When using OpenCV, the open-source computer vision library, to process video streams (RTSP, RTMP, …), we found that instantiating cv2.VideoCapture with a non-existent RTSP stream address runs into a timeout: the instantiating thread blocks and cannot be forced to exit. This is unacceptable in some scenarios, so we need a way to limit how long cv2.VideoCapture(rtsp) is allowed to take. After much exploration, I found that a custom thread class combined with a decorator solves this problem. The code is as follows:
    # opencv_demo.py
    # OpenCV-Python timeout for opening a non-existent RTSP video stream
    import cv2
    import base64
    import time
    import threading

    TIME_LIMITED: int = 1  # seconds

    # Decorator to limit the actual request time or function execution time
    def limit_decor(limit_time):
        """
        :param limit_time: maximum allowed execution time, in seconds
        :return: the decorated function's return value if it finishes in time;
                 (False, None) on timeout
        """
        def functions(func):
            def run(*params):
                thre_func = MyThread(target=func, args=params)
                # Daemon thread: it terminates when the main thread terminates
                thre_func.daemon = True
                thre_func.start()
                # Split the limit into whole seconds plus a fractional remainder
                sleep_num = int(limit_time // 1)
                sleep_nums = round(limit_time % 1, 1)
                # Sleep briefly several times and try to get the return value
                for i in range(sleep_num):
                    time.sleep(1)
                    infor = thre_func.get_result()
                    if infor:
                        return infor
                time.sleep(sleep_nums)
                # Final return value (whether or not the thread has terminated)
                if thre_func.get_result():
                    return thre_func.get_result()
                else:
                    return (False, None)  # Timeout returns can be customized
            return run
        return functions

    def main():
        RTSP = 'rtsp://192.168.3.88/av0_0'  # Non-existent RTSP video stream address
        result = frame_get(RTSP)
        if not result:
            print("Failed to open RTSP stream")
        else:
            print(result[:10])
        return None


    if __name__ == "__main__":
        main()

Test results
    python opencv_demo.py
    False None
    Failed to open RTSP stream
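
The decorator relies on a thread class that exposes its target's return value through get_result(). A minimal, self-contained sketch of that idea follows. The MyThread body here is an assumption inferred from how the listing calls it, the polling loop is swapped for Thread.join(timeout), and slow_open is a hypothetical stand-in for the blocking cv2.VideoCapture(rtsp) call, so the example runs without OpenCV or a camera.

```python
import threading
import time


class MyThread(threading.Thread):
    """Thread that stores its target's return value (assumed implementation)."""

    def __init__(self, target, args=()):
        # Daemon thread: it does not keep the process alive after main exits
        super().__init__(daemon=True)
        self._func = target
        self._func_args = args
        self._result = None

    def run(self):
        self._result = self._func(*self._func_args)

    def get_result(self):
        # Stays None until the target function has returned
        return self._result


def limit_decor(limit_time):
    """Return the wrapped function's value, or (False, None) on timeout."""
    def functions(func):
        def run(*params):
            worker = MyThread(target=func, args=params)
            worker.start()
            worker.join(limit_time)  # wait at most limit_time seconds
            if worker.is_alive():
                # Timed out: stop waiting; the worker keeps running in the background
                return (False, None)
            return worker.get_result()
        return run
    return functions


@limit_decor(0.2)
def slow_open(delay):
    # Hypothetical stand-in for a blocking cv2.VideoCapture(rtsp) call
    time.sleep(delay)
    return "opened"


if __name__ == "__main__":
    print(slow_open(0.01))  # finishes within the limit
    print(slow_open(1.0))   # exceeds the limit
```

Note that neither this sketch nor the polling version stops the blocked call itself. The caller simply stops waiting, and the daemon thread is abandoned until the call returns or the process exits.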