Skip to content

engine

BalanceEngine

Bases: Notifier

Engine for solving insulator chains positions.

After solving any situation, many attributes are updated in the models.

Most interesting ones are

  • self.L_ref for solve_adjustment()

  • self.balance_model.nodes.dxdydz and self.span_model.parameter for solve_change_state().

Examples:

1
2
3
4
5
6
7
8
    >>> balance_engine = BalanceEngine(cable_array, section_array)
    >>> balance_engine.solve_adjustment()
    >>> wind_pressure = np.array([...])  # in Pa
    >>> ice_thickness = np.array([...])  # in m
    >>> new_temperature = np.array([...])  # in °C
    >>> balance_engine.solve_change_state(
    ...     wind_pressure, ice_thickness, new_temperature
    ... )

Parameters:

Name Type Description Default

cable_array

CableArray

Cable data

required

section_array

SectionArray

Section data

required

span_model_type

Type[ISpan]

Span model to use. Defaults to CatenarySpan.

CatenarySpan

deformation_model_type

Type[IDeformation]

Deformation model to use. Defaults to DeformationRte.

DeformationRte
Source code in src/mechaphlowers/core/models/balance/engine.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def __init__(
    self,
    cable_array: CableArray,
    section_array: SectionArray,
    balance_model_type: Type[IBalanceModel] = BalanceModel,
    span_model_type: Type[ISpan] = CatenarySpan,
    deformation_model_type: Type[IDeformation] = DeformationRte,
) -> None:
    """Store the input data and model classes, then build the engine.

    Args:
        cable_array (CableArray): Cable data
        section_array (SectionArray): Section data
        balance_model_type (Type[IBalanceModel]): Balance model to use. Defaults to BalanceModel.
        span_model_type (Type[ISpan]): Span model to use. Defaults to CatenarySpan.
        deformation_model_type (Type[IDeformation]): Deformation model to use. Defaults to DeformationRte.
    """
    # TODO: find a better way to initialize objects

    # Input data the models are built from.
    self.cable_array = cable_array
    self.section_array = section_array

    # Model classes; they are instantiated by reset(full=True) below.
    self.deformation_model_type = deformation_model_type
    self.span_model_type = span_model_type
    self.balance_model_type = balance_model_type

    # When True, solve_adjustment() refuses to run.
    self._adjustment_blocked: bool = False

    # Must come last: reset() reads every attribute stored above.
    self.reset(full=True)

add_loads

add_loads(
    load_position_distance: ndarray | list,
    load_mass: ndarray | list,
) -> None

Adds loads to BalanceEngine. Updates load_position and load_mass fields in SectionArray.

Input for position is a distance, and will be converted into ratio to match SectionArray.

Expected inputs are arrays whose size matches the number of supports. Each value refers to a span.

Parameters:

Name Type Description Default

load_position_distance

ndarray | list

Position of the loads, in meters

required

load_mass

ndarray | list

Mass of the loads

required

Raises:

Type Description
ValueError

if load_position_distance is not in [0, span_length] for at least one span

Examples:

