Module MAPLEAF.SimulationRunners

Defines functions and classes that manage simulations.
Includes some (Simulation) that run a single simulation, and others (OptimizingSimRunner) that run many simulations at once.

Expand source code
'''
Defines functions and classes that manage simulations.  
Includes some (`Simulation`) that run a single simulation, and others (`OptimizingSimRunner`) that run many simulations at once.

.. image:: https://storage.needpix.com/rsynced_images/important-1705212_1280.png
'''

# Make the classes in all submodules importable directly from MAPLEAF.SimulationRunners
from .SingleSimulations import *
from .MonteCarlo import *
from .Convergence import *
from .Optimization import *
from .Batch import *

# Importing from a submodule also binds the submodule itself as a name in this package's namespace,
# which is what allows the submodule objects to be referenced by name in the list below
subModules = [ SingleSimulations, MonteCarlo, Convergence, Optimization, Batch ]

# The public API of this package is the concatenation of every submodule's __all__ list
__all__ = [ ]

for subModule in subModules:
    __all__ += subModule.__all__

Sub-modules

MAPLEAF.SimulationRunners.Batch

Script to run a batch of simulations, defined in a batch definition file. Can be run directly from the command line. Accessible as mapleaf-batch if …

MAPLEAF.SimulationRunners.Convergence
MAPLEAF.SimulationRunners.MonteCarlo
MAPLEAF.SimulationRunners.Optimization
MAPLEAF.SimulationRunners.SingleSimulations

Functions

def loadSimDefinition(simDefinitionFilePath=None, simDefinition=None, silent=False)

Loads a simulation definition file into a SimDefinition object - accepts either a file path or a SimDefinition object as input

Expand source code
def loadSimDefinition(simDefinitionFilePath=None, simDefinition=None, silent=False):
    ''' 
        Loads a simulation definition file into a `MAPLEAF.IO.SimDefinition` object - accepts either a file path or a `MAPLEAF.IO.SimDefinition` object as input 

        Inputs:
            simDefinitionFilePath:  (string) path to a simulation definition file, only used if simDefinition is not provided
            simDefinition:          (`MAPLEAF.IO.SimDefinition`) already-loaded definition. Returned as-is, takes priority over simDefinitionFilePath
            silent:                 (bool) forwarded to the `MAPLEAF.IO.SimDefinition` constructor when a file is parsed

        Returns:
            The simDefinition that was passed in, or a new one parsed from simDefinitionFilePath

        Raises:
            ValueError if neither simDefinitionFilePath nor simDefinition is provided
    '''
    # Identity checks (rather than ==/!=) so that an object defining its own __eq__ can never be mistaken for None
    if simDefinition is not None:
        return simDefinition # Use the SimDefinition that was passed in

    if simDefinitionFilePath is not None:
        return SimDefinition(simDefinitionFilePath, silent=silent) # Parse simulation definition file

    raise ValueError(""" Insufficient information to initialize a Simulation.
            Please provide either simDefinitionFilePath (string) or simDefinition (SimDefinition), which has been created from the desired Sim Definition file.
            If both are provided, the SimDefinition is used.""")
def main(argv=None)
Expand source code
def main(argv=None):
    ''' 
        Command-line entry point for a batch run.
        Parses argv, loads the batch definition file, builds a `BatchRun` from the parsed options and runs it.
        Returns whatever `run` returns for that batch run.
    '''
    def _firstOrNone(values):
        # Options arrive as sequences - only the first entry is used, None if nothing was supplied
        return values[0] if len(values) > 0 else None

    # Parse command line arguments
    cliArgs = _buildParser().parse_args(argv)

    # Load definition file
    from MAPLEAF.Main import findSimDefinitionFile  # Delayed import here to avoid circular imports
    definitionPath = findSimDefinitionFile(cliArgs.batchDefinitionFile)
    batchDefinition = SimDefinition(definitionPath, defaultDict={}, silent=True)

    includeFilter, excludeFilter, resultToValidate = (
        _firstOrNone(cliArgs.include),
        _firstOrNone(cliArgs.exclude),
        _firstOrNone(cliArgs.validate),
    )

    # Create batch run object containing settings and results, then run its cases
    batchRun = BatchRun(
        batchDefinition,
        cliArgs.recordAll,
        cliArgs.printStackTraces,
        includeFilter,
        excludeFilter,
        resultToValidate=resultToValidate,
    )
    return run(batchRun)
def optimizationRunnerFactory(simDefinitionFilePath=None, simDefinition=None, optimizationReader=None, silent=False, parallel=False) ‑> MAPLEAF.SimulationRunners.Optimization.OptimizingSimRunner

Provide a subdictionary reader pointed at an optimization dictionary. Will read the dictionary, initialize an optimizing simulation runner and return it.

Expand source code
def optimizationRunnerFactory(simDefinitionFilePath=None, simDefinition=None, optimizationReader=None, silent=False, parallel=False) -> OptimizingSimRunner:
    ''' 
        Provide a subdictionary reader pointed at an optimization dictionary. Will read the dictionary, initialize an optimizing simulation runner and return it. 

        Inputs:
            simDefinitionFilePath:  (string) path to a simulation definition file
            simDefinition:          (SimDefinition) already-loaded simulation definition, takes priority over simDefinitionFilePath
            optimizationReader:     (SubDictReader) only used when neither of the above is provided. Otherwise a new reader of the 'Optimization' dictionary is created
            silent:                 (bool) forwarded to the definition loader and to the runner
            parallel:               (bool) forwarded to the runner

        Returns:
            A PSORunner if the 'method' entry is 'PSO' (default), a ScipyMinimizeRunner if it contains 'scipy.optimize.minimize'

        Raises:
            ValueError if no simulation definition and no optimizationReader is provided, or if the optimization method is not recognized
    '''
    # Identity comparisons with None, so that objects defining their own __eq__ can't be mistaken for a missing argument
    haveSimDefinition = simDefinition is not None or simDefinitionFilePath is not None

    if haveSimDefinition:
        simDefinition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent)
        optimizationReader = SubDictReader('Optimization', simDefinition)

        # Ensure no output is produced during each simulation (cost function evaluation)
        simDefinition.setValue("SimControl.plot", "None")
        simDefinition.setValue("SimControl.RocketPlot", "Off")

    elif optimizationReader is None:
        raise ValueError('subDictReader not initialized for a nested optimization')

    method = optimizationReader.tryGetString('method', defaultValue='PSO')

    if method == 'PSO':
        return PSORunner(optimizationReader, silent=silent, parallel=parallel)

    if 'scipy.optimize.minimize' in method:
        return ScipyMinimizeRunner(optimizationReader, silent=silent, parallel=parallel)

    raise ValueError('Optimization method: {} not implemented, try PSO'.format(method))
def run(batchRun: BatchRun) ‑> int

Given a batchRun object (of type BatchRun), will run all of its test cases, and print a summary of the results

Expand source code
def run(batchRun: BatchRun) -> int:
    ''' 
        Runs every case returned by batchRun.getCasesToRun(), storing each case's result in batchRun.casesRun.
        Finishes by calling batchRun.printResult() with the elapsed wall time, and returns its return value.
    '''
    tic = time.time() # Timing covers case selection as well as case execution

    for caseName in batchRun.getCasesToRun():
        outcome = _runCase(caseName, batchRun)
        batchRun.casesRun.append(outcome)

    elapsed = time.time() - tic

    # Summary printout - returns 0 or 1, suitable for the command line
    return batchRun.printResult(elapsed)
def runMonteCarloSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False, nCores=1)
Expand source code
def runMonteCarloSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False, nCores=1):
    ''' 
        Loads the simulation definition (from a path or an existing object), prepares the Monte Carlo run,
        executes the simulations either single-threaded (nCores <= 1) or in parallel (nCores > 1),
        then hands the collected outputs to the results display step.
    '''
    simDefinition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent)

    nRuns, mCLogger, outputLists = _prepSim(simDefinition)

    runInParallel = nCores > 1
    if not runInParallel:
        _runSimulations_SingleThreaded(simDefinition, nRuns, outputLists, mCLogger, silent)
    else:
        _runSimulations_Parallel(simDefinition, nRuns, outputLists, silent, nCores)

    _showResults(simDefinition, outputLists, mCLogger)
def runSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False)
Expand source code
def runSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False):
    ''' Convenience wrapper: constructs a `Simulation` from the given arguments and returns the result of its run() method '''
    return Simulation(simDefinitionFilePath, simDefinition, silent).run()

Classes

class BatchRun (batchDefinition: SimDefinition, recordAll=False, printStackTraces=False, include=None, exclude=None, percentErrorTolerance=0.2, absoluteErrorTolerance=1e-09, resultToValidate=None)

Class to hold info about and results of a mapleaf-batch run

Expand source code
class BatchRun():
    ''' Class to hold info about and results of a mapleaf-batch run '''
    def __init__(self, 
            batchDefinition: SimDefinition, 
            recordAll=False, 
            printStackTraces=False, 
            include=None, 
            exclude=None,
            percentErrorTolerance=0.2,
            absoluteErrorTolerance=1e-9,
            resultToValidate=None
        ):
        self.batchDefinition = batchDefinition
        self.recordAll = recordAll
        self.printStackTraces = printStackTraces
        self.include = include
        self.exclude = exclude

        self.casesRun = []
        self.nComparisonSets = 0
        self.casesWithNewRecordedResults = set()

        self.warningCount = 0
        self.percentErrorTolerance = percentErrorTolerance
        self.absoluteErrorTolerance = absoluteErrorTolerance

        self.validationErrors = []
        self.validationDataUsed = []
        self.resultToValidate = resultToValidate

    def getCasesToRun(self):
        subDicts = self.batchDefinition.getImmediateSubDicts("")
        
        if self.include == None and self.exclude == None:
            # Run all cases
            return subDicts

        else:
            # Only run cases that include the include string AND do not contain the exclude string
            casesToRun = []
            for caseDictName in subDicts:
                if (self.include == None or self.include in caseDictName) and (self.exclude == None or self.exclude not in caseDictName):
                    casesToRun.append(caseDictName)

            return casesToRun
    
    def printResult(self, timeToRun=None) -> int:
        """ Outputs result summary """
        # Count number of cases failed
        casesFailed = []
        nTestsFailed = 0
        nTestsPassed = 0
        for result in self.casesRun:
            if result.testsFailed > 0 or result.totalErrors:
                casesFailed.append(result.name)
            
            nTestsPassed += result.testsPassed
            nTestsFailed += result.testsFailed

        nCases = len(self.casesRun)
        nCasesFailed = len(casesFailed)
        nCasesPassed = nCases - nCasesFailed
        nTests = nTestsFailed + nTestsPassed

        print("\n----------------------------------------------------------------------")
        print("BATCH RUN RESULTS")

        if timeToRun != None:
            print("Ran {} Case(s) in {:>.2f} s".format(nCases, timeToRun))
        else:
            print("Ran {} Case(s)".format(nCases))

        if self.resultToValidate != None:
            if len(self.validationErrors) > 0:
                print("\nValidation Results for {}:".format(self.resultToValidate))
                print("Average disagreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean(self.validationErrors)))
                print("Average magnitude of disgreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean([abs(error) for error in self.validationErrors])))
                print("Data Sets Used:")
                for (dataSet, avgError) in zip(self.validationDataUsed, self.validationErrors):
                    print("{}: {:2.2f}%".format(dataSet, avgError))
                print("")
            else:
                self.warning("\nERROR: No comparison/validation data for {} found. Make sure there is a plot of {} and some comparison data, and that {} is included in the name of those plotting dictionaries\n".format(self.resultToValidate, self.resultToValidate, self.resultToValidate))
  
        if self.warningCount > 0:
            print("Errors/Warnings: {}".format(self.warningCount))

        if len(self.casesWithNewRecordedResults) > 0:
            recordedCaseList = ", ".join(self.casesWithNewRecordedResults)
            print("New expected results were recorded for the following cases: {}".format(recordedCaseList))
            _writeModifiedTestDefinitionFile(self.batchDefinition)

        if nCasesFailed == 0:
            print("{} Case(s) ok".format(nCases))
            print("")
            if self.warningCount == 0:
                print("OK")
            else:
                print("WARNING")

            return 0
        else:
            print("{}/{} Case(s) Failed, {}/{} Parameter Comparison(s) Failed".format(nCasesFailed, nCases, nTestsFailed, nTests))
            print("")
            print("Failed Cases:")
            for case in casesFailed:
                print(case)
            print("")
            print("FAIL")

            return 1

    def warning(self, msg: str):
        ''' Currently, warnings are used when errors occur in processes not directly related to MAPLEAF simulations, like loading comparison data '''
        self.warningCount +=1
        print(msg)

Methods

def getCasesToRun(self)
Expand source code
def getCasesToRun(self):
    ''' Returns the names of the top-level dictionaries of the batch definition, filtered by self.include / self.exclude (if set) '''
    subDicts = self.batchDefinition.getImmediateSubDicts("")
    
    if self.include is None and self.exclude is None:
        # Run all cases
        return subDicts

    # Only run cases that include the include string AND do not contain the exclude string
    return [ 
        caseDictName for caseDictName in subDicts
        if (self.include is None or self.include in caseDictName) 
        and (self.exclude is None or self.exclude not in caseDictName)
    ]
def printResult(self, timeToRun=None) ‑> int

Outputs result summary

Expand source code
def printResult(self, timeToRun=None) -> int:
    """ 
        Outputs result summary 

        Inputs:
            timeToRun: (number or None) wall time of the batch run in seconds, only printed if provided

        Returns:
            0 if no case failed, 1 otherwise. A case counts as failed if it has failed tests or a truthy totalErrors
    """
    # Collect failed cases and test totals
    casesFailed = [ result.name for result in self.casesRun if result.testsFailed > 0 or result.totalErrors ]
    nTestsPassed = sum(result.testsPassed for result in self.casesRun)
    nTestsFailed = sum(result.testsFailed for result in self.casesRun)

    nCases = len(self.casesRun)
    nCasesFailed = len(casesFailed)
    nTests = nTestsFailed + nTestsPassed

    print("\n----------------------------------------------------------------------")
    print("BATCH RUN RESULTS")

    if timeToRun is not None:
        print("Ran {} Case(s) in {:>.2f} s".format(nCases, timeToRun))
    else:
        print("Ran {} Case(s)".format(nCases))

    if self.resultToValidate is not None:
        if len(self.validationErrors) > 0:
            print("\nValidation Results for {}:".format(self.resultToValidate))
            print("Average disagreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean(self.validationErrors)))
            print("Average magnitude of disgreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean([abs(error) for error in self.validationErrors])))
            print("Data Sets Used:")
            for (dataSet, avgError) in zip(self.validationDataUsed, self.validationErrors):
                print("{}: {:2.2f}%".format(dataSet, avgError))
            print("")
        else:
            # Counted as a warning before the warning count is printed below
            self.warning("\nERROR: No comparison/validation data for {} found. Make sure there is a plot of {} and some comparison data, and that {} is included in the name of those plotting dictionaries\n".format(self.resultToValidate, self.resultToValidate, self.resultToValidate))

    if self.warningCount > 0:
        print("Errors/Warnings: {}".format(self.warningCount))

    if len(self.casesWithNewRecordedResults) > 0:
        # Sorted, because iteration order of a set is not deterministic and this output should be reproducible
        recordedCaseList = ", ".join(sorted(self.casesWithNewRecordedResults))
        print("New expected results were recorded for the following cases: {}".format(recordedCaseList))
        _writeModifiedTestDefinitionFile(self.batchDefinition)

    if nCasesFailed == 0:
        print("{} Case(s) ok".format(nCases))
        print("")
        # Warnings alone don't fail the run, they only change the final status line
        print("OK" if self.warningCount == 0 else "WARNING")

        return 0

    print("{}/{} Case(s) Failed, {}/{} Parameter Comparison(s) Failed".format(nCasesFailed, nCases, nTestsFailed, nTests))
    print("")
    print("Failed Cases:")
    for case in casesFailed:
        print(case)
    print("")
    print("FAIL")

    return 1
def warning(self, msg: str)

Currently, warnings are used when errors occur in processes not directly related to MAPLEAF simulations, like loading comparison data

Expand source code
def warning(self, msg: str):
    ''' Records one more warning on this object, then prints msg. Used for errors in processes not directly related to MAPLEAF simulations, like loading comparison data '''
    self.warningCount = self.warningCount + 1
    print(msg)
class ConvergenceSimRunner (simDefinitionFilePath=None, simDefinition=None, silent=False)

Runs a simulation repeatedly, decreasing the time step or target error each time, monitoring for convergence

Inputs

  • simDefinitionFilePath: (string) path to simulation definition file
  • simDefinition: (SimDefinition) object that has already loaded and parsed the desired sim definition file
  • silent: (bool) toggles optional outputs to the console
Expand source code
class ConvergenceSimRunner(Simulation):
    '''
        Runs a simulation repeatedly, decreasing the time step or target error each time, monitoring for convergence
    '''
    def __init__(self, simDefinitionFilePath=None, simDefinition=None, silent=False):
        Simulation.__init__(self, simDefinitionFilePath=simDefinitionFilePath, simDefinition=simDefinition, silent=silent)

    def convergeSimEndPosition(self, refinementRatio=2, simLimit=10, plot=True, stopAtConvergence=False, showPlot=True, plotLineLabel="Simulations", ax1=None, ax2=None):
        '''
            Takes simulation and runs it repeatedly, dividing the time step (or, for adaptive schemes, the target error) by refinementRatio each time.
            Should use with simulations that have an EndCondition of type "Time"
                # Otherwise sim will be run using current settings, and its endtime will be taken as the new end time for future convergence sims
            This Fn called by compareIntegrationSchemes functions

            Parameters:
                refinementRatio         Number, Each time sim is run, time step or target error is divided by this number
                simLimit                Number, Max number of simulations to run (takes exponentially more time to run more simulations)
                plot                    True/False, whether to plot the results
                stopAtConvergence       True/False, if False, runs simLimit simulations even if asymptotic convergence is reached earlier
                showPlot                True/False, if True, calls plt.show()
                plotLineLabel           string, Label of line on plot
                ax1                     matplotlib Axes, Z-location (Y) axis
                ax2                     matplotlib Axes, Wall Time (Y) axis (2nd Y-axis)

            Returns:
                timeStepHistory, finalPositionHistory, simTimeHistory   (one entry per simulation run)
                    For adaptive schemes, timeStepHistory holds the target errors used

                NOTE: if stopAtConvergence is True and asymptotic convergence is detected, the early return is instead
                    timeStepHistory, finalPositionHistory, flight       (flight = first flight of the last simulation run)
                    Kept as-is for backwards compatibility
        '''
        from statistics import mean
        import time

        from MAPLEAF.IO.gridConvergenceFunctions import checkConvergence

        self._setUpConfigFileForConvergenceRun()
        
        timeStepMethod = self.simDefinition.getValue("SimControl.timeDiscretization")
        adaptiveTimeStepping = "Adaptive" in timeStepMethod

        # Adaptive schemes are refined through their target error, constant time step schemes through their time step
        if adaptiveTimeStepping:
            refinementKey = "SimControl.TimeStepAdaptation.targetError"
        else:
            refinementKey = "SimControl.timeStep"

        #### Run Simulations ####
        print("Starting convergence simulations")
        # Multiplied by refinementRatio, so that the division at the top of the loop below restores the configured value for the first simulation
        refinementValue = float(self.simDefinition.getValue(refinementKey))*refinementRatio

        simCount = 1
        finalPositionHistory = []
        convergenceHistory = []
        timeStepHistory = []
        simTimeHistory = []

        def printConvergenceHistory(ax1=ax1, ax2=ax2):
            ''' Prints one line per simulation run so far and (if plot is True) plots final Z coordinate and wall time vs. time step / target error '''
            print("")
            print("Convergence History:")
            print("Integration Method: {}".format(timeStepMethod))

            zPos = []

            for i, (finalPos, wallTime) in enumerate(zip(finalPositionHistory, simTimeHistory)):
                zPos.append(finalPos[2])
                printString = "FinalPosition(m): {:>7.3f} WallTime(s): {:>7.3f} ".format(finalPos, wallTime)

                # Convergence checks need three results, so the first convergence result belongs to the third simulation
                if i > 1: # TODO: Get convergence results into the .csv file
                    ordersOfConvergence, _, _, asymptoticChecks, _, _ = convergenceHistory[i-2]
                    printString += " Avg Order: {:>4.2f}, Avg Asymptotic Check: {:>6.3f}".format(mean(ordersOfConvergence), mean(asymptoticChecks))

                print(printString)

            if plot:
                if ax1 is None:
                    ax1 = plt.gca()
                if ax2 is None:
                    ax2 = ax1.twinx()

                ax1.plot(timeStepHistory, zPos, ":D", label=plotLineLabel)
                ax1.set_ylabel("Final Z Coordinate (m)")

                ax2.plot(timeStepHistory, simTimeHistory, "-*", label=plotLineLabel + " Wall Time")
                ax2.set_ylabel("Wall Time (s)")
                
                plt.xscale("log")
                plt.xlabel("Time Step (s)")
                plt.legend()
                plt.tight_layout()

                if showPlot:
                    plt.show()

        while simCount <= simLimit:
            # Refine, then record the value used for this simulation
            refinementValue /= refinementRatio
            self.simDefinition.setValue(refinementKey, str(refinementValue))
            timeStepHistory.append(refinementValue)
            print("Simulation {}, Time step: {}".format(simCount, refinementValue))

            startTime = time.time()
            flights, _ = self.run()
            flight = flights[0]
            wallTime = time.time() - startTime
            simTimeHistory.append(wallTime)

            finalPositionHistory.append(flight.rigidBodyStates[-1].position)
            print("Final Position: {:1.3f}".format(finalPositionHistory[-1]))

            if len(finalPositionHistory) >= 3:
                # Check whether result is converging, using the three most recent results
                cV, mV, fV = finalPositionHistory[-3:]
                print("Checking convergence")
                convergResult = checkConvergence(cV, mV, fV, refinementRatio)
                ordersOfConvergence, GCI12s, GCI23s, asymptoticChecks, richardsonExtrapVals, uncertainties = convergResult
                convergenceHistory.append(convergResult)

                for d, direction in enumerate(["X", "Y", "Z"]):
                    print("{}-Direction: Order: {:>4.3f}, Asymptotic Check: {:>6.3f}, RichardsonExtrap: {:>7.3f}, Estimated Uncertainty: {:>6.3f}".format(direction, ordersOfConvergence[d], asymptoticChecks[d], richardsonExtrapVals[d], uncertainties[d]))
                
                # Converged if asymptotic checks average within 0.1 of 1, and differ from each other by less than 0.2
                if stopAtConvergence and abs(sum(asymptoticChecks) / len(asymptoticChecks) - 1) < 0.1 and max(asymptoticChecks) - min(asymptoticChecks) < 0.2:
                    print("Simulation Converging Asymptotically")
                    printConvergenceHistory()
                    return timeStepHistory, finalPositionHistory, flight
            
            simCount += 1

        # Output whether convergence was achieved
        if simLimit >= 3:
            print("Asymptotic convergence not reached within {} simulations".format(simLimit))
        else:
            print("Asymptotic convergence impossible to reach with less than 3 iterations (performed {}). Adjust the parameter 'simLimit' to perform more iterations".format(simLimit))

        printConvergenceHistory(ax1, ax2)

        return timeStepHistory, finalPositionHistory, simTimeHistory

    def _writeConvergenceResultsToCSV(self, filePath, refinementColumnHeader, refinementHistory, integrationSchemes, finalPositionHistories, wallTimeHistories):
        ''' 
            Writes one row per refinement level to filePath (overwrites old files).
            First column is the refinement value, followed by FinalX/Y/Z and WallTime columns for each integration scheme.
            finalPositionHistories and wallTimeHistories hold one list per integration scheme, in the same order as integrationSchemes.
        '''
        import csv # Local import, so this method doesn't rely on the enclosing module having imported csv

        print("Writing convergence results to: {}".format(filePath))

        with open(filePath, 'w', newline='') as file:
            writer = csv.writer(file)
            
            # Write Column Headers
            headerRow = [ refinementColumnHeader ]
            for intScheme in integrationSchemes:
                headerRow += [ "{}_FinalX(m)".format(intScheme), "{}_FinalY(m)".format(intScheme), "{}_FinalZ(m)".format(intScheme), "{}_WallTime(s)".format(intScheme) ]
            
            writer.writerow(headerRow)
            
            # Write convergence results, refinement level by refinement level
            for i, refinementValue in enumerate(refinementHistory):
                row = [ refinementValue ]

                for finalPositions, wallTimes in zip(finalPositionHistories, wallTimeHistories):
                    finalPos = finalPositions[i]
                    row += [ finalPos.X, finalPos.Y, finalPos.Z, wallTimes[i] ]

                writer.writerow(row)

    def compareClassicalIntegrationSchemes(self, saveFigure=False, showPlot=True, simLimit=10, integrationSchemes = [ "Euler", "RK2Midpoint", "RK2Heun", "RK4" ], convergenceResultFilePath="convergenceResult.csv"):
        ''' 
            Runs convergeSimEndPosition() once per integration scheme, always starting from the currently configured time step

            Arguments:
                saveFigure (Bool) - saves the plot to a hard-coded path
                showPlot (Bool)
                simLimit (Number) - max number of simulations per integration scheme
                integrationSchemes (list of strings) - values for SimControl.timeDiscretization (default list is never modified here)
                convergenceResultFilePath (string or None) - will overwrite old files

            Outputs:
                Plot
                .csv file (Optional)

            Returns:
                Nothing
        '''

        plt.figure(figsize=(3.5,3))
        ax1 = plt.gca()
        ax2 = plt.twinx()
        
        initTimeStep = float(self.simDefinition.getValue("SimControl.timeStep"))

        # Lists to store results
        timeStepHistory = []
        finalPositionHistories = []
        wallTimeHistory = []

        # Run series of simulations for each integration scheme
        for scheme in integrationSchemes:
            self.simDefinition.setValue("SimControl.timeDiscretization", scheme)
            self.simDefinition.setValue("SimControl.timeStep", str(initTimeStep)) # Reset, previous convergence run has changed it
            timeSteps, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, simLimit=simLimit, plotLineLabel=scheme, ax1=ax1, ax2=ax2)
            
            timeStepHistory = timeSteps
            finalPositionHistories.append(finalPositions)
            wallTimeHistory.append(wallTimes)

        print("Simulations complete")

        if convergenceResultFilePath is not None:
            self._writeConvergenceResultsToCSV(convergenceResultFilePath, "TimeStep(s)", timeStepHistory, integrationSchemes, finalPositionHistories, wallTimeHistory)

        if saveFigure:
            # Try the first hard-coded location, fall back to the second one if saving there fails
            try:
                plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)
            except Exception:
                plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)

        print("Showing plot")
        if showPlot:
            plt.show()

    def compareAdaptiveIntegrationSchemes(self, saveFigure=False, showPlot=True, integrationSchemes=["RK12Adaptive", "RK23Adaptive", "RK45Adaptive"], simLimit=10, convergenceResultFilePath="adaptiveConvergenceResult.csv"):
        ''' 
            Runs convergeSimEndPosition() once per integration scheme, always starting from the currently configured target error

            Arguments:
                saveFigure (Bool) - saves the plot to a hard-coded path
                showPlot (Bool)
                integrationSchemes (list of strings) - values for SimControl.timeDiscretization (default list is never modified here)
                simLimit (Number) - max number of simulations per integration scheme
                convergenceResultFilePath (string or None) - will overwrite old files

            Outputs:
                Plot
                .csv file (Optional)

            Returns:
                Nothing
        '''

        plt.figure(figsize=(3.5,3))
        ax1 = plt.gca()
        ax2 = plt.twinx()
        
        initErrorTarget = float(self.simDefinition.getValue("SimControl.TimeStepAdaptation.targetError"))
        
        # Lists to store results
        targetErrorHistory = []
        finalPositionHistories = []
        wallTimeHistory = []

        # Run simulations
        for scheme in integrationSchemes:
            self.simDefinition.setValue("SimControl.timeDiscretization", scheme)
            self.simDefinition.setValue("SimControl.TimeStepAdaptation.targetError", str(initErrorTarget)) # Reset, previous convergence run has changed it
            timeSteps, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, plotLineLabel=scheme, refinementRatio=2, simLimit=simLimit, ax1=ax1, ax2=ax2)
            
            targetErrorHistory = timeSteps
            finalPositionHistories.append(finalPositions)
            wallTimeHistory.append(wallTimes)

        # Write results to .csv file
        if convergenceResultFilePath is not None:
            self._writeConvergenceResultsToCSV(convergenceResultFilePath, "TargetError", targetErrorHistory, integrationSchemes, finalPositionHistories, wallTimeHistory)

        # Save results figure
        if saveFigure:
            # Try the first hard-coded location, fall back to the second one if saving there fails
            try:
                plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)
            except Exception:
                plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)

        # Show Plot
        plt.xlabel("Target Error")

        if showPlot:
            plt.show()

    def _setUpConfigFileForConvergenceRun(self):
        ''' Modifies self.simDefinition for repeated runs: no distribution sampling, no plots, and a time-based end condition '''
        print("Will attempt to converge final rocket position of simulation: {}".format(self.simDefinition.fileName))
        self.simDefinition.disableDistributionSampling = True # Don't sample from probability distributions while trying to converge a sim

        # Make sure no plots are created every time the sim runs
        self.simDefinition.setValue("SimControl.plot", "None")

        #### Make sure End Condition is a time ####
        endCondition = self.simDefinition.getValue("SimControl.EndCondition")
        if endCondition != "Time":
            print("Running simulation to determine end time")
            # Otherwise run the sim once with the current settings, and take the last time of its first flight
            flights, _ = self.run()
            endTime = flights[0].times[-1]
            # set that to the end condition
            print("Setting EndCondition = Time, EndConditionValue = {}".format(endTime))
            self.simDefinition.setValue("SimControl.EndCondition", "Time")
            self.simDefinition.setValue("SimControl.EndConditionValue", str(endTime))

Ancestors

Methods

def compareAdaptiveIntegrationSchemes(self, saveFigure=False, showPlot=True, integrationSchemes=['RK12Adaptive', 'RK23Adaptive', 'RK45Adaptive'], simLimit=10, convergenceResultFilePath='adaptiveConvergenceResult.csv')

Arguments

simConfigFilePath (string), saveFigure (Bool), convergenceResultFilePath (string or None) - will overwrite old files

Outputs

Plot .csv file (Optional)

Returns

Nothing

Expand source code
def compareAdaptiveIntegrationSchemes(self, saveFigure=False, showPlot=True, integrationSchemes=["RK12Adaptive", "RK23Adaptive", "RK45Adaptive"], simLimit=10, convergenceResultFilePath="adaptiveConvergenceResult.csv"):
    ''' 
        Runs a target-error convergence study (see `convergeSimEndPosition`) for each of the given adaptive 
        integration schemes, plotting all results on a single figure.
        Every scheme starts from the target error currently present in self.simDefinition.

        Arguments:
            saveFigure (Bool) - save the resulting figure to disk (hard-coded paths, see below)
            showPlot (Bool) - call plt.show() once all simulations are complete
            integrationSchemes (list of strings) - values of SimControl.timeDiscretization to compare (list is not modified)
            simLimit (int) - passed on to convergeSimEndPosition as the max number of simulations to run
            convergenceResultFilePath (string or None) - will overwrite old files. If None, no .csv file is written

        Outputs:
            Plot
            .csv file (Optional)

        Returns:
            Nothing
    '''
    targetErrorKey = "SimControl.TimeStepAdaptation.targetError"

    plt.figure(figsize=(3.5,3))
    ax1 = plt.gca()
    ax2 = plt.twinx()
    
    initErrorTarget = float(self.simDefinition.getValue(targetErrorKey))
    
    # Lists to store results
    targetErrorHistory = []     # Overwritten for every scheme - each scheme returns its own series of target errors
    finalPositionHistories = [] # One list of final positions per integration scheme
    wallTimeHistory = []        # One list of wall times per integration scheme

    # Run simulations
    for scheme in integrationSchemes:
        self.simDefinition.setValue("SimControl.timeDiscretization", scheme)
        # convergeSimEndPosition leaves the refined target error in the sim definition -> reset it for each scheme
        self.simDefinition.setValue(targetErrorKey, str(initErrorTarget))
        targetErrors, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, plotLineLabel=scheme, refinementRatio=2, simLimit=simLimit, ax1=ax1, ax2=ax2)
        
        targetErrorHistory = targetErrors
        finalPositionHistories.append(finalPositions)
        wallTimeHistory.append(wallTimes)

    # Write results to .csv file
    if convergenceResultFilePath is not None:
        import csv
        print("Writing convergence results to: {}".format(convergenceResultFilePath))

        with open(convergenceResultFilePath, 'w', newline='') as file:
            writer = csv.writer(file)
            
            # Write Column Headers (4 columns per integration scheme)
            headerRow = [ "TargetError" ]
            for intScheme in integrationSchemes:
                headerRow += [ "{}_FinalX(m)".format(intScheme), "{}_FinalY(m)".format(intScheme), "{}_FinalZ(m)".format(intScheme), "{}_WallTime(s)".format(intScheme) ]
            
            writer.writerow(headerRow)
            
            # Write convergence results, one row per target error
            for i, targetError in enumerate(targetErrorHistory):
                row = [ targetError ]

                for schemePositions, schemeWallTimes in zip(finalPositionHistories, wallTimeHistory):
                    finalPosition = schemePositions[i]
                    row += [ finalPosition.X, finalPosition.Y, finalPosition.Z, schemeWallTimes[i] ]

                writer.writerow(row)

    # Label the x-axis BEFORE saving, otherwise the saved figure keeps the x-label set by convergeSimEndPosition
    plt.xlabel("Target Error")

    # Save results figure
    if saveFigure:
        # NOTE(review): hard-coded, machine-specific output paths. The file name is also suspicious: 
            # compareClassicalIntegrationSchemes saves to "AdaptTimeStepConvergence..." - names may be swapped. Confirm.
        try:
            plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)
        except OSError:
            # First location could not be written (ex. directory does not exist on this machine) - try the second
            plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)

    # Show Plot
    if showPlot:
        plt.show()
def compareClassicalIntegrationSchemes(self, saveFigure=False, showPlot=True, simLimit=10, integrationSchemes=['Euler', 'RK2Midpoint', 'RK2Heun', 'RK4'], convergenceResultFilePath='convergenceResult.csv')

Arguments

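saveFigure (Bool) - save the resulting figure to disk; showPlot (Bool) - display the figure; simLimit (int) - passed on to convergeSimEndPosition as the max number of simulations to run; integrationSchemes (list of strings) - fixed-time-step integration schemes to compare; convergenceResultFilePath (string or None) - path of the .csv results file, will overwrite old files (None: no .csv file is written). The simulation itself is defined by self.simDefinition.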

Outputs

Plot .csv file (Optional)

Returns

Nothing

Expand source code
def compareClassicalIntegrationSchemes(self, saveFigure=False, showPlot=True, simLimit=10, integrationSchemes = [ "Euler", "RK2Midpoint", "RK2Heun", "RK4" ], convergenceResultFilePath="convergenceResult.csv"):
    ''' 
        Runs a time-step convergence study (see `convergeSimEndPosition`) for each of the given fixed-time-step 
        integration schemes, plotting all results on a single figure.
        Every scheme starts from the time step currently present in self.simDefinition.

        Arguments:
            saveFigure (Bool) - save the resulting figure to disk (hard-coded paths, see below)
            showPlot (Bool) - call plt.show() once all simulations are complete
            simLimit (int) - passed on to convergeSimEndPosition as the max number of simulations to run
            integrationSchemes (list of strings) - values of SimControl.timeDiscretization to compare (list is not modified)
            convergenceResultFilePath (string or None) - will overwrite old files. If None, no .csv file is written

        Outputs:
            Plot
            .csv file (Optional)

        Returns:
            Nothing
    '''
    timeStepKey = "SimControl.timeStep"

    plt.figure(figsize=(3.5,3))
    ax1 = plt.gca()
    ax2 = plt.twinx()
    
    initTimeStep = float(self.simDefinition.getValue(timeStepKey))

    # Lists to store results
    timeStepHistory = []        # Overwritten for every scheme - each scheme returns its own series of time steps
    finalPositionHistories = [] # One list of final positions per integration scheme
    wallTimeHistory = []        # One list of wall times per integration scheme

    # Run series of simulations for each integration scheme
    for scheme in integrationSchemes:
        self.simDefinition.setValue("SimControl.timeDiscretization", scheme)
        # convergeSimEndPosition leaves the refined time step in the sim definition -> reset it for each scheme
        self.simDefinition.setValue(timeStepKey, str(initTimeStep))
        timeSteps, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, simLimit=simLimit, plotLineLabel=scheme, ax1=ax1, ax2=ax2)
        
        timeStepHistory = timeSteps
        finalPositionHistories.append(finalPositions)
        wallTimeHistory.append(wallTimes)

    print("Simulations complete")

    if convergenceResultFilePath is not None:
        # Imported locally (as in compareAdaptiveIntegrationSchemes) so this function does not rely on a module-level import
        import csv
        print("Writing convergence results to: {}".format(convergenceResultFilePath))

        with open(convergenceResultFilePath, 'w', newline='') as file:
            writer = csv.writer(file)
            
            # Write Column Headers (4 columns per integration scheme)
            headerRow = [ "TimeStep(s)" ]
            for intScheme in integrationSchemes:
                headerRow += [ "{}_FinalX(m)".format(intScheme), "{}_FinalY(m)".format(intScheme), "{}_FinalZ(m)".format(intScheme), "{}_WallTime(s)".format(intScheme) ]
            
            writer.writerow(headerRow)
            
            # Write convergence results, time step by time step
            for i, timeStep in enumerate(timeStepHistory):
                row = [ timeStep ]

                for schemePositions, schemeWallTimes in zip(finalPositionHistories, wallTimeHistory):
                    finalPosition = schemePositions[i]
                    row += [ finalPosition.X, finalPosition.Y, finalPosition.Z, schemeWallTimes[i] ]

                writer.writerow(row)

    if saveFigure:
        # NOTE(review): hard-coded, machine-specific output paths. The file name is also suspicious: 
            # compareAdaptiveIntegrationSchemes saves to "TimeStepConvergence..." - names may be swapped. Confirm.
        try:
            plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)
        except OSError:
            # First location could not be written (ex. directory does not exist on this machine) - try the second
            plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0)

    if showPlot:
        # Only announce the plot if it is actually going to be shown
        print("Showing plot")
        plt.show()
def convergeSimEndPosition(self, refinementRatio=2, simLimit=10, plot=True, stopAtConvergence=False, showPlot=True, plotLineLabel='Simulations', ax1=None, ax2=None)

Takes a simulation and runs it repeatedly, dividing the time step (or, for adaptive integration schemes, the target error) by refinementRatio each time. If stopAtConvergence is True, exits once convergence is approximately asymptotic; otherwise runs simLimit simulations. Returns the histories of time steps (or target errors) and final positions, plus the per-simulation wall times when all simLimit simulations are run. Should be used with simulations that have an EndCondition of type "Time"; otherwise the sim is first run using its current settings, and its end time is taken as the end time for the subsequent convergence sims. This function is called by the compare...IntegrationSchemes functions.

Parameters

refinementRatio: Number, each time the sim is run, the time step or target error is divided by this number; simLimit: Number, max number of simulations to run (takes exponentially more time to run more simulations); plot: True/False, whether to plot the results; stopAtConvergence: True/False, if False, runs simLimit simulations even if asymptotic convergence is reached earlier; showPlot: True/False, if True, calls plt.show(); plotLineLabel: string, label of line on plot; ax1: matplotlib Axes, Z-location (Y) axis; ax2: matplotlib Axes, Wall Time (Y) axis (2nd Y-axis). The simulation itself is defined by self.simDefinition (there are no simConfigFilePath or fW parameters).

Expand source code
def convergeSimEndPosition(self, refinementRatio=2, simLimit=10, plot=True, stopAtConvergence=False, showPlot=True, plotLineLabel="Simulations", ax1=None, ax2=None):
    '''
        Takes the simulation defined by self.simDefinition and runs it repeatedly, dividing the time step 
            (or the target error, if an adaptive integration scheme is selected) by refinementRatio each time.
        Once at least three results are available, the last three final positions are passed to checkConvergence.
        Should use with simulations that have an EndCondition of type "Time"
            # Otherwise sim will be run using current settings, and its endtime will be taken as the new end time for future convergence sims
        This Fn called by compareIntegrationSchemes functions

        Parameters:
            refinementRatio         Number, Each time sim is run, time step or target error is divided by this number
            simLimit                Number, Max number of simulations to run (takes exponentially more time to run more simulations)
            plot                    True/False, whether to plot the results
            stopAtConvergence       True/False, if False, runs simLimit simulations even if asymptotic convergence is reached earlier
            showPlot                True/False, if True, calls plt.show()
            plotLineLabel           string, Label of line on plot
            ax1                     matplotlib Axes, Z-location (Y) axis
            ax2                     matplotlib Axes, Wall Time (Y) axis (2nd Y-axis)

        Returns:
            timeStepHistory         list, time step (or target error) used for each simulation
            finalPositionHistory    list, final position of the first flight returned by each simulation
            simTimeHistory          list, wall time (s) taken by each simulation
    '''
    from MAPLEAF.IO.gridConvergenceFunctions import checkConvergence
    from statistics import mean
    import time

    self._setUpConfigFileForConvergenceRun()
    
    timeStepMethod = self.simDefinition.getValue("SimControl.timeDiscretization")
    adaptiveTimeStepping = "Adaptive" in timeStepMethod

    # Adaptive schemes are refined by tightening their target error, all others by shrinking the time step
    if adaptiveTimeStepping:
        refinedKey = "SimControl.TimeStepAdaptation.targetError"
        refinedName = "Target error"
    else:
        refinedKey = "SimControl.timeStep"
        refinedName = "Time step"

    #### Run Simulations ####
    print("Starting convergence simulations")
    # Pre-multiplied by the refinement ratio, so that the division in the first iteration restores the configured value
    refinedValue = float(self.simDefinition.getValue(refinedKey))*refinementRatio

    simCount = 1
    finalPositionHistory = []
    convergenceHistory = [] # Entry i holds the checkConvergence result for simulations i, i+1, i+2
    timeStepHistory = []    # Holds target errors instead of time steps for adaptive schemes
    simTimeHistory = []

    def printConvergenceHistory(ax1=ax1, ax2=ax2):
        ''' Prints a summary line for every simulation run so far and (if plot == True) plots final Z position and wall time '''
        print("")
        print("Convergence History:")
        print("Integration Method: {}".format(timeStepMethod))

        for i, finalPos in enumerate(finalPositionHistory):
            printString = "FinalPosition(m): {:>7.3f} WallTime(s): {:>7.3f} ".format(finalPos, simTimeHistory[i])

            if i > 1: # TODO: Get convergence results into the .csv file
                # Convergence results only exist from the third simulation onwards
                ordersOfConvergence, GCI12s, GCI23s, asymptoticChecks, richardsonExtrapVals, uncertainties = convergenceHistory[i-2]
                printString += " Avg Order: {:>4.2f}, Avg Asymptotic Check: {:>6.3f}".format(mean(ordersOfConvergence), mean(asymptoticChecks))

            print(printString)

        if plot:
            zPos = [ finalPos[2] for finalPos in finalPositionHistory ]

            if ax1 is None:
                ax1 = plt.gca()
            if ax2 is None:
                ax2 = ax1.twinx()

            ax1.plot(timeStepHistory, zPos, ":D", label=plotLineLabel)
            ax1.set_ylabel("Final Z Coordinate (m)")

            ax2.plot(timeStepHistory, simTimeHistory, "-*", label=plotLineLabel + " Wall Time")
            ax2.set_ylabel("Wall Time (s)")
            
            plt.xscale("log")
            # The x-axis holds target errors (not time steps) when an adaptive scheme is being refined
            plt.xlabel("Target Error" if adaptiveTimeStepping else "Time Step (s)")
            plt.legend()
            plt.tight_layout()

            if showPlot:
                plt.show()

    while simCount <= simLimit:
        # Refine, and record the value used for this simulation
        refinedValue /= refinementRatio
        self.simDefinition.setValue(refinedKey, str(refinedValue))
        timeStepHistory.append(refinedValue)
        print("Simulation {}, {}: {}".format(simCount, refinedName, refinedValue))

        startTime = time.time()
        flights, _ = self.run()
        flight = flights[0]
        wallTime = time.time() - startTime
        simTimeHistory.append(wallTime)

        finalPositionHistory.append(flight.rigidBodyStates[-1].position)
        print("Final Position: {:1.3f}".format(finalPositionHistory[-1]))

        if len(finalPositionHistory) >= 3:
            # Check whether result is converging, using the three most recent results
            cV, mV, fV = finalPositionHistory[-3:]
            print("Checking convergence")
            convergResult = checkConvergence(cV, mV, fV, refinementRatio)
            ordersOfConvergence, GCI12s, GCI23s, asymptoticChecks, richardsonExtrapVals, uncertainties = convergResult
            convergenceHistory.append(convergResult)
            
            for d, direction in enumerate(["X", "Y", "Z"]):
                print("{}-Direction: Order: {:>4.3f}, Asymptotic Check: {:>6.3f}, RichardsonExtrap: {:>7.3f}, Estimated Uncertainty: {:>6.3f}".format(direction, ordersOfConvergence[d], asymptoticChecks[d], richardsonExtrapVals[d], uncertainties[d]))
            
            # Stop early if the asymptotic checks average to within 0.1 of 1, and are within 0.2 of each other
            avgAsymptoticCheck = sum(asymptoticChecks) / len(asymptoticChecks)
            asymptoticCheckSpread = max(asymptoticChecks) - min(asymptoticChecks)
            if stopAtConvergence and abs(avgAsymptoticCheck - 1) < 0.1 and asymptoticCheckSpread < 0.2:
                print("Simulation Converging Asymptotically")
                printConvergenceHistory()
                # Same return values as below - callers unpack the third value as the wall time history
                return timeStepHistory, finalPositionHistory, simTimeHistory
        
        simCount += 1

    # Output whether convergence was achieved
    if simLimit >= 3:
        print("Asymptotic convergence not reached within {} simulations".format(simLimit))
    else:
        print("Asymptotic convergence impossible to reach with less than 3 iterations (performed {}). Adjust the parameter 'simLimit' to perform more iterations".format(simLimit))

    printConvergenceHistory(ax1, ax2)

    return timeStepHistory, finalPositionHistory, simTimeHistory

Inherited members

class Simulation (simDefinitionFilePath=None, simDefinition=None, silent=False)

Inputs

  • simDefinitionFilePath: (string) path to simulation definition file
  • simDefinition: (SimDefinition) object that's already loaded and parsed the desired sim definition file
  • silent: (bool) toggles optional outputs to the console
Expand source code
class Simulation():

    def __init__(self, simDefinitionFilePath=None, simDefinition=None, silent=False):
        '''
            Sets up a single simulation: loads the simulation definition and creates the Environment shared by all rocket stages.

            Inputs:
                
                * simDefinitionFilePath:  (string) path to simulation definition file  
                * simDefinition:          (`MAPLEAF.IO.SimDefinition`) object that's already loaded and parsed the desired sim definition file  
                * silent:                 (bool) toggles optional outputs to the console  
        '''
        self.simDefinition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent)
        ''' Instance of `MAPLEAF.IO.SimDefinition`. Defines the current simulation '''

        self.environment = Environment(self.simDefinition, silent=silent)
        ''' Instance of `MAPLEAF.ENV.Environment`. Will be shared by all Rockets created by this sim runner '''

        self.stagingIndex = None # Set in self.createRocket
        ''' (int) Set in `Simulation.createRocket`. Tracks how many stages have been dropped '''

        self.silent = silent
        ''' (bool) '''

        # Logging is active when loggingLevel > 0; force evaluation logs are trimmed when loggingLevel >= 2
        self.loggingLevel = int(self.simDefinition.getValue("SimControl.loggingLevel"))
        # Whether a new rocket is created (and simulated) for each dropped stage
        self.computeStageDropPaths = strtobool(self.simDefinition.getValue("SimControl.StageDropPaths.compute"))

    def run(self, rocket=None):
        ''' 
            Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)

            Inputs:
                * rocket: (optional) rocket to simulate. If None, one is created with `Simulation.createRocket`

            Returns:
                * stageFlightsPaths: (list[`MAPLEAF.IO.RocketFlight.RocketFlight`]) Each RocketFlight object represents the flight path of a single stage
                * logFilePaths: (list[string]) list of paths to all log files created by this simulation
        '''
        simDefinition = self.simDefinition

        # Initialize the rocket + environment models and simulation logging
        if rocket is None:
            rocket = self.createRocket() # Initialize rocket on launch pad, with all stages attached
        self.rocketStages = [ rocket ] # In this array, 'stage' means independent rigid bodies. Stages are initialized as new rocket objects and added once they are dropped from the main rocket

        # Create progress bar if appropriate
        if simDefinition.getValue("SimControl.EndCondition") == "Time":
            endTime = float(simDefinition.getValue("SimControl.EndConditionValue"))
            progressBar = tqdm(total=endTime+0.01)
            
            try:
                self.logger.continueWritingToTerminal = False
            except AttributeError:
                pass # Logging not set up for this sim
        else:
            progressBar = None

        #### Main Loop Setup #### 
        # The three lists below have one entry per stage, at the same index as the stage in self.rocketStages
        self.dts = [ float(simDefinition.getValue("SimControl.timeStep")) ]    # (Initial) time step size
        self.endDetectors = [ self._getEndDetectorFunction(rocket, simDefinition) ] # List contains a boolean function that controls sim termination for each stage
        self.stageFlightPaths = [ self._setUpCachingForFlightAnimation(rocket) ] # List will contain resulting flight paths for each stage

        if(rocket.hardwareInTheLoopControl == "yes"):
            print("Setting up hardware in the loop interface")
            rocket.hilInterface.setupHIL(self.rocketStages[0].rigidBody.state)

        #### Main Loop ####
        # self.rocketStages can grow while this loop runs (see createNewDetachedStage)
        stageIndex = 0
        while stageIndex < len(self.rocketStages):

            if stageIndex > 0:
                print("Computing stage {} drop path".format(stageIndex))

            rocket = self.rocketStages[stageIndex]
            endDetector = self.endDetectors[stageIndex]
            flight = self.stageFlightPaths[stageIndex]

            endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])
                
            while not endSimulation:
                ### Take a time step ###
                try:
                    if FinalTimeStepDt is not None:
                        print("Simulation Runner overriding time step from {} to {} to accurately meet end condition".format(self.dts[stageIndex], FinalTimeStepDt))
                        self.dts[stageIndex] = FinalTimeStepDt

                    integrationResult = rocket.timeStep(self.dts[stageIndex])

                    if stageIndex == 0: # Currently, progress bar only works for bottom stage
                        try:
                            progressBar.update(integrationResult.dt)
                        except AttributeError:
                            pass # progressBar is None when the end condition is not a time
                except:
                    # Deliberately catches everything: results obtained so far are saved by _handleSimulationCrash,
                        # which then exits the program (execution does not continue below)
                    try:
                        progressBar.close()
                
                        if not self.silent:
                            sys.stdout.continueWritingToTerminal = True # sys.stdout is an instance of MAPLEAF.IO.Logging.Logger
                    except AttributeError:
                        pass

                    # Save simulation results and print out stack trace                    
                    self._handleSimulationCrash()

                # Adjust time step size for next iteration
                self.dts[stageIndex] = integrationResult.dt * integrationResult.timeStepAdaptationFactor

                # HIL
                if(rocket.hardwareInTheLoopControl == "yes"):
                    rocket.hilInterface.performHIL(rocket.rigidBody.state,rocket.rigidBody.time)

                # Cache states for flight animation
                self.cacheState(rocket, flight)

                # Check whether we should end the simulation, or take a modified-size final time step    
                endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])
            
            # Log last state (would be the starting state of the next time step)
            rocket._logState()

            # Move on to next (dropped) stage
            stageIndex += 1

            try:
                progressBar.close()
                
                if not self.silent:
                    sys.stdout.continueWritingToTerminal = True # Actually editing a MAPLEAF.IO.Logging.Logger object here
            except AttributeError:
                pass

        print("Simulation Complete")

        logFilePaths = self._postProcess(simDefinition)

        return self.stageFlightPaths, logFilePaths

    #### Pre-sim ####
    def createRocket(self, stage=None):
        ''' 
            Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class
            Returns an instance of Rocket with it's Environment/Logs initialized. Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).

            Inputs:
                * stage: passed to Rocket as stageToInitialize. If None, console logging is also set up and self.stagingIndex is reset to 0
        '''
        # Initialize Rocket
        rocketDictReader = SubDictReader("Rocket", self.simDefinition)  
        rocket = Rocket(rocketDictReader, silent=self.silent, stageToInitialize=stage, simRunner=self, environment=self.environment)       # Initialize Rocket

        if self.simDefinition.getValue('SimControl.RocketPlot') in [ 'On', 'on' ]:
            rocket.plotShape()  # Reference to this simRunner used to add to logs

        if stage is None:
            self._setUpConsoleLogging()
            self.stagingIndex = 0 # Initially zero, after dropping first stage: 1, after dropping second stage: 2, etc...
            
        return rocket

    def _setUpConsoleLogging(self):
        ''' 
            Redirects sys.stdout to a Logging.Logger:
                * if logging is active (loggingLevel > 0), console output is captured in self.consoleOutputLog
                * otherwise, if silent, console output is only prevented from reaching the terminal
        '''
        if self.loggingLevel > 0:
            # Set up logging so that the output of any print calls after this point is captured in mainSimulationLog
            self.consoleOutputLog = []
            if self.silent:
                self.logger = Logging.Logger(self.consoleOutputLog, continueWritingToTerminal=False)
            else:
                self.logger = Logging.Logger(self.consoleOutputLog)
            sys.stdout = self.logger
            
            # Output system info to console and to log
            Logging.getSystemInfo(printToConsole=True)
            # Output sim definition file and default value dict to the log only
            self.consoleOutputLog += Logging.getSimDefinitionAndDefaultValueDictsForOutput(simDefinition=self.simDefinition, printToConsole=False)

            # Output header for data outputted to the console during the simulation
            print("Starting Simulation:")
            print("Time(s) Altitude(m,ASL)")
                
        elif self.silent:
            # No intention of writing things to a log file, just prevent them from being printed to the terminal
            _ = []
            logger = Logging.Logger(_, continueWritingToTerminal=False)
            sys.stdout = logger

    def _getEndDetectorFunction(self, rocket, simConfig, droppedStage=False):
        ''' 
            Returns a function, which returns a boolean value and Union[None, float], indicating whether the 
                simulation endpoint has been reached. When close to the end of a sim, the second value returned is the recommended
                time step to take to hit the end criteria.
                Simulation end criteria defined in sim definition file. 
            Rocket object must be passed in because the end condition functions require a reference to the rocket, 
                so they can access its current altitude/velocity/time attributes
        '''
        # Read desired end criteria from simulation definition file
        if not droppedStage:
            # Get end condition for main stage
            endCondition = simConfig.getValue("SimControl.EndCondition")
            conditionValue = float(simConfig.getValue("SimControl.EndConditionValue"))
        else:
            # Get end condition for dropped stages
            endCondition = simConfig.getValue("SimControl.StageDropPaths.endCondition")
            conditionValue = float(simConfig.getValue("SimControl.StageDropPaths.endConditionValue"))

        # Define all possible end-detector functions
        # Each takes the upcoming time step size (dt), only EndTimeReached makes use of it
        def isAfterApogee(dt):
            return rocket.rigidBody.state.velocity.Z <= 0 and rocket.rigidBody.time > 1.0, None
        def isAboveAltitude(dt):
            return rocket.rigidBody.state.position.Z >= conditionValue, None
        def isBelowAltitude(dt):
            return rocket.environment.earthModel.getAltitude(*rocket.rigidBody.state.position) <= conditionValue, None
        def EndTimeReached(dt):
            currTime = rocket.rigidBody.time
            if currTime < conditionValue and currTime + dt >= conditionValue:
                # Next step would reach/pass the end time: recommend a step that lands just (1e-14) past it
                return False, conditionValue+1e-14-currTime
            elif currTime >= conditionValue:
                return True, None
            else:
                return False, None

        # Return the desired function
        if endCondition == "Apogee":
            return isAfterApogee
        elif endCondition == "Altitude" and rocket.rigidBody.state.position.Z < conditionValue:
            # Currently below the target altitude -> end when rising above it
            return isAboveAltitude
        elif endCondition == "Altitude":
            return isBelowAltitude
        else:
            # Any other end condition is treated as an end time
            return EndTimeReached

    def _setUpCachingForFlightAnimation(self, rocket):
        ''' 
            Creates and returns the RocketFlight object for a rocket, seeded with the rocket's current time and state.
            Actuator deflection histories are initialized (to 0) if the rocket has a controlled system, otherwise they are set to None
        '''
        flight = RocketFlight()
        flight.times.append(rocket.rigidBody.time)
        flight.rigidBodyStates.append(rocket.rigidBody.state)
        if rocket.controlSystem is not None and rocket.controlSystem.controlledSystem is not None: 
            # If rocket has moving fins, record their angles for plotting
            nActuators = len(rocket.controlSystem.controlledSystem.actuatorList)
            flight.actuatorDefls = [ [0] for i in range(nActuators) ]
            flight.actuatorTargetDefls = [ [0] for i in range(nActuators) ]
        else:
            flight.actuatorDefls = None
            flight.actuatorTargetDefls = None

        return flight

    #### During sim ####
    def cacheState(self, rocket: Rocket, flight: RocketFlight):
        ''' Adds the rocket's current state to the flight object '''
        time = rocket.rigidBody.time
        flight.times.append(time)
        flight.rigidBodyStates.append(rocket.rigidBody.state)
        if rocket.controlSystem is not None:
            try:
                for a in range(len(rocket.controlSystem.controlledSystem.actuatorList)):
                    flight.actuatorDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].getDeflection(time))
                    flight.actuatorTargetDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].targetDeflection)
            except AttributeError:
                # Expecting to arrive here when timestepping a dropped stage of a controlled rocket, which doesn't have canards
                pass
                
    def createNewDetachedStage(self):
        ''' 
            Called by Rocket._stageSeparation 
            If stage drop paths are being computed, creates a rocket for the dropped stage and registers it 
                (plus its end detector, time step and flight object) to be simulated by the main loop in `Simulation.run`
        '''
        if self.computeStageDropPaths:
            newDetachedStage = self.createRocket(stage=self.stagingIndex)
            # Set kinematic properties to match those of the current top-most stage
            topStage = self.rocketStages[0]
            newDetachedStage.rigidBody.state = deepcopy(topStage.rigidBody.state)
            newDetachedStage.rigidBody.time = topStage.rigidBody.time
            self.rocketStages.append(newDetachedStage)

            # New sim termination condition function
            self.endDetectors.append(self._getEndDetectorFunction(newDetachedStage, self.simDefinition, droppedStage=True))
            self.dts.append(self.dts[0])
            
            # Duplicate existing flight object
            newFlightObject = deepcopy(self.stageFlightPaths[0]) # Will have had the same flight path as the top stage until the moment of separation
            newFlightObject.actuatorDefls = None # Dropped stage shouldn't have any canard deflections
            self.stageFlightPaths.append(newFlightObject)

            self.stagingIndex += 1

    def discardForceLogsForPreviousTimeStep(self, integrator):
        ''' 
            Removes the rows added to each stage's derivativeEvaluationLog by the integrator's last time step.
            Only has an effect if loggingLevel >= 2
        '''
        if self.loggingLevel >= 2:
            # Figure out how many times this integrator evaluates a function derivative (rocket forces in our case)
            if integrator.method == "RK12Adaptive":
                numDerivativeEvals = 2
            else:
                numDerivativeEvals = len(integrator.tableau)-1

            # Remove that number of rows from the end of the force evaluation log
            for _ in range(numDerivativeEvals):
                for rocket in self.rocketStages:
                    rocket.derivativeEvaluationLog.deleteLastRow()

    def _handleSimulationCrash(self):
        ''' After a simulation crashes, tries to create log files and show plots anyways, before printing a stack trace '''
        print("ERROR: Simulation Crashed, Aborting")
        print("Attempting to save log files and show plots")
        self._postProcess(self.simDefinition)

        # Try to print out the stack trace
        print("Attempting to show stack trace")
        import traceback
        tb = traceback.format_exc()
        print(tb)

        print("Exiting")
        sys.exit()

    #### Post-sim ####
    def _postProcess(self, simDefinition):
        ''' Writes log files, transfers key event times to the flight objects and creates plots. Returns the log file paths (None if logging is off) '''
        simDefinition.printDefaultValuesUsed() # Print these out before logging, to include them in the log

        # Log results
        logFilePaths = self._logSimulationResults(simDefinition)

        # Transfer key time info to flight objects from rocket
        for stage, flight in zip(self.rocketStages, self.stageFlightPaths):
            flight.engineOffTime = stage.engineShutOffTime
            flight.mainChuteDeployTime = stage.mainChuteDeployTime
            flight.targetLocation = stage.targetLocation

        # Plot results
        self._plotSimulationResults(self.rocketStages, simDefinition, self.stageFlightPaths, logFilePaths)

        # Print these out after logging to avoid including the log/plot keys in the unused keys
        # #TODO: Add exceptions for these keys, move this line to before logging so that it's output is also included in the simulation log
        simDefinition.printUnusedKeys()

        return logFilePaths

    def _logSimulationResults(self, simDefinition):
        ''' 
            Logs simulation results to file (as/if specified in sim definition) 
            Returns a list of log file paths, or None if logging is off (loggingLevel == 0)
            Raises ValueError if no results folder could be created
        '''
        logFilePaths = None
        if self.loggingLevel > 0:
            logFilePaths = []

            # Create a new folder for the results of the current simulation
            periodIndex = simDefinition.fileName.rfind('.')
            resultsFolderBaseName = simDefinition.fileName[:periodIndex] + "_Run"

            def tryCreateResultsFolder(resultsFolderBaseName):
                ''' Returns the name of the newly created folder, or "" if the chosen folder name was taken in the meantime '''
                resultsFolderName = Logging.findNextAvailableNumberedFileName(fileBaseName=resultsFolderBaseName, extension="")
                
                try:
                    os.mkdir(resultsFolderName)
                    return resultsFolderName

                except FileExistsError:
                    # End up here if another process created the same results folder 
                        # (other thread runs os.mkdir b/w when this thread runs findNextAvailableNumberedFileName and os.mkdir)
                        # Should only happen during parallel runs
                    return ""

            createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName)
            iterations = 0
            while createdResultsFolder == "" and iterations < 50:
                createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName)
                iterations += 1

            # Check the result, not the retry count: the folder may have been created successfully on the very last retry
            if createdResultsFolder == "":
                raise ValueError("Repeated error (50x): unable to create a results folder: {}.".format(resultsFolderBaseName))

            # Write logs to file
            for rocket in self.rocketStages:
                logFilePaths += rocket.writeLogsToFile(createdResultsFolder)            

            # Output console output
            consoleOutputPath = os.path.join(createdResultsFolder, "consoleOutput.txt")
            print("Writing log file: {}".format(consoleOutputPath))
            with open(consoleOutputPath, 'w+') as file:
                file.writelines(self.consoleOutputLog)

        return logFilePaths

    def _plotSimulationResults(self, rocketStages, simDefinition, flights, logFilePaths):
        ''' Plot simulation results (as/if specified in sim definition) '''

        # SimControl.plot holds a whitespace-separated list of plots to make
        plotsToMake = simDefinition.getValue("SimControl.plot").split()

        if plotsToMake != ["None"]:

            if "FlightAnimation" in plotsToMake:
                print("Showing flight animation")

                # Show animation
                Plotting.flightAnimation(flights)

                # Done, remove from plotsToMake
                plotsToMake.remove("FlightAnimation")

            if "FlightPaths" in plotsToMake:
                earthModel = self.simDefinition.getValue("Environment.EarthModel")
                if earthModel in [ "None", "Flat" ]:
                    Plotting.plotFlightPaths_NoEarth(flights)
                else:
                    Plotting.plotFlightPaths_FullEarth(flights)

                plotsToMake.remove("FlightPaths")

            # Plot all other columns from log files
            for plotDefinitionString in plotsToMake:
                Plotting.plotFromLogFiles(logFilePaths, plotDefinitionString)

Subclasses

Instance variables

var environment

Instance of Environment. Will be shared by all Rockets created by this sim runner

var silent

(bool)

var simDefinition

Instance of SimDefinition. Defines the current simulation

var stagingIndex

(int) Set in Simulation.createRocket(). Tracks how many stages have been dropped

Methods

def cacheState(self, rocket: Rocket, flight: RocketFlight)

Adds the rocket's current state to the flight object

Expand source code
def cacheState(self, rocket: Rocket, flight: RocketFlight):
    ''' 
        Adds the rocket's current state to the flight object

        Inputs:
            * rocket: (Rocket) rocket whose rigidBody time and state are recorded
            * flight: (RocketFlight) flight record, modified in place

        Always appends to flight.times and flight.rigidBodyStates.
        If the rocket has a control system, the current and target deflection of every actuator in 
            rocket.controlSystem.controlledSystem.actuatorList are also appended to flight.actuatorDefls / flight.actuatorTargetDefls
    '''
    time = rocket.rigidBody.time
    flight.times.append(time)
    flight.rigidBodyStates.append(rocket.rigidBody.state)

    if rocket.controlSystem is None:
        return

    try:
        actuators = rocket.controlSystem.controlledSystem.actuatorList
        for index, actuator in enumerate(actuators):
            flight.actuatorDefls[index].append(actuator.getDeflection(time))
            flight.actuatorTargetDefls[index].append(actuator.targetDeflection)
    except AttributeError:
        # Expecting to arrive here when timestepping a dropped stage of a controlled rocket, which doesn't have canards
        pass
def createNewDetachedStage(self)

Called by Rocket._stageSeparation

Expand source code
def createNewDetachedStage(self):
    ''' 
        Called by Rocket._stageSeparation

        If self.computeStageDropPaths is set, registers the stage that was just dropped as an additional, independent rocket:
            a new rocket is created for stage self.stagingIndex, given a copy of the top-most stage's state and time, 
            and gets its own end detector, time step and flight path entry. self.stagingIndex is then incremented.
        Does nothing otherwise.
    '''
    if not self.computeStageDropPaths:
        return

    droppedStage = self.createRocket(stage=self.stagingIndex)

    # The dropped stage starts out exactly where the current top-most stage is
    mainRocket = self.rocketStages[0]
    droppedStage.rigidBody.state = deepcopy(mainRocket.rigidBody.state)
    droppedStage.rigidBody.time = mainRocket.rigidBody.time
    self.rocketStages.append(droppedStage)

    # Every stage has its own termination condition and time step (initialized from the top stage's current one)
    droppedStageEndDetector = self._getEndDetectorFunction(droppedStage, self.simDefinition, droppedStage=True)
    self.endDetectors.append(droppedStageEndDetector)
    self.dts.append(self.dts[0])

    # Until the moment of separation, the dropped stage followed the same flight path as the top stage
    droppedStageFlight = deepcopy(self.stageFlightPaths[0])
    droppedStageFlight.actuatorDefls = None # Dropped stage shouldn't have any canard deflections
    self.stageFlightPaths.append(droppedStageFlight)

    self.stagingIndex += 1
def createRocket(self, stage=None)

Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class. Returns an instance of Rocket with its Environment/Logs initialized. Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).

Expand source code
def createRocket(self, stage=None):
    ''' 
        Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class.
        Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).

        Inputs:
            * stage: (optional) forwarded to Rocket as stageToInitialize. 
                        When None, console logging is also set up (self._setUpConsoleLogging()) and self.stagingIndex is reset to 0

        Returns an instance of Rocket with its Environment/Logs initialized.
    '''
    # Initialize Rocket
    rocketDictReader = SubDictReader("Rocket", self.simDefinition)
    # simRunner=self: reference to this simRunner used to add to logs
    rocket = Rocket(rocketDictReader, silent=self.silent, stageToInitialize=stage, simRunner=self, environment=self.environment)

    if self.simDefinition.getValue('SimControl.RocketPlot') in [ 'On', 'on' ]:
        rocket.plotShape()

    if stage is None:
        self._setUpConsoleLogging()
        self.stagingIndex = 0 # Initially zero, after dropping first stage: 1, after dropping second stage: 2, etc...

    return rocket
def discardForceLogsForPreviousTimeStep(self, integrator)
Expand source code
def discardForceLogsForPreviousTimeStep(self, integrator):
    ''' 
        Removes the derivative evaluation log rows written during one time step from every rocket in self.rocketStages.
        Only acts when self.loggingLevel is 2 or higher.

        Inputs:
            * integrator: object with a .method name and (unless the method is "RK12Adaptive") a .tableau, 
                            from which the number of derivative evaluations per time step is determined
    '''
    if self.loggingLevel < 2:
        return

    # Number of function derivative evaluations (rocket force evaluations in our case) the integrator performs per step
    evalsPerStep = 2 if integrator.method == "RK12Adaptive" else len(integrator.tableau) - 1

    # Each evaluation added one row to the end of each force evaluation log
    for _ in range(evalsPerStep):
        for stage in self.rocketStages:
            stage.derivativeEvaluationLog.deleteLastRow()
def run(self, rocket=None)

Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)

Returns

  • stageFlightPaths: (list[MAPLEAF.IO.RocketFlight.RocketFlight]) Each RocketFlight object represents the flight path of a single stage
  • logFilePaths: (list[string]) list of paths to all log files created by this simulation
Expand source code
def run(self, rocket=None):
    ''' 
        Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)

        Inputs:
            * rocket: (optional) rocket to simulate. If None, one is created by calling self.createRocket()

        Returns:
            * stageFlightPaths: (list[`MAPLEAF.IO.RocketFlight.RocketFlight`]) Each RocketFlight object represents the flight path of a single stage
            * logFilePaths: (list[string]) list of paths to all log files created by this simulation
    '''
    simDefinition = self.simDefinition

    # Initialize the rocket + environment models and simulation logging
    if rocket == None:
        rocket = self.createRocket() # Initialize rocket on launch pad, with all stages attached
    self.rocketStages = [ rocket ] # In this array, 'stage' means independent rigid bodies. Stages are initialized as new rocket objects and added once they are dropped from the main rocket

    # Create progress bar if appropriate
    # (only when the end condition is a time: the bar's total is that end time, plus a small margin)
    if simDefinition.getValue("SimControl.EndCondition") == "Time":
        endTime = float(simDefinition.getValue("SimControl.EndConditionValue"))
        progressBar = tqdm(total=endTime+0.01)
        
        try:
            # Presumably prevents regular console output from interleaving with the progress bar - TODO confirm
            self.logger.continueWritingToTerminal = False
        except AttributeError:
            pass # Logging not set up for this sim
    else:
        # With progressBar == None, every progressBar.update()/close() call below raises AttributeError, which is caught and ignored
        progressBar = None

    #### Main Loop Setup #### 
    # The three lists below are indexed like self.rocketStages: one entry per independent rigid body
    self.dts = [ float(simDefinition.getValue("SimControl.timeStep")) ]    # (Initial) time step size
    self.endDetectors = [ self._getEndDetectorFunction(rocket, simDefinition) ] # List contains a boolean function that controls sim termination for each stage
    self.stageFlightPaths = [ self._setUpCachingForFlightAnimation(rocket) ] # List will contain resulting flight paths for each stage

    if(rocket.hardwareInTheLoopControl == "yes"):
        print("Setting up hardware in the loop interface")
        rocket.hilInterface.setupHIL(self.rocketStages[0].rigidBody.state)

    #### Main Loop ####
    # Index-based while loop: self.rocketStages can grow while stages are being simulated 
    # (createNewDetachedStage appends dropped stages), and each added stage is simulated in turn
    stageIndex = 0
    while stageIndex < len(self.rocketStages):

        if stageIndex > 0:
            print("Computing stage {} drop path".format(stageIndex))

        rocket = self.rocketStages[stageIndex]
        endDetector = self.endDetectors[stageIndex]
        flight = self.stageFlightPaths[stageIndex]

        # endDetector returns (should the sim end?, size of a shortened final time step or None)
        endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])
            
        while not endSimulation:
            ### Take a time step ###
            try:
                if FinalTimeStepDt != None:
                    print("Simulation Runner overriding time step from {} to {} to accurately meet end condition".format(self.dts[stageIndex], FinalTimeStepDt))
                    self.dts[stageIndex] = FinalTimeStepDt

                integrationResult = rocket.timeStep(self.dts[stageIndex])

                if stageIndex == 0: # Currently, progress bar only works for bottom stage
                    try:
                        progressBar.update(integrationResult.dt)
                    except AttributeError:
                        pass
            # NOTE(review): bare except also intercepts KeyboardInterrupt and SystemExit, so those are routed through the crash handler as well
            except:
                try:
                    # Raises AttributeError if no progress bar was created, which also skips restoring terminal output below
                    progressBar.close()
            
                    if not self.silent:
                        sys.stdout.continueWritingToTerminal = True # sys.stdout is an instance of MAPLEAF.IO.Logging.Logger
                except AttributeError:
                    pass

                # Save simulation results and print out stack trace                    
                # NOTE(review): if _handleSimulationCrash() returns (instead of re-raising or exiting), execution continues below with an
                #   integrationResult that is undefined (crash in first time step) or left over from the previous time step - confirm it never returns
                self._handleSimulationCrash()

            # Adjust time step size for next iteration
            self.dts[stageIndex] = integrationResult.dt * integrationResult.timeStepAdaptationFactor

            # HIL
            if(rocket.hardwareInTheLoopControl == "yes"):
                rocket.hilInterface.performHIL(rocket.rigidBody.state,rocket.rigidBody.time)

            # Cache states for flight animation
            self.cacheState(rocket, flight)

            # Check whether we should end the simulation, or take a modified-size final time step    
            endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex])
        
        # Log last state (would be the starting state of the next time step)
        rocket._logState()

        # Move on to next (dropped) stage
        stageIndex += 1

        # This cleanup is inside the stage loop, so it is executed after every stage, not just once at the end
        try:
            progressBar.close()
            
            if not self.silent:
                sys.stdout.continueWritingToTerminal = True # Actually editing a MAPLEAF.IO.Logging.Logger object here
        except AttributeError:
            pass

    print("Simulation Complete")

    logFilePaths = self._postProcess(simDefinition)

    return self.stageFlightPaths, logFilePaths
class WindTunnelSimulation (parametersToSweep=None, parameterValues=None, simDefinitionFilePath=None, simDefinition=None, silent=False, smoothLine='False')

Inputs

  • simDefinitionFilePath: (string) path to simulation definition file
  • simDefinition: (SimDefinition) object that's already loaded and parsed the desired sim definition file
  • silent: (bool) toggles optional outputs to the console
Expand source code
class WindTunnelSimulation(Simulation):
    ''' 
        Simulation runner that sweeps one or more sim definition parameters instead of time stepping.
        For every sweep point, the swept values are written to the sim definition, a rocket (and Environment) is created 
            and a single force evaluation is performed. All force evaluations are collected in a single derivative evaluation log.
    '''

    def __init__(self, parametersToSweep=None, parameterValues=None, simDefinitionFilePath=None, simDefinition=None, silent=False, smoothLine='False'):
        '''
            Inputs:
                * parametersToSweep: (list[string]) sim definition keys to sweep over. Defaults to ["Rocket.velocity"]
                * parameterValues: (list[list[string]]) one list of values per swept parameter, all lists of equal length. 
                                    Defaults to three velocities: "(0 0 100)", "(0 0 200)", "(0 0 300)". 
                                    The lists are copied, so the caller's lists are not modified when points are interpolated
                * simDefinitionFilePath: (string) path to simulation definition file
                * simDefinition: (SimDefinition) object that's already loaded and parsed the desired sim definition file
                * silent: (bool) toggles optional outputs to the console
                * smoothLine: (string) parsed with strtobool in runSweep(). If true, additional points are interpolated between the given values

            Raises:
                * ValueError: if the number of value lists doesn't match the number of parameters, or if the value lists differ in length
        '''
        self.parametersToSweep = ["Rocket.velocity"] if parametersToSweep is None else parametersToSweep

        if parameterValues is None:
            self.parameterValues = [["(0 0 100)", "(0 0 200)", "(0 0 300)"]]
        else:
            # _addPoints() inserts into these lists in place -> work on copies to avoid modifying the caller's data
            self.parameterValues = [ list(values) for values in parameterValues ]

        self.smoothLine = smoothLine

        # Error checks
        if len(self.parametersToSweep) != len(self.parameterValues):
            raise ValueError("Must have a list of values for each parameter to sweep over. Currently have {} parameters, and {} lists of values.".format(len(self.parametersToSweep), len(self.parameterValues)))

        paramValueCounts = [ len(x) for x in self.parameterValues ]
        if len(set(paramValueCounts)) > 1:
            raise ValueError("All lists of parameter values must be of equal length. Currently, list lengths are: {}".format(paramValueCounts))

        Simulation.__init__(self, simDefinitionFilePath, simDefinition, silent)

    def runSweep(self):
        '''
            Runs a single force evaluation for each set of values in self.parameterValues, 
                applied to the sim definition keys listed in self.parametersToSweep.
            If self.smoothLine is true, the value lists are first extended with interpolated points (see _addPoints), 
                which happens again every time this function is called.

            Returns:
                List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point
        '''
        # Make sure we're logging forces (the only way to get data out of this sim runner) and avoid producing any plots
        self.simDefinition.setValue("SimControl.loggingLevel", "3")
        self.simDefinition.setValue("SimControl.plot", "None") 
        self.simDefinition.setValue("SimControl.RocketPlot", "Off")

        if strtobool(self.smoothLine): # interpolate between user set parameter values
            self._addPoints() 

        for i in range(len(self.parameterValues[0])): # i'th sweep point (velocity1, velocity2, ...)
            for j, values in enumerate(self.parameterValues): # j'th swept parameter (velocity, temperature)
                self.simDefinition.setValue(self.parametersToSweep[j], values[i])

            # Create a rocket (and environment) that reflects the values that were just set
            rocket = self.createRocket()

            # Track all derivative evaluations in a single log: the one belonging to the first rocket
            if i == 0:
                log = rocket.derivativeEvaluationLog
            else:
                rocket.derivativeEvaluationLog = log

            # Run a single force evaluation, which creates a forces log entry for this sweep point
            self.rocketStages = [ rocket ]
            rocket._getAppliedForce(0.0, rocket.rigidBody.state)

        # Write Logs to file, return path
        return self._postProcess()

    def _addPoints(self, pointMultiple=10):
        ''' 
            Edits the parameter sweeps (self.parameterValues, in place) to include a multiple of the previous number of points, 
                linearly interpolated between the given values.
            (pointMultiple - 1) new points are inserted into every interval between two consecutive original values.
            Values are interpolated as Vectors if they can be parsed as such, otherwise as floats, and are stored as strings.
        '''
        numIntervals = len(self.parameterValues[0]) - 1 # Evaluated before any points are inserted

        for i in range(numIntervals):
            # All previous intervals have already grown to pointMultiple points each
            intervalStartIndex = pointMultiple*i

            for interpPt in range(1, pointMultiple): # Loops over each new point
                # Index at which to add the new point. Before the insertion, this is where the interval's end value is located
                newPointIndex = intervalStartIndex + interpPt
                fraction = interpPt/pointMultiple

                for values in self.parameterValues: # One list per swept parameter (velocity, temperature)
                    startValue = values[intervalStartIndex]
                    endValue = values[newPointIndex]

                    try:
                        # Vector sweep
                        first = Vector(startValue)
                        second = Vector(endValue)
                        incrementalValue = str(fraction*(second - first) + first)
                    except ValueError:
                        # Scalar sweep
                        first = float(startValue)
                        second = float(endValue)
                        incrementalValue = str(fraction*(second - first) + first)

                    values.insert(newPointIndex, incrementalValue)

    def createRocket(self):
        ''' 
            Do all the same stuff as the parent object, but also re-initialize the environment, 
                to make sure changes to environmental properties during the parameter sweep take effect 
        '''
        self.environment = Environment(self.simDefinition, silent=self.silent)
        return super().createRocket()

    def _setUpLogging(self):
        ''' 
            Intended to ensure that logs aren't re-initialized for every simulation (want to keep all the force data in a single log file):
                console logging is only set up if self.consoleOutputLog doesn't exist yet.

            NOTE(review): nothing in this class calls _setUpLogging, and the parent's createRocket() appears to call 
                _setUpConsoleLogging() directly, so this guard may never run - confirm, and rename this override if so
        '''
        if not hasattr(self, 'consoleOutputLog'):
            return super()._setUpConsoleLogging()

    def _postProcess(self):
        ''' 
            Creates an empty flight path object to prevent errors in the parent function, which is still run to create log files.
            Returns the parent function's result unchanged.

            NOTE(review): an earlier version of this docstring stated that mainSimLog is removed from the returned log file paths 
                (since no time steps were taken by this sim) - no such filtering happens here, confirm whether the parent does it
        '''
        self.stageFlightPaths = [ RocketFlight() ]
        return Simulation._postProcess(self, self.simDefinition)

Ancestors

Methods

def createRocket(self)

Do all the same stuff as the parent object, but also re-initialize the environment, to make sure changes to environmental properties during the parameter sweep take effect

Expand source code
def createRocket(self):
    ''' 
        Rebuilds self.environment from the current sim definition, then creates the rocket exactly like the parent object does.
        Re-creating the environment makes sure changes to environmental properties during the parameter sweep take effect 
    '''
    freshEnvironment = Environment(self.simDefinition, silent=self.silent)
    self.environment = freshEnvironment

    rocket = super().createRocket()
    return rocket
def runSweep(self)

Runs a single force evaluation for each set of values in self.parameterValues, applied to the sim definition keys listed in self.parametersToSweep

Returns

List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point

Expand source code
def runSweep(self):
    '''
        Runs a single force evaluation for each set of values in self.parameterValues, 
            applied to the sim definition keys listed in self.parametersToSweep
        Returns:
            List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point
    '''
    # Forces must be logged (the logs are the only output of this sim runner), and no plots should be produced
    for key, value in [ ("SimControl.loggingLevel", "3"), ("SimControl.plot", "None"), ("SimControl.RocketPlot", "Off") ]:
        self.simDefinition.setValue(key, value)

    # Optionally interpolate between the user-set parameter values
    if strtobool(self.smoothLine):
        self._addPoints()

    numSweepPoints = len(self.parameterValues[0])
    for pointIndex in range(numSweepPoints): # One iteration per condition (velocity1, velocity2, ...)
        # Apply this sweep point's value of every swept parameter (velocity, temperature)
        for paramIndex, values in enumerate(self.parameterValues):
            self.simDefinition.setValue(self.parametersToSweep[paramIndex], values[pointIndex])

        rocket = self.createRocket()

        # All rockets write to the derivative evaluation log of the first rocket
        if pointIndex == 0:
            sharedLog = rocket.derivativeEvaluationLog
        else:
            rocket.derivativeEvaluationLog = sharedLog

        # A single force evaluation creates the forces log entry for this sweep point
        self.rocketStages = [ rocket ]
        rocket._getAppliedForce(0.0, rocket.rigidBody.state)

    # Write Logs to file, return path
    return self._postProcess()

Inherited members