
    *                        S r SSKrSSKrSSKrSSKJr  SSKJr  SSKJr	  SSK
Jr  SSK
Jr  SSKJr  SSKJr  SSKJr  SS	KJr  SS
KJr  SSKrSrSr\R2                  R4                  R6                  R8                  R;                  \5        \R2                  R4                  R6                  R<                  R;                  \5        SS jrSS jr S r!SS jr"SS jr#SS jr$S r% " S S\&5      r' " S S\&5      r(g)zHelpers for accessing GCS.

Bulk object uploads and downloads use methods that shell out to gsutil.
Lightweight metadata / streaming operations use the StorageClient class.
    N)
exceptions)transfer)storage_api)storage_util)apis)log)
properties)	resourcesgs<   c                     [         R                  R                  R                  R	                  5       nU(       a  [        X5        g [        XUS9  g )N)storage_client)r	   VALUESstorage
use_gsutilGetBool_UploadGsutil_UploadStorageClient)filesdestinationr   r   s       6lib/googlecloudsdk/api_lib/dataproc/storage_helpers.pyUploadr   5   s9       ((33;;=*%%NK    c           	         U=(       d    [         R                  " 5       nU  Hr  n[        R                  R	                  U5      n[        R                  R                  X5      n[        R                  R                  U5      n UR                  XG5        Mt     g! [        R                   a:  n[        R                  " SR                  SR                  U 5      X5      5      eSnAff = f)zUpload a list of local files to GCS.

Args:
  files: The list of local files to upload.
  destination: A GCS "directory" to copy the files into.
  storage_client: Storage api client used to copy files to gcs.
z)Failed to upload files ['{}'] to '{}': {}', 'N)r   StorageClientospathbasenamejoinr   ObjectReferenceFromUrlCopyFileToGCSr   BadFileExceptiondp_exceptionsFileUploadErrorformat)	r   r   r   clientfile_to_upload	file_namedest_urldest_objecterrs	            r   r   r   ?   s     8[668&n  0Iww||K3H..66x@K5>7  && 5))
5
<
<kk% +45 55s   BC&,5C!!C&c                     U nX!/-  n[         R                  " SU5      nUS:w  a5  [        R                  " SR	                  SR                  U 5      U5      5      eg)zUpload a list of local files to GCS.

Args:
  files: The list of local files to upload.
  destination: A GCS "directory" to copy the files into.
cpr   z5Failed to upload files ['{0}'] to '{1}' using gsutil.r   N)r   RunGsutilCommandr%   r&   r'   r    )r   r   args	exit_codes       r   r   r   T   s`     
$-$++D$7)!^

'
'?FFKK	-. . r   c                     U=(       d    [         R                  " 5       n UR                  U 5      $ ! [         R                   a     gf = f)zGets a bucket if it exists.

Args:
  bucket: The bucket name.
  storage_client: Storage client instance.

Returns:
  A bucket message, or None if it doesn't exist.
N)r   r   	GetBucketBucketNotFoundError)bucketr   r(   s      r   r4   r4   d   sE     8[668&F##		(	( s   1 AAc                 `    U=(       d    [         R                  " 5       nUR                  XUS9  g)a[  Creates a bucket.

Creates a bucket in the specified region. If the region is None, the bucket
will be created in global region.

Args:
  bucket: Name of bucket to create.
  region: Region to create bucket in.
  storage_client: Storage client instance.
  project: The project to create the bucket in. If None, current Cloud SDK
  project is used.
)locationprojectN)r   r   CreateBucketIfNotExists)r6   regionr   r9   r(   s        r   r:   r:   v   s*     8[668&  ' Jr   c                 d   U=(       d    [         R                  " 5       n[        R                  R	                  U 5      n UR                  U5      n[        R                  " USS9nUR                  5       $ ! [        R                   a&    [        R                  " SR                  U 5      5      ef = f)a9  Reads an object's content from GCS.

Args:
  object_url: The URL of the object to be read. Must have "gs://" prefix.
  storage_client: Storage api client used to read files from gcs.

Raises:
  ObjectReadError:
    If the read of GCS object is not successful.

Returns:
  A str for the content of the GCS object.
zutf-8)encodingzFailed to read file '{0}'.)r   r   r   r!   r"   
ReadObjectioTextIOWrapperreadr   r$   r%   ObjectReadErrorr'   )
object_urlr   r(   
object_refbytes_iowrappers         r   r>   r>      s     8[668&++33J?*9  ,Hx':G<<>		$	$ 9

'
'$++J79 99s   5A5 5:B/c                     [         R                  R                  U 5      nUR                  UR                  UR
                  S9$ )z.Build an Object proto message from a GCS path.)r6   name)r
   REGISTRYParseStorageURLObjectr6   object)r   messagesresources      r   GetObjectRefrO      s3    //5(	hoo	FFr   c                   4    \ rS rSrSrS rS	S jrS rS rSr	g)
r      zMicro-client for accessing GCS.c                 t    [         R                  " SS5      U l        [         R                  " SS5      U l        g )Nr   v1)	core_apisGetClientInstancer(   GetMessagesModulerM   selfs    r   __init__StorageClient.__init__   s*    --i>DK//	4@DMr   Nc                     U R                   R                  UR                  UR                  S9n U R                  R
                  R                  X2S9$ ! [        R                   a     g f = f)N)r6   rL   )requestdownload)	rM   StorageObjectsGetRequestr6   rH   r(   objectsGetapitools_exceptionsHttpNotFoundError)rX   rD   r]   r\   s       r   
