Skip to content
June 22, 2012 / ahriman hpc mode

Пояснения по нововведениям в Windows Azure: сервисы хранилища. Ч.4.

Пояснения по нововведениям в Windows Azure: сервисы хранилища. Ч.3.

Пояснения по нововведениям в Windows Azure: сервисы хранилища. Ч.2.

Пояснения по нововведениям в Windows Azure: сервисы хранилища.

В этой части – Shared Access Signatures для сервиса очередей (+ код), заканчиваем с SAS. В следующей части – перевод статьи Gaurav Mantri про копирование блобов между аккаунтами.

Функциональность SAS заключается в детальном контроле доступа к ресурсам. К списку доступных для определения SAS операций (параметр sp) относятся:

  • Добавление (a), удаление (d), обновление (u), обработка (получение и удаление, p) и «подглядывание» (просмотр сообщения без его забора из очереди) или чтение (r) сообщений в очередях.
  • Получение метаданных очередей, включая количество сообщений в очереди.

Параметры SAS включают в себя всю информацию, необходимую для выдачи доступа к ресурсам хранилища – параметры запроса в URL определяют временной промежуток (параметр si), через который SAS "протухнет", разрешения, предоставляемые данной SAS, ресурсы, к которым предоставляется доступ и, собственно, сигнатуру, с помощью которой происходит аутентификация. Кроме этого в SAS URL можно включить ссылку на хранимую политику доступа, с помощью которой можно предоставить ещё один слой контроля.

Типичным сценарием использования сигнатур для очередей является система уведомления, когда сервис, выпускающий уведомления, должен иметь к очереди доступ типа «только чтение», а клиент, получающий уведомление, доступ типа «обработка и чтение сообщений». В качестве примера можно привести сервис обработки видео: видео-источник хранится в хранилище Windows Azure, после того же, как сервис заканчивает обрабатывать видео, результат его работы передается обратно в хранилище. Если реализовать подобный сценарий без использования SAS, то система будет, возможно, содержать три компонента:

  • Клиентское приложение, создающий токен доступа к блобу-источнику с разрешениями типа «чтение» и блобу-результату с разрешениями типа «запись» и посылающий запрос сервису, обрабатывающему видео.
  • Фронтенд сервиса, обрабатывающего видео. Принимает запросы, аутентифицирует клиента и посылает сообщение в очередь со сведениями о том, откуда и какое видео взять для обработки (очередь “videoprocessingqueue”).
  • Worker-роль обработчика видео. Коллекция экземпляров роли данного сервиса опрашивает очередь “videoprocessingqueue” и производит обработку видео, сообщения о которых приходят ему из очереди.

Подобная архитектура системы потребовала бы определенное увеличение экземпляров фронтенда пропорционально увеличению запросов к системе и количества использующих ее пользователей. Возможность работы клиентского приложения с сервисом хранилища существенно увеличивает производительность системы и позволяет снизить необходимые вычислительные мощности. В этом случае фронтенд может выдавать токены SAS для обеспечения доступа к очереди с разрешением типа «добавление сообщений» клиенту на, например, 2 часа, и в это время клиентское приложение спокойно работает с сервисом хранилища, не нагружая фронтенд заботами о проксировании запросов. Компоненты остаются те же, но роли их меняются:

  • Клиент создаёт токены для блоба-источника и блоба-результата, после чего обращается к фронтенду, получает токен для очереди и начинает посылать запросы. Токен кэшируется на два часа и обновляется перед «протуханием».
  • Фронтенд сервиса, обрабатывающего видео, принимает и аутентифицирует запросы и выдает токены SAS на использование очереди “videoprocessingqueue” с разрешениями типа «добавление сообщения» на период до 2 часов.
  • Worker-роль обработчика видео. Коллекция экземпляров роли данного сервиса опрашивает очередь “videoprocessingqueue” и производит обработку видео, сообщения о которых приходят ему из очереди.

При этом у видео, которое надо кодировать, есть enumeration, содержащий возможные значения качества кодирования этого видео.

Сначала определим объект видео – TranscodingWorkItem.

/// <summary>

/// Enumeration, отображающий необходимое качество видео-результата. Определяется клиентом. 

/// </summary>

public enum VideoQuality

{

quality240p,

quality480p,

quality720p

}

