Module MAPLEAF.SimulationRunners
Defines functions and classes that manage simulations.
Includes some (Simulation
) that run a single simulation, and others (OptimizingSimRunner
) that run many simulations at once.
Expand source code
'''
Defines functions and classes that manage simulations.
Includes some (`Simulation`) that run a single simulation, and others (`OptimizingSimRunner`) that run many simulations at once.
.. image:: https://storage.needpix.com/rsynced_images/important-1705212_1280.png
'''
# Make the classes in all submodules importable directly from MAPLEAF.Rocket
from .SingleSimulations import *
from .MonteCarlo import *
from .Convergence import *
from .Optimization import *
from .Batch import *
# For some reason CythonVector and company don't exist down here, so they won't import when running from MAPLEAF.Motion import *
subModules = [ SingleSimulations, MonteCarlo, Convergence, Optimization, Batch ]
__all__ = [ ]
for subModule in subModules:
__all__ += subModule.__all__
Sub-modules
MAPLEAF.SimulationRunners.Batch
-
Script to run a batch of simulations, defined in a batch definition file. Can be run directly from the command line. Accessible as
mapleaf-batch
if … MAPLEAF.SimulationRunners.Convergence
MAPLEAF.SimulationRunners.MonteCarlo
MAPLEAF.SimulationRunners.Optimization
MAPLEAF.SimulationRunners.SingleSimulations
Functions
def loadSimDefinition(simDefinitionFilePath=None, simDefinition=None, silent=False)
-
Loads a simulation definition file into a
SimDefinition
object - accepts either a file path or aSimDefinition
object as inputExpand source code
def loadSimDefinition(simDefinitionFilePath=None, simDefinition=None, silent=False): ''' Loads a simulation definition file into a `MAPLEAF.IO.SimDefinition` object - accepts either a file path or a `MAPLEAF.IO.SimDefinition` object as input ''' if simDefinition == None and simDefinitionFilePath != None: return SimDefinition(simDefinitionFilePath, silent=silent) # Parse simulation definition file elif simDefinition != None: return simDefinition # Use the SimDefinition that was passed in else: raise ValueError(""" Insufficient information to initialize a Simulation. Please provide either simDefinitionFilePath (string) or fW (SimDefinition), which has been created from the desired Sim Definition file. If both are provided, the SimDefinition is used.""")
def main(argv=None)
-
Expand source code
def main(argv=None): # Parse command line arguments parser = _buildParser() args = parser.parse_args(argv) # Load definition file from MAPLEAF.Main import findSimDefinitionFile # Delayed import here to avoid circular imports batchDefinitionPath = findSimDefinitionFile(args.batchDefinitionFile) batchDefinition = SimDefinition(batchDefinitionPath, defaultDict={}, silent=True) include = args.include[0] if len(args.include) > 0 else None exclude = args.exclude[0] if len(args.exclude) > 0 else None validate = args.validate[0] if len(args.validate) > 0 else None # Create batch run object containing settings and results batchRun = BatchRun(batchDefinition, args.recordAll, args.printStackTraces, include, exclude, resultToValidate=validate) # Run Cases return run(batchRun)
def optimizationRunnerFactory(simDefinitionFilePath=None, simDefinition=None, optimizationReader=None, silent=False, parallel=False) ‑> MAPLEAF.SimulationRunners.Optimization.OptimizingSimRunner
-
Provide a subdictionary reader pointed at an optimization dictionary. Will read the dictionary, initialize an optimizing simulation runner and return it.
Expand source code
def optimizationRunnerFactory(simDefinitionFilePath=None, simDefinition=None, optimizationReader=None, silent=False, parallel=False) -> OptimizingSimRunner: ''' Provide a subdictionary reader pointed at an optimization dictionary. Will read the dictionary, initialize an optimizing simulation runner and return it. ''' if simDefinition != None or simDefinitionFilePath != None: simDefinition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent) optimizationReader = SubDictReader('Optimization', simDefinition) # Ensure no output is produced during each simulation (cost function evaluation) simDefinition.setValue("SimControl.plot", "None") simDefinition.setValue("SimControl.RocketPlot", "Off") else: if optimizationReader == None: raise ValueError('subDictReader not initialized for a nested optimization') method = optimizationReader.tryGetString('method', defaultValue='PSO') if method == 'PSO': return PSORunner(optimizationReader, silent=silent, parallel=parallel) elif 'scipy.optimize.minimize' in method: return ScipyMinimizeRunner(optimizationReader, silent=silent, parallel=parallel) else: raise ValueError('Optimization method: {} not implemented, try PSO'.format(method))
def run(batchRun: BatchRun) ‑> int
-
Given a batchRun object (of type
BatchRun
), will run all of its test cases, and print a summary of the resultsExpand source code
def run(batchRun: BatchRun) -> int: ''' Given a batchRun object (of type `BatchRun`), will run all of its test cases, and print a summary of the results ''' # Track how long running cases takes startTime = time.time() # Get all the regression test cases testCases = batchRun.getCasesToRun() # Run them for case in testCases: caseResult = _runCase(case, batchRun) batchRun.casesRun.append(caseResult) # Print summary runTime = time.time() - startTime return batchRun.printResult(runTime) # Returns 0 or 1, suitable for the command line
def runMonteCarloSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False, nCores=1)
-
Expand source code
def runMonteCarloSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False, nCores=1): simDefinition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent) nRuns, mCLogger, outputLists = _prepSim(simDefinition) if nCores > 1: _runSimulations_Parallel(simDefinition, nRuns, outputLists, silent, nCores) else: _runSimulations_SingleThreaded(simDefinition, nRuns, outputLists, mCLogger, silent) _showResults(simDefinition, outputLists, mCLogger)
def runSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False)
-
Expand source code
def runSimulation(simDefinitionFilePath=None, simDefinition=None, silent=False): sim = Simulation(simDefinitionFilePath, simDefinition, silent) return sim.run()
Classes
class BatchRun (batchDefinition: SimDefinition, recordAll=False, printStackTraces=False, include=None, exclude=None, percentErrorTolerance=0.2, absoluteErrorTolerance=1e-09, resultToValidate=None)
-
Class to hold info about and results of a mapleaf-batch run
Expand source code
class BatchRun(): ''' Class to hold info about and results of a mapleaf-batch run ''' def __init__(self, batchDefinition: SimDefinition, recordAll=False, printStackTraces=False, include=None, exclude=None, percentErrorTolerance=0.2, absoluteErrorTolerance=1e-9, resultToValidate=None ): self.batchDefinition = batchDefinition self.recordAll = recordAll self.printStackTraces = printStackTraces self.include = include self.exclude = exclude self.casesRun = [] self.nComparisonSets = 0 self.casesWithNewRecordedResults = set() self.warningCount = 0 self.percentErrorTolerance = percentErrorTolerance self.absoluteErrorTolerance = absoluteErrorTolerance self.validationErrors = [] self.validationDataUsed = [] self.resultToValidate = resultToValidate def getCasesToRun(self): subDicts = self.batchDefinition.getImmediateSubDicts("") if self.include == None and self.exclude == None: # Run all cases return subDicts else: # Only run cases that include the include string AND do not contain the exclude string casesToRun = [] for caseDictName in subDicts: if (self.include == None or self.include in caseDictName) and (self.exclude == None or self.exclude not in caseDictName): casesToRun.append(caseDictName) return casesToRun def printResult(self, timeToRun=None) -> int: """ Outputs result summary """ # Count number of cases failed casesFailed = [] nTestsFailed = 0 nTestsPassed = 0 for result in self.casesRun: if result.testsFailed > 0 or result.totalErrors: casesFailed.append(result.name) nTestsPassed += result.testsPassed nTestsFailed += result.testsFailed nCases = len(self.casesRun) nCasesFailed = len(casesFailed) nCasesPassed = nCases - nCasesFailed nTests = nTestsFailed + nTestsPassed print("\n----------------------------------------------------------------------") print("BATCH RUN RESULTS") if timeToRun != None: print("Ran {} Case(s) in {:>.2f} s".format(nCases, timeToRun)) else: print("Ran {} Case(s)".format(nCases)) if self.resultToValidate != None: if len(self.validationErrors) > 0: print("\nValidation Results for {}:".format(self.resultToValidate)) print("Average disagreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean(self.validationErrors))) print("Average magnitude of disgreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean([abs(error) for error in self.validationErrors]))) print("Data Sets Used:") for (dataSet, avgError) in zip(self.validationDataUsed, self.validationErrors): print("{}: {:2.2f}%".format(dataSet, avgError)) print("") else: self.warning("\nERROR: No comparison/validation data for {} found. Make sure there is a plot of {} and some comparison data, and that {} is included in the name of those plotting dictionaries\n".format(self.resultToValidate, self.resultToValidate, self.resultToValidate)) if self.warningCount > 0: print("Errors/Warnings: {}".format(self.warningCount)) if len(self.casesWithNewRecordedResults) > 0: recordedCaseList = ", ".join(self.casesWithNewRecordedResults) print("New expected results were recorded for the following cases: {}".format(recordedCaseList)) _writeModifiedTestDefinitionFile(self.batchDefinition) if nCasesFailed == 0: print("{} Case(s) ok".format(nCases)) print("") if self.warningCount == 0: print("OK") else: print("WARNING") return 0 else: print("{}/{} Case(s) Failed, {}/{} Parameter Comparison(s) Failed".format(nCasesFailed, nCases, nTestsFailed, nTests)) print("") print("Failed Cases:") for case in casesFailed: print(case) print("") print("FAIL") return 1 def warning(self, msg: str): ''' Currently, warnings are used when errors occur in processes not directly related to MAPLEAF simulations, like loading comparison data ''' self.warningCount +=1 print(msg)
Methods
def getCasesToRun(self)
-
Expand source code
def getCasesToRun(self): subDicts = self.batchDefinition.getImmediateSubDicts("") if self.include == None and self.exclude == None: # Run all cases return subDicts else: # Only run cases that include the include string AND do not contain the exclude string casesToRun = [] for caseDictName in subDicts: if (self.include == None or self.include in caseDictName) and (self.exclude == None or self.exclude not in caseDictName): casesToRun.append(caseDictName) return casesToRun
def printResult(self, timeToRun=None) ‑> int
-
Outputs result summary
Expand source code
def printResult(self, timeToRun=None) -> int: """ Outputs result summary """ # Count number of cases failed casesFailed = [] nTestsFailed = 0 nTestsPassed = 0 for result in self.casesRun: if result.testsFailed > 0 or result.totalErrors: casesFailed.append(result.name) nTestsPassed += result.testsPassed nTestsFailed += result.testsFailed nCases = len(self.casesRun) nCasesFailed = len(casesFailed) nCasesPassed = nCases - nCasesFailed nTests = nTestsFailed + nTestsPassed print("\n----------------------------------------------------------------------") print("BATCH RUN RESULTS") if timeToRun != None: print("Ran {} Case(s) in {:>.2f} s".format(nCases, timeToRun)) else: print("Ran {} Case(s)".format(nCases)) if self.resultToValidate != None: if len(self.validationErrors) > 0: print("\nValidation Results for {}:".format(self.resultToValidate)) print("Average disagreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean(self.validationErrors))) print("Average magnitude of disgreement with validation data across {} validation data sets: {:2.2f}%".format( len(self.validationDataUsed), mean([abs(error) for error in self.validationErrors]))) print("Data Sets Used:") for (dataSet, avgError) in zip(self.validationDataUsed, self.validationErrors): print("{}: {:2.2f}%".format(dataSet, avgError)) print("") else: self.warning("\nERROR: No comparison/validation data for {} found. Make sure there is a plot of {} and some comparison data, and that {} is included in the name of those plotting dictionaries\n".format(self.resultToValidate, self.resultToValidate, self.resultToValidate)) if self.warningCount > 0: print("Errors/Warnings: {}".format(self.warningCount)) if len(self.casesWithNewRecordedResults) > 0: recordedCaseList = ", ".join(self.casesWithNewRecordedResults) print("New expected results were recorded for the following cases: {}".format(recordedCaseList)) _writeModifiedTestDefinitionFile(self.batchDefinition) if nCasesFailed == 0: print("{} Case(s) ok".format(nCases)) print("") if self.warningCount == 0: print("OK") else: print("WARNING") return 0 else: print("{}/{} Case(s) Failed, {}/{} Parameter Comparison(s) Failed".format(nCasesFailed, nCases, nTestsFailed, nTests)) print("") print("Failed Cases:") for case in casesFailed: print(case) print("") print("FAIL") return 1
def warning(self, msg: str)
-
Currently, warnings are used when errors occur in processes not directly related to MAPLEAF simulations, like loading comparison data
Expand source code
def warning(self, msg: str): ''' Currently, warnings are used when errors occur in processes not directly related to MAPLEAF simulations, like loading comparison data ''' self.warningCount +=1 print(msg)
class ConvergenceSimRunner (simDefinitionFilePath=None, simDefinition=None, silent=False)
-
Runs a simulation repeatedly, decreasing the time step or target error each time, monitoring for convergence
Inputs
- simDefinitionFilePath: (string) path to simulation definition file
- fW:
(
SimDefinition
) object that's already loaded and parsed the desired sim definition file - silent: (bool) toggles optional outputs to the console
Expand source code
class ConvergenceSimRunner(Simulation): ''' Runs a simulation repeatedly, decreasing the time step or target error each time, monitoring for convergence ''' def __init__(self, simDefinitionFilePath=None, simDefinition=None, silent=False): Simulation.__init__(self, simDefinitionFilePath=simDefinitionFilePath, simDefinition=simDefinition, silent=silent) def convergeSimEndPosition(self, refinementRatio=2, simLimit=10, plot=True, stopAtConvergence=False, showPlot=True, plotLineLabel="Simulations", ax1=None, ax2=None): ''' Takes simulation and runs it repeatedly, cutting the time step in half each time. Once convergence is approximately asymptotic, exits and returns series of final positions, convergence order, and extrapolated final position Should use with simulations that have an EndCondition of type "Time" # Otherwise sim will be run using current settings, and its endtime will be taken as the new end time for future convergence sims This Fn called by compareIntegrationSchemes functions Parameters: simConfigFilePath string, /path/to/simConfigFile fW SimDefinition, overrides simConfigFilePath refinementRatio Number, Each time sim is run, time step or target error is divided by this number simLimit Number, Max number of simulations to run (takes exponentially more time to run more simulations) plot True/False, whether to plot the results stopAtConvergence True/False, if False, runs simLimit simulations even if asymptotic convergence is reached earlier showPlot True/False, if True, calls plt.show() plotLineLabel string, Label of line on plot ax1 matplotlib Axes, Z-location (Y) axis ax2 matplotlib Axes, Wall Time (Y) axis (2nd Y-axis) ''' from MAPLEAF.IO.gridConvergenceFunctions import checkConvergence from statistics import mean import time self._setUpConfigFileForConvergenceRun() timeStepMethod = self.simDefinition.getValue("SimControl.timeDiscretization") adaptiveTimeStepping = "Adaptive" in timeStepMethod timeStepKey = "SimControl.timeStep" targetErrorKey = "SimControl.TimeStepAdaptation.targetError" #### Run Simulations #### print("Starting convergence simulations") if not adaptiveTimeStepping: timeStep = float(self.simDefinition.getValue(timeStepKey))*refinementRatio # Multiplied by 2 to give correct time step in first iteration else: targetError = float(self.simDefinition.getValue(targetErrorKey))*refinementRatio # Multiplied by 2 to give correct time step in first iteration simCount = 1 finalPositionHistory = [] convergenceHistory = [] timeStepHistory = [] simTimeHistory = [] def printConvergenceHistory(ax1=ax1, ax2=ax2): print("") print("Convergence History:") print("Integration Method: {}".format(timeStepMethod)) xPos = [] yPos = [] zPos = [] for i in range(len(finalPositionHistory)): finalPos = finalPositionHistory[i] xPos.append(finalPos[0]) yPos.append(finalPos[1]) zPos.append(finalPos[2]) printString = "FinalPosition(m): {:>7.3f} WallTime(s): {:>7.3f} ".format(finalPos, simTimeHistory[i]) if i > 1: # TODO: Get convergence results into the .csv file ordersOfConvergence, GCI12s, GCI23s, asymptoticChecks, richardsonExtrapVals, uncertainties = convergenceHistory[i-2] printString += " Avg Order: {:>4.2f}, Avg Asymptotic Check: {:>6.3f}".format(mean(ordersOfConvergence), mean(asymptoticChecks)) print(printString) if plot: if ax1 == None: ax1 = plt.gca() if ax2 == None: ax2 = ax1.twinx() ax1.plot(timeStepHistory, zPos, ":D", label=plotLineLabel) ax1.set_ylabel("Final Z Coordinate (m)") ax2.plot(timeStepHistory, simTimeHistory, "-*", label=plotLineLabel + " Wall Time") ax2.set_ylabel("Wall Time (s)") plt.xscale("log") plt.xlabel("Time Step (s)") plt.legend() plt.tight_layout() if showPlot: plt.show() while simCount <= simLimit: if not adaptiveTimeStepping: timeStep /= refinementRatio self.simDefinition.setValue(timeStepKey, str(timeStep)) timeStepHistory.append(timeStep) print("Simulation {}, Time step: {}".format(simCount, timeStep)) else: targetError /= refinementRatio self.simDefinition.setValue(targetErrorKey, str(targetError)) timeStepHistory.append(targetError) print("Simulation {}, Time step: {}".format(simCount, targetError)) startTime = time.time() flights, _ = self.run() flight = flights[0] wallTime = time.time() - startTime simTimeHistory.append(wallTime) finalPositionHistory.append(flight.rigidBodyStates[-1].position) print("Final Position: {:1.3f}".format(finalPositionHistory[-1])) if len(finalPositionHistory) >= 3: # Check whether result is converging cV, mV, fV = finalPositionHistory[-3:] print("Checking convergence") convergResult = checkConvergence(cV, mV, fV, refinementRatio) ordersOfConvergence, GCI12s, GCI23s, asymptoticChecks, richardsonExtrapVals, uncertainties = convergResult convergenceHistory.append(convergResult) directions = ["X", "Y", "Z"] for d in range(len(directions)): print("{}-Direction: Order: {:>4.3f}, Asymptotic Check: {:>6.3f}, RichardsonExtrap: {:>7.3f}, Estimated Uncertainty: {:>6.3f}".format(directions[d], ordersOfConvergence[d], asymptoticChecks[d], richardsonExtrapVals[d], uncertainties[d])) if stopAtConvergence and abs(sum(asymptoticChecks) / len(asymptoticChecks) - 1) < 0.1 and max(asymptoticChecks) - min(asymptoticChecks) < 0.2: print("Simulation Converging Asymptotically") printConvergenceHistory() return timeStepHistory, finalPositionHistory, flight simCount += 1 # Output whether convergence was achieved if simLimit >= 3: print("Asymptotic convergence not reached within {} simulations".format(simLimit)) else: print("Asymptotic convergence impossible to reach with less than 3 iterations (performed {}). Adjust the parameter 'simLimit' to perform more iterations".format(simLimit)) printConvergenceHistory(ax1, ax2) return timeStepHistory, finalPositionHistory, simTimeHistory def compareClassicalIntegrationSchemes(self, saveFigure=False, showPlot=True, simLimit=10, integrationSchemes = [ "Euler", "RK2Midpoint", "RK2Heun", "RK4" ], convergenceResultFilePath="convergenceResult.csv"): ''' Arguments: simConfigFilePath (string) saveFigure (Bool) convergenceFilePath (string or None) - will overwrite old files Outputs: Plot .csv file (Optional) Returns: Nothing ''' plt.figure(figsize=(3.5,3)) ax1 = plt.gca() ax2 = plt.twinx() initTimeStep = float(self.simDefinition.getValue("SimControl.timeStep")) # Lists to store results timeStepHistory = [] finalPositionHistories = [] wallTimeHistory = [] # Run series of simulations for each integration scheme for scheme in integrationSchemes: self.simDefinition.setValue("SimControl.timeDiscretization", scheme) self.simDefinition.setValue("SimControl.timeStep", str(initTimeStep)) timeSteps, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, simLimit=simLimit, plotLineLabel=scheme, ax1=ax1, ax2=ax2) timeStepHistory = timeSteps finalPositionHistories.append(finalPositions) wallTimeHistory.append(wallTimes) print("Simulations complete") if convergenceResultFilePath != None: print("Writing convergence results to: {}".format(convergenceResultFilePath)) with open(convergenceResultFilePath, 'w', newline='') as file: writer = csv.writer(file) # Write Column Headers headerRow = [ "TimeStep(s)" ] for timeStep in range(len(integrationSchemes)): intScheme = integrationSchemes[timeStep] headerRow += [ "{}_FinalX(m)".format(intScheme), "{}_FinalY(m)".format(intScheme), "{}_FinalZ(m)".format(intScheme), "{}_WallTime(s)".format(intScheme) ] writer.writerow(headerRow) # Write convergence results, time step by time step for timeStep in range(len(timeStepHistory)): row = [ timeStepHistory[timeStep] ] for integrationScheme in range(len(integrationSchemes)): row.append(finalPositionHistories[integrationScheme][timeStep].X) row.append(finalPositionHistories[integrationScheme][timeStep].Y) row.append(finalPositionHistories[integrationScheme][timeStep].Z) row += [ wallTimeHistory[integrationScheme][timeStep] ] writer.writerow(row) if saveFigure: try: plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) except: plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) print("Showing plot") if showPlot: plt.show() def compareAdaptiveIntegrationSchemes(self, saveFigure=False, showPlot=True, integrationSchemes=["RK12Adaptive", "RK23Adaptive", "RK45Adaptive"], simLimit=10, convergenceResultFilePath="adaptiveConvergenceResult.csv"): ''' Arguments: simConfigFilePath (string) saveFigure (Bool) convergenceFilePath (string or None) - will overwrite old files Outputs: Plot .csv file (Optional) Returns: Nothing ''' plt.figure(figsize=(3.5,3)) ax1 = plt.gca() ax2 = plt.twinx() initErrorTarget = float(self.simDefinition.getValue("SimControl.TimeStepAdaptation.targetError")) # Lists to store results targetErrorHistory = [] finalPositionHistories = [] wallTimeHistory = [] # Run simulations for scheme in integrationSchemes: self.simDefinition.setValue("SimControl.timeDiscretization", scheme) self.simDefinition.setValue("SimControl.TimeStepAdaptation.targetError", str(initErrorTarget)) timeSteps, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, plotLineLabel=scheme, refinementRatio=2, simLimit=simLimit, ax1=ax1, ax2=ax2) targetErrorHistory = timeSteps finalPositionHistories.append(finalPositions) wallTimeHistory.append(wallTimes) # Write results to .csv file if convergenceResultFilePath != None: import csv print("Writing convergence results to: {}".format(convergenceResultFilePath)) with open(convergenceResultFilePath, 'w', newline='') as file: writer = csv.writer(file) # Write Column Headers headerRow = [ "TargetError" ] for timeStep in range(len(integrationSchemes)): intScheme = integrationSchemes[timeStep] headerRow += [ "{}_FinalX(m)".format(intScheme), "{}_FinalY(m)".format(intScheme), "{}_FinalZ(m)".format(intScheme), "{}_WallTime(s)".format(intScheme) ] writer.writerow(headerRow) # Write convergence results, time step by time step for timeStep in range(len(targetErrorHistory)): row = [ targetErrorHistory[timeStep] ] for integrationScheme in range(len(integrationSchemes)): row.append(finalPositionHistories[integrationScheme][timeStep].X) row.append(finalPositionHistories[integrationScheme][timeStep].Y) row.append(finalPositionHistories[integrationScheme][timeStep].Z) row += [ wallTimeHistory[integrationScheme][timeStep] ] writer.writerow(row) # Save results figure if saveFigure: try: plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) except: plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) # Show Plot plt.xlabel("Target Error") if showPlot: plt.show() def _setUpConfigFileForConvergenceRun(self): print("Will attempt to converge final rocket position of simulation: {}".format(self.simDefinition.fileName)) self.simDefinition.disableDistributionSampling = True # Don't sample from probability distributions while trying to converge a sim # Make sure no plots are created every time the sim runs self.simDefinition.setValue("SimControl.plot", "None") #### Make sure End Condition is a time #### endCondition = self.simDefinition.getValue("SimControl.EndCondition") if endCondition != "Time": print("Running simulation to determine end time") # Otherwise run the sim, get the end time and flights, _ = self.run() endTime = flights[0].times[-1] # set that to the end condition print("Setting EndCondition = Time, EndConditionValue = {}".format(endTime)) self.simDefinition.setValue("SimControl.EndCondition", "Time") self.simDefinition.setValue("SimControl.EndConditionValue", str(endTime))
Ancestors
Methods
def compareAdaptiveIntegrationSchemes(self, saveFigure=False, showPlot=True, integrationSchemes=['RK12Adaptive', 'RK23Adaptive', 'RK45Adaptive'], simLimit=10, convergenceResultFilePath='adaptiveConvergenceResult.csv')
-
Arguments
simConfigFilePath (string) saveFigure (Bool) convergenceFilePath (string or None) - will overwrite old files
Outputs
Plot .csv file (Optional)
Returns
Nothing
Expand source code
def compareAdaptiveIntegrationSchemes(self, saveFigure=False, showPlot=True, integrationSchemes=["RK12Adaptive", "RK23Adaptive", "RK45Adaptive"], simLimit=10, convergenceResultFilePath="adaptiveConvergenceResult.csv"): ''' Arguments: simConfigFilePath (string) saveFigure (Bool) convergenceFilePath (string or None) - will overwrite old files Outputs: Plot .csv file (Optional) Returns: Nothing ''' plt.figure(figsize=(3.5,3)) ax1 = plt.gca() ax2 = plt.twinx() initErrorTarget = float(self.simDefinition.getValue("SimControl.TimeStepAdaptation.targetError")) # Lists to store results targetErrorHistory = [] finalPositionHistories = [] wallTimeHistory = [] # Run simulations for scheme in integrationSchemes: self.simDefinition.setValue("SimControl.timeDiscretization", scheme) self.simDefinition.setValue("SimControl.TimeStepAdaptation.targetError", str(initErrorTarget)) timeSteps, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, plotLineLabel=scheme, refinementRatio=2, simLimit=simLimit, ax1=ax1, ax2=ax2) targetErrorHistory = timeSteps finalPositionHistories.append(finalPositions) wallTimeHistory.append(wallTimes) # Write results to .csv file if convergenceResultFilePath != None: import csv print("Writing convergence results to: {}".format(convergenceResultFilePath)) with open(convergenceResultFilePath, 'w', newline='') as file: writer = csv.writer(file) # Write Column Headers headerRow = [ "TargetError" ] for timeStep in range(len(integrationSchemes)): intScheme = integrationSchemes[timeStep] headerRow += [ "{}_FinalX(m)".format(intScheme), "{}_FinalY(m)".format(intScheme), "{}_FinalZ(m)".format(intScheme), "{}_WallTime(s)".format(intScheme) ] writer.writerow(headerRow) # Write convergence results, time step by time step for timeStep in range(len(targetErrorHistory)): row = [ targetErrorHistory[timeStep] ] for integrationScheme in range(len(integrationSchemes)): row.append(finalPositionHistories[integrationScheme][timeStep].X) row.append(finalPositionHistories[integrationScheme][timeStep].Y) row.append(finalPositionHistories[integrationScheme][timeStep].Z) row += [ wallTimeHistory[integrationScheme][timeStep] ] writer.writerow(row) # Save results figure if saveFigure: try: plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) except: plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/TimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) # Show Plot plt.xlabel("Target Error") if showPlot: plt.show()
def compareClassicalIntegrationSchemes(self, saveFigure=False, showPlot=True, simLimit=10, integrationSchemes=['Euler', 'RK2Midpoint', 'RK2Heun', 'RK4'], convergenceResultFilePath='convergenceResult.csv')
-
Arguments
simConfigFilePath (string) saveFigure (Bool) convergenceFilePath (string or None) - will overwrite old files
Outputs
Plot .csv file (Optional)
Returns
Nothing
Expand source code
def compareClassicalIntegrationSchemes(self, saveFigure=False, showPlot=True, simLimit=10, integrationSchemes = [ "Euler", "RK2Midpoint", "RK2Heun", "RK4" ], convergenceResultFilePath="convergenceResult.csv"): ''' Arguments: simConfigFilePath (string) saveFigure (Bool) convergenceFilePath (string or None) - will overwrite old files Outputs: Plot .csv file (Optional) Returns: Nothing ''' plt.figure(figsize=(3.5,3)) ax1 = plt.gca() ax2 = plt.twinx() initTimeStep = float(self.simDefinition.getValue("SimControl.timeStep")) # Lists to store results timeStepHistory = [] finalPositionHistories = [] wallTimeHistory = [] # Run series of simulations for each integration scheme for scheme in integrationSchemes: self.simDefinition.setValue("SimControl.timeDiscretization", scheme) self.simDefinition.setValue("SimControl.timeStep", str(initTimeStep)) timeSteps, finalPositions, wallTimes = self.convergeSimEndPosition(showPlot=False, simLimit=simLimit, plotLineLabel=scheme, ax1=ax1, ax2=ax2) timeStepHistory = timeSteps finalPositionHistories.append(finalPositions) wallTimeHistory.append(wallTimes) print("Simulations complete") if convergenceResultFilePath != None: print("Writing convergence results to: {}".format(convergenceResultFilePath)) with open(convergenceResultFilePath, 'w', newline='') as file: writer = csv.writer(file) # Write Column Headers headerRow = [ "TimeStep(s)" ] for timeStep in range(len(integrationSchemes)): intScheme = integrationSchemes[timeStep] headerRow += [ "{}_FinalX(m)".format(intScheme), "{}_FinalY(m)".format(intScheme), "{}_FinalZ(m)".format(intScheme), "{}_WallTime(s)".format(intScheme) ] writer.writerow(headerRow) # Write convergence results, time step by time step for timeStep in range(len(timeStepHistory)): row = [ timeStepHistory[timeStep] ] for integrationScheme in range(len(integrationSchemes)): row.append(finalPositionHistories[integrationScheme][timeStep].X) row.append(finalPositionHistories[integrationScheme][timeStep].Y) row.append(finalPositionHistories[integrationScheme][timeStep].Z) row += [ wallTimeHistory[integrationScheme][timeStep] ] writer.writerow(row) if saveFigure: try: plt.savefig("/home/hhstoldt/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) except: plt.savefig("C:/Users/rando/Documents/flightSimPaper/Figures/Images/AdaptTimeStepConvergence_ConstTimeStep.eps", bbox_inches="tight", pad_inches=0) print("Showing plot") if showPlot: plt.show()
def convergeSimEndPosition(self, refinementRatio=2, simLimit=10, plot=True, stopAtConvergence=False, showPlot=True, plotLineLabel='Simulations', ax1=None, ax2=None)
-
Takes simulation and runs it repeatedly, cutting the time step in half each time. Once convergence is approximately asymptotic, exits and returns series of final positions, convergence order, and extrapolated final position Should use with simulations that have an EndCondition of type "Time" # Otherwise sim will be run using current settings, and its endtime will be taken as the new end time for future convergence sims This Fn called by compareIntegrationSchemes functions
Parameters
simConfigFilePath string, /path/to/simConfigFile fW SimDefinition, overrides simConfigFilePath refinementRatio Number, Each time sim is run, time step or target error is divided by this number simLimit Number, Max number of simulations to run (takes exponentially more time to run more simulations) plot True/False, whether to plot the results stopAtConvergence True/False, if False, runs simLimit simulations even if asymptotic convergence is reached earlier showPlot True/False, if True, calls plt.show() plotLineLabel string, Label of line on plot ax1 matplotlib Axes, Z-location (Y) axis ax2 matplotlib Axes, Wall Time (Y) axis (2nd Y-axis)
Expand source code
def convergeSimEndPosition(self, refinementRatio=2, simLimit=10, plot=True, stopAtConvergence=False, showPlot=True, plotLineLabel="Simulations", ax1=None, ax2=None): ''' Takes simulation and runs it repeatedly, cutting the time step in half each time. Once convergence is approximately asymptotic, exits and returns series of final positions, convergence order, and extrapolated final position Should use with simulations that have an EndCondition of type "Time" # Otherwise sim will be run using current settings, and its endtime will be taken as the new end time for future convergence sims This Fn called by compareIntegrationSchemes functions Parameters: simConfigFilePath string, /path/to/simConfigFile fW SimDefinition, overrides simConfigFilePath refinementRatio Number, Each time sim is run, time step or target error is divided by this number simLimit Number, Max number of simulations to run (takes exponentially more time to run more simulations) plot True/False, whether to plot the results stopAtConvergence True/False, if False, runs simLimit simulations even if asymptotic convergence is reached earlier showPlot True/False, if True, calls plt.show() plotLineLabel string, Label of line on plot ax1 matplotlib Axes, Z-location (Y) axis ax2 matplotlib Axes, Wall Time (Y) axis (2nd Y-axis) ''' from MAPLEAF.IO.gridConvergenceFunctions import checkConvergence from statistics import mean import time self._setUpConfigFileForConvergenceRun() timeStepMethod = self.simDefinition.getValue("SimControl.timeDiscretization") adaptiveTimeStepping = "Adaptive" in timeStepMethod timeStepKey = "SimControl.timeStep" targetErrorKey = "SimControl.TimeStepAdaptation.targetError" #### Run Simulations #### print("Starting convergence simulations") if not adaptiveTimeStepping: timeStep = float(self.simDefinition.getValue(timeStepKey))*refinementRatio # Multiplied by 2 to give correct time step in first iteration else: targetError = float(self.simDefinition.getValue(targetErrorKey))*refinementRatio # Multiplied by 2 to give correct time step in first iteration simCount = 1 finalPositionHistory = [] convergenceHistory = [] timeStepHistory = [] simTimeHistory = [] def printConvergenceHistory(ax1=ax1, ax2=ax2): print("") print("Convergence History:") print("Integration Method: {}".format(timeStepMethod)) xPos = [] yPos = [] zPos = [] for i in range(len(finalPositionHistory)): finalPos = finalPositionHistory[i] xPos.append(finalPos[0]) yPos.append(finalPos[1]) zPos.append(finalPos[2]) printString = "FinalPosition(m): {:>7.3f} WallTime(s): {:>7.3f} ".format(finalPos, simTimeHistory[i]) if i > 1: # TODO: Get convergence results into the .csv file ordersOfConvergence, GCI12s, GCI23s, asymptoticChecks, richardsonExtrapVals, uncertainties = convergenceHistory[i-2] printString += " Avg Order: {:>4.2f}, Avg Asymptotic Check: {:>6.3f}".format(mean(ordersOfConvergence), mean(asymptoticChecks)) print(printString) if plot: if ax1 == None: ax1 = plt.gca() if ax2 == None: ax2 = ax1.twinx() ax1.plot(timeStepHistory, zPos, ":D", label=plotLineLabel) ax1.set_ylabel("Final Z Coordinate (m)") ax2.plot(timeStepHistory, simTimeHistory, "-*", label=plotLineLabel + " Wall Time") ax2.set_ylabel("Wall Time (s)") plt.xscale("log") plt.xlabel("Time Step (s)") plt.legend() plt.tight_layout() if showPlot: plt.show() while simCount <= simLimit: if not adaptiveTimeStepping: timeStep /= refinementRatio self.simDefinition.setValue(timeStepKey, str(timeStep)) timeStepHistory.append(timeStep) print("Simulation {}, Time step: {}".format(simCount, timeStep)) else: targetError /= refinementRatio self.simDefinition.setValue(targetErrorKey, str(targetError)) timeStepHistory.append(targetError) print("Simulation {}, Time step: {}".format(simCount, targetError)) startTime = time.time() flights, _ = self.run() flight = flights[0] wallTime = time.time() - startTime simTimeHistory.append(wallTime) finalPositionHistory.append(flight.rigidBodyStates[-1].position) print("Final Position: {:1.3f}".format(finalPositionHistory[-1])) if len(finalPositionHistory) >= 3: # Check whether result is converging cV, mV, fV = finalPositionHistory[-3:] print("Checking convergence") convergResult = checkConvergence(cV, mV, fV, refinementRatio) ordersOfConvergence, GCI12s, GCI23s, asymptoticChecks, richardsonExtrapVals, uncertainties = convergResult convergenceHistory.append(convergResult) directions = ["X", "Y", "Z"] for d in range(len(directions)): print("{}-Direction: Order: {:>4.3f}, Asymptotic Check: {:>6.3f}, RichardsonExtrap: {:>7.3f}, Estimated Uncertainty: {:>6.3f}".format(directions[d], ordersOfConvergence[d], asymptoticChecks[d], richardsonExtrapVals[d], uncertainties[d])) if stopAtConvergence and abs(sum(asymptoticChecks) / len(asymptoticChecks) - 1) < 0.1 and max(asymptoticChecks) - min(asymptoticChecks) < 0.2: print("Simulation Converging Asymptotically") printConvergenceHistory() return timeStepHistory, finalPositionHistory, flight simCount += 1 # Output whether convergence was achieved if simLimit >= 3: print("Asymptotic convergence not reached within {} simulations".format(simLimit)) else: print("Asymptotic convergence impossible to reach with less than 3 iterations (performed {}). Adjust the parameter 'simLimit' to perform more iterations".format(simLimit)) printConvergenceHistory(ax1, ax2) return timeStepHistory, finalPositionHistory, simTimeHistory
Inherited members
class Simulation (simDefinitionFilePath=None, simDefinition=None, silent=False)
-
Inputs
- simDefinitionFilePath: (string) path to simulation definition file
- fW:
(
SimDefinition
) object that's already loaded and parsed the desired sim definition file - silent: (bool) toggles optional outputs to the console
Expand source code
class Simulation(): def __init__(self, simDefinitionFilePath=None, simDefinition=None, silent=False): ''' Inputs: * simDefinitionFilePath: (string) path to simulation definition file * fW: (`MAPLEAF.IO.SimDefinition`) object that's already loaded and parsed the desired sim definition file * silent: (bool) toggles optional outputs to the console ''' self.simDefinition = loadSimDefinition(simDefinitionFilePath, simDefinition, silent) ''' Instance of `MAPLEAF.IO.SimDefinition`. Defines the current simulation ''' self.environment = Environment(self.simDefinition, silent=silent) ''' Instance of `MAPLEAF.ENV.Environment`. Will be shared by all Rockets created by this sim runner ''' self.stagingIndex = None # Set in self.createRocket ''' (int) Set in `Simulation.createRocket`. Tracks how many stages have been dropped ''' self.silent = silent ''' (bool) ''' self.loggingLevel = int(self.simDefinition.getValue("SimControl.loggingLevel")) self.computeStageDropPaths = strtobool(self.simDefinition.getValue("SimControl.StageDropPaths.compute")) def run(self, rocket=None): ''' Runs simulation defined by self.simDefinition (which has parsed a simulation definition file) Returns: * stageFlightsPaths: (list[`MAPLEAF.IO.RocketFlight.RocketFlight`]) Each RocketFlight object represents the flight path of a single stage * logFilePaths: (list[string]) list of paths to all log files created by this simulation ''' simDefinition = self.simDefinition # Initialize the rocket + environment models and simulation logging if rocket == None: rocket = self.createRocket() # Initialize rocket on launch pad, with all stages attached self.rocketStages = [ rocket ] # In this array, 'stage' means independent rigid bodies. Stages are initialized as new rocket objects and added once they are dropped from the main rocket # Create progress bar if appropriate if simDefinition.getValue("SimControl.EndCondition") == "Time": endTime = float(simDefinition.getValue("SimControl.EndConditionValue")) progressBar = tqdm(total=endTime+0.01) try: self.logger.continueWritingToTerminal = False except AttributeError: pass # Logging not set up for this sim else: progressBar = None #### Main Loop Setup #### self.dts = [ float(simDefinition.getValue("SimControl.timeStep")) ] # (Initial) time step size self.endDetectors = [ self._getEndDetectorFunction(rocket, simDefinition) ] # List contains a boolean function that controls sim termination for each stage self.stageFlightPaths = [ self._setUpCachingForFlightAnimation(rocket) ] # List will contain resulting flight paths for each stage if(rocket.hardwareInTheLoopControl == "yes"): print("Setting up hardware in the loop interface") rocket.hilInterface.setupHIL(self.rocketStages[0].rigidBody.state) #### Main Loop #### stageIndex = 0 while stageIndex < len(self.rocketStages): if stageIndex > 0: print("Computing stage {} drop path".format(stageIndex)) rocket = self.rocketStages[stageIndex] endDetector = self.endDetectors[stageIndex] flight = self.stageFlightPaths[stageIndex] endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex]) while not endSimulation: ### Take a time step ### try: if FinalTimeStepDt != None: print("Simulation Runner overriding time step from {} to {} to accurately meet end condition".format(self.dts[stageIndex], FinalTimeStepDt)) self.dts[stageIndex] = FinalTimeStepDt integrationResult = rocket.timeStep(self.dts[stageIndex]) if stageIndex == 0: # Currently, progress bar only works for bottom stage try: progressBar.update(integrationResult.dt) except AttributeError: pass except: try: progressBar.close() if not self.silent: sys.stdout.continueWritingToTerminal = True # sys.stdout is an instance of MAPLEAF.IO.Logging.Logger except AttributeError: pass # Save simulation results and print out stack trace self._handleSimulationCrash() # Adjust time step size for next iteration self.dts[stageIndex] = integrationResult.dt * integrationResult.timeStepAdaptationFactor # HIL if(rocket.hardwareInTheLoopControl == "yes"): rocket.hilInterface.performHIL(rocket.rigidBody.state,rocket.rigidBody.time) # Cache states for flight animation self.cacheState(rocket, flight) # Check whether we should end the simulation, or take a modified-size final time step endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex]) # Log last state (would be the starting state of the next time step) rocket._logState() # Move on to next (dropped) stage stageIndex += 1 try: progressBar.close() if not self.silent: sys.stdout.continueWritingToTerminal = True # Actually editing a MAPLEAF.IO.Logging.Logger object here except AttributeError: pass print("Simulation Complete") logFilePaths = self._postProcess(simDefinition) return self.stageFlightPaths, logFilePaths #### Pre-sim #### def createRocket(self, stage=None): ''' Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class Returns an instance of Rocket with it's Environment/Logs initialized. Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases). ''' # Initialize Rocket rocketDictReader = SubDictReader("Rocket", self.simDefinition) rocket = Rocket(rocketDictReader, silent=self.silent, stageToInitialize=stage, simRunner=self, environment=self.environment) # Initialize Rocket if self.simDefinition.getValue('SimControl.RocketPlot') in [ 'On', 'on' ]: rocket.plotShape() # Reference to this simRunner used to add to logs if stage == None: self._setUpConsoleLogging() self.stagingIndex = 0 # Initially zero, after dropping first stage: 1, after dropping second stage: 2, etc... return rocket def _setUpConsoleLogging(self): if self.loggingLevel > 0: # Set up logging so that the output of any print calls after this point is captured in mainSimulationLog self.consoleOutputLog = [] if self.silent: self.logger = Logging.Logger(self.consoleOutputLog, continueWritingToTerminal=False) else: self.logger = Logging.Logger(self.consoleOutputLog) sys.stdout = self.logger # Output system info to console and to log Logging.getSystemInfo(printToConsole=True) # Output sim definition file and default value dict to the log only self.consoleOutputLog += Logging.getSimDefinitionAndDefaultValueDictsForOutput(simDefinition=self.simDefinition, printToConsole=False) # Output header for data outputted to the console during the simulation print("Starting Simulation:") print("Time(s) Altitude(m,ASL)") elif self.silent: # No intention of writing things to a log file, just prevent them from being printed to the terminal _ = [] logger = Logging.Logger(_, continueWritingToTerminal=False) sys.stdout = logger def _getEndDetectorFunction(self, rocket, simConfig, droppedStage=False): ''' Returns a function, which returns a boolean value and Union[None, float], indicating whether the simulation endpoint has been reached. When close to the end of a sim, the second value returned is the recommended time step to take to hit the end criteria. Simulation end criteria defined in sim definition file. Rocket object must be passed in because the end condition functions require a reference to the rocket, so they can access its current altitude/velocity/time attributes ''' # Read desired end criteria from simulation definition file if not droppedStage: # Get end condition for main stage endCondition = simConfig.getValue("SimControl.EndCondition") conditionValue = float(simConfig.getValue("SimControl.EndConditionValue")) else: # Get end condition for dropped stages endCondition = simConfig.getValue("SimControl.StageDropPaths.endCondition") conditionValue = float(simConfig.getValue("SimControl.StageDropPaths.endConditionValue")) # Define all possible end-detector functions def isAfterApogee(dt): return rocket.rigidBody.state.velocity.Z <= 0 and rocket.rigidBody.time > 1.0, None def isAboveAltitude(dt): return rocket.rigidBody.state.position.Z >= conditionValue, None def isBelowAltitude(dt): return rocket.environment.earthModel.getAltitude(*rocket.rigidBody.state.position) <= conditionValue, None def EndTimeReached(dt): currTime = rocket.rigidBody.time if currTime < conditionValue and currTime + dt >= conditionValue: return False, conditionValue+1e-14-currTime elif currTime >= conditionValue: return True, None else: return False, None # Return the desired function if endCondition == "Apogee": return isAfterApogee elif endCondition == "Altitude" and rocket.rigidBody.state.position.Z < conditionValue: return isAboveAltitude elif endCondition == "Altitude": return isBelowAltitude else: return EndTimeReached def _setUpCachingForFlightAnimation(self, rocket): flight = RocketFlight() flight.times.append(rocket.rigidBody.time) flight.rigidBodyStates.append(rocket.rigidBody.state) if rocket.controlSystem != None and rocket.controlSystem.controlledSystem != None: # If rocket has moving fins, record their angles for plotting nActuators = len(rocket.controlSystem.controlledSystem.actuatorList) flight.actuatorDefls = [ [0] for i in range(nActuators) ] flight.actuatorTargetDefls = [ [0] for i in range(nActuators) ] else: flight.actuatorDefls = None flight.actuatorTargetDefls = None return flight #### During sim #### def cacheState(self, rocket: Rocket, flight: RocketFlight): ''' Adds the rocket's current state to the flight object ''' time = rocket.rigidBody.time flight.times.append(time) flight.rigidBodyStates.append(rocket.rigidBody.state) if rocket.controlSystem != None: try: for a in range(len(rocket.controlSystem.controlledSystem.actuatorList)): flight.actuatorDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].getDeflection(time)) flight.actuatorTargetDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].targetDeflection) except AttributeError: # Expecting to arrive here when timestepping a dropped stage of a controlled rocket, which doesn't have canards pass def createNewDetachedStage(self): ''' Called by Rocket._stageSeparation ''' if self.computeStageDropPaths: newDetachedStage = self.createRocket(stage=self.stagingIndex) # Set kinematic properties to match those of the current top-most stage topStage = self.rocketStages[0] newDetachedStage.rigidBody.state = deepcopy(topStage.rigidBody.state) newDetachedStage.rigidBody.time = topStage.rigidBody.time self.rocketStages.append(newDetachedStage) # New sim termination condition function self.endDetectors.append(self._getEndDetectorFunction(newDetachedStage, self.simDefinition, droppedStage=True)) self.dts.append(self.dts[0]) # Duplicate existing flight object newFlightObject = deepcopy(self.stageFlightPaths[0]) # Will have had the same flight path as the top stage until the moment of separation newFlightObject.actuatorDefls = None # Dropped stage shouldn't have any canard deflections self.stageFlightPaths.append(newFlightObject) self.stagingIndex += 1 def discardForceLogsForPreviousTimeStep(self, integrator): if self.loggingLevel >= 2: # Figure out how many times this integrator evaluates a function derivative (rocket forces in our case) if integrator.method == "RK12Adaptive": numDerivativeEvals = 2 else: numDerivativeEvals = len(integrator.tableau)-1 # Remove that number of rows from the end of the force evaluation log for i in range(numDerivativeEvals): for rocket in self.rocketStages: rocket.derivativeEvaluationLog.deleteLastRow() def _handleSimulationCrash(self): ''' After a simulation crashes, tries to create log files and show plots anyways, before printing a stack trace ''' print("ERROR: Simulation Crashed, Aborting") print("Attempting to save log files and show plots") self._postProcess(self.simDefinition) # Try to print out the stack trace print("Attempting to show stack trace") import traceback tb = traceback.format_exc() print(tb) print("Exiting") sys.exit() #### Post-sim #### def _postProcess(self, simDefinition): simDefinition.printDefaultValuesUsed() # Print these out before logging, to include them in the log # Log results logFilePaths = self._logSimulationResults(simDefinition) # Transfer key time info to flight objects from rocket for i in range(len(self.rocketStages)): self.stageFlightPaths[i].engineOffTime = self.rocketStages[i].engineShutOffTime self.stageFlightPaths[i].mainChuteDeployTime = self.rocketStages[i].mainChuteDeployTime self.stageFlightPaths[i].targetLocation = self.rocketStages[i].targetLocation # Plot results self._plotSimulationResults(self.rocketStages, simDefinition, self.stageFlightPaths, logFilePaths) # Print these out after logging to avoid including the log/plot keys in the unused keys # #TODO: Add exceptions for these keys, move this line to before logging so that it's output is also included in the simulation log simDefinition.printUnusedKeys() return logFilePaths def _logSimulationResults(self, simDefinition): ''' Logs simulation results to file (as/if specified in sim definition) ''' logFilePaths = None if self.loggingLevel > 0: logFilePaths = [] # Create a new folder for the results of the current simulation periodIndex = simDefinition.fileName.rfind('.') resultsFolderBaseName = simDefinition.fileName[:periodIndex] + "_Run" def tryCreateResultsFolder(resultsFolderBaseName): resultsFolderName = Logging.findNextAvailableNumberedFileName(fileBaseName=resultsFolderBaseName, extension="") try: os.mkdir(resultsFolderName) return resultsFolderName except FileExistsError: # End up here if another process created the same results folder # (other thread runs os.mkdir b/w when this thread runs findNextAvailableNumberedFileName and os.mkdir) # Should only happen during parallel runs return "" createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName) iterations = 0 while createdResultsFolder == "" and iterations < 50: createdResultsFolder = tryCreateResultsFolder(resultsFolderBaseName) iterations += 1 if iterations == 50: raise ValueError("Repeated error (50x): unable to create a results folder: {}.".format(resultsFolderBaseName)) # Write logs to file for rocket in self.rocketStages: logFilePaths += rocket.writeLogsToFile(createdResultsFolder) # Output console output consoleOutputPath = os.path.join(createdResultsFolder, "consoleOutput.txt") print("Writing log file: {}".format(consoleOutputPath)) with open(consoleOutputPath, 'w+') as file: file.writelines(self.consoleOutputLog) return logFilePaths def _plotSimulationResults(self, rocketStages, simDefinition, flights, logFilePaths): ''' Plot simulation results (as/if specified in sim definition) ''' plotsToMake = simDefinition.getValue("SimControl.plot").split() if plotsToMake != ["None"]: if "FlightAnimation" in plotsToMake: print("Showing flight animation") # Show animation Plotting.flightAnimation(flights) # Done, remove from plotsToMake plotsToMake.remove("FlightAnimation") if "FlightPaths" in plotsToMake: earthModel = self.simDefinition.getValue("Environment.EarthModel") if earthModel in [ "None", "Flat" ]: Plotting.plotFlightPaths_NoEarth(flights) else: Plotting.plotFlightPaths_FullEarth(flights) plotsToMake.remove("FlightPaths") # Plot all other columns from log files for plotDefinitionString in plotsToMake: Plotting.plotFromLogFiles(logFilePaths, plotDefinitionString)
Subclasses
Instance variables
var environment
-
Instance of
Environment
. Will be shared by all Rockets created by this sim runner var silent
-
(bool)
var simDefinition
-
Instance of
SimDefinition
. Defines the current simulation var stagingIndex
-
(int) Set in
Simulation.createRocket()
. Tracks how many stages have been dropped
Methods
def cacheState(self, rocket: Rocket, flight: RocketFlight)
-
Adds the rocket's current state to the flight object
Expand source code
def cacheState(self, rocket: Rocket, flight: RocketFlight): ''' Adds the rocket's current state to the flight object ''' time = rocket.rigidBody.time flight.times.append(time) flight.rigidBodyStates.append(rocket.rigidBody.state) if rocket.controlSystem != None: try: for a in range(len(rocket.controlSystem.controlledSystem.actuatorList)): flight.actuatorDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].getDeflection(time)) flight.actuatorTargetDefls[a].append(rocket.controlSystem.controlledSystem.actuatorList[a].targetDeflection) except AttributeError: # Expecting to arrive here when timestepping a dropped stage of a controlled rocket, which doesn't have canards pass
def createNewDetachedStage(self)
-
Called by Rocket._stageSeparation
Expand source code
def createNewDetachedStage(self): ''' Called by Rocket._stageSeparation ''' if self.computeStageDropPaths: newDetachedStage = self.createRocket(stage=self.stagingIndex) # Set kinematic properties to match those of the current top-most stage topStage = self.rocketStages[0] newDetachedStage.rigidBody.state = deepcopy(topStage.rigidBody.state) newDetachedStage.rigidBody.time = topStage.rigidBody.time self.rocketStages.append(newDetachedStage) # New sim termination condition function self.endDetectors.append(self._getEndDetectorFunction(newDetachedStage, self.simDefinition, droppedStage=True)) self.dts.append(self.dts[0]) # Duplicate existing flight object newFlightObject = deepcopy(self.stageFlightPaths[0]) # Will have had the same flight path as the top stage until the moment of separation newFlightObject.actuatorDefls = None # Dropped stage shouldn't have any canard deflections self.stageFlightPaths.append(newFlightObject) self.stagingIndex += 1
def createRocket(self, stage=None)
-
Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class Returns an instance of Rocket with it's Environment/Logs initialized. Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases).
Expand source code
def createRocket(self, stage=None): ''' Initializes a rocket, complete with an Environment object and logs, both owned by the instance of this class Returns an instance of Rocket with it's Environment/Logs initialized. Can be called by external classes to obtain a prepped rocket (used a lot this way in test cases). ''' # Initialize Rocket rocketDictReader = SubDictReader("Rocket", self.simDefinition) rocket = Rocket(rocketDictReader, silent=self.silent, stageToInitialize=stage, simRunner=self, environment=self.environment) # Initialize Rocket if self.simDefinition.getValue('SimControl.RocketPlot') in [ 'On', 'on' ]: rocket.plotShape() # Reference to this simRunner used to add to logs if stage == None: self._setUpConsoleLogging() self.stagingIndex = 0 # Initially zero, after dropping first stage: 1, after dropping second stage: 2, etc... return rocket
def discardForceLogsForPreviousTimeStep(self, integrator)
-
Expand source code
def discardForceLogsForPreviousTimeStep(self, integrator): if self.loggingLevel >= 2: # Figure out how many times this integrator evaluates a function derivative (rocket forces in our case) if integrator.method == "RK12Adaptive": numDerivativeEvals = 2 else: numDerivativeEvals = len(integrator.tableau)-1 # Remove that number of rows from the end of the force evaluation log for i in range(numDerivativeEvals): for rocket in self.rocketStages: rocket.derivativeEvaluationLog.deleteLastRow()
def run(self, rocket=None)
-
Runs simulation defined by self.simDefinition (which has parsed a simulation definition file)
Returns
- stageFlightsPaths: (list[
MAPLEAF.IO.RocketFlight.RocketFlight
]) Each RocketFlight object represents the flight path of a single stage - logFilePaths: (list[string]) list of paths to all log files created by this simulation
Expand source code
def run(self, rocket=None): ''' Runs simulation defined by self.simDefinition (which has parsed a simulation definition file) Returns: * stageFlightsPaths: (list[`MAPLEAF.IO.RocketFlight.RocketFlight`]) Each RocketFlight object represents the flight path of a single stage * logFilePaths: (list[string]) list of paths to all log files created by this simulation ''' simDefinition = self.simDefinition # Initialize the rocket + environment models and simulation logging if rocket == None: rocket = self.createRocket() # Initialize rocket on launch pad, with all stages attached self.rocketStages = [ rocket ] # In this array, 'stage' means independent rigid bodies. Stages are initialized as new rocket objects and added once they are dropped from the main rocket # Create progress bar if appropriate if simDefinition.getValue("SimControl.EndCondition") == "Time": endTime = float(simDefinition.getValue("SimControl.EndConditionValue")) progressBar = tqdm(total=endTime+0.01) try: self.logger.continueWritingToTerminal = False except AttributeError: pass # Logging not set up for this sim else: progressBar = None #### Main Loop Setup #### self.dts = [ float(simDefinition.getValue("SimControl.timeStep")) ] # (Initial) time step size self.endDetectors = [ self._getEndDetectorFunction(rocket, simDefinition) ] # List contains a boolean function that controls sim termination for each stage self.stageFlightPaths = [ self._setUpCachingForFlightAnimation(rocket) ] # List will contain resulting flight paths for each stage if(rocket.hardwareInTheLoopControl == "yes"): print("Setting up hardware in the loop interface") rocket.hilInterface.setupHIL(self.rocketStages[0].rigidBody.state) #### Main Loop #### stageIndex = 0 while stageIndex < len(self.rocketStages): if stageIndex > 0: print("Computing stage {} drop path".format(stageIndex)) rocket = self.rocketStages[stageIndex] endDetector = self.endDetectors[stageIndex] flight = self.stageFlightPaths[stageIndex] endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex]) while not endSimulation: ### Take a time step ### try: if FinalTimeStepDt != None: print("Simulation Runner overriding time step from {} to {} to accurately meet end condition".format(self.dts[stageIndex], FinalTimeStepDt)) self.dts[stageIndex] = FinalTimeStepDt integrationResult = rocket.timeStep(self.dts[stageIndex]) if stageIndex == 0: # Currently, progress bar only works for bottom stage try: progressBar.update(integrationResult.dt) except AttributeError: pass except: try: progressBar.close() if not self.silent: sys.stdout.continueWritingToTerminal = True # sys.stdout is an instance of MAPLEAF.IO.Logging.Logger except AttributeError: pass # Save simulation results and print out stack trace self._handleSimulationCrash() # Adjust time step size for next iteration self.dts[stageIndex] = integrationResult.dt * integrationResult.timeStepAdaptationFactor # HIL if(rocket.hardwareInTheLoopControl == "yes"): rocket.hilInterface.performHIL(rocket.rigidBody.state,rocket.rigidBody.time) # Cache states for flight animation self.cacheState(rocket, flight) # Check whether we should end the simulation, or take a modified-size final time step endSimulation, FinalTimeStepDt = endDetector(self.dts[stageIndex]) # Log last state (would be the starting state of the next time step) rocket._logState() # Move on to next (dropped) stage stageIndex += 1 try: progressBar.close() if not self.silent: sys.stdout.continueWritingToTerminal = True # Actually editing a MAPLEAF.IO.Logging.Logger object here except AttributeError: pass print("Simulation Complete") logFilePaths = self._postProcess(simDefinition) return self.stageFlightPaths, logFilePaths
- stageFlightsPaths: (list[
class WindTunnelSimulation (parametersToSweep=None, parameterValues=None, simDefinitionFilePath=None, simDefinition=None, silent=False, smoothLine='False')
-
Inputs
- simDefinitionFilePath: (string) path to simulation definition file
- fW:
(
SimDefinition
) object that's already loaded and parsed the desired sim definition file - silent: (bool) toggles optional outputs to the console
Expand source code
class WindTunnelSimulation(Simulation): def __init__(self, parametersToSweep=None, parameterValues=None, simDefinitionFilePath=None, simDefinition=None, silent=False, smoothLine='False'): self.parametersToSweep = ["Rocket.velocity"] if (parametersToSweep == None) else parametersToSweep self.parameterValues = [["(0 0 100)", "(0 0 200)", "(0 0 300)"]] if (parameterValues == None) else parameterValues self.smoothLine = smoothLine # Error checks if len(self.parametersToSweep) != len(self.parameterValues): raise ValueError("Must have a list of values for each parameter to sweep over. Currently have {} parameters, and {} lists of values.".format(len(self.parametersToSweep), len(self.parameterValues))) paramValueCounts = [ len(x) for x in self.parameterValues ] if not all([ (x == paramValueCounts[0]) for x in paramValueCounts ]): raise ValueError("All lists of parameter values must be of equal length. Currently, list lengths are: {}".format(paramValueCounts)) Simulation.__init__(self, simDefinitionFilePath, simDefinition, silent) def runSweep(self): ''' Runs a single force evaluation for each value of self.parameterToSweekKey contained in self.parameterValueList Returns: List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point ''' # Make sure we're logging forces (the only way to get data out of this sim runner) and avoid producing any plots self.simDefinition.setValue("SimControl.loggingLevel", "3") self.simDefinition.setValue("SimControl.plot", "None") self.simDefinition.setValue("SimControl.RocketPlot", "Off") if strtobool(self.smoothLine): # interpolate between user set parameter values self._addPoints() for i in range(len(self.parameterValues[0])): # i corresponds to # of conditions, ie how many times parameter values will be changed (velocity1, velocity2, ...) for j in range(len(self.parameterValues)): # j'th parameter (velocity, temperature) self.simDefinition.setValue(self.parametersToSweep[j], self.parameterValues[j][i]) # Run a single force evaluation, which creates a forces log entry for this force evaluation rocket = self.createRocket() # Track all derivative evaluations in a single log if i == 0: log = rocket.derivativeEvaluationLog else: rocket.derivativeEvaluationLog = log self.rocketStages = [ rocket ] rocket._getAppliedForce(0.0, rocket.rigidBody.state) # Write Logs to file, return path return self._postProcess() def _addPoints(self, pointMultiple=10): ''' Edits the parameter sweeps to include a multiple of the previous number of points, linearly interpolated between the given values ''' for i in range(len(self.parameterValues[0]) - 1): # i corresponds to # of tests to run for interpPt in range(1, pointMultiple): # Loops over each new point intervalStartIndex = pointMultiple*i # Index at which to add new point (also interval end index) newPointIndex = pointMultiple*i + interpPt for param in range(len(self.parametersToSweep)): # j'th corresponds to parameters to sweep over (velocity, temperature) try: # Vector sweep first = Vector(self.parameterValues[param][intervalStartIndex]) second = Vector(self.parameterValues[param][newPointIndex]) change = second - first incrementalValue = str(interpPt/pointMultiple*change + first) except ValueError: # Scalar sweep first = float(self.parameterValues[param][intervalStartIndex]) second = float(self.parameterValues[param][newPointIndex]) change = second - first incrementalValue = str(interpPt/pointMultiple*change + first) self.parameterValues[param].insert(newPointIndex, incrementalValue) def createRocket(self): ''' Do all the same stuff as the parent object, but also re-initialize the environment, to make sure changes to environmental properties during the parameter sweep take effect ''' self.environment = Environment(self.simDefinition, silent=self.silent) return super().createRocket() def _setUpLogging(self): ''' Override to ensure that logs aren't re-initialized for every simulation. mainSimulationLog will only be absent the first time this function is run Want to keep all the force data in a single log file ''' if not hasattr(self, 'consoleOutputLog'): return super()._setUpConsoleLogging() def _postProcess(self): ''' Creates an empty flight path object to prevent errors in the parent function, which is still run to create log files. Removes mainSimLog from (returned) log file paths since no time steps we taken by this sim ''' # Create an empty flight path to prevent errors in the parent function) self.stageFlightPaths = [ RocketFlight() ] return Simulation._postProcess(self, self.simDefinition)
Ancestors
Methods
def createRocket(self)
-
Do all the same stuff as the parent object, but also re-initialize the environment, to make sure changes to environmental properties during the parameter sweep take effect
Expand source code
def createRocket(self): ''' Do all the same stuff as the parent object, but also re-initialize the environment, to make sure changes to environmental properties during the parameter sweep take effect ''' self.environment = Environment(self.simDefinition, silent=self.silent) return super().createRocket()
def runSweep(self)
-
Runs a single force evaluation for each value of self.parameterToSweekKey contained in self.parameterValueList
Returns
List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point
Expand source code
def runSweep(self): ''' Runs a single force evaluation for each value of self.parameterToSweekKey contained in self.parameterValueList Returns: List of log file paths. Main Sim Log will be empty. Force eval log and expanded force eval log will have one entry per sweep point ''' # Make sure we're logging forces (the only way to get data out of this sim runner) and avoid producing any plots self.simDefinition.setValue("SimControl.loggingLevel", "3") self.simDefinition.setValue("SimControl.plot", "None") self.simDefinition.setValue("SimControl.RocketPlot", "Off") if strtobool(self.smoothLine): # interpolate between user set parameter values self._addPoints() for i in range(len(self.parameterValues[0])): # i corresponds to # of conditions, ie how many times parameter values will be changed (velocity1, velocity2, ...) for j in range(len(self.parameterValues)): # j'th parameter (velocity, temperature) self.simDefinition.setValue(self.parametersToSweep[j], self.parameterValues[j][i]) # Run a single force evaluation, which creates a forces log entry for this force evaluation rocket = self.createRocket() # Track all derivative evaluations in a single log if i == 0: log = rocket.derivativeEvaluationLog else: rocket.derivativeEvaluationLog = log self.rocketStages = [ rocket ] rocket._getAppliedForce(0.0, rocket.rigidBody.state) # Write Logs to file, return path return self._postProcess()
Inherited members