1
2
3
4
>>> load_position_distance = np.array([150, 200, 0, np.nan])  # 4 supports/3 spans
>>> load_mass = np.array([500, 70, 0, np.nan])
>>> engine.add_loads(load_position_distance, load_mass)
>>> plot_engine.reset()  # optional: only needed if cached plots must be discarded
Source code in src/mechaphlowers/core/models/balance/engine.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def add_loads(
    self,
    load_position_distance: np.ndarray | list,
    load_mass: np.ndarray | list,
) -> None:
    """Attach point loads to the spans of the section.

    The `load_position` and `load_mass` fields of the SectionArray are overwritten,
    then the engine is reset with `full=False`.

    Positions are given as distances and are stored as ratios
    (distance divided by span length) to match SectionArray.

    Expected inputs are arrays whose size matches the number of supports. Each value refers to a span.

    Args:
        load_position_distance (np.ndarray | list): Position of the loads, in meters
        load_mass (np.ndarray | list): Mass of the loads

    Raises:
        ValueError: if load_position_distance is not in [0, span_length] for at least one span

    Examples:
        >>> load_position_distance = np.array([150, 200, 0, np.nan])  # 4 supports/3 spans
        >>> load_mass = np.array([500, 70, 0, np.nan])
        >>> engine.add_loads(load_position_distance, load_mass)
        >>> plot_engine.reset()  # optional: only needed if cached plots must be discarded
    """
    # NOTE: the two local names below appear verbatim in the error message
    # (f-string "=" specifier), so they must not be renamed.
    span_length = self.section_array.data["span_length"].to_numpy()
    load_position_distance = np.array(load_position_distance)

    # Range check; the lower bound is only evaluated when the upper bound holds.
    out_of_range = arr.decr(load_position_distance > span_length).any()
    if not out_of_range:
        out_of_range = arr.decr(load_position_distance < 0).any()
    if out_of_range:
        raise ValueError(
            f"{load_position_distance=} should be all between 0 and {span_length=}"
        )

    # This formula for load_position_ratio may change later
    self.section_array._data["load_position"] = (
        load_position_distance / span_length
    )
    self.section_array._data["load_mass"] = load_mass

    # Partial reset: existing models are kept, see reset().
    self.reset(full=False)
    logger.debug(
        "Loads have been added. PlotEngine will be notified automatically "
        "via the observer pattern; no manual reset is required."
    )

get_data_spans

get_data_spans() -> dict[str, list]

Fetch data from BalanceEngine about spans.

This data is stored as a dictionary containing lists.

Returns:

Name Type Description
dict dict[str, list]

dictionary containing the following fields:

  • span_length
  • elevation
  • parameter
  • tension_sup
  • tension_inf
  • slope_left
  • slope_right
  • L0
  • horizontal_distance
  • arc_length
  • T_h
  • sag
  • sag_s2
Source code in src/mechaphlowers/core/models/balance/engine.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
def get_data_spans(self) -> dict[str, list]:
    """Collect per-span results from the engine as plain Python lists.

    Tensions are converted from N to the force unit configured in
    `options.output_units.force`; slopes are converted from rad to deg.

    Returns:
        dict: dictionary containing the following fields:
            <ul>
                <li>span_length</li>
                <li>elevation</li>
                <li>parameter</li>
                <li>tension_sup</li>
                <li>tension_inf</li>
                <li>slope_left</li>
                <li>slope_right</li>
                <li>L0</li>
                <li>horizontal_distance</li>
                <li>arc_length</li>
                <li>T_h</li>
                <li>sag</li>
                <li>sag_s2</li>
            </ul>
    """
    # Unit-aware wrappers are created first; values are extracted below.
    tension_sup, tension_inf = self.span_model.tensions_sup_inf()
    force_unit = options.output_units.force
    tension_sup_q = QuantityArray(tension_sup, 'N', force_unit)
    tension_inf_q = QuantityArray(tension_inf, 'N', force_unit)
    tension_h_q = QuantityArray(self.span_model.T_h(), 'N', force_unit)
    slope_left_q = QuantityArray(
        self.span_model.slope(side="left"), 'rad', 'deg'
    )
    slope_right_q = QuantityArray(
        self.span_model.slope(side="right"), 'rad', 'deg'
    )

    # Keys are inserted in the same order as before, so the dict order is unchanged.
    spans: dict[str, list] = {}
    spans["span_length"] = arr.decr(
        self.section_array.data["span_length"].to_numpy()
    ).tolist()
    spans["elevation"] = arr.decr(
        self.section_array.data["elevation_difference"].to_numpy()
    ).tolist()
    spans["parameter"] = arr.decr(self.parameter).tolist()
    spans["slope_left"] = arr.decr(slope_left_q.value()).tolist()
    spans["slope_right"] = arr.decr(slope_right_q.value()).tolist()
    spans["tension_sup"] = arr.decr(tension_sup_q.value()).tolist()
    spans["tension_inf"] = arr.decr(tension_inf_q.value()).tolist()
    # L_ref and balance_model.a are exported as-is (no arr.decr applied).
    spans["L0"] = self.L_ref.tolist()
    spans["horizontal_distance"] = self.balance_model.a.tolist()
    spans["arc_length"] = arr.decr(self.span_model.compute_L()).tolist()
    spans["T_h"] = arr.decr(tension_h_q.value()).tolist()
    spans["sag"] = arr.decr(self.span_model.sag()).tolist()
    spans["sag_s2"] = arr.decr(self.span_model.sag_s2()).tolist()
    return spans

get_ruling_span_length

get_ruling_span_length() -> float

Compute ruling span length:

if we considered the whole section as a single span, the length would be ruling_span_length

Used for tensions computation when unfolding the cable.

\(L_{R} = \sqrt{\frac{\sum(L_n ^ 4 / C_n)}{\sum{C_n}}}\)

where \(L_n\) is the horizontal length of span n, and \(C_n\) is the chord length of span n

Returns:

Name Type Description
float float

span length of ruling span

Source code in src/mechaphlowers/core/models/balance/engine.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def get_ruling_span_length(self) -> float:
    """Return the ruling span length of the section.

    The ruling span length is the length the section would have if it were
    considered as one single span.

    Used for tensions computation when unfolding the cable.

    $L_{R} = \\sqrt{\\frac{\\sum(L_n ^ 4 / C_n)}{\\sum{C_n}}}$

    where $L_n$ is the horizontal length of span n, and $C_n$ is the chord length of span n

    Returns:
        float: span length of ruling span
    """
    # proto uses section_array.span_length instead of balance_model.a (called a_chain in proto)
    span_a = self.balance_model.a
    span_b = self.balance_model.b

    # Chord C_n: hypotenuse built from the two components of each span.
    chord = np.sqrt(span_a**2 + span_b**2)

    numerator = np.sum(span_a**4 / chord)
    denominator = np.sum(chord)
    return np.sqrt(numerator / denominator)

reset

reset(full: bool = False) -> None

Reset the balance engine to initial state.

This method re-initializes the span model, cable loads, deformation model, balance model, and solvers. This method is useful when an error occurs during solving that may cause an inconsistent state with NaN values.

Source code in src/mechaphlowers/core/models/balance/engine.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def reset(self, full: bool = False) -> None:
    """Reset the balance engine to initial state.

    With `full=True`, the span model, cable loads, deformation model, balance model
    and both solvers are rebuilt from `section_array` and `cable_array`.
    Otherwise only `balance_model.reset()` is called, with the models already attached to the engine.
    Observers are notified in both cases.

    This method is useful when an error occurs during solving that may cause an inconsistent state with NaN values.

    Args:
        full (bool): rebuild every model and solver when True. Defaults to False.
    """

    logger.debug("Resetting balance engine.")

    if not full:
        # Light reset: keep the existing models and let the balance model reset itself.
        self.balance_model.reset(
            cable_array=self.cable_array,
            span_model=self.span_model,
            deformation_model=self.deformation_model,
            cable_loads=self.cable_loads,
            full=full,
        )
    else:
        self.initialized = False

        # The same zero array is passed twice to CableLoads below
        # (presumably the initial weather loads - confirm against CableLoads).
        zero_loads = np.zeros_like(
            self.section_array.data.conductor_attachment_altitude.to_numpy()
        )
        sagging_temp = arr.decr(
            self.section_array.data.sagging_temperature.to_numpy()
        )
        sagging_param = arr.decr(
            self.section_array.data.sagging_parameter.to_numpy()
        )

        # Construction order matters: each model is fed the ones built before it.
        self.span_model = span_model_builder(
            self.section_array, self.cable_array, self.span_model_type
        )
        self.cable_loads = CableLoads(
            np.float64(self.cable_array.data.diameter.iloc[0]),
            np.float64(self.cable_array.data.linear_weight.iloc[0]),
            zero_loads,
            zero_loads,
        )
        self.deformation_model = deformation_model_builder(
            self.cable_array,
            self.span_model,
            sagging_temp,
            self.deformation_model_type,
        )
        # Re-runs the base-class initializer.
        # NOTE(review): presumably this clears registered observers - confirm.
        super().__init__()
        self.balance_model = self.balance_model_type(
            sagging_temp,
            sagging_param,
            self.section_array,
            self.cable_array,
            self.span_model,
            self.deformation_model,
            self.cable_loads,
        )

        # Fresh solvers, configured from the global options.
        self.solver_change_state = BalanceSolver(
            **options.solver.balance_solver_change_state_params
        )
        self.solver_adjustment = BalanceSolver(
            **options.solver.balance_solver_adjustment_params
        )
        # Declaration only: L_ref is assigned by solve_adjustment().
        self.L_ref: np.ndarray

    # Re-bind in both cases so it always points at the current balance model.
    self.get_displacement: Callable[[], np.ndarray] = (
        self.balance_model.chain_displacement
    )

    self.notify()
    self.initialized = True

    logger.debug("Balance engine initialized.")

