Как коректно завершить поток при многопоточной работе в Pyton
find_template_for_single(template_path, window_cord, window_width, window_height, accuracy=0.99, scale_factors=[1.0], min_accuracy=0.80, max_wait_time=10, ynamic_accuracy=True, stabilization_thresholds=[0.7, 0.5, 0.3, 0.2], testxt=True, test=False)
Есть функция которая по шаблону занимается поиском изображения на экране и возвращает координаты найденого обьекта в заданом секторе поиска.
def find_multiple_templates(template_paths, window_cord, window_width, window_height, accuracy=0.99, scale_factors=[1.0], min_accuracy=0.80, max_wait_time=10, dynamic_accuracy=True,
stabilization_thresholds=[0.7, 0.5, 0.3, 0.2], testxt=True, test=False, single_wait_time=1, timeout_global=3):
all_results = [None] * len(template_paths) # Результаты поиска по каждому шаблону
found_templates = set() # Множество для отслеживания найденных шаблонов
def wrapper(template_path, idx):
"""Обёртка для вызова find_template_for_single с возвратом результата."""
result = find_template_for_single(
template_path,
window_cord,
window_width,
window_height,
accuracy,
scale_factors,
min_accuracy,
max_wait_time,
dynamic_accuracy,
stabilization_thresholds,
testxt,
test
)
return (result[0], result[1], idx + 1) if result[1] else None
with ThreadPoolExecutor() as executor:
futures = []
# Устанавливаем глобальный таймаут с помощью stopit
with stopit.ThreadingTimeout(timeout_global) as timeout_ctx:
try:
for idx, template_path in enumerate(template_paths):
futures.append(executor.submit(wrapper, template_path, idx))
# Получаем результаты, пока задачи выполняются
for future in futures:
try:
result = future.result(timeout=single_wait_time) # Таймаут для каждой задачи
if result is not None:
all_results[result[2] - 1] = result # Сохраняем результат
found_templates.add(result[2]) # Добавляем индекс найденного шаблона
except TimeoutError:
print("Задача превысила время ожидания.")
except Exception as e:
if testxt:
print(f"Ошибка в задаче: {e}")
except stopit.TimeoutException:
print(f"Глобальный таймаут ({timeout_global} секунд) истёк.")
except Exception as e:
print(f"Ошибка выполнения: {e}")
# Возвращаем все найденные результаты
return [result for result in all_results if result is not None]
Вторая функция запускает первую в несколько потоков для поиска нескольких изображений одновременно.
И вот главный вопрос, как прервать поток?
Пример, 4 шаблона. Если все шаблоны на экране, оно быстро найдет. А вот если одного изображения не будет будет на экране, искать будет до таймаута заданого. А я бы хотел преривать поиск, если уже несколько найдены.
Условно: 1 найден, значит запускается таймер в 1 секунду, если за это время нового не найдено - все потоки прерываются.
Уже пробовал внешнюю библиотеку stopit.
Ответы (1 шт):
Была похожая задача:
С помощью OpenCV нужно было производить поиск множества объектов на экране. Однако, как только находился хотя бы один валидный объект, процесс поиска необходимо было немедленно прервать, полученные координаты вернуть и запустить поиск заново.
Я решил использовать ProcessPoolExecutor для выполнения всех вычислений в отдельных процессах. Однако столкнулся с проблемой: задачи, отправленные в пул процессов, невозможно отменить.
В свою очередь это приводило к серьёзной проблеме: при многократной постановке задач на поиск, количество одновременно запущенных процессов увеличивалось, что вызывало заметное снижение производительности.
Метод future.cancel() отменял только те задачи, которые ещё не были запущены в работу. В свою очередь, задачи которые уже находились в процессе выполнения, продолжали выполняться в фоновом режиме. Это всё вызывало эффект снежного кома.
Немного изучив код ProcessPoolExecutor, я наткнулся на интересную строку:
# Map of pids to processes
self._processes = {}
И придумал небольшой хак:
for process in executor._processes.values():
process.terminate()
Очевидно, что доступ к защищённому атрибуту _processes не был предусмотрен разработчиками, однако в моём случае это сработало как надо.
Небольшой пример:
from concurrent.futures import ProcessPoolExecutor, as_completed, CancelledError
from concurrent.futures.process import BrokenProcessPool
import time
import os
import random
def task(data):
time.sleep(random.randint(2, 5))
print(f"Процесс {os.getpid()} завершил задачу {data}")
return data
search_value = 7
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in range(40)]
shutdown_flag = False # Флаг, указывающий, что результат найден
try:
for future in as_completed(futures, timeout=6): # Завершенные задачи
result = future.result()
print(f"Задача завершена с результатом: {result}")
if result == search_value:
print(f"Искомый результат {search_value} найден. Завершаем остальные процессы.")
shutdown_flag = True
break
except TimeoutError:
print("Не все задачи завершились за указанный таймаут.")
shutdown_flag = True
# Принудительное завершение процессов, если shutdown_flag = True
if shutdown_flag:
for future in futures:
if not future.done() and not future.cancelled(): # Если задача ещё не была завершена и не отменена
print(f"Отменяем задачу {futures.index(future)}")
future.cancel()
# Принудительно завершаем процессы, которые всё ещё активны, что бы не крутились в фоне после future.cancel()
for process in executor._processes.values():
print(f"Завершаем процесс {process.pid}")
process.terminate()
for i, future in enumerate(futures):
if future.done():
try:
result = future.result()
print(f"Задача {i} завершена с результатом: {result}")
except CancelledError:
print(f"Задача {i} была отменена.")
except BrokenProcessPool:
print(f"Задача {i} выполнялась, но была прервана.")
Вывод:
Процесс 20740 завершил задачу 0
Задача завершена с результатом: 0
Процесс 7636 завершил задачу 2
Задача завершена с результатом: 2
Процесс 22816 завершил задачу 4
Задача завершена с результатом: 4
Процесс 21348 завершил задачу 6
Задача завершена с результатом: 6
Процесс 19836 завершил задачу 1
Задача завершена с результатом: 1
Процесс 20884 завершил задачу 3
Задача завершена с результатом: 3
Процесс 8080 завершил задачу 5
Задача завершена с результатом: 5
Процесс 7636 завершил задачу 9
Задача завершена с результатом: 9
Процесс 21348 завершил задачу 11
Задача завершена с результатом: 11
Процесс 23196 завершил задачу 7
Задача завершена с результатом: 7
Искомый результат 7 найден. Завершаем остальные процессы.
Отменяем задачу 8
Отменяем задачу 10
Отменяем задачу 12
Отменяем задачу 13
Отменяем задачу 14
Отменяем задачу 15
Отменяем задачу 16
Отменяем задачу 17
Отменяем задачу 18
Отменяем задачу 19
Отменяем задачу 20
Отменяем задачу 21
Отменяем задачу 22
Отменяем задачу 23
Отменяем задачу 24
Отменяем задачу 25
Отменяем задачу 26
Отменяем задачу 27
Отменяем задачу 28
Отменяем задачу 29
Отменяем задачу 30
Отменяем задачу 31
Отменяем задачу 32
Отменяем задачу 33
Отменяем задачу 34
Отменяем задачу 35
Отменяем задачу 36
Отменяем задачу 37
Отменяем задачу 38
Отменяем задачу 39
Завершаем процесс 20740
Завершаем процесс 19836
Завершаем процесс 7636
Завершаем процесс 20884
Завершаем процесс 22816
Завершаем процесс 8080
Завершаем процесс 21348
Завершаем процесс 23196
Задача 0 завершена с результатом: 0
Задача 1 завершена с результатом: 1
Задача 2 завершена с результатом: 2
Задача 3 завершена с результатом: 3
Задача 4 завершена с результатом: 4
Задача 5 завершена с результатом: 5
Задача 6 завершена с результатом: 6
Задача 7 завершена с результатом: 7
Задача 8 выполнялась, но была прервана.
Задача 9 завершена с результатом: 9
Задача 10 выполнялась, но была прервана.
Задача 11 завершена с результатом: 11
Задача 12 выполнялась, но была прервана.
Задача 13 выполнялась, но была прервана.
Задача 14 выполнялась, но была прервана.
Задача 15 выполнялась, но была прервана.
Задача 16 выполнялась, но была прервана.
Задача 17 выполнялась, но была прервана.
Задача 18 выполнялась, но была прервана.
Задача 19 выполнялась, но была прервана.
Задача 20 выполнялась, но была прервана.
Задача 21 выполнялась, но была прервана.
Задача 22 выполнялась, но была прервана.
Задача 23 выполнялась, но была прервана.
Задача 24 выполнялась, но была прервана.
Задача 25 выполнялась, но была прервана.
Задача 26 выполнялась, но была прервана.
Задача 27 была отменена.
Задача 28 была отменена.
Задача 29 была отменена.
Задача 30 была отменена.
Задача 31 была отменена.
Задача 32 была отменена.
Задача 33 была отменена.
Задача 34 была отменена.
Задача 35 была отменена.
Задача 36 была отменена.
Задача 37 была отменена.
Задача 38 была отменена.
Задача 39 была отменена.