Todavía queda una cosa antes de centrar nuestra atención en la parte divertida. La interfaz de usuario web de Flink es una interfaz fácil de usar que permite a los desarrolladores y administradores monitorear y administrar sus aplicaciones Apache Flink. Proporciona una descripción general en tiempo real de los trabajos en ejecución o completados, muestra métricas como el rendimiento y la latencia y ofrece información detallada sobre el plan de ejecución del trabajo. Básicamente, es un panel conveniente donde puede visualizar el rendimiento y el estado de sus aplicaciones Flink, lo que hace que el proceso de depuración, optimización y administración de sus trabajos de transmisión o procesamiento por lotes sea mucho más fácil e intuitivo.
Cuando ejecuta una aplicación Flink localmente como en este ejemplo, normalmente no tiene habilitada la interfaz de usuario web de Flink. Sin embargo, existe una manera de obtener también la interfaz de usuario web de Flink en un entorno de ejecución local. Esto me parece útil, especialmente para tener una idea del plan de ejecución antes de ejecutar aplicaciones de streaming en producción.
Comencemos agregando una dependencia al pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
Y cambiar ligeramente el código en nuestra clase principal. App.java:
package de.vojay.flitch;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class App {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(new Configuration());
env.fromSequence(1, Long.MAX_VALUE).print();
env.execute("Flitch");
env.close();
}
}
La aplicación de streaming ahora procesará una secuencia de números, por lo que no finalizará inmediatamente. También con createLocalEnvironmentWithWebUI Tendremos la interfaz de usuario web de Flink disponible localmente en el puerto. 8081 mientras la aplicación se está ejecutando.
Empezar de nuevo y abrir http://localhost:8081/ en tu navegador. Además de varias métricas, también puede ver el plan de ejecución de su aplicación Flink.
Ahora tenemos una configuración local adecuada y podemos comenzar a conectar nuestra aplicación a Twitch y ejecutar análisis de opiniones en los mensajes de chat.
Contracción nerviosala plataforma líder de transmisión en vivo para jugadores, ofrece una API integral y una función de chat profundamente integrada con el protocolo Internet Relay Chat (IRC).
En esencia, la API de Twitch permite que las aplicaciones interactúen con los datos de Twitch. Esto incluye la recuperación de información sobre transmisiones en vivo, VOD (vídeo a pedido), usuarios y detalles del juego. La API es RESTful, lo que significa que sigue el estilo arquitectónico de la web, lo que facilita su uso con solicitudes HTTP comunes. Los desarrolladores pueden utilizar esta API para crear experiencias personalizadas, como mostrar estadísticas de transmisión en vivo, buscar canales o incluso automatizar configuraciones de transmisión.
El chat de Twitch es un aspecto vital de la experiencia de Twitch, ya que permite a los espectadores interactuar con streamers y otros espectadores en tiempo real. Debajo de la moderna interfaz de Twitch Chat se encuentra el protocolo Internet Relay Chat (IRC), un elemento básico de la comunicación en línea desde finales de los años 80. Esta dependencia de IRC permite una amplia gama de posibilidades a la hora de leer e interactuar con el chat a través de aplicaciones personalizadas.
Para nuestro propósito, simplemente queremos leer el chat, sin escribir mensajes nosotros mismos. Afortunadamente, Twitch permite conexiones anónimas al chat para casos de uso de aplicaciones de solo lectura.
Para reducir el esfuerzo de implementación, utilizaremos una biblioteca existente para interactuar con Twitch: Twitch4J. Twitch4J es una biblioteca Java moderna diseñada para simplificar la integración con las funciones de Twitch, incluida su API, Chat (a través de IRC), PubSub (para notificaciones en tiempo real) y Webhooks. Esencialmente, es un poderoso conjunto de herramientas para desarrolladores de Java que buscan interactuar con los servicios de Twitch sin tener que administrar directamente detalles de bajo nivel como solicitudes HTTP o manejo del protocolo IRC.
El primer paso es agregar Twitch4J como una dependencia al pom.xml:
<dependency>
<groupId>com.github.twitch4j</groupId>
<artifactId>twitch4j</artifactId>
<version>1.19.0</version>
</dependency>
Nos gustaría tener un objeto Java antiguo (POJO) ligero y serializable para representar los mensajes de chat de Twitch dentro de nuestra aplicación. Nos interesa el canal donde se escribió el mensaje, el usuario y el contenido en sí.
Crear una nueva clase TwitchMessage con la siguiente implementación:
package de.vojay.flitch;public class TwitchMessage {
private final String channel;
private final String user;
private final String message;
public TwitchMessage(String channel, String user, String message) {
this.channel = channel;
this.user = user;
this.message = message;
}
public String getChannel() {
return channel;
}
public String getUser() {
return user;
}
public String getMessage() {
return message;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer("TwitchMessage{");
sb.append("channel='").append(channel).append('\'');
sb.append(", user='").append(user).append('\'');
sb.append(", message='").append(message).append('\'');
sb.append('}');
return sb.toString();
}
}
Como nota al margen: no es necesario escribir funciones básicas como toString() por su cuenta, puede utilizar IntelliJ para generarlo por usted. Simplemente haga clic en Código → Generar… → toString() para obtener el resultado anterior.
Ahora usaremos Twitch4J para implementar una función de fuente de Twitch personalizada para Flink. La función de fuente generará un flujo ilimitado de datos, en este caso mensajes de chat de Twitch. Eso también significa que la aplicación no finalizará hasta que la detengamos explícitamente.
El cliente Twitch se puede construir así:
TwitchClientBuilder clientBuilder = TwitchClientBuilder.builder();
client = clientBuilder
.withEnableChat(true)
.build();client.getChat().joinChannel("vojay");
Con este ejemplo obtenemos un client que se une al canal de Twitch llamado viajar. Sí, una vez fui un streamer activo.. Dato curioso: en mis transmisiones enseñé a personas sobre desarrollo de juegos y desarrollo de software en general. También disfruté jugando juegos retro en vivo por transmisión 🎮. Pero ese es un tema diferente, centrémonos en el proyecto 😉.
También deberías notar que no hay autenticación en el ejemplo anterior. Como se dijo antes, como sólo queremos leer el chat, no se necesita autenticación. De hecho, simplemente nos unimos a un chat IRC de forma anónima y leemos los mensajes.
Como queremos establecer la conexión al chat de Twitch sólo una vez por instancia fuente, tenemos que ampliar el resumen RichSourceFunction clase, para poder anular la open función, que permite agregar código para la inicialización.
public class TwitchSource extends RichSourceFunction<TwitchMessage> {
@Override
public void open(Configuration configuration) {
// ...
}// ...
}
También utilizamos nuestro TwitchMessage POJO para que el parámetro genérico le diga a Flink que esta fuente genera elementos de tipo TwitchMessage.
Además, queremos poder pasar una serie de canales de Twitch que queremos escuchar en el constructor de la función fuente.
Para controlar el estado de nuestra función fuente, utilizamos un boolean variable llamada runningque configuramos en true en el open función.
En base a esto, el constructor y open La función se parece a la siguiente:
public class TwitchSource extends RichSourceFunction<TwitchMessage> {private final String[] twitchChannels;
private TwitchClient client;
private SimpleEventHandler eventHandler;
private boolean running = true;
public TwitchSource(String[] twitchChannels) {
this.twitchChannels = twitchChannels;
}
@Override
public void open(Configuration configuration) {
client = TwitchClientBuilder
.builder()
.withEnableChat(true)
.build();
for(String channel : twitchChannels) {
client.getChat().joinChannel(channel);
}
eventHandler = client
.getEventManager()
.getEventHandler(SimpleEventHandler.class);
running = true;
}
// ...
Con eso, tenemos todo lo que necesitamos para consumir mensajes y emitirlos para su posterior procesamiento como un flujo de datos.
El run La función de una función fuente es donde ocurre la magia. Aquí generamos los datos y con un dado SourceContextpodemos emitir datos.
El SimpleEventHandler proporcionado por Twitch4J se puede utilizar para reaccionar ante mensajes específicos.
Cada vez que obtenemos un evento de tipo IRCMessageEventque es un mensaje en el chat de Twitch, generamos una instancia de nuestro POJO y la emitimos a la transmisión a través del contexto.
Para garantizar que nuestra función fuente no termine, agregaremos un bucle con un retraso artificial, que se ejecutará hasta que nuestra boolean variable running se establece en false. Esto se hará en el cancel función, que es llamada por el entorno de Flink al apagar.
@Override
public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
eventHandler.onEvent(IRCMessageEvent.class, event -> {
String channel = event.getChannel().getName();
EventUser eventUser = event.getUser();
String user = eventUser == null ? "" : eventUser.getName();
String message = event.getMessage().orElseGet(String::new);ctx.collect(new TwitchMessage(channel, user, message));
});
while(running) {
Thread.sleep(100);
}
}
@Override
public void cancel() {
client.close();
running = false;
}
En conjunto, esta es la implementación completa de nuestra función de fuente de Twitch personalizada para Flink. TwitchSource.java:
package de.vojay.flitch;import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.TwitchClient;
import com.github.twitch4j.TwitchClientBuilder;
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.common.events.domain.EventUser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
public class TwitchSource extends RichSourceFunction<TwitchMessage> {
private final String[] twitchChannels;
private TwitchClient client;
private SimpleEventHandler eventHandler;
private boolean running = true;
public TwitchSource(String[] twitchChannels) {
this.twitchChannels = twitchChannels;
}
@Override
public void open(Configuration configuration) {
client = TwitchClientBuilder
.builder()
.withEnableChat(true)
.build();
for(String channel : twitchChannels) {
client.getChat().joinChannel(channel);
}
eventHandler = client
.getEventManager()
.getEventHandler(SimpleEventHandler.class);
running = true;
}
@Override
public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
eventHandler.onEvent(IRCMessageEvent.class, event -> {
String channel = event.getChannel().getName();
EventUser eventUser = event.getUser();
String user = eventUser == null ? "" : eventUser.getName();
String message = event.getMessage().orElseGet(String::new);
ctx.collect(new TwitchMessage(channel, user, message));
});
while(running) {
Thread.sleep(100);
}
}
@Override
public void cancel() {
client.close();
running = false;
}
}
Con esta función de fuente personalizada, ya podemos ampliar nuestro canal de transmisión en App.java para simplemente imprimir cada mensaje de chat escrito en el chat:
package de.vojay.flitch;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class App {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(new Configuration());
TwitchSource twitchSource = new TwitchSource(new String[]{"vojay"});
env.addSource(twitchSource)
.print();
env.execute("Flitch");
env.close();
}
}
Con addSource Podemos agregar nuestra función fuente. Luego, los elementos se procesan en el siguiente paso de la secuencia, que es print(). Con este sumidero, enviaremos nuevamente cada elemento a STDOUT.
Al ejecutar la aplicación ahora y escribir en el chat en https://twitch.tv/vojaylos mensajes serán procesados e impresos por nuestra aplicación de streaming 🎉.
Ahora que podemos leer el chat de Twitch como un flujo de datos, es momento de procesar cada mensaje. La idea básica es: para cada mensaje de Twitch, detectamos las frases individuales del mensaje y calculamos el sentimiento de cada una de las frases. El resultado será una estructura como esta:
Tuple2<TwitchMessage, Tuple2<List<Integer>, List<String>>>
Vamos a desglosarlo: el resultado contiene el POJO original del mensaje de chat de Twitch junto con otra tupla con 2 elementos:
- Una lista de puntuaciones de sentimiento (
List<Integer>) que contiene la puntuación de cada frase del mensaje, de 0 (muy negativo) a 4 (muy positivo) y - una lista de clases de sentimiento (
List<String>) que contiene la clase legible para cada oración del mensaje, por ejemplo: Neutral o Negativo.