Camera Processing Script

The document describes the fast_cam_mp_new_v2.py script, a FastAPI-based application for concurrent processing of multiple camera feeds using multiprocessing and multithreading. It incorporates FFmpeg for frame capture, Kafka for event streaming, and MongoDB for status persistence, featuring robust error handling and dynamic configuration reloading. Key improvements over the previous analytics.py include enhanced performance, reliability, flexibility, and a comprehensive management interface through REST API endpoints.

Uploaded by S R Saini

Deep Dive into the Camera Processing Script (fast_cam_mp_new_v2.py)
The Python script fast_cam_mp_new_v2.py outlines a robust FastAPI-based
application designed for concurrent processing of multiple camera feeds. It
leverages multiprocessing and multithreading for efficiency, FFmpeg for reliable
frame capture, Kafka for event streaming, and MongoDB for status persistence.
Here's a detailed breakdown of its logic:
I. Core Architecture & Setup
 Libraries: The script begins by importing a comprehensive set of libraries
crucial for its operations:
o fastapi: for building the web API.
o multiprocessing, threading: for parallel processing of camera feeds.
o subprocess: to run external commands such as FFmpeg.
o cv2 (OpenCV), numpy: for image-processing tasks.
o confluent_kafka: for interacting with Apache Kafka.
o pymongo: for interacting with MongoDB.
o logging, json, os, time, datetime, pytz, hashlib, uuid, queue, signal,
sys, torch: for various utility, data-handling, and system functions.
 Configuration:
o Paths for critical files are defined: INPUT_JSON_PATH (for camera
configurations, camera.json), LOG_FILE, and CONFIG_FILE (for
application settings like database/Kafka connection details,
local.json).
o WORKER_PROCESSES and THREADS_PER_PROCESS are determined,
typically based on CPU core availability, to optimize parallel task
execution.
o A global logger instance is prepared.

 Logging (setup_logging, get_logger):


o A centralized logging system is established using Python's logging
module.
o It features a RotatingFileHandler to manage log file size and
backups.
o Log messages are formatted to include timestamps, process ID,
thread ID, and log level for easier debugging across concurrent
operations.
 MongoDB Connection (connect_mongodb):
o A function to establish a connection to the MongoDB instance using
connection strings and database names specified in the
CONFIG_FILE.
 FastAPI Initialization:
o An instance of the FastAPI application is created.

II. Frame Acquisition and Initial Processing


 FFmpeg for Frame Reading (read_one_frame_ffmpeg):
o This function is responsible for capturing a single frame from a
given video source (typically an RTSP stream).
o It constructs and executes an ffmpeg command as a subprocess.

o Key ffmpeg arguments include:

 -rtsp_transport tcp: Specifies TCP for RTSP transport.


 -stimeout: Sets a timeout for the stream connection.
 -vframes 1: Instructs ffmpeg to grab only one frame.
 -f rawvideo -pix_fmt bgr24: Specifies the output format as
raw BGR24 pixel data.
 -s {w}x{h}: Resizes the frame to a defined resolution.
o The raw frame data is read from the subprocess's stdout, converted
into a NumPy array, and then reshaped into an image format usable
by OpenCV.
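The single-frame grab described above can be sketched as follows. This is a minimal reconstruction, not the script's exact code: the helper names build_ffmpeg_cmd and read_one_frame_ffmpeg's default resolution are assumptions, and the exact flag set (e.g. -stimeout vs. -timeout) varies by FFmpeg version.

```python
import subprocess
import numpy as np

def build_ffmpeg_cmd(rtsp_url, width, height, timeout_us=5_000_000):
    # Mirrors the arguments described above; hypothetical helper,
    # factored out here so the command is easy to inspect.
    return [
        "ffmpeg",
        "-rtsp_transport", "tcp",      # force TCP for RTSP transport
        "-stimeout", str(timeout_us),  # socket timeout in microseconds
        "-i", rtsp_url,
        "-vframes", "1",               # grab exactly one frame
        "-f", "rawvideo",
        "-pix_fmt", "bgr24",           # raw BGR24, OpenCV's native layout
        "-s", f"{width}x{height}",     # scale to a fixed resolution
        "pipe:1",                      # write raw bytes to stdout
    ]

def read_one_frame_ffmpeg(rtsp_url, width=640, height=360):
    cmd = build_ffmpeg_cmd(rtsp_url, width, height)
    proc = subprocess.run(cmd, capture_output=True, timeout=15)
    expected = width * height * 3
    if len(proc.stdout) < expected:
        return None  # connection failure or truncated frame
    # Reshape the raw bytes into an H x W x 3 image usable by OpenCV.
    return np.frombuffer(proc.stdout[:expected], dtype=np.uint8).reshape(
        (height, width, 3))
```

Returning None on failure lets the caller fall through to a secondary RTSP URL, as described in the process_frame section below.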
 Input Configuration (read_input_json, check_json_completion):
o read_input_json(): Reads camera configurations (such as RTSP URLs and
event settings) from the camera.json file. It can flatten nested lists of
camera data and optionally "rotate" the list, a simple mechanism for
distributing load or varying processing order.
o check_json_completion(): Validates that camera.json is a complete,
syntactically correct JSON document, preventing errors from processing
partially written configuration files.
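The two helpers can be sketched roughly as below; this is a minimal illustration assuming the behavior described above (the rotate_by parameter and the exact flattening rules are assumptions, not the script's verified signatures).

```python
import json

def check_json_completion(path):
    """Return True only if the file parses as complete, valid JSON."""
    try:
        with open(path, "r") as f:
            json.load(f)
        return True
    except (ValueError, OSError):
        # Partially written or missing file: skip this reload cycle.
        return False

def read_input_json(path, rotate_by=0):
    """Flatten nested camera lists and optionally rotate the result."""
    with open(path, "r") as f:
        data = json.load(f)
    flat = []
    for item in data:
        if isinstance(item, list):
            flat.extend(item)  # flatten one level of nesting
        else:
            flat.append(item)
    # Rotation shifts the processing order, a crude load-distribution trick.
    k = rotate_by % len(flat) if flat else 0
    return flat[k:] + flat[:k]
```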
III. Core Frame Processing Logic (FrameProcessor Class)
This class encapsulates the logic for analyzing individual camera frames.
 Initialization (__init__):
o Initializes a dictionary camera_covered_counters to track
consecutive frames where a camera might be covered.
o Sets default OpenCV capture parameters.

 Time Check (check_time):


o A utility to determine if the current time falls within a specified start
and end hour/minute. This is used to enable or disable alerts based
on a schedule.
 Camera Coverage Detection (is_camera_covered):
o A sophisticated function to detect if a camera's view is obstructed. It
employs multiple checks:
 Blur Detection: Calculates the standard deviation of pixel
intensities in a grayscale version of the frame. A low standard
deviation suggests a blurry image.
 Monochromatic Detection: Analyzes the histogram of the
grayscale frame to determine if a very high percentage of
pixels are near-black or near-white.
 Uniformity Check: Calculates the variance of pixel
intensities across the frame. Very low variance might indicate
a frozen or disconnected stream showing a static, uniform
image.
o The camera is considered "covered" if the frame is blurry AND NOT
predominantly monochromatic AND NOT extremely uniform (to
differentiate from connectivity issues).
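The three checks and their combination can be sketched as below on a grayscale frame. All thresholds here are illustrative assumptions; the script's actual values are not shown, and the real code converts the BGR frame to grayscale first (via OpenCV), which is omitted here.

```python
import numpy as np

# Illustrative thresholds; the real script's values are not shown.
BLUR_STD_MAX = 10.0    # low std-dev of intensities suggests a blurry frame
MONO_FRACTION = 0.95   # share of near-black/near-white pixels
UNIFORM_VAR_MAX = 5.0  # near-zero variance suggests a frozen/static frame

def is_camera_covered(gray):
    """Combine the blur, monochrome, and uniformity checks described above."""
    blurry = float(gray.std()) < BLUR_STD_MAX
    near_extreme = np.mean((gray < 10) | (gray > 245))
    monochromatic = near_extreme > MONO_FRACTION
    uniform = float(gray.var()) < UNIFORM_VAR_MAX
    # Covered = blurry, but not a plain black/white or frozen frame,
    # which more likely indicates a connectivity problem.
    return blurry and not monochromatic and not uniform
```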
 Main Frame Processing (process_frame): This is the heart of the per-
camera processing.
1. Setup: Retrieves camera details (ID, name, RTSP URLs) from the
input record. It supports primary and secondary RTSP URLs for
fallback.
2. Frame Acquisition: Iterates through the available RTSP URLs and
attempts to grab a frame using read_one_frame_ffmpeg().
3. Connection Status:

 Failure: If frame grabbing fails for all URLs, it logs the error,
updates the camera's status in MongoDB to "disconnected"
(via update_camera_status), and sends a camera_disconnect
event to Kafka.
 Success: If a frame is successfully grabbed, it updates the
camera's status in MongoDB to "connected".
4. Camera Covered Event:

 It calls is_camera_covered() to check for obstruction.


 If the camera is detected as covered for a defined number of
consecutive frames (e.g., > 5), the current frame is Base64
encoded, and a camera_covered event is sent to Kafka. The
counter helps prevent false positives from transient issues.
5. Other Custom Events:

 If the camera is not covered, the script iterates through
events_settings defined for that camera in camera.json.
 For each defined event (other than "camera_disconnect" or
"camera_covered" which are handled specially):
 It uses check_time() to see if the event is currently
active based on its schedule.
 If active, the frame is Base64 encoded, the event type
is set, and the event data is sent to Kafka.
6. Error Handling: Robust try-except blocks manage potential errors
during frame processing, database updates, or Kafka messaging.
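The consecutive-frame counter used in step 4 is a simple debounce. A minimal sketch, assuming a threshold of 5 as in the "> 5" example above (the class name CoveredDebouncer is hypothetical; in the script the counters live in FrameProcessor.camera_covered_counters):

```python
COVERED_FRAME_THRESHOLD = 5  # consecutive covered frames before alerting

class CoveredDebouncer:
    """Per-camera counter that suppresses transient false positives.

    The camera_covered event fires only once the camera has been seen
    covered on more than COVERED_FRAME_THRESHOLD consecutive frames.
    """

    def __init__(self):
        self.counters = {}

    def update(self, camera_id, covered):
        if not covered:
            self.counters[camera_id] = 0  # any clear frame resets the count
            return False
        self.counters[camera_id] = self.counters.get(camera_id, 0) + 1
        return self.counters[camera_id] > COVERED_FRAME_THRESHOLD
```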
IV. Data Persistence and Messaging
 MongoDB Updates (update_camera_status):
o This function is responsible for updating the status of each camera
in a MongoDB collection (likely named total_cameras).
o It records information like connection status (true/false), error
messages, the current working directory, location name, and
camera number.
o It uses upsert=True to create a new document if one doesn't exist
for the camera ID or update an existing one.
o Includes a retry mechanism for database operations to handle
transient network issues.
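The upsert-with-retry pattern described above can be sketched as follows. The field names and retry parameters are illustrative assumptions, not the script's exact schema; `collection` is any object exposing a pymongo-style update_one(), which makes the sketch testable without a live MongoDB.

```python
import time

def update_camera_status(collection, camera_id, connected, error=None,
                         retries=3, delay=0.1):
    """Upsert one camera's status document, retrying on transient errors."""
    doc = {"camera_id": camera_id, "connected": connected, "error": error}
    for attempt in range(1, retries + 1):
        try:
            # upsert=True: insert a new document if none exists for this
            # camera ID, otherwise update the existing one.
            collection.update_one({"camera_id": camera_id},
                                  {"$set": doc}, upsert=True)
            return True
        except Exception:
            if attempt == retries:
                return False  # give up after the final attempt
            time.sleep(delay)  # back off before retrying
    return False
```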
 Kafka Integration (send_message, delivery_callback):
o send_message(): Publishes messages (JSON strings representing
camera events) to a specified Kafka topic.
 Each message is assigned a unique key (UUID).
 It uses the producer.produce() method and can leverage
Kafka's batching capabilities for efficiency.
 producer.flush() is called to ensure messages are sent,
though frequent flushing might impact performance and is
typically managed by Kafka's internal batching/linger
settings.
o delivery_callback(): A callback function used by the Kafka producer
to log the success or failure of message delivery.
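A sketch of the publish path, assuming the behavior described above. The per-message flush() is reproduced from the description even though, as noted, production deployments usually rely on batching/linger settings instead; `producer` can be a confluent_kafka.Producer or any stub with the same produce()/flush() interface.

```python
import json
import uuid

def delivery_callback(err, msg):
    """Invoked by the producer to report per-message delivery outcome."""
    if err is not None:
        print(f"delivery failed: {err}")

def send_message(producer, topic, event):
    """Publish one camera event as JSON with a unique UUID key."""
    key = str(uuid.uuid4())
    producer.produce(topic, key=key, value=json.dumps(event),
                     callback=delivery_callback)
    producer.flush()  # shown for clarity; batching is usually preferable
    return key
```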
V. Concurrency: Worker Processes and Threads
The system is designed for high throughput using a multi-layered concurrency
model.
 Worker Thread (worker_thread):
o This is the smallest unit of execution for camera processing.

o Each thread runs in an infinite loop, fetching camera tasks


(dictionaries containing camera details) from a shared
mp.JoinableQueue.
o It maintains its own instances of FrameProcessor, MongoDB client,
and Kafka producer.
o When it receives a None (sentinel value) from the queue, the thread
cleans up (flushes Kafka producer) and exits.
o Error handling is present to catch exceptions during processing or
queue operations.
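The fetch-until-sentinel loop can be sketched as below. For a self-contained illustration this uses queue.Queue rather than mp.JoinableQueue (both share the get/task_done/join interface), and a results list stands in for the FrameProcessor, MongoDB, and Kafka work each real thread owns.

```python
import queue
import threading

def worker_thread(task_queue, results):
    """Pull camera tasks until a None sentinel arrives, then exit."""
    while True:
        task = task_queue.get()
        try:
            if task is None:
                return  # sentinel: flush producers and exit
            try:
                results.append(task["camera_id"])  # stand-in for processing
            except Exception:
                pass  # a per-task error must not kill the thread
        finally:
            task_queue.task_done()  # always acknowledge, even the sentinel
```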
 Worker Process (worker_process):
o Each worker process manages one or more worker_threads.

o It sets up signal handlers (SIGTERM, SIGINT) for graceful shutdown.

o It creates and starts a defined number (THREADS_PER_PROCESS) of
worker_threads, passing them the camera queue.
o It reports its "ready" status back to the main process via a
result_queue.
o It waits for all its managed threads to complete (join).

 Camera Distribution (distribute_cameras_direct):


o This function takes a list of camera configurations (read from
camera.json).
o It distributes these cameras among the camera_queues (one queue
per worker process).
o The distribution aims for basic load balancing by adding new
camera tasks to the queues that currently have the fewest items.
This allows dynamic addition of cameras without waiting for existing
queues to empty.
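The least-loaded distribution described above amounts to picking the shortest queue for each new task. A minimal sketch (queue.Queue stands in for the per-process multiprocessing queues; note that qsize() is only approximate for multiprocessing queues, which is acceptable for coarse balancing):

```python
import queue

def distribute_cameras_direct(cameras, camera_queues):
    """Assign each camera task to the currently shortest queue."""
    for cam in cameras:
        target = min(camera_queues, key=lambda q: q.qsize())
        target.put(cam)
```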
VI. System Orchestration and Management
 JSON Processing Loop (json_processing_loop):
o Runs in a separate thread.

o Periodically (e.g., every 30 seconds) checks whether the camera.json file
has been modified by comparing its current MD5 hash with a stored hash.
o If the file has changed (or on a periodic reload schedule) and the
JSON is valid (check_json_completion), it calls
distribute_cameras_direct to update the camera tasks in the worker
queues. This allows for dynamic reconfiguration of cameras without
restarting the entire application.
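The hash-based change detection can be sketched as follows; the helper names file_md5 and config_changed are assumptions (the real loop inlines this logic alongside check_json_completion and the 30-second sleep).

```python
import hashlib

def file_md5(path):
    """MD5 of the file's bytes, used to detect camera.json edits."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()

def config_changed(path, last_hash):
    """One iteration of the periodic change check.

    Returns (changed, current_hash) so the caller can store the new
    hash and, if changed, redistribute camera tasks.
    """
    current = file_md5(path)
    return current != last_hash, current
```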
 Supervisor Loop (supervisor_loop):
o Runs in a separate thread.

o Monitors the health of all worker_process instances.


o If a worker process is found to be no longer alive, the supervisor
restarts it. The new process will take over the same camera queue,
ensuring that unprocessed camera tasks are handled. This provides
resilience against worker process crashes.
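One pass of the supervisor's restart logic can be sketched as below. supervise_once and spawn_worker are hypothetical names; in the real loop spawn_worker would start a new multiprocessing.Process bound to the same camera queue, so queued tasks survive the crash.

```python
def supervise_once(workers, spawn_worker):
    """Replace any dead worker with a fresh one for the same slot.

    `workers` is a mutable list of process-like objects exposing
    is_alive(); returns the indices that were restarted.
    """
    restarted = []
    for i, w in enumerate(workers):
        if not w.is_alive():
            workers[i] = spawn_worker(i)  # same index => same camera queue
            restarted.append(i)
    return restarted
```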
 System Start-up (start_system):
o This function initializes the entire processing pipeline:

1. Sets up logging.
2. Creates the result_queue (for ready signals from processes)
and the camera_queues.
3. Starts the WORKER_PROCESSES worker processes.
4. Waits to receive a "ready" message from each worker
process.
5. Starts the json_processing_loop thread for dynamic
configuration loading.
6. Starts the supervisor_loop thread for process monitoring.
 System Shutdown (stop_system):
o This function handles the graceful shutdown of the application:

1. Signals the json_processing_loop and supervisor_loop to stop.


2. Sends a None sentinel value to each camera_queue for each
thread, prompting worker threads to finish their current task
and exit.
3. Waits for all items in the camera queues to be processed
(queue.join()).
4. Terminates any worker processes that haven't exited after a
timeout.
VII. FastAPI Endpoints and Main Execution
 FastAPI Events (startup_event, shutdown_event):
o @app.on_event("startup"): Calls start_system() when the FastAPI
application starts.
o @app.on_event("shutdown"): Calls stop_system() when the FastAPI
application shuts down.
 API Endpoints:
o GET /status: Provides a status overview of the system, including the
number of worker processes, threads, and potentially active
cameras (approximated by non-empty queues).
o POST /reload: An endpoint intended to trigger a manual reload of
the camera configurations. As written,
background_tasks.add_task(distribute_cameras) references an undefined
distribute_cameras function; it likely should call
distribute_cameras_direct after re-reading camera.json, or trigger the
json_processing_loop's reload logic.
 Main Execution (if __name__ == "__main__":):
o This block is executed when the script is run directly.

o It initializes the logger.

o It uses uvicorn to run the FastAPI application, making it accessible
over HTTP.
o It configures Uvicorn's logging to integrate with the application's
existing logging setup by setting disable_existing_loggers=False in
Uvicorn's log configuration.
Comparison with analytics.py
Each entry lists the analytics.py approach, then the fast_cam_mp_new_v2.py
approach, with the resulting improvement in parentheses:
 Architecture: single-process → multi-process/multi-threaded (major
scalability improvement)
 Stream Handling: OpenCV's VideoCapture → ffmpeg subprocess (more reliable
stream connection)
 Error Recovery: basic retries → comprehensive supervisor system (better
fault tolerance)
 Configuration: static → dynamic with auto-reload (better operational
flexibility)
 Resource Usage: sequential processing → parallel processing (better CPU
utilization)
 Load Balancing: none → queue-based with metrics (better performance
distribution)
 Logging: minimal → comprehensive with rotation (better operational
visibility)
 Management Interface: none → FastAPI REST endpoints (better system control)
 Camera Algorithm: similar → enhanced with better parameters (more accurate
detection)
 Process Monitoring: none → full process/thread supervision (better
reliability)

Key Improvements

1. Performance: Parallelized processing allows handling more cameras
simultaneously
2. Reliability: Supervisor system ensures continuous operation despite
failures
3. Flexibility: Dynamic configuration reloading without restart
4. Maintainability: Comprehensive logging for troubleshooting
5. Efficiency: Smart load balancing and resource allocation
6. Management: REST API for monitoring and control
Summary of Workflow:
1. Initialization: On application start, worker processes and their threads
are launched. Monitoring (supervisor and JSON config) threads also begin.
2. Configuration Loading: The json_processing_loop reads camera.json and
distributes camera processing tasks to worker queues.
3. Concurrent Processing: Worker threads pick up camera tasks. Each
thread uses ffmpeg to get a frame.
4. Frame Analysis: The FrameProcessor analyzes the frame for events (e.g.,
camera covered, custom alerts based on time schedules).
5. Event Publishing: Detected events (with optional frame data) are sent as
messages to a Kafka topic.
6. Status Update: Camera connection status and errors are logged to
MongoDB.
7. Dynamic Updates: Changes to camera.json are automatically detected, and
camera tasks are redistributed.
8. Resilience: The supervisor restarts any crashed worker processes.
9. API Interaction: Status can be queried, and configuration reloads can be
triggered via HTTP endpoints.
This script represents a complex, scalable, and resilient system for real-time
video stream processing and event generation.

Conclusion
The fast_cam_mp_new_v2.py script represents a significant upgrade from
analytics.py, offering:

1. Scalability: The system can handle hundreds of cameras across multiple
processes
2. Reliability: Automatic recovery from failures
3. Efficiency: Parallel processing and intelligent load balancing
4. Manageability: Comprehensive logging and API controls

This architectural improvement enables monitoring many more cameras with the
same hardware resources, while providing better detection reliability and system
resilience.
