o
    eiWI                  	   @   s  U d dl Z d dlZd dlmZmZ d dlmZ d dlmZm	Z	 d dl
Z
d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d
dlmZmZmZmZmZmZmZ d
dl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z& d
dl'm(Z(m)Z) dgZ*e+e, e-d< de#de#de.fddZ/de+e# de.fddZ0de+e# de+e# de+e# fddZ1de
j2defddZ3dedefdd Z4d!edede$fd"d#Z5d$e,dede%fd%d&Z6d$e,d!edede%fd'd(Z7d$e,de
j2de%fd)d*Z8d$e,d+efd,d-Z9d.d/ Z:d0d1 Z;d$e,d2ed3e+e de+e" fd4dZ<d5ede#fd6d7Z=d$e,d8ede+e% fd9d:Z>dedefd;d<Z?de
j2de+e fd=d>Z@d$e,d?ed@ede+e" fdAdBZAd5eBe,ef defdCdDZCdEedFedGedHefdIdJZDdS )K    N)bisect_rightinsort)Callable)Anycast)_get_device_module)ShardMetadata)ShardedTensor)DTensor)%compute_local_shape_and_global_offset   )BytesStorageMetadataChunkStorageMetadataMetadataIndexSTATE_DICT_TYPESTORAGE_TYPESTensorPropertiesTensorStorageMetadata)LoadItemTypeReadItemSavePlanTensorWriteData	WriteItemWriteItemType)"_check_shard_metadata_pair_overlap+_shards_get_overlap_region_wrt_saved_tensor create_read_items_for_chunk_list__all__plan
other_planreturnc           
      C   s  | j |j krdS t| jt|jkrdS t| j|jD ]j\}}|j|jkr( dS |j}|j}|j|jks@|j|jks@|j|jkrC dS |j}|j}|rM|rQ|sT|rT dS |r|r|j	|j	kra dS |j
}|j
}	|rk|	ro|sr|	rr dS |r|	r|j|	jks|j|	jkr dS qdS )a  
    Compare the two Save plans and return True if they are equal.

    Args:
        plan (SavePlan): First SavePlan to compare.
        other_plan (SavePlan): Second SavePlan to compare.

    Returns:
       True if the two plans are equal, False otherwise.
    FT)usablelenitemsziptypeindexfqnoffsettensor_datasizechunkoffsetssizes)
r   r   	plan_itemother_plan_itemplan_metadata_indexother_plan_metadata_indexr)   other_tensor_datar+   other_chunk r4   v/var/www/addictedbytheproject.nl/epg/venv/lib/python3.10/site-packages/torch/distributed/checkpoint/planner_helpers.py_compare_save_plans*   sD   r6   delta_plansc                 C   s   t dd | D S )z
    Check if any delta plan is usable, indicating the plan has changed.

    Args:
        delta_plans (List[SavePlan]): A list of delta plans to check.
    Returns:
        True if any delta plan is usable, False otherwise.
    c                 s   s    | ]}|o|j V  qd S N)r!   ).0
delta_planr4   r4   r5   	<genexpr>t   s    z(_contains_usable_plan.<locals>.<genexpr>)any)r7   r4   r4   r5   _contains_usable_plank   s   	r=   cached_plansc                 C   s<   g }t | |D ]\}}|r|js|| q|| q|S )ac  
    Merge a list of delta plans into a single plan.

    Args:
        cached_plans (List[SavePlan]): A list of cached plans.
        delta_plans (List[SavePlan]): A list of delta plans to merge. It can contain empty plans

    Returns:
        A single merged plan. If a delta plan is not usable, use the cached plan. Otherwise, use the delta plan.
    )r$   r!   append)r>   r7   merged_planscached_planr:   r4   r4   r5   _merge_delta_local_plansw   s   
rB   tensorc                 C   s$   t tdgt|   |  dS )Nr   r,   r-   )r   torchSizer"   r*   )rC   r4   r4   r5   _create_chunk_from_tensor   s   rG   shard_mdc                 C   s   t t| jt| jdS NrD   )r   rE   rF   shard_offsetsshard_sizes)rH   r4   r4   r5   _chunk_for_shard   s   

