Blog
Kafka Securizado – Spark Streaming
A continuación vamos a mostrar un ejemplo de cómo quedaría la configuración de una aplicación de Spark Streaming (Java) que lee los datos de un topic de Kafka securizado.
- Código Spark Streaming – Java
En la parte del consumidor de Spark Streaming, lo que debemos indicar como configuración en la creación del DStream de Kafka, es la siguiente propiedad:
Map kafkaParams = new HashMap<>();
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
El resto de configuración/código será igual que la que tuviéramos indicada en un consumidor de lectura sin securizar. A continuación os indicamos el ejemplo completo de la creación del DStream:
SparkConf sparkConf = new SparkConf().setAppName("Formacion Hadoop");
sparkConf.set("spark.serializer", KryoSerializer.class.getName());
sparkConf.set("spark.hadoop.fs.hdfs.impl.disable.cache", "true");
final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(Long.parseLong("1200")));
String brokers = "localhost:9092";
String topics = "fhadoop";
Collection<String> topicsSet = (Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", properties.getProperty(Constants.KAFKA_GROUP_ID));
kafkaParams.put("enable.auto.commit", "false");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
jssc.checkpoint("/fhadoop/check");
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topicsSet, kafkaParams));
- Configuración ejecución Spark Streaming
Una vez realizada la implementación anterior nos quedaría crear la instrucción de ejecución del proceso con la configuración necesaria para la lectura del topic securizado. Lo primero que deberemos crear, será un fichero jass (es un fichero de texto plano) con el usuario que tiene permisos de lectura sobre el topic.
Ejemplo fichero kafka_client_jass.conf :
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="fhadoop.copy.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
client=true
principal="fhadoop@F.HADOOP.CORP";
};
Cómo podéis observar en el ejemplo anterior, también será necesario la generación del keytab (si el sistema está Kerberizado) tanto para la ejecución del proceso de Spark como para la lectura del topic.
Por último, lo que quedaría sería montar la instrucción de ejecución con los parámetros necesarios:
spark-submit --keytab fhadoop.keytab --principal fhadoop@F.HADOOP.CORP --class fhadoop.clasePrincipal --driver-java-options "-Djava.security.auth.login.config=kafka_client_jass.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jass.conf" --files kafka_client_jass.conf,fhadoop.copy.keytab --deploy-mode cluster codigo.jar
Los parámetros más importantes de la instrucción anterior son:
- Existen 2 Keytab iguales. Uno para la aplicación de Spark (fhadoop.keytab) y otro para la securización/lectura de Kafka (fhadoop.copy.keytab).
- Tanto el fichero jass como el keytab deben ser enviados al driver y a los ejecutres (instrucciones –driver-java-options y –conf )
Antes de ejecutar la aplicación habrá que indicarle la variable de entorno especificándole la versión de Kafka (en nuestro caso 0.10):
export SPARK_KAFKA_VERSION=0.10
A continuación os indicamos la dependencia de la librería utilizada para la lectura de Kafka 0.10 con Spark Streaming:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
- En el caso de que queramos conectarnos a un topic securizado a través del productor/consumidor de consola, debemos indicar la siguiente variable de entorno (indicando la ruta donde se encuentra el fichero jass):
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/fhadoop/kafka_client_jass.conf"
También habrá que generar el fichero "client.properties" con el siguiente contenido:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
Ejemplo consumidor:
kafka-console-consumer --new-consumer --bootstrap-server localhost:9092,localhost1:9092 --topic fhadoop --consumer.config client.properties
- Si lo que tenemos es un productor/consumidor dentro de una aplicación en un servidor tomcat, lo que tendremos que hacer es:
Primero añadir la propiedad SASL_PLAINTEXT en la creación del productor/consumidor:
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
Segundo indicarle en la variable de entorno JAVA_OPTS dónde se encuentra el fichero jaas:
JAVA_OPTS:export JAVA_OPTS="-Djavax.servlet.request.encoding=UTF-8 -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/fhadoop/kafka_client_jass.conf -Djava.security.krb5.conf=/fhadoop/krb5.conf"