Skip to content

Client

The async HTTP client handles all communication with the YNAB API. Every request flows through YNABClient, which provides authentication, rate limit enforcement, error parsing, response unwrapping, and milliunit-to-dollar conversion.

Key behaviors

  • Rate limiting: Checks remaining quota before each request and raises ToolError if exhausted.
  • Authentication: Bearer token injected via the httpx client at construction time.
  • Error parsing: YNAB error responses are parsed into YNABAPIError with structured fields.
  • Data unwrapping: Automatically extracts the data envelope from YNAB API responses.
  • Milliunit conversion: Converts milliunit fields (amounts in thousandths) to dollar values in responses.
  • Delta caching: Supports server knowledge-based delta requests for supported endpoints.

client

YNAB API client wrapping httpx with auth, rate limiting, and conversion.

Provides YNABClient, the single point of contact with the YNAB API. All requests go through this client, which handles:

  • Rate limit checking before each request
  • Bearer token authentication via the injected httpx client
  • YNAB error response parsing into YNABAPIError
  • Data envelope unwrapping (response["data"])
  • Milliunit-to-dollar conversion on response fields
  • Delta request caching for supported endpoints

logger module-attribute

logger = getLogger(__name__)

_OK module-attribute

_OK = OK

MILLIUNIT_FIELDS module-attribute

MILLIUNIT_FIELDS: frozenset[str] = frozenset({
    "balance",
    "cleared_balance",
    "uncleared_balance",
    "budgeted",
    "activity",
    "amount",
    "goal_target",
    "goal_overall_left",
    "goal_under_funded",
    "allocation",
    "spent",
    "income",
    "to_be_budgeted",
})

Field names that contain milliunit values in YNAB API responses.

JSONValue

JSONValue = dict[str, Any] | list[Any] | str | int | float | bool | None

CacheStore

CacheStore()

In-memory cache for YNAB delta-capable endpoints.

Stores full response data and server_knowledge values per budget+endpoint combination. Merges delta responses by replacing/adding changed entities in the cached list.

Attributes:

  • _entries (dict[str, CacheEntry]) –

    Internal dict mapping cache keys to CacheEntry instances.

Initialize an empty cache store.

Source code in src/ynaa_mcp/cache.py
def __init__(self) -> None:
    """Initialize an empty cache store."""
    self._entries: dict[str, CacheEntry] = {}
    self._ttl_entries: dict[str, TTLCacheEntry] = {}

get_knowledge

get_knowledge(cache_key: str) -> int | None

Return stored server_knowledge for a cache key, or None.

Parameters:

  • cache_key (str) –

    The cache key in {budget_id}:{resource} format.

Returns:

  • int | None

    The server_knowledge integer, or None if not cached.

Source code in src/ynaa_mcp/cache.py
def get_knowledge(self, cache_key: str) -> int | None:
    """Return stored server_knowledge for a cache key, or None.

    Args:
        cache_key: The cache key in ``{budget_id}:{resource}`` format.

    Returns:
        The server_knowledge integer, or None if not cached.
    """
    entry = self._entries.get(cache_key)
    return entry.server_knowledge if entry else None

get_cached_data

get_cached_data(cache_key: str) -> dict[str, Any] | None

Return cached response data, or None if not cached.

Parameters:

  • cache_key (str) –

    The cache key in {budget_id}:{resource} format.

Returns:

  • dict[str, Any] | None

    The cached data dict, or None if not cached.

Source code in src/ynaa_mcp/cache.py
def get_cached_data(self, cache_key: str) -> dict[str, Any] | None:
    """Return cached response data, or None if not cached.

    Args:
        cache_key: The cache key in ``{budget_id}:{resource}`` format.

    Returns:
        The cached data dict, or None if not cached.
    """
    entry = self._entries.get(cache_key)
    return entry.data if entry else None

update

update(cache_key: str, server_knowledge: int, data: dict[str, Any]) -> None

