Skip to content

api

VolurApiAsyncClient dataclass

A client for interacting with Völur API.

This client is used to interact with the Völur API to upload the data securely and effectively. See the actual methods for more details.

Note

This client was not intended to be used directly, instead use the SDK client from [volur.sdk.client.VolurClient][volur.sdk.client].

Source code in src/volur/api/v1alpha1/client.py
@dataclass
class VolurApiAsyncClient:
    """A client for interacting with Völur API.

    This client is used to interact with the Völur API to upload the data
    securely and effectively. See the actual methods for more details.

    Note:
        This client was not intended to be used directly, instead use the SDK client
        from [volur.sdk.client.VolurClient][volur.sdk.client].
    """

    settings: VolurApiSettings = field(default_factory=VolurApiSettings)

    async def upload_materials_information(
        self: "VolurApiAsyncClient",
        materials: AsyncIterator[material_pb2.Material],
    ) -> Status:
        """Uploads Materials Information to the Völur platform using the Völur
        API.

        This method is using a source to get the materials data and then send
        it to the Völur API. This method is asynchronous and will return a
        status of the operation.

        Args:
            materials: a source of materials data to be uploaded to the Völur
                platform.

        Returns:
            The status of the operation.
        """

        async def generate_requests() -> (
            AsyncIterator[material_pb2.UploadMaterialInformationRequest]
        ):
            try:
                async for material in materials:
                    yield material_pb2.UploadMaterialInformationRequest(
                        material=material,
                    )
            except Exception:
                logger.exception(
                    "error occurred while generating requests",
                )

        try:
            logger.info("start uploading materials data")
            channel = grpc.aio.secure_channel(
                self.settings.address,
                grpc.ssl_channel_credentials(),
            )
            stub = material_pb2_grpc.MaterialInformationServiceStub(channel)
            requests = generate_requests()
            stream = stub.UploadMaterialInformation(
                requests,  # type: ignore[arg-type]
                metadata=(
                    (
                        "authorization",
                        f"Bearer {self.settings.token.get_secret_value()}",
                    ),
                ),
            )
            while True:
                response = await stream.read()  # type: ignore[attr-defined]
                if response == grpc.aio.EOF:  # type: ignore[attr-defined]
                    logger.info("successfully uploaded materials information")
                    break
                if response.HasField("status"):
                    if response.status.code != 0:
                        logger.error(
                            f"error occurred while uploading materials information "
                            f"{response.status.code} {response.status.message}",
                        )
                    else:
                        logger.debug(
                            "successfully uploaded materials information",
                        )
                else:
                    raise ValueError("response from a server does not contain status")
            return Status(code=0)
        except grpc.aio.AioRpcError as rpc_error:
            if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
                logger.error(
                    "used token in invalid,"
                    " please set a valid token using"
                    " `VOLUR_API_TOKEN` environment variable",
                )
            else:
                with logger.contextualize(
                    rpc_error_code=rpc_error.code(),
                    rpc_error_details=rpc_error.details(),
                ):
                    logger.exception(
                        "error occurred while uploading materials information",
                    )
            code: int
            code, _ = rpc_error.code().value  # type: ignore[misc]
            message = _ if (_ := rpc_error.details()) else ""
            return Status(
                code=code,
                message=message,
            )

    async def upload_products_information(
        self: "VolurApiAsyncClient",
        products: AsyncIterator[product_pb2.Product],
    ) -> Status:
        """Uploads Products Information to the Völur platform using the Völur
        API.

        This method is using a source to get the products data and then send
        it to the Völur API. This method is asynchronous and will return a
        status of the operation.

        Args:
            products: an iterable of Product protos to be uploaded via API
                platform.

        Returns:
            The status of the operation.
        """

        async def generate_requests() -> (
            AsyncIterator[product_pb2.UploadProductInformationRequest]
        ):
            try:
                async for product in products:
                    yield product_pb2.UploadProductInformationRequest(
                        product=product,
                    )
            except Exception:
                logger.exception(
                    "error occurred while generating requests",
                )

        try:
            logger.info("start uploading products data")
            channel = grpc.aio.secure_channel(
                self.settings.address,
                grpc.ssl_channel_credentials(),
            )
            stub = product_pb2_grpc.ProductInformationServiceStub(channel)
            requests = generate_requests()
            stream = stub.UploadProductInformation(
                requests,  # type: ignore[arg-type]
                metadata=(
                    (
                        "authorization",
                        f"Bearer {self.settings.token.get_secret_value()}",
                    ),
                ),
            )
            while True:
                response = await stream.read()  # type: ignore[attr-defined]
                if response == grpc.aio.EOF:  # type: ignore[attr-defined]
                    logger.info("successfully uploaded products information")
                    break
                if response.HasField("status"):
                    if response.status.code != 0:
                        logger.error(
                            f"error occurred while uploading materials information "
                            f"{response.status.code} {response.status.message}",
                        )
                    else:
                        logger.debug(
                            "successfully uploaded products information",
                        )
                else:
                    raise ValueError("response from a server does not contain status")
            return Status(code=0)
        except grpc.aio.AioRpcError as rpc_error:
            if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
                logger.error(
                    "used token in invalid,"
                    " please set a valid token using"
                    " `VOLUR_API_TOKEN` environment variable",
                )
            else:
                with logger.contextualize(
                    rpc_error_code=rpc_error.code(),
                    rpc_error_details=rpc_error.details(),
                ):
                    logger.exception(
                        "error occurred while uploading products information",
                    )
            code: int
            code, _ = rpc_error.code().value  # type: ignore[misc]
            message = _ if (_ := rpc_error.details()) else ""
            return Status(
                code=code,
                message=message,
            )

    async def upload_demand_information(
        self: "VolurApiAsyncClient",
        demand: AsyncIterator[demand_pb2.Demand],
    ) -> Status:
        """Uploads Demand Information to the Völur platform using the Völur
        API.

        This method is using a source to get the demand data and then send
        it to the Völur API. This method is asynchronous and will return a
        status of the operation.

        Args:
            demand: a source of demand data to be uploaded to the Völur
                platform.

        Returns:
            The status of the operation.
        """

        async def generate_requests() -> (
            AsyncIterator[demand_pb2.UploadDemandInformationRequest]
        ):
            try:
                async for dem in demand:
                    yield demand_pb2.UploadDemandInformationRequest(
                        demand=dem,
                    )
            except Exception:
                logger.exception(
                    "error occurred while generating requests",
                )

        try:
            logger.info("start uploading demand data")
            channel = grpc.aio.secure_channel(
                self.settings.address,
                grpc.ssl_channel_credentials(),
            )
            stub = demand_pb2_grpc.DemandInformationServiceStub(channel)
            requests = generate_requests()
            stream = stub.UploadDemandInformation(
                requests,  # type: ignore[arg-type]
                metadata=(
                    (
                        "authorization",
                        f"Bearer {self.settings.token.get_secret_value()}",
                    ),
                ),
            )
            while True:
                response = await stream.read()  # type: ignore[attr-defined]
                if response == grpc.aio.EOF:  # type: ignore[attr-defined]
                    logger.info("successfully uploaded demand information")
                    break
                if response.HasField("status"):
                    if response.status.code != 0:
                        logger.error(
                            f"error occurred while uploading demand information "
                            f"{response.status.code} {response.status.message}",
                        )
                    else:
                        logger.debug(
                            "successfully uploaded demand information",
                        )
                else:
                    raise ValueError("response from a server does not contain status")
            return Status(code=0)
        except grpc.aio.AioRpcError as rpc_error:
            if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
                logger.error(
                    "used token in invalid,"
                    "please set a valid token using"
                    "`VOLUR_API_TOKEN` environment variable",
                )
            else:
                with logger.contextualize(
                    rpc_error_code=rpc_error.code(),
                    rpc_error_details=rpc_error.details(),
                ):
                    logger.exception(
                        "error occurred while uploading demand information",
                    )
            code: int
            code, _ = rpc_error.code().value  # type: ignore[misc]
            message = _ if (_ := rpc_error.details()) else ""
            return Status(
                code=code,
                message=message,
            )

