Module MAPLEAF.Rocket.simEventDetector
Generalized event detector (Apogee, motor burnout etc…) for a single vehicle.
Used to trigger stage separations and recovery system deployments.
Expand source code
# Created by: Henry Stoldt
# April 2020
'''
Generalized event detector (Apogee, motor burnout etc...) for a single vehicle.
Used to trigger stage separations and recovery system deployments.
'''
from enum import Enum
from typing import Tuple
__all__ = [ "EventTypes", "SimEventDetector" ]
class EventTypes(Enum):
Apogee = "apogee"
AscendingThroughAltitude = "ascendingThroughAltitude"
DescendingThroughAltitude = "descendingThroughAltitude"
MotorBurnout = "motorBurnout"
TimeReached = "timeReached"
class SimEventDetector():
'''
Each sim event detector detects events on a single rocket
'''
def __init__(self, rocket):
self.rocket = rocket
self.callbackFunctions = []
self.conditionsEvalFunctions = []
self.conditionValues = []
self.triggerDelays = []
def subscribeToEvent(self, eventType, callbackFunction, eventTriggerValue=None, triggerDelay=0):
'''
Inputs:
eventType: "apogee", "ascendingThroughAltitude", "descendingThroughAltitude", 'motorBurnout" or "timeReached"
Can pass in a member of the EventType Enum defined above, if desired
callbackfunction: function to call when the desired event occurs. Must have no arguments
eventTriggerValue: None, or a trigger parameter for the eventType - an altitude or a time
triggerDelay: (numeric) the amount of time, in seconds, to wait before triggering an event after a trigger condition has been met
Note:
Motor Burnout always triggers at the burnout time of the LOWEST stage's motor!
'''
# Define map between eventType strings and checker functions
stringToEvalFnMap = {
EventTypes.Apogee.value: self._isAfterApogee,
EventTypes.AscendingThroughAltitude.value: self._isAboveAltitude,
EventTypes.DescendingThroughAltitude.value: self._isBelowAltitude,
EventTypes.MotorBurnout.value: self._isBottomStageMotorBurnedOut,
EventTypes.TimeReached.value: self._timeReached
}
# Make sure eventType is a string
if not isinstance(eventType, str):
eventType = eventType.value # If for example EventType.Apogee is passed in, EventType.Apogee.value retrieves "apogee" from the EventType enum
self.callbackFunctions.append(callbackFunction)
self.conditionValues.append(eventTriggerValue)
self.conditionsEvalFunctions.append(stringToEvalFnMap[eventType])
self.triggerDelays.append(triggerDelay)
def triggerEvents(self) -> Tuple[float, bool]:
''' Checks if any of the events that this SimEventDetector is supposed to detect have happened. If so, triggers their callback functions.
Returns the estimated time to the next event, and whether that event happens at a set time (time-deterministic) or not (non-time-deterministic - like an altitude condition)
These return values are used to influence time step adaptation to accurately resolve discrete event timing
'''
# Save indices of conditions that have been triggered, will now be removed
indicesToRemove = []
estimatedTimeToNextEvent = 1e10
nextEventTimeDeterministic = False
# Precomputed to pass to sub-functions (required for altitude conditions)
ENUState = self.rocket.environment.convertStateToENUFrame(self.rocket.rigidBody.state)
# Check each callback condition
for i in range(len(self.callbackFunctions)):
# If a conditions has come true
eventOccurred, timeToOccurrence, timeDeterministic = self.conditionsEvalFunctions[i](self.conditionValues[i], ENUState)
# Get info about the next event to occur
if timeToOccurrence < estimatedTimeToNextEvent and not eventOccurred:
estimatedTimeToNextEvent = timeToOccurrence
nextEventTimeDeterministic = timeDeterministic
if eventOccurred:
if self.triggerDelays[i] == 0:
# Call its function
# TODO: Print message to simulation log saying that the event has been triggered
self.callbackFunctions[i]()
else:
# Schedule the event to happen in triggerDelay seconds
triggerTime = self.rocket.rigidBody.time + self.triggerDelays[i]
self.subscribeToEvent(EventTypes.TimeReached, self.callbackFunctions[i], triggerTime)
# And mark it for deletion
indicesToRemove.append(i)
# Delete all of the events that were triggered - working in reverse to ensure the indices of the items aren't changed before we delete them
for i in range(len(indicesToRemove)):
delI = len(indicesToRemove)-1-i
indexToRemove = indicesToRemove[delI]
del self.callbackFunctions[indexToRemove]
del self.conditionValues[indexToRemove]
del self.conditionsEvalFunctions[indexToRemove]
del self.triggerDelays[indexToRemove]
# Save velocity and time for next timestep (if we're trying to detect apogee)
# Doing this here to avoid problems calling the apogee function multiple times in a single time step
self.lastVelocity = ENUState.velocity
self.lastTime = self.rocket.rigidBody.time
return estimatedTimeToNextEvent, nextEventTimeDeterministic
### Event evaluation functions - each is expected to have a single parameter, return True/False ###
def _isAfterApogee(self, _, ENUState):
# Time > 1.0 condition here to avoid setting off events if sliding slightly down before engine lights
eventOccurred = ENUState.velocity.Z <= 0 and self.rocket.rigidBody.time > 1.0
timeToOccurrence = 1e10
try:
accel = (ENUState.velocity.Z - self.lastVelocity.Z) / (self.rocket.rigidBody.time - self.lastTime)
if accel < 0 and ENUState.velocity.Z > 0:
timeToOccurrence = -ENUState.velocity.Z / accel
except (AttributeError, ZeroDivisionError):
pass # Haven't saved a velocity/time yet / calling function twice in a single time step
return eventOccurred, timeToOccurrence, False
def _isAboveAltitude(self, altitude, ENUState):
eventOccurred = ENUState.position.Z >= altitude and ENUState.velocity.Z > 0
if ENUState.velocity.Z > 0:
timeToOccurrence = (altitude - ENUState.position.Z) / ENUState.velocity.Z
else:
timeToOccurrence = 1e10
return eventOccurred, timeToOccurrence, False
def _isBelowAltitude(self, altitude, ENUState):
eventOccurred = ENUState.position.Z <= altitude and ENUState.velocity.Z < 0
if ENUState.velocity.Z < 0:
timeToOccurrence = (altitude - ENUState.position.Z) / ENUState.velocity.Z
else:
timeToOccurrence = 1e10
return eventOccurred, timeToOccurrence, False
def _isBottomStageMotorBurnedOut(self, _, ENUState):
eventOccurred = self.rocket.rigidBody.time >= self.rocket.stages[-1].engineShutOffTime
timeToOccurrence = self.rocket.stages[-1].engineShutOffTime - self.rocket.rigidBody.time
return eventOccurred, timeToOccurrence, True
def _timeReached(self, time, ENUState):
eventOccurred = self.rocket.rigidBody.time >= time
timeToOccurrence = time - self.rocket.rigidBody.time
return eventOccurred, timeToOccurrence, True
Classes
class EventTypes (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code
class EventTypes(Enum): Apogee = "apogee" AscendingThroughAltitude = "ascendingThroughAltitude" DescendingThroughAltitude = "descendingThroughAltitude" MotorBurnout = "motorBurnout" TimeReached = "timeReached"
Ancestors
- enum.Enum
Class variables
var Apogee
var AscendingThroughAltitude
var DescendingThroughAltitude
var MotorBurnout
var TimeReached
class SimEventDetector (rocket)
-
Each sim event detector detects events on a single rocket
Expand source code
class SimEventDetector(): ''' Each sim event detector detects events on a single rocket ''' def __init__(self, rocket): self.rocket = rocket self.callbackFunctions = [] self.conditionsEvalFunctions = [] self.conditionValues = [] self.triggerDelays = [] def subscribeToEvent(self, eventType, callbackFunction, eventTriggerValue=None, triggerDelay=0): ''' Inputs: eventType: "apogee", "ascendingThroughAltitude", "descendingThroughAltitude", 'motorBurnout" or "timeReached" Can pass in a member of the EventType Enum defined above, if desired callbackfunction: function to call when the desired event occurs. Must have no arguments eventTriggerValue: None, or a trigger parameter for the eventType - an altitude or a time triggerDelay: (numeric) the amount of time, in seconds, to wait before triggering an event after a trigger condition has been met Note: Motor Burnout always triggers at the burnout time of the LOWEST stage's motor! ''' # Define map between eventType strings and checker functions stringToEvalFnMap = { EventTypes.Apogee.value: self._isAfterApogee, EventTypes.AscendingThroughAltitude.value: self._isAboveAltitude, EventTypes.DescendingThroughAltitude.value: self._isBelowAltitude, EventTypes.MotorBurnout.value: self._isBottomStageMotorBurnedOut, EventTypes.TimeReached.value: self._timeReached } # Make sure eventType is a string if not isinstance(eventType, str): eventType = eventType.value # If for example EventType.Apogee is passed in, EventType.Apogee.value retrieves "apogee" from the EventType enum self.callbackFunctions.append(callbackFunction) self.conditionValues.append(eventTriggerValue) self.conditionsEvalFunctions.append(stringToEvalFnMap[eventType]) self.triggerDelays.append(triggerDelay) def triggerEvents(self) -> Tuple[float, bool]: ''' Checks if any of the events that this SimEventDetector is supposed to detect have happened. If so, triggers their callback functions. Returns the estimated time to the next event, and whether that event happens at a set time (time-deterministic) or not (non-time-deterministic - like an altitude condition) These return values are used to influence time step adaptation to accurately resolve discrete event timing ''' # Save indices of conditions that have been triggered, will now be removed indicesToRemove = [] estimatedTimeToNextEvent = 1e10 nextEventTimeDeterministic = False # Precomputed to pass to sub-functions (required for altitude conditions) ENUState = self.rocket.environment.convertStateToENUFrame(self.rocket.rigidBody.state) # Check each callback condition for i in range(len(self.callbackFunctions)): # If a conditions has come true eventOccurred, timeToOccurrence, timeDeterministic = self.conditionsEvalFunctions[i](self.conditionValues[i], ENUState) # Get info about the next event to occur if timeToOccurrence < estimatedTimeToNextEvent and not eventOccurred: estimatedTimeToNextEvent = timeToOccurrence nextEventTimeDeterministic = timeDeterministic if eventOccurred: if self.triggerDelays[i] == 0: # Call its function # TODO: Print message to simulation log saying that the event has been triggered self.callbackFunctions[i]() else: # Schedule the event to happen in triggerDelay seconds triggerTime = self.rocket.rigidBody.time + self.triggerDelays[i] self.subscribeToEvent(EventTypes.TimeReached, self.callbackFunctions[i], triggerTime) # And mark it for deletion indicesToRemove.append(i) # Delete all of the events that were triggered - working in reverse to ensure the indices of the items aren't changed before we delete them for i in range(len(indicesToRemove)): delI = len(indicesToRemove)-1-i indexToRemove = indicesToRemove[delI] del self.callbackFunctions[indexToRemove] del self.conditionValues[indexToRemove] del self.conditionsEvalFunctions[indexToRemove] del self.triggerDelays[indexToRemove] # Save velocity and time for next timestep (if we're trying to detect apogee) # Doing this here to avoid problems calling the apogee function multiple times in a single time step self.lastVelocity = ENUState.velocity self.lastTime = self.rocket.rigidBody.time return estimatedTimeToNextEvent, nextEventTimeDeterministic ### Event evaluation functions - each is expected to have a single parameter, return True/False ### def _isAfterApogee(self, _, ENUState): # Time > 1.0 condition here to avoid setting off events if sliding slightly down before engine lights eventOccurred = ENUState.velocity.Z <= 0 and self.rocket.rigidBody.time > 1.0 timeToOccurrence = 1e10 try: accel = (ENUState.velocity.Z - self.lastVelocity.Z) / (self.rocket.rigidBody.time - self.lastTime) if accel < 0 and ENUState.velocity.Z > 0: timeToOccurrence = -ENUState.velocity.Z / accel except (AttributeError, ZeroDivisionError): pass # Haven't saved a velocity/time yet / calling function twice in a single time step return eventOccurred, timeToOccurrence, False def _isAboveAltitude(self, altitude, ENUState): eventOccurred = ENUState.position.Z >= altitude and ENUState.velocity.Z > 0 if ENUState.velocity.Z > 0: timeToOccurrence = (altitude - ENUState.position.Z) / ENUState.velocity.Z else: timeToOccurrence = 1e10 return eventOccurred, timeToOccurrence, False def _isBelowAltitude(self, altitude, ENUState): eventOccurred = ENUState.position.Z <= altitude and ENUState.velocity.Z < 0 if ENUState.velocity.Z < 0: timeToOccurrence = (altitude - ENUState.position.Z) / ENUState.velocity.Z else: timeToOccurrence = 1e10 return eventOccurred, timeToOccurrence, False def _isBottomStageMotorBurnedOut(self, _, ENUState): eventOccurred = self.rocket.rigidBody.time >= self.rocket.stages[-1].engineShutOffTime timeToOccurrence = self.rocket.stages[-1].engineShutOffTime - self.rocket.rigidBody.time return eventOccurred, timeToOccurrence, True def _timeReached(self, time, ENUState): eventOccurred = self.rocket.rigidBody.time >= time timeToOccurrence = time - self.rocket.rigidBody.time return eventOccurred, timeToOccurrence, True
Methods
def subscribeToEvent(self, eventType, callbackFunction, eventTriggerValue=None, triggerDelay=0)
-
Inputs
eventType: "apogee", "ascendingThroughAltitude", "descendingThroughAltitude", 'motorBurnout" or "timeReached" Can pass in a member of the EventType Enum defined above, if desired callbackfunction: function to call when the desired event occurs. Must have no arguments eventTriggerValue: None, or a trigger parameter for the eventType - an altitude or a time triggerDelay: (numeric) the amount of time, in seconds, to wait before triggering an event after a trigger condition has been met
Note
Motor Burnout always triggers at the burnout time of the LOWEST stage's motor!
Expand source code
def subscribeToEvent(self, eventType, callbackFunction, eventTriggerValue=None, triggerDelay=0): ''' Inputs: eventType: "apogee", "ascendingThroughAltitude", "descendingThroughAltitude", 'motorBurnout" or "timeReached" Can pass in a member of the EventType Enum defined above, if desired callbackfunction: function to call when the desired event occurs. Must have no arguments eventTriggerValue: None, or a trigger parameter for the eventType - an altitude or a time triggerDelay: (numeric) the amount of time, in seconds, to wait before triggering an event after a trigger condition has been met Note: Motor Burnout always triggers at the burnout time of the LOWEST stage's motor! ''' # Define map between eventType strings and checker functions stringToEvalFnMap = { EventTypes.Apogee.value: self._isAfterApogee, EventTypes.AscendingThroughAltitude.value: self._isAboveAltitude, EventTypes.DescendingThroughAltitude.value: self._isBelowAltitude, EventTypes.MotorBurnout.value: self._isBottomStageMotorBurnedOut, EventTypes.TimeReached.value: self._timeReached } # Make sure eventType is a string if not isinstance(eventType, str): eventType = eventType.value # If for example EventType.Apogee is passed in, EventType.Apogee.value retrieves "apogee" from the EventType enum self.callbackFunctions.append(callbackFunction) self.conditionValues.append(eventTriggerValue) self.conditionsEvalFunctions.append(stringToEvalFnMap[eventType]) self.triggerDelays.append(triggerDelay)
def triggerEvents(self) ‑> Tuple[float, bool]
-
Checks if any of the events that this SimEventDetector is supposed to detect have happened. If so, triggers their callback functions. Returns the estimated time to the next event, and whether that event happens at a set time (time-deterministic) or not (non-time-deterministic - like an altitude condition) These return values are used to influence time step adaptation to accurately resolve discrete event timing
Expand source code
def triggerEvents(self) -> Tuple[float, bool]: ''' Checks if any of the events that this SimEventDetector is supposed to detect have happened. If so, triggers their callback functions. Returns the estimated time to the next event, and whether that event happens at a set time (time-deterministic) or not (non-time-deterministic - like an altitude condition) These return values are used to influence time step adaptation to accurately resolve discrete event timing ''' # Save indices of conditions that have been triggered, will now be removed indicesToRemove = [] estimatedTimeToNextEvent = 1e10 nextEventTimeDeterministic = False # Precomputed to pass to sub-functions (required for altitude conditions) ENUState = self.rocket.environment.convertStateToENUFrame(self.rocket.rigidBody.state) # Check each callback condition for i in range(len(self.callbackFunctions)): # If a conditions has come true eventOccurred, timeToOccurrence, timeDeterministic = self.conditionsEvalFunctions[i](self.conditionValues[i], ENUState) # Get info about the next event to occur if timeToOccurrence < estimatedTimeToNextEvent and not eventOccurred: estimatedTimeToNextEvent = timeToOccurrence nextEventTimeDeterministic = timeDeterministic if eventOccurred: if self.triggerDelays[i] == 0: # Call its function # TODO: Print message to simulation log saying that the event has been triggered self.callbackFunctions[i]() else: # Schedule the event to happen in triggerDelay seconds triggerTime = self.rocket.rigidBody.time + self.triggerDelays[i] self.subscribeToEvent(EventTypes.TimeReached, self.callbackFunctions[i], triggerTime) # And mark it for deletion indicesToRemove.append(i) # Delete all of the events that were triggered - working in reverse to ensure the indices of the items aren't changed before we delete them for i in range(len(indicesToRemove)): delI = len(indicesToRemove)-1-i indexToRemove = indicesToRemove[delI] del self.callbackFunctions[indexToRemove] del self.conditionValues[indexToRemove] del self.conditionsEvalFunctions[indexToRemove] del self.triggerDelays[indexToRemove] # Save velocity and time for next timestep (if we're trying to detect apogee) # Doing this here to avoid problems calling the apogee function multiple times in a single time step self.lastVelocity = ENUState.velocity self.lastTime = self.rocket.rigidBody.time return estimatedTimeToNextEvent, nextEventTimeDeterministic