Store or replace a cache entry with full response data.

Parameters:

  • cache_key (str) –

    The cache key in {budget_id}:{resource} format.

  • server_knowledge (int) –

    The YNAB server knowledge value.

  • data (dict[str, Any]) –

    The full response data dict.

Source code in src/ynaa_mcp/cache.py
def update(
    self,
    cache_key: str,
    server_knowledge: int,
    data: dict[str, Any],
) -> None:
    """Store or replace a cache entry with full response data.

    Args:
        cache_key: The cache key in ``{budget_id}:{resource}`` format.
        server_knowledge: The YNAB server knowledge value.
        data: The full response data dict.
    """
    self._entries[cache_key] = CacheEntry(
        server_knowledge=server_knowledge,
        data=data,
    )

merge_delta

merge_delta(cache_key: str, server_knowledge: int, delta_data: dict[str, Any]) -> None

Merge delta entities into existing cached data.

For each top-level key in delta_data that is a list of dicts with id fields: updates existing entities, adds new ones, and removes those with deleted=True.

For category_groups (grouped response): merges at group level first, then merges categories within each group.

If no existing entry exists, stores delta_data as a fresh entry.

Parameters:

  • cache_key (str) –

    The cache key in {budget_id}:{resource} format.

  • server_knowledge (int) –

    The new server knowledge value.

  • delta_data (dict[str, Any]) –

    The delta response data dict.

Source code in src/ynaa_mcp/cache.py
def merge_delta(
    self,
    cache_key: str,
    server_knowledge: int,
    delta_data: dict[str, Any],
) -> None:
    """Merge delta entities into existing cached data.

    For each top-level key in ``delta_data`` that is a list of dicts
    with ``id`` fields: updates existing entities, adds new ones, and
    removes those with ``deleted=True``.

    For ``category_groups`` (grouped response): merges at group level
    first, then merges categories within each group.

    If no existing entry exists, stores ``delta_data`` as a fresh entry.

    Args:
        cache_key: The cache key in ``{budget_id}:{resource}`` format.
        server_knowledge: The new server knowledge value.
        delta_data: The delta response data dict.
    """
    existing = self.get_cached_data(cache_key)
    if existing is None:
        self.update(cache_key, server_knowledge, delta_data)
        return

    for key, value in delta_data.items():
        if not isinstance(value, list):
            existing[key] = value
            continue

        entity_list = cast("list[dict[str, Any]]", value)
        if key == "category_groups":
            self._merge_category_groups(existing, entity_list)
        else:
            existing[key] = self._merge_entity_list(
                existing.get(key, []),
                entity_list,
            )

    self._entries[cache_key] = CacheEntry(
        server_knowledge=server_knowledge,
        data=existing,
    )

invalidate

invalidate(cache_key: str) -> None

Remove a specific cache entry.

Parameters:

  • cache_key (str) –

    The cache key to remove.

Source code in src/ynaa_mcp/cache.py
def invalidate(self, cache_key: str) -> None:
    """Remove a specific cache entry.

    Args:
        cache_key: The cache key to remove.
    """
    self._entries.pop(cache_key, None)

invalidate_budget

invalidate_budget(budget_id: str) -> None

Remove all cache entries for a budget.

Parameters:

  • budget_id (str) –

    The YNAB budget ID whose entries to remove.

Source code in src/ynaa_mcp/cache.py
def invalidate_budget(self, budget_id: str) -> None:
    """Remove all cache entries for a budget.

    Args:
        budget_id: The YNAB budget ID whose entries to remove.
    """
    keys_to_remove = [k for k in self._entries if k.startswith(f"{budget_id}:")]
    for key in keys_to_remove:
        del self._entries[key]

invalidate_for_mutation

invalidate_for_mutation(path: str) -> None

Invalidate cache entries affected by a mutation at the given path.

Extracts the budget ID and resource type from the API path, invalidates the direct resource, then invalidates any cross-resource entries defined in :data:CROSS_INVALIDATION_MAP.

Parameters:

  • path (str) –

    The API path of the mutation (e.g., /budgets/b1/transactions/t1).

Source code in src/ynaa_mcp/cache.py
def invalidate_for_mutation(self, path: str) -> None:
    """Invalidate cache entries affected by a mutation at the given path.

    Extracts the budget ID and resource type from the API path,
    invalidates the direct resource, then invalidates any cross-resource
    entries defined in :data:`CROSS_INVALIDATION_MAP`.

    Args:
        path: The API path of the mutation (e.g., ``/budgets/b1/transactions/t1``).
    """
    parts = path.strip("/").split("/")
    if len(parts) < 3 or parts[0] != "budgets":  # noqa: PLR2004
        return

    budget_id = parts[1]
    resource = parts[2]

    self.invalidate(f"{budget_id}:{resource}")

    for cross_resource in CROSS_INVALIDATION_MAP.get(resource, []):
        self.invalidate(f"{budget_id}:{cross_resource}")

get_ttl

get_ttl(key: str) -> dict[str, Any] | None

Return cached data for a TTL key, or None if missing/expired.

Expired entries are deleted on access.

Parameters:

  • key (str) –

    The TTL cache key.

Returns:

  • dict[str, Any] | None

    The cached data dict, or None if not cached or expired.

Source code in src/ynaa_mcp/cache.py
def get_ttl(self, key: str) -> dict[str, Any] | None:
    """Return cached data for a TTL key, or None if missing/expired.

    Expired entries are deleted on access.

    Args:
        key: The TTL cache key.

    Returns:
        The cached data dict, or None if not cached or expired.
    """
    entry = self._ttl_entries.get(key)
    if entry is None:
        return None
    if time.monotonic() > entry.expires_at:
        del self._ttl_entries[key]
        return None
    return entry.data

set_ttl

set_ttl(key: str, data: dict[str, Any], ttl_seconds: float) -> None

Store data with a time-to-live expiration.

Parameters:

  • key (str) –

    The TTL cache key.

  • data (dict[str, Any]) –

    The data dict to cache.

  • ttl_seconds (float) –

    Number of seconds until the entry expires.

Source code in src/ynaa_mcp/cache.py
def set_ttl(
    self,
    key: str,
    data: dict[str, Any],
    ttl_seconds: float,
) -> None:
    """Store data with a time-to-live expiration.

    Args:
        key: The TTL cache key.
        data: The data dict to cache.
        ttl_seconds: Number of seconds until the entry expires.
    """
    self._ttl_entries[key] = TTLCacheEntry(
        data=data,
        expires_at=time.monotonic() + ttl_seconds,
    )

clear

clear() -> None

Remove all cache entries (both delta and TTL).

Source code in src/ynaa_mcp/cache.py
def clear(self) -> None:
    """Remove all cache entries (both delta and TTL)."""
    self._entries.clear()
    self._ttl_entries.clear()

YNABAPIError

YNABAPIError(status_code: int, error_id: str, name: str, detail: str)

Bases: Exception

Raised when the YNAB API returns an error response.

Attributes:

  • status_code

    HTTP status code from the YNAB API.

  • error_id

    YNAB-specific error identifier (e.g., "404.2").

  • name

    YNAB error name (e.g., "resource_not_found").

  • detail

    Human-readable error description from YNAB.

Initialize a YNABAPIError.

Parameters:

  • status_code (int) –

    HTTP status code from the YNAB API.

  • error_id (str) –

    YNAB-specific error identifier.

  • name (str) –

    YNAB error name.

  • detail (str) –

    Human-readable error description.

Source code in src/ynaa_mcp/errors.py
def __init__(
    self,
    status_code: int,
    error_id: str,
    name: str,
    detail: str,
) -> None:
    """Initialize a YNABAPIError.

    Args:
        status_code: HTTP status code from the YNAB API.
        error_id: YNAB-specific error identifier.
        name: YNAB error name.
        detail: Human-readable error description.
    """
    super().__init__(detail)
    self.status_code = status_code
    self.error_id = error_id
    self.name = name
    self.detail = detail

RateLimiter

RateLimiter()

Sliding window rate limiter that tracks request timestamps.

Tracks time.monotonic() timestamps in a deque and prunes entries older than :data:WINDOW_SECONDS. Denies new requests once the count reaches :data:DENY_THRESHOLD.

Attributes:

  • _timestamps (deque[float]) –

    Deque of monotonic timestamps for requests in the window.

Initialize the rate limiter with an empty timestamp deque.

Source code in src/ynaa_mcp/rate_limiter.py
def __init__(self) -> None:
    """Initialize the rate limiter with an empty timestamp deque."""
    self._timestamps: deque[float] = deque()

check

check() -> tuple[bool, int, float | None]

Check whether a new request is allowed.

Prunes expired timestamps, then checks the count against :data:DENY_THRESHOLD.

Returns:

  • tuple[bool, int, float | None]

    A 3-tuple of: - allowed: True if a request can proceed. - current_count: Number of requests in the window. - retry_after: Seconds until the oldest request expires from the window (None if allowed).

Source code in src/ynaa_mcp/rate_limiter.py
def check(self) -> tuple[bool, int, float | None]:
    """Check whether a new request is allowed.

    Prunes expired timestamps, then checks the count against
    :data:`DENY_THRESHOLD`.

    Returns:
        A 3-tuple of:
            - ``allowed``: True if a request can proceed.
            - ``current_count``: Number of requests in the window.
            - ``retry_after``: Seconds until the oldest request
              expires from the window (None if allowed).
    """
    self._prune()
    current_count = len(self._timestamps)

    if current_count >= DENY_THRESHOLD:
        # Oldest timestamp determines when the window frees a slot
        oldest = self._timestamps[0]
        retry_after = (oldest + WINDOW_SECONDS) - time.monotonic()
        return False, current_count, max(retry_after, 0.0)

    return True, current_count, None

record

record() -> None

Record a request timestamp in the sliding window.

Source code in src/ynaa_mcp/rate_limiter.py
def record(self) -> None:
    """Record a request timestamp in the sliding window."""
    self._timestamps.append(time.monotonic())

YNABClient

YNABClient(
    http_client: AsyncClient,
    rate_limiter: RateLimiter,
    *,
    cache: CacheStore | None = None,
)

Async client for the YNAB API with rate limiting and conversion.

Accepts an injected httpx.AsyncClient (created during server lifespan) and a RateLimiter. Does NOT create its own HTTP client.

Attributes:

  • _http

    The injected httpx async client.

  • _rate_limiter

    The rate limiter instance.

  • _cache

    Optional delta cache store for reducing API calls.

Initialize the YNAB client with injected dependencies.

Parameters:

  • http_client (AsyncClient) –

    An httpx.AsyncClient pre-configured with base URL and authorization headers.

  • rate_limiter (RateLimiter) –

    A RateLimiter instance for tracking API usage.

  • cache (CacheStore | None, default: None ) –

    Optional CacheStore for delta request caching.

Source code in src/ynaa_mcp/client.py
def __init__(
    self,
    http_client: httpx.AsyncClient,
    rate_limiter: RateLimiter,
    *,
    cache: CacheStore | None = None,
) -> None:
    """Initialize the YNAB client with injected dependencies.

    Args:
        http_client: An httpx.AsyncClient pre-configured with base URL
            and authorization headers.
        rate_limiter: A RateLimiter instance for tracking API usage.
        cache: Optional CacheStore for delta request caching.
    """
    self._http = http_client
    self._rate_limiter = rate_limiter
    self._cache = cache

validate_token async

validate_token() -> str

Validate the YNAB personal access token by calling GET /user.

Returns:

  • str

    The authenticated user's ID string.

Source code in src/ynaa_mcp/client.py
async def validate_token(self) -> str:
    """Validate the YNAB personal access token by calling GET /user.

    Returns:
        The authenticated user's ID string.
    """
    data = await self.request("GET", "/user")
    user_info: dict[str, Any] = data["user"]
    return user_info["id"]

request async

request(method: str, path: str, **kwargs: Any) -> dict[str, Any]

Send a request to the YNAB API with rate limiting and conversion.

Steps
  1. Check rate limiter -- raise ToolError if denied
  2. Send HTTP request
  3. Record timestamp in rate limiter
  4. Parse error responses into YNABAPIError
  5. Unwrap the data envelope
  6. Convert milliunit fields to dollars

Parameters:

  • method (str) –

    HTTP method (GET, POST, PUT, PATCH, DELETE).

  • path (str) –

    API path relative to base URL (e.g., /user).

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments passed to httpx (Any required -- httpx accepts diverse parameter types).

Returns:

  • dict[str, Any]

    The unwrapped data dict from the YNAB response with

  • dict[str, Any]

    milliunit fields converted to dollar amounts.

Raises:

  • ToolError

    If the rate limiter denies the request.

Source code in src/ynaa_mcp/client.py
async def request(
    self,
    method: str,
    path: str,
    **kwargs: Any,
) -> dict[str, Any]:
    """Send a request to the YNAB API with rate limiting and conversion.

    Steps:
        1. Check rate limiter -- raise ToolError if denied
        2. Send HTTP request
        3. Record timestamp in rate limiter
        4. Parse error responses into YNABAPIError
        5. Unwrap the ``data`` envelope
        6. Convert milliunit fields to dollars

    Args:
        method: HTTP method (GET, POST, PUT, PATCH, DELETE).
        path: API path relative to base URL (e.g., ``/user``).
        **kwargs: Additional keyword arguments passed to httpx
            (Any required -- httpx accepts diverse parameter types).

    Returns:
        The unwrapped ``data`` dict from the YNAB response with
        milliunit fields converted to dollar amounts.

    Raises:
        ToolError: If the rate limiter denies the request.
    """
    allowed, count, retry_after = self._rate_limiter.check()
    if not allowed:
        retry_seconds = int(retry_after) if retry_after is not None else 60
        logger.warning(
            "Rate limit reached: %d requests in window, retry after %ds",
            count,
            retry_seconds,
        )
        msg = (
            f"Rate limit reached ({count} requests this hour). "
            f"Try again in {retry_seconds} seconds."
        )
        raise ToolError(msg)

    response: httpx.Response = await self._http.request(method, path, **kwargs)
    self._rate_limiter.record()

    if response.status_code >= _OK.real + 100:
        self._raise_api_error(response)

    json_data: dict[str, Any] = response.json()
    data: dict[str, Any] = json_data["data"]
    converted = cast("dict[str, Any]", self._convert_milliunits(data))

    if method != "GET" and self._cache:
        self._cache.invalidate_for_mutation(path)

    return converted

get async

get(path: str, **kwargs: JSONValue) -> dict[str, Any]

Send a GET request, using delta cache when available.

For delta-capable endpoints, injects last_knowledge_of_server when cached knowledge exists, and merges delta responses into the cache. Strips server_knowledge from returned data.

Parameters:

  • path (str) –

    API path relative to base URL.

  • **kwargs (JSONValue, default: {} ) –

    Additional keyword arguments passed to httpx.

Returns:

  • dict[str, Any]

    The unwrapped and converted response data, with

  • dict[str, Any]

    server_knowledge stripped if present.

Source code in src/ynaa_mcp/client.py
async def get(self, path: str, **kwargs: JSONValue) -> dict[str, Any]:
    """Send a GET request, using delta cache when available.

    For delta-capable endpoints, injects ``last_knowledge_of_server``
    when cached knowledge exists, and merges delta responses into the
    cache. Strips ``server_knowledge`` from returned data.

    Args:
        path: API path relative to base URL.
        **kwargs: Additional keyword arguments passed to httpx.

    Returns:
        The unwrapped and converted response data, with
        ``server_knowledge`` stripped if present.
    """
    cache_key = cache_key_from_path(path) if self._cache else None

    if cache_key and self._cache:
        knowledge = self._cache.get_knowledge(cache_key)
        if knowledge is not None:
            existing_params = kwargs.get("params")
            params: dict[str, Any] = (
                dict(cast("dict[str, Any]", existing_params))
                if existing_params
                else {}
            )
            params["last_knowledge_of_server"] = knowledge
            kwargs["params"] = params

    data = await self.request("GET", path, **kwargs)

    if cache_key and self._cache and "server_knowledge" in data:
        sk = data["server_knowledge"]
        if self._cache.get_cached_data(cache_key) is not None:
            self._cache.merge_delta(cache_key, sk, strip_server_knowledge(data))
        else:
            self._cache.update(cache_key, sk, strip_server_knowledge(data))
        return strip_server_knowledge(data)

    return data

post async

post(path: str, **kwargs: JSONValue) -> dict[str, Any]

Send a POST request to the YNAB API.

Parameters:

  • path (str) –

    API path relative to base URL.

  • **kwargs (JSONValue, default: {} ) –

    Additional keyword arguments passed to httpx.

Returns:

  • dict[str, Any]

    The unwrapped and converted response data.

Source code in src/ynaa_mcp/client.py
async def post(self, path: str, **kwargs: JSONValue) -> dict[str, Any]:
    """Send a POST request to the YNAB API.

    Args:
        path: API path relative to base URL.
        **kwargs: Additional keyword arguments passed to httpx.

    Returns:
        The unwrapped and converted response data.
    """
    return await self.request("POST", path, **kwargs)

put async

put(path: str, **kwargs: JSONValue) -> dict[str, Any]

Send a PUT request to the YNAB API.

Parameters:

  • path (str) –

    API path relative to base URL.

  • **kwargs (JSONValue, default: {} ) –

    Additional keyword arguments passed to httpx.

Returns:

  • dict[str, Any]

    The unwrapped and converted response data.

Source code in src/ynaa_mcp/client.py
async def put(self, path: str, **kwargs: JSONValue) -> dict[str, Any]:
    """Send a PUT request to the YNAB API.

    Args:
        path: API path relative to base URL.
        **kwargs: Additional keyword arguments passed to httpx.

    Returns:
        The unwrapped and converted response data.
    """
    return await self.request("PUT", path, **kwargs)

delete async

delete(path: str, **kwargs: JSONValue) -> dict[str, Any]

Send a DELETE request to the YNAB API.

Parameters:

  • path (str) –

    API path relative to base URL.

  • **kwargs (JSONValue, default: {} ) –

    Additional keyword arguments passed to httpx.

Returns:

  • dict[str, Any]

    The unwrapped and converted response data.

Source code in src/ynaa_mcp/client.py
async def delete(self, path: str, **kwargs: JSONValue) -> dict[str, Any]:
    """Send a DELETE request to the YNAB API.

    Args:
        path: API path relative to base URL.
        **kwargs: Additional keyword arguments passed to httpx.

    Returns:
        The unwrapped and converted response data.
    """
    return await self.request("DELETE", path, **kwargs)

patch async

patch(path: str, **kwargs: JSONValue) -> dict[str, Any]

Send a PATCH request to the YNAB API.

Parameters:

  • path (str) –

    API path relative to base URL.

  • **kwargs (JSONValue, default: {} ) –

    Additional keyword arguments passed to httpx.

Returns:

  • dict[str, Any]

    The unwrapped and converted response data.

Source code in src/ynaa_mcp/client.py
async def patch(self, path: str, **kwargs: JSONValue) -> dict[str, Any]:
    """Send a PATCH request to the YNAB API.

    Args:
        path: API path relative to base URL.
        **kwargs: Additional keyword arguments passed to httpx.

    Returns:
        The unwrapped and converted response data.
    """
    return await self.request("PATCH", path, **kwargs)

cache_key_from_path

cache_key_from_path(path: str) -> str | None

Extract a cache key from an API path, or None if not delta-cacheable.

Matches paths like /budgets/{budget_id}/{resource} where resource is in :data:DELTA_ENDPOINTS.

Parameters:

  • path (str) –

    The API path (e.g., /budgets/abc-123/transactions).

Returns:

  • str | None

    A cache key like abc-123:transactions, or None if the path

  • str | None

    is not a delta-capable endpoint.

Source code in src/ynaa_mcp/cache.py
def cache_key_from_path(path: str) -> str | None:
    """Extract a cache key from an API path, or None if not delta-cacheable.

    Matches paths like ``/budgets/{budget_id}/{resource}`` where resource
    is in :data:`DELTA_ENDPOINTS`.

    Args:
        path: The API path (e.g., ``/budgets/abc-123/transactions``).

    Returns:
        A cache key like ``abc-123:transactions``, or None if the path
        is not a delta-capable endpoint.
    """
    match = _BUDGET_RESOURCE_PATTERN.match(path)
    if match:
        budget_id, resource = match.groups()
        return f"{budget_id}:{resource}"
    return None

strip_server_knowledge

strip_server_knowledge(data: dict[str, Any]) -> dict[str, Any]

Return a copy of data with the server_knowledge key removed.

Parameters:

  • data (dict[str, Any]) –

    The response data dict that may contain server_knowledge.

Returns:

  • dict[str, Any]

    A shallow copy of data without the server_knowledge key.

Source code in src/ynaa_mcp/cache.py
def strip_server_knowledge(data: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of data with the ``server_knowledge`` key removed.

    Args:
        data: The response data dict that may contain ``server_knowledge``.

    Returns:
        A shallow copy of ``data`` without the ``server_knowledge`` key.
    """
    return {k: v for k, v in data.items() if k != "server_knowledge"}

milliunits_to_dollars

milliunits_to_dollars(milliunits: int) -> float

Convert YNAB milliunits to a dollar amount.

Parameters:

  • milliunits (int) –

    Amount in YNAB milliunits (1000 milliunits = $1.00).

Returns:

  • float

    The equivalent dollar amount as a float.

Examples:

>>> milliunits_to_dollars(45670)
45.67
>>> milliunits_to_dollars(-10000)
-10.0
Source code in src/ynaa_mcp/converters.py
def milliunits_to_dollars(milliunits: int) -> float:
    """Convert YNAB milliunits to a dollar amount.

    Args:
        milliunits: Amount in YNAB milliunits (1000 milliunits = $1.00).

    Returns:
        The equivalent dollar amount as a float.

    Examples:
        >>> milliunits_to_dollars(45670)
        45.67
        >>> milliunits_to_dollars(-10000)
        -10.0
    """
    return float(Decimal(milliunits) / MILLIUNIT_FACTOR)

_is_milliunit_field

_is_milliunit_field(key: str) -> bool

Check whether a response field name contains milliunit values.

Matches exact field names in :data:MILLIUNIT_FIELDS plus any key ending with _balance or _amount for forward compatibility.

Parameters:

  • key (str) –

    The field name to check.

Returns:

  • bool

    True if the field contains milliunit values.

Source code in src/ynaa_mcp/client.py
def _is_milliunit_field(key: str) -> bool:
    """Check whether a response field name contains milliunit values.

    Matches exact field names in :data:`MILLIUNIT_FIELDS` plus any key
    ending with ``_balance`` or ``_amount`` for forward compatibility.

    Args:
        key: The field name to check.

    Returns:
        True if the field contains milliunit values.
    """
    if key in MILLIUNIT_FIELDS:
        return True
    return key.endswith(("_balance", "_amount"))