diff --git a/doc/api.rst b/doc/api.rst index 20958857..1554ce60 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -24,6 +24,7 @@ Creating a model piecewise.segments model.Model.linexpr model.Model.remove_constraints + model.Model.copy Classes under the hook diff --git a/doc/release_notes.rst b/doc/release_notes.rst index 35b21c67..26007b5c 100644 --- a/doc/release_notes.rst +++ b/doc/release_notes.rst @@ -4,6 +4,7 @@ Release Notes Upcoming Version ---------------- +* Add ``Model.copy()`` (default deep copy) with ``deep`` and ``include_solution`` options; support Python ``copy.copy`` and ``copy.deepcopy`` protocols via ``__copy__`` and ``__deepcopy__``. * Harmonize coordinate alignment for operations with subset/superset objects: - Multiplication and division fill missing coords with 0 (variable doesn't participate) - Addition and subtraction of constants fill missing coords with 0 (identity element) and pin result to LHS coords diff --git a/linopy/io.py b/linopy/io.py index 2213cbb5..e7353b60 100644 --- a/linopy/io.py +++ b/linopy/io.py @@ -1239,3 +1239,124 @@ def get_prefix(ds: xr.Dataset, prefix: str) -> xr.Dataset: setattr(m, k, ds.attrs.get(k)) return m + + +def copy(m: Model, include_solution: bool = False, deep: bool = True) -> Model: + """ + Return a copy of this model. + + With ``deep=True`` (default), variables, constraints, objective, + parameters, blocks, and scalar attributes are copied to a fully + independent model. With ``deep=False``, returns a shallow copy. + + :meth:`Model.copy` defaults to deep copy for workflow safety. + In contrast, ``copy.copy(model)`` is shallow via ``__copy__``, and + ``copy.deepcopy(model)`` is deep via ``__deepcopy__``. + + Solver runtime metadata (for example, ``solver_name`` and + ``solver_model``) is intentionally not copied. Solver backend state + is recreated on ``solve()``. + + Parameters + ---------- + m : Model + The model to copy. + include_solution : bool, optional + Whether to include solution and dual values in the copy. + If False (default), solve artifacts are excluded: solution/dual data, + objective value, and solve status are reset to initialized state. + If True, these values are copied when present. For unsolved models, + this has no additional effect. + deep : bool, optional + Whether to return a deep copy (default) or shallow copy. If False, + the returned model uses independent wrapper objects that share + underlying data buffers with the source model. + + Returns + ------- + Model + A deep or shallow copy of the model. + """ + from linopy.model import ( + Constraint, + Constraints, + LinearExpression, + Model, + Objective, + Variable, + Variables, + ) + + SOLVE_STATE_ATTRS = {"status", "termination_condition"} + + new_model = Model( + chunk=m._chunk, + force_dim_names=m._force_dim_names, + auto_mask=m._auto_mask, + solver_dir=str(m._solver_dir), + ) + + new_model._variables = Variables( + { + name: Variable( + var.data.copy(deep=deep) + if include_solution + else var.data[m.variables.dataset_attrs].copy(deep=deep), + new_model, + name, + ) + for name, var in m.variables.items() + }, + new_model, + ) + + new_model._constraints = Constraints( + { + name: Constraint( + con.data.copy(deep=deep) + if include_solution + else con.data[m.constraints.dataset_attrs].copy(deep=deep), + new_model, + name, + ) + for name, con in m.constraints.items() + }, + new_model, + ) + + obj_expr = LinearExpression(m.objective.expression.data.copy(deep=deep), new_model) + new_model._objective = Objective(obj_expr, new_model, m.objective.sense) + new_model._objective._value = m.objective.value if include_solution else None + + new_model._parameters = m._parameters.copy(deep=deep) + new_model._blocks = m._blocks.copy(deep=deep) if m._blocks is not None else None + + for attr in m.scalar_attrs: + if include_solution or attr not in SOLVE_STATE_ATTRS: + setattr(new_model, attr, getattr(m, attr)) + + return new_model + + +def shallowcopy(m: Model) -> Model: + """ + Support Python's ``copy.copy`` protocol for ``Model``. + + Returns a shallow copy with independent wrapper objects that share + underlying array buffers with ``m``. Solve artifacts are excluded, + matching :meth:`Model.copy` defaults. + """ + return copy(m, include_solution=False, deep=False) + + +def deepcopy(m: Model, memo: dict[int, Any]) -> Model: + """ + Support Python's ``copy.deepcopy`` protocol for ``Model``. + + Returns a deep, structurally independent copy and records it in ``memo`` + as required by Python's copy protocol. Solve artifacts are excluded, + matching :meth:`Model.copy` defaults. + """ + new_model = copy(m, include_solution=False, deep=True) + memo[id(m)] = new_model + return new_model diff --git a/linopy/model.py b/linopy/model.py index 06e814c6..84275c8b 100644 --- a/linopy/model.py +++ b/linopy/model.py @@ -53,6 +53,9 @@ ScalarLinearExpression, ) from linopy.io import ( + copy, + deepcopy, + shallowcopy, to_block_files, to_cupdlpx, to_file, @@ -1877,6 +1880,12 @@ def reset_solution(self) -> None: self.variables.reset_solution() self.constraints.reset_dual() + copy = copy + + __copy__ = shallowcopy + + __deepcopy__ = deepcopy + to_netcdf = to_netcdf to_file = to_file diff --git a/test/test_model.py b/test/test_model.py index c363fe4c..c0988c26 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -5,6 +5,7 @@ from __future__ import annotations +import copy as pycopy from pathlib import Path from tempfile import gettempdir @@ -12,8 +13,13 @@ import pytest import xarray as xr -from linopy import EQUAL, Model -from linopy.testing import assert_model_equal +from linopy import EQUAL, Model, available_solvers +from linopy.testing import ( + assert_conequal, + assert_equal, + assert_linequal, + assert_model_equal, +) target_shape: tuple[int, int] = (10, 10) @@ -163,3 +169,167 @@ def test_assert_model_equal() -> None: m.add_objective(obj) assert_model_equal(m, m) + + +@pytest.fixture(scope="module") +def copy_test_model() -> Model: + """Small representative model used across copy tests.""" + m: Model = Model() + + lower: xr.DataArray = xr.DataArray( + np.zeros((10, 10)), coords=[range(10), range(10)] + ) + upper: xr.DataArray = xr.DataArray(np.ones((10, 10)), coords=[range(10), range(10)]) + x = m.add_variables(lower, upper, name="x") + y = m.add_variables(name="y") + + m.add_constraints(1 * x + 10 * y, EQUAL, 0) + m.add_objective((10 * x + 5 * y).sum()) + + return m + + +@pytest.fixture(scope="module") +def solved_copy_test_model(copy_test_model: Model) -> Model: + """Solved representative model used across solved-copy tests.""" + m = copy_test_model.copy(deep=True) + m.solve() + return m + + +def test_model_copy_unsolved(copy_test_model: Model) -> None: + """Copy of unsolved model is structurally equal and independent.""" + m = copy_test_model.copy(deep=True) + c = m.copy(include_solution=False) + + assert_model_equal(m, c) + + # independence: mutating copy does not affect source + c.add_variables(name="z") + assert "z" not in m.variables + + +def test_model_copy_unsolved_with_solution_flag(copy_test_model: Model) -> None: + """Unsolved model with include_solution=True has no extra solve artifacts.""" + m = copy_test_model.copy(deep=True) + + c_include_solution = m.copy(include_solution=True) + c_exclude_solution = m.copy(include_solution=False) + + assert_model_equal(c_include_solution, c_exclude_solution) + assert c_include_solution.status == "initialized" + assert c_include_solution.termination_condition == "" + assert c_include_solution.objective.value is None + + +def test_model_copy_shallow(copy_test_model: Model) -> None: + """Shallow copy has independent wrappers sharing underlying data buffers.""" + m = copy_test_model.copy(deep=True) + c = m.copy(deep=False) + + assert c is not m + assert c.variables is not m.variables + assert c.constraints is not m.constraints + assert c.objective is not m.objective + + # wrappers are distinct, but shallow copy shares payload buffers + c.variables["x"].lower.values[0, 0] = 123.0 + assert m.variables["x"].lower.values[0, 0] == 123.0 + + +def test_model_deepcopy_protocol(copy_test_model: Model) -> None: + """copy.deepcopy(model) dispatches to Model.__deepcopy__ and stays independent.""" + m = copy_test_model.copy(deep=True) + c = pycopy.deepcopy(m) + + assert_model_equal(m, c) + + # Test independence: mutations to copy do not affect source + # 1. Variable mutation: add new variable + c.add_variables(name="z") + assert "z" not in m.variables + + # 2. Variable data mutation (bounds): verify buffers are independent + original_lower = m.variables["x"].lower.values[0, 0].item() + new_lower = 999 + c.variables["x"].lower.values[0, 0] = new_lower + assert c.variables["x"].lower.values[0, 0] == new_lower + assert m.variables["x"].lower.values[0, 0] == original_lower + + # 3. Constraint coefficient mutation: deep copy must not leak back + original_con_coeff = m.constraints["con0"].coeffs.values.flat[0].item() + new_con_coeff = original_con_coeff + 42 + c.constraints["con0"].coeffs.values.flat[0] = new_con_coeff + assert c.constraints["con0"].coeffs.values.flat[0] == new_con_coeff + assert m.constraints["con0"].coeffs.values.flat[0] == original_con_coeff + + # 4. Objective expression coefficient mutation: deep copy must not leak back + original_obj_coeff = m.objective.expression.coeffs.values.flat[0].item() + new_obj_coeff = original_obj_coeff + 20 + c.objective.expression.coeffs.values.flat[0] = new_obj_coeff + assert c.objective.expression.coeffs.values.flat[0] == new_obj_coeff + assert m.objective.expression.coeffs.values.flat[0] == original_obj_coeff + + # 5. Objective sense mutation + original_sense = m.objective.sense + c.objective.sense = "max" + assert c.objective.sense == "max" + assert m.objective.sense == original_sense + + +@pytest.mark.skipif(not available_solvers, reason="No solver installed") +class TestModelCopySolved: + def test_model_deepcopy_protocol_excludes_solution( + self, solved_copy_test_model: Model + ) -> None: + """copy.deepcopy on solved model drops solve state by default.""" + m = solved_copy_test_model + + c = pycopy.deepcopy(m) + + assert c.status == "initialized" + assert c.termination_condition == "" + assert c.objective.value is None + + for v in m.variables: + assert_equal( + c.variables[v].data[c.variables.dataset_attrs], + m.variables[v].data[m.variables.dataset_attrs], + ) + for con in m.constraints: + assert_conequal(c.constraints[con], m.constraints[con], strict=False) + assert_linequal(c.objective.expression, m.objective.expression) + assert c.objective.sense == m.objective.sense + + def test_model_copy_solved_with_solution( + self, solved_copy_test_model: Model + ) -> None: + """Copy with include_solution=True preserves solve state.""" + m = solved_copy_test_model + + c = m.copy(include_solution=True) + assert_model_equal(m, c) + + def test_model_copy_solved_without_solution( + self, solved_copy_test_model: Model + ) -> None: + """Copy with include_solution=False (default) drops solve state but preserves problem structure.""" + m = solved_copy_test_model + + c = m.copy(include_solution=False) + + # solve state is dropped + assert c.status == "initialized" + assert c.termination_condition == "" + assert c.objective.value is None + + # problem structure is preserved — compare only dataset_attrs to exclude solution/dual + for v in m.variables: + assert_equal( + c.variables[v].data[c.variables.dataset_attrs], + m.variables[v].data[m.variables.dataset_attrs], + ) + for con in m.constraints: + assert_conequal(c.constraints[con], m.constraints[con], strict=False) + assert_linequal(c.objective.expression, m.objective.expression) + assert c.objective.sense == m.objective.sense