/// <summary>

/// Класс для сообщения о видео, предоставляемом клиентом и обрабатываемом обработчиком видео. 

/// </summary>

public class TranscodingWorkItem

{

public string SourceVideoUri { get; set; }

public string DestinationVideoUri { get; set; }

/// <summary>

/// Токен SAS для источника видео с разрешениями на чтение. 

/// </summary>

public string SourceSasToken { get; set; }

/// <summary>

/// Токен SAS для источника видео с разрешениями на запись.

/// </summary>

public string DestinationSasToken { get; set; }

/// <summary>

/// Запрошенное клиентом качество видео-результата.

/// </summary>

public VideoQuality TargetVideoQuality { get; set; }

/// <summary>

/// Конвертирует XML-отображение сообщения в очереди в объект TranscodingWorkItem, используется экземпляром обработчика видео. 

public static TranscodingWorkItem FromMessage(string messageContents)

{

XmlSerializer mySerializer = new XmlSerializer(typeof(TranscodingWorkItem));

StringReader reader = new StringReader(messageContents);

return (TranscodingWorkItem)mySerializer.Deserialize(reader);

}

/// <summary>

/// Сериализует объект TranscodingWorkItem в XML-строку, использующуюся ///как сообщение в очереди. Используется клиентом. 

/// </summary>

public string ToMessage()

{

XmlSerializer mySerializer = new XmlSerializer(typeof(TranscodingWorkItem));

StringWriter writer = new StringWriter();

mySerializer.Serialize(writer, this);

writer.Close();

return writer.ToString();

}

}

Код для издателя токенов SAS. Генерирует два типа токенов – неотзываемый, который живет в течение 2 часов (для клиента), и отзываемый с максимально возможным временем жизни, который будет использоваться экземпляром обработчика видео.

/// <summary>

/// Издатель токенов, работающий на фронтенде. 

/// </summary>

public class SasProducer

{

/* ... */

/// <summary>

/// Вызывается клиентским приложением в случае необходимости выдачи ///токена SAS, разрешающего добавлять сообщения в очередь в течение 2 часов. 

public string GetClientSasToken()

{

// Политика истекает спустя два часа после выдачи, при этом время ///начала работы этой политики не указано, что означает, что она начнет ///работать сразу же после выпуска токена. 

SharedAccessQueuePolicy policy = new SharedAccessQueuePolicy()

{

SharedAccessExpiryTime = DateTime.UtcNow.Add(SasProducer.SasTokenDuration),

Permissions = SharedAccessQueuePermissions.Add

};

string sasToken = this.videoProcessingQueue.GetSharedAccessSignature(

policy /* политика доступа */,

null /* идентификатор политики доступа */);

return sasToken;

}

/// <summary>

/// Метод генерирует отзываемый токен SAS для экземпляров обработчика ///видео. 

/// </summary>

public string GetSasTokenForProcessingMessages()

{

// Нужно иметь подписанный идентификатор для связывания SAS и ///политики, хранимой на сервере. 

string workerPolicySignedIdentifier = 

"VideoProcessingWorkerAccessPolicy" + DateTime.UtcNow.ToString();

QueuePermissions workerQueuePermissions = new QueuePermissions();

SharedAccessQueuePolicy workerQueuePolicy = new SharedAccessQueuePolicy()

{

SharedAccessExpiryTime = DateTime.MaxValue,

Permissions = SharedAccessQueuePermissions.ProcessMessages | SharedAccessQueuePermissions.Update

};

workerQueuePermissions.SharedAccessPolicies.Add(

workerPolicySignedIdentifier,

workerQueuePolicy);

// Приведенный ниже код приведет к формированию запроса Set Queue ACL к хранилищу Windows Azure. Таким образом политика будет ассоциирована с идентификатором "VideoProcessingWorkerAccessPolicy".

this.videoProcessingQueue.SetPermissions(workerQueuePermissions);

string revocableSasTokenQueue = this.videoProcessingQueue.GetSharedAccessSignature(

new SharedAccessQueuePolicy(),

workerPolicySignedIdentifier);

return revocableSasTokenQueue;

}

}

Код для части клиентского приложения, использующей учетные данные хранилища для создания SAS для блоба-источника, блоба-результата и очереди. Коммуникации между клиентом и фронтендом сервиса осуществляются вызовом метода объекта SasProducer.

