Spark

Spark-HBase Connector

Torno a scrivere su Nerdammer, dopo una lunga assenza, per parlare delle nuove tendenze del mondo Big Data. L’ultimo articolo su Hadoop pubblicato su questo blog risale forse a un anno fa, quando la parola Big Data cominciava a essere più presente su Twitter, e quel famoso framework (MapReduce) veniva proposto come soluzione universale per l’elaborazione di quantità enormi di dati, con potenzialità teoricamente infinite.

È passato del tempo e, a quanto pare, in Italia siamo ancora al punto di partenza su queste tecnologie e a me, che ormai ho deciso che questa sarà la strada da prendere nel futuro lavorativo, toccherà forse emigrare “al Nord” o all’estero per poter sperimentare qualcosa… I soliti italiani !

Mentre tutto rimane fermo, ho trovato un po’ di tempo e ho tirato fuori un nuovo progetto open source targato Nerdammer: Spark-HBase Connector (https://github.com/nerdammer/spark-hbase-connector). Vi spiego rapidamente di cosa si tratta.

Spark è un framework fantastico per l’elaborazione distribuita, che si sta diffondendo sempre di più nel mondo Big Data. In sostanza, consente di fare tutto ciò che si può fare con il vecchio MapReduce, ma in maniera molto, molto più semplice e rapida. Algoritmi che in MapReduce possono essere scritti definendo decine di classi e configurando workflow complessi con Oozie, su Spark diventano qualche riga di codice Scala. Sì, Scala. Spark supporta anche Java, ma tutta la semplicità e la poesia si perde quando si usa la versione Java. Al massimo potete provare la versione Python, ma non Java.

HBase è un database NoSQL. Le principali distribuzioni Big Data (Cloudera, Hortonworks, MapR) contengono tutte HBase come database NoSQL di riferimento, quindi immagino che ci si possa fidare. Accumulo è un altro progetto promettente, Cassandra viene considerata una valida alternativa ed è forse il database più conosciuto tra i NoSQL al momento (forse a pari merito con MongoDB).

La cosa strana in questo mondo difficile è che ci sono un sacco di librerie di integrazione tra Spark e Cassandra (alcune anche ufficiali), ma per connettere un’applicazione Spark a HBase non si riescono a trovare né guide, né librerie.. niente. Anche su Stackoverflow, tutti quelli che hanno fatto una domanda del tipo “Come faccio a scrivere/leggere su HBase tramite Spark ?” si sono trovati delle risposte del tipo “Guarda che dice sto tizio nel log di questa mailing list, sembra che lui ci sia riuscito…”.

La cosa non riesco ancora a capirla, visto che si tratta del database NoSQL di riferimento. E proprio per questo motivo, la libreria mancante l’ho creata io.

Per chi conosce già scala, usare Spark-HBase connector è molto semplice:

import it.nerdammer.spark.hbase._

val rdd = sc.parallelize(1 to 100)
.map(i => (i.toString, i+1, "Hello"))

rdd.toHBaseTable("mytable")
.toColumns("column1", "column2")
.inColumnFamily("mycf")
.save()

A questo punto ho scritto 100 record su HBase. E, per leggerli:

import it.nerdammer.spark.hbase._

val hBaseRDD = sc.hbaseTable[(String, Int, String)]("mytable")
.select("column1", "column2")
.inColumnFamily("mycf")

E i dati presenti su HBase diventano RDD di Spark e possono essere utilizzati per fare tutti i calcoli distribuiti che volete. Dietro le quinte (ovviamente non ho provato a reinventare la ruota), la libreria utilizza le API di interoperabilità tra Spark e Hadoop2, ma, come vedete nel codice, non c’è bisogno di perdere tempo a configurare classi strane e la conversione dei tipi è automatica.

Per ulteriori info, fate riferimento al progetto su GitHub.

2 pensieri su “Spark-HBase Connector”

  1. 15/04/22 19:43:57 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 9)
    java.lang.IllegalArgumentException: Expected at least two converted values, the first one should be the row key
    at it.nerdammer.spark.hbase.HBaseWriter$$anonfun$1.apply(HBaseWriterBuilder.scala:89)
    at it.nerdammer.spark.hbase.HBaseWriter$$anonfun$1.apply(HBaseWriterBuilder.scala:84)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:999)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Rispondi a Marco Annulla risposta

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *