La Red de Conocimientos Pedagógicos - Currículum vitae - ¿Cómo obtener los datos más recientes después de que el consumidor de Kafka se vuelva a conectar?

¿Cómo obtener los datos más recientes después de que el consumidor de Kafka se vuelva a conectar?

Pero hay algunas advertencias a tener en cuenta. Para múltiples particiones y múltiples consumidores,

1. Si hay más consumidores que particiones, es un desperdicio. Debido a que Kafka está diseñado para particionarse y no permite la concurrencia, la cantidad de consumidores no puede ser mayor que la cantidad de particiones.

2. Si el número de consumidores es menor que el número de particiones, un consumidor corresponderá a varias particiones. Lo principal aquí es asignar razonablemente la cantidad de consumidores y la cantidad de particiones; de lo contrario, los datos de las particiones se utilizarán de manera desequilibrada.

El número óptimo de particiones es un múltiplo entero del número de usuarios, por lo que el número de particiones es muy importante. Por ejemplo, si toma 24, es fácil establecer el número de consumidores.

3. Si el usuario lee datos de varias particiones, no hay garantía de que los datos estén en orden. Kafka solo garantiza que los datos estén en orden en una partición, pero varias particiones serán diferentes según el orden de lectura.

4. Agregar o reducir consumidores, agentes y particiones provocará un reequilibrio, por lo que las particiones correspondientes a los consumidores cambiarán después del reequilibrio.

5. Cuando los datos no estén disponibles, la interfaz de alto nivel se bloqueará.

Versión simple,

Error simple. Si el proceso de prueba consiste en generar algunos datos primero y luego leerlos con el consumidor, recuerde agregar la primera oración de configuración.

Debido a que el desplazamiento inicial es ilegal de forma predeterminada, y esta configuración significa cómo corregir el desplazamiento cuando el desplazamiento es ilegal. El valor predeterminado es el más grande, que es el más reciente, por lo que no existe tal configuración. No podrá leer los datos que generó antes. Es inútil agregar una configuración mínima en este momento, porque la compensación es legal en este momento y no se corregirá, por lo que debe restablecer la compensación manualmente o con una herramienta.

Properties props = new Properties();

props.put("auto.offset.reset", "minimum"); //Debe agregarse si se van a eliminar datos antiguos. leer palabras.

props.put("zookeeper.connect", "localhost:2181");

props.put("group.id", "PV");

props.put("zookeeper.session.timeout.ms","400");

props.put("zookeeper.sync.time.ms","200");

props . put(" auto . commit . intervalo . ms ", " 1000 ");

configuración del consumidor = nueva configuración del consumidor(props>consumidor); consumidor del conector = consumidor de Kafka. createjavaconsumeconnector(conf);

tema de cadena = "página _ visitas

Mapa ltString, entero gttopicCountMap = nuevo HashMap ltString, entero gt();

topicCountMap.put(topic, new Integer(1));

Mapa lt cadena, lista ltKafkaStream ltbyte[], byte[] gt; gt gtconsumer map = consumidor. streams(topicCountMap);

Lista ltKafkaStream ltbyte[], byte[] gt; gtstreams = consumidor map.get(topic);

KafkaStream ltbyte[], byte[] gt; flujo = flujos . get(0);

Consumo anormal ltbyte[], byte[] gt; it = flujo

mientras (it.hasNext ()) {

sistema . println(" mensaje: " nueva cadena(it . next(). mensaje())));

}

if (consumidor!= null) consumidor.shutdown(). // En realidad no se puede ejecutar porque el hasNext anterior se bloqueará.

Dos herramientas poderosas cuando se utilizan consumidores avanzados,

1.bin/Kafka-run-class .sh Kafka .PV del grupo de verificación de compensación del consumidor

p><. p>Puede ver el estado actual del desplazamiento del grupo, como el estado de pv aquí, 3 particiones.

Propietario del retraso del tamaño del registro de compensación de Pid del tema del grupo

PV page_visits 0 21 21 0 Ninguno

PV page_visits 1 19 19 0 Ninguno

pv page_visits 2 20 20 0 Ninguno

La clave es desplazamiento, tamaño de registro y retraso

Lo he leído aquí antes, así que offset=logSize, Lag=0.

2.bin/Kafka-run-class .sh Kafka .herramientas updateoffsetsinzk página de propiedades del consumidor/configuración más temprana _visitas

Tres parámetros,

[ más temprano|más reciente], indicando dónde colocar el desplazamiento.

Aquí está la ruta al archivo de configuración.

Tema, nombre del tema, aquí está page_visits.

Después de hacer esto para el grupo pv anterior, pasaremos al estado de compensación del grupo de verificación y los resultados serán los siguientes.

Propietario del retraso del tamaño del registro de compensación de Pid del tema del grupo

PV page_visits 0 0 21 21 Ninguno

