Skip to content
April 4, 2012 / ahriman hpc mode

Windows Azure & Java. Что такое Service Bus и как ее использовать из Java-приложения. Часть 2. Топики.

Топики

По принципу работы топики похожи на очереди, однако их главной особенностью является множество конечных точек для обработчиков – они называются подписками (Subscriptions), и такая модель называется publish/subscribe messaging communication – каждый потребитель-обработчик “подписывается” на подписку. Каждая подписка имеет доступ ко всем добавленным сообщениям, при этом механизм фильтрации позволяет им определить, доступно ли сообщение через подписку или нет, и если доступно, то обработчик забирает сообщение из подписки аналогичным очередям образом. Таким образом, можно реализовать распространение одного и того же сообщения на несколько подписок. Механизм подписок использует принцип работы FIFO и представлен на рис.3.

[image]

Рис.3. Принцип работы подписок (Subscription) Service Bus

Service Bus Topics diagram

Топики и очереди позволяют разрабатывать более слабо связанные приложения, с временной независимостью (когда отправитель и обработчик сообщения не обязаны быть в он-лайн одновременно), балансировкой нагрузки.

Практика

Для того, чтобы начать работу с Service Bus в Windows Azure, необходимо создать собственно именованный сервис (пространство имен), которое будет предоставлять контейнер для Service Bus.

1) Зайдите на портал управления Windows Azure (http://windows.azure.com)

2) В левом нижнем углу портала управления нажмите Service Bus, Access Control & Caching, после чего нажмите на Service Bus и нажмите в левом верхнем углу портала управления кнопку New. (рис.4).

image_thumb[2]

Рис.4. Интерфейс портала управления Windows Azure, управление Service Bus

3) В появившемся диалоговом окне Create a new Service Namespace введите значение Namespace и нажмите Check Availability – в этот момент введенное значение будет проверено на уникальность в пределах платформы(рис.5).

image_thumb[3]

Рис 5.Интерфейс портала управления Windows Azure, создание контейнера Service Bus

4) Выберите соответствующие значения региона расположения вашего контейнера (лучше, если это будет тот же регион, в котором вы используете ваше приложение) и нажмите Create Namespace. Контейнер создан, дождитесь появления статуса Active (рис.6).

image_thumb[4]

Рис 6.Интерфейс портала управления Windows Azure, список контейнеров Service Bus

Для управления вашей Service Bus вам необходимо получить определенные данные. В панели слева нажмите на Service Bus и выберите созданный только что контейнер (рис.7).

image_thumb[5]

Рис 7.Интерфейс портала управления Windows Azure, список контейнеров Service Bus

В правой панели свойств вы увидите необходимые вам данные – в том числе Default Key. Нажмите на кнопку View рядом с Default Key для отображения ключа доступа.

Если вы хотите, то можете отсюда же создать топик и в дальнейшем использовать имя созданного топика в своем приложении. Для этого необходимо выбрать ваш контейнер и нажать кнопку New Topic. В появившемся диалоговом окне введите необходимые данные (например, время жизни сообщения) и нажмите Ok.

image_thumb[9]

Всё, на этом настройка Service Bus закончена. Можно переходить к приложению.

Практика. Настройка приложения.

Как и в прошлых статьях, приведу лишь общий кусок Java-кода, который готовы к использованию, с комментариями. Хочу обратить ваше внимание на использование новой функциональности (в прошлой части всё было гораздо проще). В данном коде используется дополнительный метод convertStreamToString(InputStream is), который преобразовывает поток в строку. Кроме этого, показано два метода добавления контента в сообщение – как напрямую (в конструкторе), так и с помощью метода setBody().

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

import com.microsoft.windowsazure.services.serviceBus.*;
import com.microsoft.windowsazure.services.serviceBus.models.*;
import com.microsoft.windowsazure.services.core.*;

import javax.xml.datatype.*;
import javax.xml.datatype.DatatypeConstants.Field;

public class TestServiceBusRelay {

    // Все операции по управлению вашим контейнером Service Bus выполняются с
    // помощью класса ServiceBusContract, при создании которого конструктору
    // передаётся
    // конфигурация вашего контейнера - название контейнера, имя issuer и ключ.
    // Все эти данные можно получить из панели свойств Properties вашего
    // контейнера Service Bus на портале Windows Azure.
    // Далее можно воспользоваться функциональностью класса ServiceBusService,
    // предоставляющего управление очередями - создание, удаление и т.д. В
    // данном методе создаётся контейнер Service Bus с именем ahrimansb.

    private static ServiceBusContract createServiceBus(String issuer, String key) {

        Configuration config = ServiceBusConfiguration
                .configureWithWrapAuthentication("ahrimansb", issuer, key);
        ServiceBusContract service = ServiceBusService.create(config);
        return service;
    }

    // В данном методе используется функциональность класса TopicInfo, с помощью
    // которой в данном случае определяется
    // максимальный размер очереди в мегабайтах (указывается максимальный размер
    // в 5Гб). С помощью методов TopicInfo можно настраивать различные параметры
    // ваших очередей, в т.ч. Time To Live для сообщений, максимальный размер и
    // многое другое.

    private static void createTopic(String name, ServiceBusContract service) {

        long queueSize = 5120;

        TopicInfo topicInfo = new TopicInfo(name);
        try {
            topicInfo.setMaxSizeInMegabytes(queueSize);
            CreateTopicResult result = service.createTopic(topicInfo);

        } catch (ServiceException e) {
            System.out.print("ServiceException: " + e.getMessage());
        }

    }

    // В данном методе используется фильтр по умолчанию MatchAll (поэтому нет
    // никаких дополнительных указаний значения фильтра).
    // При использовании фильтра по умолчанию все сообщения, которые поступают в
    // топик, помещаются в подписку-очередь.

    private static void createSubscriptionWithFilterMatchAll(
            String subscriptionInfoName, String topicName,
            ServiceBusContract service) {

        try {
            SubscriptionInfo subInfo = new SubscriptionInfo(
                    subscriptionInfoName);
            CreateSubscriptionResult result = service.createSubscription(
                    topicName, subInfo);

        } catch (ServiceException e) {
            System.out.print("ServiceException: " + e.getMessage());
        }
    }

    // В данном методе создаётся ещё одна подписка с SQL-фильтром сообщений
    // (SqlFilter). При этом
    // в качестве условия используется сравнение некоторого custom-свойства
    // MessageSequenceId.
    // После создания двух подписок сообщения, поступающие в соответствующий
    // топик, буду уходить -
    // все в первую подписку, и только удовлетворяющие условиям фильтра - во
    // вторую, таким образом распределяясь по обработчикам. Естественно,
    // что гораздо оптимальнее создать несколько подписок с разными фильтрами и
    // распределять сообщения по определенному условию.

    private static void createSubscriptionWithFilter(
            String subscriptionInfoName, String topicName,
            ServiceBusContract service) {
        SubscriptionInfo subInfo = new SubscriptionInfo(subscriptionInfoName);

        try {
            CreateSubscriptionResult result = service.createSubscription(
                    topicName, subInfo);
        } catch (ServiceException e) {

            e.printStackTrace();
        }
        RuleInfo ruleInfo = new RuleInfo();
        ruleInfo = ruleInfo.withSqlExpressionFilter("MessageNumber > 4");
        try {
            CreateRuleResult ruleResult = service.createRule(topicName,
                    "subscriptioninfoname2", ruleInfo);
        } catch (ServiceException e) {
            e.printStackTrace();
        }

    }

    // Для отображения сообщения в объектной модели существует класс
    // BrokeredMessage, объекты которого содержат набор методов для управления
    // сообщением,
    // набор параметров и набор данных. В набор данных можно передать любой
    // сериализуемый объект. Очереди Service Bus имеют ограничение на размер
    // сообщения в 256 мегабайт(заголовок, содержащий свойства - 64 мегабайт),
    // однако ограничений как таковых на количество хранимых в очереди сообщений
    // нет, кроме задаваемого вами ограничения-максимального размера очереди. В
    // данном случае мы также указываем custom-свойство MessageSequenceId,
    // которое будет использоваться для фильтра

    private static void putMessageToTopic(String topicName,
            ServiceBusContract service, BrokeredMessage message) {

        try {
            message.setProperty("MessageNumber", "6");
            service.sendTopicMessage(topicName, message);
        } catch (ServiceException e) {
            System.out.print("ServiceException: " + e.getMessage());
        }

    }

    // Метод для добавления множества сообщений в топик.
    private static void putMessagesToTopic(String topicName,
            ServiceBusContract service, List<BrokeredMessage> messages) {

        try {
            for (BrokeredMessage message : messages) {

                service.sendTopicMessage(topicName, message);
            }
        } catch (ServiceException e) {
            e.printStackTrace();
        }
    }

    // Аналогично сервису очередей хранилища Windows Azure, вы можете
    // использовать два метода для извлечения сообщений из очередей топиков -
    // получение и удаление (ReceiveAndDelete) и "подсматривание" - получение
    // сообщение, но не удаление его из очереди топика (PeekLock). При
    // использовании
    // метода ReceiveAndDelete при получении запроса на извлечение
    // сообщения, очередь помечает это сообщение как "потреблённое".
    // При использовании метода PeekLock процесс получения дробится на два этапа
    // - когда Service Bus получает запрос на извлечение сообщения, он находит
    // это сообщение, помечает его как locked (в этот момент другие обработчики
    // перестают видеть это сообщение) и возвращает его приложению. После
    // окончания
    // обработки приложением сообщения закрывается второй этап процесса с
    // помощью вызова метода Delete полученного сообщения. После этого сообщение
    // помечается как удаленное.
    // Типичным паттерном опроса очереди на наличие новых сообщений является
    // использование бесконечного цикла while. В данном методе очередь
    // опрашивается постоянно. Если вы хотите
    // ограничить выполнение каким-либо количеством полученных сообщений, вам
    // необходимо реализовать логику с использованием break.

    private static void getMessageFromTopic(String topicName,
            String subscriptionName, ServiceBusContract service)
            throws ServiceException {

        ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
        opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
        while (true) {

            ReceiveSubscriptionMessageResult result = service
                    .receiveSubscriptionMessage(topicName, subscriptionName,
                            opts);
            BrokeredMessage message = result.getValue();
            if (message != null && message.getMessageId() != null) {
                try {
                    System.out
                            .println("Начало работы по опросу очереди подписки с именем:"
                                    + subscriptionName);

                    System.out.println("Сообщение: "
                            + convertStreamToString(message.getBody()));
                    System.out.println("ID сообщения: "
                            + message.getMessageId());
                    System.out
                            .println("Если вы задали какое-то свойство, его можно получить с помощью метода getProperty(): "
                                    + message.getProperty("MessageNumber"));
                    System.out.println("Сообщение прочитано - удалено.");
                    service.deleteMessage(message);
                } catch (Exception ex) {
                    // если было выброшено исключение, сообщение будет
                    // разблокировано для других обработчиков
                    System.out.println("Исключение!");
                    service.unlockMessage(message);
                }
            } else {
                System.out
                        .println("Больше нет сообщений, но топик продолжает опрашиваться.");
            }
        }

    }

    private static void deleteTopic(String topicName, ServiceBusContract service) {

        try {
            service.deleteTopic(topicName);
        } catch (ServiceException e) {
            e.printStackTrace();
        }

    }

    private static void deleteSubscription(String subscriptionName,
            String subscriptionInfoName, ServiceBusContract service) {

        try {
            service.deleteSubscription(subscriptionName, subscriptionInfoName);
        } catch (ServiceException e) {

            e.printStackTrace();
        }

    }

    public static void main(String args[]) throws FileNotFoundException {

        ServiceBusContract service = createServiceBus("owner",
                "WqmNgDFb1mgicPy7eHzx5sLklBS1Qwb8QjIujhFk8P4=");
        createTopic("mytopic", service);
        createSubscriptionWithFilterMatchAll("subscriptioninfoname1",
                "mytopic", service);
        createSubscriptionWithFilter("subscriptioninfoname2", "mytopic",
                service);
        InputStream input = new FileInputStream("c:\\1.txt");
        BrokeredMessage message = new BrokeredMessage("Our message.");
        message.setBody(input);
        ArrayList<BrokeredMessage> messages = new ArrayList<BrokeredMessage>();
        for (int i = 0; i < 5; i++) {

            BrokeredMessage msg = new BrokeredMessage("Message text: " + i);
            msg.setProperty("MessageNumber", i);
            messages.add(msg);

        }

        putMessageToTopic("mytopic", service, message);
        putMessagesToTopic("mytopic", service, messages);

        try {
            getMessageFromTopic("mytopic", "subscriptioninfoname1", service);
            getMessageFromTopic("mytopic", "mysubscriptionwithfilter", service);

        } catch (ServiceException e) {
            e.printStackTrace();
        }

    }

    public static String convertStreamToString(InputStream is)
            throws IOException {

        if (is != null) {
            Writer writer = new StringWriter();

            char[] buffer = new char[1024];
            try {
                Reader reader = new BufferedReader(new InputStreamReader(is,
                        "UTF-8"));
                int n;
                while ((n = reader.read(buffer)) != -1) {
                    writer.write(buffer, 0, n);
                }
            } finally {
                is.close();
            }
            return writer.toString();
        } else {
            return "";
        }
    }

}

Далее просто запустите ваше приложение. Для этого нажмите ALT+Shift+X либо нажмите соответствующую кнопку в меню в Eclipse (рис. 8).

image

Рис.8. Запуск простого Java-проекта.

В консоли вы должны увидеть результат.

Обратите внимание, что, если вы получаете исключение ниже, вам необходимо реализовать проверку на существование топика и подписки перед тем, как их создавать – это исключение значит, что такие сущности уже есть в контейнере.

04.04.2012 11:57:06 com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusExceptionProcessor processCatch
WARNING: com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict
com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict
    at com.sun.jersey.api.client.WebResource.handle(WebResource.java:676)
    at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74)
    at com.sun.jersey.api.client.WebResource$Builder.put(WebResource.java:533)
    at com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusRestProxy.createQueue(ServiceBusRestProxy.java:265)
    at com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusExceptionProcessor.createQueue(ServiceBusExceptionProcessor.java:188)
    at TestServiceBusRelay.createQueue(TestServiceBusRelay.java:46)
    at TestServiceBusRelay.main(TestServiceBusRelay.java:135)
ServiceException: com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict
Response Body: <Error><Code>409</Code><Detail>Conflict.TrackingId:59992d2d-e47f-40a0-935e-a16ea910d5f2_2,TimeStamp:4/4/2012 4:57:08 AM</Detail></Error>Сообщение: com.microsoft.windowsazure.services.serviceBus.models.BrokeredMessage@878c4c

В этом случае вы можете также просто удалить топик или подписку с помощью портала управления Windows Azure.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: