Nello scorso articolo abbiamo visto che lo scopo di questo sistema è realizzare un sistema di analisi dei dati di produzione di “n” macchine in near-realtime per evitare carichi sui sistemi di calcolo, sulla rete, e per avere i dati disponibili il prima possibile.

Se non l’hai letto, li troverai le considerazioni teoriche dei dati che vogliamo trasformare.

Ora vedremo invece la pratica.

STEP 1 – Creiamo su Azure la risorsa Event Hubs e rendiamola utilizzabile

Pre prima cosa creiamo un resource group su Azure che utilizzeremo per le risorse necessarie. Un resource group è di fatto un contenitore di risorse che è utile anche da un punto di vista di fatturazione (puoi dividere i costi proprio per resource group).

Fig. 1 – creazione su Azure di un resource group

Fatto questo possiamo iniziare a creare la nostra risorsa: Event Hubs. Quando creiamo una risorsa Event Hubs dovremo dichiarare inizialmente il “namespace” (che sarà poi riportato sulla stringa di connessione). Il namespace è solo il contenitore di tutti gli hub (gli stream di dati) che tu creerai come vedi nella figura sottostante.

Fig. 2 – Creazione del namespace dell’ event hubs

Nota che ho impostato il piano standard perchè con questo piano è possibile:

  • abilitare la compatibilità con Kafka
  • abilitare la funzione “capture” che salva i singoli messaggi su Blob Storage in formato .avro. Così potrai utilizzarli anche in un secondo momento.
  • la retention dei messaggi è di almeno 7 giorni.

Fatto questo creiamo lo stream vero e proprio (nella overview di Event Hubs clicca su “+ Event Hub”). Ti verrà chiesto un nome (io ho inserito “press-log”).

Una volta creato lo stream “press-log” dovremo dare i permessi alle applicazioni per accedere allo stream. Come? Abilitando una shared key (ovvero una stringa di connessione contenente le chiavi. Tale stringa la utilizzeremo nelle applicazioni che si connetteranno allo stream). Quindi andiamo sul menu “Shared Access Policies” interno all’event hub creato e clicchiamo su “+ Add”. Inseriremo il nome della policy e i permessi (ho abilitato send e listen per inviare i messaggi su questo stream e leggerli). Una volta eseguito, cliccando nuovamente sulla policy si aprirà a destra una colonna contenente le stringhe di connessione che dovrai copiare ed importare nelle applicazioni (connection string – primary key)

Fig. 3 – Connection string – primary key

Fatto questo adesso puoi tornare alla dashboard dove troverai dei grafici rappresentanti l’andamento dello stream.

Fig. 4 – dashboard di event hub

Nota che non ho abilitato l’event capture perchè non utile ai fini di questo articolo, ma sarebbe bastato abilitare questa feature dal menu di sinistra “capture”.

Adesso siamo pronti per partire.

Step 2 – concept del producer

Come detto prima il producer è il processo installato a bordo macchina che ad ogni fine ciclo di lavorazione della pressa, legge il file prodotto, ne estrae un documento json e lo invia allo stream creato prima.

Ho realizzato il codice in python vista la sua enorme flessibilità. Essendo un processo chiamato in modo sincrono e su singolo thread, le performance non saranno certo un problema. Piuttosto ne guadagneremo in termini di sviluppo più snello ed espressivo.

Pre praticità ho creato un virtual environment e vi ho installato il pacchetto “azure-eventhub” (adesso nella versione 5.2). Tale pacchetto è necessario per connettersi allo stream creato in precedenza.

Cosa fa il codice seguente? Apre il file .csv generato dalla pressa, crea il documento json prendendo solo i dati necessari e lo trasmette.

Come vedrai dal codice ho inserito del codice per simulare la lettura e invio di più files (io avevo un solo file di test della macchina!).

import os
import json
import datetime
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError

class LogPusher:
    def __init__(self) -> None:
        self.event_hub_name = 'press-log' #il nome dell'event hub, non il namespace
        self.event_hub_connection_string = '<la tua connection string di event hub>'
        super().__init__()

    def push_data(self, n: int) -> None:
        client = EventHubProducerClient.from_connection_string(
            conn_str=self.event_hub_connection_string, 
            eventhub_name=self.event_hub_name)

        with client:
            event_data_batch = client.create_batch()
            for i in range(n):
                json_document = self.generate_json_document(i)
                event_data = EventData(json_document)
                try:
                    event_data_batch.add(event_data)
                except ValueError:
                    client.send_batch(event_data_batch)
                    event_data_batch = client.create_batch()
                    event_data_batch.add(event_data)

            if len(event_data_batch) > 0:
                client.send_batch(event_data_batch)
        
        
    def generate_json_document(self, i: int):
        file_name = 'DX-111_2001-08-10_19-07-39_0055_OK.csv'
        # dati falsati volontariamente per creare differenti risultati
        result = 'OK' if i % 2 == 0 else 'KO'
        now = datetime.datetime.now()
        date_time = now.strftime("%Y-%m-%d_%H-%M-%S")
        fake_file_name = 'MP-000_{}_{}_{}.csv'.format(
                date_time,
                i, 
                result)
        data_array = self.read_log_data(file_name, fake_file_name)
        document = {
            "file_name": fake_file_name,
            "data": data_array
        }
        json_document = json.dumps(document)
        return json_document


    def read_log_data(self, file_name: str, fake_file_name: str):
        data_array = []
        base_path = os.getcwd()
        file_name = os.path.join(base_path, 'data', file_name)
        with open(file_name, 'r') as csv_file:
            data_lines = csv_file.readlines()[279:]
            for data_line in data_lines:
                if data_line == ';;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;\n':
                    break
                values = data_line.split(';')[0:4]
                data_array.append({
                    # "file_name": fake_file_name,
                    "s": self.convert_to_float(values[0]),
                    "mm1": self.convert_to_float(values[1]),
                    "n": self.convert_to_float(values[2]),
                    "mm2": self.convert_to_float(values[3]),
                })

        return data_array

    @staticmethod
    def convert_to_float(value: str) -> float:
        return float(value.replace(',','.'))

if __name__ == "__main__":
    LogPusher().push_data(20)

Step 3 – concept del consumer

In questo caso realizzeremo una applicazione da far girare sul motore Spark. L’ho realizzata in Java prendendo in considerazione che è un linguaggio conosciuto e molto presente nel mondo enterprise.

Comunque, il consiglio è (se possibile) di utilizzare scala o python (tramite pyspark).

Il bello di Apache Spark…

Come saprai Spark è un motore (scritto in scala, e di conseguenza gira su JVM) di elaborazione e analisi dei dati. Non starò qui a spiegarti le basi.

Ti dirò solo che è un motore distribuito: ovvero il carico di lavoro viene suddiviso tra più cluster e poi unito per ottenere il risultato finale. Ma è tutto trasparente: tu scriverai il tuo codice una volta sola, e i dati saranno accessibili in un solo posto. Spark si occuperà di inviare il tuo codice ed i dati in modo partizionato ai cluster registrati.

C’è un concetto di fondo che è bene considerare: utilizzeremo Spark in modalità “structured streaming”.

Spark rimarrà in ascolto sullo stream di event hub ed eseguirà operazioni sui dati ricevuti. Sarà Spark stesso a mantenere un thread in loop continuo.

I dati però nel nostro caso non sono legati ad una finestra temporale (ad esempio la ricezione continua della potenza del segnale di un device, oppure la ricezione continua dei valori di un titolo di borsa). Ogni singolo messaggio infatti contiene già la timeline di una lavorazione della pressa, e non dipende dal tempo. Ci arriva ed è già pronto per essere elaborato. Non dipende minimamente dagli altri messaggi.

Semplicemente vogliamo sfruttare lo stream per lavorare il prima possibile i singoli messaggi anziché in un mostruoso batch notturno.

Questo concetto si chiama “stateless aggregation” ed è necessario considerarlo nel momento in cui aggregheremo e scriveremo i dati sul database.

Passando al codice, come per python, dovremo inserire oltre alle dipendenze di Spark, anche quelle di Event Hub. Ti riporto (se usi maven) il file xml con la dichiarazione della dipendenza. Questa ti servirà per dichiarare il formato di streaming su cui Spark rimarrà in ascolto.

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>net.heavycode.analytic</groupId>
  <artifactId>press-log-subscriber</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <spark.version>3.0.0</spark.version>
    <delta.version>0.7.0</delta.version>
    <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
    <postgresql.version>42.1.4</postgresql.version>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>
  
  
  <dependencies>
    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    
    <dependency>
	   <groupId>org.apache.spark</groupId>
	   <artifactId>spark-sql_${scala.version}</artifactId>
	   <version>${spark.version}</version>
	   <exclusions>
	    <exclusion>
	     <groupId>org.slf4j</groupId>
	     <artifactId>slf4j-simple</artifactId>
	    </exclusion>
	   </exclusions>
	  </dependency>
	  
	... 
    
    <dependency>
	    <groupId>com.microsoft.azure</groupId>
	    <artifactId>azure-eventhubs-spark_2.12</artifactId>
	    <version>2.3.18</version>
	</dependency>
    
  </dependencies>
  
