DBInputFormat per transferir dades de la base de dades SQL a NoSQL

L’objectiu d’aquest bloc és aprendre a transferir dades de bases de dades SQL a HDFS, a transferir dades de bases de dades SQL a bases de dades NoSQL.

En aquest bloc explorarem les capacitats i possibilitats d’un dels components més importants de la tecnologia Hadoop, és a dir, MapReduce.



Avui en dia, les empreses adopten el marc Hadoop com a primera opció per a l’emmagatzematge de dades a causa de les seves capacitats per gestionar dades grans de manera eficaç. Però també sabem que les dades són versàtils i existeixen en diverses estructures i formats. Per controlar una enorme varietat de dades i els seus diferents formats, hauria d’haver un mecanisme per acomodar totes les varietats i, tot i així, produir un resultat eficaç i coherent.



El component més potent del marc Hadoop és MapReduce, que pot proporcionar el control de les dades i la seva estructura millor que els seus altres homòlegs. Tot i que requereix una sobrecàrrega de la corba d’aprenentatge i la complexitat de la programació, si podeu gestionar aquestes complexitats segur que podeu manejar qualsevol tipus de dades amb Hadoop.

El marc MapReduce divideix totes les seves tasques de processament bàsicament en dues fases: Mapatge i Reducció.



La preparació de les dades brutes per a aquestes fases requereix la comprensió d’algunes classes i interfícies bàsiques. La super classe d’aquest reprocessament és InputFormat.

El InputFormat class és una de les classes bàsiques de l'API Hadoop MapReduce. Aquesta classe s’encarrega de definir dues coses principals:

  • Dividits de dades
  • Lector de discos

Dividir les dades és un concepte fonamental en el marc Hadoop MapReduce que defineix tant la mida de les tasques de mapes individuals com el servidor potencial d'execució. El Lector de discos és responsable de llegir els registres reals del fitxer d'entrada i d'enviar-los (com a parells clau / valor) al mapeador.



ordenar la matriu c ++

El nombre de mapers es decideix en funció del nombre de fraccions. InputFormat és la tasca de crear les divisions. La majoria de les vegades la mida de la divisió és equivalent a la mida del bloc, però no sempre es creen dividits en funció de la mida del bloc HDFS. Depèn totalment de com s'ha substituït el mètode getSplits () del vostre InputFormat.

Hi ha una diferència fonamental entre la divisió MR i el bloc HDFS. Un bloc és un fragment físic de dades, mentre que una divisió és només un fragment lògic que llegeix un mapeador. Una divisió no conté les dades d’entrada, només conté una referència o adreça de les dades. Una divisió té bàsicament dues coses: una longitud en bytes i un conjunt d'ubicacions d'emmagatzematge, que són només cadenes.

Per entendre-ho millor, posem un exemple: processar les dades emmagatzemades a MySQL mitjançant MR. Com que no hi ha cap concepte de blocs en aquest cas, la teoria: 'les divisions sempre es creen basades en el bloc HDFS',falla. Una possibilitat és crear divisions basades en rangs de files de la taula MySQL (i això és el que fa DBInputFormat, un format d’entrada per llegir dades d’una base de dades relacional). Podem tenir k nombre de fraccions que consisteixen en n files.

Només per als InputFormats basats en FileInputFormat (un InputFormat per al maneig de dades emmagatzemades en fitxers) que es divideixen en funció de la mida total, en bytes, dels fitxers d’entrada. No obstant això, la mida del bloc FileSystem dels fitxers d'entrada es tracta com un límit superior per a les divisions d'entrada. Si teniu un fitxer inferior a la mida del bloc HDFS, només obtindreu 1 assignador per a aquest fitxer. Si voleu tenir un comportament diferent, podeu utilitzar mapred.min.split.size. Però torna a dependre únicament dels getSplits () del vostre InputFormat.

Tenim tants formats d’entrada preexistents disponibles al paquet org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

El valor per defecte és TextInputFormat.

De la mateixa manera, tenim tants formats de sortida que llegeixen les dades dels reductors i les emmagatzemen a HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

El valor predeterminat és TextOutputFormat.

En acabar de llegir aquest bloc, ja hauríeu après:

  • Com escriure un programa de reducció de mapes
  • Quant als diferents tipus d’InputFormats disponibles a Mapreduce
  • Quina és la necessitat d’InputFormats
  • Com escriure InputFormats personalitzats
  • Com transferir dades de bases de dades SQL a HDFS
  • Com transferir dades de bases de dades SQL (aquí MySQL) a bases de dades NoSQL (aquí Hbase)
  • Com es transfereixen dades d'una base de dades SQL a una altra taula de les bases de dades SQL (potser això potser no sigui tan important si ho fem a la mateixa base de dades SQL. Tot i això, no hi ha res dolent en tenir-ne coneixement. Mai se sap com es pot utilitzar)

Requisit previ:

  • Hadoop preinstal·lat
  • SQL preinstal·lat
  • Hbase preinstal·lat
  • Comprensió bàsica de Java
  • MapReduce el coneixement
  • Coneixements bàsics del marc Hadoop

Comprenguem l’enunciat del problema que resoldrem aquí:

Tenim una taula d’empleats a MySQL DB a la nostra base de dades relacional Edureka. Ara, segons el requisit empresarial, hem de canviar totes les dades disponibles al DB relacional al sistema de fitxers Hadoop, és a dir, HDFS, NoSQL DB conegut com a Hbase.

Tenim moltes opcions per fer aquesta tasca:

  • Sqoop
  • Flume
  • MapReduce

Ara no voleu instal·lar ni configurar cap altra eina per a aquesta operació. Només us queda una sola opció, que és el marc de processament de Hadoop MapReduce. El marc MapReduce us proporcionaria un control total sobre les dades mentre feu la transferència. Podeu manipular les columnes i col·locar-les directament a qualsevol de les dues ubicacions de destinació.

Nota:

  • Hem de descarregar i posar el connector MySQL a la ruta de classe de Hadoop per obtenir taules de la taula MySQL. Per fer-ho, descarregueu el connector com.mysql.jdbc_5.1.5.jar i manteniu-lo al directori Hadoop_home / share / Hadoop / MaPreduce / lib.
Descàrregues de cp / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • A més, poseu tots els pots Hbase a la ruta de classe Hadoop per tal que el vostre programa MR accedeixi a Hbase. Per fer-ho, executeu l'ordre següent :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Les versions de programari que he utilitzat en l'execució d'aquesta tasca són:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • eclipsi Lluna

Per tal d'evitar el programa en qualsevol problema de compatibilitat, prescric als meus lectors perquè executin l'ordre amb un entorn similar.

DBInputWritable personalitzat:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) llança SQLException // L'objecte Resultset representa les dades retornades d'una sentència SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) llança IOException { } public void write (PreparedStatement ps) llança SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Escriptura DBO personalitzada:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implements Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = nom this.id = id this.dept = dept} public void readFields (DataInput in) llança IOException {} public void readFields (ResultSet rs) llança SQLException {} public void write (DataOutput out) llança IOException {} public void write (PreparedStatement ps) llança SQLException {ps.setString (1, nom) ps.setInt (2, id) ps.setString (3, dept)}}

Taula d'entrada:

crear una base de dades edureka
crear la taula emp (empid int no null, nom varchar (30), dept varchar (20), clau principal (empid))
inseriu en valors emp (1, 'abhay', 'desenvolupament'), (2, 'brundesh', 'test')
seleccioneu * entre emp

Cas 1: transferència de MySQL a HDFS

package com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce Importació de treball org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // classe de controladors' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // nom d'usuari' root ') // contrasenya Tasca de treball = nova tasca de treball (conf) .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.clorm) FileOut nova ruta (args [0])) DBInputFormat.setInput (treball, DBInputWritable.class, 'emp', // nom de la taula d'entrada null, nul, cadena nova [] {'empid', 'nom', 'dept'} / / columnes de la taula) Camí p = nou camí (args [0]) FileSystem fs = FileSystem.get (URI nou (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Aquest fragment de codi ens permet preparar o configurar el format d’entrada per accedir a la nostra base de dades SQL d’origen. El paràmetre inclou la classe de controlador, l’URL té l’adreça de la base de dades SQL, el seu nom d’usuari i la contrasenya.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de controladors 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // nom d'usuari 'root') // contrasenya

Aquest fragment de codi ens permet passar els detalls de les taules de la base de dades i establir-lo a l’objecte del treball. Els paràmetres inclouen, per descomptat, la instància de treball, la classe d'escriptura personalitzada que ha d'implementar la interfície DBWritable, el nom de la taula d'origen, la condició si és nul, els paràmetres d'ordenació, i la llista de columnes de la taula respectivament.

DBInputFormat.setInput (treball, DBInputWritable.class, 'emp', // nom de la taula d'entrada nul, nul, cadena nova [] {'empid', 'nom', 'dept'} // columnes de la taula)

Mapper

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map estén Mapper {
mapa buit protegit (clau LongWritable, valor DBInputWritable, context context) {try {Nom de la cadena = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (text nou (nom + '' + id + '' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reductor: reductor d’identitat utilitzat

Ordre per executar:

pot hadoop dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Sortida: Taula MySQL transferida a HDFS

hadoop dfs -ls / dbtohdfs / *

Cas 2: transferència d'una taula a MySQL a una altra de MySQL

creant una taula de sortida a MySQL

crear taula empleat1 (nom varchar (20), id int, dept varchar (20))

package com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) genera Excepció {Configuració conf = nova configuració () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de controladors 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // nom d'usuari' root ') // contrasenya Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nom de la taula d'entrada null, null, cadena nova [] {' ',' nom ',' dept '} // columnes de la taula) DBOutputFormat.setOutput (treball,' empleat1 ', // nom de la taula de sortida nova cadena [] {' nom ',' id ',' dept '} // taula columnes) System.exit (job.waitForCompletion (true)? 0: 1)}}

Aquest fragment de codi ens permet configurar el nom de la taula de sortida a SQL DB. Els paràmetres són la instància de treball, el nom de la taula de sortida i els noms de les columnes de sortida respectivament.

DBOutputFormat.setOutput (treball, 'empleat1', // nom de la taula de sortida nova cadena [] {'nom', 'id', 'dept'} // columnes de la taula)

Assignador: igual que el cas 1

Reductor:

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (tecla de text, valors iterables, context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (línia [0] .toString (), Integer.parseInt (línia [1] .toString ()), línia [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Ordre per executar:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Sortida: dades transferides de la taula EMP a MySQL a una altra taula Employee1 de MySQL

Cas 3: Transferència de la taula a MySQL a la taula NoSQL (Hbase)

Creació de la taula Hbase per acomodar la sortida de la taula SQL:

crear 'empleat', 'informació_oficial'

Classe de conductor:

paquet Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de controladors 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // nom d'usuari 'root') // contrasenya Job Job = nova Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('empleat', Reduce.class, job) Job.setInputFormat. class) DBInputFormat.setInput (treball, DBInputWritable.class, 'emp', // nom de la taula d'entrada nul, nul, cadena nova [] {'empid', 'nom', 'dept'} // columnes de la taula) System.exit (job.waitForCompletion (true)? 0: 1)}}

Aquest fragment de codi us permet configurar la classe de clau de sortida que en cas d’hbase és ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Aquí passem el nom de la taula hbase i el reductor per actuar sobre la taula.

TableMapReduceUtil.initTableReducerJob ('empleat', Reduce.class, feina)

Mapeador:

paquet Dbtohbase importació java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Mapa amplia Mapper {private IntWritable one = new IntWritable (1) mapa buit protegit (LongWritable id, DBInputWritable value, context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), text nou (line + ') '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

En aquest fragment de codi, estem prenent valors dels getters de la classe DBinputwritable i després els passem
ImmutableBytesWritable perquè arribin al reductor en forma de bytewriatble que Hbase entén.

String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line +' '+ dept )))

Reductor:

paquet Dbtohbase importació java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce extends TableReducer {public void reduce (ImmutableBytesWritable key, Valors iterables, Context context) llança IOException, InterruptedException {String [] cause = null // Valors de bucle per a (Text val: valors) {causa = val.toString (). split ('')} // Posa a HBase Posa put = nova Posa (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('nom'), Bytes.toBytes (causa [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('departament'), Bytes.toBytes (causa [1] ])) context.write (clau, posada)}}

Aquest fragment de codi ens permet decidir la fila exacta i la columna en què guardaríem els valors del reductor. Aquí estem emmagatzemant cada empid en fila separada, ja que vam fer empid com a clau de fila que seria única. A cada fila, emmagatzemem la informació oficial dels empleats a la família de columnes 'info_oficial' a les columnes 'nom' i 'departament' respectivament.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('info_oficial'), Bytes.toBytes ('nom'), Bytes.toBytes (causa [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('departament'), Bytes.toBytes (causa [1])) context.write (clau, put)

Dades transferides a Hbase:

empleat d’escaneig

Com veiem, vam poder completar amb èxit la tasca de migrar les nostres dades empresarials d’un DB SQL relacional a un NoSQL DB.

Al proper bloc aprendrem a escriure i executar codis per a altres formats d’entrada i sortida.

Seguiu publicant els vostres comentaris, preguntes o qualsevol comentari. M'encantaria tenir notícies vostres.

Tens alguna pregunta? Esmenta’l a la secció de comentaris i et respondrem.

Articles Relacionats: