栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

【pulsar学习】pulsar中java API的使用

Java 更新时间:发布时间: 百科书网 趣学号

文章目录
  • pulsar结构
  • 1 基于pulsar实现Topic的构建操作
    • 1.1 管理租户
    • 1.2 管理namespace
    • 1.3 管理topic
  • 2 基于Pulsar实现数据生产
    • 2.1 同步发送数据
    • 2.2 异步发送数据
    • 2.3 基于pulsar实现数据生产
  • 3 基于Pulsar实现数据消费
    • 3.1 同步方式消费数据
    • 3.2 schema方式消费数据
    • 3.3 批量处理消费数据
  • 总结

继上一篇:【pulsar学习】pulsar集群部署及可视化监控部署之后,本文介绍如何把pulsar用起来。pulsar的结构主要有租户、namespace、topic,主要涉及到的有数据生产和数据消费。本文主要介绍上述内容的java API接口。

pulsar结构


pulsar结构如上图所示:多租户,名称空间,topic名称。topic是基本的单元,其定位为persistent://tenant/namespace/topic

  • persistentor non-persistent代表该topic是否是持久化的。持久化topic是消费发布与消费的逻辑端点;非持久化topic应用在仅消费实时发布消息与不需要持久化保证的应用程序,它通过删除持久消息的开销来减少消息发布延迟。
  • tenant为租户名
  • namespace为命名空间
  • topic为topic名

pulsar相关原理将会在下一篇博客介绍(画个饼先)。

下面的代码需要如下添加依赖:

    
        
            org.apache.pulsar
            pulsar-client-all
            2.8.1
        
    
1 基于pulsar实现Topic的构建操作 1.1 管理租户
package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TenantInfo;

import java.util.HashSet;
import java.util.List;


// 使用java构建租户
public class CreateTenants {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 创建pulsar的admin管理对象
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();
        // 创建租户

        HashSet clusters = new HashSet<>();
        clusters.add("pulsar-cluster");

        HashSet adminRoles = new HashSet<>();
        adminRoles.add("dev");
        TenantInfo config = TenantInfo.builder().allowedClusters(clusters).adminRoles(adminRoles).build();
        
        admin.tenants().createTenant("itcast_pulsar_t", config);
        
        // 3-获取有哪些租户
        System.out.println("获取有哪些租户");
        List tenants = admin.tenants().getTenants();
        for (String tenant : tenants) {
            System.out.println(tenant);
        }

        // 4-删除租户
        admin.tenants().deleteTenant("itcast_pulsar_t");

        // 3-获取有哪些租户
        System.out.println("删除后,获取有那些租户");
        tenants = admin.tenants().getTenants();
        for (String tenant : tenants) {
            System.out.println(tenant);
        }
        admin.close();
    }
}


1.2 管理namespace
package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;


public class CreateNamespace {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 创建pulsar的admin管理对象
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();

        // 创建名称空间
        admin.namespaces().createNamespace("itcast_pulsar_t/itcast_pulsar_2");
        // 获取所有的名称空间
        System.out.println("获取到当前有哪些名称空间:");
        List namespaces = admin.namespaces().getNamespaces("itcast_pulsar_t");
        for (String namespace : namespaces) {
            System.out.println(namespace);
        }
        // 删除名称空间
        admin.namespaces().deleteNamespace("itcast_pulsar_t/itcast_pulsar_n");
        // 关闭资源
        admin.close();
    }
}

1.3 管理topic
package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;


public class CreateTopic {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 1 创建pulsar的admin管理对象
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();

        // 2 创建Topic
        // 2.1 创建一个持久化的带分区的Topic
        admin.topics().createPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1",3);
        // 2.2 创建一个非持久化带分区的Topic
        admin.topics().createPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic2",3);
        // 2.3 创建一个持久化的不带分区的Topic
        admin.topics().createNonPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic3");
        // 2.4 创建一个非持久化不分区的Topic
        admin.topics().createNonPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic4");

        // 3 列出某个名称空间下,所有的topic
        // 3.1 无分区的Topic
        System.out.println("无分区的Topic:");
        List topics = admin.topics().getList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topics) {
            System.out.println(topic);
        }
        // 3.2 有分区的Topic
        System.out.println("有分区的Topic:");
        topics = admin.topics().getPartitionedTopicList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topics) {
            System.out.println(topic);
        }

        // 4 更新Topic:增加分区数
        admin.topics().updatePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1",5);
        int partitions = admin.topics().getPartitionedTopicMetadata("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").partitions;
        System.out.println("topic的分区数为:" + partitions);

        // 5 删除Topic
        // 5.1 删除没有分区的
        admin.topics().delete("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic3");
        admin.topics().delete("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic4");
        // 5.2 删除有分区的
        admin.topics().deletePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1");
        admin.topics().deletePartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic2");
        
        admin.close();
    }
}


带分区的不会被删除,没带分区的会被删除?