</project>

Quello che ti riporto adesso è il codice intero (con i commenti per ogni trasformazione).

package net.heavycode.analytic.press.subscriber;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventPosition;

import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.FloatType;
import static org.apache.spark.sql.types.DataTypes.StringType;

public class PressReaderApp {
	
	public static void main(String[] args) {
		PressReaderApp app = new PressReaderApp();
		 try {
		      app.start();
	    } catch (TimeoutException | StreamingQueryException e) {
	      System.out.print(e.getMessage());
	    }
	  }
	
	private void start() throws TimeoutException, StreamingQueryException {

		System.out.print("INIT!");

		SparkSession spark = SparkSession.builder()
		        .appName("Subscribe to event hub")
		        .master("local")
		        .getOrCreate();


		/******************************************************************************************
		 1 - confgurazione e connessione a Event Hubs per la lettura dello stream
		 ******************************************************************************************/
		String eventHubConnectionString = "<la tua connection string event hub>";
		EventHubsConf ehConf = new EventHubsConf(eventHubConnectionString)
				.setStartingPosition(EventPosition.fromStartOfStream());

		Dataset<Row> df = spark
				.readStream()
				.format("eventhubs")
				.options(ehConf.toMap())
				.load();
		
		System.out.println("DATAFRAME LOADED");
		df.printSchema();


		/******************************************************************************************
		2 - il dataframe va convertito in quanto il messaggio è in un campo contenente un array di byte
	 	******************************************************************************************/
		Dataset<Row> msgDf = df.selectExpr("cast (body as string) AS content");
		System.out.println("msgDf DF SCHEMA");
		msgDf.printSchema();


		/******************************************************************************************
		3 - creo una struttura che rappresenti il formato del json contenuto nel singolo messaggio
		******************************************************************************************/
		StructType arrayStruct = new StructType()
				.add("s", FloatType, false)
				.add("mm1", FloatType, false)
				.add("n", FloatType, false)
				.add("mm2", FloatType, false);

		StructType schema = new StructType()
				.add("file_name", StringType, false)
				.add("data", new ArrayType(arrayStruct, false));


		/******************************************************************************************
		4 - utilizzo la funzione from_json per eseguire il cast della stringa contenente il json
	 	******************************************************************************************/
		msgDf = msgDf
				.withColumn("json", from_json(msgDf.col("content"),schema))
				.drop("content");

		System.out.println("msgDf DF SCHEMA");
		msgDf.printSchema();


		/******************************************************************************************
	 	5 - dal momento che la struttura conterrebbe una sola colonna chiamata "json"
		 e contenente il documento json con l'attrbiuto "file_name"
		 e "data" (che è un array di array che verrà trasformato) li divido in 2 colonne separate.
		 Notare che "data" è un array di array (una matrice bidimensionale)
	 	******************************************************************************************/
		Dataset<Row> explodedJsonDf = msgDf
				.select("json", "json.file_name", "json.data")
				.drop("json");

		System.out.println("explodedJsonDf DF SCHEMA");
		explodedJsonDf.printSchema();


		/******************************************************************************************
		 6 - viene esplosa la colonna data in modo da generare una riga
		 per ogni elemento dell'array di array (quindi ogni riga risultante sarà un vettore)
		 ******************************************************************************************/
		Dataset<Row> explodedDataDf = explodedJsonDf.select(
				explodedJsonDf.col("file_name"),
				explode(explodedJsonDf.col("data").as("data")));

		explodedDataDf = explodedDataDf.withColumnRenamed("col", "data");

		System.out.println("explodedDataDf DF SCHEMA");
		explodedDataDf.printSchema();


		/******************************************************************************************
		 7 - adesso esplodiamo ulteriormente il vettore in una colonna per ogni elemento del vettore
		 coì da iniziare ad evere una "tabella" di dati utilizzabili per fare una minima analisi.
		 +-----------------------------------+------+-------+---------+-------+
		 |file_name                          |s     |mm1    |n        |mm2    |
		 +-----------------------------------+------+-------+---------+-------+
		 |MP-000_2021-01-05_14-13-54_0_OK.csv|0.0   |65.0   |-0.36336 |65.0   |
		 |MP-000_2021-01-05_14-13-54_0_OK.csv|0.09  |65.2001|-0.57141 |65.2001|
		 |MP-000_2021-01-05_14-13-54_0_OK.csv|0.1379|65.4002|-0.36716 |65.4002|
		 |MP-000_2021-01-05_14-13-54_0_OK.csv|0.1813|65.601 |-0.2847  |65.601 |
		 ******************************************************************************************/
		Dataset<Row> fullColsDf = explodedDataDf
			.withColumn("s", explodedDataDf.col("data").getItem("s"))
			.withColumn("mm1", explodedDataDf.col("data").getItem("mm1"))
			.withColumn("n", explodedDataDf.col("data").getItem("n"))
			.withColumn("mm2", explodedDataDf.col("data").getItem("mm2"))
			.drop("data");

		System.out.println("fullColsDf DF SCHEMA");
		fullColsDf.printSchema();


		/******************************************************************************************
		 8 - il file_name contiene informazioni importanti nella stringa, quindi si estraggono
		 (si poteva fare al punto 6, ma per charezza lo si riporta separatamente)
		 ******************************************************************************************/
		fullColsDf = fullColsDf
				.withColumn("machine_name", substring(fullColsDf.col("file_name"), 0,2))
				.withColumn("work_number", substring(fullColsDf.col("file_name"), 4,3))
				.withColumn("cycle_number", substring(fullColsDf.col("file_name"), 28,1))
				.withColumn("result", substring(fullColsDf.col("file_name"), 30,2))
				.withColumn("str_date", substring(fullColsDf.col("file_name"), 8,19))
				.withColumn("time_stamp", current_timestamp());

		fullColsDf = fullColsDf
				.withColumn(
						"work_datetime",
						to_timestamp(fullColsDf.col("str_date"), "yyyy-MM-dd_HH-mm-ss"))
				.drop("str_date");


		/******************************************************************************************
		 9 - raggruppamento per file_name e aggregazione dati
		 ******************************************************************************************/
		Dataset<Row> groupDf = fullColsDf
				.groupBy(fullColsDf.col("file_name"))
				.agg(
						max(fullColsDf.col("n")).as("max_n").alias("max_n"),
						max(fullColsDf.col("mm1")).as("max_mm1").alias("max_mm1"),
						max(fullColsDf.col("result")).as("result").alias("result")
				);

		System.out.println("groupDf DF SCHEMA");
		groupDf.printSchema();


		/******************************************************************************************
		 10 - dal momento che ogni documento json di origine contiene già tutto il pacchetto di
		 dati della lavorazione della macchina, non ci troviamo nel caso di una aggregazione
		 statefull (in cui spark deve aggiornare un flusso continuo di dati) ma in una aggregazione
		 stateless (in cui definiamo un tempo di attività sulla base del quale spark realizza
		 dei micro-batch computazionali)
		 ******************************************************************************************/
		StreamingQuery query = groupDf
		        .writeStream()
		        .outputMode(OutputMode.Complete())
		        .foreach(new PotgresForeachWriter())
		        .option("truncate", false)
				.trigger(Trigger.ProcessingTime("10 seconds"))
		        .start();

		try {
			query.awaitTermination(10000);
		}
		catch (StreamingQueryException e) {
			System.out.print("ERROR -> " + e.getMessage());
		}

		System.out.print("END");
	}

}

