Skip to content

csv

DemandCSVFileSource dataclass

Bases: DemandSource

A CSV source for Demand.

This class simplifies the upload of Demand Information using CSV. Arguments: path: A path to the CSV file containing demand information; product_id_column: A column that is used to identify a product in a dataset; delimiter: A delimiter used in the CSV file; plant_id_column: A column that is used to reference a production plant where material is used; customer_id_column: A column that is used to reference a customer ordering the product; quantity_column: A column that represents the quantity of product; characteristics_columns: Specifies a list of arbitrary characteristics of a given demand record.

Examples:

Minimal working example

This is a minimal required configuration of a CSV source for demand.

simple.py
1
2
3
4
5
6
source = DemandCSVFileSource(
    "demand.csv",
    product_id_column=Column(
        "source_id",
    ),
)

Extended example with multiple characteristics

When your CSV source has a more complex configuration, you can utilise this class to upload information using characteristic and predefined columns.

example.py
source = DemandCSVFileSource(
    "demand.csv",
    product_id_column=Column(
        "product_id",
    ),
    plant_id_column=Column(
        "plant_id_column",
    ),
    customer_id_column=Column(
        "customer_id_column",
    ),
    quantity_column=Column(
        "weight",
        unit="kilogram",
    ),
    characteristics_columns=[
        CharacteristicColumnBool(
            column_name="type",
            characteristic_name="sale_type",
        ),
    ],
)

Source code in src/volur/sdk/v1alpha2/sources/csv/source.py
@dataclass
class DemandCSVFileSource(DemandSource):
    """A CSV source for Demand.

    This class simplifies the upload of Demand Information using CSV.

    Arguments:
        path: A path to the CSV file containing demand information, or an already opened binary buffer holding the CSV content.
        has_header: Whether the first row of the CSV contains the column names. When `False`, rows are keyed by zero-based column position.
        delimiter: A delimiter used in CSV file
        product_id_column: A column that is used to identify a product in a dataset
        plant_id_column: A column that is used to reference a production plant where material is used
        customer_id_column: A column that is used to reference a customer ordering the product
        quantity_column: A column that represents the quantity of product
        characteristics_columns: Specifies a list of arbitrary characteristics of a given demand record

    Examples:
        ### Minimal working example
        This is a **minimal** required configuration of a CSV source for
        demand.

        ```python title="simple.py" linenums="1"
        source = DemandCSVFileSource(
            "demand.csv",
            product_id_column=Column(
                "source_id",
            ),
        )
        ```
        ### Extended example with multiple characteristics
        When your CSV source has a more complex configuration, you can utilise
        this class to upload information using characteristic and predefined
        columns.

        ```python title="example.py" linenums="1"
        source = DemandCSVFileSource(
            "demand.csv",
            product_id_column=Column(
                "product_id",
            ),
            plant_id_column=Column(
                "plant_id_column",
            ),
            customer_id_column=Column(
                "customer_id_column",
            ),
            quantity_column=Column(
                "weight",
                unit="kilogram",
            ),
            characteristics_columns=[
                CharacteristicColumnBool(
                    column_name="type",
                    characteristic_name="sale_type",
                ),
            ],
        )
        ```
    """  # noqa: E501

    path: str | pathlib.Path | io.BufferedIOBase
    has_header: bool = field(default=True)
    # Active row generator; created by __aiter__ or lazily by __anext__.
    _data: AsyncIterator[demand_pb2.Demand] | None = field(
        default=None,
        init=False,
        repr=False,
    )
    delimiter: str = field(
        default=",",
    )
    product_id_column: Column | None = field(default=None)
    plant_id_column: Column | None = field(default=None)
    customer_id_column: Column | None = field(default=None)
    quantity_column: QuantityColumn | None = field(default=None)
    characteristics_columns: list[CharacteristicColumn] = field(
        default_factory=list,
    )

    @property
    def columns(
        self: "DemandCSVFileSource",
    ) -> list[str | int]:
        """Return the `column_id` of every configured column.

        The order is: product, plant, customer, quantity, then the
        characteristic columns in the order they were given.
        """
        column_ids: list[str | int] = []
        if self.product_id_column:
            column_ids.append(self.product_id_column.column_id)
        if self.plant_id_column:
            column_ids.append(self.plant_id_column.column_id)
        if self.customer_id_column:
            column_ids.append(self.customer_id_column.column_id)
        if self.quantity_column:
            column_ids.append(self.quantity_column.column_id)
        if self.characteristics_columns:
            column_ids.extend(
                characteristic.column_id
                for characteristic in self.characteristics_columns
            )
        return column_ids

    def __aiter__(
        self: "DemandCSVFileSource",
    ) -> AsyncIterator[demand_pb2.Demand]:
        """Start a fresh pass over the source and return the iterator (self)."""
        self._data = self._load()
        return self

    async def __anext__(
        self: "DemandCSVFileSource",
    ) -> demand_pb2.Demand:
        """Return the next Demand message.

        Raises:
            StopAsyncIteration: When the underlying row generator is exhausted.
        """
        if self._data is None:
            # __anext__ was called without a prior __aiter__; load lazily.
            self._data = self._load()
        data = await anext(self._data, None)
        if data is None:
            raise StopAsyncIteration()
        return data

    def _load(
        self: "DemandCSVFileSource",
    ) -> AsyncIterator[demand_pb2.Demand]:
        """Pick the row generator that matches `has_header`."""
        return (
            self._load_data_from_a_file_with_header()
            if self.has_header
            else self._load_data_from_a_file_without_header()
        )

    async def _load_data_from_a_file_with_header(
        self: "DemandCSVFileSource",
    ) -> AsyncIterator[demand_pb2.Demand]:
        """Yield one Demand per data row, keyed by the names in the header row."""
        if isinstance(self.path, io.BufferedIOBase):
            reader = csv.DictReader(
                buffered_io_base_to_str_iterable(self.path),
                delimiter=self.delimiter,
            )
            for data in reader:
                yield self._create_demand(data)
        if isinstance(self.path, (str, Path)):
            with open(self.path, mode="r") as source:
                reader = csv.reader(  # type: ignore[assignment]
                    source,
                    delimiter=self.delimiter,
                    strict=True,
                )
                # An empty file has no header. A bare next() would raise
                # StopIteration, which an async generator turns into
                # RuntimeError, so finish cleanly instead.
                header = next(reader, None)
                if header is None:
                    return
                # The remaining lines are read asynchronously and each line is
                # parsed on its own.
                async for row in anyio.wrap_file(source):
                    data = dict(
                        zip(
                            header,
                            next(
                                csv.reader(
                                    [row],
                                    delimiter=self.delimiter,
                                    strict=True,
                                )
                            ),
                            strict=False,
                        ),
                    )
                    yield self._create_demand(data)

    async def _load_data_from_a_file_without_header(
        self: "DemandCSVFileSource",
    ) -> AsyncIterator[demand_pb2.Demand]:
        """Yield one Demand per row, keyed by zero-based column position."""
        if isinstance(self.path, io.BufferedIOBase):
            sample = self.path.read(1024)
            if not sample:
                # Nothing to sniff or parse in an empty buffer.
                return
            # The sample may end in the middle of a multi-byte character;
            # ignore the truncated tail instead of failing the whole load.
            dialect = csv.Sniffer().sniff(sample.decode("utf-8", errors="ignore"))
            self.path.seek(0)
            reader = csv.reader(
                buffered_io_base_to_str_iterable(self.path),
                dialect=dialect,
                strict=True,
            )
            first_row = next(reader, None)
            if first_row is None:
                return
            number_of_columns = max(
                max(self.columns, default=0),
                len(first_row),
            )
            # Rewind so that the first row is emitted as data too.
            self.path.seek(0)
            reader = csv.reader(
                buffered_io_base_to_str_iterable(self.path),
                dialect=dialect,
                strict=True,
            )
            for row in reader:
                data = dict(zip(range(number_of_columns), row, strict=False))  # type: ignore[arg-type]
                yield self._create_demand(data)  # type: ignore[arg-type]
        if isinstance(self.path, (str, Path)):
            with open(self.path, mode="r") as source:
                reader = csv.reader(
                    source,
                    delimiter=self.delimiter,
                    strict=True,
                )
                first_row = next(reader, None)
                if first_row is None:
                    # Empty file: there are no rows to emit.
                    return
                number_of_columns = max(
                    max(self.columns, default=0),
                    len(first_row),
                )
                # Rewind so that the first row is emitted as data too.
                source.seek(0)
                async for row in anyio.wrap_file(source):
                    data = dict(
                        zip(
                            range(number_of_columns),  # type: ignore[arg-type]
                            next(
                                csv.reader(
                                    [row],
                                    delimiter=self.delimiter,
                                    strict=True,
                                )
                            ),
                            strict=False,
                        ),
                    )
                    yield self._create_demand(data)  # type: ignore[arg-type]

    def _create_demand(
        self: "DemandCSVFileSource",
        data: dict[str | int, Any],
    ) -> demand_pb2.Demand:
        """Build a Demand message from one parsed row.

        Only the configured columns are read from `data`; every other field
        of the message keeps its default value.
        """
        demand = demand_pb2.Demand()
        if self.product_id_column:
            product = product_pb2.Product(
                product_id=data.get(self.product_id_column.column_id, None)
            )
            demand.product.CopyFrom(product)
        if self.plant_id_column:
            demand.plant = data.get(self.plant_id_column.column_id, None)
        if self.customer_id_column:
            demand.customer_id = data.get(self.customer_id_column.column_id, None)
        if self.quantity_column:
            quantity = self.quantity_column.get_value(data)
            demand.quantity.CopyFrom(quantity)
        if self.characteristics_columns:
            demand.characteristics.extend(
                [column.get_value(data) for column in self.characteristics_columns]
            )
        return demand

