Module MAPLEAF.SimulationRunners.SingleSimulations

Expand source code
import os
import sys
from copy import deepcopy
from distutils.util import strtobool

from MAPLEAF.ENV import Environment
from MAPLEAF.IO import (Logging, Plotting, RocketFlight, SimDefinition,
                        SubDictReader)
from MAPLEAF.Motion import Vector
from MAPLEAF.Rocket import Rocket
from tqdm import tqdm

__all__ = [ "Simulation", "runSimulation", "WindTunnelSimulation", "loadSimDefinition" ]

def loadSimDefinition(simDefinitionFilePath=None, simDefinition=None, silent=False):
    ''' 
        Loads a simulation definition file into a `MAPLEAF.IO.SimDefinition` object - accepts either a file path or a `MAPLEAF.IO.SimDefinition` object as input 

        Inputs:

            * simDefinitionFilePath:  (string) path to a simulation definition file. Only parsed if simDefinition is not provided  
            * simDefinition:          (`MAPLEAF.IO.SimDefinition`) already-loaded definition. Returned as-is (not copied) whenever it is provided  
            * silent:                 (bool) forwarded to the `MAPLEAF.IO.SimDefinition` constructor when a file is parsed  

        Raises:

            * ValueError if neither simDefinitionFilePath nor simDefinition is provided
    '''
    # An object that was passed in always wins over the file path
    if simDefinition is not None:
        return simDefinition

    if simDefinitionFilePath is not None:
        return SimDefinition(simDefinitionFilePath, silent=silent) # Parse simulation definition file

    raise ValueError(""" Insufficient information to initialize a Simulation.
            Please provide either simDefinitionFilePath (string) or simDefinition (SimDefinition), which has been created from the desired Sim Definition file.
            If both are provided, the SimDefinition is used.""")

class Simulation():
    '''
        Runs a single flight simulation described by a `MAPLEAF.IO.SimDefinition`.
        Owns the `MAPLEAF.ENV.Environment` shared by all rocket stages, steps every stage through time in `Simulation.run`,
        and takes care of console logging, log files and plots.
    '''

    def __init__(self, simDefinitionFilePath=None, simDefinition=None, silent=False):
        '''
            Inputs:
                
                * simDefinitionFilePath:  (string) path to simulation definition file  
                * simDefinition:          (`MAPLEAF.IO.SimDefinition`) object that's already loaded and parsed the desired sim definition file  
                * silent:                 (bool) toggles optional outputs to the console  

            At least one of simDefinitionFilePath / simDefinition is required. If both are provided, simDefinition is used.
        '''
        self.simDefinition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent)
        ''' Instance of `MAPLEAF.IO.SimDefinition`. Defines the current simulation '''

        self.environment = Environment(self.simDefinition, silent=silent)
        ''' Instance of `MAPLEAF.ENV.Environment`. Will be shared by all Rockets created by this sim runner '''

        self.stagingIndex = None # Set in self.createRocket
        ''' (int) Set in `Simulation.createRocket`. Tracks how many stages have been dropped '''

        self.silent = silent
        ''' (bool) '''

        self.loggingLevel = int(self.simDefinition.getValue("SimControl.loggingLevel"))
        ''' (int) Read from SimControl.loggingLevel. Console/file logging is only set up when this is > 0 '''

        self.computeStageDropPaths = strtobool(self.simDefinition.getValue("SimControl.StageDropPaths.compute"))
        ''' Truthy if dropped stages should be simulated after separation (see `Simulation.createNewDetachedStage`) '''

    def run(self, rocket=None):
        ''' 
            Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)

            Inputs:
                * rocket: (`MAPLEAF.Rocket.Rocket`) optional, already-initialized rocket. If None, one is created with `Simulation.createRocket`

            Returns:
                * stageFlightsPaths: (list[`MAPLEAF.IO.RocketFlight.RocketFlight`]) Each RocketFlight object represents the flight path of a single stage
                * logFilePaths: (list[string]) list of paths to all log files created by this simulation (None if the logging level is 0)

            If taking a time step raises, `Simulation._handleSimulationCrash` saves whatever results exist and exits the process.
        '''
        simDefinition = self.simDefinition

        # Initialize the rocket + environment models and simulation logging
        if rocket is None:
            rocket = self.createRocket() # Initialize rocket on launch pad, with all stages attached
        self.rocketStages = [ rocket ] # In this array, 'stage' means independent rigid bodies. Stages are initialized as new rocket objects and added once they are dropped from the main rocket

        # Create progress bar if appropriate
        if simDefinition.getValue("SimControl.EndCondition") == "Time":
            endTime = float(simDefinition.getValue("SimControl.EndConditionValue"))
            progressBar = tqdm(total=endTime+0.01)
            
            try:
                # Keep regular console output from interleaving with the progress bar
                self.logger.continueWritingToTerminal = False
            except AttributeError:
                pass # Logging not set up for this sim
        else:
            progressBar = None

        #### Main Loop Setup #### 
        self.dts = [ float(simDefinition.getValue("SimControl.timeStep")) ]    # (Initial) time step size
        self.endDetectors = [ self._getEndDetectorFunction(rocket, simDefinition) ] # List contains a boolean function that controls sim termination for each stage
        self.stageFlightPaths = [ self._setUpCachingForFlightAnimation(rocket) ] # List will contain resulting flight paths for each stage

        if(rocket.hardwareInTheLoopControl == "yes"):
            print("Setting up hardware in the loop interface")
            rocket.hilInterface.setupHIL(self.rocketStages[0].rigidBody.state)

        #### Main Loop ####
        # self.rocketStages can grow while this loop runs: Simulation.createNewDetachedStage appends dropped stages
        stageIndex = 0
        while stageIndex < len(self.rocketStages):

            if stageIndex > 0:
                print("Computing stage {} drop path".format(stageIndex))

            rocket = self.rocketStages[stageIndex]
            endDetector = self.endDetectors[stageIndex]
            flight = self.stageFlightPaths[stageIndex]

            endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])
                
            while not endSimulation:
                ### Take a time step ###
                try:
                    if FinalTimeStepDt is not None:
                        print("Simulation Runner overriding time step from {} to {} to accurately meet end condition".format(self.dts[stageIndex], FinalTimeStepDt))
                        self.dts[stageIndex] = FinalTimeStepDt

                    integrationResult = rocket.timeStep(self.dts[stageIndex])

                    if stageIndex == 0: # Currently, progress bar only works for bottom stage
                        try:
                            progressBar.update(integrationResult.dt)
                        except AttributeError:
                            pass
                except BaseException:
                    # Deliberately catches everything (same as the bare except it replaces), so that results
                    # are also saved when the run is interrupted. _handleSimulationCrash does not return.
                    self._closeProgressBar(progressBar)

                    # Save simulation results and print out stack trace                    
                    self._handleSimulationCrash()

                # Adjust time step size for next iteration
                self.dts[stageIndex] = integrationResult.dt * integrationResult.timeStepAdaptationFactor

                # HIL
                if(rocket.hardwareInTheLoopControl == "yes"):
                    rocket.hilInterface.performHIL(rocket.rigidBody.state,rocket.rigidBody.time)

                # Cache states for flight animation
                self.cacheState(rocket, flight)

                # Check whether we should end the simulation, or take a modified-size final time step    
                endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])
            
            # Log last state (would be the starting state of the next time step)
            rocket._logState()

            # Move on to next (dropped) stage
            stageIndex += 1

            self._closeProgressBar(progressBar)

        print("Simulation Complete")

        logFilePaths = self._postProcess(simDefinition)

        return self.stageFlightPaths, logFilePaths

    def _closeProgressBar(self, progressBar):
        ''' Closes the progress bar (if there is one) and, unless running silently, turns console output back on '''
        try:
            progressBar.close()
            
            if not self.silent:
                sys.stdout.continueWritingToTerminal = True # sys.stdout is an instance of MAPLEAF.IO.Logging.Logger
        except AttributeError:
            pass # No progress bar (progressBar is None) - console output was never turned off

    #### Pre-sim ####
    def createRocket(self, stage=None):
        ''' 
            Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class
            Returns an instance of Rocket with its Environment/Logs initialized. Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).

            Inputs:
                * stage: passed to the Rocket constructor as stageToInitialize. When None (full rocket), console logging is (re-)initialized and the staging index is reset to 0
        '''
        # Initialize Rocket
        rocketDictReader = SubDictReader("Rocket", self.simDefinition)  
        rocket = Rocket(rocketDictReader, silent=self.silent, stageToInitialize=stage, simRunner=self, environment=self.environment)       # Initialize Rocket

        if self.simDefinition.getValue('SimControl.RocketPlot') in [ 'On', 'on' ]:
            rocket.plotShape()  # Reference to this simRunner used to add to logs

        if stage is None:
            self._setUpConsoleLogging()
            self.stagingIndex = 0 # Initially zero, after dropping first stage: 1, after dropping second stage: 2, etc...
            
        return rocket

    def _setUpConsoleLogging(self):
        ''' 
            Redirects sys.stdout through a `MAPLEAF.IO.Logging.Logger`:  
            * loggingLevel > 0: console output is captured in self.consoleOutputLog (and still shown in the terminal unless silent)  
            * loggingLevel == 0 and silent: console output is only suppressed  
            * otherwise sys.stdout is left untouched
        '''
        if self.loggingLevel > 0:
            # Set up logging so that the output of any print calls after this point is captured in mainSimulationLog
            self.consoleOutputLog = []
            if self.silent:
                self.logger = Logging.Logger(self.consoleOutputLog, continueWritingToTerminal=False)
            else:
                self.logger = Logging.Logger(self.consoleOutputLog)
            sys.stdout = self.logger
            
            # Output system info to console and to log
            Logging.getSystemInfo(printToConsole=True)
            # Output sim definition file and default value dict to the log only
            self.consoleOutputLog += Logging.getSimDefinitionAndDefaultValueDictsForOutput(simDefinition=self.simDefinition, printToConsole=False)

            # Output header for data outputted to the console during the simulation
            print("Starting Simulation:")
            print("Time(s) Altitude(m,ASL)")
                
        elif self.silent:
            # No intention of writing things to a log file, just prevent them from being printed to the terminal
            _ = []
            logger = Logging.Logger(_, continueWritingToTerminal=False)
            sys.stdout = logger

    def _getEndDetectorFunction(self, rocket, simConfig, droppedStage=False):
        ''' 
            Returns a function, which returns a boolean value and Union[None, float], indicating whether the 
                simulation endpoint has been reached. When close to the end of a sim, the second value returned is the recommended
                time step to take to hit the end criteria.
                Simulation end criteria defined in sim definition file. 
            Rocket object must be passed in because the end condition functions require a reference to the rocket, 
                so they can access its current altitude/velocity/time attributes

            Inputs:
                * rocket:       rocket (stage) whose state the returned function inspects
                * simConfig:    sim definition to read the end condition from
                * droppedStage: (bool) if True, the SimControl.StageDropPaths.* end condition is used instead of the main SimControl one

            Any end condition other than "Apogee" or "Altitude" is treated as an end time.
        '''
        # Read desired end criteria from simulation definition file
        if not droppedStage:
            # Get end condition for main stage
            endCondition = simConfig.getValue("SimControl.EndCondition")
            conditionValue = float(simConfig.getValue("SimControl.EndConditionValue"))
        else:
            # Get end condition for dropped stages
            endCondition = simConfig.getValue("SimControl.StageDropPaths.endCondition")
            conditionValue = float(simConfig.getValue("SimControl.StageDropPaths.endConditionValue"))

        # Define all possible end-detector functions
        def isAfterApogee(dt):
            # The time check keeps a rocket that isn't moving upwards yet (right at launch) from ending the sim
            return rocket.rigidBody.state.velocity.Z <= 0 and rocket.rigidBody.time > 1.0, None
        def isAboveAltitude(dt):
            return rocket.rigidBody.state.position.Z >= conditionValue, None
        def isBelowAltitude(dt):
            return rocket.environment.earthModel.getAltitude(*rocket.rigidBody.state.position) <= conditionValue, None
        def EndTimeReached(dt):
            currTime = rocket.rigidBody.time
            if currTime < conditionValue and currTime + dt >= conditionValue:
                # Next step would overshoot: recommend a step that lands (just) past the end time
                return False, conditionValue+1e-14-currTime
            elif currTime >= conditionValue:
                return True, None
            else:
                return False, None

        # Return the desired function
        if endCondition == "Apogee":
            return isAfterApogee
        elif endCondition == "Altitude" and rocket.rigidBody.state.position.Z < conditionValue:
            # Currently below the target altitude -> end once the rocket has climbed to it
            return isAboveAltitude
        elif endCondition == "Altitude":
            # Currently at/above the target altitude -> end once the rocket has descended to it
            return isBelowAltitude
        else:
            return EndTimeReached

    def _setUpCachingForFlightAnimation(self, rocket):
        ''' 
            Creates the `MAPLEAF.IO.RocketFlight` object for a rocket and stores the rocket's initial time and state in it.
            Actuator deflection histories are only created for rockets that have a control system with a controlled system, otherwise they are set to None.
        '''
        flight = RocketFlight()
        flight.times.append(rocket.rigidBody.time)
        flight.rigidBodyStates.append(rocket.rigidBody.state)
        if rocket.controlSystem is not None and rocket.controlSystem.controlledSystem is not None: 
            # If rocket has moving fins, record their angles for plotting
            nActuators = len(rocket.controlSystem.controlledSystem.actuatorList)
            flight.actuatorDefls = [ [0] for i in range(nActuators) ]
            flight.actuatorTargetDefls = [ [0] for i in range(nActuators) ]
        else:
            flight.actuatorDefls = None
            flight.actuatorTargetDefls = None

        return flight

    #### During sim ####
    def cacheState(self, rocket: "Rocket", flight: "RocketFlight"):
        ''' Adds the rocket's current state (and actuator deflections, if the flight tracks them) to the flight object '''
        time = rocket.rigidBody.time
        flight.times.append(time)
        flight.rigidBodyStates.append(rocket.rigidBody.state)

        # Flights that don't track deflections (actuatorDefls is None, ex. dropped stages) have nothing more to record
        if rocket.controlSystem is not None and flight.actuatorDefls is not None:
            try:
                for a in range(len(rocket.controlSystem.controlledSystem.actuatorList)):
                    flight.actuatorDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].getDeflection(time))
                    flight.actuatorTargetDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].targetDeflection)
            except AttributeError:
                # Expecting to arrive here when timestepping a dropped stage of a controlled rocket, which doesn't have canards
                pass
                
    def createNewDetachedStage(self):
        ''' 
            Called by Rocket._stageSeparation 
            If stage drop paths are being computed, creates a rocket for the stage that was just dropped, starting from the 
                top stage's current state/time, and queues it (with its own end detector, time step and flight path) to be simulated by `Simulation.run`.
        '''
        if self.computeStageDropPaths:
            newDetachedStage = self.createRocket(stage=self.stagingIndex)
            # Set kinematic properties to match those of the current top-most stage
            topStage = self.rocketStages[0]
            newDetachedStage.rigidBody.state = deepcopy(topStage.rigidBody.state)
            newDetachedStage.rigidBody.time = topStage.rigidBody.time
            self.rocketStages.append(newDetachedStage)

            # New sim termination condition function
            self.endDetectors.append(self._getEndDetectorFunction(newDetachedStage, self.simDefinition, droppedStage=True))
            self.dts.append(self.dts[0])
            
            # Duplicate existing flight object
            newFlightObject = deepcopy(self.stageFlightPaths[0]) # Will have had the same flight path as the top stage until the moment of separation
            # Dropped stage shouldn't have any canard deflections. Clear both histories, to match what
            # _setUpCachingForFlightAnimation does for uncontrolled rockets
            newFlightObject.actuatorDefls = None
            newFlightObject.actuatorTargetDefls = None
            self.stageFlightPaths.append(newFlightObject)

            self.stagingIndex += 1

    def discardForceLogsForPreviousTimeStep(self, integrator):
        ''' 
            Removes the derivative (force) evaluation log rows written during the last time step from every stage's log. 
            Only does anything at loggingLevel >= 2.
            NOTE(review): presumably called when an adaptive integrator rejects a time step - caller is not visible here, confirm.
        '''
        if self.loggingLevel >= 2:
            # Figure out how many times this integrator evaluates a function derivative (rocket forces in our case)
            if integrator.method == "RK12Adaptive":
                numDerivativeEvals = 2
            else:
                numDerivativeEvals = len(integrator.tableau)-1

            # Remove that number of rows from the end of the force evaluation log
            for i in range(numDerivativeEvals):
                for rocket in self.rocketStages:
                    rocket.derivativeEvaluationLog.deleteLastRow()

    def _handleSimulationCrash(self):
        ''' 
            After a simulation crashes, tries to create log files and show plots anyways, before printing a stack trace.
            Must be called from inside the except block handling the crash. Never returns: exits the process with status 1.
        '''
        import traceback

        # Grab the stack trace first, so that it can still be shown if post-processing fails too
        tb = traceback.format_exc()

        print("ERROR: Simulation Crashed, Aborting")
        print("Attempting to save log files and show plots")
        try:
            self._postProcess(self.simDefinition)
        except Exception:
            print("Unable to save log files / show plots:")
            print(traceback.format_exc())

        # Try to print out the stack trace
        print("Attempting to show stack trace")
        print(tb)

        print("Exiting")
        sys.exit(1) # Non-zero exit status: let calling scripts know that the simulation failed

    #### Post-sim ####
    def _postProcess(self, simDefinition):
        ''' Writes log files, copies key event times from the rockets to their flight paths, creates plots. Returns the list of log file paths (None if nothing was logged) '''
        simDefinition.printDefaultValuesUsed() # Print these out before logging, to include them in the log

        # Log results
        logFilePaths = self._logSimulationResults(simDefinition)

        # Transfer key time info to flight objects from rocket
        for i in range(len(self.rocketStages)):
            stage = self.rocketStages[i]
            flight = self.stageFlightPaths[i]
            flight.engineOffTime = stage.engineShutOffTime
            flight.mainChuteDeployTime = stage.mainChuteDeployTime
            flight.targetLocation = stage.targetLocation

        # Plot results
        self._plotSimulationResults(self.rocketStages, simDefinition, self.stageFlightPaths, logFilePaths)

        # Print these out after logging to avoid including the log/plot keys in the unused keys
        # #TODO: Add exceptions for these keys, move this line to before logging so that it's output is also included in the simulation log
        simDefinition.printUnusedKeys()

        return logFilePaths

    def _logSimulationResults(self, simDefinition):
        ''' 
            Logs simulation results to file (as/if specified in sim definition) 
            Creates a new, numbered results folder named after the sim definition file (<fileName without extension>_Run...), 
                and writes each stage's logs + the captured console output into it.

            Returns:
                * (list[string]) paths of the log files written by the rocket stages, or None if the logging level is 0

            Raises:
                * ValueError if no results folder could be created
        '''
        logFilePaths = None
        if self.loggingLevel > 0:
            logFilePaths = []

            # Create a new folder for the results of the current simulation
            # splitext only strips a real file extension (leaves dots in directory names / extension-less names alone)
            resultsFolderBaseName = os.path.splitext(simDefinition.fileName)[0] + "_Run"

            def tryCreateResultsFolder(resultsFolderBaseName):
                resultsFolderName = Logging.findNextAvailableNumberedFileName(fileBaseName=resultsFolderBaseName, extension="")
                
                try:
                    os.mkdir(resultsFolderName)
                    return resultsFolderName

                except FileExistsError:
                    # End up here if another process created the same results folder 
                        # (other thread runs os.mkdir b/w when this thread runs findNextAvailableNumberedFileName and os.mkdir)
                        # Should only happen during parallel runs
                    return ""

            maxRetries = 50
            createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName)
            retries = 0
            while createdResultsFolder == "" and retries < maxRetries:
                createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName)
                retries += 1

            # Check the outcome rather than the retry counter: the last permitted retry may have succeeded
            if createdResultsFolder == "":
                raise ValueError("Repeated error (50x): unable to create a results folder: {}.".format(resultsFolderBaseName))

            # Write logs to file
            for rocket in self.rocketStages:
                logFilePaths += rocket.writeLogsToFile(createdResultsFolder)            

            # Output console output
            consoleOutputPath = os.path.join(createdResultsFolder, "consoleOutput.txt")
            print("Writing log file: {}".format(consoleOutputPath))
            with open(consoleOutputPath, 'w+') as file:
                # consoleOutputLog only exists once _setUpConsoleLogging has run (ex. not if no full rocket was created by this object)
                file.writelines(getattr(self, "consoleOutputLog", []))

        return logFilePaths

    def _plotSimulationResults(self, rocketStages, simDefinition, flights, logFilePaths):
        ''' 
            Plot simulation results (as/if specified in sim definition) 
            SimControl.plot is a whitespace-separated list: "None" disables plotting, "FlightAnimation" and "FlightPaths" are 
                handled here, every other entry is passed to Plotting.plotFromLogFiles as a plot definition string.
        '''

        plotsToMake = simDefinition.getValue("SimControl.plot").split()

        if plotsToMake != ["None"]:

            if "FlightAnimation" in plotsToMake:
                print("Showing flight animation")

                # Show animation
                Plotting.flightAnimation(flights)

                # Done, remove from plotsToMake
                plotsToMake.remove("FlightAnimation")

            if "FlightPaths" in plotsToMake:
                earthModel = self.simDefinition.getValue("Environment.EarthModel")
                if earthModel in [ "None", "Flat" ]:
                    Plotting.plotFlightPaths_NoEarth(flights)
                else:
                    Plotting.plotFlightPaths_FullEarth(flights)

                plotsToMake.remove("FlightPaths")

            # Plot all other columns from log files
            for plotDefinitionString in plotsToMake:
                Plotting.plotFromLogFiles(logFilePaths, plotDefinitionString)

def runSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False):
    ''' 
        Convenience function: creates a `Simulation` from the given inputs and runs it right away.
        Takes the same inputs as the `Simulation` constructor, returns whatever `Simulation.run` returns.
    '''
    return Simulation(simDefinitionFilePath, simDefinition, silent).run()

class WindTunnelSimulation(Simulation):
    '''
        Sim runner that doesn't fly the rocket: `WindTunnelSimulation.runSweep` performs a single force evaluation 
        for each point of a parameter sweep, collecting all of them in a single force evaluation log.
    '''

    def __init__(self, parametersToSweep=None, parameterValues=None, simDefinitionFilePath=None, simDefinition=None, silent=False, smoothLine='False'):
        '''
            Inputs:

                * parametersToSweep:      (list[string]) sim definition keys to vary. Defaults to ["Rocket.velocity"]  
                * parameterValues:        (list[list[string]]) one list of values per parameter in parametersToSweep, all of equal length. Defaults to three velocities  
                * simDefinitionFilePath:  (string) path to simulation definition file  
                * simDefinition:          (`MAPLEAF.IO.SimDefinition`) object that's already loaded and parsed the desired sim definition file  
                * silent:                 (bool) toggles optional outputs to the console  
                * smoothLine:             (string or bool) if true, `WindTunnelSimulation.runSweep` adds linearly interpolated points between the given values  

            Raises:

                * ValueError if the number of value lists doesn't match the number of parameters, or if the value lists differ in length
        '''
        self.parametersToSweep = ["Rocket.velocity"] if (parametersToSweep is None) else parametersToSweep
        self.parameterValues = [["(0 0 100)", "(0 0 200)", "(0 0 300)"]] if (parameterValues is None) else parameterValues
        self.smoothLine = smoothLine

        # Error checks
        if len(self.parametersToSweep) != len(self.parameterValues):
            raise ValueError("Must have a list of values for each parameter to sweep over. Currently have {} parameters, and {} lists of values.".format(len(self.parametersToSweep), len(self.parameterValues)))

        paramValueCounts = [ len(x) for x in self.parameterValues ]
        if not all([ (x == paramValueCounts[0]) for x in paramValueCounts ]):
            raise ValueError("All lists of parameter values must be of equal length. Currently, list lengths are: {}".format(paramValueCounts))

        Simulation.__init__(self, simDefinitionFilePath, simDefinition, silent)

    def runSweep(self):
        '''
            Runs a single force evaluation for each set of values in self.parameterValues (applied to the keys in self.parametersToSweep)
            Returns:
                List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point
        '''
        # Make sure we're logging forces (the only way to get data out of this sim runner) and avoid producing any plots
        self.simDefinition.setValue("SimControl.loggingLevel", "3")
        self.loggingLevel = 3 # Was read from the sim definition in __init__ - keep it in sync, otherwise no log files are written if it started out at 0
        self.simDefinition.setValue("SimControl.plot", "None") 
        self.simDefinition.setValue("SimControl.RocketPlot", "Off")

        if strtobool(str(self.smoothLine)): # interpolate between user set parameter values (str() -> also accepts real booleans)
            self._addPoints() 

        for i in range(len(self.parameterValues[0])): # i corresponds to # of conditions, ie how many times parameter values will be changed (velocity1, velocity2, ...)
            for paramKey, values in zip(self.parametersToSweep, self.parameterValues): # each swept parameter (velocity, temperature)
                self.simDefinition.setValue(paramKey, values[i])

            # Run a single force evaluation, which creates a forces log entry for this force evaluation
            rocket = self.createRocket()

            # Track all derivative evaluations in a single log
            if i == 0:
                log = rocket.derivativeEvaluationLog
            else:
                rocket.derivativeEvaluationLog = log

            self.rocketStages = [ rocket ]
            rocket._getAppliedForce(0.0, rocket.rigidBody.state)

        # Write Logs to file, return path
        return self._postProcess()

    def _addPoints(self, pointMultiple=10):
        ''' 
            Edits the parameter sweeps to include a multiple of the previous number of points, linearly interpolated between the given values 
            (pointMultiple - 1) new points are inserted, in place, between each pair of neighbouring values in self.parameterValues.
        '''
        for i in range(len(self.parameterValues[0]) - 1): # i corresponds to # of tests to run
            for interpPt in range(1, pointMultiple): # Loops over each new point
                
                intervalStartIndex = pointMultiple*i
                # Index at which to add new point (also interval end index)
                    # Every insertion pushes the interval's end value back by one, so before inserting, it's always found at this index
                newPointIndex = intervalStartIndex + interpPt

                for values in self.parameterValues: # one list of values per parameter to sweep over (velocity, temperature)
                    try:
                        # Vector sweep
                        first = Vector(values[intervalStartIndex])
                        second = Vector(values[newPointIndex])
                        change = second - first
                        incrementalValue = str(interpPt/pointMultiple*change + first)
                    except ValueError:
                        # Scalar sweep
                        first = float(values[intervalStartIndex])
                        second = float(values[newPointIndex])
                        change = second - first
                        incrementalValue = str(interpPt/pointMultiple*change + first)
                    
                    values.insert(newPointIndex, incrementalValue)

    def createRocket(self, stage=None):
        ''' 
            Do all the same stuff as the parent object, but also re-initialize the environment, 
                to make sure changes to environmental properties during the parameter sweep take effect 
            Accepts (and forwards) the same optional stage argument as `Simulation.createRocket`.
        '''
        self.environment = Environment(self.simDefinition, silent=self.silent)
        return super().createRocket(stage)

    def _setUpConsoleLogging(self):
        ''' Override to ensure that logs aren't re-initialized for every simulation.
            consoleOutputLog will only be absent the first time this function is run
            Want to keep all the force data in a single log file '''
        if not hasattr(self, 'consoleOutputLog'):
            return super()._setUpConsoleLogging()

    # Previous name of the override above, which no longer matched the hook called by Simulation.createRocket. Kept as an alias
    _setUpLogging = _setUpConsoleLogging

    def _postProcess(self, simDefinition=None):
        ''' Creates an empty flight path object to prevent errors in the parent function, which is still run to create log files.
            The simDefinition argument is only accepted for compatibility with `Simulation._postProcess`: self.simDefinition is always used.
            Returns the log file paths returned by the parent function '''
        # Create an empty flight path to prevent errors in the parent function
        self.stageFlightPaths = [ RocketFlight() ]
        return Simulation._postProcess(self, self.simDefinition)

