Skip to content

Blob Storage

https://github.com/Azure-Samples/AzureStorageSnippets/blob/master/blobs/howto/python/blob-devguide-py/blob-devguide-blobs.py

https://learn.microsoft.com/en-us/samples/azure/azure-sdk-for-python/storage-blob-samples/

when to use

  • Ideal for storing unstructured data, like media files (videos, images, audio), logs, scientific data, and archives.

  • Data is stored as objects called blobs, which can be very large (a single block blob can reach roughly 190 TiB; a storage account can hold petabytes).

  • Access is typically programmatic through code or APIs.

  • Offers various access tiers for cost optimization based on access frequency.

list blobs

https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-list-python

ContainerClient.list_blobs      #blob name and properties; metadata, tags, snapshots, etc. are returned only when requested via include=
ContainerClient.list_blob_names #only blob name
ContainerClient.walk_blobs      #hierarchical listing

list blob storage containers with container resource_manager_id

az storage container list --account-name <storage-account-name> \
    --auth-mode login -o json
az storage container list --account-name <storage-account-name> \
    --auth-mode login --query "[].{Name:name, ResourceId:id}"
az storage container list --account-name <storage-account-name> \
    --account-key <storage-account-key> --query "[].{Name:name, ResourceId:id}"

example

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, BlobLeaseClient, BlobPrefix, ContentSettings

account_name = '<account_name>'
container_name = '<container_name>'

# Credential and endpoint used to reach the storage account
credential = DefaultAzureCredential()
account_url = f"https://{account_name}.blob.core.windows.net"

# Service-level client first, then the client scoped to one container
blob_service_client = BlobServiceClient(account_url, credential=credential)
container_client = blob_service_client.get_container_client(container=container_name)

# m counts virtual folders (BlobPrefix items), n counts plain blobs;
# only the first three of each kind are printed.
m = n = 0
for blob in container_client.walk_blobs(name_starts_with='2021/', delimiter='/'):
    if not isinstance(blob, BlobPrefix):
        n += 1
        if n <= 3:
            print(f'normal blob: {blob.name}')
        continue

    m += 1
    if m <= 3:
        print(f'hierarchical: {blob.name}')
    # (a recursive listing would descend here, using prefix=blob.name)

blob size in a folder

# Sum the size of every blob whose name starts with the folder prefix.
# list_blobs gives a flat listing of blobs only. walk_blobs with a delimiter
# also yields BlobPrefix items for sub-folders, and those have no `size`
# attribute, so summing blob.size over walk_blobs fails as soon as the folder
# contains a sub-folder.
total_size = sum(
    blob.size for blob in container_client.list_blobs(name_starts_with='2021/')
)
print(f'Total size: {total_size / 1024 / 1024} MB')

move blob

Limitations: the destination blob gets new creation_time and last_modified values; the source blob's values cannot be preserved, and they cannot be updated afterwards.

Can we set the creation time like this? No, it fails with: 'BlobClient' object has no attribute 'set_blob_properties'

# NOTE: this snippet does NOT work; it is kept only to illustrate the failed attempt.
# Set the creation time of the destination blob to match the creation time of the source blob
dest_blob_properties = dest_blob_client.get_blob_properties()
dest_blob_properties.creation_time = blob_properties.creation_time
# Fails here with: 'BlobClient' object has no attribute 'set_blob_properties'
dest_blob_client.set_blob_properties(blob_properties=dest_blob_properties)
def move_blob(
    container_client: ContainerClient,
    source_blob_fullpath: str,
    dest_blob_path: str,
):
    """
    Move blob file to another folder in the same blob container

    The blob is copied to ``{dest_blob_path}/{filename}`` and the source is
    deleted only after the service reports the copy as successful. The source
    is held under an infinite lease for the whole operation; the lease is
    released again if anything fails, so the source blob is left untouched
    and unlocked.

    Args:
        container_client: Client for the container holding the blob.
        source_blob_fullpath: Full blob name of the source, e.g. ``a/b/file.txt``.
        dest_blob_path: Destination folder inside the same container. A
            trailing ``/`` is ignored; an empty value means the container root.

    Returns:
        The source blob's properties (read while the blob is leased, before
        it is deleted), or ``None`` when the source blob does not exist.

    Raises:
        ValueError: If the destination resolves to the source blob itself.
        RuntimeError: If the copy ends in any state other than ``success``.
    """
    import time  # local import: only needed while polling the copy status

    # Make sure source blob exists
    source_blob = container_client.get_blob_client(blob=source_blob_fullpath)
    if not source_blob.exists():
        return None

    # Build the destination name without producing '//' or a leading '/'
    blob_filename = source_blob_fullpath.rsplit('/', 1)[-1]
    dest_folder = dest_blob_path.rstrip('/')
    dest_blob_fullpath = f'{dest_folder}/{blob_filename}' if dest_folder else blob_filename
    if dest_blob_fullpath == source_blob_fullpath:
        # Copying a blob onto itself and then deleting it would lose the data
        raise ValueError(f'Source and destination are the same blob: {source_blob_fullpath!r}')

    # Lease source blob during copy to prevent other clients from modifying it
    lease = BlobLeaseClient(client=source_blob)
    lease.acquire(-1) # Create an infinite lease

    moved = False
    try:
        # Get source blob properties
        source_blob_properties = source_blob.get_blob_properties()

        # Copy blob
        dest_blob = container_client.get_blob_client(blob=dest_blob_fullpath)
        dest_blob.start_copy_from_url(source_url=source_blob.url)

        # start_copy_from_url only *starts* the copy; wait until it finishes
        copy_state = dest_blob.get_blob_properties().copy
        while copy_state.status == 'pending':
            time.sleep(1)
            copy_state = dest_blob.get_blob_properties().copy
        if copy_state.status != 'success':
            raise RuntimeError(
                f'Copy of {source_blob_fullpath!r} to {dest_blob_fullpath!r} '
                f'ended with status {copy_state.status!r}'
            )

        # Delete source blob while still holding the lease, so nobody can
        # modify it between the end of the copy and the delete
        source_blob.delete_blob(lease=lease)
        moved = True
    finally:
        if not moved:
            # Nothing was deleted: unlock the source so it stays usable
            lease.release()

    return source_blob_properties

blob with Python

import os, uuid
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, __version__

#placeholders: blob name (the local file read is '<filename>.txt') and local download target
filename = '<file_name>'
download_file = '<download_file_path>'

#create BlobServiceClient
blobsvc = BlobServiceClient.from_connection_string('blob connection string')

#create container with a unique name
container_name = str(uuid.uuid4())
container = blobsvc.create_container(container_name)

#create blob client (from the service client created above)
client = blobsvc.get_blob_client(container=container_name, blob=filename)

#upload file to blob
with open(f'{filename}.txt', "rb") as data:
    client.upload_blob(data)

#list blobs in container
blob_list = container.list_blobs()
for blob in blob_list:
    print("\t" + blob.name)

#download blob to a local file
with open(download_file, "wb") as file:
    file.write(client.download_blob().readall())

#deleting blob container
container.delete_container()

download file

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

container_name = 'my-container'
storage_account_name = 'my-storage-account'
parquet_file_path = 'path/from/container/root.parquet'

# Credential and endpoint of the storage account
credential = DefaultAzureCredential()
account_url = f"https://{storage_account_name}.blob.core.windows.net"

try:
    blob_service_client = BlobServiceClient(account_url=account_url, credential=credential)
    blob_client = blob_service_client.get_blob_client(container_name, parquet_file_path)

    # Stream the Parquet blob straight into the local file
    with open('c:/data/downloaded_file.parquet', 'wb') as local_file:
        blob_client.download_blob().readinto(local_file)
except Exception as err:
    print(f"Error downloading the file: {err}")
else:
    print('File downloaded successfully.')

upload file

import uuid
def file_to_blob(local_filepath, blob_filepath, chunk_size = 4 * 1024 * 1024):
    """
    Upload file to blob storage

    The local file is read in pieces of ``chunk_size`` bytes; each piece is
    staged as a block with a fresh UUID as its block id, and the blob is
    created by committing the staged blocks in the order they were read.

    Args:
        local_filepath: Path of the local file to upload.
        blob_filepath: Name of the target blob, passed to
            ``container_client.get_blob_client``.
        chunk_size: Number of bytes per staged block (default 4 MiB).

    Returns:
        None. Failures are not raised; they are printed (best-effort upload).
    """
    # Imported locally so the function does not rely on a module-level import
    # of BlobBlock being present.
    from azure.storage.blob import BlobBlock

    try:
        blob_client = container_client.get_blob_client(blob_filepath)
        block_list = []
        with open(local_filepath, 'rb') as f:
            # iter() with a sentinel stops at the empty bytes returned at EOF
            for chunk in iter(lambda: f.read(chunk_size), b''):
                blk_id = str(uuid.uuid4())
                blob_client.stage_block(block_id=blk_id, data=chunk)
                block_list.append(BlobBlock(block_id=blk_id))
        blob_client.commit_block_list(block_list)
    except Exception as exc:
        # Keep the best-effort behaviour, but say what actually went wrong
        print(f'Upload file error: {exc}')

deltalake

import duckdb
import pandas as pd
from deltalake import DeltaTable
from azure.identity import DefaultAzureCredential

# Placeholders for the container and storage account; get_dataframe combines them into the abfss:// table path
container_name = 'my-container-name'
storage_account_name = 'my-storage-account-name'
# Credential that get_dataframe uses to request a storage access token
credential = DefaultAzureCredential()
def get_dataframe(
    path: str, # path inside the container (container name not included)
    query: str=None,
) -> pd.DataFrame:
    """
    Load a Delta table from the storage account and query it with DuckDB.

    The table at ``path`` is opened with a bearer token obtained from the
    module-level ``credential`` and exposed to DuckDB under the name
    ``delta_table``. When ``query`` is empty or ``None`` the whole table is
    selected.

    Returns:
        The query result as a pandas DataFrame.
    """
    table_uri = f'abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{path}'
    access_token = credential.get_token("https://storage.azure.com/.default").token
    dataset = DeltaTable(
        table_uri,
        storage_options={'bearer_token': access_token},
    ).to_pyarrow_dataset()

    # Fall back to a full-table select when no query was supplied
    sql = query or 'select * from delta_table'

    with duckdb.connect() as connection:
        connection.execute("SET timezone = 'UTC'")  # force UTC timezone in DuckDB
        connection.register("delta_table", dataset)  # register as a view
        return connection.execute(sql).df()