当前位置:   article > 正文

【持续更新】Java高级工程师面试常见问题汇总_java高级 面试

java高级 面试

【持续更新】面试问题汇总

spring 的事务隔离级别

READ UNCOMMITTED(读未提交数据):允许事务读取未被其他事务提交的变更数据,会出现脏读、不可重复读和幻读问题。

READ COMMITTED(读已提交数据):只允许事务读取已经被其他事务提交的变更数据,可避免脏读,仍会出现不可重复读和幻读问题。

REPEATABLE
READ(可重复读):确保事务可以多次从一个字段中读取相同的值,在此事务持续期间,禁止其他事务对此字段的更新,可以避免脏读和不可重复读,仍会出现幻读问题。

SERIALIZABLE(序列化):确保事务可以从一个表中读取相同的行,在这个事务持续期间,禁止其他事务对该表执行插入、更新和删除操作,可避免所有并发问题,但性能非常低。

private protected public

1、private修饰词,表示成员是私有的,只有自身可以访问;

2、protected,表示受保护权限,体现在继承,即子类可以访问父类受保护成员,同时相同包内的其他类也可以访问protected成员。

3、无修饰词(默认),表示包访问权限(friendly, java语言中是没有friendly这个修饰符的,这样称呼应该是来源于c++
),同一个包内可以访问,访问权限是包级访问权限;

4、public修饰词,表示成员是公开的,所有其他类都可以访问;

2、面试题:接口与抽象类的区别

相同点

(1) 都不能被实例化 (2)接口的实现类或抽象类的子类都只有实现了接口或抽象类中的方法后才能实例化。

不同点

(1)接口只有定义,不能有方法的实现,java 1.8中可以定义default方法体,而抽象类可以有定义与实现,方法可在抽象类中实现。

(2)实现接口的关键字为 implements,继承抽象类的关键字为extends
。一个类可以实现多个接口,但一个类只能继承一个抽象类。所以,使用接口可以间接地实现多重继承。

(3)接口强调特定功能的实现,而抽象类强调所属关系。

(4)接口成员变量默认为public static
final,必须赋初值,不能被修改;其所有的成员方法都是public、abstract的。抽象类中成员变量默认default,可在子类中被重新定义,也可被重新赋值;抽象方法被abstract修饰,不能被private、static、synchronized和native等修饰,必须以分号结尾,不带花括号。

rabbitmq怎么消费

一、推送Consume

前面我们使用到的都是这种模式,注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者。这种方式效率最高最及时。 核心代码如下:

// 接收消息,第二个参数表示是否自动应答 channel.basicConsume(queueName, true, new
DefaultConsumer(channel) { @Override public void handleDelivery(String
consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException { System.out.println(envelope.getRoutingKey() + " 接收到数据:"

  • new String(body)); } });

二、拉取Get

属于一种轮询模型,发送一次get请求,获得一个消息。如果此时RabbitMQ中没有消息,会获得一个表示空的回复。总的来说,这种方式性能比较差,很明显,每获得一条消息,都要和RabbitMQ进行网络通信发出请求。而且对RabbitMQ来说,RabbitMQ无法进行任何优化,因为它永远不知道应用程序何时会发出请求。
核心代码如下:

while(true){ //如果没有消息,将返回null GetResponse getResponse =
channel.basicGet(queueName, true); if(null!=getResponse){
System.out.println(“received[”+getResponse.getEnvelope().getRoutingKey()+"]"+new
String(getResponse.getBody())); } Thread.sleep(1000); }

三、自动确认

方法channel.basicConsume和方法channel.basicGet表示同步或异步获取消息,第二个参数都表示是否自动确认。前面我们都设置为了true。这个时候我们只需要处理逻辑,将自动向RabbitMQ进行确认。
当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息。如果消费者在处理消息的过程中,出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后
,再确认消息,这就需要手动确认。

四、手动确认

// 接收消息手动确认,第二个参数表示是否自动应答 channel.basicConsume(queueName, false, new
DefaultConsumer(channel) { @Override public void handleDelivery(String
consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException { System.out.println(envelope.getRoutingKey() + " 接收到数据:"

  • new String(body)); //手动确认,第一个参数是消息标识,第二个参数表示是否批量确认。这里是一条一条确认,所以设置false
    channel.basicAck(envelope.getDeliveryTag(), false); } });

五、QoS预取模式

在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
这种机制一方面可以实现限速(将消息暂存到RabbitMQ内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况) 核心代码:

//参数1表示限制条数,参数2 true=channel,false=消费者 channel.basicQos(100, true); //
接收消息手动确认,第二个参数表示是否自动应答 channel.basicConsume(queueName, false, new
DefaultConsumer(channel) { @Override public void handleDelivery(String
consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException { System.out.println(envelope.getRoutingKey() + " 接收到数据:"

  • new String(body)); //手动确认,第一个参数是消息标识,第二个参数表示是否批量确认。这里是一条一条确认,所以设置false
    channel.basicAck(envelope.getDeliveryTag(), false); } });

注意:

1.消费确认模式必须是非自动ACK机制(这个是使用baseQos的前提条件,否则会Qos不生效),然后设置basicQos的值;另外,还可以基于consume和channel的粒度进行设置(global)

kafka如何消费消息

总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fFWmZFQ2-1681010345905)(1_files/Image.png)]

在这个场景中,消费组G1和消费组G2都能收到T1主题的全量消息,在逻辑意义上来说它们属于不同的应用。

最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

消费组与分区重平衡

可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。

消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group
coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。

如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

创建Kafka消费者

读取Kafka消息只需要创建一个kafkaConsumer,创建过程与KafkaProducer非常相像。我们需要使用四个基本属性,bootstrap.servers、key.deserializer、value.deserializer和group.id。其中,bootstrap.servers与创建KafkaProducer的含义一样;key.deserializer和value.deserializer是用来做反序列化的,也就是将字节数组转换成对象;group.id不是严格必须的,但通常都会指定,这个参数是消费者的消费组。

Properties props = new Properties(); props.put(“bootstrap.servers”,
“broker1:9092,broker2:9092”); props.put(“group.id”, “CountryCounter”);
props.put(“key.deserializer”,
“org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”,
“org.apache.kafka.common.serialization.StringDeserializer”);
KafkaConsumer<String, String> consumer = new
KafkaConsumer<String,String>(props);

订阅主题

创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表,非常简单:

nsumer.subscribe(Collections.singletonList(“customerCountries”));

这个例子中只订阅了一个customerCountries主题。另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:

consumer.subscribe(“test.*”);

拉取循环

消费数据的API和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka对外暴露了一个非常简洁的poll方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。下面是一个代码样例:

try { while (true) { //1) ConsumerRecords<String, String> records =
consumer.poll(100); //2) for (ConsumerRecord<String, String> record : records)
//3) { log.debug(“topic = %s, partition = %s, offset = %d, customer = %s,
country = %sn”, record.topic(), record.partition(), record.offset(),
record.key(), record.value()); int updatedCount = 1; if
(custCountryMap.countainsValue(record.value())) { updatedCount =
custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(),
updatedCount) JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4)) } } } finally { consumer.close(); //4 }

其中,代码中标注了几点,说明如下:

  • 1)这个例子使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。

  • 2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。

  • 3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。

  • 4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。

另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。

我们已经知道当消费组存在多个消费者时,主题的分区需要按照一定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:

  • 范围(Range):对于每个主题,每个消费者负责一定的连续范围分区。假如消费者C1和消费者C2订阅了两个主题,这两个主题都有3个分区,那么使用这个策略会导致消费者C1负责每个主题的分区0和分区1(下标基于0开始),消费者C2负责分区2。可以看到,如果消费者数量不能整除分区数,那么第一个消费者会多出几个分区(由主题数决定)。

  • 轮询(RoundRobin):对于所有订阅的主题分区,按顺序一一的分配给消费者。用上面的例子来说,消费者C1负责第一个主题的分区0、分区2,以及第二个主题的分区1;其他分区则由消费者C2负责。可以看到,这种策略更加均衡,所有消费者之间的分区数的差值最多为1。

提交(commit)与位移(offset)

当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进行记录。当消费者宕机或者新消费者加入时,Kafka会进行重平衡,这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。

假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费,如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-akuAvg0P-1681010345906)(1_files/Image [1].png)]

假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后Kafka进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息,如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LYLwZcB3-1681010345907)(1_files/Image [2].png)]

