Real-Time Edge Vision: OpenCV + ESP32-CAM for Embedded Inference
Stream video from an ESP32-CAM, process frames with OpenCV on the edge, and run lightweight inference — all without a cloud dependency.
Why ESP32-CAM + OpenCV?
Cloud-based computer vision has three problems: latency, cost, and privacy. When you need real-time vision on a device that costs $8 and fits in your palm, the ESP32-CAM paired with OpenCV on a companion computer is hard to beat.
The architecture:
ESP32-CAM --WiFi/MJPEG--> Companion Computer (OpenCV + Inference)

1. Setting Up the ESP32-CAM Stream
Flash the standard CameraWebServer example or a minimal MJPEG streamer:
#include "esp_camera.h"
#include <WiFi.h>

#define CAMERA_MODEL_AI_THINKER
#include "camera_pins.h"

const char* ssid = "YOUR_WIFI";
const char* password = "YOUR_PASSWORD";

void startCameraServer();

12void setup() {
13 Serial.begin(115200);
14 WiFi.begin(ssid, password);
15 while (WiFi.status() != WL_CONNECTED) delay(500);
16
17 camera_config_t config;
18 config.ledc_channel = LEDC_CHANNEL_0;
19 config.ledc_timer = LEDC_TIMER_0;
20 config.pin_d0 = Y2_GPIO_NUM;
21 config.pin_d1 = Y3_GPIO_NUM;
22 // ... (standard pin config for AI-Thinker board)
23 config.xclk_freq_hz = 20000000;
24 config.pixel_format = PIXFORMAT_JPEG;
25 config.frame_size = FRAMESIZE_VGA;
26 config.jpeg_quality = 10;
27 config.fb_count = 2;
28
29 esp_camera_init(&config);
30 startCameraServer();
31 Serial.print("Stream ready at http://");
32 Serial.println(WiFi.localIP());
33}
34
void loop() { delay(10000); }

The ESP32-CAM now serves an MJPEG stream at http://<ESP_IP>:81/stream.
2. Capturing Frames with OpenCV
import cv2
import time

class ESP32CamStream:
    """Reconnecting reader for an ESP32-CAM MJPEG stream.

    The capture is opened lazily on the first ``read()`` and reopened
    automatically on the next ``read()`` after a failure.

    Args:
        url: Address of the MJPEG stream.
        timeout: Stored on the instance; it is not currently applied to the
            underlying capture.
        reconnect_delay: Seconds to pause after a failed open or read, so a
            caller polling ``read()`` in a loop does not hammer the camera
            with reconnect attempts. Use 0 to disable the pause.
    """

    def __init__(self, url, timeout=5, reconnect_delay=0.5):
        self.url = url
        self.timeout = timeout
        self.reconnect_delay = reconnect_delay
        self.cap = None

    def connect(self):
        """Open the stream, closing any capture that is already held."""
        # Release first so a repeated connect() does not leak the old handle.
        self.release()
        # Use FFMPEG backend for MJPEG
        self.cap = cv2.VideoCapture(self.url, cv2.CAP_FFMPEG)
        # Request a one-frame buffer to minimize latency. This is best-effort:
        # set() returns False when the backend does not support the property.
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    def read(self):
        """Return the next frame, or None when the stream is unavailable.

        On failure the capture is released so the next call reconnects.
        """
        if self.cap is None or not self.cap.isOpened():
            self.connect()
            if not self.cap.isOpened():
                self._drop_connection()
                return None
        ret, frame = self.cap.read()
        if not ret:
            self._drop_connection()
            return None
        return frame

    def release(self):
        """Close the stream if it is open. Safe to call repeatedly."""
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def _drop_connection(self):
        """Release the capture and pause before the next reconnect attempt."""
        self.release()
        if self.reconnect_delay > 0:
            time.sleep(self.reconnect_delay)

# Usage: show the live stream until 'q' is pressed.
cam = ESP32CamStream("http://192.168.1.100:81/stream")
try:
    while True:
        frame = cam.read()
        if frame is None:
            # Stream unavailable: wait briefly instead of spinning the CPU.
            time.sleep(0.1)
            continue
        cv2.imshow("ESP32-CAM", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Close the stream even if the loop is interrupted (e.g. Ctrl-C).
    cam.release()
cv2.destroyAllWindows()

CAP_PROP_BUFFERSIZE=1 is critical — without it, OpenCV buffers frames and you get multi-second latency.
3. Preprocessing for Edge Inference
Before running any model, you need to preprocess frames:
import numpy as np

def preprocess_frame(frame, target_size=(224, 224)):
    """Letterbox a BGR frame into a normalized RGB batch for model input.

    The frame is scaled to fit inside ``target_size`` without changing its
    aspect ratio, centered on a black canvas, converted to RGB and scaled
    to [0, 1].

    Args:
        frame: 3-channel BGR image indexed as (row, column, channel).
        target_size: Output size as ``(width, height)`` in pixels.

    Returns:
        float32 array of shape ``(1, height, width, 3)``.

    Raises:
        ValueError: If the frame has zero width or height.
    """
    target_w, target_h = target_size
    h, w = frame.shape[:2]
    if h == 0 or w == 0:
        raise ValueError("frame must have non-zero width and height")

    # 1. Resize (preserving aspect ratio with padding)
    scale = min(target_w / w, target_h / h)
    # Clamp to 1 px so an extreme aspect ratio cannot round a side to zero.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    resized = cv2.resize(frame, (new_w, new_h))

    # 2. Pad to target size, splitting the border evenly on both sides
    pad_w = target_w - new_w
    pad_h = target_h - new_h
    top, bottom = pad_h // 2, pad_h - pad_h // 2
    left, right = pad_w // 2, pad_w - pad_w // 2
    padded = cv2.copyMakeBorder(resized, top, bottom, left, right,
                                cv2.BORDER_CONSTANT, value=(0, 0, 0))

    # 3. Convert BGR → RGB (OpenCV uses BGR, most models expect RGB).
    # Swapping channels before the float conversion gives the same values
    # while working on the smaller integer image.
    rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)

    # 4. Normalize to [0, 1]
    normalized = rgb.astype(np.float32) / 255.0

    # 5. Add batch dimension
    return np.expand_dims(normalized, axis=0)

def draw_detections(frame, detections, color=(0, 255, 0)):
    """Draw labelled bounding boxes onto ``frame`` in place and return it.

    Args:
        frame: Image to annotate; it is modified in place.
        detections: Iterable of dicts with keys ``"bbox"`` (``x, y, w, h``
            with ``x, y`` the top-left corner), ``"class"`` and
            ``"confidence"``.
        color: Box and text color passed to the OpenCV drawing calls.

    Returns:
        The same ``frame`` object that was passed in.
    """
    for det in detections:
        # OpenCV drawing calls need integer pixel coordinates.
        x, y, w, h = (int(round(v)) for v in det["bbox"])
        label = f"{det['class']} {det['confidence']:.2f}"
        cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
        # Keep the label inside the image when the box touches the top edge.
        label_y = max(y - 10, 15)
        cv2.putText(frame, label, (x, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return frame


# 4. Running Lightweight Inference
Here's a complete pipeline using a TFLite model:
import tflite_runtime.interpreter as tflite

class EdgeVisionPipeline:
    """Capture frames, run a TFLite model on each one, and display the result.

    Args:
        stream_url: Address of the ESP32-CAM MJPEG stream.
        model_path: Path to the ``.tflite`` model file.
        labels: Class names for the model; stored on the instance for use
            by ``_postprocess``.
    """

    def __init__(self, stream_url, model_path, labels):
        self.cam = ESP32CamStream(stream_url)
        self.interpreter = tflite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        self.labels = labels
        self.fps_counter = 0
        self.fps_timer = time.monotonic()

    def run(self):
        """Process and display frames until 'q' is pressed.

        The camera and the display windows are always cleaned up, including
        when the loop is interrupted by an exception.
        """
        input_detail = self.input_details[0]
        output_index = self.output_details[0]["index"]
        # Dimensions 1 and 2 of the input shape are taken as (height, width)
        # — assumes an NHWC model input; confirm for your model.
        # preprocess_frame expects (width, height), so swap them.
        in_h, in_w = (int(dim) for dim in input_detail["shape"][1:3])
        target_size = (in_w, in_h)

        fps = None
        # Start timing here so the first reading excludes model load time.
        self.fps_counter = 0
        self.fps_timer = time.monotonic()

        try:
            while True:
                frame = self.cam.read()
                if frame is None:
                    # Stream unavailable: back off instead of spinning.
                    time.sleep(0.01)
                    continue

                # Preprocess
                input_data = preprocess_frame(frame, target_size=target_size)
                input_data = self._match_input_dtype(input_data, input_detail)

                # Inference
                self.interpreter.set_tensor(input_detail["index"], input_data)
                self.interpreter.invoke()
                output = self.interpreter.get_tensor(output_index)

                # Post-process
                detections = self._postprocess(output, frame.shape[:2])
                frame = draw_detections(frame, detections)

                # FPS counter: recompute about once per second, but draw the
                # latest value on every frame so the overlay does not flicker.
                self.fps_counter += 1
                elapsed = time.monotonic() - self.fps_timer
                if elapsed >= 1.0:
                    fps = self.fps_counter / elapsed
                    self.fps_counter = 0
                    self.fps_timer = time.monotonic()
                if fps is not None:
                    cv2.putText(frame, f"FPS: {fps:.1f}",
                                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1,
                                (0, 255, 0), 2)

                cv2.imshow("Edge Vision", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            self.cam.release()
            cv2.destroyAllWindows()

    @staticmethod
    def _match_input_dtype(input_data, input_detail):
        """Convert a real-valued batch to the dtype of the model input tensor.

        Integer (quantized) inputs are encoded with the tensor's
        ``(scale, zero_point)`` pair as ``q = real / scale + zero_point`` and
        clipped to the dtype's range. An integer input that reports no scale
        is returned unchanged, so the interpreter reports the mismatch itself.
        """
        dtype = input_detail["dtype"]
        if not np.issubdtype(dtype, np.integer):
            return input_data.astype(dtype, copy=False)
        scale, zero_point = input_detail.get("quantization", (0.0, 0))
        if not scale:
            return input_data
        limits = np.iinfo(dtype)
        quantized = np.round(input_data / scale + zero_point)
        return np.clip(quantized, limits.min, limits.max).astype(dtype)

    def _postprocess(self, output, frame_shape):
        """Turn the raw output tensor into a list of detection dicts.

        Placeholder: the parsing is model-specific and not implemented here,
        so an empty list is returned.
        """
        # Model-specific post-processing
        detections = []
        # ... parse output tensor into bounding boxes
        return detections

# Wire the camera stream, the TFLite model file, and the class labels together.
pipeline = EdgeVisionPipeline(
    model_path="models/detect.tflite",
    stream_url="http://192.168.1.100:81/stream",
    labels=["person", "face", "object"],
)
pipeline.run()

5. Performance Tips
| Optimization | Impact | How |
|---|---|---|
| Lower resolution | +10-15 FPS | Use FRAMESIZE_QVGA instead of FRAMESIZE_VGA |
| Skip frames | +inference throughput | Run inference every Nth frame, display all |
| Threaded capture | -50% latency | Capture in a separate thread, inference in main |
| Quantized models | 2-4x faster | Use INT8 quantized TFLite models |
| JPEG quality | less bandwidth | Set jpeg_quality = 15 on ESP32 |
Threaded capture pattern:
import threading

class ThreadedCamera:
    """Background frame grabber that exposes the most recent frame.

    A daemon thread keeps reading from an ``ESP32CamStream`` and stores the
    latest frame; ``read()`` returns a copy of it without waiting for the
    camera.

    Args:
        url: Address of the MJPEG stream.
    """

    # Longest time stop() waits for the capture thread to finish.
    _JOIN_TIMEOUT = 2.0
    # Pause after a failed read so the capture loop does not spin.
    _RETRY_DELAY = 0.05

    def __init__(self, url):
        self.cam = ESP32CamStream(url)
        self.frame = None
        self.running = True
        self.lock = threading.Lock()
        self._thread = None

    def _capture_loop(self):
        """Thread body: store each new frame until ``running`` is cleared."""
        while self.running:
            frame = self.cam.read()
            if frame is None:
                time.sleep(self._RETRY_DELAY)
                continue
            with self.lock:
                self.frame = frame

    def start(self):
        """Connect to the stream and start the capture thread.

        Does nothing if the capture thread is already running.
        """
        if self._thread is not None and self._thread.is_alive():
            return
        # Re-arm the flag so the camera can be started again after stop().
        self.running = True
        self.cam.connect()
        self._thread = threading.Thread(target=self._capture_loop, daemon=True)
        self._thread.start()

    def read(self):
        """Return a copy of the latest frame, or None if none has arrived."""
        with self.lock:
            return self.frame.copy() if self.frame is not None else None

    def stop(self):
        """Stop the capture thread, then release the camera.

        The thread is joined first so the capture is not released while a
        read is still in progress. The wait is bounded by ``_JOIN_TIMEOUT``;
        after that the camera is released regardless.
        """
        self.running = False
        thread, self._thread = self._thread, None
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=self._JOIN_TIMEOUT)
        self.cam.release()


# Summary
- ESP32-CAM streams MJPEG over WiFi — cheap, wireless, $8
- OpenCV captures and preprocesses frames — CAP_PROP_BUFFERSIZE=1 for low latency
- TFLite runs lightweight inference on the companion computer — INT8 quantized for speed
- Threaded capture decouples frame capture from inference — prevents pipeline stalls
- No cloud needed — the entire pipeline runs on-device, preserving privacy and reducing latency
Go from Arduino to Production Firmware
The ESP32-IDF Workshop covers ESP-IDF from scratch — tasks, queues, OTA, Wi-Fi management, and deploying firmware that doesn't break at 3am.
Frequently Asked Questions
Quick answers to common questions

I build things that run on chips and the software that talks to them. ESP32, STM32, FreeRTOS, FastAPI, TinyML — from bare-metal firmware to cloud backends to on-device inference. Based in Bengaluru. Founder of Analog Data.