Functions

def loadSimDefinition(simDefinitionFilePath=None, simDefinition=None, silent=False)

Loads a simulation definition file into a SimDefinition object - accepts either a file path or a SimDefinition object as input

Expand source code
def loadSimDefinition(simDefinitionFilePath=None, simDefinition=None, silent=False):
    ''' 
        Loads a simulation definition file into a `MAPLEAF.IO.SimDefinition` object - accepts either a file path or a `MAPLEAF.IO.SimDefinition` object as input 

        Inputs:

            * simDefinitionFilePath:  (string) path to a simulation definition file. Only parsed if simDefinition is not provided  
            * simDefinition:          (`MAPLEAF.IO.SimDefinition`) already-loaded definition. Returned as-is (not copied) whenever it is provided  
            * silent:                 (bool) forwarded to the `MAPLEAF.IO.SimDefinition` constructor when a file is parsed  

        Raises:

            * ValueError if neither simDefinitionFilePath nor simDefinition is provided
    '''
    # An object that was passed in always wins over the file path
    if simDefinition is not None:
        return simDefinition

    if simDefinitionFilePath is not None:
        return SimDefinition(simDefinitionFilePath, silent=silent) # Parse simulation definition file

    raise ValueError(""" Insufficient information to initialize a Simulation.
            Please provide either simDefinitionFilePath (string) or simDefinition (SimDefinition), which has been created from the desired Sim Definition file.
            If both are provided, the SimDefinition is used.""")
def runSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False)
Expand source code
def runSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False):
    ''' 
        Convenience function: creates a `Simulation` from the given inputs and runs it right away.
        Takes the same inputs as the `Simulation` constructor, returns whatever `Simulation.run` returns.
    '''
    return Simulation(simDefinitionFilePath, simDefinition, silent).run()

Classes

class Simulation (simDefinitionFilePath=None, simDefinition=None, silent=False)

Inputs

  • simDefinitionFilePath: (string) path to simulation definition file
  • simDefinition: (SimDefinition) object that has already loaded and parsed the desired sim definition file
  • silent: (bool) toggles optional outputs to the console
