import pandas as pd import numpy as np from sklearn.neighbors import KDTree from scipy.spatial import KDTree import joblib from sqlalchemy import create_engine , text import sys # Step 1: Fetch raw data from the Database model_path =r"C:\Users\vinayak\OneDrive\Desktop\Vecmocon\Script\stacking_model_xg_logic_diff.pkl" most_peak_periods = [(22 * 3600, 24 * 3600), (0, 6 * 3600)] peak_periods = [(6 * 3600, 12 * 3600), (18 * 3600, 22 * 3600)] def fetch_raw_data(engine, table_name, start_time=None, end_time=None): try: query = f"SELECT * FROM {table_name}" if start_time and end_time: query += f" WHERE last_reported_time BETWEEN '{start_time}' AND '{end_time}'" with engine.connect() as conn: result = conn.execute(text(query)) rows = result.fetchall() # Fetch all rows columns = result.keys() # Get column names # Convert to DataFrame df = pd.DataFrame(rows, columns=columns) print("\nData fetched from the database:") print(df.head()) return df except Exception as e: print(f"\nError fetching data from the database: {e}") return pd.DataFrame() # step 2 process raw data def process_raw_data(vehicle_df): """ Process raw data for a single vehicle: sort by last_reported_time, remove duplicate timestamps, and clean data. Parameters: vehicle_df (pd.DataFrame): Data of a single vehicle. Returns: pd.DataFrame: Processed and sorted data. """ try: # Ensure last_reported_time column is in datetime format vehicle_df['last_reported_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce') # Step 1: Sort data by last_reported_time vehicle_df = vehicle_df.sort_values(by="last_reported_time").reset_index(drop=True) # Step 2: Remove duplicate timestamps (keeping the latest occurrence) vehicle_df = vehicle_df.drop_duplicates(subset=['last_reported_time'], keep='last') # Step 3: Clean missing values if necessary (optional) vehicle_df = vehicle_df.dropna(subset=['last_reported_time', 'chassis_no']) return vehicle_df except Exception as e: print(f"Error processing raw data for chassis {vehicle_df['chassis_no'].iloc[0] if 'chassis_no' in vehicle_df else 'Unknown'}: {e}") return pd.DataFrame() # Return empty DataFrame on failure # step 3 : route identification def route_identification(vehicle_df): """ Identifies routes for a single vehicle's data. Parameters: vehicle_df (pd.DataFrame): Data for a single vehicle. Returns: pd.DataFrame: Identified routes with frequency and date range. """ try: chassis_no = vehicle_df['chassis_no'].iloc[0] print(f"\nPerforming route identification for chassis: {chassis_no}") # Drop rows with missing latitude or longitude values vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy() # Compute distance traveled (ensuring no negative values) vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0) # Convert timestamps to datetime vehicle_df['date_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce') # Compute time differences & extract date vehicle_df['Time_Difference'] = vehicle_df['date_time'].diff().dt.total_seconds().fillna(0) vehicle_df['Date'] = vehicle_df['date_time'].dt.date # Store as date objects # Round coordinates for clustering vehicle_df['lat3'] = vehicle_df['latitude'].round(3) vehicle_df['lon3'] = vehicle_df['longitude'].round(3) vehicle_df['coordinates3'] = vehicle_df['lat3'].astype(str) + ',' + vehicle_df['lon3'].astype(str) # Get sorted unique dates unique_dates = np.sort(vehicle_df['Date'].dropna().unique()) # Initialize DataFrame for identified routes all_routes = pd.DataFrame() # Process in a rolling window (7 day, considering sequential processing per vehicle) for start_idx in range(len(unique_dates) - 7): window_start_date = unique_dates[start_idx] window_end_date = unique_dates[start_idx + 7] date_range_str = f"{window_start_date.strftime('%b %d')} - {window_end_date.strftime('%b %d')}" # Filter data for the current window df_window = vehicle_df[ (vehicle_df['Date'] >= window_start_date) & (vehicle_df['Date'] <= window_end_date) ].copy() # Keep only records where the vehicle has moved df_moving = df_window[df_window['timestamp_distance'] > 0] # Remove consecutive duplicate coordinates df_moving = df_moving.loc[df_moving['coordinates3'].shift() != df_moving['coordinates3']] # Compute coordinate frequency coord_counts = df_moving['coordinates3'].value_counts().reset_index() coord_counts.columns = ['coordinates3', 'frequency3'] # Apply frequency threshold (80th percentile) if not coord_counts.empty: threshold = np.percentile(coord_counts['frequency3'], 80) route = coord_counts[coord_counts['frequency3'] >= threshold].copy() # Convert coordinate strings to lat/lon route[['lat3', 'lon3']] = route['coordinates3'].str.split(',', expand=True).astype(float) # Assign route identification number and date range route['route_identification_no'] = start_idx + 1 route['date_range'] = date_range_str route['chassis_no'] = chassis_no # Keep track of vehicle ID # Append to all routes all_routes = pd.concat([all_routes, route], ignore_index=True) return all_routes except Exception as e: print(f"Error in route identification for chassis {chassis_no}: {e}") return pd.DataFrame() # Return empty DataFrame on failure # Step 4: Route Analysis def route_analysis(vehicle_df, df_routes): """ Performs route analysis by comparing IoT data with identified routes. Parameters: vehicle_df (pd.DataFrame): IoT data for a single vehicle. df_routes (pd.DataFrame): Identified routes. Returns: pd.DataFrame: Analysis results including route adherence percentages and trip details. """ try: if vehicle_df.empty or df_routes.empty: print("Warning: One or both input datasets are empty.") return pd.DataFrame() chassis_no = vehicle_df['chassis_no'].iloc[0] print(f"\nPerforming route analysis for chassis: {chassis_no}") # Preprocess vehicle data vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy() vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0) vehicle_df['date_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce') vehicle_df['Time_Difference'] = vehicle_df['date_time'].diff().dt.total_seconds().fillna(0) vehicle_df['Date'] = vehicle_df['date_time'].dt.date # Store as date objects vehicle_df['lat4'] = vehicle_df['latitude'].round(4) vehicle_df['lon4'] = vehicle_df['longitude'].round(4) # Map Dates to Route Identification Numbers unique_dates = sorted(vehicle_df['Date'].dropna().unique()) date_route_mapping = {date: idx + 1 for idx, date in enumerate(unique_dates)} results = [] for date in unique_dates: df_day = vehicle_df[vehicle_df['Date'] == date].dropna(subset=['lat4', 'lon4']).copy() if df_day.empty: continue # Convert IoT data coordinates to radians new_coords_radians = np.radians(df_day[['lat4', 'lon4']].to_numpy()) # Get corresponding route identification number route_no = date_route_mapping.get(date, None) route = df_routes[df_routes['route_identification_no'] == route_no] if route.empty: print(f"Warning: No route found for date {date}. Skipping analysis.") continue # Convert route coordinates to radians and build KDTree coords_radians = np.radians(route[['lat3', 'lon3']].to_numpy()) tree = KDTree(coords_radians) # Define a search radius of 150 meters radius = 150 / 6371000 # Earth radius in meters converted to radians # Find nearest route points for each IoT data point distances, _ = tree.query(new_coords_radians, distance_upper_bound=radius) df_day['occurrence'] = (distances < radius).astype(int) # Mark as 1 if within radius # Compute route following percentage def route_percentage(df): route_dist = df[df['occurrence'] == 1]['timestamp_distance'].sum() total_dist = df['timestamp_distance'].sum() return route_dist / total_dist if total_dist > 0 else 0 route_perc = route_percentage(df_day) date_range = route['date_range'].iloc[0] if 'date_range' in route.columns and not route.empty else "N/A" # Store results results.append({ 'date': date, 'route_identification_no': route_no, 'percentage_route_followed': route_perc * 100, 'percentage_non_route_followed': (1 - route_perc) * 100, 'chassis_no': chassis_no, 'Day': df_day['date_time'].dt.strftime('%A').iloc[0], 'date_range': date_range }) return pd.DataFrame(results) except Exception as e: print(f"Error during route analysis for chassis {chassis_no}: {e}") return pd.DataFrame() # Return empty DataFrame on failure # Step 4: Create Additional Features def create_additional_features(vehicle_df): """ Creates additional features like static_time_percentage, peak_percentage, voltage_drop, etc. Parameters: vehicle_df (pd.DataFrame): IoT data for a single vehicle. Returns: pd.DataFrame: A DataFrame with computed additional features. """ if vehicle_df.empty: print("Warning: Vehicle data is empty.") return pd.DataFrame() vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy() try: vehicle_df['last_reported_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce') vehicle_df = vehicle_df.dropna(subset=['last_reported_time']) # Remove rows where datetime conversion failed vehicle_df['date'] = vehicle_df['last_reported_time'].dt.date except Exception as e: print(f"Error processing dates: {e}") return pd.DataFrame() # Compute necessary differences vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0) vehicle_df['ivdiff'] = vehicle_df['voltage'].diff().fillna(0) results = [] voltage_results = [] valid_dates = [] last_valid_odo = None try: for date, group in vehicle_df.groupby('date'): try: group_sorted = group.sort_values(by='last_reported_time') odos = group_sorted['odometer_kms'].values if last_valid_odo is not None and odos[0] < last_valid_odo: continue if not all(odos[i] >= odos[i - 1] for i in range(1, len(odos))): continue valid_dates.append(date) last_valid_odo = odos[-1] except Exception as e: print(f"Skipping date {date} due to error: {e}") continue for date in valid_dates: try: daily_data = vehicle_df[vehicle_df['date'] == date].sort_values(by='last_reported_time') total_working_seconds = static_time_seconds = peak_time_seconds = 0 last_odometer = last_timestamp = None total_distance = total_travel_time = 0 for _, row in daily_data.iterrows(): if last_odometer is not None: time_diff = (row['last_reported_time'] - last_timestamp).total_seconds() current_hour = row['last_reported_time'].hour * 3600 + \ row['last_reported_time'].minute * 60 + \ row['last_reported_time'].second if row['odometer_kms'] == last_odometer: static_time_seconds += time_diff else: total_working_seconds += time_diff distance_covered = row['odometer_kms'] - last_odometer total_distance += distance_covered total_travel_time += time_diff if any(start <= current_hour < end for start, end in most_peak_periods) or \ any(start <= current_hour < end for start, end in peak_periods): peak_time_seconds += time_diff last_odometer = row['odometer_kms'] last_timestamp = row['last_reported_time'] total_working_hours = total_working_seconds / 3600 if total_working_seconds > 0 else 0 static_time_percentage = static_time_seconds / (total_working_seconds + static_time_seconds) if (total_working_seconds + static_time_seconds) > 0 else 1.0 peak_percentage = peak_time_seconds / total_working_seconds if total_working_seconds > 0 else 0 average_speed_motion = (total_distance / total_travel_time) * 3600 if total_travel_time > 0 else 0 trip_duration = total_working_seconds + static_time_seconds results.append({ 'date': date, 'total_working_hours': total_working_hours, 'static_time_percentage': static_time_percentage, 'peak_percentage': peak_percentage, 'average_speed_motion': average_speed_motion, 'total_distance_odo_for_speed': total_distance, 'time_moving': total_working_seconds, 'time_static': static_time_seconds, 'trip_duration': trip_duration }) # Voltage drop calculations v1 = v3 = 0 flag = False sum_ivdiff = 0 charging_drop = 0 for _, row in daily_data.iterrows(): if row['odometer_kms'] > 0 and not flag: v1 = row['voltage'] flag = True if row['timestamp_distance'] > 0: if sum_ivdiff >= 0.4: charging_drop += sum_ivdiff sum_ivdiff = 0 else: sum_ivdiff += row['ivdiff'] if row['odometer_kms'] > 0: v3 = row['voltage'] voltage_drop = abs(v1 - v3 + charging_drop) if v1 and v3 else 0 voltage_results.append({'date': date, 'cumulative_voltage_drop': voltage_drop}) except Exception as e: print(f"Skipping feature calculation for date {date} due to error: {e}") continue except Exception as e: print(f"Unexpected error in processing: {e}") return pd.DataFrame() # Convert results to DataFrames metrics_df = pd.DataFrame(results) voltage_df = pd.DataFrame(voltage_results) if metrics_df.empty and voltage_df.empty: print("No valid data processed.") return pd.DataFrame() # Merge additional features final_df = pd.merge(metrics_df, voltage_df, on='date', how='outer') return final_df # step 5 : merging features def merge_features(route_analysis_df, additional_features_df): """ Merges route analysis and additional features DataFrames on the 'date' column. Parameters: route_analysis_df (pd.DataFrame): DataFrame containing route analysis data. additional_features_df (pd.DataFrame): DataFrame containing additional computed features. Returns: pd.DataFrame: Merged DataFrame containing both route analysis and additional features. """ print("Merging route analysis and additional features...") # Ensure inputs are DataFrames if isinstance(route_analysis_df, dict): route_analysis_df = pd.DataFrame([route_analysis_df]) elif isinstance(route_analysis_df, list): route_analysis_df = pd.DataFrame(route_analysis_df) if isinstance(additional_features_df, dict): additional_features_df = pd.DataFrame([additional_features_df]) elif isinstance(additional_features_df, list): additional_features_df = pd.DataFrame(additional_features_df) # Validate required columns if route_analysis_df.empty or 'date' not in route_analysis_df.columns: print("Error: route_analysis_df is empty or missing 'date' column.") return pd.DataFrame() if additional_features_df.empty or 'date' not in additional_features_df.columns: print("Error: additional_features_df is empty or missing 'date' column.") return pd.DataFrame() # Standardize column names (convert both to lowercase for consistency) route_analysis_df.columns = route_analysis_df.columns.str.lower() additional_features_df.columns = additional_features_df.columns.str.lower() # Convert 'date' columns to proper date format route_analysis_df['date'] = pd.to_datetime(route_analysis_df['date']).dt.date additional_features_df['date'] = pd.to_datetime(additional_features_df['date']).dt.date # Merge on 'date' column merged_df = pd.merge(route_analysis_df, additional_features_df, on='date', how='outer') # Sort by date merged_df.sort_values(by='date', inplace=True) print("Merging completed successfully.") return merged_df # step 6 : Generating the predictions def generate_predictions(merged_df): """ Generates predictions using the pre-trained model. """ # === Preprocessing Steps === # Step 1: Keep only rows where total_working_hours is at least 1 hour merged_df = merged_df[merged_df['total_working_hours'] >= 1] # Step 2: Cap total_working_hours at 14 hours if it exceeds that merged_df.loc[merged_df['total_working_hours'] > 14, 'total_working_hours'] = 14 # Step 3: Remove rows where percentage_route_followed is less than 1% merged_df = merged_df[merged_df['percentage_route_followed'] >= 1] # Step 4: Identify rows where total_distance_odo_for_speed exceeds 200 (needs to be capped) distance_capped_mask = merged_df['total_distance_odo_for_speed'] > 200 # Step 5: Cap total_distance_odo_for_speed at 200 km merged_df.loc[distance_capped_mask, 'total_distance_odo_for_speed'] = 200 # Step 6: Recalculate average_speed_motion only for the capped rows merged_df.loc[distance_capped_mask, 'average_speed_motion'] = ( merged_df.loc[distance_capped_mask, 'total_distance_odo_for_speed'] / merged_df.loc[distance_capped_mask, 'total_working_hours'] ) # Step 7: Remove rows where total_distance_odo_for_speed is less than 20 km merged_df = merged_df[merged_df['total_distance_odo_for_speed'] >= 20] # Step 8: Remove rows with invalid (negative) average_speed_motion merged_df = merged_df[merged_df['average_speed_motion'] >= 0] # Step 9: Reset index after filtering merged_df = merged_df.reset_index(drop=True) # === Feature Engineering === merged_df["total_distance"] = merged_df['total_distance_odo_for_speed'] merged_df['route_km'] = (merged_df['total_distance_odo_for_speed'] * merged_df['percentage_route_followed']) / 100 merged_df['non_route_km'] = (merged_df['total_distance_odo_for_speed'] * merged_df['percentage_non_route_followed']) / 100 merged_df['earning_distance'] = 1.0 # Step 10: Calculate earning_distance based on route-followed and distance conditions for index, row in merged_df.iterrows(): if row['percentage_route_followed'] >= 0.70 and row['total_distance'] > 40: merged_df.at[index, 'earning_distance'] = row['route_km'] + row['non_route_km'] * 0.3 else: merged_df.at[index, 'earning_distance'] = row['route_km'] + row['non_route_km'] * 0.8 # === Prediction === features = ['cumulative_voltage_drop', 'static_time_percentage', 'total_working_hours', 'peak_percentage', 'percentage_route_followed', 'average_speed_motion'] X = merged_df[features].fillna(0) # Step 12: Load model and make predictions stacking_model = joblib.load(model_path) merged_df['rf_model_predictions'] = stacking_model.predict(X) merged_df.loc[merged_df['rf_model_predictions'] < 1, 'rf_model_predictions'] = 1 # Step 14: Calculate predicted income using fare, load factor, and passengers fareperpassenger = 10 numberofpassengers = 4 load_factor = 0.75 merged_df['predicted_income'] = ( merged_df['earning_distance'] * fareperpassenger * load_factor * numberofpassengers / merged_df['rf_model_predictions'] ).abs() # Step 15: Cap predicted income at 2000 merged_df.loc[merged_df['predicted_income'] > 2000, 'predicted_income'] = 2000 # Step 16: Drop rows where chassis_no is missing merged_df = merged_df.dropna(subset=['chassis_no']) return merged_df def main(vehicle_df): """ Main function to process a single vehicle's data and generate predictions. Parameters: vehicle_df (pd.DataFrame): Data of a single vehicle. Returns: pd.DataFrame: Processed data with predictions. """ try: print("Starting main function.") chassis_no = vehicle_df['chassis_no'].iloc[0] print(f"\nProcessing chassis number: {chassis_no}") # Step 1: Route Identification print("Performing route identification...") df_routes = route_identification(vehicle_df) print("Route identification completed.") # Step 2: Route Analysis print("Performing route analysis...") route_analysis_df = route_analysis(vehicle_df, df_routes) print("Route analysis completed.") # Step 3: Create Additional Features print("Creating additional features...") additional_features_df = create_additional_features(vehicle_df) print("Additional features created.") # Step 4: Merge Route Analysis & Additional Features print("Merging data...") merged_df = merge_features(route_analysis_df, additional_features_df) print("Data merged.") # Step 5: Generate Predictions print("Generating predictions...") final_vehicle_df = generate_predictions(merged_df) print("Predictions generated.") return final_vehicle_df except Exception as e: print(f"Error processing chassis {chassis_no}: {e}") return pd.DataFrame() # Return empty DataFrame on failure if __name__ == "__main__": # Parse Command-Line Arguments (Start & End Time) if len(sys.argv) != 3: print(" Error: Please provide start_time and end_time as arguments.") sys.exit(1) start_time = sys.argv[1] end_time = sys.argv[2] print(f"Fetching data from {start_time} to {end_time}...") # ENTER YOUR DATABASE CREDENTIALS HERE DB_USERNAME = "postgres" # Example: "admin" DB_PASSWORD = "vecmocon" # Example: "mypassword" DB_HOST = "13.50.153.215:5432" # Example: "localhost" or "192.168.1.100" DB_NAME = "ecofy-dashboard" # Example: "mydb" TABLE_NAME = "device_data" # # Construct the PostgreSQL connection URL DB_URL = f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}" # Create Database Engine try: engine = create_engine(DB_URL) print("PostgreSQL Database connection successful!") except Exception as e: print(f"Error connecting to the PostgreSQL database: {e}") sys.exit(1) # Fetch Raw Data from the Database df = fetch_raw_data(engine, TABLE_NAME, start_time, end_time) if df.empty: print("⚠️ No data returned from the database.") sys.exit(0) # Process and Analyze Each Vehicle Separately processed_data_list = [] try: for chassis_no in df['chassis_no'].unique(): print(f"\nProcessing chassis number: {chassis_no}") # Step 1: Filter data for the current vehicle vehicle_df = df[df['chassis_no'] == chassis_no].copy() # Step 2: Process raw data for this vehicle vehicle_df = process_raw_data(vehicle_df) # Step 3: Run the workflow for this vehicle final_vehicle_df = main(vehicle_df) # Step 4: Store Processed Vehicle Data if not final_vehicle_df.empty: processed_data_list.append(final_vehicle_df) # Merge and Insert Processed Data into the Database if processed_data_list: final_predictions = pd.concat(processed_data_list, ignore_index=True) print(" Workflow completed successfully.") try: with engine.begin() as conn: INSERT_SQL = f"INSERT INTO monthly_predicted_device_data ({', '.join(final_predictions.columns)}) VALUES ({', '.join([f':{col}' for col in final_predictions.columns])})" conn.execute(text(INSERT_SQL), final_predictions.to_dict(orient="records")) print(" Processed data inserted into 'monthly_predicted_device_data' successfully!") except Exception as e: print(f"Error inserting processed data into the database: {e}") sys.exit(1) else: print("No valid vehicle data was processed.") except Exception as e: print(f" Error during workflow execution: {e}") sys.exit(1)