
Analysis API

API reference for the cat.analysis helpers: AC and transient metrics, Monte Carlo, sweeps, native stepping, and worst-case search.

Bode dataclass

Optional container for callers who prefer a typed object.

Source code in src/cat/analysis/metrics_ac.py
@dataclass(frozen=True)
class Bode:
    """Container opcional para quem preferir um objeto tipado."""

    f: NDArray[Any]  # Hz
    mag_db: NDArray[Any]  # dB
    ph_deg: NDArray[Any]  # graus
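
A minimal sketch of wrapping the arrays returned by ac_gain_phase into a Bode (import paths follow the source locations shown on this page; res is assumed to be an AC AnalysisResult and "v(out)" a net named out):

from cat.analysis.metrics_ac import Bode, ac_gain_phase

# res = AC("dec", 201, 10.0, 1e6).run(circuit)  # an AC AnalysisResult
f, mag_db, ph_deg = ac_gain_phase(res.traces, y_out="v(out)")
bd = Bode(f=f, mag_db=mag_db, ph_deg=ph_deg)
print(bd.f[0], bd.mag_db[0], bd.ph_deg[0])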

MonteCarloResult dataclass

Source code in src/cat/analysis/montecarlo.py
@dataclass(frozen=True)
class MonteCarloResult:
    samples: list[dict[str, float]]
    runs: list[AnalysisResult]
    # optional metadata about the varied parameters: list of (label, nominal, dist_repr)
    mapping_manifest: list[tuple[str, float, str]] | None = None

    def to_dataframe(
        self,
        metric: (
            Callable[[AnalysisResult], float | dict[str, Any]]
            | TMapping[str, Callable[[AnalysisResult], Any]]
            | None
        ) = None,
        *,
        trial_name: str = "trial",
        param_prefix: str = "",
        y: Sequence[str] | None = None,
        sample_at: float | None = None,
    ) -> Any:
        """
        Returns a per-trial DataFrame with columns:
          - trial (index within this Monte Carlo run)
          - one column per sampled parameter (from `samples`), optionally prefixed
          - optional metric columns computed from each AnalysisResult
          - optional raw trace columns (final value or sampled at `sample_at` seconds)

        metric:
          - callable → result stored in column 'metric' (float or scalar)
          - mapping name->callable → adds one column per metric name
        y: list of trace names to extract values for each run. If `sample_at` is given,
           the value is linearly interpolated at t=sample_at using the run's time axis;
           otherwise, the last value in the trace is used.
        """
        try:
            pd: Any = importlib.import_module("pandas")
        except Exception as exc:  # pragma: no cover
            raise RuntimeError("pandas is required for MonteCarloResult.to_dataframe()") from exc

        rows: list[dict[str, Any]] = []
        for i, (s, run) in enumerate(zip(self.samples, self.runs, strict=False)):
            # copy sampled params; optionally add prefix
            if param_prefix:
                row = {f"{param_prefix}{k}": v for k, v in s.items()}
            else:
                row = dict(s)
            row[trial_name] = i
            if metric is not None:
                if hasattr(metric, "items"):
                    for name, fn in cast(
                        TMapping[str, Callable[[AnalysisResult], Any]], metric
                    ).items():
                        row[name] = fn(run)
                else:
                    m = cast(Callable[[AnalysisResult], Any], metric)(run)
                    if isinstance(m, dict):
                        row.update(m)
                    else:
                        row["metric"] = m

            if y:
                try:
                    import numpy as _np  # local import to avoid hard dep at module import
                except Exception:  # pragma: no cover
                    _np = None  # type: ignore[assignment]

                ts = run.traces
                # pick x axis name
                xname = getattr(ts.x, "name", "time")
                for name in y:
                    vals = ts[name].values
                    if sample_at is not None and _np is not None and xname.lower() == "time":
                        t = ts[xname].values
                        row[name] = float(_np.interp(sample_at, t, vals))
                    else:
                        row[name] = (
                            float(vals[-1]) if len(vals) else _np.nan if _np is not None else 0.0
                        )
            rows.append(row)
        return pd.DataFrame(rows)

    def to_csv(
        self,
        path: str,
        metric: (
            Callable[[AnalysisResult], float | dict[str, Any]]
            | TMapping[str, Callable[[AnalysisResult], Any]]
            | None
        ) = None,
        *,
        trial_name: str = "trial",
        param_prefix: str = "",
        y: Sequence[str] | None = None,
        sample_at: float | None = None,
        columns: Sequence[str] | None = None,
        index: bool = False,
        **to_csv_kwargs: Any,
    ) -> None:
        """Write the Monte Carlo per-trial table to CSV.

        - `path`: output file path (passed to pandas.DataFrame.to_csv).
        - `metric`, `trial_name`, `param_prefix`, `y`, `sample_at` are forwarded
          to :meth:`to_dataframe` and behave the same.
        - `columns`: optional sequence of column names to keep (order preserved).
        - `index`: whether to write the DataFrame index (default False).
        - `to_csv_kwargs`: additional keyword args passed to pandas.DataFrame.to_csv.

        Raises RuntimeError if pandas is not available.
        """
        try:
            importlib.import_module("pandas")
        except Exception as exc:  # pragma: no cover
            raise RuntimeError("pandas is required for MonteCarloResult.to_csv()") from exc

        df = self.to_dataframe(
            metric=metric,
            trial_name=trial_name,
            param_prefix=param_prefix,
            y=y,
            sample_at=sample_at,
        )
        if columns is not None:
            df = df.loc[:, list(columns)]
        df.to_csv(path, index=index, **to_csv_kwargs)

    def save_samples_csv(
        self, path: str, *, param_prefix: str = "", index: bool = False, **to_csv_kwargs: Any
    ) -> None:
        """Write only the sampled parameters (and trial index) to CSV.

        This is a convenience helper that writes the per-trial sampled parameters
        (the entries produced when generating the Monte Carlo `samples`) to a CSV
        file. Columns are the sampled parameter names (optionally prefixed) and
        the trial column named 'trial'.
        """
        try:
            importlib.import_module("pandas")
        except Exception as exc:  # pragma: no cover
            raise RuntimeError(
                "pandas is required for MonteCarloResult.save_samples_csv()"
            ) from exc

        df = self.to_dataframe(metric=None, trial_name="trial", param_prefix=param_prefix, y=None)
        df.to_csv(path, index=index, **to_csv_kwargs)

    def save_manifest_csv(self, path: str, *, index: bool = False, **to_csv_kwargs: Any) -> None:
        """Write a small manifest describing the varied parameters to CSV.

        The manifest columns are: label, nominal, dist. The manifest is taken from
        `mapping_manifest` populated by the `monte_carlo` helper when available.
        """
        try:
            importlib.import_module("pandas")
        except Exception as exc:  # pragma: no cover
            raise RuntimeError(
                "pandas is required for MonteCarloResult.save_manifest_csv()"
            ) from exc

        if not self.mapping_manifest:
            # nothing to write
            return

        import pandas as pd  # type: ignore[import-untyped]  # local import; optional runtime dependency

        df = pd.DataFrame(self.mapping_manifest, columns=["label", "nominal", "dist"])
        df.to_csv(path, index=index, **to_csv_kwargs)

save_manifest_csv(path, *, index=False, **to_csv_kwargs)

Write a small manifest describing the varied parameters to CSV.

The manifest columns are: label, nominal, dist. The manifest is taken from mapping_manifest populated by the monte_carlo helper when available.

Source code in src/cat/analysis/montecarlo.py
def save_manifest_csv(self, path: str, *, index: bool = False, **to_csv_kwargs: Any) -> None:
    """Write a small manifest describing the varied parameters to CSV.

    The manifest columns are: label, nominal, dist. The manifest is taken from
    `mapping_manifest` populated by the `monte_carlo` helper when available.
    """
    try:
        importlib.import_module("pandas")
    except Exception as exc:  # pragma: no cover
        raise RuntimeError(
            "pandas is required for MonteCarloResult.save_manifest_csv()"
        ) from exc

    if not self.mapping_manifest:
        # nothing to write
        return

    import pandas as pd  # type: ignore[import-untyped]  # local import; optional runtime dependency

    df = pd.DataFrame(self.mapping_manifest, columns=["label", "nominal", "dist"])
    df.to_csv(path, index=index, **to_csv_kwargs)

save_samples_csv(path, *, param_prefix='', index=False, **to_csv_kwargs)

Write only the sampled parameters (and trial index) to CSV.

This is a convenience helper that writes the per-trial sampled parameters (the entries produced when generating the Monte Carlo samples) to a CSV file. Columns are the sampled parameter names (optionally prefixed) and the trial column named 'trial'.
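
A short sketch of both CSV helpers (mc is assumed to be a MonteCarloResult returned by the monte_carlo helper documented below):

mc.save_samples_csv("mc_samples.csv", param_prefix="p_")
mc.save_manifest_csv("mc_manifest.csv")  # silently returns if mapping_manifest is empty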

Source code in src/cat/analysis/montecarlo.py
def save_samples_csv(
    self, path: str, *, param_prefix: str = "", index: bool = False, **to_csv_kwargs: Any
) -> None:
    """Write only the sampled parameters (and trial index) to CSV.

    This is a convenience helper that writes the per-trial sampled parameters
    (the entries produced when generating the Monte Carlo `samples`) to a CSV
    file. Columns are the sampled parameter names (optionally prefixed) and
    the trial column named 'trial'.
    """
    try:
        importlib.import_module("pandas")
    except Exception as exc:  # pragma: no cover
        raise RuntimeError(
            "pandas is required for MonteCarloResult.save_samples_csv()"
        ) from exc

    df = self.to_dataframe(metric=None, trial_name="trial", param_prefix=param_prefix, y=None)
    df.to_csv(path, index=index, **to_csv_kwargs)

to_csv(path, metric=None, *, trial_name='trial', param_prefix='', y=None, sample_at=None, columns=None, index=False, **to_csv_kwargs)

Write the Monte Carlo per-trial table to CSV.

  • path: output file path (passed to pandas.DataFrame.to_csv).
  • metric, trial_name, param_prefix, y, sample_at are forwarded to to_dataframe() and behave the same.
  • columns: optional sequence of column names to keep (order preserved).
  • index: whether to write the DataFrame index (default False).
  • to_csv_kwargs: additional keyword args passed to pandas.DataFrame.to_csv.

Raises RuntimeError if pandas is not available.
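
For example, the per-trial table can be written while keeping only selected columns; extra keyword arguments such as sep are forwarded to pandas.DataFrame.to_csv (mc is assumed to be a MonteCarloResult and "v(out)" a net named out):

mc.to_csv(
    "mc_trials.csv",
    y=["v(out)"],                 # adds the final value of v(out) per trial
    columns=["trial", "v(out)"],  # keep only these columns, in this order
    sep=";",                      # forwarded to pandas.DataFrame.to_csv
)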

Source code in src/cat/analysis/montecarlo.py
def to_csv(
    self,
    path: str,
    metric: (
        Callable[[AnalysisResult], float | dict[str, Any]]
        | TMapping[str, Callable[[AnalysisResult], Any]]
        | None
    ) = None,
    *,
    trial_name: str = "trial",
    param_prefix: str = "",
    y: Sequence[str] | None = None,
    sample_at: float | None = None,
    columns: Sequence[str] | None = None,
    index: bool = False,
    **to_csv_kwargs: Any,
) -> None:
    """Write the Monte Carlo per-trial table to CSV.

    - `path`: output file path (passed to pandas.DataFrame.to_csv).
    - `metric`, `trial_name`, `param_prefix`, `y`, `sample_at` are forwarded
      to :meth:`to_dataframe` and behave the same.
    - `columns`: optional sequence of column names to keep (order preserved).
    - `index`: whether to write the DataFrame index (default False).
    - `to_csv_kwargs`: additional keyword args passed to pandas.DataFrame.to_csv.

    Raises RuntimeError if pandas is not available.
    """
    try:
        importlib.import_module("pandas")
    except Exception as exc:  # pragma: no cover
        raise RuntimeError("pandas is required for MonteCarloResult.to_csv()") from exc

    df = self.to_dataframe(
        metric=metric,
        trial_name=trial_name,
        param_prefix=param_prefix,
        y=y,
        sample_at=sample_at,
    )
    if columns is not None:
        df = df.loc[:, list(columns)]
    df.to_csv(path, index=index, **to_csv_kwargs)

to_dataframe(metric=None, *, trial_name='trial', param_prefix='', y=None, sample_at=None)

Returns a per-trial DataFrame with columns:
  • trial (index within this Monte Carlo run)
  • one column per sampled parameter (from samples), optionally prefixed
  • optional metric columns computed from each AnalysisResult
  • optional raw trace columns (final value or sampled at sample_at seconds)
metric:
  • callable → result stored in column 'metric' (float or scalar)
  • mapping name->callable → adds one column per metric name

y: list of trace names to extract values for each run. If sample_at is given, the value is linearly interpolated at t=sample_at using the run's time axis; otherwise, the last value in the trace is used.
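
A sketch of a typical call, combining a named metric with a sampled trace value (mc is assumed to be a MonteCarloResult and "v(out)" a net named out):

df = mc.to_dataframe(
    metric={"v_end": lambda r: float(r.traces["v(out)"].values[-1])},
    y=["v(out)"],
    sample_at=5e-4,     # interpolate v(out) at t = 0.5 ms on each run's time axis
    param_prefix="p_",  # sampled-parameter columns become p_<label>
)
print(df.columns.tolist())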

Source code in src/cat/analysis/montecarlo.py
def to_dataframe(
    self,
    metric: (
        Callable[[AnalysisResult], float | dict[str, Any]]
        | TMapping[str, Callable[[AnalysisResult], Any]]
        | None
    ) = None,
    *,
    trial_name: str = "trial",
    param_prefix: str = "",
    y: Sequence[str] | None = None,
    sample_at: float | None = None,
) -> Any:
    """
    Returns a per-trial DataFrame with columns:
      - trial (index within this Monte Carlo run)
      - one column per sampled parameter (from `samples`), optionally prefixed
      - optional metric columns computed from each AnalysisResult
      - optional raw trace columns (final value or sampled at `sample_at` seconds)

    metric:
      - callable → result stored in column 'metric' (float or scalar)
      - mapping name->callable → adds one column per metric name
    y: list of trace names to extract values for each run. If `sample_at` is given,
       the value is linearly interpolated at t=sample_at using the run's time axis;
       otherwise, the last value in the trace is used.
    """
    try:
        pd: Any = importlib.import_module("pandas")
    except Exception as exc:  # pragma: no cover
        raise RuntimeError("pandas is required for MonteCarloResult.to_dataframe()") from exc

    rows: list[dict[str, Any]] = []
    for i, (s, run) in enumerate(zip(self.samples, self.runs, strict=False)):
        # copy sampled params; optionally add prefix
        if param_prefix:
            row = {f"{param_prefix}{k}": v for k, v in s.items()}
        else:
            row = dict(s)
        row[trial_name] = i
        if metric is not None:
            if hasattr(metric, "items"):
                for name, fn in cast(
                    TMapping[str, Callable[[AnalysisResult], Any]], metric
                ).items():
                    row[name] = fn(run)
            else:
                m = cast(Callable[[AnalysisResult], Any], metric)(run)
                if isinstance(m, dict):
                    row.update(m)
                else:
                    row["metric"] = m

        if y:
            try:
                import numpy as _np  # local import to avoid hard dep at module import
            except Exception:  # pragma: no cover
                _np = None  # type: ignore[assignment]

            ts = run.traces
            # pick x axis name
            xname = getattr(ts.x, "name", "time")
            for name in y:
                vals = ts[name].values
                if sample_at is not None and _np is not None and xname.lower() == "time":
                    t = ts[xname].values
                    row[name] = float(_np.interp(sample_at, t, vals))
                else:
                    row[name] = (
                        float(vals[-1]) if len(vals) else _np.nan if _np is not None else 0.0
                    )
        rows.append(row)
    return pd.DataFrame(rows)

RiseFall dataclass

Rise/fall times between level fractions (e.g., 10%→90%).

Source code in src/cat/analysis/metrics_tran.py
@dataclass(frozen=True)
class RiseFall:
    """Tempos de subida/descida entre frações de nível (ex.: 10%→90%)."""

    trise: float | None
    tfall: float | None

ac_gain_phase(ts, y_out, y_in=None)

Returns (f, mag_db, phase_deg).

Source code in src/cat/analysis/metrics_ac.py
def ac_gain_phase(
    ts: TraceSet,
    y_out: str,
    y_in: str | None = None,
) -> tuple[NDArray[Any], NDArray[Any], NDArray[Any]]:
    """Retorna (f, mag_db, fase_deg)."""
    return _get_xy_ac(ts, y_out=y_out, y_in=y_in)

bandwidth_3db(ts, y_out, y_in=None)

-3 dB frequency relative to the low-frequency gain.
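
Usage sketch (res is assumed to be an AC AnalysisResult and "v(out)" a net named out):

f3 = bandwidth_3db(res.traces, y_out="v(out)")
if f3 is not None:
    print(f"-3 dB bandwidth: {f3:.1f} Hz")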

Source code in src/cat/analysis/metrics_ac.py
def bandwidth_3db(
    ts: TraceSet,
    y_out: str,
    y_in: str | None = None,
) -> float | None:
    """Frequência -3 dB relativa ao ganho de baixa frequência."""
    f, mag_db, _ = ac_gain_phase(ts, y_out=y_out, y_in=y_in)
    g0 = float(mag_db[0])
    target = g0 - 3.0
    below = np.where(mag_db <= target)[0]
    if below.size == 0:
        return None
    i = below[0]
    if i == 0:
        return float(f[0])
    x0, x1 = f[i - 1], f[i]
    y0, y1 = mag_db[i - 1], mag_db[i]
    if np.isclose(y1, y0):
        return float(x1)
    w = (target - y0) / (y1 - y0)
    return float(x0 + w * (x1 - x0))

bode(circuit, y_out, y_in=None, *, sweep_type='dec', n=201, fstart=10.0, fstop=1000000.0)

Run AC and return (f, |G|_dB, phase_deg) using cat.analysis.metrics_ac.

Note: The circuit must include appropriate small-signal sources for AC analysis.
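
Usage sketch (the circuit, including its small-signal source, is assumed to exist already; "v(out)" and "v(in)" assume nets named out and in):

f, mag_db, ph_deg = bode(
    circuit,
    y_out="v(out)",
    y_in="v(in)",      # optional reference trace
    sweep_type="dec",
    n=201,
    fstart=10.0,
    fstop=1e6,
)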

Source code in src/cat/analysis/__init__.py
def bode(
    circuit: Circuit,
    y_out: str,
    y_in: str | None = None,
    *,
    sweep_type: str = "dec",
    n: int = 201,
    fstart: float = 10.0,
    fstop: float = 1e6,
) -> tuple[Any, Any, Any]:
    """Run AC and return (f, |G|_dB, phase_deg) using cat.analysis.metrics_ac.

    Note: The circuit must include appropriate small-signal sources for AC analysis.
    """
    res = AC(sweep_type, n, fstart, fstop).run(circuit)
    return ac_gain_phase(res.traces, y_out=y_out, y_in=y_in)

crossover_freq_0db(ts, y_out, y_in=None)

Frequency at which |G| crosses 0 dB.

Source code in src/cat/analysis/metrics_ac.py
def crossover_freq_0db(
    ts: TraceSet,
    y_out: str,
    y_in: str | None = None,
) -> float | None:
    """Frequência em que |G| cruza 0 dB."""
    f, mag_db, _ = ac_gain_phase(ts, y_out=y_out, y_in=y_in)
    y = mag_db
    s = np.sign(y)
    idx = np.where((s[:-1] >= 0) & (s[1:] <= 0) | (s[:-1] <= 0) & (s[1:] >= 0))[0]
    if idx.size == 0:
        return None
    i = int(idx[0])
    x0, x1 = f[i], f[i + 1]
    y0, y1 = y[i], y[i + 1]
    if np.isclose(y1, y0):
        return float(x0)
    w = (0.0 - y0) / (y1 - y0)
    return float(x0 + w * (x1 - x0))

fall_time(ts, y_name, frac_low=0.1, frac_high=0.9)

90%→10% time (by default). Returns RiseFall with .tfall set (and .trise=None).

Robust strategy analogous to the rise-time one, but going from the high level down to the low level.

Source code in src/cat/analysis/metrics_tran.py
def fall_time(
    ts: TraceSet,
    y_name: str,
    frac_low: float = 0.1,
    frac_high: float = 0.9,
) -> RiseFall:
    """
    90%→10% time (by default). Returns RiseFall with .tfall set (and .trise=None).

    Robust strategy analogous to the rise-time one, but going from the high level
    down to the low level.
    """
    x, y = _get_xy(ts, y_name)
    y0 = float(y[0])
    yf = float(y[-1])
    y_min = float(np.min(y))
    y_max = float(np.max(y))
    span_end = y0 - yf
    span_global = y_max - y_min

    if span_end > 1e-15:
        hi = yf + (1.0 - frac_high) * span_end  # close to y0
        lo = yf + (1.0 - frac_low) * span_end
    elif span_global > 1e-15:
        hi = y_max - frac_high * span_global
        lo = y_max - frac_low * span_global
    else:
        return RiseFall(trise=None, tfall=None)

    t_hi = _cross_time(x, y, hi)
    t_lo = _cross_time(x, y, lo)

    if t_hi is None:
        t_hi = _interp_time_fall(x, y, hi)
    if t_lo is None:
        t_lo = _interp_time_fall(x, y, lo)

    if t_hi is None:
        t_hi = _discrete_time_first_at_or_above(x, y[::-1], hi)
    if t_lo is None:
        t_lo = _discrete_time_first_at_or_above(x, y[::-1], lo)

    if t_hi is None or t_lo is None:
        return RiseFall(trise=None, tfall=None)

    dt = float(t_lo - t_hi)
    if dt < 0.0:
        dt = 0.0
    return RiseFall(trise=None, tfall=dt)

gain_at(ts, y_out, f_hz, y_in=None)

Gain (dB) at f_hz.

Source code in src/cat/analysis/metrics_ac.py
def gain_at(
    ts: TraceSet,
    y_out: str,
    f_hz: float,
    y_in: str | None = None,
) -> float:
    """Ganho (dB) em f_hz."""
    f, mag_db, _ = ac_gain_phase(ts, y_out=y_out, y_in=y_in)
    return _interp_at_x(f, mag_db, f_hz)

gain_db_from_traces(ts, y_name)

"Ganho" DC aproximado, usando a diferença final - inicial do traço (em dB do delta). É um helper simples para testes; não é igual a ganho AC.

Source code in src/cat/analysis/metrics_basic.py
def gain_db_from_traces(ts: TraceSet, y_name: str) -> float:
    """
    "Ganho" DC aproximado, usando a diferença final - inicial do traço (em dB do delta).
    É um helper simples para testes; não é igual a ganho AC.
    """
    arr = _vals(ts, y_name)
    if arr.size == 0:
        return -300.0
    v = float(arr[-1] - arr[0])
    if v == 0.0:
        return -300.0
    return float(20.0 * np.log10(abs(v)))

gain_margin_db(ts, y_out, y_in=None)

Gain in dB where the phase = -180° (gain margin).

Source code in src/cat/analysis/metrics_ac.py
def gain_margin_db(
    ts: TraceSet,
    y_out: str,
    y_in: str | None = None,
) -> float | None:
    """Ganho em dB quando fase = -180° (margem de ganho)."""
    f180 = phase_crossover_freq(ts, y_out=y_out, y_in=y_in, target_deg=-180.0)
    if f180 is None:
        return None
    return gain_at(ts, y_out=y_out, y_in=y_in, f_hz=f180)

loop_gain_bode(ts, y_out, y_in)

Returns the loop's Bode response (y_out / y_in) as a tuple (f, mag_db, phase).

Source code in src/cat/analysis/metrics_ac.py
def loop_gain_bode(
    ts: TraceSet,
    y_out: str,
    y_in: str,
) -> tuple[NDArray[Any], NDArray[Any], NDArray[Any]]:
    """Retorna Bode do loop (y_out / y_in) como tupla (f, mag_db, fase)."""
    return ac_gain_phase(ts, y_out=y_out, y_in=y_in)

monte_carlo(circuit, mapping, n, analysis_factory, seed=None, label_fn=None, workers=1, progress=None)

Runs a Monte Carlo study, varying component values according to the given distributions.
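
A sketch of a typical call. The concrete Dist types are not listed on this page, so the example defines a hypothetical stand-in; as the source below shows, monte_carlo only requires that each distribution expose sample(nominal, rnd). The circuit follows the run_op example; monte_carlo and OP are assumed to be importable from cat.analysis:

from cat.analysis import OP, monte_carlo
from cat.core.circuit import Circuit
from cat.core.components import Vdc, Resistor
from cat.core.net import GND

class Uniform5:
    """Hypothetical Dist: uniform +/-5% around the nominal value."""

    def sample(self, nominal: float, rnd) -> float:
        return nominal * rnd.uniform(0.95, 1.05)

c = Circuit("rc")
V1, R1 = Vdc("1", 5.0), Resistor("1", 1000.0)
c.add(V1, R1)
c.connect(V1.ports[0], R1.ports[0])
c.connect(R1.ports[1], GND)
c.connect(V1.ports[1], GND)

mc = monte_carlo(c, {R1: Uniform5()}, n=20, analysis_factory=lambda: OP(), seed=1)
df = mc.to_dataframe()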

Source code in src/cat/analysis/montecarlo.py
def monte_carlo(
    circuit: Circuit,
    mapping: Mapping[Component, Dist],
    n: int,
    analysis_factory: Callable[[], _RunsAnalysis],
    seed: int | None = None,
    label_fn: Callable[[Component], str] | None = None,
    workers: int = 1,
    progress: bool | Callable[[int, int], None] | None = None,
) -> MonteCarloResult:
    """
    Runs a Monte Carlo study, varying component values according to the given distributions.
    """
    rnd = _random.Random(seed)

    def _label(c: Component) -> str:
        if label_fn:
            return label_fn(c)
        return f"{type(c).__name__}.{c.ref}"

    comps: list[Component] = list(mapping.keys())
    nominals: list[float] = [_as_float(c.value) for c in comps]
    dists: list[Dist] = [mapping[c] for c in comps]

    samples: list[dict[str, float]] = []
    for _ in range(n):
        s: dict[str, float] = {}
        for comp, nominal, dist in zip(comps, nominals, dists, strict=False):
            s[_label(comp)] = dist.sample(nominal, rnd)
        samples.append(s)

    def _run_one(sample: dict[str, float]) -> AnalysisResult:
        c_copy: Circuit = copy.deepcopy(circuit)
        comp_list = getattr(c_copy, "components", None)
        if comp_list is None:
            comp_list = getattr(c_copy, "_components", [])
        by_label: dict[str, Component] = {_label(c): c for c in comp_list}
        for k, v in sample.items():
            by_label[k].value = v
        analysis = analysis_factory()
        return analysis.run(c_copy)

    # Progress handler (optional)
    printer = None

    def _notify(done: int, total: int) -> None:
        if progress is None:
            return
        if callable(progress):
            try:
                progress(done, total)
            except Exception:
                pass
            return
        # simple stderr bar
        nonlocal printer
        if progress is True:
            # lazy-init
            class _Bar:
                def __init__(self, total: int) -> None:
                    self.total = total
                    self.last = -1

                def update(self, done: int) -> None:
                    if done == self.last:
                        return
                    pct = int(round(100.0 * done / max(self.total, 1)))
                    sys.stderr.write(f"\rMC: {done}/{self.total} ({pct}%)")
                    sys.stderr.flush()
                    self.last = done

                def close(self) -> None:
                    sys.stderr.write("\n")

            if printer is None:
                printer = _Bar(total)
            printer.update(done)

    runs: list[AnalysisResult] = []
    if workers <= 1:
        for i, s in enumerate(samples, start=1):
            runs.append(_run_one(s))
            _notify(i, len(samples))
    else:
        # Run in parallel while preserving sample order
        runs_buf: list[AnalysisResult | None] = [None] * len(samples)
        with ThreadPoolExecutor(max_workers=workers) as ex:
            fut_to_idx = {}
            for idx, s in enumerate(samples):
                fut = ex.submit(_run_one, s)
                fut_to_idx[fut] = idx
            done = 0
            for f in as_completed(list(fut_to_idx.keys())):
                idx = fut_to_idx[f]
                runs_buf[idx] = f.result()
                done += 1
                _notify(done, len(samples))
        runs = [r for r in runs_buf if r is not None]

    if isinstance(progress, bool) and progress and printer is not None:
        try:
            printer.close()
        except Exception:
            pass

    # build optional manifest: list of (label, nominal, dist_repr)
    manifest: list[tuple[str, float, str]] = []
    for c, nom, d in zip(comps, nominals, dists, strict=False):
        try:
            d_repr = repr(d)
        except Exception:
            d_repr = type(d).__name__
        manifest.append((_label(c), nom, d_repr))

    return MonteCarloResult(samples=samples, runs=runs, mapping_manifest=manifest)

overshoot(ts, y_name)

Overshoot relative to the step: (Vmax - Vfinal) / (Vfinal - Vinitial).
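
Usage sketch (res is assumed to be a transient AnalysisResult and "v(out)" a net named out):

ov = overshoot(res.traces, "v(out)")
print(ov.overshoot, ov.y_peak, ov.y_final)  # e.g. overshoot=0.25 means 25%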

Source code in src/cat/analysis/metrics_tran.py
def overshoot(ts: TraceSet, y_name: str) -> OvershootResult:
    """
    Overshoot relative to the step: (Vmax - Vfinal) / (Vfinal - Vinitial).
    """
    x, y = _get_xy(ts, y_name)
    _ = x  # reserved for future use
    n = y.shape[0]
    if n < 2:
        return OvershootResult(
            overshoot=0.0,
            y_final=float(y[-1]),
            y_initial=float(y[0]),
            y_peak=float(y[0]),
        )
    y0 = float(y[0])
    yf = float(y[-1])
    yp = float(np.max(y))
    denom = yf - y0
    if np.isclose(denom, 0.0):
        return OvershootResult(overshoot=0.0, y_final=yf, y_initial=y0, y_peak=yp)
    ov = max(0.0, (yp - yf) / denom)
    return OvershootResult(overshoot=ov, y_final=yf, y_initial=y0, y_peak=yp)

overshoot_pct(ts, y_name)

Overshoot in % relative to the step.

Source code in src/cat/analysis/metrics_basic.py
def overshoot_pct(ts: TraceSet, y_name: str) -> float:
    """Overshoot em % relativo ao degrau."""
    return float(_ov(ts, y_name).overshoot * 100.0)

phase_crossover_freq(ts, y_out, y_in=None, target_deg=-180.0)

Frequency at which the phase crosses target (default: -180°).

Source code in src/cat/analysis/metrics_ac.py
def phase_crossover_freq(
    ts: TraceSet,
    y_out: str,
    y_in: str | None = None,
    target_deg: float = -180.0,
) -> float | None:
    """Frequência em que fase cruza target (padrão: -180°)."""
    f, _, ph_deg = ac_gain_phase(ts, y_out=y_out, y_in=y_in)
    y = ph_deg - target_deg
    s = np.sign(y)
    idx = np.where((s[:-1] >= 0) & (s[1:] <= 0) | (s[:-1] <= 0) & (s[1:] >= 0))[0]
    if idx.size == 0:
        return None
    i = int(idx[0])
    x0, x1 = f[i], f[i + 1]
    y0, y1 = y[i], y[i + 1]
    if np.isclose(y1, y0):
        return float(x0)
    w = (0.0 - y0) / (y1 - y0)
    return float(x0 + w * (x1 - x0))

phase_margin(ts, y_out, y_in=None)

PM = 180° + phase at w_c (where |G| = 1). Phase is normalized to (-180°, 180°].

Fallback: if the phase appears to be missing (variance ≈ 0), PM is estimated assuming a minimum-phase one-pole system: φ(w) ≈ -atan(w/wp) and PM ≈ 180° - atan(wc/wp), where wp is obtained from the -3 dB bandwidth.
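
Usage sketch, plus a quick numeric check of the one-pole fallback: with wc = 10·wp, PM ≈ 180° - atan(10) ≈ 95.7° (res is assumed to be an AC AnalysisResult):

import numpy as np

pm = phase_margin(res.traces, y_out="v(out)", y_in="v(in)")

# one-pole fallback, by hand: wc ten times above the estimated pole wp
wc, wp = 1e5, 1e4
pm_est = 180.0 - np.degrees(np.arctan(wc / wp))  # ≈ 95.7°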

Source code in src/cat/analysis/metrics_ac.py
def phase_margin(
    ts: TraceSet,
    y_out: str,
    y_in: str | None = None,
) -> float | None:
    """
    PM = 180° + phase at w_c (where |G| = 1). Phase is normalized to (-180°, 180°].

    Fallback: if the phase appears to be missing (variance ≈ 0), PM is estimated
    assuming a minimum-phase one-pole system: φ(w) ≈ -atan(w/wp) and
    PM ≈ 180° - atan(wc/wp), where wp is obtained from the -3 dB bandwidth.
    """
    wc = crossover_freq_0db(ts, y_out=y_out, y_in=y_in)
    if wc is None:
        return None
    f, _, ph = ac_gain_phase(ts, y_out=y_out, y_in=y_in)

    # if phase data is present, use it
    if np.nanstd(ph) > 1e-3:
        phi = _interp_at_x(f, ph, wc)
        return 180.0 + float(phi)

    # one-pole fallback: estimate wp from the -3 dB bandwidth
    bw = bandwidth_3db(ts, y_out=y_out, y_in=y_in)
    if bw is None or bw <= 0.0:
        # no alternative: return 180 so downstream code does not blow up
        return 180.0
    phi_est = -np.degrees(np.arctan(wc / bw))
    return 180.0 + float(phi_est)

rise_time(ts, y_name, frac_low=0.1, frac_high=0.9)

10%→90% time (by default). Returns RiseFall with .trise set (and .tfall=None).

Robust strategy (see the sketch below):

1) Levels are based on y0→yf (the observed step); if the signal does not rise, the global span is used.
2) Try a linear crossing; if that fails, interpolate on the non-decreasing envelope; if that fails, use a discrete fallback (the first index with y >= level).
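
Usage sketch (res is assumed to be a transient AnalysisResult):

rf = rise_time(res.traces, "v(out)")                               # 10% → 90%
ff = fall_time(res.traces, "v(out)", frac_low=0.2, frac_high=0.8)  # 80% → 20%
print(rf.trise, ff.tfall)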

Source code in src/cat/analysis/metrics_tran.py
def rise_time(
    ts: TraceSet,
    y_name: str,
    frac_low: float = 0.1,
    frac_high: float = 0.9,
) -> RiseFall:
    """
    10%→90% time (by default). Returns RiseFall with .trise set (and .tfall=None).

    Robust strategy:
      1) Prefer levels based on y0→yf (the observed step). If the signal does not
         rise, use the global span.
      2) Try a linear crossing; if it fails, interpolate on the envelope; if that
         fails, use a discrete fallback (first index with y >= level).
    """
    x, y = _get_xy(ts, y_name)
    y0 = float(y[0])
    yf = float(y[-1])
    y_min = float(np.min(y))
    y_max = float(np.max(y))
    span_end = yf - y0
    span_global = y_max - y_min

    if span_end > 1e-15:
        lo = y0 + frac_low * span_end
        hi = y0 + frac_high * span_end
    elif span_global > 1e-15:
        lo = y_min + frac_low * span_global
        hi = y_min + frac_high * span_global
    else:
        return RiseFall(trise=None, tfall=None)

    # 1) linear crossing
    t_lo = _cross_time(x, y, lo)
    t_hi = _cross_time(x, y, hi)

    # 2) envelope (non-decreasing)
    if t_lo is None:
        t_lo = _interp_time_rise(x, y, lo)
    if t_hi is None:
        t_hi = _interp_time_rise(x, y, hi)

    # 3) discrete (first index with y >= level)
    if t_lo is None:
        t_lo = _discrete_time_first_at_or_above(x, y, lo)
    if t_hi is None:
        t_hi = _discrete_time_first_at_or_above(x, y, hi)

    if t_lo is None or t_hi is None:
        return RiseFall(trise=None, tfall=None)

    dt = float(t_hi - t_lo)
    if dt < 0.0:
        dt = 0.0
    return RiseFall(trise=dt, tfall=None)

run_ac(circuit, sweep_type, n, fstart, fstop, *, return_df=False)

Run an AC analysis and optionally return a DataFrame of traces.
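
Usage sketch (the circuit is assumed to include a small-signal source for AC analysis):

res = run_ac(circuit, "dec", 201, 10.0, 1e6)                 # AnalysisResult
df = run_ac(circuit, "dec", 201, 10.0, 1e6, return_df=True)  # pandas DataFrame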

Source code in src/cat/analysis/__init__.py
def run_ac(
    circuit: Circuit,
    sweep_type: str,
    n: int,
    fstart: float,
    fstop: float,
    *,
    return_df: bool = False,
) -> AnalysisResult | Any:
    """Run an AC analysis and optionally return a DataFrame of traces."""
    res = AC(sweep_type, n, fstart, fstop).run(circuit)
    if return_df:
        return res.traces.to_dataframe()
    return res

run_op(circuit)

Run a simple .OP analysis and return the AnalysisResult.

Example

from cat.core.circuit import Circuit
from cat.core.components import Vdc, Resistor
from cat.core.net import GND
c = Circuit("rc")
V1, R1 = Vdc("1", 5.0), Resistor("1", "1k")
c.add(V1, R1)
c.connect(V1.ports[0], R1.ports[0])
c.connect(R1.ports[1], GND)
c.connect(V1.ports[1], GND)
_ = run_op(c)  # doctest: +SKIP

Source code in src/cat/analysis/__init__.py
def run_op(circuit: Circuit) -> AnalysisResult:
    """Run a simple .OP analysis and return the AnalysisResult.

    Example:
        >>> from cat.core.circuit import Circuit
        >>> from cat.core.components import Vdc, Resistor
        >>> from cat.core.net import GND
        >>> c = Circuit("rc")
        >>> V1, R1 = Vdc("1", 5.0), Resistor("1", "1k")
        >>> c.add(V1, R1)
        >>> c.connect(V1.ports[0], R1.ports[0])
        >>> c.connect(R1.ports[1], GND)
        >>> c.connect(V1.ports[1], GND)
        >>> _ = run_op(c)  # doctest: +SKIP
    """
    return OP().run(circuit)

run_step_native(circuit, directives)

Runs a deck with native .step directives (already contained in directives) and returns all plots as a list of TraceSet.

Source code in src/cat/analysis/step_native.py
def run_step_native(circuit: Circuit, directives: list[str]) -> StepNativeResult:
    """
    Runs a deck with native .step directives (already contained in `directives`)
    and returns all plots as a list of TraceSet.
    """
    net = circuit.build_netlist()
    run_directives = get_run_directives()
    res = run_directives(net, directives)
    if res.returncode != 0 or not res.artifacts.raw_path:
        raise RuntimeError("NGSpice failed for native .step run")
    sets = parse_ngspice_ascii_raw_multi(res.artifacts.raw_path)
    return StepNativeResult(tracesets=sets)

run_tran(circuit, tstep, tstop, tstart=None, *, return_df=False)

Run a transient (.TRAN) analysis and optionally return a Pandas DataFrame.

  • return_df=False returns AnalysisResult
  • return_df=True returns a DataFrame via TraceSet.to_dataframe() (see the sketch below)
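
A minimal runnable sketch, reusing the circuit from the run_op example above:

from cat.core.circuit import Circuit
from cat.core.components import Vdc, Resistor
from cat.core.net import GND
from cat.analysis import run_tran

c = Circuit("rc")
V1, R1 = Vdc("1", 5.0), Resistor("1", "1k")
c.add(V1, R1)
c.connect(V1.ports[0], R1.ports[0])
c.connect(R1.ports[1], GND)
c.connect(V1.ports[1], GND)
df = run_tran(c, "10us", "1ms", return_df=True)
print(df.head())
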
Source code in src/cat/analysis/__init__.py
def run_tran(
    circuit: Circuit,
    tstep: str,
    tstop: str,
    tstart: str | None = None,
    *,
    return_df: bool = False,
) -> AnalysisResult | Any:
    """Run a transient (.TRAN) analysis and optionally return a Pandas DataFrame.

    - return_df=False returns AnalysisResult
    - return_df=True returns a DataFrame via TraceSet.to_dataframe()
    """
    res = TRAN(tstep, tstop, tstart).run(circuit)
    if return_df:
        return res.traces.to_dataframe()
    return res

settling_time(ts, y_name, tol=0.02)

First instant at which |y(t) - y_final| <= tol*|y_final - y_initial| and it stays within that band until the end.
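
Usage sketch (res is assumed to be a transient AnalysisResult):

st = settling_time(res.traces, "v(out)", tol=0.01)  # 1% band
if st.t_settle is not None:
    print(f"settled at t = {st.t_settle:.6g} s (band = {st.band:.3g})")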

Source code in src/cat/analysis/metrics_tran.py
def settling_time(ts: TraceSet, y_name: str, tol: float = 0.02) -> SettlingResult:
    """
    First instant at which |y(t) - y_final| <= tol*|y_final - y_initial|
    and it stays within that band until the end.
    """
    x, y = _get_xy(ts, y_name)
    y0 = float(y[0])
    yf = float(y[-1])
    band = abs(tol * (yf - y0))
    if band == 0.0:
        return SettlingResult(t_settle=None, idx=None, band=band, y_final=yf)

    err = np.abs(y - yf)
    inside = err <= band
    suffix_ok = np.flip(np.cumsum(np.flip(~inside)) == 0)
    ok = inside & suffix_ok
    idxs = np.where(ok)[0]
    if idxs.size == 0:
        return SettlingResult(t_settle=None, idx=None, band=band, y_final=yf)
    i = int(idxs[0])
    return SettlingResult(t_settle=float(x[i]), idx=i, band=band, y_final=yf)

stack_runs_to_df(runs, params_list=None, y=None, with_x=True, run_index_name='run_idx')

Stacks a list of AnalysisResult into a single DataFrame, adding the parameter columns (params_list[i]) for each run (see the sketch below).

  • y: list of trace names to keep (e.g., ["v(out)"]). If None, all traces are kept.
  • with_x: include the X-axis column (the TraceSet's first column), typically "time"/"frequency".
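
A sketch combining this with sweep_component (documented below); it assumes SweepResult exposes the per-value results as a .runs attribute:

values = ["1k", "2k", "5k"]
sweep = sweep_component(c, R1, values, lambda: OP(), param_name="R1")
df = stack_runs_to_df(
    sweep.runs,
    params_list=[{"R1": v} for v in values],
    y=["v(out)"],
)
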
Source code in src/cat/analysis/post.py
def stack_runs_to_df(
    runs: Sequence[AnalysisResult],
    params_list: Sequence[Mapping[str, str | float]] | None = None,
    y: Sequence[str] | None = None,
    with_x: bool = True,
    run_index_name: str = "run_idx",
) -> Any:
    """
    Stacks a list of `AnalysisResult` into a single DataFrame, adding the
    parameter columns (`params_list[i]`) for each run.

    - `y`: list of trace names to keep (e.g., ["v(out)"]). If None, all traces are kept.
    - `with_x`: include the X-axis column (the TraceSet's first column), typically
    "time"/"frequency".
    """
    pd = _ensure_pandas()
    frames: list[Any] = []
    for i, res in enumerate(runs):
        df = res.traces.to_dataframe()
        x_name = res.traces.x.name
        keep = list(df.columns)
        if y is not None:
            keep = [x_name] + list(y) if with_x else list(y)
            df = _pick_columns(df, keep)
        else:
            if not with_x:
                keep = [c for c in keep if c != x_name]
                df = df[keep]
        # this run's parameters
        params = params_list[i] if params_list is not None else {}
        for k, v in params.items():
            df[k] = v
        df[run_index_name] = i
        frames.append(df)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

stack_step_to_df(step, y=None, with_x=True, run_index_name='run_idx')

Convenience wrapper for StepResult: stacks step.runs with the columns from step.grid.

Source code in src/cat/analysis/post.py
def stack_step_to_df(
    step: StepResult,
    y: Sequence[str] | None = None,
    with_x: bool = True,
    run_index_name: str = "run_idx",
) -> Any:
    """
    Convenience wrapper for `StepResult`: stacks `step.runs` with the columns from `step.grid`.
    """
    return stack_runs_to_df(step.runs, step.grid, y=y, with_x=with_x, run_index_name=run_index_name)

sweep_component(circuit, component, values, analysis_factory, param_name=None, *, progress=None)

Runs several simulations, changing component.value from Python.

  • values: list of values to apply to the component (e.g., ["1k","2k","5k"])
  • analysis_factory: callable that creates a fresh analysis instance on each iteration, e.g. lambda: TRAN("100us","1ms") or lambda: OP()
  • param_name: optional friendly name to record in the result (see the sketch below)
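
A short sketch (c and R1 as in the run_op example above; OP and sweep_component are assumed to be importable from cat.analysis):

from cat.analysis import OP, sweep_component

sw = sweep_component(c, R1, ["1k", "2k", "5k"], lambda: OP(), param_name="R1")
# component.value is restored to its original value after the sweep
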
Source code in src/cat/analysis/sweep.py
def sweep_component(
    circuit: Circuit,
    component: Component,
    values: Sequence[str | float],
    analysis_factory: Callable[[], A],
    param_name: str | None = None,
    *,
    progress: bool | Callable[[int, int], None] | None = None,
) -> SweepResult:
    """Executa várias simulações alterando `component.value` em Python.

    - `values`: lista de valores a aplicar no componente (ex.: ["1k","2k","5k"])
    - `analysis_factory`: callable que cria uma instância da análise a cada iteração,
      por ex.: `lambda: TRAN("100us","1ms")` ou `lambda: OP()`
    - `param_name`: opcional, nome amigável para registrar no resultado
    """
    original = component.value

    def _notify(done: int) -> None:
        if not progress:
            return
        if callable(progress):
            try:
                progress(done, len(values))
            except Exception:
                pass
            return
        pct = int(round(100.0 * done / max(len(values), 1)))
        sys.stderr.write(f"\rSWEEP[{component.ref}]: {done}/{len(values)} ({pct}%)")
        sys.stderr.flush()

    runs: list[AnalysisResult] = []
    try:
        for i, v in enumerate(values, start=1):
            component.value = v
            analysis = analysis_factory()
            res = analysis.run(circuit)  # type: ignore[attr-defined]
            runs.append(res)
            _notify(i)
    finally:
        component.value = original  # restore
    return SweepResult(
        param_name or f"{type(component).__name__}.{component.ref}", list(values), runs
    )

worst_case(circuit, analysis_factory, metric, space, mode='min', n_random=64, n_refine=3, progress=None)

Searches for the worst case over .param parameters discretized in 'space'. Strategy: initial random sampling, followed by local coordinate-wise refinement.
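
A sketch of a typical call. The keys of space must be .param names referenced by the netlist ("rload" here is hypothetical), and metric is any callable reducing an AnalysisResult to a float:

from cat.analysis import TRAN

wc = worst_case(
    circuit,
    analysis_factory=lambda: TRAN("10us", "1ms"),
    metric=lambda r: overshoot_pct(r.traces, "v(out)"),
    space={"rload": [900.0, 1000.0, 1100.0]},
    mode="max",     # maximize overshoot = worst case
    n_random=16,
    n_refine=2,
)
print(wc.best_params, wc.best_value)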

Source code in src/cat/analysis/worstcase.py
def worst_case(
    circuit: Circuit,
    analysis_factory: Callable[[], object],
    metric: Callable[[AnalysisResult], float],
    space: Mapping[str, Sequence[float | str]],
    mode: str = "min",  # "min" or "max"
    n_random: int = 64,
    n_refine: int = 3,
    progress: bool | Callable[[int, int], None] | None = None,
) -> WorstCaseResult:
    """
    Searches for the worst case over `.param` parameters discretized in 'space'.
    Strategy: initial random sampling -> local coordinate-wise refinement.
    """
    net = circuit.build_netlist()
    base = analysis_factory()._directives()  # type: ignore[attr-defined]

    # 1) random
    total = n_random + sum(len(space[k]) for k in space) * n_refine

    def _notify(done: int) -> None:
        if not progress:
            return
        if callable(progress):
            try:
                progress(done, total)
            except Exception:
                pass
            return
        pct = int(round(100.0 * done / max(total, 1)))
        sys.stderr.write(f"\rWORST: {done}/{total} ({pct}%)")
        sys.stderr.flush()

    hist: list[tuple[dict[str, float | str], float]] = []
    best_p: dict[str, float | str] = {}
    best_v = math.inf if mode == "min" else -math.inf

    keys = list(space.keys())
    choices = [list(space[k]) for k in keys]

    done = 0
    for _ in range(n_random):
        p = {k: random.choice(choices[i]) for i, k in enumerate(keys)}
        res = _run_with_params(net, _directives_with_params(base, p))
        val = metric(res)
        hist.append((p, val))
        if (mode == "min" and val < best_v) or (mode == "max" and val > best_v):
            best_p, best_v = p, val
        done += 1
        _notify(done)

    # 2) coordinate-wise refinement
    for _ in range(n_refine):
        improved = False
        for i, k in enumerate(keys):
            cand: list[tuple[float, dict[str, float | str]]] = []
            for v in choices[i]:
                p2 = dict(best_p)
                p2[k] = v
                res = _run_with_params(net, _directives_with_params(base, p2))
                val = metric(res)
                hist.append((p2, val))
                cand.append((val, p2))
                done += 1
                _notify(done)
            if mode == "min":
                val, p_sel = min(cand, key=lambda x: x[0])
                if val < best_v:
                    best_v, best_p, improved = val, p_sel, True
            else:
                val, p_sel = max(cand, key=lambda x: x[0])
                if val > best_v:
                    best_v, best_p, improved = val, p_sel, True
        if not improved:
            break

    return WorstCaseResult(best_params=best_p, best_value=best_v, history=hist)