En este avanzado Supervisión de roboflow Tutorial, construimos una tubería completa de detección de objetos con la biblioteca de supervisión. Comenzamos configurando el seguimiento de objetos en tiempo real utilizando Bytetracker, agregando suavizado de detección y definiendo zonas de polígono para monitorear regiones específicas en una transmisión de video. A medida que procesamos los marcos, los anotamos con cuadros delimitadores, ID de objeto y datos de velocidad, lo que nos permite rastrear y analizar el comportamiento de los objetos con el tiempo. Nuestro objetivo es mostrar cómo podemos combinar la detección, el seguimiento, el análisis basado en zonas y la anotación visual en un flujo de trabajo de análisis de video sin interrupciones e inteligentes. Mira el Códigos completos aquí.
!pip install supervision ultralytics opencv-python
!pip install --upgrade supervision
import cv2
import numpy as np
import supervision as sv
from ultralytics import YOLO
import matplotlib.pyplot as plt
from collections import defaultdict
model = YOLO('yolov8n.pt')
Comenzamos instalando los paquetes necesarios, incluidas la supervisión, el ultraaltical y la opencv. Después de garantizar que tengamos la última versión de supervisión, importamos todas las bibliotecas requeridas. Luego inicializamos el modelo YOLOV8N, que sirve como detector de núcleo en nuestra tubería. Mira el Códigos completos aquí.
try:
tracker = sv.ByteTrack()
except AttributeError:
try:
tracker = sv.ByteTracker()
except AttributeError:
print("Using basic tracking - install latest supervision for advanced tracking")
tracker = None
try:
smoother = sv.DetectionsSmoother(length=5)
except AttributeError:
smoother = None
print("DetectionsSmoother not available in this version")
try:
box_annotator = sv.BoundingBoxAnnotator(thickness=2)
label_annotator = sv.LabelAnnotator()
if hasattr(sv, 'TraceAnnotator'):
trace_annotator = sv.TraceAnnotator(thickness=2, trace_length=30)
else:
trace_annotator = None
except AttributeError:
try:
box_annotator = sv.BoxAnnotator(thickness=2)
label_annotator = sv.LabelAnnotator()
trace_annotator = None
except AttributeError:
print("Using basic annotators - some features may be limited")
box_annotator = None
label_annotator = None
trace_annotator = None
def create_zones(frame_shape):
h, w = frame_shape[:2]
try:
entry_zone = sv.PolygonZone(
polygon=np.array([[0, h//3], [w//3, h//3], [w//3, 2*h//3], [0, 2*h//3]]),
frame_resolution_wh=(w, h)
)
exit_zone = sv.PolygonZone(
polygon=np.array([[2*w//3, h//3], [w, h//3], [w, 2*h//3], [2*w//3, 2*h//3]]),
frame_resolution_wh=(w, h)
)
except TypeError:
entry_zone = sv.PolygonZone(
polygon=np.array([[0, h//3], [w//3, h//3], [w//3, 2*h//3], [0, 2*h//3]])
)
exit_zone = sv.PolygonZone(
polygon=np.array([[2*w//3, h//3], [w, h//3], [w, 2*h//3], [2*w//3, 2*h//3]])
)
return entry_zone, exit_zone
Configuramos componentes esenciales de la biblioteca de supervisión, incluido el seguimiento de objetos con bytetrack, suavizado opcional utilizando detecciones que seis y anotadores flexibles para cajas delimitadoras, etiquetas y rastros. Para garantizar la compatibilidad entre las versiones, utilizamos bloques Try-Except para recurrir a clases alternativas o funcionalidad básica cuando sea necesario. Además, definimos zonas de polígono dinámico dentro del marco para monitorear regiones específicas como áreas de entrada y salida, lo que permite el análisis espacial avanzado. Mira el Códigos completos aquí.
class AdvancedAnalytics:
def __init__(self):
self.track_history = defaultdict(list)
self.zone_crossings = {"entry": 0, "exit": 0}
self.speed_data = defaultdict(list)
def update_tracking(self, detections):
if hasattr(detections, 'tracker_id') and detections.tracker_id is not None:
for i in range(len(detections)):
track_id = detections.tracker_id[i]
if track_id is not None:
bbox = detections.xyxy[i]
center = np.array([(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2])
self.track_history[track_id].append(center)
if len(self.track_history[track_id]) >= 2:
prev_pos = self.track_history[track_id][-2]
curr_pos = self.track_history[track_id][-1]
speed = np.linalg.norm(curr_pos - prev_pos)
self.speed_data[track_id].append(speed)
def get_statistics(self):
total_tracks = len(self.track_history)
avg_speed = np.mean([np.mean(speeds) for speeds in self.speed_data.values() if speeds])
return {
"total_objects": total_tracks,
"zone_entries": self.zone_crossings["entry"],
"zone_exits": self.zone_crossings["exit"],
"avg_speed": avg_speed if not np.isnan(avg_speed) else 0
}
def process_video(source=0, max_frames=300):
"""
Process video source with advanced supervision features
source: video path or 0 for webcam
max_frames: limit processing for demo
"""
cap = cv2.VideoCapture(source)
analytics = AdvancedAnalytics()
ret, frame = cap.read()
if not ret:
print("Failed to read video source")
return
entry_zone, exit_zone = create_zones(frame.shape)
try:
entry_zone_annotator = sv.PolygonZoneAnnotator(
zone=entry_zone,
color=sv.Color.GREEN,
thickness=2
)
exit_zone_annotator = sv.PolygonZoneAnnotator(
zone=exit_zone,
color=sv.Color.RED,
thickness=2
)
except (AttributeError, TypeError):
entry_zone_annotator = sv.PolygonZoneAnnotator(zone=entry_zone)
exit_zone_annotator = sv.PolygonZoneAnnotator(zone=exit_zone)
frame_count = 0
results_frames = []
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
while ret and frame_count < max_frames:
ret, frame = cap.read()
if not ret:
break
results = model(frame, verbose=False)[0]
detections = sv.Detections.from_ultralytics(results)
detections = detections[detections.class_id == 0]
if tracker is not None:
detections = tracker.update_with_detections(detections)
if smoother is not None:
detections = smoother.update_with_detections(detections)
analytics.update_tracking(detections)
entry_zone.trigger(detections)
exit_zone.trigger(detections)
labels = []
for i in range(len(detections)):
confidence = detections.confidence[i] if detections.confidence is not None else 0.0
if hasattr(detections, 'tracker_id') and detections.tracker_id is not None:
track_id = detections.tracker_id[i]
if track_id is not None:
speed = analytics.speed_data[track_id][-1] if analytics.speed_data[track_id] else 0
label = f"ID:{track_id} | Conf:{confidence:.2f} | Speed:{speed:.1f}"
else:
label = f"Conf:{confidence:.2f}"
else:
label = f"Conf:{confidence:.2f}"
labels.append(label)
annotated_frame = frame.copy()
annotated_frame = entry_zone_annotator.annotate(annotated_frame)
annotated_frame = exit_zone_annotator.annotate(annotated_frame)
if trace_annotator is not None:
annotated_frame = trace_annotator.annotate(annotated_frame, detections)
if box_annotator is not None:
annotated_frame = box_annotator.annotate(annotated_frame, detections)
else:
for i in range(len(detections)):
bbox = detections.xyxy[i].astype(int)
cv2.rectangle(annotated_frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
if label_annotator is not None:
annotated_frame = label_annotator.annotate(annotated_frame, detections, labels)
else:
for i, label in enumerate(labels):
if i < len(detections):
bbox = detections.xyxy[i].astype(int)
cv2.putText(annotated_frame, label, (bbox[0], bbox[1]-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
stats = analytics.get_statistics()
y_offset = 30
for key, value in stats.items():
text = f"{key.replace('_', ' ').title()}: {value:.1f}"
cv2.putText(annotated_frame, text, (10, y_offset),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
y_offset += 30
if frame_count % 30 == 0:
results_frames.append(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))
frame_count += 1
if frame_count % 50 == 0:
print(f"Processed {frame_count} frames...")
cap.release()
if results_frames:
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()
for i, (ax, frame) in enumerate(zip(axes, results_frames[:4])):
ax.imshow(frame)
ax.set_title(f"Frame {i*30}")
ax.axis('off')
plt.tight_layout()
plt.show()
final_stats = analytics.get_statistics()
print("\n=== FINAL ANALYTICS ===")
for key, value in final_stats.items():
print(f"{key.replace('_', ' ').title()}: {value:.2f}")
return analytics
print("Starting advanced supervision demo...")
print("Features: Object detection, tracking, zones, speed analysis, smoothing")
Definimos la clase AdvancedAnalytics para rastrear el movimiento de los objetos, calcular la velocidad y los cruces de zona de conteo, lo que permite una rica información de video en tiempo real. Dentro de la función Process_video, leemos cada cuadro de la fuente de video y la ejecutamos a través de nuestra tubería de detección, seguimiento y suavizado. Anotamos marcos con cajas limitantes, etiquetas, superposiciones de zona y estadísticas en vivo, dándonos un sistema poderoso y flexible para el monitoreo de objetos y el análisis espacial. A lo largo del bucle, también recopilamos datos para la visualización e imprimimos las estadísticas finales, que muestran la efectividad de las capacidades de extremo a extremo de RoboFlow Supervision. Mira el Códigos completos aquí.
def create_demo_video():
"""Create a simple demo video with moving objects"""
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('demo.mp4', fourcc, 20.0, (640, 480))
for i in range(100):
frame = np.zeros((480, 640, 3), dtype=np.uint8)
x1 = int(50 + i * 2)
y1 = 200
x2 = int(100 + i * 1.5)
y2 = 250
cv2.rectangle(frame, (x1, y1), (x1+50, y1+50), (0, 255, 0), -1)
cv2.rectangle(frame, (x2, y2), (x2+50, y2+50), (255, 0, 0), -1)
out.write(frame)
out.release()
return 'demo.mp4'
demo_video = create_demo_video()
analytics = process_video(demo_video, max_frames=100)
print("\nTutorial completed! Key features demonstrated:")
print("✓ YOLO integration with Supervision")
print("✓ Multi-object tracking with ByteTracker")
print("✓ Detection smoothing")
print("✓ Polygon zones for area monitoring")
print("✓ Advanced annotations (boxes, labels, traces)")
print("✓ Real-time analytics and statistics")
print("✓ Speed calculation and tracking history")
Para probar nuestra tubería completa, generamos un video de demostración sintético con dos rectángulos en movimiento que simulan objetos rastreados. Esto nos permite validar la detección, el seguimiento, el monitoreo de la zona y el análisis de velocidad sin necesidad de una entrada del mundo real. Luego ejecutamos la función Process_video en el clip generado. Al final, imprimimos un resumen de todas las características clave que hemos implementado, mostrando el poder de la supervisión de RoboFlow para el análisis visual en tiempo real.
En conclusión, hemos implementado con éxito una tubería completa que reúne la detección de objetos, el seguimiento, el monitoreo de la zona y el análisis en tiempo real. Demostramos cómo visualizar ideas clave como la velocidad del objeto, los cruces de zona e historial de seguimiento con marcos de video anotados. Esta configuración nos permite ir más allá de la detección básica y construir un sistema de vigilancia o análisis inteligente utilizando herramientas de código abierto. Ya sea para el uso de la investigación o la producción, ahora tenemos una base poderosa para expandirnos con capacidades aún más avanzadas.
Mira el Códigos completos aquí. No dude en ver nuestro Página de Github para tutoriales, códigos y cuadernos. Además, siéntete libre de seguirnos Gorjeo Y no olvides unirte a nuestro Subreddit de 100k+ ml y suscribirse a Nuestro boletín.
Asif Razzaq es el CEO de MarktechPost Media Inc .. Como empresario e ingeniero visionario, ASIF se compromete a aprovechar el potencial de la inteligencia artificial para el bien social. Su esfuerzo más reciente es el lanzamiento de una plataforma de medios de inteligencia artificial, MarktechPost, que se destaca por su cobertura profunda de noticias de aprendizaje automático y de aprendizaje profundo que es técnicamente sólido y fácilmente comprensible por una audiencia amplia. La plataforma cuenta con más de 2 millones de vistas mensuales, ilustrando su popularidad entre el público.