L5IncomeModel/L5_DB_script_ecofy.py

638 lines
26 KiB
Python

import pandas as pd
import numpy as np
from sklearn.neighbors import KDTree
from scipy.spatial import KDTree
import joblib
from sqlalchemy import create_engine , text
import sys
# Step 1: Fetch raw data from the Database
model_path =r"C:\Users\vinayak\OneDrive\Desktop\Vecmocon\Script\stacking_model_xg_logic_diff.pkl"
most_peak_periods = [(22 * 3600, 24 * 3600), (0, 6 * 3600)]
peak_periods = [(6 * 3600, 12 * 3600), (18 * 3600, 22 * 3600)]
def fetch_raw_data(engine, table_name, start_time=None, end_time=None):
try:
query = f"SELECT * FROM {table_name}"
if start_time and end_time:
query += f" WHERE last_reported_time BETWEEN '{start_time}' AND '{end_time}'"
with engine.connect() as conn:
result = conn.execute(text(query))
rows = result.fetchall() # Fetch all rows
columns = result.keys() # Get column names
# Convert to DataFrame
df = pd.DataFrame(rows, columns=columns)
print("\nData fetched from the database:")
print(df.head())
return df
except Exception as e:
print(f"\nError fetching data from the database: {e}")
return pd.DataFrame()
# step 2 process raw data
def process_raw_data(vehicle_df):
"""
Process raw data for a single vehicle: sort by last_reported_time, remove duplicate timestamps, and clean data.
Parameters:
vehicle_df (pd.DataFrame): Data of a single vehicle.
Returns:
pd.DataFrame: Processed and sorted data.
"""
try:
# Ensure last_reported_time column is in datetime format
vehicle_df['last_reported_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce')
# Step 1: Sort data by last_reported_time
vehicle_df = vehicle_df.sort_values(by="last_reported_time").reset_index(drop=True)
# Step 2: Remove duplicate timestamps (keeping the latest occurrence)
vehicle_df = vehicle_df.drop_duplicates(subset=['last_reported_time'], keep='last')
# Step 3: Clean missing values if necessary (optional)
vehicle_df = vehicle_df.dropna(subset=['last_reported_time', 'chassis_no'])
return vehicle_df
except Exception as e:
print(f"Error processing raw data for chassis {vehicle_df['chassis_no'].iloc[0] if 'chassis_no' in vehicle_df else 'Unknown'}: {e}")
return pd.DataFrame() # Return empty DataFrame on failure
# step 3 : route identification
def route_identification(vehicle_df):
"""
Identifies routes for a single vehicle's data.
Parameters:
vehicle_df (pd.DataFrame): Data for a single vehicle.
Returns:
pd.DataFrame: Identified routes with frequency and date range.
"""
try:
chassis_no = vehicle_df['chassis_no'].iloc[0]
print(f"\nPerforming route identification for chassis: {chassis_no}")
# Drop rows with missing latitude or longitude values
vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy()
# Compute distance traveled (ensuring no negative values)
vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0)
# Convert timestamps to datetime
vehicle_df['date_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce')
# Compute time differences & extract date
vehicle_df['Time_Difference'] = vehicle_df['date_time'].diff().dt.total_seconds().fillna(0)
vehicle_df['Date'] = vehicle_df['date_time'].dt.date # Store as date objects
# Round coordinates for clustering
vehicle_df['lat3'] = vehicle_df['latitude'].round(3)
vehicle_df['lon3'] = vehicle_df['longitude'].round(3)
vehicle_df['coordinates3'] = vehicle_df['lat3'].astype(str) + ',' + vehicle_df['lon3'].astype(str)
# Get sorted unique dates
unique_dates = np.sort(vehicle_df['Date'].dropna().unique())
# Initialize DataFrame for identified routes
all_routes = pd.DataFrame()
# Process in a rolling window (7 day, considering sequential processing per vehicle)
for start_idx in range(len(unique_dates) - 7):
window_start_date = unique_dates[start_idx]
window_end_date = unique_dates[start_idx + 7]
date_range_str = f"{window_start_date.strftime('%b %d')} - {window_end_date.strftime('%b %d')}"
# Filter data for the current window
df_window = vehicle_df[
(vehicle_df['Date'] >= window_start_date) &
(vehicle_df['Date'] <= window_end_date)
].copy()
# Keep only records where the vehicle has moved
df_moving = df_window[df_window['timestamp_distance'] > 0]
# Remove consecutive duplicate coordinates
df_moving = df_moving.loc[df_moving['coordinates3'].shift() != df_moving['coordinates3']]
# Compute coordinate frequency
coord_counts = df_moving['coordinates3'].value_counts().reset_index()
coord_counts.columns = ['coordinates3', 'frequency3']
# Apply frequency threshold (80th percentile)
if not coord_counts.empty:
threshold = np.percentile(coord_counts['frequency3'], 80)
route = coord_counts[coord_counts['frequency3'] >= threshold].copy()
# Convert coordinate strings to lat/lon
route[['lat3', 'lon3']] = route['coordinates3'].str.split(',', expand=True).astype(float)
# Assign route identification number and date range
route['route_identification_no'] = start_idx + 1
route['date_range'] = date_range_str
route['chassis_no'] = chassis_no # Keep track of vehicle ID
# Append to all routes
all_routes = pd.concat([all_routes, route], ignore_index=True)
return all_routes
except Exception as e:
print(f"Error in route identification for chassis {chassis_no}: {e}")
return pd.DataFrame() # Return empty DataFrame on failure
# Step 4: Route Analysis
def route_analysis(vehicle_df, df_routes):
"""
Performs route analysis by comparing IoT data with identified routes.
Parameters:
vehicle_df (pd.DataFrame): IoT data for a single vehicle.
df_routes (pd.DataFrame): Identified routes.
Returns:
pd.DataFrame: Analysis results including route adherence percentages and trip details.
"""
try:
if vehicle_df.empty or df_routes.empty:
print("Warning: One or both input datasets are empty.")
return pd.DataFrame()
chassis_no = vehicle_df['chassis_no'].iloc[0]
print(f"\nPerforming route analysis for chassis: {chassis_no}")
# Preprocess vehicle data
vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy()
vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0)
vehicle_df['date_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce')
vehicle_df['Time_Difference'] = vehicle_df['date_time'].diff().dt.total_seconds().fillna(0)
vehicle_df['Date'] = vehicle_df['date_time'].dt.date # Store as date objects
vehicle_df['lat4'] = vehicle_df['latitude'].round(4)
vehicle_df['lon4'] = vehicle_df['longitude'].round(4)
# Map Dates to Route Identification Numbers
unique_dates = sorted(vehicle_df['Date'].dropna().unique())
date_route_mapping = {date: idx + 1 for idx, date in enumerate(unique_dates)}
results = []
for date in unique_dates:
df_day = vehicle_df[vehicle_df['Date'] == date].dropna(subset=['lat4', 'lon4']).copy()
if df_day.empty:
continue
# Convert IoT data coordinates to radians
new_coords_radians = np.radians(df_day[['lat4', 'lon4']].to_numpy())
# Get corresponding route identification number
route_no = date_route_mapping.get(date, None)
route = df_routes[df_routes['route_identification_no'] == route_no]
if route.empty:
print(f"Warning: No route found for date {date}. Skipping analysis.")
continue
# Convert route coordinates to radians and build KDTree
coords_radians = np.radians(route[['lat3', 'lon3']].to_numpy())
tree = KDTree(coords_radians)
# Define a search radius of 150 meters
radius = 150 / 6371000 # Earth radius in meters converted to radians
# Find nearest route points for each IoT data point
distances, _ = tree.query(new_coords_radians, distance_upper_bound=radius)
df_day['occurrence'] = (distances < radius).astype(int) # Mark as 1 if within radius
# Compute route following percentage
def route_percentage(df):
route_dist = df[df['occurrence'] == 1]['timestamp_distance'].sum()
total_dist = df['timestamp_distance'].sum()
return route_dist / total_dist if total_dist > 0 else 0
route_perc = route_percentage(df_day)
date_range = route['date_range'].iloc[0] if 'date_range' in route.columns and not route.empty else "N/A"
# Store results
results.append({
'date': date,
'route_identification_no': route_no,
'percentage_route_followed': route_perc * 100,
'percentage_non_route_followed': (1 - route_perc) * 100,
'chassis_no': chassis_no,
'Day': df_day['date_time'].dt.strftime('%A').iloc[0],
'date_range': date_range
})
return pd.DataFrame(results)
except Exception as e:
print(f"Error during route analysis for chassis {chassis_no}: {e}")
return pd.DataFrame() # Return empty DataFrame on failure
# Step 4: Create Additional Features
def create_additional_features(vehicle_df):
"""
Creates additional features like static_time_percentage, peak_percentage, voltage_drop, etc.
Parameters:
vehicle_df (pd.DataFrame): IoT data for a single vehicle.
Returns:
pd.DataFrame: A DataFrame with computed additional features.
"""
if vehicle_df.empty:
print("Warning: Vehicle data is empty.")
return pd.DataFrame()
vehicle_df = vehicle_df.dropna(subset=['latitude', 'longitude']).copy()
try:
vehicle_df['last_reported_time'] = pd.to_datetime(vehicle_df['last_reported_time'], errors='coerce')
vehicle_df = vehicle_df.dropna(subset=['last_reported_time']) # Remove rows where datetime conversion failed
vehicle_df['date'] = vehicle_df['last_reported_time'].dt.date
except Exception as e:
print(f"Error processing dates: {e}")
return pd.DataFrame()
# Compute necessary differences
vehicle_df['timestamp_distance'] = vehicle_df['odometer_kms'].diff().fillna(0).clip(lower=0)
vehicle_df['ivdiff'] = vehicle_df['voltage'].diff().fillna(0)
results = []
voltage_results = []
valid_dates = []
last_valid_odo = None
try:
for date, group in vehicle_df.groupby('date'):
try:
group_sorted = group.sort_values(by='last_reported_time')
odos = group_sorted['odometer_kms'].values
if last_valid_odo is not None and odos[0] < last_valid_odo:
continue
if not all(odos[i] >= odos[i - 1] for i in range(1, len(odos))):
continue
valid_dates.append(date)
last_valid_odo = odos[-1]
except Exception as e:
print(f"Skipping date {date} due to error: {e}")
continue
for date in valid_dates:
try:
daily_data = vehicle_df[vehicle_df['date'] == date].sort_values(by='last_reported_time')
total_working_seconds = static_time_seconds = peak_time_seconds = 0
last_odometer = last_timestamp = None
total_distance = total_travel_time = 0
for _, row in daily_data.iterrows():
if last_odometer is not None:
time_diff = (row['last_reported_time'] - last_timestamp).total_seconds()
current_hour = row['last_reported_time'].hour * 3600 + \
row['last_reported_time'].minute * 60 + \
row['last_reported_time'].second
if row['odometer_kms'] == last_odometer:
static_time_seconds += time_diff
else:
total_working_seconds += time_diff
distance_covered = row['odometer_kms'] - last_odometer
total_distance += distance_covered
total_travel_time += time_diff
if any(start <= current_hour < end for start, end in most_peak_periods) or \
any(start <= current_hour < end for start, end in peak_periods):
peak_time_seconds += time_diff
last_odometer = row['odometer_kms']
last_timestamp = row['last_reported_time']
total_working_hours = total_working_seconds / 3600 if total_working_seconds > 0 else 0
static_time_percentage = static_time_seconds / (total_working_seconds + static_time_seconds) if (total_working_seconds + static_time_seconds) > 0 else 1.0
peak_percentage = peak_time_seconds / total_working_seconds if total_working_seconds > 0 else 0
average_speed_motion = (total_distance / total_travel_time) * 3600 if total_travel_time > 0 else 0
trip_duration = total_working_seconds + static_time_seconds
results.append({
'date': date,
'total_working_hours': total_working_hours,
'static_time_percentage': static_time_percentage,
'peak_percentage': peak_percentage,
'average_speed_motion': average_speed_motion,
'total_distance_odo_for_speed': total_distance,
'time_moving': total_working_seconds,
'time_static': static_time_seconds,
'trip_duration': trip_duration
})
# Voltage drop calculations
v1 = v3 = 0
flag = False
sum_ivdiff = 0
charging_drop = 0
for _, row in daily_data.iterrows():
if row['odometer_kms'] > 0 and not flag:
v1 = row['voltage']
flag = True
if row['timestamp_distance'] > 0:
if sum_ivdiff >= 0.4:
charging_drop += sum_ivdiff
sum_ivdiff = 0
else:
sum_ivdiff += row['ivdiff']
if row['odometer_kms'] > 0:
v3 = row['voltage']
voltage_drop = abs(v1 - v3 + charging_drop) if v1 and v3 else 0
voltage_results.append({'date': date, 'cumulative_voltage_drop': voltage_drop})
except Exception as e:
print(f"Skipping feature calculation for date {date} due to error: {e}")
continue
except Exception as e:
print(f"Unexpected error in processing: {e}")
return pd.DataFrame()
# Convert results to DataFrames
metrics_df = pd.DataFrame(results)
voltage_df = pd.DataFrame(voltage_results)
if metrics_df.empty and voltage_df.empty:
print("No valid data processed.")
return pd.DataFrame()
# Merge additional features
final_df = pd.merge(metrics_df, voltage_df, on='date', how='outer')
return final_df
# step 5 : merging features
def merge_features(route_analysis_df, additional_features_df):
"""
Merges route analysis and additional features DataFrames on the 'date' column.
Parameters:
route_analysis_df (pd.DataFrame): DataFrame containing route analysis data.
additional_features_df (pd.DataFrame): DataFrame containing additional computed features.
Returns:
pd.DataFrame: Merged DataFrame containing both route analysis and additional features.
"""
print("Merging route analysis and additional features...")
# Ensure inputs are DataFrames
if isinstance(route_analysis_df, dict):
route_analysis_df = pd.DataFrame([route_analysis_df])
elif isinstance(route_analysis_df, list):
route_analysis_df = pd.DataFrame(route_analysis_df)
if isinstance(additional_features_df, dict):
additional_features_df = pd.DataFrame([additional_features_df])
elif isinstance(additional_features_df, list):
additional_features_df = pd.DataFrame(additional_features_df)
# Validate required columns
if route_analysis_df.empty or 'date' not in route_analysis_df.columns:
print("Error: route_analysis_df is empty or missing 'date' column.")
return pd.DataFrame()
if additional_features_df.empty or 'date' not in additional_features_df.columns:
print("Error: additional_features_df is empty or missing 'date' column.")
return pd.DataFrame()
# Standardize column names (convert both to lowercase for consistency)
route_analysis_df.columns = route_analysis_df.columns.str.lower()
additional_features_df.columns = additional_features_df.columns.str.lower()
# Convert 'date' columns to proper date format
route_analysis_df['date'] = pd.to_datetime(route_analysis_df['date']).dt.date
additional_features_df['date'] = pd.to_datetime(additional_features_df['date']).dt.date
# Merge on 'date' column
merged_df = pd.merge(route_analysis_df, additional_features_df, on='date', how='outer')
# Sort by date
merged_df.sort_values(by='date', inplace=True)
print("Merging completed successfully.")
return merged_df
# step 6 : Generating the predictions
def generate_predictions(merged_df):
"""
Generates predictions using the pre-trained model.
"""
# === Preprocessing Steps ===
# Step 1: Keep only rows where total_working_hours is at least 1 hour
merged_df = merged_df[merged_df['total_working_hours'] >= 1]
# Step 2: Cap total_working_hours at 14 hours if it exceeds that
merged_df.loc[merged_df['total_working_hours'] > 14, 'total_working_hours'] = 14
# Step 3: Remove rows where percentage_route_followed is less than 1%
merged_df = merged_df[merged_df['percentage_route_followed'] >= 1]
# Step 4: Identify rows where total_distance_odo_for_speed exceeds 200 (needs to be capped)
distance_capped_mask = merged_df['total_distance_odo_for_speed'] > 200
# Step 5: Cap total_distance_odo_for_speed at 200 km
merged_df.loc[distance_capped_mask, 'total_distance_odo_for_speed'] = 200
# Step 6: Recalculate average_speed_motion only for the capped rows
merged_df.loc[distance_capped_mask, 'average_speed_motion'] = (
merged_df.loc[distance_capped_mask, 'total_distance_odo_for_speed'] /
merged_df.loc[distance_capped_mask, 'total_working_hours']
)
# Step 7: Remove rows where total_distance_odo_for_speed is less than 20 km
merged_df = merged_df[merged_df['total_distance_odo_for_speed'] >= 20]
# Step 8: Remove rows with invalid (negative) average_speed_motion
merged_df = merged_df[merged_df['average_speed_motion'] >= 0]
# Step 9: Reset index after filtering
merged_df = merged_df.reset_index(drop=True)
# === Feature Engineering ===
merged_df["total_distance"] = merged_df['total_distance_odo_for_speed']
merged_df['route_km'] = (merged_df['total_distance_odo_for_speed'] * merged_df['percentage_route_followed']) / 100
merged_df['non_route_km'] = (merged_df['total_distance_odo_for_speed'] * merged_df['percentage_non_route_followed']) / 100
merged_df['earning_distance'] = 1.0
# Step 10: Calculate earning_distance based on route-followed and distance conditions
for index, row in merged_df.iterrows():
if row['percentage_route_followed'] >= 0.70 and row['total_distance'] > 40:
merged_df.at[index, 'earning_distance'] = row['route_km'] + row['non_route_km'] * 0.3
else:
merged_df.at[index, 'earning_distance'] = row['route_km'] + row['non_route_km'] * 0.8
# === Prediction ===
features = ['cumulative_voltage_drop', 'static_time_percentage', 'total_working_hours',
'peak_percentage', 'percentage_route_followed', 'average_speed_motion']
X = merged_df[features].fillna(0)
# Step 12: Load model and make predictions
stacking_model = joblib.load(model_path)
merged_df['rf_model_predictions'] = stacking_model.predict(X)
merged_df.loc[merged_df['rf_model_predictions'] < 1, 'rf_model_predictions'] = 1
# Step 14: Calculate predicted income using fare, load factor, and passengers
fareperpassenger = 10
numberofpassengers = 4
load_factor = 0.75
merged_df['predicted_income'] = (
merged_df['earning_distance'] * fareperpassenger * load_factor * numberofpassengers / merged_df['rf_model_predictions']
).abs()
# Step 15: Cap predicted income at 2000
merged_df.loc[merged_df['predicted_income'] > 2000, 'predicted_income'] = 2000
# Step 16: Drop rows where chassis_no is missing
merged_df = merged_df.dropna(subset=['chassis_no'])
return merged_df
def main(vehicle_df):
"""
Main function to process a single vehicle's data and generate predictions.
Parameters:
vehicle_df (pd.DataFrame): Data of a single vehicle.
Returns:
pd.DataFrame: Processed data with predictions.
"""
try:
print("Starting main function.")
chassis_no = vehicle_df['chassis_no'].iloc[0]
print(f"\nProcessing chassis number: {chassis_no}")
# Step 1: Route Identification
print("Performing route identification...")
df_routes = route_identification(vehicle_df)
print("Route identification completed.")
# Step 2: Route Analysis
print("Performing route analysis...")
route_analysis_df = route_analysis(vehicle_df, df_routes)
print("Route analysis completed.")
# Step 3: Create Additional Features
print("Creating additional features...")
additional_features_df = create_additional_features(vehicle_df)
print("Additional features created.")
# Step 4: Merge Route Analysis & Additional Features
print("Merging data...")
merged_df = merge_features(route_analysis_df, additional_features_df)
print("Data merged.")
# Step 5: Generate Predictions
print("Generating predictions...")
final_vehicle_df = generate_predictions(merged_df)
print("Predictions generated.")
return final_vehicle_df
except Exception as e:
print(f"Error processing chassis {chassis_no}: {e}")
return pd.DataFrame() # Return empty DataFrame on failure
if __name__ == "__main__":
# Parse Command-Line Arguments (Start & End Time)
if len(sys.argv) != 3:
print(" Error: Please provide start_time and end_time as arguments.")
sys.exit(1)
start_time = sys.argv[1]
end_time = sys.argv[2]
print(f"Fetching data from {start_time} to {end_time}...")
# ENTER YOUR DATABASE CREDENTIALS HERE
DB_USERNAME = "postgres" # Example: "admin"
DB_PASSWORD = "vecmocon" # Example: "mypassword"
DB_HOST = "13.50.153.215:5432" # Example: "localhost" or "192.168.1.100"
DB_NAME = "ecofy-dashboard" # Example: "mydb"
TABLE_NAME = "device_data" #
# Construct the PostgreSQL connection URL
DB_URL = f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
# Create Database Engine
try:
engine = create_engine(DB_URL)
print("PostgreSQL Database connection successful!")
except Exception as e:
print(f"Error connecting to the PostgreSQL database: {e}")
sys.exit(1)
# Fetch Raw Data from the Database
df = fetch_raw_data(engine, TABLE_NAME, start_time, end_time)
if df.empty:
print("⚠️ No data returned from the database.")
sys.exit(0)
# Process and Analyze Each Vehicle Separately
processed_data_list = []
try:
for chassis_no in df['chassis_no'].unique():
print(f"\nProcessing chassis number: {chassis_no}")
# Step 1: Filter data for the current vehicle
vehicle_df = df[df['chassis_no'] == chassis_no].copy()
# Step 2: Process raw data for this vehicle
vehicle_df = process_raw_data(vehicle_df)
# Step 3: Run the workflow for this vehicle
final_vehicle_df = main(vehicle_df)
# Step 4: Store Processed Vehicle Data
if not final_vehicle_df.empty:
processed_data_list.append(final_vehicle_df)
# Merge and Insert Processed Data into the Database
if processed_data_list:
final_predictions = pd.concat(processed_data_list, ignore_index=True)
print(" Workflow completed successfully.")
try:
with engine.begin() as conn:
INSERT_SQL = f"INSERT INTO monthly_predicted_device_data ({', '.join(final_predictions.columns)}) VALUES ({', '.join([f':{col}' for col in final_predictions.columns])})"
conn.execute(text(INSERT_SQL), final_predictions.to_dict(orient="records"))
print(" Processed data inserted into 'monthly_predicted_device_data' successfully!")
except Exception as e:
print(f"Error inserting processed data into the database: {e}")
sys.exit(1)
else:
print("No valid vehicle data was processed.")
except Exception as e:
print(f" Error during workflow execution: {e}")
sys.exit(1)