solve_adjustment

solve_adjustment() -> None

Solve the chain positions in the adjustment case, updating L_ref in the balance model. In this case, there is no weather, no loads, and temperature is the sagging temperature.

After running this method, many attributes are updated. Most interesting ones are L_ref, parameter in Span, and dxdydz in Nodes.

Raises:

Type Description
SolverError

If the solver fails to converge.

RuntimeError

If adjustment is blocked (engine built from manipulations).

Source code in src/mechaphlowers/core/models/balance/engine.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
@check_time
def solve_adjustment(self) -> None:
    """Solve the chain positions in the adjustment case, updating L_ref in the balance model.
    In this case, there is no weather, no loads, and temperature is the sagging temperature.

    After running this method, many attributes are updated.
    Most interesting ones are `L_ref`, `parameter` in Span, and `dxdydz` in Nodes.

    On solver failure the engine is NOT reset automatically; the caller is
    expected to call `reset()` before solving again.

    Raises:
        SolverError: If the solver fails to converge. Its `origin` attribute is set to "solve_adjustment".
        RuntimeError: If adjustment is blocked (engine built from manipulations).
    """
    if self._adjustment_blocked:
        raise RuntimeError(
            "solve_adjustment is blocked on this engine. "
            "L_ref was injected externally from a clean adjustment."
        )
    logger.debug("Starting adjustment.")

    # Tell the balance model it is being solved in adjustment mode.
    self.balance_model.adjustment = True
    try:
        self.solver_adjustment.solve(self.balance_model)
    except SolverError as e:
        # No reset happens here, so the message must not claim one
        # (kept consistent with solve_change_state).
        logger.error(
            "Error during solve_adjustment, you should reset the balance engine."
        )
        e.origin = "solve_adjustment"
        # Bare raise re-raises the same exception without adding this frame twice.
        raise

    # initial_L_ref doubles as the "adjustment has been done" marker
    # checked by solve_change_state().
    self.initial_L_ref = self.L_ref = self.balance_model.update_L_ref()

    logger.debug(f"Output : L_ref = {str(self.L_ref)}")

solve_change_state

solve_change_state(
    wind_pressure: ndarray | float | None = None,
    ice_thickness: ndarray | float | None = None,
    new_temperature: ndarray | float | None = None,
    wind_direction: Literal[
        'clockwise', 'anticlockwise'
    ] = 'anticlockwise',
) -> None

Solve the chain positions, for a case of change of state. Updates weather conditions and/or sagging temperature if provided. Takes into account loads if any.

Parameters:

Name Type Description Default

wind_pressure

ndarray | float | None

Wind pressure in Pa. Default to None

None

ice_thickness

ndarray | float | None

Ice thickness in m. Default to None

None

new_temperature

ndarray | float | None

New temperature in °C. Default to None

None

wind_direction

Literal['clockwise', 'anticlockwise']

Direction of the wind: if "clockwise": towards user (right), if "anticlockwise": away from user (left). Default to "anticlockwise".

'anticlockwise'

After running this method, many attributes are updated. Most interesting ones are L_ref, parameter in Span, and dxdydz in Nodes.

Raises:

Type Description
SolverError

If the solver fails to converge.

TypeError

If input parameters have incorrect type.

ValueError

If input parameters have incorrect shape.

Source code in src/mechaphlowers/core/models/balance/engine.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
@check_time
def solve_change_state(
    self,
    wind_pressure: np.ndarray | float | None = None,
    ice_thickness: np.ndarray | float | None = None,
    new_temperature: np.ndarray | float | None = None,
    wind_direction: Literal[
        "clockwise", "anticlockwise"
    ] = "anticlockwise",
) -> None:
    """Solve the chain positions, for a case of change of state.
    Updates weather conditions and/or sagging temperature if provided.
    Takes into account loads if any.

    If no adjustment has been done yet, a warning is emitted and
    `solve_adjustment()` is run first.

    All inputs are validated before any model attribute is modified, so an
    invalid argument does not leave the models partially updated.

    Args:
        wind_pressure (np.ndarray | float | None): Wind pressure in Pa. Default to None
        ice_thickness (np.ndarray | float | None): Ice thickness in m. Default to None
        new_temperature (np.ndarray | float | None): New temperature in °C. Default to None
        wind_direction (Literal["clockwise", "anticlockwise"]): Direction of the wind: if "clockwise": towards user (right), if "anticlockwise": away from user (left). Default to "anticlockwise".

    After running this method, many attributes are updated.
    Most interesting ones are `L_ref`, `parameter` in Span, and `dxdydz` in Nodes.

    Raises:
        SolverError: If the solver fails to converge. Its `origin` attribute is set to "solve_change_state".
        TypeError: If input parameters have incorrect type.
        ValueError: If input parameters have incorrect shape, or if wind_direction is not "clockwise" or "anticlockwise".
    """
    logger.debug("Starting change state.")
    logger.debug(
        f"Parameters received: \nwind_pressure {str(wind_pressure)}\nice_thickness {str(ice_thickness)}\nnew_temperature {str(new_temperature)}\nwind_direction {str(wind_direction)}"
    )

    if wind_direction not in ["clockwise", "anticlockwise"]:
        raise ValueError(
            f"wind_direction should be 'clockwise' or 'anticlockwise', received {wind_direction}"
        )

    # check if adjustment has been done before
    try:
        _ = self.initial_L_ref
        logger.debug(
            f"Adjustment has been done before, initial_L_ref before shifting: {str(self.initial_L_ref)}"
        )
    except AttributeError:
        logger.warning(self._warning_no_L_ref)
        warnings.warn(self._warning_no_L_ref, BalanceEngineWarning)
        self.solve_adjustment()

    # Read the shape after the potential solve_adjustment() call above.
    span_shape = (
        self.span_model.parameter.shape
    )  # span_model holds n-sized array (same shape as span_length)

    def validate_input(input_value, name: str):
        """Return input_value as an array of shape span_shape, or raise."""
        if input_value is None:
            input_value = np.full(span_shape, self.default_value[name])
        elif isinstance(input_value, (int, float, np.number)):
            # np.number also covers numpy scalars (np.int64, np.float32, ...),
            # which are not subclasses of int/float.
            input_value = np.full(span_shape, input_value)
        elif isinstance(input_value, np.ndarray):
            if input_value.shape != span_shape:
                raise ValueError(
                    f"{name} has incorrect shape: {span_shape} is expected, received {input_value.shape}"
                )
        else:
            raise TypeError(f"{name} has incorrect type")

        return input_value

    # Validate every input BEFORE touching the models, so that a bad argument
    # cannot leave the weather state half-updated.
    validated_wind = validate_input(wind_pressure, "wind_pressure")
    validated_ice = validate_input(ice_thickness, "ice_thickness")
    new_t = validate_input(new_temperature, "new_temperature")

    # Negation builds a new array, the caller's array is not modified.
    if wind_direction == "clockwise":
        validated_wind = -validated_wind

    self.balance_model.cable_loads.wind_pressure = validated_wind

    # TODO: convert ice thickness from cm to m? Right now, user has to input in m
    self.balance_model.cable_loads.ice_thickness = validated_ice

    self.balance_model.sagging_temperature = arr.decr(new_t)
    self.deformation_model.current_temperature = new_t

    self.balance_model.adjustment = False

    # Propagate the load coefficient resulting from the new weather to the span model.
    self.span_model.load_coefficient = (
        self.balance_model.cable_loads.load_coefficient
    )

    try:
        self.solver_change_state.solve(self.balance_model)
    except SolverError as e:
        logger.error(
            "Error during solve_change_state, you should reset the balance engine."
        )
        e.origin = "solve_change_state"
        # Bare raise re-raises the same exception without adding this frame twice.
        raise

    logger.debug(
        f"Output : get_displacement \n{str(self.get_displacement())}"
    )
    self.balance_model.update_nodes_span_model()