PV page_visits 1 0 19 19 Ninguno

pv page_visits 2 0 20 20 Ninguno

Puedes ver que el desplazamiento se ha borrado a 0, Lag=logSize.

A continuación se proporciona el código completo del consumidor de subprocesos múltiples en el artículo original.

Importar consumidor Kafka.

Importar consumidor Kafka.

Importar consumidor Kafka.

Importar Java.util.hashmap;

Importar Java.util.list;

Importar Java.util.map;

Importar Java.util. properties;

Importar Java.util.concurrent.executorservice;

Importar Java.util.concurrent.executors;

Ejemplo de grupo de consumidores de clase pública {

p>

Consumidor final privado Consumidor del conector;

Asunto de cadena final privado;

Ejecutor privado;

ejemplo de grupo de consumidores público (cadena a_zookeeper, cadena a_groupId, String a_topic) {

Consumidor = Kafka. consumidor. consumidor. createjavaconsumeconnector(//Cree un conector y preste atención a la siguiente configuración de conf.

createConsumerConfig(a_zookeeper, a_groupId));

this.topic = a_topic

p >

}

Apagado por anulación pública() {

if (consumidor! = nulo) consumidor.shutdown().

if (ejecutor!= null)ejecutor . cerrar();

}

public void run(int a _ num threads){/ /Crear consumidores concurrentes.

Map ltString, Integer gttopicCountMap = new HashMap ltString, Integer gt();

topicCountMap.put(topic, new Integer(a _ numThreads) //Descripción a leer; tema y cuántos hilos se requieren para leerlo.

Mapa lt cadena, lista ltKafkaStream ltbyte[], byte[] gt; gt gtconsumer map = consumer .crea flujos de mensajes(topicCountMap); //Crear flujo

Lista ltKafkaStream ltbyte [ ], byte[] gt; gtstreams = mapa del consumidor. get(topic); // Cada hilo corresponde a un KafkaStream.

//Iniciar todos los hilos ahora

//

executor = executors . newfixedthreadpool(a _ numThreads);

//Ahora Crear un objeto para consumir mensajes

//

int número de hilo = 0

for (flujo final de KafkaStream: flujos) {

executor .submit(new consumer test(stream, thread number));// Iniciar el hilo del consumidor.

número de hilo;

}

}

Configuración de consumidor estática privada createConsumerConfig(String a _ zookeeper, String a_groupId) {

p>

Propiedades props = new Propiedades();

props.put("zookeeper.connect ", a _ zookeeper);

props.put ("group.id", a_groupId);

props .put("zookeeper. sesión. tiempo de espera. ms", "400");

props("zookeeper". sync .time.ms","200");

props.put("auto.commit.interval.ms","1000");

Devuelve nuevo ConsumerConfig(. props) ;

}

Public static void main(String[] args) {

string zooKeeper = args[0];

string groupId = args[1];

string topic = args[2];

int threads = integer . parse int(args[3]);

Ejemplo de ConsumerGroupExample = new ConsumerGroupExample(zooKeeper, groupId, topic);

ejemplo.run(thread);

prueba {

thread.sleep(10000); );

} catch(excepción interrumpida, es decir){

}

ejemplo apagado(); >

}

Consumidor simple

El otro es SimpleConsumer. Lleva su nombre y se cree que es una interfaz simple. De hecho, es un consumidor de bajo nivel. y una interfaz más compleja.

Referencia, https://c wiki.apache.org/conflict/display/Kafka/0.8.0 ejemplo simple de consumidor.

¿Cuándo debo utilizar esta interfaz?

Leer un mensaje varias veces

En un proceso, usar solo un subconjunto de las particiones en un tema

Administrar transacciones para garantizar que solo se procese un mensaje Una vez

Por supuesto, hay un precio por usar esta interfaz, es decir, la partición, el corredor y la compensación ya no son transparentes para usted, y debe administrarlos usted mismo, y también debe cambiar el identificador del corredor líder, lo cual es muy problemático.

Así que no es necesario utilizarlo, lo mejor es no usarlo.

Tienes que realizar un seguimiento del offset en tu aplicación para saber dónde dejas de consumir.

Debe determinar qué broker es el broker principal para temas y particiones.

Debe gestionar los cambios del líder del broker.

Para utilizar un consumidor simple:

Encuentre un broker activo y descubra qué broker es el líder para su tema y partición

Determine quién es el broker réplica para su tema y partición

Construya una solicitud que defina los datos que le interesan

Obtener los datos

Identificar y recuperarse de los cambios de liderazgo

Primero, debe saber qué tema leer y qué partición.

Luego, busque el corredor responsable de la partición, encontrando así el corredor propietario de una copia de la partición.

Además, escribe la solicitud y obtén los datos tú mismo.

Finalmente, tenga en cuenta la necesidad de reconocer y abordar los cambios en el liderazgo de los brokers.