注意:
不管是有分区还是没有分区, 创建topic后,如果没有任何操作, 60s后pulsar会认为此topic是不活动的,会自动进行删除, 以避免生成垃圾数据。
相关配置:

  • Brokerdeleteinactivetopicsenabenabled : 默认值为true 表示是否启动自动删除
  • BrokerDeleteInactiveTopicsFrequencySeconds: 默认为60s 表示检测未活动的时间

但是我自己在玩的时候发现有分区的topic一直没有被删除,也没有搜到相关的资料。

2 基于Pulsar实现数据生产 2.1 同步发送数据
package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.nio.charset.StandardCharsets;


public class PulsarProducerSyncTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // 获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 通过客户端创建生产者的对象
        Producer producer = client.newProducer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();
        // 发送消息
        int i = 0;
        while (i < 200) {
            producer.send(( "你好 Pulsar..." + i ).getBytes(StandardCharsets.UTF_8));
            Thread.sleep(10);
            i++;
        }
        // 释放资源
        producer.close();
        client.close();
    }
}

2.2 异步发送数据
package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import java.nio.charset.StandardCharsets;


public class PulsarProducerAsyncTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // 获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 通过客户端创建生产者对象
        Producer producer = client.newProducer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();
        // 发送消息
        int i = 0;
        while (i < 200) {
            producer.sendAsync(( "异步发送 Pulsar..." + i ).getBytes(StandardCharsets.UTF_8));
            //Thread.sleep(10);
            i++;
        }
        // 如果采用异步发送数据, 由于需要先放置在缓存区中, 如果立即关闭, 会导致无法发送
        Thread.sleep(1000);
        producer.close();
        client.close();
    }
}

异步的代码与同步的就发送那里不一样,另外也要设置thread.sleep的时间

2.3 基于pulsar实现数据生产
package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;


public class PulsarProducerSchemaTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // 获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 通过客户端创建生产者对象
        AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(User.class).build());
        Producer producer = client.newProducer(schema).topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();
        // 发送消息
        int i = 0;
        while (i < 200) {
            User user = new User();
            user.setName("张三");
            user.setAge(i);
            producer.send(user);
            i++;
        }
        Thread.sleep(10000);
        producer.close();
        client.close();
    }
}

其中,user类为

package www.whuhhh.cn;

import java.io.Serializable;


public class User implements Serializable {

    private String name;
    private Integer age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + ''' +
                ", age=" + age +
                '}';
    }
}

再用本文3.2中代码进行消费(先运行消费代码,再运行生产者代码,可以看到打印的数据)

可以看到:message.getValue() 获取到的就是user对象,然后如果直接输出就是user类中toString的结果。

3 基于Pulsar实现数据消费 3.1 同步方式消费数据
package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;


public class PulsarConsumerTest {
    public static void main(String[] args) throws PulsarClientException {
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        Consumer consumer = client.newConsumer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").subscriptionName("my_subscription").subscribe();

        // 循环获取读取
        while (true) {
            Message message = consumer.receive();
            try {
                System.out.println("消息为: " + new String(message.getData()));
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
        }
    }
}
3.2 schema方式消费数据
package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;


public class PulsarConsumerSchemaTest {
    public static void main(String[] args) throws PulsarClientException {
        // 1、获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 2、通过客户端创建消费者的对象
        Consumer consumer = client.newConsumer(AvroSchema.of(SchemaDefinition.builder().withPojo(User.class).build()))
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1")
                .subscriptionName("my-subscription")
                .subscribe();
        // 3、循环获取读取
        while (true) {
            Message message = consumer.receive();
            try {
                System.out.println("消息为: " + message.getValue() + ", age: " + message.getValue().getAge());
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
        }
    }
}

这里获取user数据的方法如2.3所示

3.3 批量处理消费数据
package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;

import java.util.concurrent.TimeUnit;


public class PulsarConsumerBatchTest {
    public static void main(String[] args) throws PulsarClientException {
        // 1、获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 2、通过客户端创建消费者的对象
        Consumer consumer = client.newConsumer()
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1")
                .subscriptionName("my-subscription")
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        // 设置一次性最大获取多少条消息,默认值为-1
                        .maxNumMessages(100)
                        // 设置每条数据允许的最大的字节大小,默认为10 * 1024 * 1024
                        .maxNumBytes(1024 * 1024)
                        // 设置等待超时时间,默认为100
                        .timeout(200, TimeUnit.MILLISECONDS)
                        .build())
                .subscribe();
        // 3、循环读取获取
        while (true) {
            Messages messages = consumer.batchReceive(); // 批量读取数据
            for (Message message : messages) {
                try{
                    System.out.println("批量消息为: " + new String(message.getData()));
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    consumer.negativeAcknowledge(message);
                }
            }

        }
    }
}

批量的方法注意参数的使用。

总结

走得慢是为了走的更好,加油!

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/957445.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号