|
|
|
import boto3
|
|
|
|
from botocore.exceptions import ClientError
|
|
|
|
from yt_dlp_stream_to_s3.config import (
|
|
|
|
AWS_ENDPOINT_URL,
|
|
|
|
AWS_SECRET_ACCESS_KEY,
|
|
|
|
AWS_ACCESS_KEY_ID,
|
|
|
|
)
|
|
|
|
from yt_dlp_stream_to_s3.errors import YtDlpStreamToS3ConnectionError
|
|
|
|
|
|
|
|
creds = dict(
|
|
|
|
endpoint_url=AWS_ENDPOINT_URL,
|
|
|
|
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
|
|
|
|
aws_access_key_id=AWS_ACCESS_KEY_ID,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
s3_client = boto3.client("s3", **creds)
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
raise YtDlpStreamToS3ConnectionError(
|
|
|
|
f"Could not connect to S3 service {AWS_ENDPOINT_URL}"
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def get_file_size(bucket_name: str, key: str) -> str:
|
|
|
|
s3_resource = boto3.resource("s3", **creds)
|
|
|
|
return s3_resource.Object(bucket_name, key).content_length / 1_000_000
|
|
|
|
|
|
|
|
|
|
|
|
def create_presigned_url(bucket_name, object_name, expiration=3600):
|
|
|
|
return s3_client.generate_presigned_url(
|
|
|
|
"get_object",
|
|
|
|
Params={"Bucket": bucket_name, "Key": object_name},
|
|
|
|
ExpiresIn=expiration,
|
|
|
|
)
|