Skip to content
March 13, 2012 / ahriman hpc mode

Основы хранилища Windows Azure. Очереди.

Очереди Windows Azure

clip_image001

1) Очереди используются для хранения сообщения

2) Используют FIFO, однонаправленные

3) Имя очереди должно быть именем в нижнем регистре и URL-friendly

.clip_image002

Сообщения:

  1. Очередь может содержать неограниченное количество сообщений
  2. Сообщения должны быть сериализуемы как XML
  3. Размер ограничен 8Кб
  4. Обычно используется паттерн work ticket
  5. Сборщик мусора для очереди запускается раз в неделю

image

Ссылка на очередь выглядит стандартно для именования сущностей в сервисах хранилища Windows Azure:

http://<account&gt;.queue.core.windows.net/<QueueName>

К основным операциям над очередями с использованием клиента облачных очередей CloudQueueClient можно отнести получение ссылки на очередь (GetQueueReference, возвращает объект CloudQueue) и получение списка очередей (ListQueues).

var queueClient =  CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queueClient.GetQueueReference("messages");

У объекта CloudQueue доступно несколько операций по управлению очередью:

  • SetMetadata: Определяет метаданные для очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

queue.CreateIfNotExist();

queue.Metadata.Add(new NameValueCollection()

{

{"Name","Ahriman"}.

{"DateOfMsgCreation", DateTime.UtcNow.ToString()}

});

queue.SetMetadata();
  • FetchAttributes: Загружает метаданные и атрибуты.
  • Clear: Очищает очередь.
  • Create: Создает очередь. Если очередь уже существует, будет выброшено исключение.

Асинхронное создание очереди:

var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

if (!queue.Exists)){

queue.BeginCreate(EndCreateQueue, queue);

}

protected static void EndCreateQueue(IAsyncResult result)

{

var queue = result.AsyncState as CloudQueue;

queue.EndCreate(result);

}
  • CreateIfNotExists: Создает очередь в том случае, если ее не существует.
  • Delete: Удаляет очередь.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

queue.Delete();
  • Exists: Проверяет существование очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

if (!queue.Exists())

{

queue.Create();

}
  • AddMessage: Добавляет сообщение в очередь.
  • DeleteMessage: Удаляет сообщение из очереди.
  • GetMessage: Возвращает следующее сообщение из очереди. В это время сообщение становится “невидимым” для других обработчиков.
  • GetMessages: Возвращает определенное количество сообщений из очереди.
  • PeekMessage: Возвращает сообщение из очереди, оставляя его видимым для других обработчиков (“подсматривает”).
  • PeekMessages: Возвращает определенное количество сообщений из очереди, оставляя их видимыми для других обработчиков.
  • UpdateMessage: Изменяет содержимое или время видимости сообщения.

var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queue.Client.GetQueueReference("messages"); //создание объекта заранее определенного класса, помеченного как Serialize var msg = new Message {id = Guid.NewGuid().ToString(), name = "Ahriman"}; //создание объекта сообщения облачной очереди и передача в качестве аргумента созданного //сериализованного (с помощью какого-либо реализованного метода) объекта msg var queueMessage = new CloudQueueMessage(msg.SerializeToBinary()); //добавление сообщения в очередь. Первый аргумент - сообщение, второй - время жизни сообщения //(максимум - 7 дней, нельзя забывать про Garbage Collector), третий, необязательный аргумент - //время "невидимости" сообщения для обработчика после добавления сообщения в очередь queue.AddMessage(queueMessage,TimeSpan.FromMinutes(5));

 

//”подсматривание”, но не забор сообщения из очереди

var peekedMessage = queue.PeekMessage(); //получение сообщения из начала очереди var retrievedMessage = queue.GetMessage();

 

retrievedMessage.SetMessageContent("Some update.");

 

//сообщение в очереди обновляется и становится сразу же видимым для обработчиков

queue.UpdateMessage(retrievedMessage,TimeSpan.FromSeconds(0.0), MessageUpdateFields.Content | MessageUpdateFields.Visibility);

//печать значения свойства Id возвращенного объекта Message. System.Console.WriteLine(retrievedMessage.FromMessage<Message>().Id); //удаление сообщения из очереди queue.DeleteMessage(retrievedMessage);

RetrieveApproximateMessageCount:

Возвращает примерно количество сообщений в очереди. Примерное – потому, что сообщение могут быть добавлены или удалены после того, как вы отправите запрос на количество сообщений очереди.

var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

int msgCount = queue.RetrieveApproximateMessageCount();

int cachedMsgCount = queue.ApproximateMessageCount;

Опрос очереди на наличие нового сообщения в обработчике принято выполнять в бесконечном цикле.

while (true)
 {
     var msg = queue.GetMessage();
     if (msg != null)
          {
                 string retrievedMsg = msg.AsString;
                 //логика
                 query.DeleteMessage(msg);
           }
      else
       {
            Thread.Sleep(1000);
       }

Обратите внимание, что использование Thread.Sleep необосновано с экономической точки зрения, поэтому рекомендуется использовать Timer.

Усеченный экспоненциальный алгоритм отката

Одним из подходов к опросу очереди и облегчению нагрузки на очередь является использование усеченного экспоненциального алгоритма отката. Суть заключается в том, что каждый “пустой” опрос увеличивает интервал опроса вдвое, не пустой же опрос ставит интервал обратно в 1.

while (true)
 {
     var msg = queue.GetMessage();
     if (msg != null)
          {
                 string retrievedMsg = msg.AsString;
                 //логика
                 query.DeleteMessage(msg);

                 if (gradualDecrease)
                      if (curInterval > intervalFloor) curInterval = curInterval /2;
                      else curInterval = intervalFloor;
                 else curInterval = intervalFloor;   
           }
      else
       {
            if (curInterval <intervalCeiling) curInterval = curInterval * 2;
           
       }

Долгие очереди

Очередь, собирающая сообщения в период отключённого потребителя, называется долгой. В этом случае потребитель очереди может выходить в онлайн раз в день и забирать все сообщения, затем снова отключаться. Полезно для обмена сообщениями с внешними вендорами, а также для сокращения оплаты за время выполнения сервиса (в том случае, если потребитель является, например, Worker-ролью в Windows Azure).

Рекомендация

В качестве рекомендаций можно упомянуть разбиение сложного процесса на состояние, при этом каждое состояние является отдельным процессом-потребителем, опрашивающим определенную очередь.

image

Неоптимальная архитектура – один большой потребитель обрабатывает одну очередь. Является негибким подходом.

image

Архитектура, являющаяся более оптимальной с позиции гибкости – разделенная на различных потребителей и очередей, каждый из которых занимается собственной задачей.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: