Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 196 additions & 11 deletions OMPython/ModelicaSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,22 +383,21 @@ def __init__(
self._linearized_outputs: list[str] = [] # linearization output list
self._linearized_states: list[str] = [] # linearization states list

self._session = session

# get OpenModelica version
version_str = self._session.get_version()
self._version = self._parse_om_version(version=version_str)

self._simulated = False # True if the model has already been simulated
self._result_file: Optional[OMPathABC] = None # for storing result file

self._work_dir: OMPathABC = self.setWorkDirectory(work_directory)

self._model_name: Optional[str] = None
self._libraries: Optional[list[str | tuple[str, str]]] = None
self._file_name: Optional[OMPathABC] = None
self._variable_filter: Optional[str] = None

self._session = session
# get OpenModelica version
version_str = self._session.get_version()
self._version = self._parse_om_version(version=version_str)

self._work_dir: OMPathABC = self.setWorkDirectory(work_directory)

def get_session(self) -> OMSessionABC:
"""
Return the OMC session used for this class.
Expand Down Expand Up @@ -468,6 +467,8 @@ def _xmlparse(self, xml_file: OMPathABC):
xml_content = xml_file.read_text()
tree = ET.ElementTree(ET.fromstring(xml_content))
root = tree.getroot()
if root is None:
raise ModelicaSystemError(f"Cannot read XML file: {xml_file}")
for attr in root.iter('DefaultExperiment'):
for key in ("startTime", "stopTime", "stepSize", "tolerance",
"solver", "outputFormat"):
Expand Down Expand Up @@ -1935,7 +1936,7 @@ def getSolutions(
self,
varList: Optional[str | list[str]] = None,
resultfile: Optional[str | os.PathLike] = None,
) -> tuple[str] | np.ndarray:
) -> tuple[str, ...] | np.ndarray:
"""Extract simulation results from a result data file.

Args:
Expand Down Expand Up @@ -1984,7 +1985,8 @@ def getSolutions(
result_vars = self.sendExpression(expr=f'readSimulationResultVars("{result_file.as_posix()}")')
self.sendExpression(expr="closeSimulationResultFile()")
if varList is None:
return result_vars
var_list = [str(var) for var in result_vars]
return tuple(var_list)

if isinstance(varList, str):
var_list_checked = [varList]
Expand Down Expand Up @@ -2064,6 +2066,8 @@ def convertFmu2Mo(
raise ModelicaSystemError(f"Missing FMU file: {fmu_path.as_posix()}")

filename = self._requestApi(apiName='importFMU', entity=fmu_path.as_posix())
if not isinstance(filename, str):
raise ModelicaSystemError(f"Invalid return value for the FMU filename: {filename}")
filepath = self.getWorkDirectory() / filename

# report proper error message
Expand Down Expand Up @@ -2106,14 +2110,148 @@ def optimize(self) -> dict[str, Any]:
"""
properties = ','.join(f"{key}={val}" for key, val in self._optimization_options.items())
self.set_command_line_options("-g=Optimica")
return self._requestApi(apiName='optimize', entity=self._model_name, properties=properties)
retval = self._requestApi(apiName='optimize', entity=self._model_name, properties=properties)
retval = cast(dict, retval)
return retval


class ModelicaSystem(ModelicaSystemOMC):
"""
Compatibility class.
"""

def __init__(
self,
fileName: Optional[str | os.PathLike | pathlib.Path] = None,
modelName: Optional[str] = None,
lmodel: Optional[list[str | tuple[str, str]]] = None,
commandLineOptions: Optional[list[str]] = None,
variableFilter: Optional[str] = None,
customBuildDirectory: Optional[str | os.PathLike] = None,
omhome: Optional[str] = None,
omc_process: Optional[OMCSessionLocal] = None,
build: bool = True,
) -> None:
super().__init__(
command_line_options=commandLineOptions,
work_directory=customBuildDirectory,
omhome=omhome,
session=omc_process,
)
self.model(
model_name=modelName,
model_file=fileName,
libraries=lmodel,
variable_filter=variableFilter,
build=build,
)
self._getconn = self._session

def setCommandLineOptions(self, commandLineOptions: str):
super().set_command_line_options(command_line_option=commandLineOptions)

def setContinuous( # type: ignore[override]
self,
cvals: str | list[str] | dict[str, Any],
) -> bool:
if isinstance(cvals, dict):
return super().setContinuous(**cvals)
raise ModelicaSystemError("Only dict input supported for setContinuous()")

def setParameters( # type: ignore[override]
self,
pvals: str | list[str] | dict[str, Any],
) -> bool:
if isinstance(pvals, dict):
return super().setParameters(**pvals)
raise ModelicaSystemError("Only dict input supported for setParameters()")

def setOptimizationOptions( # type: ignore[override]
self,
optimizationOptions: str | list[str] | dict[str, Any],
) -> bool:
if isinstance(optimizationOptions, dict):
return super().setOptimizationOptions(**optimizationOptions)
raise ModelicaSystemError("Only dict input supported for setOptimizationOptions()")

def setInputs( # type: ignore[override]
self,
name: str | list[str] | dict[str, Any],
) -> bool:
if isinstance(name, dict):
return super().setInputs(**name)
raise ModelicaSystemError("Only dict input supported for setInputs()")

def setSimulationOptions( # type: ignore[override]
self,
simOptions: str | list[str] | dict[str, Any],
) -> bool:
if isinstance(simOptions, dict):
return super().setSimulationOptions(**simOptions)
raise ModelicaSystemError("Only dict input supported for setSimulationOptions()")

def setLinearizationOptions( # type: ignore[override]
self,
linearizationOptions: str | list[str] | dict[str, Any],
) -> bool:
if isinstance(linearizationOptions, dict):
return super().setLinearizationOptions(**linearizationOptions)
raise ModelicaSystemError("Only dict input supported for setLinearizationOptions()")

def getContinuous(
self,
names: Optional[str | list[str]] = None,
):
retval = super().getContinuous(names=names)
if self._simulated:
return retval

if isinstance(retval, dict):
retval2: dict = {}
for key, val in retval.items():
if np.isnan(val):
retval2[key] = None
else:
retval2[key] = str(val)
return retval2
if isinstance(retval, list):
retval3: list[str | None] = []
for val in retval:
if np.isnan(val):
retval3.append(None)
else:
retval3.append(str(val))
return retval3

raise ModelExecutionException("Invalid data!")

def getOutputs(
self,
names: Optional[str | list[str]] = None,
):
retval = super().getOutputs(names=names)
if self._simulated:
return retval

if isinstance(retval, dict):
retval2: dict = {}
for key, val in retval.items():
if np.isnan(val):
retval2[key] = None
else:
retval2[key] = str(val)
return retval2
if isinstance(retval, list):
retval3: list[str | None] = []
for val in retval:
if np.isnan(val):
retval3.append(None)
else:
retval3.append(str(val))
return retval3

raise ModelExecutionException("Invalid data!")


class ModelicaDoEABC(metaclass=abc.ABCMeta):
"""
Expand Down Expand Up @@ -2685,3 +2823,50 @@ def _prepare_structure_parameters(
"pre-compiled binary of model.")

return {}


class ModelicaSystemCmd(ModelExecutionCmd):
# TODO: docstring

def __init__(
self,
runpath: pathlib.Path,
modelname: str,
timeout: float = 10.0,
) -> None:
super().__init__(
runpath=runpath,
timeout=timeout,
cmd_prefix=[],
model_name=modelname,
)

def get_exe(self) -> pathlib.Path:
"""Get the path to the compiled model executable."""
# TODO: move to the top
import platform

path_run = pathlib.Path(self._runpath)
if platform.system() == "Windows":
path_exe = path_run / f"{self._model_name}.exe"
else:
path_exe = path_run / self._model_name

if not path_exe.exists():
raise ModelicaSystemError(f"Application file path not found: {path_exe}")

return path_exe

def get_cmd(self) -> list:
"""Get a list with the path to the executable and all command line args.

This can later be used as an argument for subprocess.run().
"""

cmdl = [self.get_exe().as_posix()] + self.get_cmd_args()

return cmdl

def run(self):
cmd_definition = self.definition()
return cmd_definition.run()
Loading
Loading