_GetObjectStorageClient._GetObject   se    mm44   5 :G[[  $$W$HH00 s   "A A*)A*c                 $    U R                  U5      $ )a	  Get the object metadata of a GCS object.

Args:
  object_ref: A proto message of the object to fetch. Only the bucket and
    name need be set.

Raises:
  HttpError:
    If the responses status is not 2xx or 404.

Returns:
  The object if it exists otherwise None.
)rc   )rX   rD   s     r   	GetObjectStorageClient.GetObject   s     ??:&&r   c                 t    [         R                  R                  XR                  SS9nU R	                  X#S9  U$ )a  Build an apitools Download from a stream and a GCS object reference.

Note: This will always succeed, but HttpErrors with downloading will be
  raised when the download's methods are called.

Args:
  stream: An Stream-like object that implements write(<string>) to write
    into.
  object_ref: A proto message of the object to fetch. Only the bucket and
    name need be set.

Returns:
  The download.
F)
total_sizeauto_transfer)r]   )r   Download
FromStreamsizerc   )rX   streamrD   r]   s       r   BuildObjectStreamStorageClient.BuildObjectStream   s<       ++??% , AHOOJO2Or   )r(   rM   N)
__name__
__module____qualname____firstlineno____doc__rY   rc   rf   ro   __static_attributes__ r   r   r   r      s    'A' r   r   c                   d    \ rS rSrSrSS jr\S 5       rS rS r	S r
\R                  4S	 jrS
rg)StorageObjectSeriesStream   zFI/O Stream-like class for communicating via a sequence of GCS objects.Nc                 j    Xl         U=(       d
    [        5       U l        SU l        SU l        SU l        g)a  Construct a StorageObjectSeriesStream for a specific gcs path.

Args:
  path: A GCS object prefix which will be the base of the objects used to
      communicate across the channel.
  storage_client: a StorageClient for accessing GCS.

Returns:
  The constructed stream.
Tr   N)
_base_pathr   _gcs_open_current_object_index_current_object_pos)rX   r   r   s      r   rY   "StorageObjectSeriesStream.__init__   s2     O1-/DIDJ "#D  !Dr   c                     U R                   $ )zWhether the stream is open.r   rW   s    r   openStorageObjectSeriesStream.open   s     ::r   c                     SU l         g)zClose the stream.FNr   rW   s    r   CloseStorageObjectSeriesStream.Close   s	    DJr   c                 <    U R                   (       d  [        S5      eg )NzI/O operation on closed stream.)r   
ValueErrorrW   s    r   _AssertOpen%StorageObjectSeriesStream._AssertOpen   s    99899 r   c                     SR                  U R                  U5      nU R                  R                  [	        X R                  R
                  5      5      $ )z!Get the ith object in the series.z{0}.{1:09d})r'   r}   r~   rf   rO   rM   )rX   ir   s      r   rc   $StorageObjectSeriesStream._GetObject  s<    3D99|D))2D2DEFFr   c                    U R                  5         SnSnUnX5:  Gaz  U R                  U R                  S-   5      nU(       a  U(       a&   U R                  U R                  5      nU(       d   U$ UR                  U R                  -
  nUS:  a$  [        SR                  UR                  5      5      eUR                  S:X  a  U R                  5          U$ XS-
  n	[        X5      n
U
S:  a`  U R                  R                  X5      nUR!                  U R                  U R                  U
-   S-
  5        U =R                  U
-  sl        X:-  nU=(       a    U R                  UR                  :H  nU(       a!  UnU =R                  S-  sl        SU l        GM}   U$ U$ ! [        R                   a"  n[
        R                  " SU5         SnAU$ SnAff = f)aO  Read from this stream into a writable.

Reads at most n bytes, or until it sees there is not a next object in the
series. This will block for the duration of each object's download,
and possibly indefinitely if new objects are being added to the channel
frequently enough.

Args:
  writable: The stream-like object that implements write(<string>) to
      write into.
  n: A maximum number of bytes to read. Defaults to sys.maxsize
    (usually ~4 GB).

Raises:
  ValueError: If the stream is closed or objects in the series are
    detected to shrink.

Returns:
  The number of bytes read.
r   N   zFailed to fetch GCS output:
%szObject [{0}] shrunk.)r   rc   r   ra   	HttpErrorr   warningrm   r   r   r'   rH   r   minr~   ro   GetRange)rX   writablen
bytes_readobject_infomax_bytes_to_readnext_object_infoerrornew_bytes_availablebytes_left_to_readnew_bytes_to_readr]   object_finisheds                r   ReadIntoWritable*StorageObjectSeriesStream.ReadIntoWritable  s   * 	JK

()C)Ca)GH ,	(B(BC+ 
L I (,,t/G/GG	q	 /66{7G7GHII			Q	

8 5 -90F	Q	99..xE$$$$'881<	> 	  $55 '

 
Kt77;;K;KK  
&""a'"#$  	:W #,, 	
++7
?
R W	s   
F G-G

G)r}   r   r   r~   r   rq   )rr   rs   rt   ru   rv   rY   propertyr   r   r   rc   sysmaxsizer   rw   rx   r   r   rz   rz      s=    N!*  :G
 *- Lr   rz   rq   )NN))rv   r?   r   r   apitools.base.pyr   ra   r   googlecloudsdk.api_lib.dataprocr%   googlecloudsdk.api_lib.storager   r   googlecloudsdk.api_lib.utilr   rT   googlecloudsdk.calliopegooglecloudsdk.corer   r	   r
   six.moves.urllib.parsesixSTORAGE_SCHEMEHTTP_TIMEOUTmovesurllibparseuses_relativeappenduses_netlocr   r   r   r4   r:   r>   rO   rL   r   rz   rx   r   r   <module>r      s     
 	 
 > % G 6 7 9 . # * )    		     $ $ + +N ; 		     " " ) ). 9L5*. $K"92G4F 4nv vr   