rL   sharded_tensorc                 C   s>   |   j}t|j|j|j|j|jd}tt	|||   j
dS )N)dtypelayoutrequires_gradmemory_format
pin_memoryr+   
propertiesr*   )metadatatensor_propertiesr   rN   rO   rP   rQ   rR   r   rL   r*   )rM   rH   shard_propertiesrT   r4   r4   r5   _sharded_tensor_metadata   s   
rX   r'   c              	   C   sb   t |j|j|j\}}t|t|}}tt| |tj	t
t||dt| | ddS )NrD   rS   r&   r%   r)   )r   shapedevice_mesh
placementsrE   rF   r   r   r   SHARDr   r   r   create_from_tensorto_localr*   )r'   rC   r-   r,   r4   r4   r5   _create_write_items_for_dtensor   s    r`   c                 C   s(   t |j}tt| |tjt||dS )NrY   )rE   rF   rJ   r   r   r   r]   rX   )r'   rM   rH   r,   r4   r4   r5   _create_write_item_for_shard   s   ra   c                 C   sN   t dgt|  }tt| |tjtt	|| dt
|| ddS )Nr   rD   rS   rY   )rE   rF   r"   r*   r   r   r   TENSORr   r   r   r^   )r'   rC   r,   r4   r4   r5   _create_write_item_for_tensor   s   rc   bytesc                 C   s   t t| tjdS )N)r&   r%   )r   r   r   BYTE_IO)r'   rd   r4   r4   r5   _create_write_item_for_bytesio   s   rf   c              	   C   s.   t tj| t|f|t|ft|fdS N)r%   
dest_indexdest_offsetsstorage_indexstorage_offsetslengths)r   r   re   rE   rF   rh   dest_offsetrj   storage_offsetlengthr4   r4   r5   _create_read_item_for_byteio   s   


rq   c              	   C   s(   t tj| t||t|t|dS rg   )r   r   rb   rE   rF   rh   ri   rj   rk   rl   r4   r4   r5   _create_read_item_for_tensor   s   rs   checkpoint_mdlocal_chunksc                    s  g }|j }|r	|s|S t|d j}d|dkr8d}t|D ] t fddt||D }||kr7|} q|dkrKdgt| dgt| nfdd|D fdd|D ttt|fd	d
d}ttt|fdd
d}	g }
d}t|}|	D ]}|| }| \}}t|
|df}|r|
d|= ||k r|| }|| }| \}}||krnt	|
||f |d7 }||k s|
D ]F\}}|| }t
||sqg }g }g }t||dD ]\}}}}|| || || q|tt| |j||t| |j|||d qq|S )aW  
    Create a list of ``ReadItem`` based on the checkpoint and local chunks.

    This applies the resharding algorithm and computes the reads needed
    to satisfy ``local_chunks`` with a checkpoint described by ``checkpoint_md``.

    Args:
        fqn (str) : The state_dict FQN to pass to ``ReadItem``.
        checkpoint_md (TensorStorageMetadata): metadata for a given tensor
            from a checkpoint.
        local_chunks (List[ChunkStorageMetadata]): Local chunks that needs to be
            loaded.

    Returns:
        A list of ``ReadItem`` that will satisfy all input chunks.
    r   r   c                 3   s$    | ]}|j   |j   V  qd S r8   rD   )r9   r+   )dimr4   r5   r;      s
    
z3create_read_items_for_chunk_list.<locals>.<genexpr>)r   r   c                    *   g | ]}|j   |j   |j   fqS r4   rD   r9   c	sweep_dimr4   r5   
<listcomp>.      z4create_read_items_for_chunk_list.<locals>.<listcomp>c                    rw   r4   rD   rx   rz   r4   r5   r|   2  r}   c                        |  d S Nr   r4   idx)saved_boundsr4   r5   <lambda>9      z2create_read_items_for_chunk_list.<locals>.<lambda>)keyc                    r~   r   r4   r   )local_boundsr4   r5   r   =  r   N)saved_shardcurrent_shardrr   )chunksr"   r,   rangemax	itertoolschainsortedr   r   r   r   r?   rs   r   )r'   rt   ru   
read_itemssaved_chunksnum_dimsmax_sizedim_sizesaved_sorted_indiceslocal_sorted_indicesactive_saved	saved_ptr	num_saved	local_idxlocal_chunklocal_start	local_endcutoffstorage_idxstorage_chunksaved_start	saved_end_rk   ri   rl   _dimoffset_for_saved_tensoroffset_for_current_tensorrp   r4   )rv   r   r   r{   r5   r      s   











state_dictc                    s   g }|   D ]?\ ttr|t  qttr.| fdd jD  qtt	j
r=|t  q|t  qt|S )Nc                 3   s    | ]	}t  |V  qd S r8   )ra   )r9   rH   r'   objr4   r5   r;     s
    