Questi sono i punti principali del listato:

Al commento numero 1 andiamo ad impostare Spark per la fase di ingestione dello stream e del suo caricamento dei dati sul dataframe (la struttura principale di Spark).

Nota che nella creazione dell’oggetto EventHubsConf ho impostato anche l’offset dal quale si vuole iniziare a leggere lo stream. Questo dipende dalle necessità del caso d’uso.

Al commento numero 2 andiamo a convertire il messaggio in una stringa in quanto all’arrivo è contenuto in un campo codificato.

Al punto 3 andiamo a formalizzare la struttura contenuta nel json, questa sarà necessaria proprio per la sua conversione da stringa in formato json a un oggetto json vero e proprio.

Il punto 6 è focale: abbiamo detto che all’arrivo del messaggio ogni riga del dataframe contiene un json contenente il nome della lavorazione e i dati campionati per tutta la lavorazione: un array di array. In pratica potrebbero arrivare più messaggi insieme, ognuno contenuto in una riga del dataframe caricato.

Perciò è necessario eseguire una “esplosione” del dataset in modo da ottenere un dataframe contenente per ogni riga l’identificativo della lavorazione e i dati del singolo istante.

Se ad esempio il dataframe originale avesse contenuto 2 righe rappresentanti una lavorazione ciascuna, e ciascuna lavorazione avesse contenuto 20 campionature dei dati di lavorazione, dopo l’esplosione ci troveremo un nuovo dataframe di 40 righe divise in 20 righe di dati di lavorazione riferite alla prima lavorazione, ed altrettante 20 riferite alla seconda lavorazione.