/// <summary>

/// Класс клиента, использующего сервис обработки видео. 

/// </summary>

public class Client

{

private const int CredsRefreshThresholdInMinutes = 60;

private SasProducer videoProcessingService;

private StorageCredentialsSharedAccessSignature serviceQueueSasCredentials;

private DateTime serviceQueueSasExpiryTime;

private string serviceQueueEndpoint;

public Client(SasProducer service, string serviceQueueEndpoint)

{

this.videoProcessingService = service;

this.serviceQueueEndpoint = serviceQueueEndpoint;

}

public void SubmitTranscodeVideoRequest(

string clientStorageAccountName,

string clientStorageKey,

string sourceVideoBlobUri,

string destinationVideoBlobUri,

VideoQuality videoQuality)

{

CloudStorageAccount clientStorageAccount = CloudStorageAccount.Parse(

string.Format("DefaultEndpointsProtocol=http;AccountName={0};AccountKey={1}", 

clientStorageAccountName, clientStorageKey));

CloudBlobClient blobClient = clientStorageAccount.CreateCloudBlobClient();

CloudBlob sourceVideo = new CloudBlob(

sourceVideoBlobUri /*blobUri*/,

blobClient /*serviceClient*/);

CloudBlob destinationVideo = new CloudBlob(

destinationVideoBlobUri /*blobUri*/,

blobClient /*serviceClient*/);

SharedAccessBlobPolicy sourcePolicy = new SharedAccessBlobPolicy

{

SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24),

Permissions = SharedAccessBlobPermissions.Read

};

SharedAccessBlobPolicy destinationPolicy = new SharedAccessBlobPolicy

{

SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24),

Permissions = SharedAccessBlobPermissions.Write

};

string sourceSasToken = sourceVideo.GetSharedAccessSignature(

sourcePolicy,

null /* идентификатор политики доступа*/);

string destinationSasToken = destinationVideo.GetSharedAccessSignature(

destinationPolicy,

null /* идентификатор политики доступа*/);

TranscodingWorkItem workItem = new TranscodingWorkItem

{

SourceVideoUri = sourceVideo.Uri.AbsoluteUri,

DestinationVideoUri = destinationVideo.Uri.AbsoluteUri,

SourceSasToken = sourceSasToken,

DestinationSasToken = destinationSasToken,

TargetVideoQuality = videoQuality

};

StorageCredentials serviceQueueSasCrendials = GetServiceQueueSasCredentials();

CloudQueueClient queueClient = new CloudQueueClient(

this.serviceQueueEndpoint /*baseAddress*/,

serviceQueueSasCrendials /*учетные данные*/);

CloudQueue serviceQueue = queueClient.GetQueueReference(SasProducer.WorkerQueueName);

CloudQueueMessage message = new CloudQueueMessage(

workItem.ToMessage());

serviceQueue.AddMessage(message);

}

public StorageCredentials GetServiceQueueSasCredentials()

{

if (this.serviceQueueSasCredentials == null ||

DateTime.UtcNow.AddMinutes(CredsRefreshThresholdInMinutes) 

>= this.serviceQueueSasExpiryTime)

{

this.RefreshAccessCredentials();

}

return this.serviceQueueSasCredentials;

}

public void RefreshAccessCredentials()

{

string sasToken = this.videoProcessingService.GetClientSasToken();

this.serviceQueueSasCredentials = new StorageCredentialsSharedAccessSignature(sasToken);

this.serviceQueueSasExpiryTime = DateTime.UtcNow.Add(SasProducer.SasTokenDuration);

}

}

Код обработчика видео. Обработчик видео использует токены SAS, которые передаются либо в конфигурационном файле, либо по коммуникационному каналу от издателя SAS.

/// <summary>

/// Класс обработчика видео. 

/// </summary>

public class VideoProcessingWorker

