    kishorkukreja/production-scheduling
    About

    When the user wants to schedule production operations, optimize manufacturing schedules, balance workloads, or sequence jobs...

    SKILL.md

    Production Scheduling

    You are an expert in production scheduling and manufacturing operations. Your goal is to help organizations create efficient production schedules that minimize makespan, reduce WIP, meet due dates, and maximize resource utilization.

    Initial Assessment

    Before developing production schedules, understand:

    1. Manufacturing Environment

      • Shop type? (job shop, flow shop, batch production, continuous)
      • Number of machines/work centers?
      • Product variety and volumes?
      • Make-to-order, make-to-stock, or assemble-to-order?
    2. Scheduling Constraints

      • Precedence constraints (routing/sequence requirements)?
      • Machine capabilities and restrictions?
      • Setup times and changeover requirements?
      • Resource constraints (labor, tooling, materials)?
      • Shift patterns and availability?
    3. Scheduling Objectives

      • Primary goal? (minimize makespan, meet due dates, reduce WIP)
      • Due date commitments and penalties?
      • Priority rules for competing jobs?
      • Trade-offs between objectives?
    4. Current State

      • Current scheduling method? (manual, ERP, spreadsheet)
      • Schedule performance metrics?
      • Known bottlenecks or issues?
      • Planning horizon and replan frequency?

    Production Scheduling Framework

    Shop Floor Configurations

    1. Job Shop

    • Multiple products with unique routings
    • High variety, low volume
    • Complex scheduling problem (NP-hard)
    • Flexible machines, varied sequences
    • Example: Custom machinery, tool & die shops

    2. Flow Shop

    • Linear production flow
    • All products follow same machine sequence
    • Easier to schedule than job shop
    • Focus on sequence optimization
    • Example: Electronics assembly, food processing

    3. Batch Production

    • Produce in batches
    • Setup-dependent scheduling
    • Minimize changeovers
    • Balance batch sizes with demand
    • Example: Chemicals, pharmaceuticals

    4. Continuous Production

    • 24/7 operations
    • Minimize changeovers and downtime
    • Focus on throughput maximization
    • Production leveling important
    • Example: Oil refining, paper mills

    Scheduling Objectives

    Common Objectives:

    1. Minimize Makespan - Time at which the last job finishes (Cmax)
    2. Minimize Tardiness - Late deliveries relative to due dates
    3. Minimize WIP - Work-in-process inventory
    4. Maximize Throughput - Units produced per period
    5. Maximize Utilization - Equipment and labor efficiency
    6. Minimize Setup Time - Reduce changeovers between jobs

    Multi-Objective Optimization:

    • Often need to balance multiple objectives
    • Use weighted objective functions or Pareto optimization
    • Trade-off analysis between competing goals
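
    Both approaches can be sketched in a few lines. The candidate schedules, metric names, and weights below are made-up illustrations, and all criteria are assumed to be minimized. In practice the metrics usually need to be normalized before weighting, because hours of makespan and counts of setups are not on the same scale.

```python
from typing import Dict, List


def weighted_score(metrics: Dict[str, float], weights: Dict[str, float]) -> float:
    """Collapse several minimization criteria into one number."""
    return sum(weight * metrics[name] for name, weight in weights.items())


def pareto_front(candidates: List[Dict], keys: List[str]) -> List[Dict]:
    """Keep candidates that no other candidate dominates on `keys`."""
    def dominates(a: Dict, b: Dict) -> bool:
        no_worse = all(a[k] <= b[k] for k in keys)
        strictly_better = any(a[k] < b[k] for k in keys)
        return no_worse and strictly_better

    return [c for c in candidates
            if not any(dominates(other, c) for other in candidates)]


# Hypothetical evaluations of three alternative schedules
candidates = [
    {"name": "A", "makespan": 20, "tardiness": 8, "setups": 3},
    {"name": "B", "makespan": 22, "tardiness": 2, "setups": 4},
    {"name": "C", "makespan": 23, "tardiness": 9, "setups": 4},
]
criteria = ["makespan", "tardiness", "setups"]
weights = {"makespan": 1.0, "tardiness": 2.0, "setups": 0.5}  # illustrative weights

front = pareto_front(candidates, criteria)
best = min(candidates, key=lambda c: weighted_score(c, weights))
print("Pareto front:", [c["name"] for c in front])
print("Best weighted:", best["name"])
```

    The Pareto filter shows which trade-offs are worth discussing at all; the weighted score then picks one of them once the relative importance of the objectives has been agreed.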

    Job Shop Scheduling

    Problem Formulation

    Job Shop Scheduling Problem (JSP):

    Given:

    • n jobs J₁, J₂, ..., Jₙ
    • m machines M₁, M₂, ..., Mₘ
    • Each job has sequence of operations with processing times
    • Each operation requires specific machine
    • No preemption (operations can't be interrupted)

    Objective: Minimize makespan (Cmax = max completion time)
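
    The same problem can be written as a disjunctive program. The notation is an assumption introduced here for illustration: s_{j,k} and p_{j,k} are the start and processing time of the k-th operation of job j, o_j is the number of operations in job j, and μ(j,k) is the machine that operation requires.

```latex
\begin{aligned}
\min\quad & C_{\max} \\
\text{s.t.}\quad
 & s_{j,k+1} \ge s_{j,k} + p_{j,k}
   && \forall j,\; k = 1,\dots,o_j - 1
   && \text{(routing precedence)} \\
 & s_{j,k} \ge s_{i,l} + p_{i,l} \;\lor\; s_{i,l} \ge s_{j,k} + p_{j,k}
   && \forall (j,k) \ne (i,l) \text{ with } \mu(j,k) = \mu(i,l)
   && \text{(one operation per machine at a time)} \\
 & C_{\max} \ge s_{j,o_j} + p_{j,o_j}
   && \forall j
   && \text{(makespan definition)} \\
 & s_{j,k} \ge 0
   && \forall j, k
\end{aligned}
```

    The either-or constraints are what make the problem hard: each pair of operations sharing a machine needs an ordering decision, and the number of such combinations grows rapidly with problem size.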

    Classic JSP Algorithms

    Dispatching Rules:

    Simple priority rules for job sequencing:

    import pandas as pd
    import numpy as np
    from typing import List, Dict, Tuple
    
    class DispatchingRules:
        """
        Common dispatching rules for production scheduling
        """
    
        @staticmethod
        def FCFS(jobs):
            """First Come First Served - arrival order"""
            return sorted(jobs, key=lambda j: j['arrival_time'])
    
        @staticmethod
        def SPT(jobs):
            """Shortest Processing Time first"""
            return sorted(jobs, key=lambda j: j['processing_time'])
    
        @staticmethod
        def LPT(jobs):
            """Longest Processing Time first"""
            return sorted(jobs, key=lambda j: j['processing_time'], reverse=True)
    
        @staticmethod
        def EDD(jobs):
            """Earliest Due Date first"""
            return sorted(jobs, key=lambda j: j['due_date'])
    
        @staticmethod
        def CR(jobs, current_time):
            """Critical Ratio = (due_date - current_time) / remaining_processing_time"""
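            for job in jobs:
                # Time left until the due date (not the same as slack, which
                # also subtracts the remaining processing time)
                time_until_due = job['due_date'] - current_time
                job['cr'] = time_until_due / job['remaining_time'] if job['remaining_time'] > 0 else float('inf')
            # Lowest ratio first: CR < 1 means the job is already behind schedule
            return sorted(jobs, key=lambda j: j['cr'])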
    
        @staticmethod
        def SLACK(jobs, current_time):
            """Minimum Slack = due_date - current_time - remaining_processing_time"""
            for job in jobs:
                job['slack'] = job['due_date'] - current_time - job['remaining_time']
            return sorted(jobs, key=lambda j: j['slack'])
    
        @staticmethod
        def WSPT(jobs):
            """Weighted Shortest Processing Time"""
            for job in jobs:
                job['wspt'] = job['priority'] / job['processing_time']
            return sorted(jobs, key=lambda j: j['wspt'], reverse=True)
    
    # Example usage
    jobs = [
        {'id': 'J1', 'arrival_time': 0, 'processing_time': 5, 'due_date': 15,
         'remaining_time': 5, 'priority': 3},
        {'id': 'J2', 'arrival_time': 1, 'processing_time': 3, 'due_date': 10,
         'remaining_time': 3, 'priority': 2},
        {'id': 'J3', 'arrival_time': 2, 'processing_time': 8, 'due_date': 20,
         'remaining_time': 8, 'priority': 1},
        {'id': 'J4', 'arrival_time': 3, 'processing_time': 4, 'due_date': 12,
         'remaining_time': 4, 'priority': 2},
    ]
    
    # Apply different rules
    spt_sequence = DispatchingRules.SPT(jobs.copy())
    edd_sequence = DispatchingRules.EDD(jobs.copy())
    cr_sequence = DispatchingRules.CR(jobs.copy(), current_time=5)
    
    print("SPT Sequence:", [j['id'] for j in spt_sequence])
    print("EDD Sequence:", [j['id'] for j in edd_sequence])
    print("CR Sequence:", [j['id'] for j in cr_sequence])
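
    Time-dependent rules such as CR and SLACK are normally re-evaluated every time a machine becomes free rather than applied once to the whole list. The following is a minimal sketch of such a dispatch loop for a single machine. The function names and the `priority(job, now)` callback signature are assumptions made for this example, and the remaining time of a job that has not started is taken to be its full processing time.

```python
from typing import Callable, Dict, List, Tuple


def dispatch(jobs: List[Dict], priority: Callable[[Dict, float], float]) -> List[Tuple[str, float]]:
    """Whenever the machine frees up, start the waiting job with the lowest priority value.

    Returns (job id, completion time) pairs in processing order.
    """
    pending = sorted(jobs, key=lambda j: j["arrival_time"])
    clock = 0
    order = []
    while pending:
        waiting = [j for j in pending if j["arrival_time"] <= clock]
        if not waiting:
            clock = pending[0]["arrival_time"]   # machine idles until the next arrival
            continue
        job = min(waiting, key=lambda j: priority(j, clock))
        pending.remove(job)
        clock += job["processing_time"]
        order.append((job["id"], clock))
    return order


def earliest_due_date(job: Dict, now: float) -> float:
    return job["due_date"]


def critical_ratio(job: Dict, now: float) -> float:
    # Recomputed at each decision point, so the ranking can change over time
    return (job["due_date"] - now) / job["processing_time"]


jobs = [
    {"id": "J1", "arrival_time": 0, "processing_time": 5, "due_date": 15},
    {"id": "J2", "arrival_time": 1, "processing_time": 3, "due_date": 10},
    {"id": "J3", "arrival_time": 2, "processing_time": 8, "due_date": 20},
    {"id": "J4", "arrival_time": 3, "processing_time": 4, "due_date": 12},
]
print("EDD:", dispatch(jobs, earliest_due_date))
print("CR: ", dispatch(jobs, critical_ratio))
```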
    

    Job Shop Scheduling with Constraint Programming

    from ortools.sat.python import cp_model
    import pandas as pd
    
    class JobShopScheduler:
        """
        Solve Job Shop Scheduling Problem using CP-SAT
        Minimize makespan
        """
    
        def __init__(self, jobs_data):
            """
            jobs_data: list of jobs, each job is list of (machine, duration) tuples
    
            Example:
            jobs_data = [
                [(0, 3), (1, 2), (2, 2)],  # Job 0: M0(3h) -> M1(2h) -> M2(2h)
                [(0, 2), (2, 1), (1, 4)],  # Job 1: M0(2h) -> M2(1h) -> M1(4h)
                [(1, 4), (2, 3)]           # Job 2: M1(4h) -> M2(3h)
            ]
            """
            self.jobs_data = jobs_data
            self.num_jobs = len(jobs_data)
            self.num_machines = max(task[0] for job in jobs_data for task in job) + 1
            self.all_machines = range(self.num_machines)
    
            # Compute horizon (upper bound on makespan)
            self.horizon = sum(task[1] for job in jobs_data for task in job)
    
            self.model = cp_model.CpModel()
            self.solver = cp_model.CpSolver()
    
        def create_model(self):
            """Build the constraint programming model"""
    
            # Create variables
            self.task_type = {}
            self.starts = {}
            self.ends = {}
            self.intervals = {}
    
            for job_id, job in enumerate(self.jobs_data):
                for task_id, task in enumerate(job):
                    machine, duration = task
                    suffix = f'_{job_id}_{task_id}'
    
                    # Start time variable
                    start_var = self.model.NewIntVar(0, self.horizon, f'start{suffix}')
                    # End time variable
                    end_var = self.model.NewIntVar(0, self.horizon, f'end{suffix}')
                    # Interval variable
                    interval_var = self.model.NewIntervalVar(
                        start_var, duration, end_var, f'interval{suffix}'
                    )
    
                    self.task_type[(job_id, task_id)] = (machine, duration)
                    self.starts[(job_id, task_id)] = start_var
                    self.ends[(job_id, task_id)] = end_var
                    self.intervals[(job_id, task_id)] = interval_var
    
            # Precedence constraints (within each job)
            for job_id, job in enumerate(self.jobs_data):
                for task_id in range(len(job) - 1):
                    self.model.Add(
                        self.ends[(job_id, task_id)] <=
                        self.starts[(job_id, task_id + 1)]
                    )
    
            # No overlap constraints (for each machine)
            for machine in self.all_machines:
                intervals_on_machine = [
                    self.intervals[(job_id, task_id)]
                    for job_id, job in enumerate(self.jobs_data)
                    for task_id, task in enumerate(job)
                    if task[0] == machine
                ]
                if intervals_on_machine:
                    self.model.AddNoOverlap(intervals_on_machine)
    
            # Objective: minimize makespan
            self.makespan = self.model.NewIntVar(0, self.horizon, 'makespan')
            self.model.AddMaxEquality(
                self.makespan,
                [self.ends[(job_id, len(job) - 1)]
                 for job_id, job in enumerate(self.jobs_data)]
            )
            self.model.Minimize(self.makespan)
    
        def solve(self, time_limit_seconds=10):
            """Solve the scheduling problem"""
    
            self.solver.parameters.max_time_in_seconds = time_limit_seconds
            status = self.solver.Solve(self.model)
    
            if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
                return self._extract_solution()
            else:
                return None
    
        def _extract_solution(self):
            """Extract schedule from solved model"""
    
            schedule = []
    
            for job_id, job in enumerate(self.jobs_data):
                for task_id, task in enumerate(job):
                    machine, duration = task
                    start = self.solver.Value(self.starts[(job_id, task_id)])
                    end = self.solver.Value(self.ends[(job_id, task_id)])
    
                    schedule.append({
                        'job': job_id,
                        'task': task_id,
                        'machine': machine,
                        'start': start,
                        'end': end,
                        'duration': duration
                    })
    
            makespan = self.solver.Value(self.makespan)
    
            return {
                'makespan': makespan,
                'schedule': pd.DataFrame(schedule),
                'objective': self.solver.ObjectiveValue(),
                'solve_time': self.solver.WallTime()
            }
    
        def print_schedule(self, solution):
            """Print the schedule in readable format"""
    
            if solution is None:
                print("No solution found")
                return
    
            # The solver may stop at a feasible but unproven solution, so avoid calling it optimal
            print(f"\nMakespan: {solution['makespan']}")
            print(f"Solve Time: {solution['solve_time']:.2f} seconds\n")
    
            df = solution['schedule'].sort_values(['machine', 'start'])
    
            for machine in sorted(df['machine'].unique()):
                print(f"Machine {machine}:")
                machine_tasks = df[df['machine'] == machine]
                for _, row in machine_tasks.iterrows():
                    print(f"  Job {row['job']}, Task {row['task']}: "
                          f"[{row['start']}, {row['end']}] (duration={row['duration']})")
                print()
    
    # Example: small instance with 3 jobs on 3 machines
    jobs_data = [
        [(0, 3), (1, 2), (2, 2)],  # Job 0
        [(0, 2), (2, 1), (1, 4)],  # Job 1
        [(1, 4), (2, 3)]           # Job 2
    ]
    
    jsp = JobShopScheduler(jobs_data)
    jsp.create_model()
    solution = jsp.solve(time_limit_seconds=10)
    jsp.print_schedule(solution)
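
    Before releasing a schedule to the shop floor it is worth checking it independently of the solver that produced it. The sketch below is one way to do that in plain Python; `validate_schedule` is a hypothetical helper, and it assumes each operation is a dict with the same keys as the rows built in `_extract_solution` (job, task, machine, start, end).

```python
from typing import Dict, List, Tuple


def validate_schedule(jobs_data: List[List[Tuple[int, int]]],
                      schedule: List[Dict]) -> List[str]:
    """Return a list of human-readable violations (empty if the schedule is valid)."""
    violations = []
    by_key = {(op["job"], op["task"]): op for op in schedule}

    for job_id, job in enumerate(jobs_data):
        for task_id, (machine, duration) in enumerate(job):
            op = by_key.get((job_id, task_id))
            if op is None:
                violations.append(f"job {job_id} task {task_id}: missing")
                continue
            if op["machine"] != machine or op["end"] - op["start"] != duration:
                violations.append(f"job {job_id} task {task_id}: wrong machine or duration")
            prev = by_key.get((job_id, task_id - 1))
            if prev is not None and op["start"] < prev["end"]:
                violations.append(f"job {job_id} task {task_id}: starts before predecessor ends")

    # Operations on the same machine must not overlap in time
    by_machine: Dict[int, List[Dict]] = {}
    for op in schedule:
        by_machine.setdefault(op["machine"], []).append(op)
    for machine, ops in by_machine.items():
        ops.sort(key=lambda o: o["start"])
        for earlier, later in zip(ops, ops[1:]):
            if later["start"] < earlier["end"]:
                violations.append(f"machine {machine}: overlapping operations")
    return violations


jobs_data = [
    [(0, 3), (1, 2), (2, 2)],
    [(0, 2), (2, 1), (1, 4)],
    [(1, 4), (2, 3)],
]
# Hand-built schedule for the instance above
schedule = [
    {"job": 0, "task": 0, "machine": 0, "start": 2, "end": 5},
    {"job": 0, "task": 1, "machine": 1, "start": 5, "end": 7},
    {"job": 0, "task": 2, "machine": 2, "start": 7, "end": 9},
    {"job": 1, "task": 0, "machine": 0, "start": 0, "end": 2},
    {"job": 1, "task": 1, "machine": 2, "start": 2, "end": 3},
    {"job": 1, "task": 2, "machine": 1, "start": 7, "end": 11},
    {"job": 2, "task": 0, "machine": 1, "start": 0, "end": 4},
    {"job": 2, "task": 1, "machine": 2, "start": 4, "end": 7},
]
print(validate_schedule(jobs_data, schedule))
```

    To check a solution returned by the scheduler class, the DataFrame can be converted first with `solution['schedule'].to_dict('records')`.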
    

    Gantt Chart Visualization

    import matplotlib.pyplot as plt
    import matplotlib.patches as patches
    import numpy as np
    
    def plot_gantt_chart(schedule_df, title="Production Schedule"):
        """
        Create Gantt chart visualization of production schedule
    
        Parameters:
        - schedule_df: DataFrame with columns [job, machine, start, end]
        """
    
        fig, ax = plt.subplots(figsize=(14, 8))
    
        # Assign colors to jobs
        jobs = schedule_df['job'].unique()
        colors = plt.cm.Set3(np.linspace(0, 1, len(jobs)))
        job_colors = {job: colors[i] for i, job in enumerate(jobs)}
    
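        # Create bars for each task
        labeled_jobs = set()  # jobs that already have a legend entry
        for _, row in schedule_df.iterrows():
            # Label only the first bar of each job; this works for any DataFrame
            # index, not just the default 0..n-1 range
            first_bar = row['job'] not in labeled_jobs
            labeled_jobs.add(row['job'])
            ax.barh(
                y=row['machine'],
                width=row['end'] - row['start'],
                left=row['start'],
                height=0.8,
                color=job_colors[row['job']],
                edgecolor='black',
                linewidth=1.5,
                label=f"Job {row['job']}" if first_bar else ""
            )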
    
            # Add job label on bar
            ax.text(
                (row['start'] + row['end']) / 2,
                row['machine'],
                f"J{row['job']}",
                ha='center',
                va='center',
                fontweight='bold',
                fontsize=10
            )
    
        # Configure axes
        machines = sorted(schedule_df['machine'].unique())
        ax.set_yticks(machines)
        ax.set_yticklabels([f'Machine {m}' for m in machines])
        ax.set_xlabel('Time', fontsize=12, fontweight='bold')
        ax.set_ylabel('Machine', fontsize=12, fontweight='bold')
        ax.set_title(title, fontsize=14, fontweight='bold')
    
        # Add grid
        ax.grid(True, axis='x', alpha=0.3, linestyle='--')
    
        # Add legend (unique jobs only)
        handles, labels = ax.get_legend_handles_labels()
        by_label = dict(zip(labels, handles))
        ax.legend(by_label.values(), by_label.keys(),
                  loc='upper right', fontsize=10)
    
        plt.tight_layout()
        return fig
    
    # Use with previous solution
    if solution:
        fig = plot_gantt_chart(solution['schedule'],
                              title=f"Job Shop Schedule (Makespan={solution['makespan']})")
        plt.show()
    

    Flow Shop Scheduling

    NEH Algorithm (Nawaz-Enscore-Ham)

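    NEH is widely regarded as one of the most effective constructive heuristics for minimizing makespan in the permutation flow shop. It does not guarantee an optimal sequence: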

    import numpy as np
    import itertools
    
    class FlowShopNEH:
        """
        NEH Algorithm for Permutation Flow Shop Scheduling
        Minimize makespan
        """
    
        def __init__(self, processing_times):
            """
            processing_times: 2D array [n_jobs x m_machines]
            processing_times[j][m] = time for job j on machine m
            """
            self.processing_times = np.array(processing_times)
            self.n_jobs, self.n_machines = self.processing_times.shape
    
        def calculate_makespan(self, sequence):
            """
            Calculate makespan for given job sequence
    
            Parameters:
            - sequence: list of job indices
    
            Returns completion time (makespan)
            """
            n = len(sequence)
            m = self.n_machines
    
            # Completion times: C[j][m] = completion time of job j on machine m
            C = np.zeros((n, m))
    
            for j_idx, job in enumerate(sequence):
                for machine in range(m):
                    if j_idx == 0 and machine == 0:
                        # First job, first machine
                        C[j_idx][machine] = self.processing_times[job][machine]
                    elif j_idx == 0:
                        # First job, subsequent machines
                        C[j_idx][machine] = (C[j_idx][machine-1] +
                                            self.processing_times[job][machine])
                    elif machine == 0:
                        # Subsequent jobs, first machine
                        C[j_idx][machine] = (C[j_idx-1][machine] +
                                            self.processing_times[job][machine])
                    else:
                        # Subsequent jobs, subsequent machines
                        C[j_idx][machine] = (max(C[j_idx-1][machine],
                                                C[j_idx][machine-1]) +
                                            self.processing_times[job][machine])
    
            return C[n-1][m-1]  # Makespan = completion time of last job on last machine
    
        def neh_algorithm(self):
            """
            NEH heuristic algorithm
    
            Returns a dict with:
            - sequence: job order found by the heuristic (not guaranteed optimal)
            - makespan: makespan of that sequence
            - job_labels: display labels for the sequence
            """
    
            # Step 1: Calculate total processing time for each job
            total_times = self.processing_times.sum(axis=1)
    
            # Step 2: Sort jobs by descending total processing time
            sorted_jobs = np.argsort(-total_times)  # Descending order
    
            # Step 3: Build sequence iteratively
            sequence = [sorted_jobs[0]]  # Start with job with highest total time
    
            for job in sorted_jobs[1:]:
                best_makespan = float('inf')
                best_position = 0
    
                # Try inserting job at each position
                for pos in range(len(sequence) + 1):
                    trial_sequence = sequence[:pos] + [job] + sequence[pos:]
                    makespan = self.calculate_makespan(trial_sequence)
    
                    if makespan < best_makespan:
                        best_makespan = makespan
                        best_position = pos
    
                # Insert job at best position
                sequence.insert(best_position, job)
    
            final_makespan = self.calculate_makespan(sequence)
    
            return {
                'sequence': sequence,
                'makespan': final_makespan,
                'job_labels': [f'Job_{j}' for j in sequence]
            }
    
    # Example: 4 jobs x 3 machines
    processing_times = [
        [5, 3, 4],  # Job 0
        [3, 4, 2],  # Job 1
        [6, 2, 3],  # Job 2
        [4, 5, 1]   # Job 3
    ]
    
    fs = FlowShopNEH(processing_times)
    result = fs.neh_algorithm()
    
    print(f"NEH Sequence: {result['job_labels']}")
    print(f"Makespan: {result['makespan']}")
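
    For small instances, a heuristic result can be compared against the true optimum by enumerating every permutation. The sketch below does this with the standard library only; the function names are illustrative, and because the number of permutations grows as n!, it is only practical for roughly ten jobs or fewer.

```python
from itertools import permutations
from typing import List, Sequence, Tuple


def flow_shop_makespan(processing_times: List[List[int]], sequence: Sequence[int]) -> int:
    """Makespan of a permutation flow shop sequence (no setups, no buffers limits)."""
    n_machines = len(processing_times[0])
    finish = [0] * n_machines  # finish[m]: completion of the latest job on machine m
    for job in sequence:
        for m in range(n_machines):
            # The job is ready for machine m once it leaves machine m-1
            ready = finish[m - 1] if m > 0 else 0
            finish[m] = max(finish[m], ready) + processing_times[job][m]
    return finish[-1]


def brute_force_optimum(processing_times: List[List[int]]) -> Tuple[List[int], int]:
    """Enumerate all job orders and return one with minimum makespan."""
    n_jobs = len(processing_times)
    best = min(permutations(range(n_jobs)),
               key=lambda seq: flow_shop_makespan(processing_times, seq))
    return list(best), flow_shop_makespan(processing_times, best)


processing_times = [
    [5, 3, 4],  # Job 0
    [3, 4, 2],  # Job 1
    [6, 2, 3],  # Job 2
    [4, 5, 1],  # Job 3
]
best_sequence, best_makespan = brute_force_optimum(processing_times)
print("Optimal sequence:", best_sequence, "makespan:", best_makespan)
```

    Running this next to the NEH result on the same data shows how far the heuristic is from optimal on that instance.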
    

    Flow Shop with Setup Times

    import numpy as np

    class FlowShopWithSetups:
        """
        Flow shop scheduling with sequence-dependent setup times
        """
    
        def __init__(self, processing_times, setup_times):
            """
            processing_times: 2D array [n_jobs x m_machines]
            setup_times: 3D array [n_jobs x n_jobs x m_machines]
                         setup_times[i][j][m] = setup time from job i to job j on machine m
            """
            self.processing_times = np.array(processing_times)
            self.setup_times = np.array(setup_times)
            self.n_jobs, self.n_machines = self.processing_times.shape
    
        def calculate_makespan_with_setups(self, sequence):
            """Calculate makespan including setup times"""
    
            n = len(sequence)
            m = self.n_machines
            C = np.zeros((n, m))
    
            for j_idx, job in enumerate(sequence):
                for machine in range(m):
                    # Get setup time
                    if j_idx == 0:
                        setup_time = 0  # No setup for first job
                    else:
                        prev_job = sequence[j_idx - 1]
                        setup_time = self.setup_times[prev_job][job][machine]
    
                    processing_time = self.processing_times[job][machine]
    
                    if j_idx == 0 and machine == 0:
                        C[j_idx][machine] = processing_time
                    elif j_idx == 0:
                        C[j_idx][machine] = C[j_idx][machine-1] + setup_time + processing_time
                    elif machine == 0:
                        C[j_idx][machine] = C[j_idx-1][machine] + setup_time + processing_time
                    else:
                        C[j_idx][machine] = (max(C[j_idx-1][machine], C[j_idx][machine-1]) +
                                            setup_time + processing_time)
    
            return C[n-1][m-1]
    
        def genetic_algorithm(self, population_size=50, generations=100,
                             mutation_rate=0.1, crossover_rate=0.8):
            """
            Genetic algorithm for flow shop with setups
    
            Returns best sequence found
            """
    
            # Initialize population with random sequences
            population = [np.random.permutation(self.n_jobs).tolist()
                         for _ in range(population_size)]
    
            best_solution = None
            best_makespan = float('inf')
    
            for gen in range(generations):
                # Evaluate fitness (lower makespan = better)
                fitness = []
                for seq in population:
                    makespan = self.calculate_makespan_with_setups(seq)
                    fitness.append(1.0 / makespan)  # Inverse for maximization
    
                    if makespan < best_makespan:
                        best_makespan = makespan
                        best_solution = seq.copy()
    
                # Selection: tournament selection
                new_population = []
                fitness = np.array(fitness)
    
                for _ in range(population_size):
                    # Tournament of size 3
                    tournament = np.random.choice(population_size, 3, replace=False)
                    winner = tournament[np.argmax(fitness[tournament])]
                    new_population.append(population[winner].copy())
    
                # Crossover: order crossover (OX)
                for i in range(0, population_size-1, 2):
                    if np.random.random() < crossover_rate:
                        parent1 = new_population[i]
                        parent2 = new_population[i+1]
                        child1, child2 = self._order_crossover(parent1, parent2)
                        new_population[i] = child1
                        new_population[i+1] = child2
    
                # Mutation: swap mutation
                for i in range(population_size):
                    if np.random.random() < mutation_rate:
                        new_population[i] = self._swap_mutation(new_population[i])
    
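                population = new_population

            # Score the final generation, which the loop above never evaluates
            for seq in population:
                makespan = self.calculate_makespan_with_setups(seq)
                if makespan < best_makespan:
                    best_makespan = makespan
                    best_solution = seq.copy()

            return {
                'sequence': best_solution,
                'makespan': best_makespan
            }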
    
        def _order_crossover(self, parent1, parent2):
            """Order crossover (OX) for permutation"""
            size = len(parent1)
            start, end = sorted(np.random.choice(size, 2, replace=False))
    
            child1 = [-1] * size
            child2 = [-1] * size
    
            # Copy segment
            child1[start:end] = parent1[start:end]
            child2[start:end] = parent2[start:end]
    
            # Fill remaining from other parent
            self._fill_remaining(child1, parent2, start, end)
            self._fill_remaining(child2, parent1, start, end)
    
            return child1, child2
    
        def _fill_remaining(self, child, parent, start, end):
            """Helper for order crossover"""
            size = len(child)
            parent_idx = end
            child_idx = end
    
            while -1 in child:
                if parent[parent_idx % size] not in child:
                    child[child_idx % size] = parent[parent_idx % size]
                    child_idx += 1
                parent_idx += 1
    
        def _swap_mutation(self, sequence):
            """Swap two random positions"""
            seq = sequence.copy()
            i, j = np.random.choice(len(seq), 2, replace=False)
            seq[i], seq[j] = seq[j], seq[i]
            return seq
    
    # Example with setup times
    processing_times = [
        [5, 3, 4],
        [3, 4, 2],
        [6, 2, 3],
        [4, 5, 1]
    ]
    
    # Setup times: [from_job][to_job][machine]
    n_jobs, n_machines = 4, 3
    setup_times = np.random.randint(1, 3, size=(n_jobs, n_jobs, n_machines))
    # No setup from job to itself
    for i in range(n_jobs):
        setup_times[i][i][:] = 0
    
    fs_setup = FlowShopWithSetups(processing_times, setup_times)
    result = fs_setup.genetic_algorithm(population_size=50, generations=100)
    
    print(f"Best Sequence: {result['sequence']}")
    print(f"Makespan: {result['makespan']}")
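
    A genetic algorithm result can often be tightened with a simple local search afterwards. The sketch below is a swap-based local search, a different and much simpler technique than the GA above. It uses a list-based makespan function that follows the same recurrence as `calculate_makespan_with_setups`; the function names and the deterministic setup matrix are assumptions for illustration.

```python
from itertools import combinations
from typing import Callable, List, Tuple


def makespan_with_setups(sequence, processing_times, setup_times) -> int:
    """Flow shop makespan with sequence-dependent setups (no setup before the first job)."""
    n_machines = len(processing_times[0])
    finish = [0] * n_machines
    prev = None
    for job in sequence:
        for m in range(n_machines):
            setup = 0 if prev is None else setup_times[prev][job][m]
            ready = finish[m - 1] if m > 0 else 0
            finish[m] = max(finish[m], ready) + setup + processing_times[job][m]
        prev = job
    return finish[-1]


def swap_local_search(sequence: List[int],
                      evaluate: Callable[[List[int]], float]) -> Tuple[List[int], float]:
    """Repeatedly apply the first improving pairwise swap until none remains."""
    best = list(sequence)
    best_value = evaluate(best)
    improved = True
    while improved:
        improved = False
        for i, j in combinations(range(len(best)), 2):
            candidate = best.copy()
            candidate[i], candidate[j] = candidate[j], candidate[i]
            value = evaluate(candidate)
            if value < best_value:
                best, best_value = candidate, value
                improved = True
                break  # rescan from the improved sequence
    return best, best_value


processing_times = [[5, 3, 4], [3, 4, 2], [6, 2, 3], [4, 5, 1]]
# Illustrative setup matrix: setup_times[from_job][to_job][machine]
setup_times = [[[0 if i == j else 1 + (i + 2 * j + m) % 3 for m in range(3)]
                for j in range(4)] for i in range(4)]


def evaluate(seq):
    return makespan_with_setups(seq, processing_times, setup_times)


start = [0, 1, 2, 3]
polished, value = swap_local_search(start, evaluate)
print("Start:", start, evaluate(start), "-> polished:", polished, value)
```

    The result is only a local optimum with respect to pairwise swaps, so it complements the GA rather than replacing it.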
    

    Advanced Scheduling Techniques

    Finite Capacity Scheduling (FCS)

    from datetime import datetime, timedelta
    import pandas as pd
    
    class FiniteCapacityScheduler:
        """
        Finite capacity scheduler with resource constraints
        Forward and backward scheduling
        """
    
        def __init__(self, work_centers):
            """
            work_centers: dict {wc_id: {'capacity_hours': X, 'efficiency': Y}}
            """
            self.work_centers = work_centers
            self.schedule = []
    
        def forward_schedule(self, orders, start_date):
            """
            Forward scheduling from start date
            Schedule as early as possible
    
            Parameters:
            - orders: list of dicts with 'order_id', 'routing', 'quantity'
            - routing is list of (work_center, hours_per_unit)
            """
    
            # Track work center availability
            wc_availability = {wc: start_date for wc in self.work_centers}
    
            # Schedule orders with a higher 'priority' value first (missing priority counts as 0)
            orders = sorted(orders, key=lambda x: x.get('priority', 0), reverse=True)
    
            results = []
    
            for order in orders:
                order_start = start_date
    
                for operation_idx, (wc, hours_per_unit) in enumerate(order['routing']):
                    # Calculate required hours
                    required_hours = order['quantity'] * hours_per_unit
                    efficiency = self.work_centers[wc]['efficiency']
                    actual_hours = required_hours / efficiency
    
                    # Earliest start is max of order flow and resource availability
                    operation_start = max(order_start, wc_availability[wc])
                    operation_end = operation_start + timedelta(hours=actual_hours)
    
                    # Update work center availability
                    wc_availability[wc] = operation_end
    
                    # Record operation
                    results.append({
                        'order_id': order['order_id'],
                        'operation': operation_idx,
                        'work_center': wc,
                        'start': operation_start,
                        'end': operation_end,
                        'hours': actual_hours
                    })
    
                    # Next operation starts after this one
                    order_start = operation_end
    
            return pd.DataFrame(results)
    
        def backward_schedule(self, orders, due_dates):
            """
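            Backward scheduling from due dates
            Schedule as late as possible (just-in-time)
            Note: each order is placed independently, so contention between
            orders for the same work center is not resolved here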
    
            Parameters:
            - orders: list of dicts with 'order_id', 'routing', 'quantity'
            - due_dates: dict {order_id: due_date}
            """
    
            results = []
    
            for order in orders:
                order_due = due_dates[order['order_id']]
    
                # Process routing in reverse
                routing = list(reversed(order['routing']))
    
                for operation_idx, (wc, hours_per_unit) in enumerate(routing):
                    required_hours = order['quantity'] * hours_per_unit
                    efficiency = self.work_centers[wc]['efficiency']
                    actual_hours = required_hours / efficiency
    
                    operation_end = order_due
                    operation_start = operation_end - timedelta(hours=actual_hours)
    
                    results.append({
                        'order_id': order['order_id'],
                        'operation': len(routing) - operation_idx - 1,
                        'work_center': wc,
                        'start': operation_start,
                        'end': operation_end,
                        'hours': actual_hours
                    })
    
                    order_due = operation_start
    
            return pd.DataFrame(results).sort_values(['order_id', 'operation'])
    
        def check_capacity_feasibility(self, schedule, planning_horizon):
            """
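            Check if schedule is feasible given capacity constraints

            Returns a DataFrame of overloaded work-center days, or None if
            there are none. planning_horizon is currently unused; the check
            covers the dates spanned by the schedule.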
            """
    
            overloads = []
    
            for wc in self.work_centers:
                wc_schedule = schedule[schedule['work_center'] == wc]
                capacity = self.work_centers[wc]['capacity_hours']
    
                # Check each day/period
                current_date = schedule['start'].min().date()
                end_date = schedule['end'].max().date()
    
                while current_date <= end_date:
                    day_start = datetime.combine(current_date, datetime.min.time())
                    day_end = day_start + timedelta(days=1)
    
                    # Calculate load for this day
                    daily_ops = wc_schedule[
                        (wc_schedule['start'] < day_end) &
                        (wc_schedule['end'] > day_start)
                    ]
    
                    load = 0
                    for _, op in daily_ops.iterrows():
                        # Calculate overlap with this day
                        overlap_start = max(op['start'], day_start)
                        overlap_end = min(op['end'], day_end)
                        overlap_hours = (overlap_end - overlap_start).total_seconds() / 3600
                        load += overlap_hours
    
                    if load > capacity:
                        overloads.append({
                            'work_center': wc,
                            'date': current_date,
                            'load': load,
                            'capacity': capacity,
                            'overload': load - capacity
                        })
    
                    current_date += timedelta(days=1)
    
            return pd.DataFrame(overloads) if overloads else None
    
    # Example usage
    work_centers = {
        'Cutting': {'capacity_hours': 16, 'efficiency': 0.90},
        'Welding': {'capacity_hours': 16, 'efficiency': 0.85},
        'Assembly': {'capacity_hours': 16, 'efficiency': 0.92}
    }
    
    orders = [
        {
            'order_id': 'ORD001',
            'quantity': 100,
            'priority': 1,
            'routing': [('Cutting', 0.5), ('Welding', 0.8), ('Assembly', 1.0)]
        },
        {
            'order_id': 'ORD002',
            'quantity': 75,
            'priority': 2,
            'routing': [('Cutting', 0.6), ('Assembly', 1.2)]
        }
    ]
    
    fcs = FiniteCapacityScheduler(work_centers)
    
    # Forward schedule
    start_date = datetime(2025, 2, 1, 8, 0)
    forward_schedule = fcs.forward_schedule(orders, start_date)
    print("Forward Schedule:")
    print(forward_schedule)
    
    # Check capacity
    overloads = fcs.check_capacity_feasibility(forward_schedule, planning_horizon=30)
    if overloads is not None:
        print("\nCapacity Overloads:")
        print(overloads)
    else:
        print("\nSchedule is feasible - no capacity violations")
    

    Schedule Optimization with Genetic Algorithm

    import random
    import pandas as pd  # used for the history DataFrame returned by optimize()
    
    class ScheduleOptimizer:
        """
        Genetic algorithm for multi-objective scheduling
        Minimize: makespan, tardiness, setup costs
        """
    
        def __init__(self, jobs, machines, setup_matrix):
            self.jobs = jobs  # List of job objects
            self.machines = machines
            self.setup_matrix = setup_matrix  # Setup time between jobs
    
        def create_individual(self):
            """Create random job sequence (chromosome)"""
            return random.sample(self.jobs, len(self.jobs))
    
        def calculate_fitness(self, sequence):
            """
            Multi-objective fitness function
            Lower is better
            """
            makespan = self._calculate_makespan(sequence)
            tardiness = self._calculate_tardiness(sequence)
            setup_cost = self._calculate_setup_cost(sequence)
    
            # Weighted sum (normalize and weight)
            fitness = (
                0.4 * (makespan / 100) +     # Normalize to reasonable scale
                0.4 * (tardiness / 100) +
                0.2 * (setup_cost / 50)
            )
    
            return {
                'fitness': fitness,
                'makespan': makespan,
                'tardiness': tardiness,
                'setup_cost': setup_cost
            }
    
        def _calculate_makespan(self, sequence):
            """Calculate total completion time"""
            time = 0
            for job in sequence:
                time += job['processing_time']
            return time
    
        def _calculate_tardiness(self, sequence):
            """Calculate total tardiness (late deliveries)"""
            time = 0
            total_tardiness = 0
    
            for job in sequence:
                time += job['processing_time']
                tardiness = max(0, time - job['due_date'])
                total_tardiness += tardiness
    
            return total_tardiness
    
        def _calculate_setup_cost(self, sequence):
            """Calculate total setup time/cost"""
            total_setup = 0
    
            for i in range(len(sequence) - 1):
                current_job = sequence[i]['id']
                next_job = sequence[i + 1]['id']
                total_setup += self.setup_matrix.get((current_job, next_job), 0)
    
            return total_setup
    
        def optimize(self, population_size=100, generations=200):
            """Run genetic algorithm"""
    
            # Initialize population
            population = [self.create_individual() for _ in range(population_size)]
    
            best_solution = None
            best_metrics = None
            best_fitness = float('inf')
            history = []
    
            for generation in range(generations):
                # Evaluate fitness
                fitness_scores = []
                for individual in population:
                    metrics = self.calculate_fitness(individual)
                    fitness_scores.append(metrics['fitness'])
    
                    if metrics['fitness'] < best_fitness:
                        best_fitness = metrics['fitness']
                        best_solution = individual.copy()
                        best_metrics = metrics
    
                history.append({
                    'generation': generation,
                    'best_fitness': best_fitness,
                    'avg_fitness': sum(fitness_scores) / len(fitness_scores)
                })
    
                # Selection: tournament
                selected = []
                for _ in range(population_size):
                    tournament = random.sample(list(zip(population, fitness_scores)), 5)
                    winner = min(tournament, key=lambda x: x[1])[0]
                    selected.append(winner)
    
                # Crossover and mutation
                next_generation = []
                for i in range(0, population_size, 2):
                    parent1 = selected[i]
                    parent2 = selected[i + 1] if i + 1 < population_size else selected[0]
    
                    if random.random() < 0.8:  # Crossover rate
                        child1, child2 = self._pmx_crossover(parent1, parent2)
                    else:
                        child1, child2 = parent1.copy(), parent2.copy()
    
                    if random.random() < 0.2:  # Mutation rate
                        child1 = self._swap_mutation(child1)
                    if random.random() < 0.2:
                        child2 = self._swap_mutation(child2)
    
                    next_generation.extend([child1, child2])
    
                population = next_generation[:population_size]
    
            return {
                'best_sequence': [j['id'] for j in best_solution],
                'metrics': best_metrics,
                'history': pd.DataFrame(history)
            }
    
        def _pmx_crossover(self, parent1, parent2):
            """Partially Mapped Crossover for permutation"""
            size = len(parent1)
            child1 = [None] * size
            child2 = [None] * size
    
            # Select crossover points
            cx_point1, cx_point2 = sorted(random.sample(range(size), 2))
    
            # Copy segments
            child1[cx_point1:cx_point2] = parent1[cx_point1:cx_point2]
            child2[cx_point1:cx_point2] = parent2[cx_point1:cx_point2]
    
            # Fill remaining positions using the PMX mapping
            segment1 = child1[cx_point1:cx_point2]
            segment2 = child2[cx_point1:cx_point2]
            for i in range(size):
                if i < cx_point1 or i >= cx_point2:
                    # Fill from parent2 for child1; while the gene already sits
                    # in the copied segment, map it through parent1 -> parent2
                    gene = parent2[i]
                    while gene in segment1:
                        gene = parent2[parent1.index(gene)]
                    child1[i] = gene
    
                    # Fill from parent1 for child2, mapping through parent2 -> parent1
                    gene = parent1[i]
                    while gene in segment2:
                        gene = parent1[parent2.index(gene)]
                    child2[i] = gene
    
            return child1, child2
    
        def _swap_mutation(self, sequence):
            """Swap two random jobs"""
            seq = sequence.copy()
            i, j = random.sample(range(len(seq)), 2)
            seq[i], seq[j] = seq[j], seq[i]
            return seq
    

    MRP and Production Scheduling Integration

    Material Requirements Planning (MRP)

    import pandas as pd
    import numpy as np
    from datetime import datetime, timedelta
    
    class MRPScheduler:
        """
        MRP logic with production scheduling
        Calculate material requirements and schedule production
        """
    
        def __init__(self, bom, inventory, lead_times):
            """
            Parameters:
            - bom: bill of materials dict {parent: [(child, quantity)]}
            - inventory: current on-hand inventory {item: quantity}
            - lead_times: production/procurement lead times {item: days}
            """
            self.bom = bom
            self.inventory = inventory
            self.lead_times = lead_times
    
        def explode_bom(self, item, quantity, level=0):
            """
            Recursively explode BOM
            Returns list of (item, quantity, level) tuples
            """
            requirements = [(item, quantity, level)]
    
            if item in self.bom:
                for component, qty_per in self.bom[item]:
                    requirements.extend(
                        self.explode_bom(component, quantity * qty_per, level + 1)
                    )
    
            return requirements
    
        def calculate_net_requirements(self, gross_requirements):
            """
            Calculate net requirements considering on-hand inventory
    
            Parameters:
            - gross_requirements: dict {item: quantity}
    
            Returns net requirements and planned orders
            """
            net_requirements = {}
            planned_orders = {}
    
            for item, gross_qty in gross_requirements.items():
                on_hand = self.inventory.get(item, 0)
                net_qty = max(0, gross_qty - on_hand)
    
                if net_qty > 0:
                    net_requirements[item] = net_qty
                    planned_orders[item] = net_qty
    
                    # Update inventory (consumption)
                    self.inventory[item] = max(0, on_hand - gross_qty)
                else:
                    # Sufficient inventory
                    self.inventory[item] = on_hand - gross_qty
    
            return net_requirements, planned_orders
    
        def time_phase_mrp(self, master_schedule, planning_horizon_days=90):
            """
            Time-phased MRP calculation
    
            Parameters:
            - master_schedule: list of dicts {'item', 'quantity', 'due_date'}
            - planning_horizon_days: planning horizon
    
            Returns time-phased requirements and production schedule
            """
    
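            # Start the horizon before the earliest due date so that component
            # requirements offset by lead time still fall inside a bucket
            # (longest lead time plus one weekly bucket)
            first_due = min(order['due_date'] for order in master_schedule)
            max_lead = max(self.lead_times.values(), default=0)
            start_date = first_due - timedelta(days=max_lead + 7)
            end_date = first_due + timedelta(days=planning_horizon_days)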
    
            # Create time buckets (weekly)
            time_buckets = pd.date_range(start=start_date, end=end_date, freq='W')
    
            # Initialize requirements by time bucket
            all_items = set()
            for order in master_schedule:
                all_items.add(order['item'])
                requirements = self.explode_bom(order['item'], order['quantity'])
                for item, qty, level in requirements:
                    all_items.add(item)
    
            mrp_records = []
    
            for item in all_items:
                # Gross requirements by time bucket
                gross_req = pd.Series(0, index=time_buckets)
    
                for order in master_schedule:
                    if order['item'] == item:
                        # Find appropriate time bucket
                        bucket = time_buckets[time_buckets >= order['due_date']][0]
                        gross_req[bucket] += order['quantity']
                    else:
                        # Check if item is a component
                        requirements = self.explode_bom(order['item'], order['quantity'])
                        for req_item, qty, level in requirements:
                            if req_item == item:
                                # Offset by parent lead time
                                parent_lead = self.lead_times.get(order['item'], 0)
                                offset_date = order['due_date'] - timedelta(days=parent_lead)
                                if offset_date >= time_buckets[0]:
                                    bucket = time_buckets[time_buckets >= offset_date][0]
                                    gross_req[bucket] += qty
    
                # Calculate net requirements and planned orders
                on_hand = self.inventory.get(item, 0)
    
                for bucket in time_buckets:
                    gross = gross_req[bucket]
    
                    if gross > 0:
                        net = max(0, gross - on_hand)
    
                        if net > 0:
                            # Planned order release (offset by lead time)
                            lead_time = self.lead_times.get(item, 0)
                            release_date = bucket - timedelta(days=lead_time)
    
                            mrp_records.append({
                                'item': item,
                                'bucket': bucket,
                                'gross_requirement': gross,
                                'on_hand_start': on_hand,
                                'net_requirement': net,
                                'planned_order': net,
                                'release_date': release_date
                            })
    
                            on_hand = 0  # Inventory consumed
                        else:
                            # Sufficient inventory
                            mrp_records.append({
                                'item': item,
                                'bucket': bucket,
                                'gross_requirement': gross,
                                'on_hand_start': on_hand,
                                'net_requirement': 0,
                                'planned_order': 0,
                                'release_date': None
                            })
    
                            on_hand -= gross
    
            return pd.DataFrame(mrp_records)
    
    # Example usage
    bom = {
        'Product_A': [('Subassembly_X', 2), ('Component_Y', 4)],
        'Subassembly_X': [('Part_1', 3), ('Part_2', 1)],
    }
    
    inventory = {
        'Product_A': 10,
        'Subassembly_X': 5,
        'Component_Y': 20,
        'Part_1': 50,
        'Part_2': 30
    }
    
    lead_times = {
        'Product_A': 5,
        'Subassembly_X': 7,
        'Component_Y': 3,
        'Part_1': 10,
        'Part_2': 14
    }
    
    mrp = MRPScheduler(bom, inventory, lead_times)
    
    # Master production schedule
    master_schedule = [
        {'item': 'Product_A', 'quantity': 100, 'due_date': datetime(2025, 3, 1)},
        {'item': 'Product_A', 'quantity': 150, 'due_date': datetime(2025, 3, 15)},
    ]
    
    mrp_plan = mrp.time_phase_mrp(master_schedule)
    print(mrp_plan[mrp_plan['planned_order'] > 0])
    

    Performance Metrics

    Key Scheduling Metrics

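    import numpy as np
    
    def calculate_scheduling_metrics(schedule_df, jobs_df):
        """
        Calculate comprehensive scheduling performance metrics
    
        Parameters:
        - schedule_df: DataFrame with actual schedule [job, start, end, machine]
        - jobs_df: DataFrame with job details [job, processing_time, due_date, priority]
    
        start, end, due_date and processing_time are assumed to be numeric values
        in the same time unit (e.g. hours), so the utilization ratio is well defined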
    
        Returns dict of metrics
        """
    
        # Merge schedule with job details
        df = schedule_df.merge(jobs_df, on='job')
    
        # 1. Makespan
        makespan = df['end'].max() - df['start'].min()
    
        # 2. Mean flow time
        flow_times = df.groupby('job')['end'].max() - df.groupby('job')['start'].min()
        mean_flow_time = flow_times.mean()
    
        # 3. Tardiness
        completion_times = df.groupby('job')['end'].max()
        due_dates = df.groupby('job')['due_date'].first()
        tardiness = (completion_times - due_dates).clip(lower=0)
        total_tardiness = tardiness.sum()
        mean_tardiness = tardiness.mean()
        num_tardy = (tardiness > 0).sum()
    
        # 4. Utilization
        total_processing_time = df['processing_time'].sum()
        num_machines = df['machine'].nunique()
        utilization = (total_processing_time / (makespan * num_machines)) * 100
    
        # 5. WIP (Work in Process)
        # Average number of jobs in system
        all_times = sorted(set(df['start'].tolist() + df['end'].tolist()))
        wip_samples = []
    
        for t in all_times[:-1]:
            jobs_in_process = ((df['start'] <= t) & (df['end'] > t)).sum()
            wip_samples.append(jobs_in_process)
    
        avg_wip = np.mean(wip_samples) if wip_samples else 0
    
        # 6. On-time delivery rate
        otd_rate = ((tardiness == 0).sum() / len(tardiness)) * 100
    
        return {
            'makespan': makespan,
            'mean_flow_time': mean_flow_time,
            'total_tardiness': total_tardiness,
            'mean_tardiness': mean_tardiness,
            'num_tardy_jobs': num_tardy,
            'utilization_pct': utilization,
            'avg_wip': avg_wip,
            'on_time_delivery_pct': otd_rate
        }
    

    Tools & Libraries

    Python Libraries

    Optimization & Scheduling:

    • ortools: Google's constraint programming and optimization (CP-SAT solver)
    • pulp: Linear programming for scheduling problems
    • pyomo: Advanced optimization modeling
    • gekko: Dynamic optimization and scheduling
    • docplex: IBM CPLEX Python API
    • simanneal: Simulated annealing metaheuristic

    Simulation:

    • simpy: Discrete-event simulation for manufacturing
    • salabim: Animation and simulation
    • numpy: Numerical computations

    Visualization:

    • matplotlib, plotly: Gantt charts and schedules
    • seaborn: Statistical visualizations
    • gantt: Gantt chart library

    Commercial Scheduling Software

    Advanced Planning Systems (APS):

    • SAP APO (PP/DS): Production Planning & Detailed Scheduling
    • Oracle Advanced Supply Chain Planning (ASCP): Constraint-based planning
    • Blue Yonder (formerly JDA): Manufacturing planning and scheduling
    • Kinaxis RapidResponse: Concurrent planning and scheduling
    • Dassault Systèmes DELMIA: Production scheduling and simulation

    Specialized Schedulers:

    • Asprova: APS for job shops and mixed-mode manufacturing
    • Preactor: Finite capacity scheduling (acquired by Siemens; now sold as Opcenter APS)
    • Opcenter APS: Advanced planning and scheduling (Siemens)
    • Ortems: APS for complex manufacturing
    • FELIOS: Manufacturing execution and scheduling

    MES (Manufacturing Execution Systems):

    • SAP ME/MII: Manufacturing execution
    • Rockwell FactoryTalk: Production scheduling module
    • Wonderware MES: Real-time scheduling
    • Parsec TrakSYS: MES with scheduling

    Common Challenges & Solutions

    Challenge: Schedule Nervousness

    Problem:

    • Frequent schedule changes
    • Disrupts shop floor
    • Reduces credibility of schedule

    Solutions:

    • Freeze zones (don't reschedule near-term)
    • Schedule stability metrics
    • Rolling horizon approach
    • Buffer times for uncertainty
    • Communication protocols for changes
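
    The freeze-zone and stability-metric ideas above can be sketched as follows. This is a minimal illustration, not a prescribed method: the function names `apply_freeze_zone` and `schedule_instability`, the 24-hour freeze window, and the choice of mean absolute start-time shift as the stability metric are all assumptions made for the example.

```python
from datetime import datetime, timedelta

def apply_freeze_zone(old_schedule, new_schedule, now, freeze_hours=24):
    """Keep the old start time for any job that was due to start inside the frozen zone."""
    freeze_end = now + timedelta(hours=freeze_hours)
    merged = {}
    for job, new_start in new_schedule.items():
        old_start = old_schedule.get(job)
        if old_start is not None and old_start < freeze_end:
            merged[job] = old_start   # frozen: do not move
        else:
            merged[job] = new_start   # outside the freeze zone: accept the change
    return merged

def schedule_instability(old_schedule, new_schedule):
    """Mean absolute start-time shift in hours over jobs present in both versions."""
    common = old_schedule.keys() & new_schedule.keys()
    if not common:
        return 0.0
    total = sum(abs((new_schedule[j] - old_schedule[j]).total_seconds()) for j in common)
    return total / 3600 / len(common)

# Hypothetical data: job -> planned start time
now = datetime(2025, 2, 1, 8, 0)
old = {'J001': datetime(2025, 2, 1, 10, 0), 'J002': datetime(2025, 2, 3, 8, 0)}
new = {'J001': datetime(2025, 2, 1, 14, 0), 'J002': datetime(2025, 2, 3, 12, 0)}

frozen = apply_freeze_zone(old, new, now, freeze_hours=24)
print(schedule_instability(old, new))     # 4.0 hours without a freeze zone
print(schedule_instability(old, frozen))  # 2.0 hours with the freeze zone
```

    Tracking this number across replans gives a simple way to see whether the freeze zone is actually reducing shop-floor churn.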

    Challenge: Unexpected Disruptions

    Problem:

    • Machine breakdowns
    • Material shortages
    • Quality issues
    • Rush orders

    Solutions:

    • Dynamic rescheduling capability
    • Buffer capacity at bottlenecks
    • Alternative routings
    • Expediting procedures
    • Real-time monitoring (MES integration)
    • Reactive scheduling algorithms
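
    One of the simplest reactive approaches is right-shift rescheduling: after a breakdown, affected operations on the failed machine are pushed later while their order is kept. The sketch below assumes times in hours from the start of the horizon, a hypothetical `right_shift_reschedule` helper, and that an interrupted operation restarts from scratch after the repair. It does not propagate delays to downstream operations on other machines.

```python
def right_shift_reschedule(operations, machine, down_start, down_end):
    """
    Push operations on a failed machine to after the downtime window.

    operations: list of dicts with 'job', 'machine', 'start', 'end' (hours)
    Returns a new list; the input is not modified.
    """
    updated = []
    machine_free = down_end  # earliest time the repaired machine can start work
    for op in sorted(operations, key=lambda o: o['start']):
        op = dict(op)
        # Affected: on the failed machine and not finished before the breakdown
        if op['machine'] == machine and op['end'] > down_start:
            duration = op['end'] - op['start']
            op['start'] = max(op['start'], machine_free)
            op['end'] = op['start'] + duration
            machine_free = op['end']
        updated.append(op)
    return updated

# Hypothetical schedule: Welding is down from hour 5 to hour 8
ops = [
    {'job': 'J001', 'machine': 'Welding', 'start': 0, 'end': 4},
    {'job': 'J002', 'machine': 'Welding', 'start': 4, 'end': 7},
    {'job': 'J003', 'machine': 'Welding', 'start': 7, 'end': 9},
    {'job': 'J004', 'machine': 'Cutting', 'start': 0, 'end': 5},
]
result = {op['job']: (op['start'], op['end'])
          for op in right_shift_reschedule(ops, 'Welding', 5, 8)}
print(result)
```

    Right-shifting keeps the sequence stable, which limits nervousness, but a full reschedule may recover more due dates when the downtime is long.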

    Challenge: Setup Time Optimization

    Problem:

    • Significant setup/changeover times
    • Trade-off between sequence optimization and due dates
    • Complex setup dependencies

    Solutions:

    • Campaign scheduling (batch similar jobs)
    • Setup time matrix optimization
    • Sequence-dependent scheduling algorithms
    • Setup time reduction (SMED: Single-Minute Exchange of Die)
    • Parallel setups when possible
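
    As a rough illustration of sequence-dependent setup handling, the sketch below builds a sequence with a greedy nearest-neighbor rule over a setup matrix keyed by `(from_job, to_job)`, the same shape that `ScheduleOptimizer` uses for `setup_matrix`. The helper names, job families, and setup hours are hypothetical, and the heuristic ignores due dates, so it shows only the setup side of the trade-off.

```python
def nearest_neighbor_sequence(job_ids, setup_matrix, first_job):
    """Greedy sequence: always run next the job with the smallest setup from the current one."""
    sequence = [first_job]
    remaining = [j for j in job_ids if j != first_job]
    while remaining:
        current = sequence[-1]
        next_job = min(remaining, key=lambda j: setup_matrix.get((current, j), 0))
        sequence.append(next_job)
        remaining.remove(next_job)
    return sequence

def total_setup(sequence, setup_matrix):
    """Sum of setup times between consecutive jobs in the sequence."""
    return sum(setup_matrix.get((a, b), 0) for a, b in zip(sequence, sequence[1:]))

# Hypothetical jobs in two product families; changeovers within a family are short
job_ids = ['A', 'B', 'C', 'D']
families = {'A': 'red', 'B': 'blue', 'C': 'red', 'D': 'blue'}
setup_matrix = {
    (a, b): 0.5 if families[a] == families[b] else 2.0
    for a in job_ids for b in job_ids if a != b
}

sequence = nearest_neighbor_sequence(job_ids, setup_matrix, first_job='A')
print(sequence)                              # ['A', 'C', 'B', 'D']
print(total_setup(job_ids, setup_matrix))    # 6.0 hours in the original order
print(total_setup(sequence, setup_matrix))   # 3.0 hours after grouping families
```

    The result is effectively a campaign schedule: jobs of the same family end up adjacent, which halves the setup time in this example.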

    Challenge: Multi-Objective Conflicts

    Problem:

    • Minimize makespan vs. meet due dates
    • Reduce WIP vs. keep machines busy
    • Conflicting priorities

    Solutions:

    • Clearly defined priority hierarchy
    • Weighted multi-objective functions
    • Pareto optimization (trade-off curves)
    • Simulation to test scenarios
    • Collaborative planning (S&OP)
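
    Pareto optimization can be illustrated with a small dominance filter over candidate schedules. The sketch assumes every objective is minimized and that each candidate is a dict of metric values; `pareto_front` and the candidate data are hypothetical names and numbers used only for illustration.

```python
def pareto_front(candidates, objectives):
    """Return the candidates that no other candidate dominates (all objectives minimized)."""
    def dominates(a, b):
        # a dominates b if it is no worse on every objective and better on at least one
        return (all(a[k] <= b[k] for k in objectives)
                and any(a[k] < b[k] for k in objectives))

    return [c for c in candidates
            if not any(dominates(other, c) for other in candidates)]

# Hypothetical candidate schedules with their makespan and total tardiness (hours)
candidates = [
    {'name': 'S1', 'makespan': 100, 'tardiness': 20},
    {'name': 'S2', 'makespan': 110, 'tardiness': 5},
    {'name': 'S3', 'makespan': 120, 'tardiness': 25},
    {'name': 'S4', 'makespan': 105, 'tardiness': 20},
]

front = pareto_front(candidates, ['makespan', 'tardiness'])
print([c['name'] for c in front])  # ['S1', 'S2']
```

    S1 and S2 represent the real trade-off (shorter makespan versus less tardiness); S3 and S4 are worse than S1 on at least one objective without being better on the other, so they can be dropped before the planner chooses.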

    Challenge: Limited Capacity at Bottlenecks

    Problem:

    • Bottleneck machines constrain throughput
    • Non-bottleneck machines starved or blocked
    • System-wide impact

    Solutions:

    • Theory of Constraints (TOC) / Drum-Buffer-Rope
    • Protective buffers at bottleneck
    • Subordinate non-bottleneck schedules to bottleneck
    • Offload work from bottleneck (outsourcing, alternate routing)
    • Increase bottleneck capacity (overtime, equipment)
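
    A minimal Drum-Buffer-Rope sketch, under simplifying assumptions: the bottleneck (drum) is taken to be the work center with the highest load-to-capacity ratio, and each job is released (rope) early enough to arrive a fixed buffer ahead of its drum start. The helper names, the 4-hour buffer, and the sample hours are hypothetical.

```python
def find_bottleneck(load_hours, capacity_hours):
    """Return (work_center, ratio) for the highest load/capacity ratio."""
    ratios = {wc: load_hours[wc] / capacity_hours[wc] for wc in load_hours}
    bottleneck = max(ratios, key=ratios.get)
    return bottleneck, ratios[bottleneck]

def rope_release_times(drum_starts, upstream_hours, buffer_hours):
    """
    Release time per job = drum start - upstream processing - protective buffer.
    Times are hours from the start of the horizon and are floored at 0.
    """
    return {
        job: max(0, start - upstream_hours[job] - buffer_hours)
        for job, start in drum_starts.items()
    }

# Hypothetical scheduled load and available capacity per work center (hours)
load = {'Cutting': 152, 'Welding': 168, 'Assembly': 128}
capacity = {'Cutting': 160, 'Welding': 160, 'Assembly': 160}
bottleneck, ratio = find_bottleneck(load, capacity)
print(bottleneck, ratio)

# Hypothetical drum schedule and upstream processing time per job
drum_starts = {'J001': 12.0, 'J003': 18.0}
upstream_hours = {'J001': 3.5, 'J003': 4.0}
releases = rope_release_times(drum_starts, upstream_hours, buffer_hours=4)
print(releases)
```

    Releasing material no earlier than these times keeps WIP from piling up in front of non-bottleneck machines while still protecting the drum from starvation.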

    Challenge: Data Quality and Accuracy

    Problem:

    • Inaccurate processing times
    • Wrong routings in system
    • Outdated lead times
    • Poor schedule performance

    Solutions:

    • Regular data audits and updates
    • Feedback from shop floor (MES)
    • Statistical analysis of actual vs. standard times
    • Data governance processes
    • Learning algorithms to adjust parameters
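
    The statistical comparison of actual versus standard times, and a very simple "learning" update, might look like the sketch below. The 10% tolerance, the smoothing factor `alpha`, the helper names, and the sample observations are assumptions for illustration; a production system would likely also filter outliers first.

```python
from statistics import mean

def audit_standard_times(standards, actuals, tolerance=0.10):
    """Flag operations whose mean actual time deviates from standard by more than tolerance."""
    flagged = {}
    for op, std in standards.items():
        observed = actuals.get(op, [])
        if not observed:
            continue  # no shop-floor feedback for this operation yet
        ratio = mean(observed) / std
        if abs(ratio - 1) > tolerance:
            flagged[op] = round(ratio, 2)
    return flagged

def update_standard_time(standard, observed, alpha=0.2):
    """Exponentially smooth the standard time toward each new observation."""
    estimate = standard
    for actual in observed:
        estimate = alpha * actual + (1 - alpha) * estimate
    return estimate

# Hypothetical standard hours per unit and recent actuals reported by the MES
standards = {'Cutting': 0.5, 'Welding': 0.8}
actuals = {'Cutting': [0.52, 0.49, 0.50], 'Welding': [1.0, 0.95, 1.05]}

flagged = audit_standard_times(standards, actuals)
print(flagged)  # Welding runs about 25% over standard

new_welding = update_standard_time(standards['Welding'], actuals['Welding'], alpha=0.5)
print(new_welding)
```

    A higher `alpha` reacts faster to recent observations but is more sensitive to one-off disruptions.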

    Output Format

    Production Schedule Report

    Executive Summary:

    • Schedule period and horizon
    • Key performance metrics (makespan, utilization, OTD)
    • Critical bottlenecks or constraints
    • Schedule changes from prior version

    Detailed Schedule by Work Center:

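    | Work Center | Job  | Operation | Start Time       | End Time         | Duration | Setup Time | Status    |
    |-------------|------|-----------|------------------|------------------|----------|------------|-----------|
    | Cutting     | J001 | OP10      | 2025-02-01 08:00 | 2025-02-01 11:30 | 3.5h     | 0.5h       | Scheduled |
    | Cutting     | J003 | OP10      | 2025-02-01 12:00 | 2025-02-01 16:00 | 4.0h     | 0.5h       | Scheduled |
    | Welding     | J001 | OP20      | 2025-02-01 12:00 | 2025-02-01 18:00 | 6.0h     | 1.0h       | Scheduled |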

    Capacity Analysis:

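    | Work Center | Available Hours | Scheduled Hours | Utilization % | Overload Hours |
    |-------------|-----------------|-----------------|---------------|----------------|
    | Cutting     | 160             | 152             | 95%           | 0              |
    | Welding     | 160             | 168             | 105%          | 8              |
    | Assembly    | 160             | 128             | 80%           | 0              |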
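
    The derived columns of the capacity analysis follow directly from available and scheduled hours. A small sketch, with `capacity_analysis` as a hypothetical helper name and the figures taken from the example report:

```python
def capacity_analysis(available, scheduled):
    """Build capacity-analysis rows: utilization % and overload hours per work center."""
    rows = []
    for wc in available:
        rows.append({
            'work_center': wc,
            'available_hours': available[wc],
            'scheduled_hours': scheduled[wc],
            # Utilization = scheduled / available, as a whole percentage
            'utilization_pct': round(scheduled[wc] / available[wc] * 100),
            # Overload = hours scheduled beyond what is available, never negative
            'overload_hours': max(0, scheduled[wc] - available[wc]),
        })
    return rows

available = {'Cutting': 160, 'Welding': 160, 'Assembly': 160}
scheduled = {'Cutting': 152, 'Welding': 168, 'Assembly': 128}

rows = capacity_analysis(available, scheduled)
for row in rows:
    print(row)
```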

    Schedule Performance Metrics:

    • Makespan: 12 days
    • Mean Flow Time: 6.5 days
    • Total Tardiness: 45 hours
    • On-Time Delivery: 87%
    • Average WIP: 23 jobs
    • Machine Utilization: 88%

    Gantt Chart: [Visual representation of schedule]

    Action Items:

    • Welding work center overloaded in Week 2 - recommend overtime or outsourcing
    • Jobs J005, J012, J018 at risk of missing due dates
    • Setup time on Cutting can be reduced by sequencing similar jobs together
    • Material for J023 not available until Feb 5 - schedule hold

    Questions to Ask

    If you need more context:

    1. What type of manufacturing environment? (job shop, flow shop, batch, continuous)
    2. How many machines/work centers? What are the routings?
    3. What are the key scheduling objectives? (makespan, due dates, WIP, utilization)
    4. Are there setup times or changeover constraints?
    5. What is the current scheduling method and known issues?
    6. What is the planning horizon and replan frequency?
    7. Are there resource constraints beyond machines (labor, tooling, materials)?
    8. Integration requirements with ERP/MES systems?

    Related Skills

    • master-production-scheduling: For MPS and demand-driven scheduling
    • capacity-planning: For capacity analysis and requirements planning
    • job-shop-scheduling: For detailed JSP algorithms
    • flow-shop-scheduling: For flow shop specific methods
    • assembly-line-balancing: For line balancing and takt time
    • lean-manufacturing: For pull scheduling and JIT
    • inventory-optimization: For WIP and buffer stock optimization
    • optimization-modeling: For mathematical scheduling models
    • constraint-programming: For advanced constraint-based scheduling