z5_create_default_metadata_only_plan.<locals>.<genexpr>)r#   
isinstancer
   r?   r`   r	   extendrU   shards_metadatarE   Tensorrc   rf   r   )r   requestsr4   r   r5   "_create_default_metadata_only_plany  s   


r   objectc                    s\   t dr S ttr fdd D S ttjr(t gS t gS )N__create_write_items__c                    s   g | ]	}t  |jqS r4   )ra   rU   r9   shardr'   r   r4   r5   r|         z'_create_write_items.<locals>.<listcomp>)	hasattrr   r   r	   local_shardsrE   r   rc   rf   r   r4   r   r5   _create_write_items  s   

r   c                 C   s8   t | j| j| j\}}t|t|}}t||dS rI   )r   rZ   r[   r\   rE   rF   r   )rC   r-   r,   r4   r4   r5   _create_chunk_from_dtensor  s   r   c                 C   sb   t | dr|  }|S t| trdd |  D }|S t| tjr(t| g}|S tdt	|  )N__create_chunk_list__c                 S   s   g | ]}t |jqS r4   )rL   rU   r   r4   r4   r5   r|     s    
z&_create_chunk_list.<locals>.<listcomp>zMUnsupported Type, expecting one of [Tensor, DTensor, ShardedTensor] ,but got )
r   r   r   r	   r   rE   r   rG   
ValueErrorr%   )rC   ru   r4   r4   r5   _create_chunk_list  s    


r   mdr   c              
   C   sx   t |ts.zt|}W n ty' } ztd|  ddt|  |d }~ww t| ||S tt| dt| dddgS )Nz Invalid checkpoint metadata for z, z(expected BytesStorageMetadata but found r   rm   )r   r   r   r   r%   r   rq   r   )r'   r   r   ru   exr4   r4   r5   _create_read_items  s,   

r   c                 C   s>   dt fdd}dtfdd}dtjfdd}t| ||| dS )	zP
    Initializes meta tensor if the meta tensor is DTensor or torch.Tensor.
    valuec                 S   st   t | dd }|tdkr8tj j}ttjt|	 }tj
|  |d}tj|| j| j|  |  d}|S | S )Ndevicemetar   )r[   r\   rZ   stride)getattrrE   r   distdistributed_c10d_get_pg_default_devicer%   r   r   current_device
empty_liker_   r
   
from_localr[   r\   r*   r   )r   r   device_typenew_local_tensordtensorr4   r4   r5   dtensor_func  s    z&_init_state_dict.<locals>.dtensor_funcc                 S   s2   t | dd }|tdkrtdt|  d| S )Nr   r   zFound unsupported type z for meta device loading.)r   rE   r   RuntimeErrorr%   )r   r   r4   r4   r5   sharded_tensor_func  s   z-_init_state_dict.<locals>.sharded_tensor_funcc                 S   sP   t | dd }|tdkr&tj j}ttjt|	 }tj
| |d}|S | S )Nr   r   r   )r   rE   r   r   r   r   r%   r   r   r   r   )r   r   r   rC   r4   r4   r5   tensor_func  s   z%_init_state_dict.<locals>.tensor_funcN)r
   r   rE   r   _iterate_state_dict)r   r   r   r   r4   r4   r5   _init_state_dict  s   	r   iter_objectr   r   r   c                    s   t | tr	 | S t | tr| S t | tjr| S t | ttttt	j
fs+| du r-| S t | trF|  D ]\}}t| | |< q6| S t | ttfrc fdd| D }t | trat|}|S dS )a$  
    Iterate through the state dict, applying the given functions to each tensor type
    and update the state dict in place.

    Args:
        iter_object (Any): the target state_dict.
        sharded_tensor_func (Callable): the function to apply to ShardedTensor
        dtensor_func (Callable): the function to apply to DTensor
        tensor_func (Callable): the function to apply to Tensor

    # TODO: let state_dict_util._iterate_state_dict() to support in place option
    so we don't need to have two versions of _iterate_state_dict.
    Nc                    s   g | ]	}t | qS r4   )r   )r9   vr   r   r   r4   r5   r|   +  r   z'_iterate_state_dict.<locals>.<listcomp>)r   r
   r	   rE   r   intfloatstrrd   ioBytesIOdictr#   r   listtuple)r   r   r   r   r   r   retr4   r   r5   r     s0   




r   )Er   r   bisectr   r   collections.abcr   typingr   r   rE   torch.distributeddistributedr   torch._utilsr   !torch.distributed._shard.metadatar   'torch.distributed._shard.sharded_tensorr	   torch.distributed.tensorr
   torch.distributed.tensor._utilsr   rU   r   r   r   r   r   r   r   plannerr   r   r   r   r   r   
reshardingr   r   r   r   r   __annotations__boolr6   r=   rB   r   rG   rL   rX   r`   ra   rc   rf   rq   rs   r   r   r   r   r   r   r   r   r   r4   r4   r4   r5   <module>   s   
$ 	A



{7