DemandSource dataclass

Base class for the demand sources. This class is an abstract class that defines the interface for the demand CSV source.

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@dataclass
class DemandSource:
    """
    Base class for the demand sources.

    This class is an abstract class that defines the interface for the demand CSV
    source.
    """

    # NOTE(review): `abc.abstractmethod` is only enforced on classes that use
    # `abc.ABCMeta`; no such base is visible on this class - confirm whether
    # that is intended.

    @abc.abstractmethod
    def __aiter__(self: "DemandSource") -> AsyncIterator[demand_pb2.Demand]:
        """DemandSource implements Asynchronous Iterator.

        This allows you to use any implementation of DemandSource as

        ```python title="example.py" linenums="1"
        source = DemandSourceImplementation()
        async for _ in source:
            # do something with Demand
        ```
        """
        ...

    @abc.abstractmethod
    async def __anext__(
        self: "DemandSource",
    ) -> demand_pb2.Demand:
        """Return the next Demand from the asynchronous iterator.

        Implementations are expected to raise `StopAsyncIteration` once the
        source is exhausted, as the asynchronous iterator protocol requires.
        """
        ...

__aiter__() abstractmethod

DemandSource implements Asynchronous Iterator. This allows you to use any implementation of DemandSource as

example.py
1
2
3
source = DemandSourceImplementation()
async for _ in source:
    # do something with Demand

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@abc.abstractmethod
def __aiter__(self: "DemandSource") -> AsyncIterator[demand_pb2.Demand]:
    """DemandSource implements Asynchronous Iterator.

    This allows you to use any implementation of DemandSource as

    ```python title="example.py" linenums="1"
    source = DemandSourceImplementation()
    async for _ in source:
        # do something with Demand
    ```
    """
    ...

__anext__() abstractmethod async

You can fetch the next element in the asynchronous iterator.

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@abc.abstractmethod
async def __anext__(
    self: "DemandSource",
) -> demand_pb2.Demand:
    """Return the next Demand from the asynchronous iterator.

    Implementations are expected to raise `StopAsyncIteration` once the
    source is exhausted, as the asynchronous iterator protocol requires.
    """
    ...

MaterialsCSVFileSource dataclass

Bases: MaterialsSource

A CSV source for Materials.

This class simplifies the upload of Materials Information using CSV.

Parameters:

Name Type Description Default
path str | Path | BufferedIOBase

A path to the CSV file containing materials information.

required
material_id_column Column | None

A column that is used to uniquely identify a material in a dataset

None
delimiter str

A delimiter used in CSV file

','
plant_id_column Column | None

A column that is used to reference a production plant where material is used

None
quantity_column QuantityColumn | None

A column that represents the quantity of material

None
characteristics_columns list[CharacteristicColumn]

Specifies a list of arbitrary characteristics of a given material

list()

Examples:

Minimal working example

This is a minimal required configuration of a CSV source for materials.

simple.py
1
2
3
4
5
6
source = MaterialsCSVFileSource(
    "materials.csv",
    material_id_column=Column(
        "source_id",
    ),
)
Extended example with multiple characteristics

When your CSV source has a more complex configuration, you can utilise this class to upload information using characteristic and predefined columns.

example.py
source = MaterialsCSVFileSource(
    "materials.csv",
    material_id_column=Column(
        "material_id",
    ),
    plant_id_column=Column(
        "plant_id_column",
    ),
    quantity_column=Column(
        "weight",
        unit="kilogram",
    ),
    characteristics_columns=[
        CharacteristicColumnBool(
            column_name="frozen",
            characteristic_name="is_frozen",
        ),
        CharacteristicColumnString(
            column_name="quality_category",
            characteristic_name="quality_category",
        ),
        CharacteristicColumnFloat(
            column_name="lean_percentage",
            characteristic_name="lean_percentage",
        ),
    ],
)
Source code in src/volur/sdk/v1alpha2/sources/csv/source.py
@dataclass
class MaterialsCSVFileSource(MaterialsSource):
    """A CSV source for Materials.

    This class simplifies the upload of Materials Information using CSV.

    Arguments:
        path: A path to the CSV file containing materials information, or an already opened binary buffer holding the CSV content.
        has_header: Whether the first row of the CSV contains the column names. When `False`, rows are keyed by zero-based column position.
        material_id_column: A column that is used to uniquely identify a material in a dataset
        delimiter: A delimiter used in CSV file
        plant_id_column: A column that is used to reference a production plant where material is used
        quantity_column: A column that represents the quantity of material
        characteristics_columns: Specifies a list of arbitrary characteristics of a given material

    Examples:
        ### Minimal working example
        This is a **minimal** required configuration of a CSV source for
        materials.

        ```python title="simple.py" linenums="1"
        source = MaterialsCSVFileSource(
            "materials.csv",
            material_id_column=Column(
                "source_id",
            ),
        )
        ```
        ### Extended example with multiple characteristics
        When your CSV source has a more complex configuration, you can utilise
        this class to upload information using characteristic and predefined
        columns.

        ```python title="example.py" linenums="1"
        source = MaterialsCSVFileSource(
            "materials.csv",
            material_id_column=Column(
                "material_id",
            ),
            plant_id_column=Column(
                "plant_id_column",
            ),
            quantity_column=Column(
                "weight",
                unit="kilogram",
            ),
            characteristics_columns=[
                CharacteristicColumnBool(
                    column_name="frozen",
                    characteristic_name="is_frozen",
                ),
                CharacteristicColumnString(
                    column_name="quality_category",
                    characteristic_name="quality_category",
                ),
                CharacteristicColumnFloat(
                    column_name="lean_percentage",
                    characteristic_name="lean_percentage",
                ),
            ],
        )
        ```
    """  # noqa: E501

    path: str | pathlib.Path | io.BufferedIOBase
    has_header: bool = field(default=True)
    # Active row generator; created by __aiter__ or lazily by __anext__.
    _data: AsyncIterator[material_pb2.Material] | None = field(
        default=None,
        init=False,
        repr=False,
    )
    delimiter: str = field(
        default=",",
    )
    material_id_column: Column | None = field(default=None)
    plant_id_column: Column | None = field(default=None)
    quantity_column: QuantityColumn | None = field(default=None)
    characteristics_columns: list[CharacteristicColumn] = field(
        default_factory=list,
    )

    @property
    def columns(
        self: "MaterialsCSVFileSource",
    ) -> list[str | int]:
        """Return the `column_id` of every configured column.

        The order is: material, plant, quantity, then the characteristic
        columns in the order they were given.
        """
        column_ids: list[str | int] = []
        if self.material_id_column:
            column_ids.append(self.material_id_column.column_id)
        if self.plant_id_column:
            column_ids.append(self.plant_id_column.column_id)
        if self.quantity_column:
            column_ids.append(self.quantity_column.column_id)
        if self.characteristics_columns:
            column_ids.extend(
                characteristic.column_id
                for characteristic in self.characteristics_columns
            )
        return column_ids

    def __aiter__(
        self: "MaterialsCSVFileSource",
    ) -> AsyncIterator[material_pb2.Material]:
        """Start a fresh pass over the source and return the iterator (self)."""
        self._data = self._load()
        return self

    async def __anext__(
        self: "MaterialsCSVFileSource",
    ) -> material_pb2.Material:
        """Return the next Material message.

        Raises:
            StopAsyncIteration: When the underlying row generator is exhausted.
        """
        if self._data is None:
            # __anext__ was called without a prior __aiter__; load lazily.
            self._data = self._load()
        data = await anext(self._data, None)
        if data is None:
            raise StopAsyncIteration()
        return data

    def _load(
        self: "MaterialsCSVFileSource",
    ) -> AsyncIterator[material_pb2.Material]:
        """Pick the row generator that matches `has_header`."""
        return (
            self._load_data_from_a_file_with_header()
            if self.has_header
            else self._load_data_from_a_file_without_header()
        )

    async def _load_data_from_a_file_with_header(
        self: "MaterialsCSVFileSource",
    ) -> AsyncIterator[material_pb2.Material]:
        """Yield one Material per data row, keyed by the names in the header row."""
        if isinstance(self.path, io.BufferedIOBase):
            reader = csv.DictReader(
                buffered_io_base_to_str_iterable(self.path),
                delimiter=self.delimiter,
            )
            for data in reader:
                yield self._create_material(data)
        if isinstance(self.path, (str, Path)):
            with open(self.path, mode="r") as source:
                reader = csv.reader(  # type: ignore[assignment]
                    source,
                    delimiter=self.delimiter,
                    strict=True,
                )
                # An empty file has no header. A bare next() would raise
                # StopIteration, which an async generator turns into
                # RuntimeError, so finish cleanly instead.
                header = next(reader, None)
                if header is None:
                    return
                # The remaining lines are read asynchronously and each line is
                # parsed on its own.
                async for row in anyio.wrap_file(source):
                    data = dict(
                        zip(
                            header,
                            next(
                                csv.reader(
                                    [row],
                                    delimiter=self.delimiter,
                                    strict=True,
                                )
                            ),
                            strict=False,
                        ),
                    )
                    yield self._create_material(data)

    async def _load_data_from_a_file_without_header(
        self: "MaterialsCSVFileSource",
    ) -> AsyncIterator[material_pb2.Material]:
        """Yield one Material per row, keyed by zero-based column position."""
        if isinstance(self.path, io.BufferedIOBase):
            sample = self.path.read(1024)
            if not sample:
                # Nothing to sniff or parse in an empty buffer.
                return
            # The sample may end in the middle of a multi-byte character;
            # ignore the truncated tail instead of failing the whole load.
            dialect = csv.Sniffer().sniff(sample.decode("utf-8", errors="ignore"))
            self.path.seek(0)
            reader = csv.reader(
                buffered_io_base_to_str_iterable(self.path),
                dialect=dialect,
                strict=True,
            )
            first_row = next(reader, None)
            if first_row is None:
                return
            number_of_columns = max(
                max(self.columns, default=0),
                len(first_row),
            )
            # Rewind so that the first row is emitted as data too.
            self.path.seek(0)
            reader = csv.reader(
                buffered_io_base_to_str_iterable(self.path),
                dialect=dialect,
                strict=True,
            )
            for row in reader:
                data = dict(zip(range(number_of_columns), row, strict=False))  # type: ignore[arg-type]
                yield self._create_material(data)  # type: ignore[arg-type]
        if isinstance(self.path, (str, Path)):
            with open(self.path, mode="r") as source:
                reader = csv.reader(
                    source,
                    delimiter=self.delimiter,
                    strict=True,
                )
                first_row = next(reader, None)
                if first_row is None:
                    # Empty file: there are no rows to emit.
                    return
                number_of_columns = max(
                    max(self.columns, default=0),
                    len(first_row),
                )
                # Rewind so that the first row is emitted as data too.
                source.seek(0)
                async for row in anyio.wrap_file(source):
                    data = dict(
                        zip(
                            range(number_of_columns),  # type: ignore[arg-type]
                            next(
                                csv.reader(
                                    [row],
                                    delimiter=self.delimiter,
                                    strict=True,
                                )
                            ),
                            strict=False,
                        ),
                    )
                    yield self._create_material(data)  # type: ignore[arg-type]

    def _create_material(
        self: "MaterialsCSVFileSource",
        data: dict[str | int, Any],
    ) -> material_pb2.Material:
        """Build a Material message from one parsed row.

        Only the configured columns are read from `data`; every other field
        of the message keeps its default value.
        """
        material = material_pb2.Material()
        if self.material_id_column:
            material.material_id = data.get(
                self.material_id_column.column_id,
                None,
            )
            # todo: support auto-generation of material_id
        if self.plant_id_column:
            material.plant = data.get(self.plant_id_column.column_id, None)
        if self.quantity_column:
            quantity = self.quantity_column.get_value(data)
            material.quantity.CopyFrom(quantity)
        if self.characteristics_columns:
            material.characteristics.extend(
                [column.get_value(data) for column in self.characteristics_columns]
            )
        return material

MaterialsSource

Base class for material sources.

This class is an abstract class that defines the interface for materials CSV source.

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
class MaterialsSource:
    """
    Base class for material sources.

    This class is an abstract class that defines the interface for materials CSV
    source.
    """

    # NOTE(review): `abc.abstractmethod` is only enforced on classes that use
    # `abc.ABCMeta`; no such base is visible on this class - confirm whether
    # that is intended.

    @abc.abstractmethod
    def __aiter__(self: "MaterialsSource") -> AsyncIterator[material_pb2.Material]:
        """MaterialSource implements Asynchronous Iterator.

        This allows you to use any implementation of MaterialSource as

        ```python title="example.py" linenums="1"
        source = MaterialSourceImplementation()
        async for _ in source:
            # do something with Material
        ```
        """
        ...

    @abc.abstractmethod
    async def __anext__(
        self: "MaterialsSource",
    ) -> material_pb2.Material:
        """Return the next Material from the asynchronous iterator.

        Implementations are expected to raise `StopAsyncIteration` once the
        source is exhausted, as the asynchronous iterator protocol requires.
        """
        ...

__aiter__() abstractmethod

MaterialSource implements Asynchronous Iterator.

This allows you to use any implementation of MaterialSource as

example.py
1
2
3
source = MaterialSourceImplementation()
async for _ in source:
    # do something with Material
Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@abc.abstractmethod
def __aiter__(self: "MaterialsSource") -> AsyncIterator[material_pb2.Material]:
    """MaterialSource implements Asynchronous Iterator.

    This allows you to use any implementation of MaterialSource as

    ```python title="example.py" linenums="1"
    source = MaterialSourceImplementation()
    async for _ in source:
        # do something with Material
    ```
    """
    ...

__anext__() abstractmethod async

You can fetch the next element in the asynchronous iterator.

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@abc.abstractmethod
async def __anext__(
    self: "MaterialsSource",
) -> material_pb2.Material:
    """Return the next Material from the asynchronous iterator.

    Implementations are expected to raise `StopAsyncIteration` once the
    source is exhausted, as the asynchronous iterator protocol requires.
    """
    ...

ProductsCSVFileSource dataclass

Bases: ProductsSource

A CSV source for Products.

This class simplifies the upload of Product Information using CSV. Arguments: path: A path to the CSV file containing product information. product_id_column: A column that is used to uniquely identify a product in a dataset delimiter: A delimiter used in CSV file characteristics_columns: Specifies a list of arbitrary characteristics of a given product

Examples:

Minimal working example

This is a minimal required configuration of a CSV source for products.

simple.py
1
2
3
4
5
6
source = ProductsCSVFileSource(
    "products.csv",
    product_id_column=Column(
        "source_id",
    ),
)

Extended example with multiple characteristics

When your CSV source has a more complex configuration, you can utilise this class to upload information using characteristic and predefined columns.

example.py
source = ProductsCSVFileSource(
    "products.csv",
    product_id_column=Column(
        "product_id",
    ),
    characteristics_columns=[
        CharacteristicColumnString(
            column_name="description",
            characteristic_name="description",
        ),
        CharacteristicColumnString(
            column_name="plant_code",
            characteristic_name="plant_code",
        ),
    ],
)

Source code in src/volur/sdk/v1alpha2/sources/csv/source.py
@dataclass
class ProductsCSVFileSource(ProductsSource):
    """A CSV source for Products.

    This class simplifies the upload of Product Information using CSV.

    Arguments:
        path: A path to the CSV file containing product information, or an already opened binary buffer holding the CSV content.
        has_header: Whether the first row of the CSV contains the column names. When `False`, rows are keyed by zero-based column position.
        product_id_column: A column that is used to uniquely identify a product in a dataset
        delimiter: A delimiter used in CSV file
        characteristics_columns: Specifies a list of arbitrary characteristics of a given product

    Examples:
        ### Minimal working example
        This is a **minimal** required configuration of a CSV source for
        products.

        ```python title="simple.py" linenums="1"
        source = ProductsCSVFileSource(
            "products.csv",
            product_id_column=Column(
                "source_id",
            ),
        )
        ```
        ### Extended example with multiple characteristics
        When your CSV source has a more complex configuration, you can utilise
        this class to upload information using characteristic and predefined
        columns.

        ```python title="example.py" linenums="1"
        source = ProductsCSVFileSource(
            "products.csv",
            product_id_column=Column(
                "product_id",
            ),
            characteristics_columns=[
                CharacteristicColumnString(
                    column_name="description",
                    characteristic_name="description",
                ),
                CharacteristicColumnString(
                    column_name="plant_code",
                    characteristic_name="plant_code",
                ),
            ],
        )
        ```
    """  # noqa: E501

    path: str | pathlib.Path | io.BufferedIOBase
    has_header: bool = field(default=True)
    # Active row generator; created by __aiter__ or lazily by __anext__.
    _data: AsyncIterator[product_pb2.Product] | None = field(
        default=None,
        init=False,
        repr=False,
    )
    delimiter: str = field(
        default=",",
    )
    product_id_column: Column | None = field(default=None)
    characteristics_columns: list[CharacteristicColumn] = field(
        default_factory=list,
    )

    @property
    def columns(
        self: "ProductsCSVFileSource",
    ) -> list[str | int]:
        """Return the `column_id` of every configured column.

        The product id column comes first, followed by the characteristic
        columns in the order they were given.
        """
        column_ids: list[str | int] = []
        if self.product_id_column:
            column_ids.append(self.product_id_column.column_id)
        if self.characteristics_columns:
            column_ids.extend(
                characteristic.column_id
                for characteristic in self.characteristics_columns
            )
        return column_ids

    def __aiter__(
        self: "ProductsCSVFileSource",
    ) -> AsyncIterator[product_pb2.Product]:
        """Start a fresh pass over the source and return the iterator (self)."""
        self._data = self._load()
        return self

    async def __anext__(
        self: "ProductsCSVFileSource",
    ) -> product_pb2.Product:
        """Return the next Product message.

        Raises:
            StopAsyncIteration: When the underlying row generator is exhausted.
        """
        if self._data is None:
            # __anext__ was called without a prior __aiter__; load lazily.
            self._data = self._load()
        data = await anext(self._data, None)
        if data is None:
            raise StopAsyncIteration()
        return data

    def _load(
        self: "ProductsCSVFileSource",
    ) -> AsyncIterator[product_pb2.Product]:
        """Pick the row generator that matches `has_header`."""
        return (
            self._load_data_from_a_file_with_header()
            if self.has_header
            else self._load_data_from_a_file_without_header()
        )

    async def _load_data_from_a_file_with_header(
        self: "ProductsCSVFileSource",
    ) -> AsyncIterator[product_pb2.Product]:
        """Yield one Product per data row, keyed by the names in the header row."""
        if isinstance(self.path, io.BufferedIOBase):
            reader = csv.DictReader(
                buffered_io_base_to_str_iterable(self.path),
                delimiter=self.delimiter,
            )
            for data in reader:
                yield self._create_product(data)
        if isinstance(self.path, (str, Path)):
            with open(self.path, mode="r") as source:
                reader = csv.reader(  # type: ignore[assignment]
                    source,
                    delimiter=self.delimiter,
                    strict=True,
                )
                # An empty file has no header. A bare next() would raise
                # StopIteration, which an async generator turns into
                # RuntimeError, so finish cleanly instead.
                header = next(reader, None)
                if header is None:
                    return
                # The remaining lines are read asynchronously and each line is
                # parsed on its own.
                async for row in anyio.wrap_file(source):
                    data = dict(
                        zip(
                            header,
                            next(
                                csv.reader(
                                    [row],
                                    delimiter=self.delimiter,
                                    strict=True,
                                )
                            ),
                            strict=False,
                        ),
                    )
                    yield self._create_product(data)

    async def _load_data_from_a_file_without_header(
        self: "ProductsCSVFileSource",
    ) -> AsyncIterator[product_pb2.Product]:
        """Yield one Product per row, keyed by zero-based column position."""
        if isinstance(self.path, io.BufferedIOBase):
            sample = self.path.read(1024)
            if not sample:
                # Nothing to sniff or parse in an empty buffer.
                return
            # The sample may end in the middle of a multi-byte character;
            # ignore the truncated tail instead of failing the whole load.
            dialect = csv.Sniffer().sniff(sample.decode("utf-8", errors="ignore"))
            self.path.seek(0)
            reader = csv.reader(
                buffered_io_base_to_str_iterable(self.path),
                dialect=dialect,
                strict=True,
            )
            first_row = next(reader, None)
            if first_row is None:
                return
            number_of_columns = max(
                max(self.columns, default=0),
                len(first_row),
            )
            # Rewind so that the first row is emitted as data too.
            self.path.seek(0)
            reader = csv.reader(
                buffered_io_base_to_str_iterable(self.path),
                dialect=dialect,
                strict=True,
            )
            for row in reader:
                data = dict(zip(range(number_of_columns), row, strict=False))  # type: ignore[arg-type]
                yield self._create_product(data)  # type: ignore[arg-type]
        if isinstance(self.path, (str, Path)):
            with open(self.path, mode="r") as source:
                reader = csv.reader(
                    source,
                    delimiter=self.delimiter,
                    strict=True,
                )
                first_row = next(reader, None)
                if first_row is None:
                    # Empty file: there are no rows to emit.
                    return
                number_of_columns = max(
                    max(self.columns, default=0),
                    len(first_row),
                )
                # Rewind so that the first row is emitted as data too.
                source.seek(0)
                async for row in anyio.wrap_file(source):
                    data = dict(
                        zip(
                            range(number_of_columns),  # type: ignore[arg-type]
                            next(
                                csv.reader(
                                    [row],
                                    delimiter=self.delimiter,
                                    strict=True,
                                )
                            ),
                            strict=False,
                        ),
                    )
                    yield self._create_product(data)  # type: ignore[arg-type]

    def _create_product(
        self: "ProductsCSVFileSource",
        data: dict[str | int, Any],
    ) -> product_pb2.Product:
        """Build a Product message from one parsed row.

        Only the configured columns are read from `data`; every other field
        of the message keeps its default value.
        """
        product = product_pb2.Product()
        if self.product_id_column:
            product.product_id = data.get(
                self.product_id_column.column_id,
                None,
            )
        if self.characteristics_columns:
            product.characteristics.extend(
                [column.get_value(data) for column in self.characteristics_columns]
            )
        return product

ProductsSource dataclass

Base class for the product sources. This class is an abstract class that defines the interface for the products CSV source.

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@dataclass
class ProductsSource:
    """
    Base class for the product sources.

    This class is an abstract class that defines the interface for the products CSV
    source.
    """

    # NOTE(review): `abc.abstractmethod` is only enforced on classes that use
    # `abc.ABCMeta`; no such base is visible on this class - confirm whether
    # that is intended.

    @abc.abstractmethod
    def __aiter__(self: "ProductsSource") -> AsyncIterator[product_pb2.Product]:
        """ProductSource implements Asynchronous Iterator.

        This allows you to use any implementation of ProductsSource as

        ```python title="example.py" linenums="1"
        source = ProductSourceImplementation()
        async for _ in source:
            # do something with Product
        ```
        """
        ...

    @abc.abstractmethod
    async def __anext__(
        self: "ProductsSource",
    ) -> product_pb2.Product:
        """Return the next Product from the asynchronous iterator.

        Implementations are expected to raise `StopAsyncIteration` once the
        source is exhausted, as the asynchronous iterator protocol requires.
        """
        ...

__aiter__() abstractmethod

ProductSource implements Asynchronous Iterator. This allows you to use any implementation of ProductsSource as

example.py
1
2
3
source = ProductSourceImplementation()
async for _ in source:
    # do something with Product

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@abc.abstractmethod
def __aiter__(self: "ProductsSource") -> AsyncIterator[product_pb2.Product]:
    """ProductSource implements Asynchronous Iterator.

    This allows you to use any implementation of ProductsSource as

    ```python title="example.py" linenums="1"
    source = ProductSourceImplementation()
    async for _ in source:
        # do something with Product
    ```
    """
    ...

__anext__() abstractmethod async

You can fetch the next element in the asynchronous iterator.

Source code in src/volur/sdk/v1alpha2/sources/csv/base.py
@abc.abstractmethod
async def __anext__(
    self: "ProductsSource",
) -> product_pb2.Product:
    """Return the next Product from the asynchronous iterator.

    Implementations are expected to raise `StopAsyncIteration` once the
    source is exhausted, as the asynchronous iterator protocol requires.
    """
    ...