Expand source code
class Simulation():

    def __init__(self, simDefinitionFilePath=None, simDefinition=None, silent=False):
        '''
            Inputs:
                
                * simDefinitionFilePath:  (string) path to simulation definition file  
                * simDefinition:          (`MAPLEAF.IO.SimDefinition`) object that's already loaded and parsed the desired sim definition file  
                * silent:                 (bool) toggles optional outputs to the console  

            At least one of simDefinitionFilePath / simDefinition must be provided (see `loadSimDefinition`)
        '''
        self.silent = silent
        ''' (bool) '''

        self.stagingIndex = None # Set in self.createRocket
        ''' (int) Set in `Simulation.createRocket`. Tracks how many stages have been dropped '''

        definition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent)
        self.simDefinition = definition
        ''' Instance of `MAPLEAF.IO.SimDefinition`. Defines the current simulation '''

        self.environment = Environment(definition, silent=silent)
        ''' Instance of `MAPLEAF.ENV.Environment`. Will be shared by all Rockets created by this sim runner '''

        # Cache the simulation-control settings that are consulted repeatedly by the methods of this class
        readSetting = definition.getValue
        self.loggingLevel = int(readSetting("SimControl.loggingLevel"))
        self.computeStageDropPaths = strtobool(readSetting("SimControl.StageDropPaths.compute"))

    def run(self, rocket=None):
        ''' 
            Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)

            The first rocket is time-stepped until its end detector reports that the end condition has been met.
            Afterwards, every additional entry of self.rocketStages (these are appended by `Simulation.createNewDetachedStage`) is time-stepped in turn.
            If anything is raised while taking a time step, `Simulation._handleSimulationCrash` is called.

            Inputs:
                * rocket: (`MAPLEAF.Rocket.Rocket`) optional, the rocket to simulate. If None, one is created using `Simulation.createRocket`

            Returns:
                * stageFlightsPaths: (list[`MAPLEAF.IO.RocketFlight.RocketFlight`]) Each RocketFlight object represents the flight path of a single stage
                * logFilePaths: (list[string]) list of paths to all log files created by this simulation
        '''
        simDefinition = self.simDefinition

        # Initialize the rocket + environment models and simulation logging
        if rocket == None:
            rocket = self.createRocket() # Initialize rocket on launch pad, with all stages attached
        self.rocketStages = [ rocket ] # In this array, 'stage' means independent rigid bodies. Stages are initialized as new rocket objects and added once they are dropped from the main rocket

        # Create progress bar if appropriate (only possible when the end time is known in advance)
        if simDefinition.getValue("SimControl.EndCondition") == "Time":
            endTime = float(simDefinition.getValue("SimControl.EndConditionValue"))
            progressBar = tqdm(total=endTime+0.01)

            try:
                # Stop echoing print() output to the terminal while the progress bar is being displayed
                self.logger.continueWritingToTerminal = False
            except AttributeError:
                pass # Logging not set up for this sim
        else:
            progressBar = None

        #### Main Loop Setup #### 
        # These three lists are indexed like self.rocketStages. Entries for dropped stages are appended by createNewDetachedStage
        self.dts = [ float(simDefinition.getValue("SimControl.timeStep")) ]    # (Initial) time step size
        self.endDetectors = [ self._getEndDetectorFunction(rocket, simDefinition) ] # List contains a boolean function that controls sim termination for each stage
        self.stageFlightPaths = [ self._setUpCachingForFlightAnimation(rocket) ] # List will contain resulting flight paths for each stage

        if(rocket.hardwareInTheLoopControl == "yes"):
            print("Setting up hardware in the loop interface")
            rocket.hilInterface.setupHIL(self.rocketStages[0].rigidBody.state)

        #### Main Loop ####
        # A while loop (rather than a for loop) is used because self.rocketStages can grow while it is being traversed
        stageIndex = 0
        while stageIndex < len(self.rocketStages):

            if stageIndex > 0:
                print("Computing stage {} drop path".format(stageIndex))

            rocket = self.rocketStages[stageIndex]
            endDetector = self.endDetectors[stageIndex]
            flight = self.stageFlightPaths[stageIndex]

            # endDetector returns (simulation finished?, time step required to land exactly on the end condition - or None)
            endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])

            while not endSimulation:
                ### Take a time step ###
                try:
                    if FinalTimeStepDt != None:
                        print("Simulation Runner overriding time step from {} to {} to accurately meet end condition".format(self.dts[stageIndex], FinalTimeStepDt))
                        self.dts[stageIndex] = FinalTimeStepDt

                    integrationResult = rocket.timeStep(self.dts[stageIndex])

                    if stageIndex == 0: # Currently, progress bar only works for bottom stage
                        try:
                            progressBar.update(integrationResult.dt)
                        except AttributeError:
                            pass # progressBar is None when the end condition is not "Time"
                except:
                    # Bare except: KeyboardInterrupt and SystemExit also end up here, so results get saved in those cases too
                    try:
                        progressBar.close()

                        if not self.silent:
                            sys.stdout.continueWritingToTerminal = True # sys.stdout is an instance of MAPLEAF.IO.Logging.Logger
                    except AttributeError:
                        pass # No progress bar, or sys.stdout has not been replaced by a Logger

                    # Save simulation results and print out stack trace (ends with a call to sys.exit())
                    self._handleSimulationCrash()

                # Adjust time step size for next iteration
                self.dts[stageIndex] = integrationResult.dt * integrationResult.timeStepAdaptationFactor

                # HIL
                if(rocket.hardwareInTheLoopControl == "yes"):
                    rocket.hilInterface.performHIL(rocket.rigidBody.state,rocket.rigidBody.time)

                # Cache states for flight animation
                self.cacheState(rocket, flight)

                # Check whether we should end the simulation, or take a modified-size final time step    
                endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])

            # Log last state (would be the starting state of the next time step)
            rocket._logState()

            # Move on to next (dropped) stage
            stageIndex += 1

            # Runs after every stage: close the progress bar (if there is one) and resume printing to the terminal
            try:
                progressBar.close()

                if not self.silent:
                    sys.stdout.continueWritingToTerminal = True # Actually editing a MAPLEAF.IO.Logging.Logger object here
            except AttributeError:
                pass # No progress bar, or sys.stdout has not been replaced by a Logger

        print("Simulation Complete")

        # Write log files and create plots, as requested in the simulation definition
        logFilePaths = self._postProcess(simDefinition)

        return self.stageFlightPaths, logFilePaths

    #### Pre-sim ####
    def createRocket(self, stage=None):
        ''' 
            Initializes a rocket which shares the Environment owned by the instance of this class and holds a reference to this sim runner.
            When creating the complete rocket (stage is None), console logging is also set up and the dropped-stage counter is reset.
            Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).

            Inputs:
                * stage: (int or None) forwarded to `MAPLEAF.Rocket.Rocket` as stageToInitialize. None (default) is used for the complete rocket

            Returns:
                * rocket: (`MAPLEAF.Rocket.Rocket`)
        '''
        # Initialize Rocket. It is handed a reference to this simRunner and to the shared environment
        rocketDictReader = SubDictReader("Rocket", self.simDefinition)
        rocket = Rocket(rocketDictReader, silent=self.silent, stageToInitialize=stage, simRunner=self, environment=self.environment)

        if self.simDefinition.getValue('SimControl.RocketPlot') in [ 'On', 'on' ]:
            rocket.plotShape()

        if stage is None:
            # Complete rocket: set up console logging and restart the stage count
            self._setUpConsoleLogging()
            self.stagingIndex = 0 # Initially zero, after dropping first stage: 1, after dropping second stage: 2, etc...

        return rocket

    def _setUpConsoleLogging(self):
        ''' 
            Replaces sys.stdout with a `MAPLEAF.IO.Logging.Logger`, depending on self.loggingLevel and self.silent:

                * loggingLevel > 0:              console output is captured in self.consoleOutputLog (terminal output is disabled if silent)
                * loggingLevel <= 0 and silent:  console output is captured in a throwaway list and not shown in the terminal
                * otherwise:                     sys.stdout is left untouched
        '''
        if self.loggingLevel <= 0:
            if self.silent:
                # No intention of writing things to a log file, just prevent them from being printed to the terminal
                discardedOutput = []
                sys.stdout = Logging.Logger(discardedOutput, continueWritingToTerminal=False)
            return

        # From here on, the output of any print call is captured in self.consoleOutputLog
        self.consoleOutputLog = []
        if self.silent:
            self.logger = Logging.Logger(self.consoleOutputLog, continueWritingToTerminal=False)
        else:
            self.logger = Logging.Logger(self.consoleOutputLog)
        sys.stdout = self.logger

        # System info goes to both the console and the log
        Logging.getSystemInfo(printToConsole=True)

        # Sim definition file and default value dict go to the log only
        # (extended in place: the logger created above holds a reference to this same list)
        definitionDump = Logging.getSimDefinitionAndDefaultValueDictsForOutput(simDefinition=self.simDefinition, printToConsole=False)
        self.consoleOutputLog.extend(definitionDump)

        # Header for the data that gets output to the console during the simulation
        print("Starting Simulation:")
        print("Time(s) Altitude(m,ASL)")

    def _getEndDetectorFunction(self, rocket, simConfig, droppedStage=False):
        ''' 
            Returns a function, which returns a boolean value and Union[None, float], indicating whether the 
                simulation endpoint has been reached. When close to the end of a sim, the second value returned is the recommended
                time step to take to hit the end criteria.
                Simulation end criteria defined in sim definition file. 
            Rocket object must be passed in because the end condition functions require a reference to the rocket, 
                so they can access its current altitude/velocity/time attributes
        '''
        # Read desired end criteria from simulation definition file
        if not droppedStage:
            # Get end condition for main stage
            endCondition = simConfig.getValue("SimControl.EndCondition")
            conditionValue = float(simConfig.getValue("SimControl.EndConditionValue"))
        else:
            # Get end condition for dropped stages
            endCondition = simConfig.getValue("SimControl.StageDropPaths.endCondition")
            conditionValue = float(simConfig.getValue("SimControl.StageDropPaths.endConditionValue"))

        # Define all possible end-detector functions
        def isAfterApogee(dt):
            return rocket.rigidBody.state.velocity.Z <= 0 and rocket.rigidBody.time > 1.0, None
        def isAboveAltitude(dt):
            return rocket.rigidBody.state.position.Z >= conditionValue, None
        def isBelowAltitude(dt):
            return rocket.environment.earthModel.getAltitude(*rocket.rigidBody.state.position) <= conditionValue, None
        def EndTimeReached(dt):
            currTime = rocket.rigidBody.time
            if currTime < conditionValue and currTime + dt >= conditionValue:
                return False, conditionValue+1e-14-currTime
            elif currTime >= conditionValue:
                return True, None
            else:
                return False, None

        # Return the desired function
        if endCondition == "Apogee":
            return isAfterApogee
        elif endCondition == "Altitude" and rocket.rigidBody.state.position.Z < conditionValue:
            return isAboveAltitude
        elif endCondition == "Altitude":
            return isBelowAltitude
        else:
            return EndTimeReached

    def _setUpCachingForFlightAnimation(self, rocket):
        ''' 
            Creates the `MAPLEAF.IO.RocketFlight` object in which the rocket's states will be cached, and stores the rocket's current time and state in it.
            If the rocket has a control system with a controlled system, one deflection history and one target deflection history 
                (each starting at 0) are created per actuator. Otherwise both histories are set to None.
        '''
        flightPath = RocketFlight()
        flightPath.times.append(rocket.rigidBody.time)
        flightPath.rigidBodyStates.append(rocket.rigidBody.state)

        controlSystem = rocket.controlSystem
        hasActuators = controlSystem != None and controlSystem.controlledSystem != None

        if not hasActuators:
            flightPath.actuatorDefls = None
            flightPath.actuatorTargetDefls = None
            return flightPath

        # Rocket has moving fins, record their angles for plotting
        nActuators = len(controlSystem.controlledSystem.actuatorList)
        flightPath.actuatorDefls = [ [0] for _ in range(nActuators) ]
        flightPath.actuatorTargetDefls = [ [0] for _ in range(nActuators) ]
        return flightPath

    #### During sim ####
    def cacheState(self, rocket: Rocket, flight: RocketFlight):
        ''' Adds the rocket's current state to the flight object '''
        time = rocket.rigidBody.time
        flight.times.append(time)
        flight.rigidBodyStates.append(rocket.rigidBody.state)
        if rocket.controlSystem != None:
            try:
                for a in range(len(rocket.controlSystem.controlledSystem.actuatorList)):
                    flight.actuatorDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].getDeflection(time))
                    flight.actuatorTargetDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].targetDeflection)
            except AttributeError:
                # Expecting to arrive here when timestepping a dropped stage of a controlled rocket, which doesn't have canards
                pass
                
    def createNewDetachedStage(self):
        ''' Called by Rocket._stageSeparation '''
        if self.computeStageDropPaths:
            newDetachedStage = self.createRocket(stage=self.stagingIndex)
            # Set kinematic properties to match those of the current top-most stage
            topStage = self.rocketStages[0]
            newDetachedStage.rigidBody.state = deepcopy(topStage.rigidBody.state)
            newDetachedStage.rigidBody.time = topStage.rigidBody.time
            self.rocketStages.append(newDetachedStage)

            # New sim termination condition function
            self.endDetectors.append(self._getEndDetectorFunction(newDetachedStage, self.simDefinition, droppedStage=True))
            self.dts.append(self.dts[0])
            
            # Duplicate existing flight object
            newFlightObject = deepcopy(self.stageFlightPaths[0]) # Will have had the same flight path as the top stage until the moment of separation
            newFlightObject.actuatorDefls = None # Dropped stage shouldn't have any canard deflections
            self.stageFlightPaths.append(newFlightObject)

            self.stagingIndex += 1

    def discardForceLogsForPreviousTimeStep(self, integrator):
        if self.loggingLevel >= 2:
            # Figure out how many times this integrator evaluates a function derivative (rocket forces in our case)
            if integrator.method == "RK12Adaptive":
                numDerivativeEvals = 2
            else:
                numDerivativeEvals = len(integrator.tableau)-1

            # Remove that number of rows from the end of the force evaluation log
            for i in range(numDerivativeEvals):
                for rocket in self.rocketStages:
                    rocket.derivativeEvaluationLog.deleteLastRow()

    def _handleSimulationCrash(self):
        ''' After a simulation crashes, tries to create log files and show plots anyways, before printing a stack trace '''
        print("ERROR: Simulation Crashed, Aborting")
        print("Attempting to save log files and show plots")
        self._postProcess(self.simDefinition)

        # Try to print out the stack trace
        print("Attempting to show stack trace")
        import traceback
        tb = traceback.format_exc()
        print(tb)

        print("Exiting")
        sys.exit()

    #### Post-sim ####
    def _postProcess(self, simDefinition):
        simDefinition.printDefaultValuesUsed() # Print these out before logging, to include them in the log

        # Log results
        logFilePaths = self._logSimulationResults(simDefinition)

        # Transfer key time info to flight objects from rocket
        for i in range(len(self.rocketStages)):
            self.stageFlightPaths[i].engineOffTime = self.rocketStages[i].engineShutOffTime
            self.stageFlightPaths[i].mainChuteDeployTime = self.rocketStages[i].mainChuteDeployTime
            self.stageFlightPaths[i].targetLocation = self.rocketStages[i].targetLocation

        # Plot results
        self._plotSimulationResults(self.rocketStages, simDefinition, self.stageFlightPaths, logFilePaths)

        # Print these out after logging to avoid including the log/plot keys in the unused keys
        # #TODO: Add exceptions for these keys, move this line to before logging so that it's output is also included in the simulation log
        simDefinition.printUnusedKeys()

        return logFilePaths

    def _logSimulationResults(self, simDefinition):
        ''' Logs simulation results to file (as/if specified in sim definition) '''
        logFilePaths = None
        if self.loggingLevel > 0:
            logFilePaths = []

            # Create a new folder for the results of the current simulation
            periodIndex = simDefinition.fileName.rfind('.')
            resultsFolderBaseName = simDefinition.fileName[:periodIndex] + "_Run"

            def tryCreateResultsFolder(resultsFolderBaseName):
                resultsFolderName = Logging.findNextAvailableNumberedFileName(fileBaseName=resultsFolderBaseName, extension="")
                
                try:
                    os.mkdir(resultsFolderName)
                    return resultsFolderName

                except FileExistsError:
                    # End up here if another process created the same results folder 
                        # (other thread runs os.mkdir b/w when this thread runs findNextAvailableNumberedFileName and os.mkdir)
                        # Should only happen during parallel runs
                    return ""

            createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName)
            iterations = 0
            while createdResultsFolder == "" and iterations < 50:
                createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName)
                iterations += 1

            if iterations == 50:
                raise ValueError("Repeated error (50x): unable to create a results folder: {}.".format(resultsFolderBaseName))

            # Write logs to file
            for rocket in self.rocketStages:
                logFilePaths += rocket.writeLogsToFile(createdResultsFolder)            

            # Output console output
            consoleOutputPath = os.path.join(createdResultsFolder, "consoleOutput.txt")
            print("Writing log file: {}".format(consoleOutputPath))
            with open(consoleOutputPath, 'w+') as file:
                file.writelines(self.consoleOutputLog)

        return logFilePaths

    def _plotSimulationResults(self, rocketStages, simDefinition, flights, logFilePaths):
        ''' Plot simulation results (as/if specified in sim definition) '''

        plotsToMake = simDefinition.getValue("SimControl.plot").split()

        if plotsToMake != ["None"]:

            if "FlightAnimation" in plotsToMake:
                print("Showing flight animation")

                # Show animation
                Plotting.flightAnimation(flights)

                # Done, remove from plotsToMake
                plotsToMake.remove("FlightAnimation")

            if "FlightPaths" in plotsToMake:
                earthModel = self.simDefinition.getValue("Environment.EarthModel")
                if earthModel in [ "None", "Flat" ]:
                    Plotting.plotFlightPaths_NoEarth(flights)
                else:
                    Plotting.plotFlightPaths_FullEarth(flights)

                plotsToMake.remove("FlightPaths")

            # Plot all other columns from log files
            for plotDefinitionString in plotsToMake:
                Plotting.plotFromLogFiles(logFilePaths, plotDefinitionString)