{

public const string WorkerQueueName = "videoprocessingqueue";

private CloudQueue videoProcessingQueue;

public VideoProcessingWorker(string sasTokenForWorkQueue, string storageAccountName)

{

string queueEndpoint = 

string.Format("http://{0}.queue.core.windows.net", storageAccountName);

StorageCredentials queueCredendials = 

new StorageCredentialsSharedAccessSignature(sasTokenForWorkQueue);

CloudQueueClient queueClient = 

new CloudQueueClient(queueEndpoint, queueCredendials);

this.videoProcessingQueue = 

queueClient.GetQueueReference(VideoProcessingWorker.WorkerQueueName);

}

public void Start()

{

while (true)

{

CloudQueueMessage message = this.videoProcessingQueue.GetMessage(

TimeSpan.FromMinutes(5));

if (message == null)

{

Thread.Sleep(TimeSpan.FromSeconds(5));

continue;

}

TranscodingWorkItem workItem;

try

{

workItem = TranscodingWorkItem.FromMessage(message.AsString);

}

catch (InvalidOperationException)

{

this.videoProcessingQueue.DeleteMessage(message);

continue;

}

StorageCredentials sourceCredentials = 

new StorageCredentialsSharedAccessSignature(workItem.SourceSasToken);

CloudBlob sourceVideo = new CloudBlob(workItem.SourceVideoUri, sourceCredentials);

StorageCredentials destinationCredentials = 

new StorageCredentialsSharedAccessSignature(workItem.DestinationSasToken);

CloudBlob destinationVideo = 

new CloudBlob(workItem.DestinationVideoUri, destinationCredentials);

this.ProcessVideo(sourceVideo, destinationVideo, workItem.TargetVideoQuality);

this.videoProcessingQueue.DeleteMessage(message);

}

}

private void ProcessVideo(

CloudBlob sourceVideo,

CloudBlob destinationVideo,

VideoQuality targetVideoQuality)

{

Stream inStream = sourceVideo.OpenRead();

Stream outStream = sourceVideo.OpenWrite();

byte[] buffer = new byte[1024];

int count = 1;

while (count != 0)

{

count = inStream.Read(buffer, 0, buffer.Length);

outStream.Write(buffer, 0, count);

}

using (TextWriter writer = new StreamWriter(outStream))

{

writer.WriteLine(" (transcoded to {0})", targetVideoQuality);

}

}

}

Код метода Main, в котором используется вся вышеприведенная функциональность.

public static void Main()

{

string serviceAccountName = "someserviceaccountname";

string serviceAccountKey = "someserviceAccountKey";

string serviceQueueEndpoint = 

string.Format("http://{0}.queue.core.windows.net", serviceAccountName);

SasProducer sasProducer = new SasProducer(serviceAccountName, serviceAccountKey);

string sasTokenForQueue = sasProducer.GetSasTokenForProcessingMessages();

VideoProcessingWorker transcodingWorker = 

new VideoProcessingWorker(sasTokenForQueue, "someAccountName");

ThreadPool.QueueUserWorkItem((state) => transcodingWorker.Start());

Client client = new Client(sasProducer, serviceQueueEndpoint);

string customerAccountName = "clientaccountname";

string customerAccountKey = "CLIENTACCOUNTKEY";

CloudStorageAccount storageAccount = CloudStorageAccount.Parse(

string.Format("DefaultEndpointsProtocol=http;AccountName={0};AccountKey={1}",

customerAccountName,

customerAccountKey));

CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

CloudBlobContainer sourceContainer = 

blobClient.GetContainerReference("sourcevideos");

sourceContainer.CreateIfNotExist();

CloudBlobContainer destinationContainer = 

blobClient.GetContainerReference("transcodedvideos");

destinationContainer.CreateIfNotExist();

List<CloudBlob> sourceVideoList = new List<CloudBlob>();

for (int i = 0; i < 10; i++)

{

CloudBlob sourceVideo = sourceContainer.GetBlobReference("Video" + i);

sourceVideo.UploadText("Content of video" + i);

sourceVideoList.Add(sourceVideo);

}

for (int i = 0; i < 10; i++)

{

CloudBlob sourceVideo = sourceVideoList[i];

CloudBlob destinationVideo = 

destinationContainer.GetBlobReference("Video" + i);

client.SubmitTranscodeVideoRequest(

customerAccountName,

customerAccountKey,

sourceVideo.Uri.AbsoluteUri,

destinationVideo.Uri.AbsoluteUri,

VideoQuality.quality480p);

}

Thread.Sleep(TimeSpan.FromMinutes(5));

}

По мотивам статьи команды разработчиков и MSDN.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: