Spaces:
Runtime error
Runtime error
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. | |
# Copyright 2022 The HuggingFace Team. All rights reserved. | |
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
""" ConfigMixin base class and utilities.""" | |
import functools | |
import importlib | |
import inspect | |
import json | |
import os | |
import re | |
import tempfile | |
from collections import OrderedDict | |
from typing import Any, Dict, Optional, Tuple, Union | |
import numpy as np | |
from huggingface_hub import ( | |
create_repo, | |
get_hf_file_metadata, | |
hf_hub_download, | |
hf_hub_url, | |
repo_type_and_id_from_hf_id, | |
upload_folder, | |
) | |
from huggingface_hub.utils import EntryNotFoundError | |
from requests import HTTPError | |
from .download_utils import ppdiffusers_bos_download | |
from .utils import ( | |
DOWNLOAD_SERVER, | |
HF_CACHE, | |
PPDIFFUSERS_CACHE, | |
DummyObject, | |
deprecate, | |
logging, | |
) | |
from .version import VERSION as __version__ | |
# Module-level logger obtained through the project's `logging` helper.
logger = logging.get_logger(__name__)

# Matches file names of the form ``config.<anything>.json`` and captures the middle part.
# Not referenced anywhere else in the visible part of this file.
_re_configuration_file = re.compile(r"config\.(.*)\.json")
class FrozenDict(OrderedDict):
    """
    Ordered dictionary whose entries are mirrored as instance attributes and whose removal / bulk-update
    methods (`__delitem__`, `setdefault`, `pop`, `update`) always raise.

    NOTE(review): inside the class body `self.__frozen` is name-mangled to `_FrozenDict__frozen`, while the guards in
    `__setattr__` / `__setitem__` probe the literal attribute name `"__frozen"`. That lookup normally finds nothing,
    so attribute and item assignment keep working after construction. `ConfigMixin.to_json_string` in this file
    assigns items on such an instance, so confirm with callers before tightening this.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Expose every entry as an attribute so values can be read as `d.key`.
        for name in self:
            setattr(self, name, self[name])

        self.__frozen = True

    def __delitem__(self, *args, **kwargs):
        owner = self.__class__.__name__
        raise Exception(f"You cannot use ``__delitem__`` on a {owner} instance.")

    def setdefault(self, *args, **kwargs):
        owner = self.__class__.__name__
        raise Exception(f"You cannot use ``setdefault`` on a {owner} instance.")

    def pop(self, *args, **kwargs):
        owner = self.__class__.__name__
        raise Exception(f"You cannot use ``pop`` on a {owner} instance.")

    def update(self, *args, **kwargs):
        owner = self.__class__.__name__
        raise Exception(f"You cannot use ``update`` on a {owner} instance.")

    def __setattr__(self, name, value):
        locked = hasattr(self, "__frozen") and self.__frozen
        if locked:
            raise Exception(f"You cannot use ``__setattr__`` on a {self.__class__.__name__} instance.")
        super().__setattr__(name, value)

    def __setitem__(self, name, value):
        locked = hasattr(self, "__frozen") and self.__frozen
        if locked:
            raise Exception(f"You cannot use ``__setattr__`` on a {self.__class__.__name__} instance.")
        super().__setitem__(name, value)
class ConfigMixin:
    r"""
    Base class for all configuration classes. Stores all configuration parameters under `self.config`. Also handles
    all methods for loading/downloading/saving classes inheriting from [`ConfigMixin`] with
        - [`~ConfigMixin.from_config`]
        - [`~ConfigMixin.save_config`]

    Class attributes:
        - **config_name** (`str`) -- A filename under which the config should be stored when calling
          [`~ConfigMixin.save_config`] (should be overridden by subclass).
        - **ignore_for_config** (`List[str]`) -- A list of attributes that should not be saved in the config (should be
          overridden by subclass).
        - **has_compatibles** (`bool`) -- Whether the class has compatible classes (should be overridden by subclass).
        - **_deprecated_kwargs** (`List[str]`) -- Keyword arguments that are deprecated. Note that the init function
          should only have a `kwargs` argument if at least one argument is deprecated (should be overridden by
          subclass).
    """

    config_name = None
    ignore_for_config = []
    has_compatibles = False

    _deprecated_kwargs = []

    def register_to_config(self, **kwargs):
        """
        Set every keyword argument as an attribute on `self` and merge it into the stored config dictionary.

        Raises:
            NotImplementedError: If the class does not define `config_name`.
            AttributeError: Re-raised (after logging) when an attribute cannot be set on `self`.
        """
        if self.config_name is None:
            raise NotImplementedError(f"Make sure that {self.__class__} has defined a class name `config_name`")

        # Special case for `kwargs` used in deprecation warning added to schedulers
        # TODO: remove this when we remove the deprecation warning, and the `kwargs` argument,
        # or solve in a more general way.
        kwargs.pop("kwargs", None)
        for key, value in kwargs.items():
            try:
                setattr(self, key, value)
            except AttributeError as err:
                logger.error(f"Can't set {key} with value {value} for {self}")
                raise err

        if not hasattr(self, "_internal_dict"):
            internal_dict = kwargs
        else:
            # New values win over the ones registered earlier.
            previous_dict = dict(self._internal_dict)
            internal_dict = {**self._internal_dict, **kwargs}
            logger.debug(f"Updating config from {previous_dict} to {internal_dict}")

        self._internal_dict = FrozenDict(internal_dict)

    def save_config(self, save_directory: Union[str, os.PathLike], push_to_hub: bool = False, **kwargs):
        """
        Save a configuration object to the directory `save_directory`, so that it can be re-loaded using the
        [`~ConfigMixin.from_config`] class method.

        Args:
            save_directory (`str` or `os.PathLike`):
                Directory where the configuration JSON file will be saved (will be created if it does not exist).
            push_to_hub (`bool`, *optional*, defaults to `False`):
                Accepted for signature compatibility; not read by this implementation.
            kwargs:
                Accepted for signature compatibility; not read by this implementation.

        Raises:
            AssertionError: If `save_directory` points to an existing file.
        """
        if os.path.isfile(save_directory):
            raise AssertionError(f"Provided path ({save_directory}) should be a directory, not a file")

        os.makedirs(save_directory, exist_ok=True)

        # If we save using the predefined names, we can load using `from_config`
        output_config_file = os.path.join(save_directory, self.config_name)

        self.to_json_file(output_config_file)
        logger.info(f"Configuration saved in {output_config_file}")

    def save_to_hf_hub(
        self,
        repo_id: str,
        private: Optional[bool] = None,
        subfolder: Optional[str] = None,
        commit_message: Optional[str] = None,
        revision: Optional[str] = None,
        create_pr: bool = False,
    ):
        """
        Uploads all elements of this config to a new HuggingFace Hub repository.

        Args:
            repo_id (str): Repository name for your model/tokenizer in the Hub.
            private (bool, optional): Whether the model/tokenizer is set to private
            subfolder (str, optional): Push to a subfolder of the repo instead of the root
            commit_message (str, optional): The summary / title / first line of the generated commit. Defaults to: f"Upload {path_in_repo} with huggingface_hub"
            revision (str, optional): The git revision to commit from. Defaults to the head of the "main" branch.
            create_pr (boolean, optional): Whether or not to create a Pull Request with that commit. Defaults to False.
                If revision is not set, PR is opened against the "main" branch. If revision is set and is a branch, PR is opened against this branch.
                If revision is set and is not a branch name (example: a commit oid), an RevisionNotFoundError is returned by the server.

        Returns: The url of the commit of your model in the given repository.
        """
        repo_url = create_repo(repo_id, private=private, exist_ok=True)

        # Infer complete repo_id from repo_url
        # Can be different from the input `repo_id` if repo_owner was implicit
        _, repo_owner, repo_name = repo_type_and_id_from_hf_id(repo_url)

        repo_id = f"{repo_owner}/{repo_name}"

        # Check if README file already exist in repo
        try:
            get_hf_file_metadata(hf_hub_url(repo_id=repo_id, filename="README.md", revision=revision))
            has_readme = True
        except EntryNotFoundError:
            has_readme = False

        with tempfile.TemporaryDirectory() as root_dir:
            if subfolder is not None:
                save_dir = os.path.join(root_dir, subfolder)
            else:
                save_dir = root_dir

            # save config
            self.save_config(save_dir)

            # Add readme if it does not exist; only announce it when a default README is really written.
            if not has_readme:
                logger.info("README.md not found, adding the default README.md")
                with open(os.path.join(root_dir, "README.md"), "w", encoding="utf-8") as f:
                    f.write(f"---\nlibrary_name: ppdiffusers\n---\n# {repo_id}")

            # Upload model and return
            logger.info(f"Pushing to the {repo_id}. This might take a while")
            return upload_folder(
                repo_id=repo_id,
                repo_type="model",
                folder_path=root_dir,
                commit_message=commit_message,
                revision=revision,
                create_pr=create_pr,
            )

    @classmethod
    def from_config(
        cls, config: Union["FrozenDict", Dict[str, Any]] = None, return_unused_kwargs=False, **kwargs
    ):
        r"""
        Instantiate a Python class from a config dictionary

        Parameters:
            config (`Dict[str, Any]`):
                A config dictionary from which the Python class will be instantiated. Make sure to only load
                configuration files of compatible classes.
            return_unused_kwargs (`bool`, *optional*, defaults to `False`):
                Whether kwargs that are not consumed by the Python class should be returned or not.

            kwargs (remaining dictionary of keyword arguments, *optional*):
                Can be used to update the configuration object (after it being loaded) and initiate the Python class.
                `**kwargs` will be directly passed to the underlying scheduler/model's `__init__` method and eventually
                overwrite same named arguments of `config`.

        Returns:
            The instantiated object, or a `(object, unused_kwargs)` tuple when `return_unused_kwargs` is `True`.

        Raises:
            ValueError: If no config is provided.

        Examples:

        ```python
        >>> from ppdiffusers import DDPMScheduler, DDIMScheduler, PNDMScheduler

        >>> # Download scheduler from BOS and cache.
        >>> scheduler = DDPMScheduler.from_pretrained("google/ddpm-cifar10-32")

        >>> # Instantiate DDIM scheduler class with same config as DDPM
        >>> scheduler = DDIMScheduler.from_config(scheduler.config)

        >>> # Instantiate PNDM scheduler class with same config as DDPM
        >>> scheduler = PNDMScheduler.from_config(scheduler.config)
        ```
        """
        # <===== TO BE REMOVED WITH DEPRECATION
        # TODO(Patrick) - make sure to remove the following lines when config=="model_path" is deprecated
        if "pretrained_model_name_or_path" in kwargs:
            config = kwargs.pop("pretrained_model_name_or_path")

        if config is None:
            raise ValueError("Please make sure to provide a config as the first positional argument.")
        # ======>

        if not isinstance(config, dict):
            deprecation_message = "It is deprecated to pass a pretrained model name or path to `from_config`."
            if "Scheduler" in cls.__name__:
                deprecation_message += (
                    f"If you were trying to load a scheduler, please use {cls}.from_pretrained(...) instead."
                    " Otherwise, please make sure to pass a configuration dictionary instead. This functionality will"
                    " be removed in v1.0.0."
                )
            elif "Model" in cls.__name__:
                deprecation_message += (
                    f"If you were trying to load a model, please use {cls}.load_config(...) followed by"
                    f" {cls}.from_config(...) instead. Otherwise, please make sure to pass a configuration dictionary"
                    " instead. This functionality will be removed in v1.0.0."
                )
            deprecate("config-passed-as-path", "1.0.0", deprecation_message, standard_warn=False)
            config, kwargs = cls.load_config(pretrained_model_name_or_path=config, return_unused_kwargs=True, **kwargs)

        init_dict, unused_kwargs, hidden_dict = cls.extract_init_dict(config, **kwargs)

        # Allow dtype to be specified on initialization
        if "dtype" in unused_kwargs:
            # (TODO junnyu, donot use dtype)
            unused_kwargs.pop("dtype")
            # init_dict["dtype"] = unused_kwargs.pop("dtype")

        # add possible deprecated kwargs
        for deprecated_kwarg in cls._deprecated_kwargs:
            if deprecated_kwarg in unused_kwargs:
                init_dict[deprecated_kwarg] = unused_kwargs.pop(deprecated_kwarg)

        # Return model and optionally state and/or unused_kwargs
        model = cls(**init_dict)

        # make sure to also save config parameters that might be used for compatible classes
        model.register_to_config(**hidden_dict)

        # add hidden kwargs of compatible classes to unused_kwargs
        unused_kwargs = {**unused_kwargs, **hidden_dict}

        if return_unused_kwargs:
            return (model, unused_kwargs)
        else:
            return model

    @classmethod
    def get_config_dict(cls, *args, **kwargs):
        """Deprecated alias of [`~ConfigMixin.load_config`]; emits a deprecation notice and forwards all arguments."""
        deprecation_message = (
            f" The function get_config_dict is deprecated. Please use {cls}.load_config instead. This function will be"
            " removed in version v1.0.0"
        )
        deprecate("get_config_dict", "1.0.0", deprecation_message, standard_warn=False)
        return cls.load_config(*args, **kwargs)

    @classmethod
    def load_config(
        cls, pretrained_model_name_or_path: Union[str, os.PathLike], return_unused_kwargs=False, **kwargs
    ) -> Union[Dict[str, Any], Tuple[Dict[str, Any], Dict[str, Any]]]:
        r"""
        Locate the JSON config file of this class (local file, local directory, Hugging Face Hub or the default
        download server) and load it into a dictionary.

        Parameters:
            pretrained_model_name_or_path (`str` or `os.PathLike`, *optional*):
                Can be either:

                    - A string, the *model id* of a model repo on huggingface.co. Valid model ids should have an
                      organization name, like `google/ddpm-celebahq-256`.
                    - A path to a *directory* containing model weights saved using [`~ConfigMixin.save_config`], e.g.,
                      `./my_model_directory/`.
                    - A path to the config *file* itself.

            return_unused_kwargs (`bool`, *optional*, defaults to `False`):
                Whether to also return the keyword arguments that were not consumed by this method.
            cache_dir (`Union[str, os.PathLike]`, *optional*):
                Path to a directory in which a downloaded pretrained model configuration should be cached if the
                standard cache should not be used.
            subfolder (`str`, *optional*):
                In case the relevant files are located inside a subfolder of the model repo (either remote in
                huggingface.co or downloaded locally), you can specify the folder name here.
            from_hf_hub (bool, *optional*):
                Whether to load from Hugging Face Hub. Defaults to False

        Returns:
            The config dictionary, or a `(config_dict, unused_kwargs)` tuple when `return_unused_kwargs` is `True`.

        Raises:
            ValueError: If the class does not define `config_name`.
            EnvironmentError: If the config file cannot be found, downloaded or parsed as JSON.
        """
        from_hf_hub = kwargs.pop("from_hf_hub", False)
        if from_hf_hub:
            cache_dir = kwargs.pop("cache_dir", HF_CACHE)
        else:
            cache_dir = kwargs.pop("cache_dir", PPDIFFUSERS_CACHE)
        subfolder = kwargs.pop("subfolder", None)

        pretrained_model_name_or_path = str(pretrained_model_name_or_path)

        if cls.config_name is None:
            raise ValueError(
                "`self.config_name` is not defined. Note that one should not load a config from "
                "`ConfigMixin`. Please make sure to define `config_name` in a class inheriting from `ConfigMixin`"
            )

        if os.path.isfile(pretrained_model_name_or_path):
            config_file = pretrained_model_name_or_path
        elif os.path.isdir(pretrained_model_name_or_path):
            # A config directly inside the directory takes precedence over one inside `subfolder`.
            if os.path.isfile(os.path.join(pretrained_model_name_or_path, cls.config_name)):
                # Load from a Paddle checkpoint
                config_file = os.path.join(pretrained_model_name_or_path, cls.config_name)
            elif subfolder is not None and os.path.isfile(
                os.path.join(pretrained_model_name_or_path, subfolder, cls.config_name)
            ):
                config_file = os.path.join(pretrained_model_name_or_path, subfolder, cls.config_name)
            else:
                raise EnvironmentError(
                    f"Error no file named {cls.config_name} found in directory {pretrained_model_name_or_path}."
                )
        elif from_hf_hub:
            config_file = hf_hub_download(
                repo_id=pretrained_model_name_or_path,
                filename=cls.config_name,
                cache_dir=cache_dir,
                subfolder=subfolder,
                library_name="PPDiffusers",
                library_version=__version__,
            )
        else:
            # `HTTPError` must be handled before the broader `EnvironmentError` clause below.
            try:
                config_file = ppdiffusers_bos_download(
                    pretrained_model_name_or_path,
                    filename=cls.config_name,
                    subfolder=subfolder,
                    cache_dir=cache_dir,
                )
            except HTTPError as err:
                raise EnvironmentError(
                    "There was a specific connection error when trying to load"
                    f" {pretrained_model_name_or_path}:\n{err}"
                ) from err
            except ValueError as err:
                raise EnvironmentError(
                    f"We couldn't connect to '{DOWNLOAD_SERVER}' to load this model, couldn't find it"
                    f" in the cached files and it looks like {pretrained_model_name_or_path} is not the path to a"
                    f" directory containing a {cls.config_name} file.\nCheckout your internet connection or see how to"
                    " run the library in offline mode at"
                    " 'https://huggingface.co/docs/diffusers/installation#offline-mode'."
                ) from err
            except EnvironmentError as err:
                raise EnvironmentError(
                    f"Can't load config for '{pretrained_model_name_or_path}'. If you were trying to load it from "
                    "'https://huggingface.co/models', make sure you don't have a local directory with the same name. "
                    f"Otherwise, make sure '{pretrained_model_name_or_path}' is the correct path to a directory "
                    f"containing a {cls.config_name} file"
                ) from err

        try:
            # Load config dict
            config_dict = cls._dict_from_json_file(config_file)
        except (json.JSONDecodeError, UnicodeDecodeError) as err:
            raise EnvironmentError(
                f"It looks like the config file at '{config_file}' is not a valid JSON file."
            ) from err

        if return_unused_kwargs:
            return config_dict, kwargs

        return config_dict

    @staticmethod
    def _get_init_keys(cls):
        """Return the set of parameter names (including `self`) of `cls.__init__`."""
        return set(dict(inspect.signature(cls.__init__).parameters).keys())

    @classmethod
    def extract_init_dict(cls, config_dict, **kwargs):
        """
        Split a config dictionary into the arguments accepted by `cls.__init__` and everything else.

        Args:
            config_dict (`Dict[str, Any]`): The loaded configuration. It is not modified.
            kwargs: Overrides; a value given here wins over the one in `config_dict`.

        Returns:
            A tuple `(init_dict, unused_kwargs, hidden_config_dict)`: the keyword arguments for `__init__`, the
            config entries and kwargs that were not consumed, and the entries of the original config that did not
            end up in `init_dict`.
        """
        # 0. Copy origin config dict
        original_dict = dict(config_dict)

        # 1. Retrieve expected config attributes from __init__ signature
        expected_keys = cls._get_init_keys(cls)
        expected_keys.remove("self")
        # remove general kwargs if present in dict
        if "kwargs" in expected_keys:
            expected_keys.remove("kwargs")

        # 2. Remove attributes that cannot be expected from expected config attributes
        # remove keys to be ignored
        if len(cls.ignore_for_config) > 0:
            expected_keys = expected_keys - set(cls.ignore_for_config)

        # load ppdiffusers library to import compatible and original scheduler
        ppdiffusers_library = importlib.import_module(__name__.split(".")[0])

        if cls.has_compatibles:
            compatible_classes = [c for c in cls._get_compatibles() if not isinstance(c, DummyObject)]
        else:
            compatible_classes = []

        # Keys that only compatible classes accept are dropped silently instead of being reported as unexpected.
        expected_keys_comp_cls = set()
        for c in compatible_classes:
            expected_keys_c = cls._get_init_keys(c)
            expected_keys_comp_cls = expected_keys_comp_cls.union(expected_keys_c)
        expected_keys_comp_cls = expected_keys_comp_cls - cls._get_init_keys(cls)
        config_dict = {k: v for k, v in config_dict.items() if k not in expected_keys_comp_cls}

        # remove attributes from orig class that cannot be expected
        orig_cls_name = config_dict.pop("_class_name", cls.__name__)
        if orig_cls_name != cls.__name__ and hasattr(ppdiffusers_library, orig_cls_name):
            orig_cls = getattr(ppdiffusers_library, orig_cls_name)
            unexpected_keys_from_orig = cls._get_init_keys(orig_cls) - expected_keys
            config_dict = {k: v for k, v in config_dict.items() if k not in unexpected_keys_from_orig}

        # remove private attributes
        config_dict = {k: v for k, v in config_dict.items() if not k.startswith("_")}

        # 3. Create keyword arguments that will be passed to __init__ from expected keyword arguments
        init_dict = {}
        for key in expected_keys:
            # if config param is passed to kwarg and is present in config dict
            # it should overwrite existing config dict key
            if key in kwargs and key in config_dict:
                config_dict[key] = kwargs.pop(key)

            if key in kwargs:
                # overwrite key
                init_dict[key] = kwargs.pop(key)
            elif key in config_dict:
                # use value from config dict
                init_dict[key] = config_dict.pop(key)

        # 4. Give nice warning if unexpected values have been passed
        if len(config_dict) > 0:
            logger.warning(
                f"The config attributes {config_dict} were passed to {cls.__name__}, "
                "but are not expected and will be ignored. Please verify your "
                f"{cls.config_name} configuration file."
            )

        # 5. Give nice info if config attributes are initialized to default because they have not been passed
        passed_keys = set(init_dict.keys())
        if len(expected_keys - passed_keys) > 0:
            logger.info(
                f"{expected_keys - passed_keys} was not found in config. Values will be initialized to default values."
            )

        # 6. Define unused keyword arguments
        unused_kwargs = {**config_dict, **kwargs}

        # 7. Define "hidden" config parameters that were saved for compatible classes
        hidden_config_dict = {k: v for k, v in original_dict.items() if k not in init_dict}

        return init_dict, unused_kwargs, hidden_config_dict

    @classmethod
    def _dict_from_json_file(cls, json_file: Union[str, os.PathLike]):
        """Read `json_file` as UTF-8 text and return the parsed JSON content."""
        with open(json_file, "r", encoding="utf-8") as reader:
            text = reader.read()
        return json.loads(text)

    def __repr__(self):
        return f"{self.__class__.__name__} {self.to_json_string()}"

    @property
    def config(self) -> Dict[str, Any]:
        """
        Returns the config of the class as a frozen dictionary

        Returns:
            `Dict[str, Any]`: Config of the class.
        """
        return self._internal_dict

    def to_json_string(self) -> str:
        """
        Serializes this instance to a JSON string.

        Returns:
            `str`: String containing all the attributes that make up this configuration instance in JSON format.
        """
        # NOTE: when `_internal_dict` exists, `config_dict` is that very object, so the two bookkeeping keys
        # written below are stored in the instance's config as well.
        config_dict = self._internal_dict if hasattr(self, "_internal_dict") else {}
        config_dict["_class_name"] = self.__class__.__name__
        config_dict["_ppdiffusers_version"] = __version__

        def to_json_saveable(value):
            # numpy arrays are not JSON serializable; store them as nested lists.
            if isinstance(value, np.ndarray):
                value = value.tolist()
            return value

        config_dict = {k: to_json_saveable(v) for k, v in config_dict.items()}
        return json.dumps(config_dict, indent=2, sort_keys=True) + "\n"

    def to_json_file(self, json_file_path: Union[str, os.PathLike]):
        """
        Save this instance to a JSON file.

        Args:
            json_file_path (`str` or `os.PathLike`):
                Path to the JSON file in which this configuration instance's parameters will be saved.
        """
        with open(json_file_path, "w", encoding="utf-8") as writer:
            writer.write(self.to_json_string())
def register_to_config(init):
    r"""
    Decorator to apply on the init of classes inheriting from [`ConfigMixin`] so that all the arguments are
    automatically sent to `self.register_to_config`. To ignore a specific argument accepted by the init but that
    shouldn't be registered in the config, use the `ignore_for_config` class variable

    Warning: Once decorated, all private arguments (beginning with an underscore) are trashed and not sent to the init!

    Args:
        init: The `__init__` function to wrap.

    Returns:
        The wrapping function. It carries the metadata of `init` (via `functools.wraps`), so
        `inspect.signature` on the decorated `__init__` still reports the real parameters, which
        `ConfigMixin._get_init_keys` depends on.
    """

    @functools.wraps(init)
    def inner_init(self, *args, **kwargs):
        # Ignore private kwargs in the init.
        init_kwargs = {k: v for k, v in kwargs.items() if not k.startswith("_")}
        # Private kwargs are still recorded in the config, just not forwarded to `init`.
        config_init_kwargs = {k: v for k, v in kwargs.items() if k.startswith("_")}
        if not isinstance(self, ConfigMixin):
            raise RuntimeError(
                f"`@register_to_config` was applied to {self.__class__.__name__} init method, but this class does "
                "not inherit from `ConfigMixin`."
            )

        ignore = getattr(self, "ignore_for_config", [])
        # Get positional arguments aligned with kwargs
        new_kwargs = {}
        signature = inspect.signature(init)
        # Skip the first parameter (`self`) and every ignored name; keep each remaining default.
        parameters = {
            name: p.default for i, (name, p) in enumerate(signature.parameters.items()) if i > 0 and name not in ignore
        }
        for arg, name in zip(args, parameters.keys()):
            new_kwargs[name] = arg

        # Then add all kwargs
        new_kwargs.update(
            {
                k: init_kwargs.get(k, default)
                for k, default in parameters.items()
                if k not in ignore and k not in new_kwargs
            }
        )
        # Explicitly resolved init arguments take precedence over private kwargs of the same name.
        new_kwargs = {**config_init_kwargs, **new_kwargs}
        getattr(self, "register_to_config")(**new_kwargs)
        init(self, *args, **init_kwargs)

    return inner_init