Added third-party typeshed stubs.

This commit is contained in:
Eric Traut 2020-05-22 09:37:00 -07:00
parent afa42826a3
commit 150971c55c
528 changed files with 26779 additions and 0 deletions

View File

@ -0,0 +1,19 @@
from typing import Any, Union, Text
from .blockalgo import BlockAlgo
__revision__: str
class AESCipher(BlockAlgo):
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def new(key: Union[bytes, Text], *args, **kwargs) -> AESCipher: ...
MODE_ECB: int
MODE_CBC: int
MODE_CFB: int
MODE_PGP: int
MODE_OFB: int
MODE_CTR: int
MODE_OPENPGP: int
block_size: int
key_size: int

View File

@ -0,0 +1,19 @@
from typing import Any, Union, Text
from .blockalgo import BlockAlgo
__revision__: str
class RC2Cipher(BlockAlgo):
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def new(key: Union[bytes, Text], *args, **kwargs) -> RC2Cipher: ...
MODE_ECB: int
MODE_CBC: int
MODE_CFB: int
MODE_PGP: int
MODE_OFB: int
MODE_CTR: int
MODE_OPENPGP: int
block_size: int
key_size: int

View File

@ -0,0 +1,15 @@
from typing import Any, Union, Text
__revision__: str
class ARC4Cipher:
block_size: int
key_size: int
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def encrypt(self, plaintext): ...
def decrypt(self, ciphertext): ...
def new(key: Union[bytes, Text], *args, **kwargs) -> ARC4Cipher: ...
block_size: int
key_size: int

View File

@ -0,0 +1,19 @@
from typing import Any, Union, Text
from .blockalgo import BlockAlgo
__revision__: str
class BlowfishCipher(BlockAlgo):
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def new(key: Union[bytes, Text], *args, **kwargs) -> BlowfishCipher: ...
MODE_ECB: int
MODE_CBC: int
MODE_CFB: int
MODE_PGP: int
MODE_OFB: int
MODE_CTR: int
MODE_OPENPGP: int
block_size: int
key_size: Any

View File

@ -0,0 +1,19 @@
from typing import Any, Union, Text
from .blockalgo import BlockAlgo
__revision__: str
class CAST128Cipher(BlockAlgo):
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def new(key: Union[bytes, Text], *args, **kwargs) -> CAST128Cipher: ...
MODE_ECB: int
MODE_CBC: int
MODE_CFB: int
MODE_PGP: int
MODE_OFB: int
MODE_CTR: int
MODE_OPENPGP: int
block_size: int
key_size: Any

View File

@ -0,0 +1,19 @@
from typing import Any, Union, Text
from .blockalgo import BlockAlgo
__revision__: str
class DESCipher(BlockAlgo):
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def new(key: Union[bytes, Text], *args, **kwargs) -> DESCipher: ...
MODE_ECB: int
MODE_CBC: int
MODE_CFB: int
MODE_PGP: int
MODE_OFB: int
MODE_CTR: int
MODE_OPENPGP: int
block_size: int
key_size: int

View File

@ -0,0 +1,20 @@
from typing import Any, Union, Text
from .blockalgo import BlockAlgo
__revision__: str
class DES3Cipher(BlockAlgo):
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def new(key: Union[bytes, Text], *args, **kwargs) -> DES3Cipher: ...
MODE_ECB: int
MODE_CBC: int
MODE_CFB: int
MODE_PGP: int
MODE_OFB: int
MODE_CTR: int
MODE_OPENPGP: int
block_size: int
key_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional, Union, Text
from Crypto.PublicKey.RSA import _RSAobj
class PKCS1OAEP_Cipher:
def __init__(self, key: _RSAobj, hashAlgo: Any, mgfunc: Any, label: Any) -> None: ...
def can_encrypt(self): ...
def can_decrypt(self): ...
def encrypt(self, message: Union[bytes, Text]) -> bytes: ...
def decrypt(self, ct: bytes) -> bytes: ...
def new(key: _RSAobj, hashAlgo: Optional[Any] = ..., mgfunc: Optional[Any] = ..., label: Any = ...) -> PKCS1OAEP_Cipher: ...

View File

@ -0,0 +1,13 @@
from typing import Any, Union, Text
from Crypto.PublicKey.RSA import _RSAobj
class PKCS115_Cipher:
def __init__(self, key: _RSAobj) -> None: ...
def can_encrypt(self) -> bool: ...
def can_decrypt(self) -> bool: ...
rf: Any
def encrypt(self, message: Union[bytes, Text]) -> bytes: ...
def decrypt(self, ct: bytes, sentinel: Any) -> bytes: ...
def new(key: _RSAobj) -> PKCS115_Cipher: ...

View File

@ -0,0 +1,16 @@
from typing import Any, Union, Text
__revision__: str
class XORCipher:
block_size: int
key_size: int
def __init__(self, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def encrypt(self, plaintext: Union[bytes, Text]) -> bytes: ...
def decrypt(self, ciphertext: bytes) -> bytes: ...
def new(key: Union[bytes, Text], *args, **kwargs) -> XORCipher: ...
block_size: int
key_size: int

View File

@ -0,0 +1,11 @@
# Names in __all__ with no definition:
# AES
# ARC2
# ARC4
# Blowfish
# CAST
# DES
# DES3
# PKCS1_OAEP
# PKCS1_v1_5
# XOR

View File

@ -0,0 +1,17 @@
from typing import Any, Union, Text
MODE_ECB: int
MODE_CBC: int
MODE_CFB: int
MODE_PGP: int
MODE_OFB: int
MODE_CTR: int
MODE_OPENPGP: int
class BlockAlgo:
mode: int
block_size: int
IV: Any
def __init__(self, factory: Any, key: Union[bytes, Text], *args, **kwargs) -> None: ...
def encrypt(self, plaintext: Union[bytes, Text]) -> bytes: ...
def decrypt(self, ciphertext: bytes) -> bytes: ...

View File

@ -0,0 +1,16 @@
from typing import Any, Optional
digest_size: Any
class HMAC:
digest_size: Any
digestmod: Any
outer: Any
inner: Any
def __init__(self, key, msg: Optional[Any] = ..., digestmod: Optional[Any] = ...) -> None: ...
def update(self, msg): ...
def copy(self): ...
def digest(self): ...
def hexdigest(self): ...
def new(key, msg: Optional[Any] = ..., digestmod: Optional[Any] = ...): ...

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class MD2Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class MD4Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class MD5Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class RIPEMD160Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class SHA1Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class SHA224Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class SHA256Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class SHA384Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
from Crypto.Hash.hashalgo import HashAlgo
class SHA512Hash(HashAlgo):
oid: Any
digest_size: int
block_size: int
def __init__(self, data: Optional[Any] = ...) -> None: ...
def new(self, data: Optional[Any] = ...): ...
def new(data: Optional[Any] = ...): ...
digest_size: Any

View File

@ -0,0 +1,11 @@
# Names in __all__ with no definition:
# HMAC
# MD2
# MD4
# MD5
# RIPEMD
# SHA
# SHA224
# SHA256
# SHA384
# SHA512

View File

@ -0,0 +1,11 @@
from typing import Any, Optional
class HashAlgo:
digest_size: Any
block_size: Any
def __init__(self, hashFactory, data: Optional[Any] = ...) -> None: ...
def update(self, data): ...
def digest(self): ...
def hexdigest(self): ...
def copy(self): ...
def new(self, data: Optional[Any] = ...): ...

View File

@ -0,0 +1,10 @@
from typing import Any, Optional
__revision__: str
def isInt(x): ...
class AllOrNothing:
def __init__(self, ciphermodule, mode: Optional[Any] = ..., IV: Optional[Any] = ...) -> None: ...
def digest(self, text): ...
def undigest(self, blocks): ...

View File

@ -0,0 +1,5 @@
__revision__: str
class Chaff:
def __init__(self, factor: float = ..., blocksper: int = ...) -> None: ...
def chaff(self, blocks): ...

View File

@ -0,0 +1,7 @@
from typing import Any, Optional
from Crypto.Hash import SHA as SHA1
__revision__: str
def PBKDF1(password, salt, dkLen, count: int = ..., hashAlgo: Optional[Any] = ...): ...
def PBKDF2(password, salt, dkLen: int = ..., count: int = ..., prf: Optional[Any] = ...): ...

View File

@ -0,0 +1,4 @@
# Names in __all__ with no definition:
# AllOrNothing
# Chaffing
# KDF

View File

@ -0,0 +1,27 @@
from typing import Any, Optional
from .pubkey import pubkey
class _DSAobj(pubkey):
keydata: Any
implementation: Any
key: Any
def __init__(self, implementation, key) -> None: ...
def __getattr__(self, attrname): ...
def sign(self, M, K): ...
def verify(self, M, signature): ...
def has_private(self): ...
def size(self): ...
def can_blind(self): ...
def can_encrypt(self): ...
def can_sign(self): ...
def publickey(self): ...
class DSAImplementation:
error: Any
def __init__(self, **kwargs) -> None: ...
def generate(self, bits, randfunc: Optional[Any] = ..., progress_func: Optional[Any] = ...): ...
def construct(self, tup): ...
generate: Any
construct: Any
error: Any

View File

@ -0,0 +1,19 @@
from typing import Any, Optional
from Crypto.PublicKey.pubkey import pubkey
from Crypto.PublicKey.pubkey import * # noqa: F403
class error(Exception): ...
def generate(bits, randfunc, progress_func: Optional[Any] = ...): ...
def construct(tup): ...
class ElGamalobj(pubkey):
keydata: Any
def encrypt(self, plaintext, K): ...
def decrypt(self, ciphertext): ...
def sign(self, M, K): ...
def verify(self, M, signature): ...
def size(self): ...
def has_private(self): ...
def publickey(self): ...

View File

@ -0,0 +1,32 @@
from typing import Any, Optional, Union, Text
from .pubkey import pubkey
class _RSAobj(pubkey):
keydata: Any
implementation: Any
key: Any
def __init__(self, implementation, key, randfunc: Optional[Any] = ...) -> None: ...
def __getattr__(self, attrname): ...
def encrypt(self, plaintext, K): ...
def decrypt(self, ciphertext): ...
def sign(self, M, K): ...
def verify(self, M, signature): ...
def has_private(self): ...
def size(self): ...
def can_blind(self): ...
def can_encrypt(self): ...
def can_sign(self): ...
def publickey(self): ...
def exportKey(self, format: str = ..., passphrase: Optional[Any] = ..., pkcs: int = ...): ...
class RSAImplementation:
error: Any
def __init__(self, **kwargs) -> None: ...
def generate(self, bits, randfunc: Optional[Any] = ..., progress_func: Optional[Any] = ..., e: int = ...): ...
def construct(self, tup): ...
def importKey(self, externKey: Any, passphrase: Union[None, bytes, Text] = ...) -> _RSAobj: ...
generate: Any
construct: Any
importKey: Any
error: Any

View File

@ -0,0 +1,4 @@
# Names in __all__ with no definition:
# DSA
# ElGamal
# RSA

View File

@ -0,0 +1,21 @@
from Crypto.Util.number import * # noqa: F403
__revision__: str
class pubkey:
def __init__(self) -> None: ...
def encrypt(self, plaintext, K): ...
def decrypt(self, ciphertext): ...
def sign(self, M, K): ...
def verify(self, M, signature): ...
def validate(self, M, signature): ...
def blind(self, M, B): ...
def unblind(self, M, B): ...
def can_sign(self): ...
def can_encrypt(self): ...
def can_blind(self): ...
def size(self): ...
def has_private(self): ...
def publickey(self): ...
def __eq__(self, other): ...
def __ne__(self, other): ...

View File

@ -0,0 +1,25 @@
from typing import Any
__revision__: str
class FortunaPool:
digest_size: Any
def __init__(self) -> None: ...
def append(self, data): ...
def digest(self): ...
def hexdigest(self): ...
length: int
def reset(self): ...
def which_pools(r): ...
class FortunaAccumulator:
min_pool_size: int
reseed_interval: float
reseed_count: int
generator: Any
last_reseed: Any
pools: Any
def __init__(self) -> None: ...
def random_data(self, bytes): ...
def add_random_event(self, source_number, pool_number, data): ...

View File

@ -0,0 +1,16 @@
from typing import Any
__revision__: str
class AESGenerator:
block_size: Any
key_size: int
max_blocks_per_request: Any
counter: Any
key: Any
block_size_shift: Any
blocks_per_key: Any
max_bytes_per_request: Any
def __init__(self) -> None: ...
def reseed(self, seed): ...
def pseudo_random_data(self, bytes): ...

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
class _SHAd256:
digest_size: Any
def __init__(self, internal_api_check, sha256_hash_obj) -> None: ...
def copy(self): ...
def digest(self): ...
def hexdigest(self): ...
def update(self, data): ...
digest_size: Any
def new(data: Optional[Any] = ...): ...

View File

@ -0,0 +1 @@
__revision__: str

View File

@ -0,0 +1,5 @@
from .rng_base import BaseRNG
class PythonOSURandomRNG(BaseRNG):
name: str
def __init__(self) -> None: ...

View File

@ -0,0 +1,6 @@
from typing import Any, Optional
from .rng_base import BaseRNG
class DevURandomRNG(BaseRNG):
name: str
def __init__(self, devname: Optional[Any] = ...) -> None: ...

View File

@ -0,0 +1,11 @@
__revision__: str
class BaseRNG:
closed: bool
def __init__(self) -> None: ...
def __del__(self): ...
def __enter__(self): ...
def __exit__(self): ...
def close(self): ...
def flush(self): ...
def read(self, N: int = ...): ...

View File

@ -0,0 +1,5 @@
from typing import Any
def new(*args: Any, **kwargs: Any): ...
def atfork() -> None: ...
def get_random_bytes(n: int) -> bytes: ...

View File

@ -0,0 +1,19 @@
from typing import Any, List, Optional, Sequence, TypeVar
_T = TypeVar('_T')
class StrongRandom:
def __init__(self, rng: Optional[Any] = ..., randfunc: Optional[Any] = ...) -> None: ...
def getrandbits(self, k: int) -> int: ...
def randrange(self, start: int, stop: int = ..., step: int = ...) -> int: ...
def randint(self, a: int, b: int) -> int: ...
def choice(self, seq: Sequence[_T]) -> _T: ...
def shuffle(self, x: Sequence[Any]): ...
def sample(self, population: Sequence[_T], k: int) -> List[_T]: ...
def getrandbits(k: int) -> int: ...
def randrange(start: int, stop: int = ..., step: int = ...) -> int: ...
def randint(a: int, b: int) -> int: ...
def choice(seq: Sequence[_T]) -> _T: ...
def shuffle(x: Sequence[Any]): ...
def sample(population: Sequence[_T], k: int) -> List[_T]: ...

View File

@ -0,0 +1,9 @@
from typing import Any, Optional
class PSS_SigScheme:
def __init__(self, key, mgfunc, saltLen) -> None: ...
def can_sign(self): ...
def sign(self, mhash): ...
def verify(self, mhash, S): ...
def new(key, mgfunc: Optional[Any] = ..., saltLen: Optional[Any] = ...): ...

View File

@ -0,0 +1,7 @@
class PKCS115_SigScheme:
def __init__(self, key) -> None: ...
def can_sign(self): ...
def sign(self, mhash): ...
def verify(self, mhash, S): ...
def new(key): ...

View File

@ -0,0 +1,3 @@
# Names in __all__ with no definition:
# PKCS1_PSS
# PKCS1_v1_5

View File

@ -0,0 +1,3 @@
from typing import Any
def new(nbits, prefix: Any = ..., suffix: Any = ..., initial_value: int = ..., overflow: int = ..., little_endian: bool = ..., allow_wraparound: bool = ..., disable_shortcut: bool = ...): ...

View File

@ -0,0 +1,9 @@
from typing import Any
__revision__: str
binary: Any
def key_to_english(key): ...
def english_to_key(s): ...
wordlist: Any

View File

@ -0,0 +1,6 @@
# Names in __all__ with no definition:
# RFC1751
# asn1
# number
# randpool
# strxor

View File

@ -0,0 +1,45 @@
from typing import Any, Optional
class DerObject:
typeTags: Any
typeTag: Any
payload: Any
def __init__(self, ASN1Type: Optional[Any] = ..., payload: Any = ...) -> None: ...
def isType(self, ASN1Type): ...
def encode(self): ...
def decode(self, derEle, noLeftOvers: int = ...): ...
class DerInteger(DerObject):
value: Any
def __init__(self, value: int = ...) -> None: ...
payload: Any
def encode(self): ...
def decode(self, derEle, noLeftOvers: int = ...): ...
class DerSequence(DerObject):
def __init__(self, startSeq: Optional[Any] = ...) -> None: ...
def __delitem__(self, n): ...
def __getitem__(self, n): ...
def __setitem__(self, key, value): ...
def __setslice__(self, i, j, sequence): ...
def __delslice__(self, i, j): ...
def __getslice__(self, i, j): ...
def __len__(self): ...
def append(self, item): ...
def hasInts(self): ...
def hasOnlyInts(self): ...
payload: Any
def encode(self): ...
def decode(self, derEle, noLeftOvers: int = ...): ...
class DerOctetString(DerObject):
payload: Any
def __init__(self, value: Any = ...) -> None: ...
def decode(self, derEle, noLeftOvers: int = ...): ...
class DerNull(DerObject):
def __init__(self) -> None: ...
class DerObjectId(DerObject):
def __init__(self) -> None: ...
def decode(self, derEle, noLeftOvers: int = ...): ...

View File

@ -0,0 +1,22 @@
from typing import Any, Optional
from warnings import warn as _warn
__revision__: str
bignum: Any
def size(N): ...
def getRandomNumber(N, randfunc: Optional[Any] = ...): ...
def getRandomInteger(N, randfunc: Optional[Any] = ...): ...
def getRandomRange(a, b, randfunc: Optional[Any] = ...): ...
def getRandomNBitInteger(N, randfunc: Optional[Any] = ...): ...
def GCD(x, y): ...
def inverse(u, v): ...
def getPrime(N, randfunc: Optional[Any] = ...): ...
def getStrongPrime(N, e: int = ..., false_positive_prob: float = ..., randfunc: Optional[Any] = ...): ...
def isPrime(N, false_positive_prob: float = ..., randfunc: Optional[Any] = ...): ...
def long_to_bytes(n, blocksize: int = ...): ...
def bytes_to_long(s): ...
def long2str(n, blocksize: int = ...): ...
def str2long(s): ...
sieve_base: Any

View File

@ -0,0 +1,16 @@
from typing import Any, Optional
__revision__: str
class RandomPool:
bytes: Any
bits: Any
entropy: Any
def __init__(self, numbytes: int = ..., cipher: Optional[Any] = ..., hash: Optional[Any] = ..., file: Optional[Any] = ...) -> None: ...
def get_bytes(self, N): ...
def randomize(self, N: int = ...): ...
def stir(self, s: str = ...): ...
def stir_n(self, N: int = ...): ...
def add_event(self, s: str = ...): ...
def getBytes(self, N): ...
def addEvent(self, event, s: str = ...): ...

View File

@ -0,0 +1,2 @@
def strxor(*args, **kwargs): ...
def strxor_c(*args, **kwargs): ...

View File

@ -0,0 +1,7 @@
# Names in __all__ with no definition:
# Cipher
# Hash
# Protocol
# PublicKey
# Signature
# Util

View File

@ -0,0 +1,7 @@
class CryptoWarning(Warning): ...
class CryptoDeprecationWarning(DeprecationWarning, CryptoWarning): ...
class CryptoRuntimeWarning(RuntimeWarning, CryptoWarning): ...
class RandomPool_DeprecationWarning(CryptoDeprecationWarning): ...
class ClockRewindWarning(CryptoRuntimeWarning): ...
class GetRandomNumber_DeprecationWarning(CryptoDeprecationWarning): ...
class PowmInsecureWarning(CryptoRuntimeWarning): ...

View File

@ -0,0 +1,22 @@
import sys
from typing import Any, AnyStr, Callable, ContextManager, Generic, IO, Optional, Text, Type, Union
if sys.version_info >= (3, 6):
from os import PathLike
_Path = Union[str, bytes, PathLike[str], PathLike[bytes]]
else:
_Path = Union[Text, bytes]
def replace_atomic(src: AnyStr, dst: AnyStr) -> None: ...
def move_atomic(src: AnyStr, dst: AnyStr) -> None: ...
class AtomicWriter(object):
def __init__(self, path: _Path, mode: Text = ..., overwrite: bool = ...) -> None: ...
def open(self) -> ContextManager[IO[Any]]: ...
def _open(self, get_fileobject: Callable[..., IO[AnyStr]]) -> ContextManager[IO[AnyStr]]: ...
def get_fileobject(self, dir: Optional[_Path] = ..., **kwargs: Any) -> IO[Any]: ...
def sync(self, f: IO[Any]) -> None: ...
def commit(self, f: IO[Any]) -> None: ...
def rollback(self, f: IO[Any]) -> None: ...
def atomic_write(
path: _Path, writer_cls: Type[AtomicWriter] = ..., **cls_kwargs: object,
) -> ContextManager[IO[Any]]: ...

View File

@ -0,0 +1,278 @@
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Sequence,
Mapping,
Tuple,
Type,
TypeVar,
Union,
overload,
)
# `import X as X` is required to make these public
from . import exceptions as exceptions
from . import filters as filters
from . import converters as converters
from . import validators as validators
from ._version_info import VersionInfo
__version__: str
__version_info__: VersionInfo
__title__: str
__description__: str
__url__: str
__uri__: str
__author__: str
__email__: str
__license__: str
__copyright__: str
_T = TypeVar("_T")
_C = TypeVar("_C", bound=type)
_ValidatorType = Callable[[Any, Attribute[_T], _T], Any]
_ConverterType = Callable[[Any], _T]
_FilterType = Callable[[Attribute[_T], _T], bool]
_ReprType = Callable[[Any], str]
_ReprArgType = Union[bool, _ReprType]
# FIXME: in reality, if multiple validators are passed they must be in a list or tuple,
# but those are invariant and so would prevent subtypes of _ValidatorType from working
# when passed in a list or tuple.
_ValidatorArgType = Union[_ValidatorType[_T], Sequence[_ValidatorType[_T]]]
# _make --
NOTHING: object
# NOTE: Factory lies about its return type to make this possible: `x: List[int] = Factory(list)`
# Work around mypy issue #4554 in the common case by using an overload.
@overload
def Factory(factory: Callable[[], _T]) -> _T: ...
@overload
def Factory(
factory: Union[Callable[[Any], _T], Callable[[], _T]],
takes_self: bool = ...,
) -> _T: ...
class Attribute(Generic[_T]):
name: str
default: Optional[_T]
validator: Optional[_ValidatorType[_T]]
repr: _ReprArgType
cmp: bool
eq: bool
order: bool
hash: Optional[bool]
init: bool
converter: Optional[_ConverterType[_T]]
metadata: Dict[Any, Any]
type: Optional[Type[_T]]
kw_only: bool
# NOTE: We had several choices for the annotation to use for type arg:
# 1) Type[_T]
# - Pros: Handles simple cases correctly
# - Cons: Might produce less informative errors in the case of conflicting TypeVars
# e.g. `attr.ib(default='bad', type=int)`
# 2) Callable[..., _T]
# - Pros: Better error messages than #1 for conflicting TypeVars
# - Cons: Terrible error messages for validator checks.
# e.g. attr.ib(type=int, validator=validate_str)
# -> error: Cannot infer function type argument
# 3) type (and do all of the work in the mypy plugin)
# - Pros: Simple here, and we could customize the plugin with our own errors.
# - Cons: Would need to write mypy plugin code to handle all the cases.
# We chose option #1.
# `attr` lies about its return type to make the following possible:
# attr() -> Any
# attr(8) -> int
# attr(validator=<some callable>) -> Whatever the callable expects.
# This makes this type of assignments possible:
# x: int = attr(8)
#
# This form catches explicit None or no default but with no other arguments returns Any.
@overload
def attrib(
default: None = ...,
validator: None = ...,
repr: _ReprArgType = ...,
cmp: Optional[bool] = ...,
hash: Optional[bool] = ...,
init: bool = ...,
metadata: Optional[Mapping[Any, Any]] = ...,
type: None = ...,
converter: None = ...,
factory: None = ...,
kw_only: bool = ...,
eq: Optional[bool] = ...,
order: Optional[bool] = ...,
) -> Any: ...
# This form catches an explicit None or no default and infers the type from the other arguments.
@overload
def attrib(
default: None = ...,
validator: Optional[_ValidatorArgType[_T]] = ...,
repr: _ReprArgType = ...,
cmp: Optional[bool] = ...,
hash: Optional[bool] = ...,
init: bool = ...,
metadata: Optional[Mapping[Any, Any]] = ...,
type: Optional[Type[_T]] = ...,
converter: Optional[_ConverterType[_T]] = ...,
factory: Optional[Callable[[], _T]] = ...,
kw_only: bool = ...,
eq: Optional[bool] = ...,
order: Optional[bool] = ...,
) -> _T: ...
# This form catches an explicit default argument.
@overload
def attrib(
default: _T,
validator: Optional[_ValidatorArgType[_T]] = ...,
repr: _ReprArgType = ...,
cmp: Optional[bool] = ...,
hash: Optional[bool] = ...,
init: bool = ...,
metadata: Optional[Mapping[Any, Any]] = ...,
type: Optional[Type[_T]] = ...,
converter: Optional[_ConverterType[_T]] = ...,
factory: Optional[Callable[[], _T]] = ...,
kw_only: bool = ...,
eq: Optional[bool] = ...,
order: Optional[bool] = ...,
) -> _T: ...
# This form covers type=non-Type: e.g. forward references (str), Any
@overload
def attrib(
default: Optional[_T] = ...,
validator: Optional[_ValidatorArgType[_T]] = ...,
repr: _ReprArgType = ...,
cmp: Optional[bool] = ...,
hash: Optional[bool] = ...,
init: bool = ...,
metadata: Optional[Mapping[Any, Any]] = ...,
type: object = ...,
converter: Optional[_ConverterType[_T]] = ...,
factory: Optional[Callable[[], _T]] = ...,
kw_only: bool = ...,
eq: Optional[bool] = ...,
order: Optional[bool] = ...,
) -> Any: ...
@overload
def attrs(
maybe_cls: _C,
these: Optional[Dict[str, Any]] = ...,
repr_ns: Optional[str] = ...,
repr: bool = ...,
cmp: Optional[bool] = ...,
hash: Optional[bool] = ...,
init: bool = ...,
slots: bool = ...,
frozen: bool = ...,
weakref_slot: bool = ...,
str: bool = ...,
auto_attribs: bool = ...,
kw_only: bool = ...,
cache_hash: bool = ...,
auto_exc: bool = ...,
eq: Optional[bool] = ...,
order: Optional[bool] = ...,
) -> _C: ...
@overload
def attrs(
maybe_cls: None = ...,
these: Optional[Dict[str, Any]] = ...,
repr_ns: Optional[str] = ...,
repr: bool = ...,
cmp: Optional[bool] = ...,
hash: Optional[bool] = ...,
init: bool = ...,
slots: bool = ...,
frozen: bool = ...,
weakref_slot: bool = ...,
str: bool = ...,
auto_attribs: bool = ...,
kw_only: bool = ...,
cache_hash: bool = ...,
auto_exc: bool = ...,
eq: Optional[bool] = ...,
order: Optional[bool] = ...,
) -> Callable[[_C], _C]: ...
# TODO: add support for returning NamedTuple from the mypy plugin
class _Fields(Tuple[Attribute[Any], ...]):
def __getattr__(self, name: str) -> Attribute[Any]: ...
def fields(cls: type) -> _Fields: ...
def fields_dict(cls: type) -> Dict[str, Attribute[Any]]: ...
def validate(inst: Any) -> None: ...
# TODO: add support for returning a proper attrs class from the mypy plugin
# we use Any instead of _CountingAttr so that e.g. `make_class('Foo', [attr.ib()])` is valid
def make_class(
name: str,
attrs: Union[List[str], Tuple[str, ...], Dict[str, Any]],
bases: Tuple[type, ...] = ...,
repr_ns: Optional[str] = ...,
repr: bool = ...,
cmp: Optional[bool] = ...,
hash: Optional[bool] = ...,
init: bool = ...,
slots: bool = ...,
frozen: bool = ...,
weakref_slot: bool = ...,
str: bool = ...,
auto_attribs: bool = ...,
kw_only: bool = ...,
cache_hash: bool = ...,
auto_exc: bool = ...,
eq: Optional[bool] = ...,
order: Optional[bool] = ...,
) -> type: ...
# _funcs --
# TODO: add support for returning TypedDict from the mypy plugin
# FIXME: asdict/astuple do not honor their factory args. waiting on one of these:
# https://github.com/python/mypy/issues/4236
# https://github.com/python/typing/issues/253
def asdict(
inst: Any,
recurse: bool = ...,
filter: Optional[_FilterType[Any]] = ...,
dict_factory: Type[Mapping[Any, Any]] = ...,
retain_collection_types: bool = ...,
) -> Dict[str, Any]: ...
# TODO: add support for returning NamedTuple from the mypy plugin
def astuple(
inst: Any,
recurse: bool = ...,
filter: Optional[_FilterType[Any]] = ...,
tuple_factory: Type[Sequence[Any]] = ...,
retain_collection_types: bool = ...,
) -> Tuple[Any, ...]: ...
def has(cls: type) -> bool: ...
def assoc(inst: _T, **changes: Any) -> _T: ...
def evolve(inst: _T, **changes: Any) -> _T: ...
# _config --
def set_run_validators(run: bool) -> None: ...
def get_run_validators() -> bool: ...
# aliases --
s = attributes = attrs
ib = attr = attrib
dataclass = attrs # Technically, partial(attrs, auto_attribs=True) ;)

View File

@ -0,0 +1,9 @@
class VersionInfo:
@property
def year(self) -> int: ...
@property
def minor(self) -> int: ...
@property
def micro(self) -> int: ...
@property
def releaselevel(self) -> str: ...

View File

@ -0,0 +1,12 @@
from typing import TypeVar, Optional, Callable, overload
from . import _ConverterType
_T = TypeVar("_T")
def optional(
converter: _ConverterType[_T]
) -> _ConverterType[Optional[_T]]: ...
@overload
def default_if_none(default: _T) -> _ConverterType[_T]: ...
@overload
def default_if_none(*, factory: Callable[[], _T]) -> _ConverterType[_T]: ...

View File

@ -0,0 +1,15 @@
from typing import Any
class FrozenInstanceError(AttributeError):
msg: str = ...
class AttrsAttributeNotFoundError(ValueError): ...
class NotAnAttrsClassError(ValueError): ...
class DefaultAlreadySetError(RuntimeError): ...
class UnannotatedAttributeError(RuntimeError): ...
class PythonTooOldError(RuntimeError): ...
class NotCallableError(TypeError):
msg: str = ...
value: Any = ...
def __init__(self, msg: str, value: Any) -> None: ...

View File

@ -0,0 +1,5 @@
from typing import Union, Any
from . import Attribute, _FilterType
def include(*what: Union[type, Attribute[Any]]) -> _FilterType[Any]: ...
def exclude(*what: Union[type, Attribute[Any]]) -> _FilterType[Any]: ...

View File

@ -0,0 +1,66 @@
from typing import (
Container,
List,
Union,
TypeVar,
Type,
Any,
Optional,
Tuple,
Iterable,
Mapping,
Callable,
Match,
AnyStr,
overload,
)
from . import _ValidatorType
_T = TypeVar("_T")
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
_T3 = TypeVar("_T3")
_I = TypeVar("_I", bound=Iterable)
_K = TypeVar("_K")
_V = TypeVar("_V")
_M = TypeVar("_M", bound=Mapping)
# To be more precise on instance_of use some overloads.
# If there are more than 3 items in the tuple then we fall back to Any
@overload
def instance_of(type: Type[_T]) -> _ValidatorType[_T]: ...
@overload
def instance_of(type: Tuple[Type[_T]]) -> _ValidatorType[_T]: ...
@overload
def instance_of(
type: Tuple[Type[_T1], Type[_T2]]
) -> _ValidatorType[Union[_T1, _T2]]: ...
@overload
def instance_of(
type: Tuple[Type[_T1], Type[_T2], Type[_T3]]
) -> _ValidatorType[Union[_T1, _T2, _T3]]: ...
@overload
def instance_of(type: Tuple[type, ...]) -> _ValidatorType[Any]: ...
def provides(interface: Any) -> _ValidatorType[Any]: ...
def optional(
validator: Union[_ValidatorType[_T], List[_ValidatorType[_T]]]
) -> _ValidatorType[Optional[_T]]: ...
def in_(options: Container[_T]) -> _ValidatorType[_T]: ...
def and_(*validators: _ValidatorType[_T]) -> _ValidatorType[_T]: ...
def matches_re(
regex: AnyStr,
flags: int = ...,
func: Optional[
Callable[[AnyStr, AnyStr, int], Optional[Match[AnyStr]]]
] = ...,
) -> _ValidatorType[AnyStr]: ...
def deep_iterable(
member_validator: _ValidatorType[_T],
iterable_validator: Optional[_ValidatorType[_I]] = ...,
) -> _ValidatorType[_I]: ...
def deep_mapping(
key_validator: _ValidatorType[_K],
value_validator: _ValidatorType[_V],
mapping_validator: Optional[_ValidatorType[_M]] = ...,
) -> _ValidatorType[_M]: ...
def is_callable() -> _ValidatorType[_T]: ...

View File

@ -0,0 +1,4 @@
from typing import Any
# Explicitly mark this package as incomplete.
def __getattr__(name: str) -> Any: ...

View File

@ -0,0 +1,3 @@
class CertificateError(ValueError): ...
def match_hostname(cert, hostname): ...

View File

@ -0,0 +1,15 @@
from typing import Any
def mk_gen(): ...
def mk_awaitable(): ...
def mk_coroutine(): ...
Generator: Any
Awaitable: Any
Coroutine: Any
def isawaitable(obj): ...
PATCHED: Any
def patch(patch_inspect: bool = ...): ...

View File

@ -0,0 +1,32 @@
from typing import Any, Container, Iterable, Optional, Text
from bleach.linkifier import DEFAULT_CALLBACKS as DEFAULT_CALLBACKS, Linker as Linker
from bleach.sanitizer import (
ALLOWED_ATTRIBUTES as ALLOWED_ATTRIBUTES,
ALLOWED_PROTOCOLS as ALLOWED_PROTOCOLS,
ALLOWED_STYLES as ALLOWED_STYLES,
ALLOWED_TAGS as ALLOWED_TAGS,
Cleaner as Cleaner,
)
from .linkifier import _Callback
__releasedate__: Text
__version__: Text
VERSION: Any # packaging.version.Version
def clean(
text: Text,
tags: Container[Text] = ...,
attributes: Any = ...,
styles: Container[Text] = ...,
protocols: Container[Text] = ...,
strip: bool = ...,
strip_comments: bool = ...,
) -> Text: ...
def linkify(
text: Text,
callbacks: Iterable[_Callback] = ...,
skip_tags: Optional[Container[Text]] = ...,
parse_email: bool = ...,
) -> Text: ...

View File

@ -0,0 +1,6 @@
from typing import MutableMapping, Any, Text
_Attrs = MutableMapping[Any, Text]
def nofollow(attrs: _Attrs, new: bool = ...) -> _Attrs: ...
def target_blank(attrs: _Attrs, new: bool = ...) -> _Attrs: ...

View File

@ -0,0 +1,31 @@
from typing import Any, Container, Iterable, List, MutableMapping, Optional, Pattern, Protocol, Text
_Attrs = MutableMapping[Any, Text]
class _Callback(Protocol):
def __call__(self, attrs: _Attrs, new: bool = ...) -> _Attrs: ...
DEFAULT_CALLBACKS: List[_Callback]
TLDS: List[Text]
def build_url_re(tlds: Iterable[Text] = ..., protocols: Iterable[Text] = ...) -> Pattern[Text]: ...
URL_RE: Pattern[Text]
PROTO_RE: Pattern[Text]
EMAIL_RE: Pattern[Text]
class Linker(object):
def __init__(
self,
callbacks: Iterable[_Callback] = ...,
skip_tags: Optional[Container[Text]] = ...,
parse_email: bool = ...,
url_re: Pattern[Text] = ...,
email_re: Pattern[Text] = ...,
recognized_tags: Optional[Container[Text]] = ...,
) -> None: ...
def linkify(self, text: Text) -> Text: ...
class LinkifyFilter(object): # TODO: derives from html5lib.Filter
def __getattr__(self, item: str) -> Any: ... # incomplete

View File

@ -0,0 +1,34 @@
from typing import Any, Callable, Container, Dict, Iterable, List, Optional, Pattern, Text, Type, Union
ALLOWED_TAGS: List[Text]
ALLOWED_ATTRIBUTES: Dict[Text, List[Text]]
ALLOWED_STYLES: List[Text]
ALLOWED_PROTOCOLS: List[Text]
INVISIBLE_CHARACTERS: Text
INVISIBLE_CHARACTERS_RE: Pattern[Text]
INVISIBLE_REPLACEMENT_CHAR: Text
# A html5lib Filter class
_Filter = Any
class Cleaner(object):
def __init__(
self,
tags: Container[Text] = ...,
attributes: Any = ...,
styles: Container[Text] = ...,
protocols: Container[Text] = ...,
strip: bool = ...,
strip_comments: bool = ...,
filters: Optional[Iterable[_Filter]] = ...,
) -> None: ...
def clean(self, text: Text) -> Text: ...
_AttributeFilter = Callable[[Text, Text, Text], bool]
_AttributeDict = Dict[Text, Union[Container[Text], _AttributeFilter]]
def attribute_filter_factory(attributes: Union[_AttributeFilter, _AttributeDict, Container[Text]]) -> _AttributeFilter: ...
class BleachSanitizerFilter(object): # TODO: derives from html5lib.sanitizer.Filter
def __getattr__(self, item: str) -> Any: ... # incomplete

View File

@ -0,0 +1,8 @@
from collections import OrderedDict
from typing import overload, Mapping, Any, Text
@overload
def alphabetize_attributes(attrs: None) -> None: ...
@overload
def alphabetize_attributes(attrs: Mapping[Any, Text]) -> OrderedDict[Any, Text]: ...
def force_unicode(text: Text) -> Text: ...

View File

@ -0,0 +1,79 @@
from typing import Any, Optional, Text
import logging
from .s3.connection import S3Connection
Version: Any
UserAgent: Any
config: Any
BUCKET_NAME_RE: Any
TOO_LONG_DNS_NAME_COMP: Any
GENERATION_RE: Any
VERSION_RE: Any
ENDPOINTS_PATH: Any
def init_logging(): ...
class NullHandler(logging.Handler):
def emit(self, record): ...
log: Any
perflog: Any
def set_file_logger(name, filepath, level: Any = ..., format_string: Optional[Any] = ...): ...
def set_stream_logger(name, level: Any = ..., format_string: Optional[Any] = ...): ...
def connect_sqs(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_s3(aws_access_key_id: Optional[Text] = ..., aws_secret_access_key: Optional[Text] = ..., **kwargs) -> S3Connection: ...
def connect_gs(gs_access_key_id: Optional[Any] = ..., gs_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_ec2(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_elb(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_autoscale(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cloudwatch(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_sdb(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_fps(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_mturk(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cloudfront(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_vpc(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_rds(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_rds2(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_emr(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_sns(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_iam(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_route53(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cloudformation(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_euca(host: Optional[Any] = ..., aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., port: int = ..., path: str = ..., is_secure: bool = ..., **kwargs): ...
def connect_glacier(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_ec2_endpoint(url, aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_walrus(host: Optional[Any] = ..., aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., port: int = ..., path: str = ..., is_secure: bool = ..., **kwargs): ...
def connect_ses(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_sts(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_ia(ia_access_key_id: Optional[Any] = ..., ia_secret_access_key: Optional[Any] = ..., is_secure: bool = ..., **kwargs): ...
def connect_dynamodb(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_swf(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cloudsearch(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cloudsearch2(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., sign_request: bool = ..., **kwargs): ...
def connect_cloudsearchdomain(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_beanstalk(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_elastictranscoder(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_opsworks(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_redshift(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_support(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cloudtrail(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_directconnect(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_kinesis(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_logs(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_route53domains(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cognito_identity(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cognito_sync(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_kms(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_awslambda(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_codedeploy(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_configservice(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_cloudhsm(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_ec2containerservice(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def connect_machinelearning(aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., **kwargs): ...
def storage_uri(uri_str, default_scheme: str = ..., debug: int = ..., validate: bool = ..., bucket_storage_uri_class: Any = ..., suppress_consec_slashes: bool = ..., is_latest: bool = ...): ...
def storage_uri_for_key(key): ...
# Explicitly mark this package as incomplete.
def __getattr__(name: str) -> Any: ...

View File

@ -0,0 +1,108 @@
from typing import Any, Optional
from boto.auth_handler import AuthHandler
SIGV4_DETECT: Any
class HmacKeys:
host: Any
def __init__(self, host, config, provider) -> None: ...
def update_provider(self, provider): ...
def algorithm(self): ...
def sign_string(self, string_to_sign): ...
class AnonAuthHandler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV1Handler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def update_provider(self, provider): ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV2Handler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def update_provider(self, provider): ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV3Handler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV3HTTPHandler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def headers_to_sign(self, http_request): ...
def canonical_headers(self, headers_to_sign): ...
def string_to_sign(self, http_request): ...
def add_auth(self, req, **kwargs): ...
class HmacAuthV4Handler(AuthHandler, HmacKeys):
capability: Any
service_name: Any
region_name: Any
def __init__(self, host, config, provider, service_name: Optional[Any] = ..., region_name: Optional[Any] = ...) -> None: ...
def headers_to_sign(self, http_request): ...
def host_header(self, host, http_request): ...
def query_string(self, http_request): ...
def canonical_query_string(self, http_request): ...
def canonical_headers(self, headers_to_sign): ...
def signed_headers(self, headers_to_sign): ...
def canonical_uri(self, http_request): ...
def payload(self, http_request): ...
def canonical_request(self, http_request): ...
def scope(self, http_request): ...
def split_host_parts(self, host): ...
def determine_region_name(self, host): ...
def determine_service_name(self, host): ...
def credential_scope(self, http_request): ...
def string_to_sign(self, http_request, canonical_request): ...
def signature(self, http_request, string_to_sign): ...
def add_auth(self, req, **kwargs): ...
class S3HmacAuthV4Handler(HmacAuthV4Handler, AuthHandler):
capability: Any
region_name: Any
def __init__(self, *args, **kwargs) -> None: ...
def clean_region_name(self, region_name): ...
def canonical_uri(self, http_request): ...
def canonical_query_string(self, http_request): ...
def host_header(self, host, http_request): ...
def headers_to_sign(self, http_request): ...
def determine_region_name(self, host): ...
def determine_service_name(self, host): ...
def mangle_path_and_params(self, req): ...
def payload(self, http_request): ...
def add_auth(self, req, **kwargs): ...
def presign(self, req, expires, iso_date: Optional[Any] = ...): ...
class STSAnonHandler(AuthHandler):
capability: Any
def add_auth(self, http_request, **kwargs): ...
class QuerySignatureHelper(HmacKeys):
def add_auth(self, http_request, **kwargs): ...
class QuerySignatureV0AuthHandler(QuerySignatureHelper, AuthHandler):
SignatureVersion: int
capability: Any
class QuerySignatureV1AuthHandler(QuerySignatureHelper, AuthHandler):
SignatureVersion: int
capability: Any
def __init__(self, *args, **kw) -> None: ...
class QuerySignatureV2AuthHandler(QuerySignatureHelper, AuthHandler):
SignatureVersion: int
capability: Any
class POSTPathQSV2AuthHandler(QuerySignatureV2AuthHandler, AuthHandler):
capability: Any
def add_auth(self, req, **kwargs): ...
def get_auth_handler(host, config, provider, requested_capability: Optional[Any] = ...): ...
def detect_potential_sigv4(func): ...
def detect_potential_s3sigv4(func): ...

View File

@ -0,0 +1,9 @@
from typing import Any
from boto.plugin import Plugin
class NotReadyToAuthenticate(Exception): ...
class AuthHandler(Plugin):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def add_auth(self, http_request): ...

View File

@ -0,0 +1,17 @@
import sys
from typing import Any
from base64 import encodestring as encodebytes
from six.moves import http_client
expanduser: Any
if sys.version_info >= (3, 0):
StandardError = Exception
else:
from __builtin__ import StandardError as StandardError
long_type: Any
unquote_str: Any
parse_qs_safe: Any

View File

@ -0,0 +1,115 @@
from typing import Any, Dict, Optional, Text
from six.moves import http_client
HAVE_HTTPS_CONNECTION: bool
ON_APP_ENGINE: Any
PORTS_BY_SECURITY: Any
DEFAULT_CA_CERTS_FILE: Any
class HostConnectionPool:
queue: Any
def __init__(self) -> None: ...
def size(self): ...
def put(self, conn): ...
def get(self): ...
def clean(self): ...
class ConnectionPool:
CLEAN_INTERVAL: float
STALE_DURATION: float
host_to_pool: Any
last_clean_time: float
mutex: Any
def __init__(self) -> None: ...
def size(self): ...
def get_http_connection(self, host, port, is_secure): ...
def put_http_connection(self, host, port, is_secure, conn): ...
def clean(self): ...
class HTTPRequest:
method: Any
protocol: Any
host: Any
port: Any
path: Any
auth_path: Any
params: Any
headers: Any
body: Any
def __init__(self, method, protocol, host, port, path, auth_path, params, headers, body) -> None: ...
def authorize(self, connection, **kwargs): ...
class HTTPResponse(http_client.HTTPResponse):
def __init__(self, *args, **kwargs) -> None: ...
def read(self, amt: Optional[Any] = ...): ...
class AWSAuthConnection:
suppress_consec_slashes: Any
num_retries: int
is_secure: Any
https_validate_certificates: Any
ca_certificates_file: Any
port: Any
http_exceptions: Any
http_unretryable_exceptions: Any
socket_exception_values: Any
https_connection_factory: Any
protocol: str
host: Any
path: Any
debug: Any
host_header: Any
http_connection_kwargs: Any
provider: Any
auth_service_name: Any
request_hook: Any
def __init__(self, host, aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., is_secure: bool = ..., port: Optional[Any] = ..., proxy: Optional[Any] = ..., proxy_port: Optional[Any] = ..., proxy_user: Optional[Any] = ..., proxy_pass: Optional[Any] = ..., debug: int = ..., https_connection_factory: Optional[Any] = ..., path: str = ..., provider: str = ..., security_token: Optional[Any] = ..., suppress_consec_slashes: bool = ..., validate_certs: bool = ..., profile_name: Optional[Any] = ...) -> None: ...
auth_region_name: Any
@property
def connection(self): ...
@property
def aws_access_key_id(self): ...
@property
def gs_access_key_id(self) -> Any: ...
access_key: Any
@property
def aws_secret_access_key(self): ...
@property
def gs_secret_access_key(self): ...
secret_key: Any
@property
def profile_name(self): ...
def get_path(self, path: str = ...): ...
def server_name(self, port: Optional[Any] = ...): ...
proxy: Any
proxy_port: Any
proxy_user: Any
proxy_pass: Any
no_proxy: Any
use_proxy: Any
def handle_proxy(self, proxy, proxy_port, proxy_user, proxy_pass): ...
def get_http_connection(self, host, port, is_secure): ...
def skip_proxy(self, host): ...
def new_http_connection(self, host, port, is_secure): ...
def put_http_connection(self, host, port, is_secure, connection): ...
def proxy_ssl(self, host: Optional[Any] = ..., port: Optional[Any] = ...): ...
def prefix_proxy_to_path(self, path, host: Optional[Any] = ...): ...
def get_proxy_auth_header(self): ...
def get_proxy_url_with_auth(self): ...
def set_host_header(self, request): ...
def set_request_hook(self, hook): ...
def build_base_http_request(self, method, path, auth_path, params: Optional[Any] = ..., headers: Optional[Any] = ..., data: str = ..., host: Optional[Any] = ...): ...
def make_request(self, method, path, headers: Optional[Any] = ..., data: str = ..., host: Optional[Any] = ..., auth_path: Optional[Any] = ..., sender: Optional[Any] = ..., override_num_retries: Optional[Any] = ..., params: Optional[Any] = ..., retry_handler: Optional[Any] = ...): ...
def close(self): ...
class AWSQueryConnection(AWSAuthConnection):
APIVersion: str
ResponseError: Any
def __init__(self, aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., is_secure: bool = ..., port: Optional[Any] = ..., proxy: Optional[Any] = ..., proxy_port: Optional[Any] = ..., proxy_user: Optional[Any] = ..., proxy_pass: Optional[Any] = ..., host: Optional[Any] = ..., debug: int = ..., https_connection_factory: Optional[Any] = ..., path: str = ..., security_token: Optional[Any] = ..., validate_certs: bool = ..., profile_name: Optional[Any] = ..., provider: str = ...) -> None: ...
def get_utf8_value(self, value): ...
def make_request(self, action, params: Optional[Any] = ..., path: str = ..., verb: str = ..., *args, **kwargs): ... # type: ignore # https://github.com/python/mypy/issues/1237
def build_list_params(self, params, items, label): ...
def build_complex_list_params(self, params, items, label, names): ...
def get_list(self, action, params, markers, path: str = ..., parent: Optional[Any] = ..., verb: str = ...): ...
def get_object(self, action, params, cls, path: str = ..., parent: Optional[Any] = ..., verb: str = ...): ...
def get_status(self, action, params, path: str = ..., parent: Optional[Any] = ..., verb: str = ...): ...

View File

@ -0,0 +1,10 @@
from typing import Any
RegionData: Any
def regions(**kw_params): ...
def connect_to_region(region_name, **kw_params): ...
def get_region(region_name, **kw_params): ...
# Explicitly mark this package as incomplete.
def __getattr__(name: str) -> Any: ...

View File

@ -0,0 +1,39 @@
from typing import Any
from boto.connection import AWSQueryConnection
RegionData: Any
def regions(): ...
def connect_to_region(region_name, **kw_params): ...
class ELBConnection(AWSQueryConnection):
APIVersion: Any
DefaultRegionName: Any
DefaultRegionEndpoint: Any
region: Any
def __init__(self, aws_access_key_id=..., aws_secret_access_key=..., is_secure=..., port=..., proxy=..., proxy_port=..., proxy_user=..., proxy_pass=..., debug=..., https_connection_factory=..., region=..., path=..., security_token=..., validate_certs=..., profile_name=...) -> None: ...
def build_list_params(self, params, items, label): ...
def get_all_load_balancers(self, load_balancer_names=..., marker=...): ...
def create_load_balancer(self, name, zones, listeners=..., subnets=..., security_groups=..., scheme=..., complex_listeners=...): ...
def create_load_balancer_listeners(self, name, listeners=..., complex_listeners=...): ...
def delete_load_balancer(self, name): ...
def delete_load_balancer_listeners(self, name, ports): ...
def enable_availability_zones(self, load_balancer_name, zones_to_add): ...
def disable_availability_zones(self, load_balancer_name, zones_to_remove): ...
def modify_lb_attribute(self, load_balancer_name, attribute, value): ...
def get_all_lb_attributes(self, load_balancer_name): ...
def get_lb_attribute(self, load_balancer_name, attribute): ...
def register_instances(self, load_balancer_name, instances): ...
def deregister_instances(self, load_balancer_name, instances): ...
def describe_instance_health(self, load_balancer_name, instances=...): ...
def configure_health_check(self, name, health_check): ...
def set_lb_listener_SSL_certificate(self, lb_name, lb_port, ssl_certificate_id): ...
def create_app_cookie_stickiness_policy(self, name, lb_name, policy_name): ...
def create_lb_cookie_stickiness_policy(self, cookie_expiration_period, lb_name, policy_name): ...
def create_lb_policy(self, lb_name, policy_name, policy_type, policy_attributes): ...
def delete_lb_policy(self, lb_name, policy_name): ...
def set_lb_policies_of_listener(self, lb_name, lb_port, policies): ...
def set_lb_policies_of_backend_server(self, lb_name, instance_port, policies): ...
def apply_security_groups_to_lb(self, name, security_groups): ...
def attach_lb_to_subnets(self, name, subnets): ...
def detach_lb_from_subnets(self, name, subnets): ...

View File

@ -0,0 +1,146 @@
from typing import Any, Optional
from boto.compat import StandardError
class BotoClientError(StandardError):
reason: Any
def __init__(self, reason, *args) -> None: ...
class SDBPersistenceError(StandardError): ...
class StoragePermissionsError(BotoClientError): ...
class S3PermissionsError(StoragePermissionsError): ...
class GSPermissionsError(StoragePermissionsError): ...
class BotoServerError(StandardError):
status: Any
reason: Any
body: Any
request_id: Any
error_code: Any
message: str
box_usage: Any
def __init__(self, status, reason, body: Optional[Any] = ..., *args) -> None: ...
def __getattr__(self, name): ...
def __setattr__(self, name, value): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class ConsoleOutput:
parent: Any
instance_id: Any
timestamp: Any
comment: Any
output: Any
def __init__(self, parent: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class StorageCreateError(BotoServerError):
bucket: Any
def __init__(self, status, reason, body: Optional[Any] = ...) -> None: ...
def endElement(self, name, value, connection): ...
class S3CreateError(StorageCreateError): ...
class GSCreateError(StorageCreateError): ...
class StorageCopyError(BotoServerError): ...
class S3CopyError(StorageCopyError): ...
class GSCopyError(StorageCopyError): ...
class SQSError(BotoServerError):
detail: Any
type: Any
def __init__(self, status, reason, body: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class SQSDecodeError(BotoClientError):
message: Any
def __init__(self, reason, message) -> None: ...
class StorageResponseError(BotoServerError):
resource: Any
def __init__(self, status, reason, body: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class S3ResponseError(StorageResponseError): ...
class GSResponseError(StorageResponseError): ...
class EC2ResponseError(BotoServerError):
errors: Any
def __init__(self, status, reason, body: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
request_id: Any
def endElement(self, name, value, connection): ...
class JSONResponseError(BotoServerError):
status: Any
reason: Any
body: Any
error_message: Any
error_code: Any
def __init__(self, status, reason, body: Optional[Any] = ..., *args) -> None: ...
class DynamoDBResponseError(JSONResponseError): ...
class SWFResponseError(JSONResponseError): ...
class EmrResponseError(BotoServerError): ...
class _EC2Error:
connection: Any
error_code: Any
error_message: Any
def __init__(self, connection: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class SDBResponseError(BotoServerError): ...
class AWSConnectionError(BotoClientError): ...
class StorageDataError(BotoClientError): ...
class S3DataError(StorageDataError): ...
class GSDataError(StorageDataError): ...
class InvalidUriError(Exception):
message: Any
def __init__(self, message) -> None: ...
class InvalidAclError(Exception):
message: Any
def __init__(self, message) -> None: ...
class InvalidCorsError(Exception):
message: Any
def __init__(self, message) -> None: ...
class NoAuthHandlerFound(Exception): ...
class InvalidLifecycleConfigError(Exception):
message: Any
def __init__(self, message) -> None: ...
class ResumableTransferDisposition:
START_OVER: str
WAIT_BEFORE_RETRY: str
ABORT_CUR_PROCESS: str
ABORT: str
class ResumableUploadException(Exception):
message: Any
disposition: Any
def __init__(self, message, disposition) -> None: ...
class ResumableDownloadException(Exception):
message: Any
disposition: Any
def __init__(self, message, disposition) -> None: ...
class TooManyRecordsException(Exception):
message: Any
def __init__(self, message) -> None: ...
class PleaseRetryException(Exception):
message: Any
response: Any
def __init__(self, message, response: Optional[Any] = ...) -> None: ...
class InvalidInstanceMetadataError(Exception):
MSG: str
def __init__(self, msg) -> None: ...

View File

@ -0,0 +1,5 @@
from typing import List
import boto.regioninfo
def regions() -> List[boto.regioninfo.RegionInfo]: ...
def connect_to_region(region_name, **kw_params): ...

View File

@ -0,0 +1,17 @@
from boto.exception import BotoServerError
class InvalidGrantTokenException(BotoServerError): ...
class DisabledException(BotoServerError): ...
class LimitExceededException(BotoServerError): ...
class DependencyTimeoutException(BotoServerError): ...
class InvalidMarkerException(BotoServerError): ...
class AlreadyExistsException(BotoServerError): ...
class InvalidCiphertextException(BotoServerError): ...
class KeyUnavailableException(BotoServerError): ...
class InvalidAliasNameException(BotoServerError): ...
class UnsupportedOperationException(BotoServerError): ...
class InvalidArnException(BotoServerError): ...
class KMSInternalException(BotoServerError): ...
class InvalidKeyUsageException(BotoServerError): ...
class MalformedPolicyDocumentException(BotoServerError): ...
class NotFoundException(BotoServerError): ...

View File

@ -0,0 +1,37 @@
from typing import Any, Dict, List, Mapping, Optional, Type
from boto.connection import AWSQueryConnection
class KMSConnection(AWSQueryConnection):
APIVersion: str
DefaultRegionName: str
DefaultRegionEndpoint: str
ServiceName: str
TargetPrefix: str
ResponseError: Type[Exception]
region: Any
def __init__(self, **kwargs) -> None: ...
def create_alias(self, alias_name: str, target_key_id: str) -> Optional[Dict[str, Any]]: ...
def create_grant(self, key_id: str, grantee_principal: str, retiring_principal: Optional[str] = ..., operations: Optional[List[str]] = ..., constraints: Optional[Dict[str, Dict[str, str]]] = ..., grant_tokens: Optional[List[str]] = ...) -> Optional[Dict[str, Any]]: ...
def create_key(self, policy: Optional[str] = ..., description: Optional[str] = ..., key_usage: Optional[str] = ...) -> Optional[Dict[str, Any]]: ...
def decrypt(self, ciphertext_blob: bytes, encryption_context: Optional[Mapping[str, Any]] = ..., grant_tokens: Optional[List[str]] = ...) -> Optional[Dict[str, Any]]: ...
def delete_alias(self, alias_name: str) -> Optional[Dict[str, Any]]: ...
def describe_key(self, key_id: str) -> Optional[Dict[str, Any]]: ...
def disable_key(self, key_id: str) -> Optional[Dict[str, Any]]: ...
def disable_key_rotation(self, key_id: str) -> Optional[Dict[str, Any]]: ...
def enable_key(self, key_id: str) -> Optional[Dict[str, Any]]: ...
def enable_key_rotation(self, key_id: str) -> Optional[Dict[str, Any]]: ...
def encrypt(self, key_id: str, plaintext: bytes, encryption_context: Optional[Mapping[str, Any]] = ..., grant_tokens: Optional[List[str]] = ...) -> Optional[Dict[str, Any]]: ...
def generate_data_key(self, key_id: str, encryption_context: Optional[Mapping[str, Any]] = ..., number_of_bytes: Optional[int] = ..., key_spec: Optional[str] = ..., grant_tokens: Optional[List[str]] = ...) -> Optional[Dict[str, Any]]: ...
def generate_data_key_without_plaintext(self, key_id: str, encryption_context: Optional[Mapping[str, Any]] = ..., key_spec: Optional[str] = ..., number_of_bytes: Optional[int] = ..., grant_tokens: Optional[List[str]] = ...) -> Optional[Dict[str, Any]]: ...
def generate_random(self, number_of_bytes: Optional[int] = ...) -> Optional[Dict[str, Any]]: ...
def get_key_policy(self, key_id: str, policy_name: str) -> Optional[Dict[str, Any]]: ...
def get_key_rotation_status(self, key_id: str) -> Optional[Dict[str, Any]]: ...
def list_aliases(self, limit: Optional[int] = ..., marker: Optional[str] = ...) -> Optional[Dict[str, Any]]: ...
def list_grants(self, key_id: str, limit: Optional[int] = ..., marker: Optional[str] = ...) -> Optional[Dict[str, Any]]: ...
def list_key_policies(self, key_id: str, limit: Optional[int] = ..., marker: Optional[str] = ...) -> Optional[Dict[str, Any]]: ...
def list_keys(self, limit: Optional[int] = ..., marker: Optional[str] = ...) -> Optional[Dict[str, Any]]: ...
def put_key_policy(self, key_id: str, policy_name: str, policy: str) -> Optional[Dict[str, Any]]: ...
def re_encrypt(self, ciphertext_blob: bytes, destination_key_id: str, source_encryption_context: Optional[Mapping[str, Any]] = ..., destination_encryption_context: Optional[Mapping[str, Any]] = ..., grant_tokens: Optional[List[str]] = ...) -> Optional[Dict[str, Any]]: ...
def retire_grant(self, grant_token: str) -> Optional[Dict[str, Any]]: ...
def revoke_grant(self, key_id: str, grant_id: str) -> Optional[Dict[str, Any]]: ...
def update_key_description(self, key_id: str, description: str) -> Optional[Dict[str, Any]]: ...

View File

@ -0,0 +1,9 @@
from typing import Any, Optional
class Plugin:
capability: Any
@classmethod
def is_capable(cls, requested_capability): ...
def get_plugin(cls, requested_capability: Optional[Any] = ...): ...
def load_plugins(config): ...

View File

@ -0,0 +1,16 @@
from typing import Any, Optional
def load_endpoint_json(path): ...
def merge_endpoints(defaults, additions): ...
def load_regions(): ...
def get_regions(service_name, region_cls: Optional[Any] = ..., connection_cls: Optional[Any] = ...): ...
class RegionInfo:
connection: Any
name: Any
endpoint: Any
connection_cls: Any
def __init__(self, connection: Optional[Any] = ..., name: Optional[Any] = ..., endpoint: Optional[Any] = ..., connection_cls: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def connect(self, **kw_params): ...

View File

@ -0,0 +1,14 @@
from typing import Optional
from .connection import S3Connection
from boto.connection import AWSAuthConnection
from boto.regioninfo import RegionInfo
from typing import List, Type, Text
class S3RegionInfo(RegionInfo):
def connect(self, name: Optional[Text] = ..., endpoint: Optional[str] = ..., connection_cls: Optional[Type[AWSAuthConnection]] = ..., **kw_params) -> S3Connection: ...
def regions() -> List[S3RegionInfo]: ...
def connect_to_region(region_name: Text, **kw_params): ...

View File

@ -0,0 +1,39 @@
from .connection import S3Connection
from .user import User
from typing import Any, Dict, Optional, List, Text, Union
CannedACLStrings: List[str]
class Policy:
parent: Any
namespace: Any
acl: ACL
def __init__(self, parent: Optional[Any] = ...) -> None: ...
owner: User
def startElement(self, name: Text, attrs: Dict[str, Any], connection: S3Connection) -> Union[None, User, ACL]: ...
def endElement(self, name: Text, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...
class ACL:
policy: Policy
grants: List[Grant]
def __init__(self, policy: Optional[Policy] = ...) -> None: ...
def add_grant(self, grant: Grant) -> None: ...
def add_email_grant(self, permission: Text, email_address: Text) -> None: ...
def add_user_grant(self, permission: Text, user_id: Text, display_name: Optional[Text] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name: Text, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...
class Grant:
NameSpace: Text
permission: Text
id: Text
display_name: Text
uri: Text
email_address: Text
type: Text
def __init__(self, permission: Optional[Text] = ..., type: Optional[Text] = ..., id: Optional[Text] = ..., display_name: Optional[Text] = ..., uri: Optional[Text] = ..., email_address: Optional[Text] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name: Text, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...

View File

@ -0,0 +1,94 @@
from .bucketlistresultset import BucketListResultSet
from .connection import S3Connection
from .key import Key
from typing import Any, Dict, Optional, Text, Type, List
class S3WebsiteEndpointTranslate:
trans_region: Dict[str, str]
@classmethod
def translate_region(self, reg: Text) -> str: ...
S3Permissions: List[str]
class Bucket:
LoggingGroup: str
BucketPaymentBody: str
VersioningBody: str
VersionRE: str
MFADeleteRE: str
name: Text
connection: S3Connection
key_class: Type[Key]
def __init__(self, connection: Optional[S3Connection] = ..., name: Optional[Text] = ..., key_class: Type[Key] = ...) -> None: ...
def __iter__(self): ...
def __contains__(self, key_name) -> bool: ...
def startElement(self, name, attrs, connection): ...
creation_date: Any
def endElement(self, name, value, connection): ...
def set_key_class(self, key_class): ...
def lookup(self, key_name, headers: Optional[Dict[Text, Text]] = ...): ...
def get_key(self, key_name, headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ..., response_headers: Optional[Dict[Text, Text]] = ..., validate: bool = ...) -> Key: ...
def list(self, prefix: Text = ..., delimiter: Text = ..., marker: Text = ..., headers: Optional[Dict[Text, Text]] = ..., encoding_type: Optional[Any] = ...) -> BucketListResultSet: ...
def list_versions(self, prefix: str = ..., delimiter: str = ..., key_marker: str = ..., version_id_marker: str = ..., headers: Optional[Dict[Text, Text]] = ..., encoding_type: Optional[Text] = ...) -> BucketListResultSet: ...
def list_multipart_uploads(self, key_marker: str = ..., upload_id_marker: str = ..., headers: Optional[Dict[Text, Text]] = ..., encoding_type: Optional[Any] = ...): ...
def validate_kwarg_names(self, kwargs, names): ...
def get_all_keys(self, headers: Optional[Dict[Text, Text]] = ..., **params): ...
def get_all_versions(self, headers: Optional[Dict[Text, Text]] = ..., **params): ...
def validate_get_all_versions_params(self, params): ...
def get_all_multipart_uploads(self, headers: Optional[Dict[Text, Text]] = ..., **params): ...
def new_key(self, key_name: Optional[Any] = ...): ...
def generate_url(self, expires_in, method: str = ..., headers: Optional[Dict[Text, Text]] = ..., force_http: bool = ..., response_headers: Optional[Dict[Text, Text]] = ..., expires_in_absolute: bool = ...): ...
def delete_keys(self, keys, quiet: bool = ..., mfa_token: Optional[Any] = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def delete_key(self, key_name, headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ..., mfa_token: Optional[Any] = ...): ...
def copy_key(self, new_key_name, src_bucket_name, src_key_name, metadata: Optional[Any] = ..., src_version_id: Optional[Any] = ..., storage_class: str = ..., preserve_acl: bool = ..., encrypt_key: bool = ..., headers: Optional[Dict[Text, Text]] = ..., query_args: Optional[Any] = ...): ...
def set_canned_acl(self, acl_str, key_name: str = ..., headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ...): ...
def get_xml_acl(self, key_name: str = ..., headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ...): ...
def set_xml_acl(self, acl_str, key_name: str = ..., headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ..., query_args: str = ...): ...
def set_acl(self, acl_or_str, key_name: str = ..., headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ...): ...
def get_acl(self, key_name: str = ..., headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ...): ...
def set_subresource(self, subresource, value, key_name: str = ..., headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ...): ...
def get_subresource(self, subresource, key_name: str = ..., headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ...): ...
def make_public(self, recursive: bool = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def add_email_grant(self, permission, email_address, recursive: bool = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def add_user_grant(self, permission, user_id, recursive: bool = ..., headers: Optional[Dict[Text, Text]] = ..., display_name: Optional[Any] = ...): ...
def list_grants(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_location(self): ...
def set_xml_logging(self, logging_str, headers: Optional[Dict[Text, Text]] = ...): ...
def enable_logging(self, target_bucket, target_prefix: str = ..., grants: Optional[Any] = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def disable_logging(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_logging_status(self, headers: Optional[Dict[Text, Text]] = ...): ...
def set_as_logging_target(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_request_payment(self, headers: Optional[Dict[Text, Text]] = ...): ...
def set_request_payment(self, payer: str = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def configure_versioning(self, versioning, mfa_delete: bool = ..., mfa_token: Optional[Any] = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def get_versioning_status(self, headers: Optional[Dict[Text, Text]] = ...): ...
def configure_lifecycle(self, lifecycle_config, headers: Optional[Dict[Text, Text]] = ...): ...
def get_lifecycle_config(self, headers: Optional[Dict[Text, Text]] = ...): ...
def delete_lifecycle_configuration(self, headers: Optional[Dict[Text, Text]] = ...): ...
def configure_website(self, suffix: Optional[Any] = ..., error_key: Optional[Any] = ..., redirect_all_requests_to: Optional[Any] = ..., routing_rules: Optional[Any] = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def set_website_configuration(self, config, headers: Optional[Dict[Text, Text]] = ...): ...
def set_website_configuration_xml(self, xml, headers: Optional[Dict[Text, Text]] = ...): ...
def get_website_configuration(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_website_configuration_obj(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_website_configuration_with_xml(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_website_configuration_xml(self, headers: Optional[Dict[Text, Text]] = ...): ...
def delete_website_configuration(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_website_endpoint(self): ...
def get_policy(self, headers: Optional[Dict[Text, Text]] = ...): ...
def set_policy(self, policy, headers: Optional[Dict[Text, Text]] = ...): ...
def delete_policy(self, headers: Optional[Dict[Text, Text]] = ...): ...
def set_cors_xml(self, cors_xml, headers: Optional[Dict[Text, Text]] = ...): ...
def set_cors(self, cors_config, headers: Optional[Dict[Text, Text]] = ...): ...
def get_cors_xml(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_cors(self, headers: Optional[Dict[Text, Text]] = ...): ...
def delete_cors(self, headers: Optional[Dict[Text, Text]] = ...): ...
def initiate_multipart_upload(self, key_name, headers: Optional[Dict[Text, Text]] = ..., reduced_redundancy: bool = ..., metadata: Optional[Any] = ..., encrypt_key: bool = ..., policy: Optional[Any] = ...): ...
def complete_multipart_upload(self, key_name, upload_id, xml_body, headers: Optional[Dict[Text, Text]] = ...): ...
def cancel_multipart_upload(self, key_name, upload_id, headers: Optional[Dict[Text, Text]] = ...): ...
def delete(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_tags(self): ...
def get_xml_tags(self): ...
def set_xml_tags(self, tag_str, headers: Optional[Dict[Text, Text]] = ..., query_args: str = ...): ...
def set_tags(self, tags, headers: Optional[Dict[Text, Text]] = ...): ...
def delete_tags(self, headers: Optional[Dict[Text, Text]] = ...): ...

View File

@ -0,0 +1,40 @@
from .bucket import Bucket
from .key import Key
from typing import Any, Iterable, Iterator, Optional
def bucket_lister(bucket, prefix: str = ..., delimiter: str = ..., marker: str = ..., headers: Optional[Any] = ..., encoding_type: Optional[Any] = ...): ...
class BucketListResultSet(Iterable[Key]):
bucket: Any
prefix: Any
delimiter: Any
marker: Any
headers: Any
encoding_type: Any
def __init__(self, bucket: Optional[Any] = ..., prefix: str = ..., delimiter: str = ..., marker: str = ..., headers: Optional[Any] = ..., encoding_type: Optional[Any] = ...) -> None: ...
def __iter__(self) -> Iterator[Key]: ...
def versioned_bucket_lister(bucket, prefix: str = ..., delimiter: str = ..., key_marker: str = ..., version_id_marker: str = ..., headers: Optional[Any] = ..., encoding_type: Optional[Any] = ...): ...
class VersionedBucketListResultSet:
bucket: Any
prefix: Any
delimiter: Any
key_marker: Any
version_id_marker: Any
headers: Any
encoding_type: Any
def __init__(self, bucket: Optional[Any] = ..., prefix: str = ..., delimiter: str = ..., key_marker: str = ..., version_id_marker: str = ..., headers: Optional[Any] = ..., encoding_type: Optional[Any] = ...) -> None: ...
def __iter__(self) -> Iterator[Key]: ...
def multipart_upload_lister(bucket, key_marker: str = ..., upload_id_marker: str = ..., headers: Optional[Any] = ..., encoding_type: Optional[Any] = ...): ...
class MultiPartUploadListResultSet:
bucket: Any
key_marker: Any
upload_id_marker: Any
headers: Any
encoding_type: Any
def __init__(self, bucket: Optional[Any] = ..., key_marker: str = ..., upload_id_marker: str = ..., headers: Optional[Any] = ..., encoding_type: Optional[Any] = ...) -> None: ...
def __iter__(self): ...

View File

@ -0,0 +1,11 @@
from typing import Any, Optional
class BucketLogging:
target: Any
prefix: Any
grants: Any
def __init__(self, target: Optional[Any] = ..., prefix: Optional[Any] = ..., grants: Optional[Any] = ...) -> None: ...
def add_grant(self, grant): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...

View File

@ -0,0 +1,67 @@
from .bucket import Bucket
from typing import Any, Dict, Optional, Text, Type
from boto.connection import AWSAuthConnection
from boto.exception import BotoClientError
def check_lowercase_bucketname(n): ...
def assert_case_insensitive(f): ...
class _CallingFormat:
def get_bucket_server(self, server, bucket): ...
def build_url_base(self, connection, protocol, server, bucket, key: str = ...): ...
def build_host(self, server, bucket): ...
def build_auth_path(self, bucket, key: str = ...): ...
def build_path_base(self, bucket, key: str = ...): ...
class SubdomainCallingFormat(_CallingFormat):
def get_bucket_server(self, server, bucket): ...
class VHostCallingFormat(_CallingFormat):
def get_bucket_server(self, server, bucket): ...
class OrdinaryCallingFormat(_CallingFormat):
def get_bucket_server(self, server, bucket): ...
def build_path_base(self, bucket, key: str = ...): ...
class ProtocolIndependentOrdinaryCallingFormat(OrdinaryCallingFormat):
def build_url_base(self, connection, protocol, server, bucket, key: str = ...): ...
class Location:
DEFAULT: str
EU: str
EUCentral1: str
USWest: str
USWest2: str
SAEast: str
APNortheast: str
APSoutheast: str
APSoutheast2: str
CNNorth1: str
class NoHostProvided: ...
class HostRequiredError(BotoClientError): ...
class S3Connection(AWSAuthConnection):
DefaultHost: Any
DefaultCallingFormat: Any
QueryString: str
calling_format: Any
bucket_class: Type[Bucket]
anon: Any
def __init__(self, aws_access_key_id: Optional[Any] = ..., aws_secret_access_key: Optional[Any] = ..., is_secure: bool = ..., port: Optional[Any] = ..., proxy: Optional[Any] = ..., proxy_port: Optional[Any] = ..., proxy_user: Optional[Any] = ..., proxy_pass: Optional[Any] = ..., host: Any = ..., debug: int = ..., https_connection_factory: Optional[Any] = ..., calling_format: Any = ..., path: str = ..., provider: str = ..., bucket_class: Type[Bucket] = ..., security_token: Optional[Any] = ..., suppress_consec_slashes: bool = ..., anon: bool = ..., validate_certs: Optional[Any] = ..., profile_name: Optional[Any] = ...) -> None: ...
def __iter__(self): ...
def __contains__(self, bucket_name): ...
def set_bucket_class(self, bucket_class: Type[Bucket]) -> None: ...
def build_post_policy(self, expiration_time, conditions): ...
def build_post_form_args(self, bucket_name, key, expires_in: int = ..., acl: Optional[Any] = ..., success_action_redirect: Optional[Any] = ..., max_content_length: Optional[Any] = ..., http_method: str = ..., fields: Optional[Any] = ..., conditions: Optional[Any] = ..., storage_class: str = ..., server_side_encryption: Optional[Any] = ...): ...
def generate_url_sigv4(self, expires_in, method, bucket: str = ..., key: str = ..., headers: Optional[Dict[Text, Text]] = ..., force_http: bool = ..., response_headers: Optional[Dict[Text, Text]] = ..., version_id: Optional[Any] = ..., iso_date: Optional[Any] = ...): ...
def generate_url(self, expires_in, method, bucket: str = ..., key: str = ..., headers: Optional[Dict[Text, Text]] = ..., query_auth: bool = ..., force_http: bool = ..., response_headers: Optional[Dict[Text, Text]] = ..., expires_in_absolute: bool = ..., version_id: Optional[Any] = ...): ...
def get_all_buckets(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_canonical_user_id(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_bucket(self, bucket_name: Text, validate: bool = ..., headers: Optional[Dict[Text, Text]] = ...) -> Bucket: ...
def head_bucket(self, bucket_name, headers: Optional[Dict[Text, Text]] = ...): ...
def lookup(self, bucket_name, validate: bool = ..., headers: Optional[Dict[Text, Text]] = ...): ...
def create_bucket(self, bucket_name, headers: Optional[Dict[Text, Text]] = ..., location: Any = ..., policy: Optional[Any] = ...): ...
def delete_bucket(self, bucket, headers: Optional[Dict[Text, Text]] = ...): ...
def make_request(self, method, bucket: str = ..., key: str = ..., headers: Optional[Any] = ..., data: str = ..., query_args: Optional[Any] = ..., sender: Optional[Any] = ..., override_num_retries: Optional[Any] = ..., retry_handler: Optional[Any] = ..., *args, **kwargs): ... # type: ignore # https://github.com/python/mypy/issues/1237

View File

@ -0,0 +1,19 @@
from typing import Any, List, Optional
class CORSRule:
allowed_method: Any
allowed_origin: Any
id: Any
allowed_header: Any
max_age_seconds: Any
expose_header: Any
def __init__(self, allowed_method: Optional[Any] = ..., allowed_origin: Optional[Any] = ..., id: Optional[Any] = ..., allowed_header: Optional[Any] = ..., max_age_seconds: Optional[Any] = ..., expose_header: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self) -> str: ...
class CORSConfiguration(List[CORSRule]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self) -> str: ...
def add_rule(self, allowed_method, allowed_origin, id: Optional[Any] = ..., allowed_header: Optional[Any] = ..., max_age_seconds: Optional[Any] = ..., expose_header: Optional[Any] = ...): ...

View File

@ -0,0 +1,12 @@
from typing import Any, Optional
class DeleteMarker:
bucket: Any
name: Any
version_id: Any
is_latest: bool
last_modified: Any
owner: Any
def __init__(self, bucket: Optional[Any] = ..., name: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...

View File

@ -0,0 +1,232 @@
from typing import Any, Callable, Dict, Optional, Text, Union, overload
class Key:
DefaultContentType: str
RestoreBody: str
BufferSize: Any
base_user_settable_fields: Any
base_fields: Any
bucket: Any
name: str
metadata: Any
cache_control: Any
content_type: Any
content_encoding: Any
content_disposition: Any
content_language: Any
filename: Any
etag: Any
is_latest: bool
last_modified: Any
owner: Any
path: Any
resp: Any
mode: Any
size: Any
version_id: Any
source_version_id: Any
delete_marker: bool
encrypted: Any
ongoing_restore: Any
expiry_date: Any
local_hashes: Any
def __init__(self, bucket: Optional[Any] = ..., name: Optional[Any] = ...) -> None: ...
def __iter__(self): ...
@property
def provider(self): ...
key: Any
md5: Any
base64md5: Any
storage_class: Any
def get_md5_from_hexdigest(self, md5_hexdigest): ...
def handle_encryption_headers(self, resp): ...
def handle_version_headers(self, resp, force: bool = ...): ...
def handle_restore_headers(self, response): ...
def handle_addl_headers(self, headers): ...
def open_read(
self,
headers: Optional[Dict[Text, Text]] = ...,
query_args: str = ...,
override_num_retries: Optional[Any] = ...,
response_headers: Optional[Dict[Text, Text]] = ...,
): ...
def open_write(self, headers: Optional[Dict[Text, Text]] = ..., override_num_retries: Optional[Any] = ...): ...
def open(
self,
mode: str = ...,
headers: Optional[Dict[Text, Text]] = ...,
query_args: Optional[Any] = ...,
override_num_retries: Optional[Any] = ...,
): ...
closed: bool
def close(self, fast: bool = ...): ...
def next(self): ...
__next__: Any
def read(self, size: int = ...): ...
def change_storage_class(self, new_storage_class, dst_bucket: Optional[Any] = ..., validate_dst_bucket: bool = ...): ...
def copy(
self,
dst_bucket,
dst_key,
metadata: Optional[Any] = ...,
reduced_redundancy: bool = ...,
preserve_acl: bool = ...,
encrypt_key: bool = ...,
validate_dst_bucket: bool = ...,
): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def exists(self, headers: Optional[Dict[Text, Text]] = ...): ...
def delete(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_metadata(self, name): ...
def set_metadata(self, name, value): ...
def update_metadata(self, d): ...
def set_acl(self, acl_str, headers: Optional[Dict[Text, Text]] = ...): ...
def get_acl(self, headers: Optional[Dict[Text, Text]] = ...): ...
def get_xml_acl(self, headers: Optional[Dict[Text, Text]] = ...): ...
def set_xml_acl(self, acl_str, headers: Optional[Dict[Text, Text]] = ...): ...
def set_canned_acl(self, acl_str, headers: Optional[Dict[Text, Text]] = ...): ...
def get_redirect(self): ...
def set_redirect(self, redirect_location, headers: Optional[Dict[Text, Text]] = ...): ...
def make_public(self, headers: Optional[Dict[Text, Text]] = ...): ...
def generate_url(
self,
expires_in,
method: str = ...,
headers: Optional[Dict[Text, Text]] = ...,
query_auth: bool = ...,
force_http: bool = ...,
response_headers: Optional[Dict[Text, Text]] = ...,
expires_in_absolute: bool = ...,
version_id: Optional[Any] = ...,
policy: Optional[Any] = ...,
reduced_redundancy: bool = ...,
encrypt_key: bool = ...,
): ...
def send_file(
self,
fp,
headers: Optional[Dict[Text, Text]] = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
query_args: Optional[Any] = ...,
chunked_transfer: bool = ...,
size: Optional[Any] = ...,
): ...
def should_retry(self, response, chunked_transfer: bool = ...): ...
def compute_md5(self, fp, size: Optional[Any] = ...): ...
def set_contents_from_stream(
self,
fp,
headers: Optional[Dict[Text, Text]] = ...,
replace: bool = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
policy: Optional[Any] = ...,
reduced_redundancy: bool = ...,
query_args: Optional[Any] = ...,
size: Optional[Any] = ...,
): ...
def set_contents_from_file(
self,
fp,
headers: Optional[Dict[Text, Text]] = ...,
replace: bool = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
policy: Optional[Any] = ...,
md5: Optional[Any] = ...,
reduced_redundancy: bool = ...,
query_args: Optional[Any] = ...,
encrypt_key: bool = ...,
size: Optional[Any] = ...,
rewind: bool = ...,
): ...
def set_contents_from_filename(
self,
filename,
headers: Optional[Dict[Text, Text]] = ...,
replace: bool = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
policy: Optional[Any] = ...,
md5: Optional[Any] = ...,
reduced_redundancy: bool = ...,
encrypt_key: bool = ...,
): ...
def set_contents_from_string(
self,
string_data: Union[Text, bytes],
headers: Optional[Dict[Text, Text]] = ...,
replace: bool = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
policy: Optional[Any] = ...,
md5: Optional[Any] = ...,
reduced_redundancy: bool = ...,
encrypt_key: bool = ...,
) -> None: ...
def get_file(
self,
fp,
headers: Optional[Dict[Text, Text]] = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Optional[Any] = ...,
override_num_retries: Optional[Any] = ...,
response_headers: Optional[Dict[Text, Text]] = ...,
): ...
def get_torrent_file(
self, fp, headers: Optional[Dict[Text, Text]] = ..., cb: Optional[Callable[[int, int], Any]] = ..., num_cb: int = ...
): ...
def get_contents_to_file(
self,
fp,
headers: Optional[Dict[Text, Text]] = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Optional[Any] = ...,
res_download_handler: Optional[Any] = ...,
response_headers: Optional[Dict[Text, Text]] = ...,
): ...
def get_contents_to_filename(
self,
filename,
headers: Optional[Dict[Text, Text]] = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Optional[Any] = ...,
res_download_handler: Optional[Any] = ...,
response_headers: Optional[Dict[Text, Text]] = ...,
): ...
@overload
def get_contents_as_string(
self,
headers: Optional[Dict[Text, Text]] = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Optional[Any] = ...,
response_headers: Optional[Dict[Text, Text]] = ...,
encoding: None = ...,
) -> bytes: ...
@overload
def get_contents_as_string(
self,
headers: Optional[Dict[Text, Text]] = ...,
cb: Optional[Callable[[int, int], Any]] = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Optional[Any] = ...,
response_headers: Optional[Dict[Text, Text]] = ...,
*, encoding: Text,
) -> Text: ...
def add_email_grant(self, permission, email_address, headers: Optional[Dict[Text, Text]] = ...): ...
def add_user_grant(
self, permission, user_id, headers: Optional[Dict[Text, Text]] = ..., display_name: Optional[Any] = ...
): ...
def set_remote_metadata(self, metadata_plus, metadata_minus, preserve_acl, headers: Optional[Dict[Text, Text]] = ...): ...
def restore(self, days, headers: Optional[Dict[Text, Text]] = ...): ...

View File

@ -0,0 +1,29 @@
from typing import Any
class KeyFile:
key: Any
location: int
closed: bool
softspace: int
mode: str
encoding: str
errors: str
newlines: str
name: Any
def __init__(self, key) -> None: ...
def tell(self): ...
def seek(self, pos, whence: Any = ...): ...
def read(self, size): ...
def close(self): ...
def isatty(self): ...
def getkey(self): ...
def write(self, buf): ...
def fileno(self): ...
def flush(self): ...
def next(self): ...
def readinto(self): ...
def readline(self): ...
def readlines(self): ...
def truncate(self): ...
def writelines(self): ...
def xreadlines(self): ...

View File

@ -0,0 +1,51 @@
from typing import Any, List, Optional
class Rule:
id: Any
prefix: Any
status: Any
expiration: Any
transition: Any
def __init__(self, id: Optional[Any] = ..., prefix: Optional[Any] = ..., status: Optional[Any] = ..., expiration: Optional[Any] = ..., transition: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class Expiration:
days: Any
date: Any
def __init__(self, days: Optional[Any] = ..., date: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class Transition:
days: Any
date: Any
storage_class: Any
def __init__(self, days: Optional[Any] = ..., date: Optional[Any] = ..., storage_class: Optional[Any] = ...) -> None: ...
def to_xml(self): ...
class Transitions(List[Transition]):
transition_properties: int
current_transition_property: int
temp_days: Any
temp_date: Any
temp_storage_class: Any
def __init__(self) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def add_transition(self, days: Optional[Any] = ..., date: Optional[Any] = ..., storage_class: Optional[Any] = ...): ...
@property
def days(self): ...
@property
def date(self): ...
@property
def storage_class(self): ...
class Lifecycle(List[Rule]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def add_rule(self, id: Optional[Any] = ..., prefix: str = ..., status: str = ..., expiration: Optional[Any] = ..., transition: Optional[Any] = ...): ...

View File

@ -0,0 +1,27 @@
from typing import Any, Optional
class Deleted:
key: Any
version_id: Any
delete_marker: Any
delete_marker_version_id: Any
def __init__(self, key: Optional[Any] = ..., version_id: Optional[Any] = ..., delete_marker: bool = ..., delete_marker_version_id: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class Error:
key: Any
version_id: Any
code: Any
message: Any
def __init__(self, key: Optional[Any] = ..., version_id: Optional[Any] = ..., code: Optional[Any] = ..., message: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class MultiDeleteResult:
bucket: Any
deleted: Any
errors: Any
def __init__(self, bucket: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...

View File

@ -0,0 +1,49 @@
from typing import Any, Optional
class CompleteMultiPartUpload:
bucket: Any
location: Any
bucket_name: Any
key_name: Any
etag: Any
version_id: Any
encrypted: Any
def __init__(self, bucket: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class Part:
bucket: Any
part_number: Any
last_modified: Any
etag: Any
size: Any
def __init__(self, bucket: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def part_lister(mpupload, part_number_marker: Optional[Any] = ...): ...
class MultiPartUpload:
bucket: Any
bucket_name: Any
key_name: Any
id: Any
initiator: Any
owner: Any
storage_class: Any
initiated: Any
part_number_marker: Any
next_part_number_marker: Any
max_parts: Any
is_truncated: bool
def __init__(self, bucket: Optional[Any] = ...) -> None: ...
def __iter__(self): ...
def to_xml(self): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def get_all_parts(self, max_parts: Optional[Any] = ..., part_number_marker: Optional[Any] = ..., encoding_type: Optional[Any] = ...): ...
def upload_part_from_file(self, fp, part_num, headers: Optional[Any] = ..., replace: bool = ..., cb: Optional[Any] = ..., num_cb: int = ..., md5: Optional[Any] = ..., size: Optional[Any] = ...): ...
def copy_part_from_key(self, src_bucket_name, src_key_name, part_num, start: Optional[Any] = ..., end: Optional[Any] = ..., src_version_id: Optional[Any] = ..., headers: Optional[Any] = ...): ...
def complete_upload(self): ...
def cancel_upload(self): ...

View File

@ -0,0 +1,10 @@
from typing import Any, Optional
class Prefix:
bucket: Any
name: Any
def __init__(self, bucket: Optional[Any] = ..., name: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
@property
def provider(self): ...

View File

@ -0,0 +1,22 @@
from typing import Any, List, Optional
class Tag:
key: Any
value: Any
def __init__(self, key: Optional[Any] = ..., value: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def __eq__(self, other): ...
class TagSet(List[Tag]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def add_tag(self, key, value): ...
def to_xml(self): ...
class Tags(List[TagSet]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def add_tag_set(self, tag_set): ...

View File

@ -0,0 +1,10 @@
from typing import Any, Optional
class User:
type: Any
id: Any
display_name: Any
def __init__(self, parent: Optional[Any] = ..., id: str = ..., display_name: str = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self, element_name: str = ...): ...

View File

@ -0,0 +1,62 @@
from typing import Any, List, Optional
def tag(key, value): ...
class WebsiteConfiguration:
suffix: Any
error_key: Any
redirect_all_requests_to: Any
routing_rules: Any
def __init__(self, suffix: Optional[Any] = ..., error_key: Optional[Any] = ..., redirect_all_requests_to: Optional[Any] = ..., routing_rules: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class _XMLKeyValue:
translator: Any
container: Any
def __init__(self, translator, container: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class RedirectLocation(_XMLKeyValue):
TRANSLATOR: Any
hostname: Any
protocol: Any
def __init__(self, hostname: Optional[Any] = ..., protocol: Optional[Any] = ...) -> None: ...
def to_xml(self): ...
class RoutingRules(List[RoutingRule]):
def add_rule(self, rule: RoutingRule) -> RoutingRules: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class RoutingRule:
condition: Any
redirect: Any
def __init__(self, condition: Optional[Any] = ..., redirect: Optional[Any] = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
@classmethod
def when(cls, key_prefix: Optional[Any] = ..., http_error_code: Optional[Any] = ...): ...
def then_redirect(self, hostname: Optional[Any] = ..., protocol: Optional[Any] = ..., replace_key: Optional[Any] = ..., replace_key_prefix: Optional[Any] = ..., http_redirect_code: Optional[Any] = ...): ...
class Condition(_XMLKeyValue):
TRANSLATOR: Any
key_prefix: Any
http_error_code: Any
def __init__(self, key_prefix: Optional[Any] = ..., http_error_code: Optional[Any] = ...) -> None: ...
def to_xml(self): ...
class Redirect(_XMLKeyValue):
TRANSLATOR: Any
hostname: Any
protocol: Any
replace_key: Any
replace_key_prefix: Any
http_redirect_code: Any
def __init__(self, hostname: Optional[Any] = ..., protocol: Optional[Any] = ..., replace_key: Optional[Any] = ..., replace_key_prefix: Optional[Any] = ..., http_redirect_code: Optional[Any] = ...) -> None: ...
def to_xml(self): ...

Some files were not shown because too many files have changed in this diff Show More