Fig. 5 – il risultato dell’ esplosione al punto 6

Dopo ciò aggiungeremo una colonna per ogni dato contenuto in quello che per ogni riga, adesso, è un vettore monodimensionale.

Al punto 9 avendo a disposizione una tabella riportante in colonna: identificativo lavorazione, forza, spostamento e gli altri dati possiamo ora aggregare sull’identificativo della lavorazione e ottenere i valori che ci servono (in questo caso il massimo della forza e dello spostamento per quella lavorazione).

La questione dell’aggregazione stateless

Una cosa importante da considerare per Spark quando si eseguono delle aggregazioni durante uno stream di dati è questa: quando scrivo i dati? Quando posso considerare terminato il micro-batch?

Supponi di voler aggregare i dati della potenza del segnale di un device che vengono elaborati da Spark in uno stream continuo e volerne scrivere la media. Ecco il punto… quando? La media negli ultimi 5 minuti? 10 minuti? 20 minuti? Lo stream è un flusso continuo! Questo è un esempio di aggregazione stateful: il valore dipende dalla finestra temporale di analisi.

Nel nostro caso invece ci troviamo di fronte ad una aggregazione stateless: ovvero non dipendiamo dal tempo. Quando arriva il messaggio, esso già contiene tutti i dati che ci servono per eseguire i calcoli, è “autonomo” per così dire.

Quindi l’unica cosa di cui dovremo preoccuparci è istruire Spark per sapere ogni quanto eseguire il loop per controllare i nuovi messaggi in arrivo.

Questo è ciò che è realizzato al punto 10 tramite l’impostazione della OutputModel di tipo “complete” e del trigger.

Infine c’è un’altra cosa di cui tenere presente. Come finalizzare la scrittura a database dei dati ormai aggregati?

Per far questo è necessario usare il metodo “foreach” che prende come argomento l’istanza di una classe che deve estendere “ForeachWriter”.

Questa classa serve ad eseguire una operazione per ogni riga dello stream di dati in uscita e in questo modo è possibile scrivere sul database di destinazione.

package net.heavycode.analytic.press.subscriber;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;


public class PotgresForeachWriter extends ForeachWriter<Row> {
	private static final long serialVersionUID = 8383715100587612498L;
	private int streamId = 0;
	private String dbDriver = "";
	private String url =  "jdbc:postgresql://....";
	private String username = "username";
	private String password = "password";

	@Override
	public void close(Throwable arg0) {
	}

	@Override
	public boolean open(long arg0, long arg1) {
		return true;
	}

	@Override
	public void process(Row arg0) {
		String fileName = arg0.getString(0);
		float maxN = arg0.getFloat(1);
		float maxMm1 = arg0.getFloat(2);
		String result = arg0.getString(3);

		Properties connProps = new Properties();
		connProps.setProperty("user", this.username);
		connProps.setProperty("password", this.password);
		connProps.setProperty("dbtable", "public.press_log");
		String query = " insert into press_log (file_name, max_n, max_mm1, result)"
				+ " values (?, ?, ?, ?)";
		Connection connection = null;
		try {
			connection = DriverManager.getConnection(url, connProps);
			PreparedStatement preparedStmt = connection.prepareStatement(query);
			preparedStmt.setString (1, fileName);
			preparedStmt.setFloat (2, maxN);
			preparedStmt.setFloat (3, maxMm1);
			preparedStmt.setString (4, result);
			preparedStmt.execute();
			connection.close();
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}
}

In conclusione…

In conclusione di questi due articoli, se sei arrivato fino in fondo devo dirti davvero complimenti!

Ho pensato fosse utile condividere con te questo concept perché sicuramente molte aziende otterrebbero un beneficio dall’utilizzo di queste tecnologie.

Nel mio percorso ho raccolto molte informazioni interessanti in merito agli argomenti di cui ti ho parlato in questo articolo. Per questo ho scritto un libro intitolato “Why Your Data Matter”.

Essendo il frutto della mia passione ed esperienza diretta, ho scelto di mettere questo libro gratuitamente a disposizione di tutti gli IT Manager ed i CIO delle aziende che come te vogliono ottenere grandi risultati dalle loro scelte e dal loro lavoro (evitando di trovarsi in situazioni scomode e da risolvere con urgenza).

Ti invito a leggere le prime pagine scaricandole!
Se poi ti piacerà sarò felice di inviartene una copia GRATUITA direttamente nel tuo ufficio.  

Clicca qui per scaricare l’estratto del mio libro (se ti piacerà te lo invierò in formato cartaceo!) ==> il mio libro  

Alla prossima informazione!