Subclasses

Instance variables

var environment

Instance of Environment. Will be shared by all Rockets created by this sim runner

var silent

(bool)

var simDefinition

Instance of SimDefinition. Defines the current simulation

var stagingIndex

(int) Set in Simulation.createRocket(). Tracks how many stages have been dropped

Methods

def cacheState(self, rocket: Rocket, flight: RocketFlight)

Adds the rocket's current state to the flight object

Expand source code
def cacheState(self, rocket: Rocket, flight: RocketFlight):
    ''' Adds the rocket's current state to the flight object '''
    time = rocket.rigidBody.time
    flight.times.append(time)
    flight.rigidBodyStates.append(rocket.rigidBody.state)
    if rocket.controlSystem != None:
        try:
            for a in range(len(rocket.controlSystem.controlledSystem.actuatorList)):
                flight.actuatorDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].getDeflection(time))
                flight.actuatorTargetDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].targetDeflection)
        except AttributeError:
            # Expecting to arrive here when timestepping a dropped stage of a controlled rocket, which doesn't have canards
            pass
def createNewDetachedStage(self)

Called by Rocket._stageSeparation

Expand source code
def createNewDetachedStage(self):
    ''' 
        Called by Rocket._stageSeparation 

        If stage drop paths are being computed (self.computeStageDropPaths), creates a new rocket representing the dropped stage,
            and registers it (plus its end detector, time step and flight path) so that `Simulation.run` time-steps it 
            once the preceding stages have finished. Does nothing otherwise.
    '''
    if not self.computeStageDropPaths:
        return

    newDetachedStage = self.createRocket(stage=self.stagingIndex)

    # Set kinematic properties to match those of the current top-most stage
    topStage = self.rocketStages[0]
    newDetachedStage.rigidBody.state = deepcopy(topStage.rigidBody.state)
    newDetachedStage.rigidBody.time = topStage.rigidBody.time
    self.rocketStages.append(newDetachedStage)

    # New sim termination condition function, initial time step copied from the top stage
    self.endDetectors.append(self._getEndDetectorFunction(newDetachedStage, self.simDefinition, droppedStage=True))
    self.dts.append(self.dts[0])

    # Duplicate existing flight object
    newFlightObject = deepcopy(self.stageFlightPaths[0]) # Will have had the same flight path as the top stage until the moment of separation
    # Dropped stage shouldn't have any canard deflections. Clear both histories, 
    # otherwise the target deflections copied from the top stage would linger on the dropped stage's flight
    newFlightObject.actuatorDefls = None
    newFlightObject.actuatorTargetDefls = None
    self.stageFlightPaths.append(newFlightObject)

    self.stagingIndex += 1
def createRocket(self, stage=None)

Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class. Returns an instance of Rocket with its Environment/Logs initialized. Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).

Expand source code
def createRocket(self, stage=None):
    ''' 
        Initializes a rocket which shares the Environment owned by the instance of this class and holds a reference to this sim runner.
        When creating the complete rocket (stage is None), console logging is also set up and the dropped-stage counter is reset.
        Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).

        Inputs:
            * stage: (int or None) forwarded to `MAPLEAF.Rocket.Rocket` as stageToInitialize. None (default) is used for the complete rocket

        Returns:
            * rocket: (`MAPLEAF.Rocket.Rocket`)
    '''
    # Initialize Rocket. It is handed a reference to this simRunner and to the shared environment
    rocketDictReader = SubDictReader("Rocket", self.simDefinition)
    rocket = Rocket(rocketDictReader, silent=self.silent, stageToInitialize=stage, simRunner=self, environment=self.environment)

    if self.simDefinition.getValue('SimControl.RocketPlot') in [ 'On', 'on' ]:
        rocket.plotShape()

    if stage is None:
        # Complete rocket: set up console logging and restart the stage count
        self._setUpConsoleLogging()
        self.stagingIndex = 0 # Initially zero, after dropping first stage: 1, after dropping second stage: 2, etc...

    return rocket
def discardForceLogsForPreviousTimeStep(self, integrator)
Expand source code
def discardForceLogsForPreviousTimeStep(self, integrator):
    ''' 
        Removes the rows added to each stage's derivative evaluation log by one time step of the given integrator.
        Only does something if self.loggingLevel >= 2.
    '''
    if self.loggingLevel < 2:
        return

    # Number of function derivative (rocket forces in our case) evaluations made by this integrator
    usesAdaptiveRK12 = integrator.method == "RK12Adaptive"
    numDerivativeEvals = 2 if usesAdaptiveRK12 else len(integrator.tableau)-1

    # Drop that many rows from the end of every stage's force evaluation log
    for _ in range(numDerivativeEvals):
        for stage in self.rocketStages:
            stage.derivativeEvaluationLog.deleteLastRow()
def run(self, rocket=None)

Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)

Returns

  • stageFlightsPaths: (list[MAPLEAF.IO.RocketFlight.RocketFlight]) Each RocketFlight object represents the flight path of a single stage
  • logFilePaths: (list[string]) list of paths to all log files created by this simulation
Expand source code
def run(self, rocket=None):
    ''' 
        Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)

        The first rocket is time-stepped until its end detector reports that the end condition has been met.
        Afterwards, every additional entry of self.rocketStages (these are appended by `Simulation.createNewDetachedStage`) is time-stepped in turn.
        If anything is raised while taking a time step, `Simulation._handleSimulationCrash` is called.

        Inputs:
            * rocket: (`MAPLEAF.Rocket.Rocket`) optional, the rocket to simulate. If None, one is created using `Simulation.createRocket`

        Returns:
            * stageFlightsPaths: (list[`MAPLEAF.IO.RocketFlight.RocketFlight`]) Each RocketFlight object represents the flight path of a single stage
            * logFilePaths: (list[string]) list of paths to all log files created by this simulation
    '''
    simDefinition = self.simDefinition

    # Initialize the rocket + environment models and simulation logging
    if rocket == None:
        rocket = self.createRocket() # Initialize rocket on launch pad, with all stages attached
    self.rocketStages = [ rocket ] # In this array, 'stage' means independent rigid bodies. Stages are initialized as new rocket objects and added once they are dropped from the main rocket

    # Create progress bar if appropriate (only possible when the end time is known in advance)
    if simDefinition.getValue("SimControl.EndCondition") == "Time":
        endTime = float(simDefinition.getValue("SimControl.EndConditionValue"))
        progressBar = tqdm(total=endTime+0.01)

        try:
            # Stop echoing print() output to the terminal while the progress bar is being displayed
            self.logger.continueWritingToTerminal = False
        except AttributeError:
            pass # Logging not set up for this sim
    else:
        progressBar = None

    #### Main Loop Setup #### 
    # These three lists are indexed like self.rocketStages. Entries for dropped stages are appended by createNewDetachedStage
    self.dts = [ float(simDefinition.getValue("SimControl.timeStep")) ]    # (Initial) time step size
    self.endDetectors = [ self._getEndDetectorFunction(rocket, simDefinition) ] # List contains a boolean function that controls sim termination for each stage
    self.stageFlightPaths = [ self._setUpCachingForFlightAnimation(rocket) ] # List will contain resulting flight paths for each stage

    if(rocket.hardwareInTheLoopControl == "yes"):
        print("Setting up hardware in the loop interface")
        rocket.hilInterface.setupHIL(self.rocketStages[0].rigidBody.state)

    #### Main Loop ####
    # A while loop (rather than a for loop) is used because self.rocketStages can grow while it is being traversed
    stageIndex = 0
    while stageIndex < len(self.rocketStages):

        if stageIndex > 0:
            print("Computing stage {} drop path".format(stageIndex))

        rocket = self.rocketStages[stageIndex]
        endDetector = self.endDetectors[stageIndex]
        flight = self.stageFlightPaths[stageIndex]

        # endDetector returns (simulation finished?, time step required to land exactly on the end condition - or None)
        endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])

        while not endSimulation:
            ### Take a time step ###
            try:
                if FinalTimeStepDt != None:
                    print("Simulation Runner overriding time step from {} to {} to accurately meet end condition".format(self.dts[stageIndex], FinalTimeStepDt))
                    self.dts[stageIndex] = FinalTimeStepDt

                integrationResult = rocket.timeStep(self.dts[stageIndex])

                if stageIndex == 0: # Currently, progress bar only works for bottom stage
                    try:
                        progressBar.update(integrationResult.dt)
                    except AttributeError:
                        pass # progressBar is None when the end condition is not "Time"
            except:
                # Bare except: KeyboardInterrupt and SystemExit also end up here, so results get saved in those cases too
                try:
                    progressBar.close()

                    if not self.silent:
                        sys.stdout.continueWritingToTerminal = True # sys.stdout is an instance of MAPLEAF.IO.Logging.Logger
                except AttributeError:
                    pass # No progress bar, or sys.stdout has not been replaced by a Logger

                # Save simulation results and print out stack trace (ends with a call to sys.exit())
                self._handleSimulationCrash()

            # Adjust time step size for next iteration
            self.dts[stageIndex] = integrationResult.dt * integrationResult.timeStepAdaptationFactor

            # HIL
            if(rocket.hardwareInTheLoopControl == "yes"):
                rocket.hilInterface.performHIL(rocket.rigidBody.state,rocket.rigidBody.time)

            # Cache states for flight animation
            self.cacheState(rocket, flight)

            # Check whether we should end the simulation, or take a modified-size final time step    
            endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])

        # Log last state (would be the starting state of the next time step)
        rocket._logState()

        # Move on to next (dropped) stage
        stageIndex += 1

        # Runs after every stage: close the progress bar (if there is one) and resume printing to the terminal
        try:
            progressBar.close()

            if not self.silent:
                sys.stdout.continueWritingToTerminal = True # Actually editing a MAPLEAF.IO.Logging.Logger object here
        except AttributeError:
            pass # No progress bar, or sys.stdout has not been replaced by a Logger

    print("Simulation Complete")

    # Write log files and create plots, as requested in the simulation definition
    logFilePaths = self._postProcess(simDefinition)

    return self.stageFlightPaths, logFilePaths
class WindTunnelSimulation (parametersToSweep=None, parameterValues=None, simDefinitionFilePath=None, simDefinition=None, silent=False, smoothLine='False')

Inputs

  • simDefinitionFilePath: (string) path to simulation definition file
  • simDefinition: (SimDefinition) object that's already loaded and parsed the desired sim definition file
  • silent: (bool) toggles optional outputs to the console
