Server : LiteSpeed System : Linux in-mum-web1333.main-hosting.eu 4.18.0-553.37.1.lve.el8.x86_64 #1 SMP Mon Feb 10 22:45:17 UTC 2025 x86_64 User : u141265441 ( 141265441) PHP Version : 8.4.3 Disable Function : system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail Directory : /proc/self/root/opt/gsutil/gslib/vendored/boto/tests/integration/s3/ |
# -*- coding: utf-8 -*-
# Copyright (c) 2011 Mitch Garnaat http://garnaat.org/
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""
Some unit tests for the S3 MultiPartUpload
"""
# Note:
# Multipart uploads require at least one part. If you upload
# multiple parts then all parts except the last part has to be
# bigger than 5M. Hence we just use 1 part so we can keep
# things small and still test logic.
import os
import unittest
import time
from boto.compat import StringIO
import mock
import boto
from boto.s3.connection import S3Connection
class S3MultiPartUploadTest(unittest.TestCase):
s3 = True
def setUp(self):
self.conn = S3Connection(is_secure=False)
self.bucket_name = 'multipart-%d' % int(time.time())
self.bucket = self.conn.create_bucket(self.bucket_name)
def tearDown(self):
for key in self.bucket:
key.delete()
self.bucket.delete()
def test_abort(self):
key_name = u"テスト"
mpu = self.bucket.initiate_multipart_upload(key_name)
mpu.cancel_upload()
def test_complete_ascii(self):
key_name = "test"
mpu = self.bucket.initiate_multipart_upload(key_name)
fp = StringIO("small file")
mpu.upload_part_from_file(fp, part_num=1)
fp.close()
cmpu = mpu.complete_upload()
self.assertEqual(cmpu.key_name, key_name)
self.assertNotEqual(cmpu.etag, None)
def test_complete_japanese(self):
key_name = u"テスト"
mpu = self.bucket.initiate_multipart_upload(key_name)
fp = StringIO("small file")
mpu.upload_part_from_file(fp, part_num=1)
fp.close()
cmpu = mpu.complete_upload()
self.assertEqual(cmpu.key_name, key_name)
self.assertNotEqual(cmpu.etag, None)
def test_list_japanese(self):
key_name = u"テスト"
mpu = self.bucket.initiate_multipart_upload(key_name)
rs = self.bucket.list_multipart_uploads()
# New bucket, so only one upload expected
lmpu = next(iter(rs))
self.assertEqual(lmpu.id, mpu.id)
self.assertEqual(lmpu.key_name, key_name)
# Abort using the one returned in the list
lmpu.cancel_upload()
def test_list_multipart_uploads(self):
key_name = u"テスト"
mpus = []
mpus.append(self.bucket.initiate_multipart_upload(key_name))
mpus.append(self.bucket.initiate_multipart_upload(key_name))
rs = self.bucket.list_multipart_uploads()
# uploads (for a key) are returned in time initiated asc order
for lmpu in rs:
ompu = mpus.pop(0)
self.assertEqual(lmpu.key_name, ompu.key_name)
self.assertEqual(lmpu.id, ompu.id)
self.assertEqual(0, len(mpus))
def test_get_all_multipart_uploads(self):
key1 = 'a'
key2 = 'b/c'
mpu1 = self.bucket.initiate_multipart_upload(key1)
mpu2 = self.bucket.initiate_multipart_upload(key2)
rs = self.bucket.get_all_multipart_uploads(prefix='b/', delimiter='/')
for lmpu in rs:
# only expect upload for key2 (mpu2) returned
self.assertEqual(lmpu.key_name, mpu2.key_name)
self.assertEqual(lmpu.id, mpu2.id)
def test_four_part_file(self):
key_name = "k"
contents = "01234567890123456789"
sfp = StringIO(contents)
# upload 20 bytes in 4 parts of 5 bytes each
mpu = self.bucket.initiate_multipart_upload(key_name)
mpu.upload_part_from_file(sfp, part_num=1, size=5)
mpu.upload_part_from_file(sfp, part_num=2, size=5)
mpu.upload_part_from_file(sfp, part_num=3, size=5)
mpu.upload_part_from_file(sfp, part_num=4, size=5)
sfp.close()
etags = {}
pn = 0
for part in mpu:
pn += 1
self.assertEqual(5, part.size)
etags[pn] = part.etag
self.assertEqual(pn, 4)
# etags for 01234
self.assertEqual(etags[1], etags[3])
# etags for 56789
self.assertEqual(etags[2], etags[4])
# etag 01234 != etag 56789
self.assertNotEqual(etags[1], etags[2])
# parts are too small to compete as each part must
# be a min of 5MB so so we'll assume that is enough
# testing and abort the upload.
mpu.cancel_upload()
# mpu.upload_part_from_file() now returns the uploaded part
# which makes the etag available. Confirm the etag is
# available and equal to the etag returned by the parts list.
def test_etag_of_parts(self):
key_name = "etagtest"
mpu = self.bucket.initiate_multipart_upload(key_name)
fp = StringIO("small file")
# upload 2 parts and save each part
uparts = []
uparts.append(mpu.upload_part_from_file(fp, part_num=1, size=5))
uparts.append(mpu.upload_part_from_file(fp, part_num=2))
fp.close()
# compare uploaded parts etag to listed parts
pn = 0
for lpart in mpu:
self.assertEqual(uparts[pn].etag, lpart.etag)
pn += 1
# Can't complete 2 small parts so just clean up.
mpu.cancel_upload()
class S3MultiPartUploadSigV4Test(unittest.TestCase):
s3 = True
def setUp(self):
self.env_patch = mock.patch('os.environ', {'S3_USE_SIGV4': True})
self.env_patch.start()
self.conn = boto.s3.connect_to_region('us-west-2')
self.bucket_name = 'multipart-%d' % int(time.time())
self.bucket = self.conn.create_bucket(self.bucket_name,
location='us-west-2')
def tearDown(self):
for key in self.bucket:
key.delete()
self.bucket.delete()
self.env_patch.stop()
def test_initiate_multipart(self):
key_name = "multipart"
multipart_upload = self.bucket.initiate_multipart_upload(key_name)
multipart_uploads = self.bucket.get_all_multipart_uploads()
for upload in multipart_uploads:
# Check that the multipart upload was created.
self.assertEqual(upload.key_name, multipart_upload.key_name)
self.assertEqual(upload.id, multipart_upload.id)
multipart_upload.cancel_upload()
def test_upload_part_by_size(self):
key_name = "k"
contents = "01234567890123456789"
sfp = StringIO(contents)
# upload 20 bytes in 4 parts of 5 bytes each
mpu = self.bucket.initiate_multipart_upload(key_name)
mpu.upload_part_from_file(sfp, part_num=1, size=5)
mpu.upload_part_from_file(sfp, part_num=2, size=5)
mpu.upload_part_from_file(sfp, part_num=3, size=5)
mpu.upload_part_from_file(sfp, part_num=4, size=5)
sfp.close()
etags = {}
pn = 0
for part in mpu:
pn += 1
self.assertEqual(5, part.size)
etags[pn] = part.etag
self.assertEqual(pn, 4)
# etags for 01234
self.assertEqual(etags[1], etags[3])
# etags for 56789
self.assertEqual(etags[2], etags[4])
# etag 01234 != etag 56789
self.assertNotEqual(etags[1], etags[2])
# parts are too small to complete as each part must
# be a min of 5MB so so we'll assume that is enough
# testing and abort the upload.
mpu.cancel_upload()