upload_demand_information(demand) async

Uploads Demand Information to the Völur platform using the Völur API.

This method is using a source to get the demand data and then send it to the Völur API. This method is asynchronous and will return a status of the operation.

Parameters:

Name Type Description Default
demand AsyncIterator[Demand]

a source of demand data to be uploaded to the Völur platform.

required

Returns:

Type Description
Status

The status of the operation.

Source code in src/volur/api/v1alpha1/client.py
async def upload_demand_information(
    self: "VolurApiAsyncClient",
    demand: AsyncIterator[demand_pb2.Demand],
) -> Status:
    """Uploads Demand Information to the Völur platform using the Völur
    API.

    This method is using a source to get the demand data and then send
    it to the Völur API. This method is asynchronous and will return a
    status of the operation.

    Args:
        demand: a source of demand data to be uploaded to the Völur
            platform.

    Returns:
        The status of the operation.
    """

    async def generate_requests() -> (
        AsyncIterator[demand_pb2.UploadDemandInformationRequest]
    ):
        try:
            async for dem in demand:
                yield demand_pb2.UploadDemandInformationRequest(
                    demand=dem,
                )
        except Exception:
            logger.exception(
                "error occurred while generating requests",
            )

    try:
        logger.info("start uploading demand data")
        channel = grpc.aio.secure_channel(
            self.settings.address,
            grpc.ssl_channel_credentials(),
        )
        stub = demand_pb2_grpc.DemandInformationServiceStub(channel)
        requests = generate_requests()
        stream = stub.UploadDemandInformation(
            requests,  # type: ignore[arg-type]
            metadata=(
                (
                    "authorization",
                    f"Bearer {self.settings.token.get_secret_value()}",
                ),
            ),
        )
        while True:
            response = await stream.read()  # type: ignore[attr-defined]
            if response == grpc.aio.EOF:  # type: ignore[attr-defined]
                logger.info("successfully uploaded demand information")
                break
            if response.HasField("status"):
                if response.status.code != 0:
                    logger.error(
                        f"error occurred while uploading demand information "
                        f"{response.status.code} {response.status.message}",
                    )
                else:
                    logger.debug(
                        "successfully uploaded demand information",
                    )
            else:
                raise ValueError("response from a server does not contain status")
        return Status(code=0)
    except grpc.aio.AioRpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
            logger.error(
                "used token in invalid,"
                "please set a valid token using"
                "`VOLUR_API_TOKEN` environment variable",
            )
        else:
            with logger.contextualize(
                rpc_error_code=rpc_error.code(),
                rpc_error_details=rpc_error.details(),
            ):
                logger.exception(
                    "error occurred while uploading demand information",
                )
        code: int
        code, _ = rpc_error.code().value  # type: ignore[misc]
        message = _ if (_ := rpc_error.details()) else ""
        return Status(
            code=code,
            message=message,
        )