Expand source code
class WindTunnelSimulation(Simulation):
    ''' 
        Simulation runner which, instead of time-stepping a flight, performs a single force evaluation for each point of a parameter sweep (see `WindTunnelSimulation.runSweep`).
        Results are obtained from the log files it writes.
    '''

    def __init__(self, parametersToSweep=None, parameterValues=None, simDefinitionFilePath=None, simDefinition=None, silent=False, smoothLine='False'):
        '''
            Inputs:

                * parametersToSweep:      (list[string]) simulation definition keys to sweep over. Defaults to ["Rocket.velocity"]  
                * parameterValues:        (list[list[string]]) one list of values per swept key, all of equal length. Defaults to three velocities: 100, 200 and 300 in the Z direction  
                * simDefinitionFilePath:  (string) path to simulation definition file  
                * simDefinition:          (`MAPLEAF.IO.SimDefinition`) object that's already loaded and parsed the desired sim definition file  
                * silent:                 (bool) toggles optional outputs to the console  
                * smoothLine:             (string or bool) if true ('True', 'yes', '1', ...), additional sweep points are interpolated between the given values  

            Raises:

                * ValueError if the number of value lists doesn't match the number of parameters, or if the value lists are of unequal length
        '''
        self.parametersToSweep = ["Rocket.velocity"] if parametersToSweep is None else parametersToSweep
        self.parameterValues = [["(0 0 100)", "(0 0 200)", "(0 0 300)"]] if parameterValues is None else parameterValues
        self.smoothLine = smoothLine

        # Error checks
        if len(self.parametersToSweep) != len(self.parameterValues):
            raise ValueError("Must have a list of values for each parameter to sweep over. Currently have {} parameters, and {} lists of values.".format(len(self.parametersToSweep), len(self.parameterValues)))

        paramValueCounts = [ len(x) for x in self.parameterValues ]
        if any(count != paramValueCounts[0] for count in paramValueCounts):
            raise ValueError("All lists of parameter values must be of equal length. Currently, list lengths are: {}".format(paramValueCounts))

        Simulation.__init__(self, simDefinitionFilePath, simDefinition, silent)

    def runSweep(self):
        '''
            Runs a single force evaluation for each set of values in self.parameterValues, applied to the keys in self.parametersToSweep
            Returns:
                List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point
        '''
        # Make sure we're logging forces (the only way to get data out of this sim runner) and avoid producing any plots
        self.simDefinition.setValue("SimControl.loggingLevel", "3")
        self.simDefinition.setValue("SimControl.plot", "None") 
        self.simDefinition.setValue("SimControl.RocketPlot", "Off")
        # The logging level was cached by Simulation.__init__, before the override above. 
        # Keep it in sync, otherwise no logs are written when the sim definition originally had logging turned off
        self.loggingLevel = 3

        # str() allows smoothLine to be passed in as an actual bool as well as a string
        if strtobool(str(self.smoothLine)): # interpolate between user set parameter values
            self._addPoints() 

        nSweepPoints = len(self.parameterValues[0])
        for i in range(nSweepPoints): # i corresponds to # of conditions, ie how many times parameter values will be changed (velocity1, velocity2, ...)
            for parameterKey, values in zip(self.parametersToSweep, self.parameterValues): # One entry per swept parameter (velocity, temperature)
                self.simDefinition.setValue(parameterKey, values[i])

            # Create a rocket (+ environment) that reflects the values that were just set
            rocket = self.createRocket()

            # Track all derivative evaluations in a single log
            if i == 0:
                log = rocket.derivativeEvaluationLog
            else:
                rocket.derivativeEvaluationLog = log

            # Run a single force evaluation, which creates a forces log entry for this sweep point
            self.rocketStages = [ rocket ]
            rocket._getAppliedForce(0.0, rocket.rigidBody.state)

        # Write Logs to file, return path
        return self._postProcess()

    def _addPoints(self, pointMultiple=10):
        ''' Edits the parameter sweeps to include a multiple of the previous number of points, linearly interpolated between the given values '''
        nIntervals = len(self.parameterValues[0]) - 1 # Evaluated once, before any points get inserted
        for i in range(nIntervals):
            for interpPt in range(1, pointMultiple): # Loops over each new point

                intervalStartIndex = pointMultiple*i
                # Index at which to add new point (also interval end index: earlier insertions have pushed the interval's end value there)
                newPointIndex = pointMultiple*i + interpPt

                for values in self.parameterValues: # One list of values per swept parameter (velocity, temperature)
                    try:
                        # Vector sweep
                        first = Vector(values[intervalStartIndex])
                        second = Vector(values[newPointIndex])
                    except ValueError:
                        # Scalar sweep
                        first = float(values[intervalStartIndex])
                        second = float(values[newPointIndex])

                    change = second - first
                    incrementalValue = str(interpPt/pointMultiple*change + first)
                    values.insert(newPointIndex, incrementalValue)

    def createRocket(self):
        ''' 
            Do all the same stuff as the parent object, but also re-initialize the environment, 
                to make sure changes to environmental properties during the parameter sweep take effect 
        '''
        self.environment = Environment(self.simDefinition, silent=self.silent)
        return super().createRocket()

    def _setUpConsoleLogging(self):
        ''' Override to ensure that console logging isn't re-initialized for every rocket created during the sweep.
            consoleOutputLog will only be absent the first time this function is run
            Want to keep all the output in a single log '''
        if not hasattr(self, 'consoleOutputLog'):
            return super()._setUpConsoleLogging()

    def _setUpLogging(self):
        ''' Previous name of the `_setUpConsoleLogging` override, kept for backwards compatibility '''
        return self._setUpConsoleLogging()

    def _postProcess(self, simDefinition=None):
        ''' Creates an empty flight path object to prevent errors in the parent function, which is still run to create log files.
            The simDefinition argument is accepted (and ignored, self.simDefinition is always used) to remain compatible with
            calls made by the parent class, such as the one in `Simulation._handleSimulationCrash` '''
        # Create an empty flight path to prevent errors in the parent function
        self.stageFlightPaths = [ RocketFlight() ]
        return Simulation._postProcess(self, self.simDefinition)

Ancestors

Methods

def createRocket(self)

Do all the same stuff as the parent object, but also re-initialize the environment, to make sure changes to environmental properties during the parameter sweep take effect

Expand source code
def createRocket(self):
    '''
        Rebuilds self.environment from the current self.simDefinition, then delegates to the parent class's createRocket().

        The environment is re-created on every call to make sure that changes made to environmental properties
            during the parameter sweep take effect.

        Returns:
            Whatever the parent class's createRocket() returns
    '''
    # Re-create the environment before building the rocket
    refreshedEnvironment = Environment(self.simDefinition, silent=self.silent)
    self.environment = refreshedEnvironment

    return super().createRocket()
def runSweep(self)

Runs a single force evaluation for each sweep point, i.e. for each set of values in self.parameterValues, which are written to the keys listed in self.parametersToSweep

Returns

List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point

Expand source code
def runSweep(self):
    '''
        Runs one force evaluation per sweep condition, i.e. once for each index into the value lists stored in self.parameterValues.
        Before each evaluation, the current value of every swept parameter is written to self.simDefinition under the
            matching key from self.parametersToSweep, and a new rocket is created from the updated definition.

        Returns:
            The result of self._postProcess(): List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point
    '''
    # Forces must be logged (the only way to get data out of this sim runner), and no plots should be produced
    forcedSettings = (
        ("SimControl.loggingLevel", "3"),
        ("SimControl.plot", "None"),
        ("SimControl.RocketPlot", "Off"),
    )
    for settingKey, settingValue in forcedSettings:
        self.simDefinition.setValue(settingKey, settingValue)

    # Optionally interpolate between the user-set parameter values
    interpolateBetweenPoints = strtobool(self.smoothLine)
    if interpolateBetweenPoints:
        self._addPoints()

    # Number of conditions = how many times the parameter values will be changed (velocity1, velocity2, ...)
    # Must be counted after any interpolated points have been added
    nConditions = len(self.parameterValues[0])

    for conditionIndex in range(nConditions):
        # Apply this condition's value for every swept parameter (velocity, temperature, ...)
        for paramIndex in range(len(self.parameterValues)):
            sweptKey = self.parametersToSweep[paramIndex]
            sweptValue = self.parameterValues[paramIndex][conditionIndex]
            self.simDefinition.setValue(sweptKey, sweptValue)

        sweepRocket = self.createRocket()

        # Every rocket after the first re-uses the first rocket's log, to track all derivative evaluations in one place
        if conditionIndex > 0:
            sweepRocket.derivativeEvaluationLog = sharedDerivativeLog
        else:
            sharedDerivativeLog = sweepRocket.derivativeEvaluationLog

        self.rocketStages = [ sweepRocket ]

        # The single force evaluation - creates a forces log entry for this sweep point
        sweepRocket._getAppliedForce(0.0, sweepRocket.rigidBody.state)

    # Write Logs to file, return path
    return self._postProcess()

Inherited members