Как усреднить время между вызовами одной функции если она работает разное время?
Есть задача вызывать функцию draw_data так, что бы между вызовами проходило не более 50мс.Достигнуть мне этого удалось в этом участке кода:
remainder = round(total_execution_time / (count_call * 50), 2)
if remainder > 1:
root.after(0, draw_data, vols, end_time, count_call + 1)
elif remainder <= 1:
root.after(50, draw_data, vols, end_time, count_call + 1)
здесь count_call * 50 - это: count_call хранит суммарное количество вызовов которое я умножаю на 50мс что бы получить "эталонное время" как если бы между вызовами всегда было бы по 50мс.Потом я делю total_execution_time - суммарное реальное время между вызовами на эталонное, и далее если частное между ними больше 1 это значит что мои вызовы "опаздывают" и я вызываю функцию в следующий раз немедленно root.after(0), если частное меньше 1, значит я делаю вызовы раньше, тоесть ускоряюсь относительно плана и вызываю уже тогда через 50 мс.В конце я вывел график который показывает сколько мс занимает каждый интервал от вызова до вызова, и среднее время .По графику видно что время разбросано от 0 до 100.Поэтому вопрос можно ли как то усреднить ближе к 50мс, каждый интервал, что бы данные отрисовывались "плавнее"? То есть как подобрать более подходящую задержку между вызовами?Вообще оригинальная задача состоит вот в чем: данные идут через вебсокет с интервалами от 5мс до 100, то есть очень часто и быстро, в месте с ними передается время каждого элемента. Все их пишу в фаил. Далее уже потом "оффлайн" , группирую по 50мс в датафрейме как в этом примере, и стараюсь отрисовывать их так как если бы они шли онлайн. Возможно эту задачу решить как то по другому вообще? Воспроизводимый код:
import time
import random
import pandas as pd
import numpy as np
import tkinter as tk
from matplotlib import pyplot as plt
total_execution_time = 0
tup_work_time_func = ()
def generate_data():
# Создаем временной диапазон с шагом 10 мс длинной в одну минуту
time_range = pd.date_range(start='2023-10-01 00:00:00', periods=6000, freq='10ms')
random_volumes = np.random.randint(1, 101, size=len(time_range))
# Создаем DataFrame
df = pd.DataFrame({'time': time_range, 'vol': random_volumes})
# Группируем данные по 50 мс
agg_df_50ms = df.groupby(pd.Grouper(key='time', freq='50ms'))['vol'].sum()
print(f'Общая продолжительность времени в исходных данных {df['time'].max() - df['time'].min()}')
return agg_df_50ms.tolist()
# Функция для обрамления текста цветным заполненным прямоугольником
def fill_color_vols():
"""Функция для заполнения текста прямоугольным цветом"""
last_text_id = canvas.find_withtag('text')[-1]
value_text = int(canvas.itemcget(last_text_id, 'text'))
coord_text = canvas.bbox(last_text_id)
padding = 3 # Отступ от текста
# Определяем цвет в зависимости от значения
if value_text < 100:
color = 'yellow'
elif 200 <= value_text <= 300:
color = 'red'
else:
color = 'green'
# Рисуем прямоугольник
rect_id = canvas.create_rectangle(
coord_text[0] - padding, coord_text[1] - padding,
coord_text[2] + padding, coord_text[3] + padding,
fill=color, outline=color, tags='rect')
# Ставим прямоугольник позади текста который был нарисован последним
canvas.tag_lower(rect_id, last_text_id)
def draw_vols(vol):
"""Функция для рисования цифр на канвасе в случайной позиции"""
x = np.random.randint(50, 450) # Генерируем случайную позицию по оси X
y = np.random.randint(50, 450) # Генерируем случайную позицию по оси Y
canvas.create_text(x, y, text=vol, fill='black', tags='text')
count = 0
# Просто цикл для имитации разного времени выполнения этой функции
for i in range(random.choice([100000, 1000000, 10000, 10])):
count += i
def remove_vols_above_limit():
"""Функция для удаления первого элемента если их на канвасе больше 50"""
all_elem_texts = canvas.find_withtag('text')
all_elem_rect = canvas.find_withtag('rect')
if len(all_elem_texts) > 50:
# Удаляем первый элемент из canvas
canvas.delete(all_elem_texts[0])
canvas.delete(all_elem_rect[0])
def draw_data(vols, last_time=None, count_call=1):
global tup_work_time_func, total_execution_time
# Инициализация last_time при первом вызове
if last_time is None:
last_time = time.perf_counter()
vols_0 = vols[0]
draw_vols(vols_0)
fill_color_vols()
remove_vols_above_limit()
del vols[0]
if len(vols) >= 1:
# Фиксируем время окончания выполнения
end_time = time.perf_counter()
# Вычисляем время выполнения между вызовами в мс
execution_time = int((end_time - last_time) * 1000)
# Кладем в кортеж каждый промежуток времени между вызовами для статистики в конце
tup_work_time_func += (execution_time,)
# Добавляем время выполнения к общему времени
total_execution_time += execution_time
remainder = round(total_execution_time / (count_call * 50), 2)
if remainder > 1:
root.after(0, draw_data, vols, end_time, count_call + 1)
elif remainder <= 1:
root.after(50, draw_data, vols, end_time, count_call + 1)
else:
# Выводим общее время выполнения
total_seconds = round((total_execution_time / 1000), 3)
# Создаем список для оси X (номера вызовов)
x_values = range(1, len(tup_work_time_func) + 1)
# Включаем интерактивный режим
plt.ion()
# Вычисляем среднее значение
mean_value = sum(tup_work_time_func) / len(tup_work_time_func)
# Построение графика
plt.figure(figsize=(10, 6)) # Увеличиваем размер графика
plt.plot(x_values, tup_work_time_func, marker='o', linestyle='-', color='b', label='Время выполнения')
# Добавляем горизонтальную линию для среднего значения
plt.axhline(mean_value, color='r', linestyle='--', label=f'Среднее: {mean_value:.2f} мс')
# Подписи осей
plt.xlabel('Номер вызова') # Подпись оси X
plt.ylabel('мс') # Подпись оси Y
# Заголовок графика
plt.title('График времени выполнения')
# Легенда
plt.legend()
# Отображение графика
plt.show()
if total_seconds >= 60:
minutes = int(total_seconds // 60) # Целое количество минут
seconds = round(total_seconds % 60, 3) # Оставшиеся секунды с миллисекундами
print(f"Общее время выполнения на канвасе: {minutes} минут {seconds} секунд")
else: # если прошло меньше минуты
print(f"Общее время выполнения на канвасе: {round(total_seconds, 3)} секунд")
with open('remainder.txt', 'w'):
pass
root = tk.Tk()
root.geometry('500x500')
canvas = tk.Canvas()
canvas.pack(expand=1, fill=tk.BOTH)
data = generate_data()
draw_data(data)
root.mainloop()
Ответы (1 шт):
import time
import random
import pandas as pd
import numpy as np
import tkinter as tk
from matplotlib import pyplot as plt
from collections import deque
from typing import List, Tuple, Optional, Deque
class DataVisualizer:
def __init__(self, target_interval_ms: int = 50, display_limit: int = 50):
self.target_interval_ms = target_interval_ms
self.display_limit = display_limit
self.root = tk.Tk()
self.root.geometry('500x500')
self.canvas = tk.Canvas()
self.canvas.pack(expand=1, fill=tk.BOTH)
self.execution_times: List[int] = []
self.last_draw_time: Optional[float] = None
self.total_time_ms = 0
self.call_count = 0
self.recent_delays: Deque[int] = deque(maxlen=10)
def generate_data(self) -> List[int]:
time_range = pd.date_range(start='2023-10-01 00:00:00', periods=6000, freq='10ms')
random_volumes = np.random.randint(1, 101, size=len(time_range))
df = pd.DataFrame({'time': time_range, 'vol': random_volumes})
agg_df_50ms = df.groupby(pd.Grouper(key='time', freq='50ms'))['vol'].sum()
print(f'Total time span in source data: {df["time"].max() - df["time"].min()}')
return agg_df_50ms.tolist()
def draw_volume(self, volume: int) -> None:
x = np.random.randint(50, 450)
y = np.random.randint(50, 450)
self.canvas.create_text(x, y, text=volume, fill='black', tags='text')
dummy_count = 0
for i in range(random.choice([100000, 1000000, 10000, 10])):
dummy_count += i
def add_color_background(self) -> None:
last_text_id = self.canvas.find_withtag('text')[-1]
value_text = int(self.canvas.itemcget(last_text_id, 'text'))
coord_text = self.canvas.bbox(last_text_id)
padding = 3
if value_text < 100:
color = 'yellow'
elif 200 <= value_text <= 300:
color = 'red'
else:
color = 'green'
rect_id = self.canvas.create_rectangle(
coord_text[0] - padding, coord_text[1] - padding,
coord_text[2] + padding, coord_text[3] + padding,
fill=color, outline=color, tags='rect'
)
self.canvas.tag_lower(rect_id, last_text_id)
def manage_display_elements(self) -> None:
all_texts = self.canvas.find_withtag('text')
all_rects = self.canvas.find_withtag('rect')
if len(all_texts) > self.display_limit:
self.canvas.delete(all_texts[0])
self.canvas.delete(all_rects[0])
def calculate_next_delay(self) -> int:
self.call_count += 1
actual_interval = 0 if not self.recent_delays else sum(self.recent_delays) / len(self.recent_delays)
error = self.target_interval_ms - actual_interval
correction = int(error * 0.7)
next_delay = max(0, min(self.target_interval_ms * 2, self.target_interval_ms + correction))
return next_delay
def draw_data(self, data_points: List[int], current_time: Optional[float] = None) -> None:
if current_time is None:
current_time = time.perf_counter()
self.last_draw_time = current_time
if self.last_draw_time:
elapsed_ms = int((current_time - self.last_draw_time) * 1000)
self.recent_delays.append(elapsed_ms)
self.execution_times.append(elapsed_ms)
self.total_time_ms += elapsed_ms
if data_points:
current_value = data_points[0]
self.draw_volume(current_value)
self.add_color_background()
self.manage_display_elements()
if len(data_points) > 1:
next_data = data_points[1:]
next_time = time.perf_counter()
delay_ms = self.calculate_next_delay()
self.last_draw_time = next_time
self.root.after(delay_ms, self.draw_data, next_data, next_time)
else:
self.show_statistics()
def show_statistics(self) -> None:
total_seconds = self.total_time_ms / 1000
mean_value = sum(self.execution_times) / len(self.execution_times)
plt.figure(figsize=(10, 6))
plt.plot(range(1, len(self.execution_times) + 1), self.execution_times,
marker='o', linestyle='-', color='b', label='Execution Time')
plt.axhline(mean_value, color='r', linestyle='--',
label=f'Average: {mean_value:.2f} ms')
plt.axhline(self.target_interval_ms, color='g', linestyle='--',
label=f'Target: {self.target_interval_ms} ms')
plt.xlabel('Call Number')
plt.ylabel('Time (ms)')
plt.title('Draw Interval Timing')
plt.legend()
if total_seconds >= 60:
minutes = int(total_seconds // 60)
seconds = round(total_seconds % 60, 3)
print(f"Total execution time: {minutes} minutes {seconds} seconds")
else:
print(f"Total execution time: {round(total_seconds, 3)} seconds")
std_dev = np.std(self.execution_times)
print(f"Standard deviation: {std_dev:.2f} ms")
plt.show()
def run(self) -> None:
data = self.generate_data()
self.draw_data(data)
self.root.mainloop()
if __name__ == "__main__":
visualizer = DataVisualizer(target_interval_ms=50, display_limit=50)
visualizer.run()
Вместо простого двоичного подхода (0 мс или 50 мс) здесь используется PID-подобный регулятор, который постепенно регулирует время, основываясь на последних результатах работы. В результате получаются гораздо более стабильные интервалы. Сглаживание скользящего среднего: поддерживает скользящее окно последних времен выполнения для более точного прогнозирования предстоящих задержек.
Основное улучшение - в функции calculate_next_delay(), которая динамически подстраивает тайминг, основываясь на последних результатах работы, а не использует первоначальный подход, основанный на остатке. Это создает петлю обратной связи, которая постоянно подстраивает тайминг, чтобы он сходился к целевому значению 50 мс. и настрайвай тут под себя уже.
visualizer = DataVisualizer(target_interval_ms=5, display_limit=100)