upload_materials_information(materials) async

Uploads Materials Information to the Völur platform using the Völur API.

This method is using a source to get the materials data and then send it to the Völur API. This method is asynchronous and will return a status of the operation.

Parameters:

Name Type Description Default
materials AsyncIterator[Material]

a source of materials data to be uploaded to the Völur platform.

required

Returns:

Type Description
Status

The status of the operation.

Source code in src/volur/api/v1alpha1/client.py
async def upload_materials_information(
    self: "VolurApiAsyncClient",
    materials: AsyncIterator[material_pb2.Material],
) -> Status:
    """Uploads Materials Information to the Völur platform using the Völur
    API.

    This method is using a source to get the materials data and then send
    it to the Völur API. This method is asynchronous and will return a
    status of the operation.

    Args:
        materials: a source of materials data to be uploaded to the Völur
            platform.

    Returns:
        The status of the operation.
    """

    async def generate_requests() -> (
        AsyncIterator[material_pb2.UploadMaterialInformationRequest]
    ):
        try:
            async for material in materials:
                yield material_pb2.UploadMaterialInformationRequest(
                    material=material,
                )
        except Exception:
            logger.exception(
                "error occurred while generating requests",
            )

    try:
        logger.info("start uploading materials data")
        channel = grpc.aio.secure_channel(
            self.settings.address,
            grpc.ssl_channel_credentials(),
        )
        stub = material_pb2_grpc.MaterialInformationServiceStub(channel)
        requests = generate_requests()
        stream = stub.UploadMaterialInformation(
            requests,  # type: ignore[arg-type]
            metadata=(
                (
                    "authorization",
                    f"Bearer {self.settings.token.get_secret_value()}",
                ),
            ),
        )
        while True:
            response = await stream.read()  # type: ignore[attr-defined]
            if response == grpc.aio.EOF:  # type: ignore[attr-defined]
                logger.info("successfully uploaded materials information")
                break
            if response.HasField("status"):
                if response.status.code != 0:
                    logger.error(
                        f"error occurred while uploading materials information "
                        f"{response.status.code} {response.status.message}",
                    )
                else:
                    logger.debug(
                        "successfully uploaded materials information",
                    )
            else:
                raise ValueError("response from a server does not contain status")
        return Status(code=0)
    except grpc.aio.AioRpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
            logger.error(
                "used token in invalid,"
                " please set a valid token using"
                " `VOLUR_API_TOKEN` environment variable",
            )
        else:
            with logger.contextualize(
                rpc_error_code=rpc_error.code(),
                rpc_error_details=rpc_error.details(),
            ):
                logger.exception(
                    "error occurred while uploading materials information",
                )
        code: int
        code, _ = rpc_error.code().value  # type: ignore[misc]
        message = _ if (_ := rpc_error.details()) else ""
        return Status(
            code=code,
            message=message,
        )

