From 9a617bf95db473634394b468c39b1dd45709fcd4 Mon Sep 17 00:00:00 2001
From: "kajal.kumari"
Date: Mon, 26 May 2025 11:09:34 +0000
Subject: [PATCH] Upload files to "/"

---
 L5_DB_script_ecofy.py | 638 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 638 insertions(+)
 create mode 100644 L5_DB_script_ecofy.py

diff --git a/L5_DB_script_ecofy.py b/L5_DB_script_ecofy.py
new file mode 100644
index 0000000..c39e933
--- /dev/null
+++ b/L5_DB_script_ecofy.py
@@ -0,0 +1,638 @@
+import pandas as pd
+import numpy as np
+# scipy's KDTree is the one used below: its query() supports the distance_upper_bound argument
+from scipy.spatial import KDTree
+import joblib
+from sqlalchemy import create_engine, text
+import sys
+
+# Step 1: Fetch raw data from the database
+model_path = r"C:\Users\vinayak\OneDrive\Desktop\Vecmocon\Script\stacking_model_xg_logic_diff.pkl"
+most_peak_periods = [(22 * 3600, 24 * 3600), (0, 6 * 3600)]
+peak_periods = [(6 * 3600, 12 * 3600), (18 * 3600, 22 * 3600)]
+
+def fetch_raw_data(engine, table_name, start_time=None, end_time=None):
+    try:
+        query = f"SELECT * FROM {table_name}"
+        if start_time and end_time:
+            query += f" WHERE last_reported_time BETWEEN '{start_time}' AND '{end_time}'"
+
+        with engine.connect() as conn:
+            result = conn.execute(text(query))
+            rows = result.fetchall()  # Fetch all rows
+            columns = result.keys()  # Get column names
+
+        # Convert to DataFrame
+        df = pd.DataFrame(rows, columns=columns)
+
+        print("\nData fetched from the database:")
+        print(df.head())
+        return df
+    except Exception as e:
+        print(f"\nError fetching data from the database: {e}")
+        return pd.DataFrame()
+
+# Step 2: Process raw data
+
+def process_raw_data(vehicle_df):
+    """
+    Process raw data for a single vehicle: sort by last_reported_time, remove duplicate timestamps, and clean data.
+
+    Parameters:
+        vehicle_df (pd.DataFrame): Data of a single vehicle.
+
+    Returns:
+        pd.DataFrame: Processed and sorted data.
+    """
+    try:
+        # Ensure last_reported_time column is in datetime format
+        vehicle_df['last_reported_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce')
+
+        # Step 1: Sort data by last_reported_time
+        vehicle_df = vehicle_df.sort_values(by="last_reported_time").reset_index(drop=True)
+
+        # Step 2: Remove duplicate timestamps (keeping the latest occurrence)
+        vehicle_df = vehicle_df.drop_duplicates(subset=['last_reported_time'], keep='last')
+
+        # Step 3: Clean missing values if necessary (optional)
+        vehicle_df = vehicle_df.dropna(subset=['last_reported_time', 'chassis_no'])
+
+        return vehicle_df
+
+    except Exception as e:
+        print(f"Error processing raw data for chassis {vehicle_df['chassis_no'].iloc[0] if 'chassis_no' in vehicle_df else 'Unknown'}: {e}")
+        return pd.DataFrame()  # Return empty DataFrame on failure
+
+
+# Step 3: Route identification
+def route_identification(vehicle_df):
+    """
+    Identifies routes for a single vehicle's data.
+
+    Parameters:
+        vehicle_df (pd.DataFrame): Data for a single vehicle.
+
+    Returns:
+        pd.DataFrame: Identified routes with frequency and date range.
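+
+    Notes:
+        Coordinates are rounded to three decimal places (roughly 110 m of latitude per cell);
+        within each rolling window of about a week, only samples where the vehicle moved are
+        counted, and cells at or above the 80th percentile of visit frequency form the route.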
+ """ + try: + chassis_no = vehicle_df['chassis_no'].iloc[0] + print(f"\nPerforming route identification for chassis: {chassis_no}") + + # Drop rows with missing latitude or longitude values + vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy() + + # Compute distance traveled (ensuring no negative values) + vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0) + + # Convert timestamps to datetime + vehicle_df['date_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce') + + # Compute time differences & extract date + vehicle_df['Time_Difference'] = vehicle_df['date_time'].diff().dt.total_seconds().fillna(0) + vehicle_df['Date'] = vehicle_df['date_time'].dt.date # Store as date objects + + # Round coordinates for clustering + vehicle_df['lat3'] = vehicle_df['latitude'].round(3) + vehicle_df['lon3'] = vehicle_df['longitude'].round(3) + vehicle_df['coordinates3'] = vehicle_df['lat3'].astype(str) + ',' + vehicle_df['lon3'].astype(str) + + # Get sorted unique dates + unique_dates = np.sort(vehicle_df['Date'].dropna().unique()) + + # Initialize DataFrame for identified routes + all_routes = pd.DataFrame() + + # Process in a rolling window (7 day, considering sequential processing per vehicle) + for start_idx in range(len(unique_dates) - 7): + window_start_date = unique_dates[start_idx] + window_end_date = unique_dates[start_idx + 7] + date_range_str = f"{window_start_date.strftime('%b %d')} - {window_end_date.strftime('%b %d')}" + + # Filter data for the current window + df_window = vehicle_df[ + (vehicle_df['Date'] >= window_start_date) & + (vehicle_df['Date'] <= window_end_date) + ].copy() + + # Keep only records where the vehicle has moved + df_moving = df_window[df_window['timestamp_distance'] > 0] + + # Remove consecutive duplicate coordinates + df_moving = df_moving.loc[df_moving['coordinates3'].shift() != df_moving['coordinates3']] + + # Compute coordinate frequency + coord_counts = df_moving['coordinates3'].value_counts().reset_index() + coord_counts.columns = ['coordinates3', 'frequency3'] + + # Apply frequency threshold (80th percentile) + if not coord_counts.empty: + threshold = np.percentile(coord_counts['frequency3'], 80) + route = coord_counts[coord_counts['frequency3'] >= threshold].copy() + + # Convert coordinate strings to lat/lon + route[['lat3', 'lon3']] = route['coordinates3'].str.split(',', expand=True).astype(float) + + # Assign route identification number and date range + route['route_identification_no'] = start_idx + 1 + route['date_range'] = date_range_str + route['chassis_no'] = chassis_no # Keep track of vehicle ID + + # Append to all routes + all_routes = pd.concat([all_routes, route], ignore_index=True) + + return all_routes + + except Exception as e: + print(f"Error in route identification for chassis {chassis_no}: {e}") + return pd.DataFrame() # Return empty DataFrame on failure + +# Step 4: Route Analysis + +def route_analysis(vehicle_df, df_routes): + """ + Performs route analysis by comparing IoT data with identified routes. + + Parameters: + vehicle_df (pd.DataFrame): IoT data for a single vehicle. + df_routes (pd.DataFrame): Identified routes. + + Returns: + pd.DataFrame: Analysis results including route adherence percentages and trip details. 
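+
+    Notes:
+        Each day's GPS points are matched against that window's identified route with a KDTree
+        using a search radius of about 150 m (expressed in radians of Earth radius);
+        percentage_route_followed is the share of the day's travelled distance spent on-route.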
+ """ + try: + if vehicle_df.empty or df_routes.empty: + print("Warning: One or both input datasets are empty.") + return pd.DataFrame() + + chassis_no = vehicle_df['chassis_no'].iloc[0] + print(f"\nPerforming route analysis for chassis: {chassis_no}") + + # Preprocess vehicle data + vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy() + vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0) + vehicle_df['date_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce') + vehicle_df['Time_Difference'] = vehicle_df['date_time'].diff().dt.total_seconds().fillna(0) + vehicle_df['Date'] = vehicle_df['date_time'].dt.date # Store as date objects + vehicle_df['lat4'] = vehicle_df['latitude'].round(4) + vehicle_df['lon4'] = vehicle_df['longitude'].round(4) + + # Map Dates to Route Identification Numbers + unique_dates = sorted(vehicle_df['Date'].dropna().unique()) + date_route_mapping = {date: idx + 1 for idx, date in enumerate(unique_dates)} + + results = [] + + for date in unique_dates: + df_day = vehicle_df[vehicle_df['Date'] == date].dropna(subset=['lat4', 'lon4']).copy() + if df_day.empty: + continue + + # Convert IoT data coordinates to radians + new_coords_radians = np.radians(df_day[['lat4', 'lon4']].to_numpy()) + + # Get corresponding route identification number + route_no = date_route_mapping.get(date, None) + route = df_routes[df_routes['route_identification_no'] == route_no] + + if route.empty: + print(f"Warning: No route found for date {date}. Skipping analysis.") + continue + + # Convert route coordinates to radians and build KDTree + coords_radians = np.radians(route[['lat3', 'lon3']].to_numpy()) + tree = KDTree(coords_radians) + + # Define a search radius of 150 meters + radius = 150 / 6371000 # Earth radius in meters converted to radians + + # Find nearest route points for each IoT data point + distances, _ = tree.query(new_coords_radians, distance_upper_bound=radius) + df_day['occurrence'] = (distances < radius).astype(int) # Mark as 1 if within radius + + # Compute route following percentage + def route_percentage(df): + route_dist = df[df['occurrence'] == 1]['timestamp_distance'].sum() + total_dist = df['timestamp_distance'].sum() + return route_dist / total_dist if total_dist > 0 else 0 + + route_perc = route_percentage(df_day) + date_range = route['date_range'].iloc[0] if 'date_range' in route.columns and not route.empty else "N/A" + + # Store results + results.append({ + 'date': date, + 'route_identification_no': route_no, + 'percentage_route_followed': route_perc * 100, + 'percentage_non_route_followed': (1 - route_perc) * 100, + 'chassis_no': chassis_no, + 'Day': df_day['date_time'].dt.strftime('%A').iloc[0], + 'date_range': date_range + }) + + return pd.DataFrame(results) + + except Exception as e: + print(f"Error during route analysis for chassis {chassis_no}: {e}") + return pd.DataFrame() # Return empty DataFrame on failure + +# Step 4: Create Additional Features +def create_additional_features(vehicle_df): + """ + Creates additional features like static_time_percentage, peak_percentage, voltage_drop, etc. + + Parameters: + vehicle_df (pd.DataFrame): IoT data for a single vehicle. + + Returns: + pd.DataFrame: A DataFrame with computed additional features. 
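+
+    Notes:
+        Metrics are computed per calendar day; days whose odometer readings are not
+        monotonically non-decreasing are skipped. Peak time is defined by the module-level
+        most_peak_periods and peak_periods windows.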
+ """ + if vehicle_df.empty: + print("Warning: Vehicle data is empty.") + return pd.DataFrame() + + vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy() + + try: + vehicle_df['last_reported_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce') + vehicle_df = vehicle_df.dropna(subset=['last_reported_time']) # Remove rows where datetime conversion failed + vehicle_df['date'] = vehicle_df['last_reported_time'].dt.date + except Exception as e: + print(f"Error processing dates: {e}") + return pd.DataFrame() + + # Compute necessary differences + vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0) + vehicle_df['ivdiff'] = vehicle_df['voltage'].diff().fillna(0) + + results = [] + voltage_results = [] + + valid_dates = [] + last_valid_odo = None + + try: + for date, group in vehicle_df.groupby('date'): + try: + group_sorted = group.sort_values(by='last_reported_time') + odos = group_sorted['odometer_kms'].values + + if last_valid_odo is not None and odos[0] < last_valid_odo: + continue + if not all(odos[i] >= odos[i - 1] for i in range(1, len(odos))): + continue + + valid_dates.append(date) + last_valid_odo = odos[-1] + except Exception as e: + print(f"Skipping date {date} due to error: {e}") + continue + + for date in valid_dates: + try: + daily_data = vehicle_df[vehicle_df['date'] == date].sort_values(by='last_reported_time') + + total_working_seconds = static_time_seconds = peak_time_seconds = 0 + last_odometer = last_timestamp = None + total_distance = total_travel_time = 0 + + for _, row in daily_data.iterrows(): + if last_odometer is not None: + time_diff = (row['last_reported_time'] - last_timestamp).total_seconds() + current_hour = row['last_reported_time'].hour * 3600 + \ + row['last_reported_time'].minute * 60 + \ + row['last_reported_time'].second + + if row['odometer_kms'] == last_odometer: + static_time_seconds += time_diff + else: + total_working_seconds += time_diff + distance_covered = row['odometer_kms'] - last_odometer + total_distance += distance_covered + total_travel_time += time_diff + + if any(start <= current_hour < end for start, end in most_peak_periods) or \ + any(start <= current_hour < end for start, end in peak_periods): + peak_time_seconds += time_diff + + last_odometer = row['odometer_kms'] + last_timestamp = row['last_reported_time'] + + total_working_hours = total_working_seconds / 3600 if total_working_seconds > 0 else 0 + static_time_percentage = static_time_seconds / (total_working_seconds + static_time_seconds) if (total_working_seconds + static_time_seconds) > 0 else 1.0 + peak_percentage = peak_time_seconds / total_working_seconds if total_working_seconds > 0 else 0 + average_speed_motion = (total_distance / total_travel_time) * 3600 if total_travel_time > 0 else 0 + trip_duration = total_working_seconds + static_time_seconds + + results.append({ + 'date': date, + 'total_working_hours': total_working_hours, + 'static_time_percentage': static_time_percentage, + 'peak_percentage': peak_percentage, + 'average_speed_motion': average_speed_motion, + 'total_distance_odo_for_speed': total_distance, + 'time_moving': total_working_seconds, + 'time_static': static_time_seconds, + 'trip_duration': trip_duration + }) + + # Voltage drop calculations + v1 = v3 = 0 + flag = False + sum_ivdiff = 0 + charging_drop = 0 + + for _, row in daily_data.iterrows(): + if row['odometer_kms'] > 0 and not flag: + v1 = row['voltage'] + flag = True + + if row['timestamp_distance'] > 0: + if sum_ivdiff >= 0.4: 
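+                            # Accumulated voltage rises of at least 0.4 V while moving are booked as a
+                            # charging event; adding them back in abs(v1 - v3 + charging_drop) makes the
+                            # cumulative_voltage_drop reflect energy drawn, not just net start/end voltage.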
+ charging_drop += sum_ivdiff + sum_ivdiff = 0 + else: + sum_ivdiff += row['ivdiff'] + + if row['odometer_kms'] > 0: + v3 = row['voltage'] + + voltage_drop = abs(v1 - v3 + charging_drop) if v1 and v3 else 0 + voltage_results.append({'date': date, 'cumulative_voltage_drop': voltage_drop}) + except Exception as e: + print(f"Skipping feature calculation for date {date} due to error: {e}") + continue + + except Exception as e: + print(f"Unexpected error in processing: {e}") + return pd.DataFrame() + + # Convert results to DataFrames + metrics_df = pd.DataFrame(results) + voltage_df = pd.DataFrame(voltage_results) + + if metrics_df.empty and voltage_df.empty: + print("No valid data processed.") + return pd.DataFrame() + + # Merge additional features + final_df = pd.merge(metrics_df, voltage_df, on='date', how='outer') + + return final_df + + +# step 5 : merging features +def merge_features(route_analysis_df, additional_features_df): + """ + Merges route analysis and additional features DataFrames on the 'date' column. + + Parameters: + route_analysis_df (pd.DataFrame): DataFrame containing route analysis data. + additional_features_df (pd.DataFrame): DataFrame containing additional computed features. + + Returns: + pd.DataFrame: Merged DataFrame containing both route analysis and additional features. + """ + print("Merging route analysis and additional features...") + + # Ensure inputs are DataFrames + if isinstance(route_analysis_df, dict): + route_analysis_df = pd.DataFrame([route_analysis_df]) + elif isinstance(route_analysis_df, list): + route_analysis_df = pd.DataFrame(route_analysis_df) + + if isinstance(additional_features_df, dict): + additional_features_df = pd.DataFrame([additional_features_df]) + elif isinstance(additional_features_df, list): + additional_features_df = pd.DataFrame(additional_features_df) + + # Validate required columns + if route_analysis_df.empty or 'date' not in route_analysis_df.columns: + print("Error: route_analysis_df is empty or missing 'date' column.") + return pd.DataFrame() + + if additional_features_df.empty or 'date' not in additional_features_df.columns: + print("Error: additional_features_df is empty or missing 'date' column.") + return pd.DataFrame() + + # Standardize column names (convert both to lowercase for consistency) + route_analysis_df.columns = route_analysis_df.columns.str.lower() + additional_features_df.columns = additional_features_df.columns.str.lower() + + # Convert 'date' columns to proper date format + route_analysis_df['date'] = pd.to_datetime(route_analysis_df['date']).dt.date + additional_features_df['date'] = pd.to_datetime(additional_features_df['date']).dt.date + + # Merge on 'date' column + merged_df = pd.merge(route_analysis_df, additional_features_df, on='date', how='outer') + + # Sort by date + merged_df.sort_values(by='date', inplace=True) + + print("Merging completed successfully.") + return merged_df + +# step 6 : Generating the predictions +def generate_predictions(merged_df): + """ + Generates predictions using the pre-trained model. 
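+
+    Parameters:
+        merged_df (pd.DataFrame): Merged route-analysis and additional-feature data for one vehicle.
+
+    Returns:
+        pd.DataFrame: Filtered rows with 'rf_model_predictions' and 'predicted_income' columns added.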
+    """
+
+    # === Preprocessing Steps ===
+
+    # Step 1: Keep only rows where total_working_hours is at least 1 hour
+    merged_df = merged_df[merged_df['total_working_hours'] >= 1]
+
+    # Step 2: Cap total_working_hours at 14 hours if it exceeds that
+    merged_df.loc[merged_df['total_working_hours'] > 14, 'total_working_hours'] = 14
+
+    # Step 3: Remove rows where percentage_route_followed is less than 1%
+    merged_df = merged_df[merged_df['percentage_route_followed'] >= 1]
+
+    # Step 4: Identify rows where total_distance_odo_for_speed exceeds 200 (needs to be capped)
+    distance_capped_mask = merged_df['total_distance_odo_for_speed'] > 200
+
+    # Step 5: Cap total_distance_odo_for_speed at 200 km
+    merged_df.loc[distance_capped_mask, 'total_distance_odo_for_speed'] = 200
+
+    # Step 6: Recalculate average_speed_motion only for the capped rows
+    merged_df.loc[distance_capped_mask, 'average_speed_motion'] = (
+        merged_df.loc[distance_capped_mask, 'total_distance_odo_for_speed'] /
+        merged_df.loc[distance_capped_mask, 'total_working_hours']
+    )
+
+    # Step 7: Remove rows where total_distance_odo_for_speed is less than 20 km
+    merged_df = merged_df[merged_df['total_distance_odo_for_speed'] >= 20]
+
+    # Step 8: Remove rows with invalid (negative) average_speed_motion
+    merged_df = merged_df[merged_df['average_speed_motion'] >= 0]
+
+    # Step 9: Reset index after filtering
+    merged_df = merged_df.reset_index(drop=True)
+
+    # === Feature Engineering ===
+
+    merged_df['total_distance'] = merged_df['total_distance_odo_for_speed']
+    merged_df['route_km'] = (merged_df['total_distance_odo_for_speed'] * merged_df['percentage_route_followed']) / 100
+    merged_df['non_route_km'] = (merged_df['total_distance_odo_for_speed'] * merged_df['percentage_non_route_followed']) / 100
+
+    merged_df['earning_distance'] = 1.0
+    # Step 10: Calculate earning_distance based on route-followed and distance conditions
+    # (percentage_route_followed is on a 0-100 scale, so the threshold 70 means 70%)
+    for index, row in merged_df.iterrows():
+        if row['percentage_route_followed'] >= 70 and row['total_distance'] > 40:
+            merged_df.at[index, 'earning_distance'] = row['route_km'] + row['non_route_km'] * 0.3
+        else:
+            merged_df.at[index, 'earning_distance'] = row['route_km'] + row['non_route_km'] * 0.8
+
+    # === Prediction ===
+    features = ['cumulative_voltage_drop', 'static_time_percentage', 'total_working_hours',
+                'peak_percentage', 'percentage_route_followed', 'average_speed_motion']
+
+    X = merged_df[features].fillna(0)
+
+    # Step 11: Load model and make predictions
+    stacking_model = joblib.load(model_path)
+    merged_df['rf_model_predictions'] = stacking_model.predict(X)
+    merged_df.loc[merged_df['rf_model_predictions'] < 1, 'rf_model_predictions'] = 1
+
+    # Step 12: Calculate predicted income using fare, load factor, and passengers
+    fareperpassenger = 10
+    numberofpassengers = 4
+    load_factor = 0.75
+    merged_df['predicted_income'] = (
+        merged_df['earning_distance'] * fareperpassenger * load_factor * numberofpassengers / merged_df['rf_model_predictions']
+    ).abs()
+    # Step 13: Cap predicted income at 2000
+    merged_df.loc[merged_df['predicted_income'] > 2000, 'predicted_income'] = 2000
+
+    # Step 14: Drop rows where chassis_no is missing
+    merged_df = merged_df.dropna(subset=['chassis_no'])
+
+    return merged_df
+
+
+def main(vehicle_df):
+    """
+    Main function to process a single vehicle's data and generate predictions.
+
+    Parameters:
+        vehicle_df (pd.DataFrame): Data of a single vehicle.
+
+    Returns:
+        pd.DataFrame: Processed data with predictions.
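+
+    Notes:
+        Runs route identification, route analysis, additional-feature creation, merging,
+        and prediction in sequence; on any failure an empty DataFrame is returned.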
+ """ + try: + print("Starting main function.") + chassis_no = vehicle_df['chassis_no'].iloc[0] + print(f"\nProcessing chassis number: {chassis_no}") + + # Step 1: Route Identification + print("Performing route identification...") + df_routes = route_identification(vehicle_df) + print("Route identification completed.") + + # Step 2: Route Analysis + print("Performing route analysis...") + route_analysis_df = route_analysis(vehicle_df, df_routes) + print("Route analysis completed.") + + # Step 3: Create Additional Features + print("Creating additional features...") + additional_features_df = create_additional_features(vehicle_df) + print("Additional features created.") + + # Step 4: Merge Route Analysis & Additional Features + print("Merging data...") + merged_df = merge_features(route_analysis_df, additional_features_df) + print("Data merged.") + + # Step 5: Generate Predictions + print("Generating predictions...") + final_vehicle_df = generate_predictions(merged_df) + print("Predictions generated.") + + return final_vehicle_df + + except Exception as e: + print(f"Error processing chassis {chassis_no}: {e}") + return pd.DataFrame() # Return empty DataFrame on failure + + +if __name__ == "__main__": + # Parse Command-Line Arguments (Start & End Time) + if len(sys.argv) != 3: + print(" Error: Please provide start_time and end_time as arguments.") + sys.exit(1) + + start_time = sys.argv[1] + end_time = sys.argv[2] + + print(f"Fetching data from {start_time} to {end_time}...") + + # ENTER YOUR DATABASE CREDENTIALS HERE + DB_USERNAME = "postgres" # Example: "admin" + DB_PASSWORD = "vecmocon" # Example: "mypassword" + DB_HOST = "13.50.153.215:5432" # Example: "localhost" or "192.168.1.100" + DB_NAME = "ecofy-dashboard" # Example: "mydb" + TABLE_NAME = "device_data" # + + # Construct the PostgreSQL connection URL + DB_URL = f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}" + + # Create Database Engine + try: + engine = create_engine(DB_URL) + print("PostgreSQL Database connection successful!") + except Exception as e: + print(f"Error connecting to the PostgreSQL database: {e}") + sys.exit(1) + + # Fetch Raw Data from the Database + df = fetch_raw_data(engine, TABLE_NAME, start_time, end_time) + + if df.empty: + print("⚠️ No data returned from the database.") + sys.exit(0) + + # Process and Analyze Each Vehicle Separately + processed_data_list = [] + + try: + for chassis_no in df['chassis_no'].unique(): + print(f"\nProcessing chassis number: {chassis_no}") + + # Step 1: Filter data for the current vehicle + vehicle_df = df[df['chassis_no'] == chassis_no].copy() + + # Step 2: Process raw data for this vehicle + vehicle_df = process_raw_data(vehicle_df) + + # Step 3: Run the workflow for this vehicle + final_vehicle_df = main(vehicle_df) + + # Step 4: Store Processed Vehicle Data + if not final_vehicle_df.empty: + processed_data_list.append(final_vehicle_df) + + # Merge and Insert Processed Data into the Database + if processed_data_list: + final_predictions = pd.concat(processed_data_list, ignore_index=True) + print(" Workflow completed successfully.") + + try: + with engine.begin() as conn: + INSERT_SQL = f"INSERT INTO monthly_predicted_device_data ({', '.join(final_predictions.columns)}) VALUES ({', '.join([f':{col}' for col in final_predictions.columns])})" + conn.execute(text(INSERT_SQL), final_predictions.to_dict(orient="records")) + + print(" Processed data inserted into 'monthly_predicted_device_data' successfully!") + except Exception as e: + print(f"Error inserting 
processed data into the database: {e}") + sys.exit(1) + else: + print("No valid vehicle data was processed.") + + except Exception as e: + print(f" Error during workflow execution: {e}") + sys.exit(1) + \ No newline at end of file