因此,提交位移的方式会对应用有比较大的影响,下面来看下不同的提交方式。

自动提交

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

提交当前位移

为了减少消息重复消费或者避免消息丢失,很多应用选择自己主动提交位移。设置auto.commit.offset为false,那么应用需要自己通过调用commitSync()来主动提交位移,该方法会提交poll返回的最后位移。

为了避免消息丢失,我们应当在完成业务逻辑后才提交位移。而如果在处理消息时发生了重平衡,那么只有当前poll的消息会重复消费。下面是一个自动提交的代码样例:

while (true) { ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf(“topic = %s, partition = %s, offset = %d, customer = %s,
country = %sn”, record.topic(), record.partition(), record.offset(),
record.key(), record.value()); } try { consumer.commitSync(); } catch
(CommitFailedException e) { log.error(“commit failed”, e) } }

上面代码poll消息,并进行简单的打印(在实际中有更多的处理),最后完成处理后进行了位移提交。

异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。以下为使用异步提交的方式,应用发了一个提交请求然后立即返回:

while (true) { ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf(“topic = %s, partition = %s, offset = %d, customer = %s,
country = %sn”, record.topic(), record.partition(), record.offset(),
record.key(), record.value()); } consumer.commitAsync(); }

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

因此,基于这种性质,一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果:

while (true) { ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf(“topic = %s, partition = %s, offset = %d, customer = %s,
country = %sn”, record.topic(), record.partition(), record.offset(),
record.key(), record.value()); } consumer.commitAsync(new
OffsetCommitCallback() { public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception exception) { if (e != null)
log.error(“Commit failed for offsets {}”, offsets, e); } }); }

Kafka消费数据的方式主要包含如下几种:

1、指定多主题消费

consumer.subscribe(Arrays.asList(“t4”,“t5”));

2、指定分区消费

consumer.assign(list);

3、手动修改偏移量

consumer.commitSync(); //提交当前消费偏移量

consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>) //提交指定偏移量

consumer.assign(Arrays.asList(tp));

4、seek,修改偏移量搜索指针,顺序读取数据

consumer.assign(Arrays.asList(tp));

consumer.seek(tp,0);

Properties props = new Properties(); props.put(“bootstrap.servers”,
“localhost:9092”); props.put(“group.id”, “test”); props.put(“client.id”,
“test”); props.put(“fetch.max.bytes”, 1024);// 为了便于测试,这里设置一次fetch
请求取得的数据最大值为1KB,默认是5MB props.put(“enable.auto.commit”, false);// 设置手动提交偏移量
props.put(“key.deserializer”,“org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”,“org.apache.kafka.common.serialization.StringDeserializer”);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题
consumer.subscribe(Arrays.asList(“test”)); try { int minCommitSize = 10;//
最少处理10 条消息后才进行提交 int icount = 0 ;// 消息计算器 while (true) { // 等待拉取消息
ConsumerRecords<String, String> records = consumer.poll(1000); for
(ConsumerRecord<String, String> record : records) { // 简单打印出消息内容,模拟业务处理
System.out.printf(“partition = %d, offset = %d,key= %s value = %s%n”, record.
partition(), record.offset(), record.key(),record.value()); icount++; } //
在业务逻辑处理成功后提交偏移量 if (icount >= minCommitSize){ consumer.commitAsync(new
OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception exception) { if (null == exception) { //
TODO 表示偏移量成功提交 System.out.println(“提交成功”); } else { // TODO
表示提交偏移量发生了异常,根据业务进行相关处理 System.out.println(“发生了异常”); } } }); icount=0; //
重置计数器 } } } catch(Exception e){ // TODO 异常处理 e.printStackTrace(); } finally {
consumer.close(); }

cPartition,
OffsetAndMetadata> offsets, Exception exception) { if (null == exception) { //
TODO 表示偏移量成功提交 System.out.println(“提交成功”); } else { // TODO
表示提交偏移量发生了异常,根据业务进行相关处理 System.out.println(“发生了异常”); } } }); icount=0; //
重置计数器 } } } catch(Exception e){ // TODO 异常处理 e.printStackTrace(); } finally {
consumer.close(); }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/682036
推荐阅读
相关标签
  

闽ICP备14008679号