upload_products_information(products) async

Uploads Products Information to the Völur platform using the Völur API.

This method is using a source to get the products data and then send it to the Völur API. This method is asynchronous and will return a status of the operation.

Parameters:

Name Type Description Default
products AsyncIterator[Product]

an iterable of Product protos to be uploaded via API platform.

required

Returns:

Type Description
Status

The status of the operation.

Source code in src/volur/api/v1alpha1/client.py
async def upload_products_information(
    self: "VolurApiAsyncClient",
    products: AsyncIterator[product_pb2.Product],
) -> Status:
    """Uploads Products Information to the Völur platform using the Völur
    API.

    This method is using a source to get the products data and then send
    it to the Völur API. This method is asynchronous and will return a
    status of the operation.

    Args:
        products: an iterable of Product protos to be uploaded via API
            platform.

    Returns:
        The status of the operation.
    """

    async def generate_requests() -> (
        AsyncIterator[product_pb2.UploadProductInformationRequest]
    ):
        try:
            async for product in products:
                yield product_pb2.UploadProductInformationRequest(
                    product=product,
                )
        except Exception:
            logger.exception(
                "error occurred while generating requests",
            )

    try:
        logger.info("start uploading products data")
        channel = grpc.aio.secure_channel(
            self.settings.address,
            grpc.ssl_channel_credentials(),
        )
        stub = product_pb2_grpc.ProductInformationServiceStub(channel)
        requests = generate_requests()
        stream = stub.UploadProductInformation(
            requests,  # type: ignore[arg-type]
            metadata=(
                (
                    "authorization",
                    f"Bearer {self.settings.token.get_secret_value()}",
                ),
            ),
        )
        while True:
            response = await stream.read()  # type: ignore[attr-defined]
            if response == grpc.aio.EOF:  # type: ignore[attr-defined]
                logger.info("successfully uploaded products information")
                break
            if response.HasField("status"):
                if response.status.code != 0:
                    logger.error(
                        f"error occurred while uploading materials information "
                        f"{response.status.code} {response.status.message}",
                    )
                else:
                    logger.debug(
                        "successfully uploaded products information",
                    )
            else:
                raise ValueError("response from a server does not contain status")
        return Status(code=0)
    except grpc.aio.AioRpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
            logger.error(
                "used token in invalid,"
                " please set a valid token using"
                " `VOLUR_API_TOKEN` environment variable",
            )
        else:
            with logger.contextualize(
                rpc_error_code=rpc_error.code(),
                rpc_error_details=rpc_error.details(),
            ):
                logger.exception(
                    "error occurred while uploading products information",
                )
        code: int
        code, _ = rpc_error.code().value  # type: ignore[misc]
        message = _ if (_ := rpc_error.details()) else ""
        return Status(
            code=code,
            message=message,
        )

VolurApiSettings

Bases: BaseSettings

Settings for Völur API client.

This class is used to obtain required configurations for the client to work properly.

This class requires two environment variables to be set:

  • VOLUR_API_ADDRESS: Address of the Völur API server,
  • VOLUR_API_TOKEN: Token for authenticating to the Völur API server.

Please contact Völur to obtain the endpoint address and the token.

Examples:

example.py
settings = VolurApiSettings()
Source code in src/volur/api/v1alpha1/settings.py
class VolurApiSettings(BaseSettings):
    """Settings for Völur API client.

    This class is used to obtain required configurations for the client to work
    properly.

    This class requires two environment variables to be set:

    - `VOLUR_API_ADDRESS`: Address of the Völur API server,
    - `VOLUR_API_TOKEN`: Token for authenticating to the Völur API server.

    Please contact Völur to obtain the endpoint address and the token.

    Examples:
        ```python title="example.py" linenums="1"
        settings = VolurApiSettings()
        ```
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        env_prefix="VOLUR_API_",
    )
    address: str
    token: SecretStr
    debug: bool = Field(
        False,
        description="Enable debug mode",
    )