Wednesday, 20 May 2020

Airflow SparkSubmitOperator example

Creating a connection




Note:
  • Config values passed from the SparkSubmitOperator take precedence over the connection parameters. 
  • Setting spark-home in the connection's extra properties works across versions when multiple Spark installations are present.
  • The default deploy mode is client. Adding 'deploy-mode':'cluster' to the extra properties changes it to cluster mode.
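Below is a minimal DAG sketch of how the operator and the connection could be wired together. The connection id, application path and config values are placeholders, and the import path shown is for Airflow 1.10 (in Airflow 2 the operator lives in the apache-spark provider package).

from datetime import datetime

from airflow import DAG
# Airflow 1.10 import path; in Airflow 2 use:
# from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

with DAG(
    dag_id="spark_submit_example",
    start_date=datetime(2020, 5, 20),
    schedule_interval=None,
) as dag:

    submit_job = SparkSubmitOperator(
        task_id="submit_sample_app",
        conn_id="spark_default",               # connection created as shown above
        application="/path/to/sample_app.py",  # placeholder application
        name="sample_app",
        conf={"spark.executor.memory": "2g"},  # these conf values override the connection parameters
    )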

Thursday, 23 April 2020

Reflect function in Hive and Spark SQL

Reflect function:

A Java class and method often already exist to handle the exact function a user would like to use in Hive. Rather than writing a wrapper UDF to call such a method, most of these methods can be invoked with the reflect UDF. Reflect uses Java reflection to instantiate objects and call their methods; it can also call static methods. The method must return a primitive type or a type that Hive knows how to serialize.


SELECT reflect("java.lang.String", "valueOf", 1),
       reflect("java.lang.String", "isEmpty"),
       reflect("java.lang.Math", "max", 2, 3),
       reflect("java.lang.Math", "min", 2, 3),
       reflect("java.lang.Math", "round", 2.5),
       reflect("java.lang.Math", "exp", 1.0),
       reflect("java.lang.Math", "floor", 1.9)
FROM src LIMIT 1;
1   true    3   2   3   2.7182818284590455  1.0

SHA-256 hashing using Java's DigestUtils and the reflect method in Hive:


hive> SELECT Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex', 'HANU');
OK
72102548c156fe16ed7ff108def7ddf19332d510b0afc29749a83dfd47787077
If we pass a NULL value to the above method, it throws an exception:

hive> SELECT Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex', NULL);
OK
Failed with exception java.io.IOException:
org.apache.hadoop.hive.ql.metadata.HiveException: UDFReflect getMethod
20/04/23 11:00:05 ERROR CliDriver: Failed with exception 
java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: UDFReflect getMethod
        at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:154)
        at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1693)
        at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
        at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
        at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
        at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:148)

Surprisingly, Hive handles it when the NULL is cast with CAST(NULL AS STRING):

hive> select Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex',
cast(null as string));
OK
UDFReflect evaluate java.lang.reflect.InvocationTargetException 
method = public static java.lang.String org.apache.commons.codec.digest.DigestUtils.sha256Hex(java.lang.String) 
args = [null]
NULL

Spark SQL, however, still fails with a NullPointerException even with the cast:
spark.sql(" select Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex',cast(null as string))").show()
[Stage 427:>                                                        (0 + 1) / 1]20/04/23 11:25:43 WARN TaskSetManager: Lost task 0.0 in stage 427.0 469): java.lang.reflect.InvocationTargetException
Caused by: java.lang.NullPointerException
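One way to avoid this in Spark SQL is to guard the NULL so reflect is never invoked on it, for example with CASE WHEN. A minimal PySpark sketch (the demo DataFrame and view name are made up for illustration):

# Guard NULL inputs so reflect() is never called on a NULL value.
df = spark.createDataFrame([("HANU",), (None,)], "input_col string")
df.createOrReplaceTempView("reflect_demo")

spark.sql("""
    SELECT input_col,
           CASE WHEN input_col IS NULL THEN NULL
                ELSE reflect('org.apache.commons.codec.digest.DigestUtils',
                             'sha256Hex', input_col)
           END AS sha256_hex
    FROM reflect_demo
""").show(truncate=False)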

Tuesday, 5 November 2019

Spark Notes

    1. Spark is a distributed, (mostly) in-memory computing framework/processing engine designed for batch and streaming data, featuring SQL queries, graph processing and machine learning.
    2. MapReduce is a two-stage framework; Spark is a multi-stage framework operating on a DAG.
    3. Why immutable --> to avoid modifying the source/raw data, and so that multiple threads cannot change the data; an RDD can be re-created at any time for failure recovery, caching, sharing and replication.
    4. Spark deployment modes - Local, Client and Cluster.
      • Client vs Cluster - in client mode the driver program runs outside the YARN cluster manager (i.e. at the client).
      • In cluster mode the driver program runs alongside the Application Master inside the YARN cluster.
    5. RDD vs DataFrame vs DataSet:
      1. Use RDDs for
        1. low-level transformations/actions
        2. when imposing a schema (e.g. a columnar format) is not necessary
        3. when data is unstructured, such as media streams or streamed text (Spark Streaming is available now)

    DataFrame (DF) vs DataSet (DS):
      • A DF is a distributed collection of objects of type Row; a DS lets users assign a Java/Scala class to the records inside a DF.
      • A DF is not type-safe; a DS is type-safe (compile-time error checks).
      • DF is available in Scala, Java, Python and R; DS only in Scala and Java.
      • DS leverages Tungsten's fast in-memory encoding.
      • DS encoders are highly optimized and use runtime code generation to build a custom serde, so they are faster than Java/Kryo serialization.
      • DS-encoded data is comparatively smaller, which improves network transfer speeds.
      • DS offers a single interface usable in both Java and Scala.
    1. rdd.toDebugString returns the RDD lineage graph (see the sketch below).
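A tiny PySpark sketch of what that looks like (the RDD itself is made up for illustration):

# Build an RDD with a shuffle in it and inspect its lineage graph.
rdd = (
    spark.sparkContext.parallelize(range(100))
    .map(lambda x: (x % 10, x))
    .reduceByKey(lambda a, b: a + b)
)
lineage = rdd.toDebugString()
print(lineage.decode() if isinstance(lineage, bytes) else lineage)  # PySpark may return bytes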
    1. Tuning -
      1. Data Serialization: use Kryo (see the sketch below). It doesn't support all serializable types and requires registering the classes. If the objects are large, increase spark.kryoserializer.buffer. Kryo still works if a class is not registered, but it then stores the full class name with each object, which is wasteful.
      2. Memory Tuning:
        1. Avoid using String (roughly 40 bytes of overhead on top of the raw data) and common collection classes like HashMap, LinkedList etc. (they keep a wrapper object for each entry).
        2. Prefer arrays of objects and primitive types (e.g. the fastutil library).
        3. If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers four bytes instead of eight.
    Check whether there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times before a task completes, there isn't enough memory available for executing tasks.
    Use -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to collect GC stats and tune GC accordingly.
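A PySpark sketch of the Kryo settings mentioned above; the class name and buffer size are illustrative, not taken from these notes:

from pyspark.sql import SparkSession

# Enable Kryo and register application classes up front.
spark = (
    SparkSession.builder
    .appName("kryo-tuning-example")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.classesToRegister", "com.example.MyRecord")  # hypothetical class to register
    .config("spark.kryoserializer.buffer", "1m")                     # increase if serialized objects are large
    .getOrCreate()
)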
    1. Level of parallelism: 2-3 tasks per CPU core are recommended.
    2. Memory usage of reduce tasks: persist RDDs in serialized form where possible, and try to reduce shuffles (prefer reduceByKey over groupByKey, coalesce over repartition, etc.).
    3. Broadcasting large variables: broadcast large lookup variables, since an RDD lookup is O(m) while a broadcast lookup is O(1).
    4. Data Locality:
    5. Other considerations:
      1. Use Datasets whenever possible. Avoid UDFs/UDAFs; if a UDF is really needed, implement it by extending Catalyst's expression classes.
      2. akka.frameSize - the frame size for Spark's Akka-based messaging used for data transfer over the network.
      3. Check the time taken for execution from stages tab
      4. Check the cache utilization from storage tab.
      5. In Spark's JDBC overwrite mode, the actions below are performed (not advisable, since we lose metadata like column constraints):
        1. drop the table, removing all metadata like indexes, PKs, FKs etc.
        2. create the table (column definitions only)
        3. write the data
    Better approach(es) - the second option is sketched below:
    • collect the data to the driver and use standard JDBC tools like ScalikeJDBC to perform the required operation
    • use Spark's truncate=true option along with overwrite mode
    • first truncate the table using standard JDBC tools, then use append mode
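A PySpark sketch of the truncate=true route; the JDBC URL, table, credentials and the demo DataFrame are placeholders, and the matching JDBC driver is assumed to be on the classpath:

df = spark.range(10).withColumnRenamed("id", "sample_id")  # stand-in for the real DataFrame

(
    df.write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "true")  # truncate instead of drop + recreate, so constraints/indexes survive
    .option("url", "jdbc:postgresql://dbhost:5432/sampledb")
    .option("dbtable", "target_table")
    .option("user", "db_user")
    .option("password", "db_password")
    .save()
)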
    1. Partition calculation: number of executors * number of cores.
    2. Executors & memory calculation: https://mylearninginbigdata.blogspot.com/2018/06/number-of-executors-and-memory.html
    memoryOverhead = MAX(driver/executor memory * 0.1, 384 MB) --> default calculation (e.g. a 10 GB executor gets max(1024 MB, 384 MB) = 1024 MB of overhead).
    1. spark.driver.supervise can be used to restart the driver automatically if it fails with a non-zero exit code. It is supported in standalone and Mesos cluster modes only.
    1. spark.memory.fraction: the lower this is, the more frequently spills and cached-data eviction occur.
    2. spark.shuffle.memoryFraction: if spills happen often, consider increasing this value at the expense of spark.storage.memoryFraction.
    1. File formats - which one and when --> File Formats
    2. Spark v1 vs v2 -
      1. Unified DataFrame and DataSet API
      2. Standard SQL support
      3. SparkSession is the entry point and it subsumes SQLContext and HiveContext.
      4. Improved performance in Accumulator
      5. DF based ML APIs
      6. ML pipeline persistence - Users can now save and load machine learning pipelines and models across all programming languages supported by Spark
      7. Distributing algorithms in R - Added support for Generalized Linear Models (GLM), Naive Bayes, Survival Regression, and K-Means in R.
      8. User-defined functions (UDFs) in R - Added support for running partition level UDFs (dapply and gapply) and hyper-parameter tuning (lapply).
      9. Improved Catalyst optimizer, introduced Tungsten optimizations (encoding/decoding, whole-stage code generation that emits the entire stage as a single function, etc.) and a vectorized Parquet reader
      10. Spark structured streaming - Rich integration with Batch process
    3. Spark catalog is used to manage views/tables.
      1. Ex: spark.catalog.listTables.show (It will list local tables/views)
      2. Ex: spark.catalog.listTables("global_temp").show (It will list global&local tables/views)
    4. In CSV reads, the options for mode are permissive, dropMalformed and failFast.
    5. coalesce can be used to reduce the number of partitions; it doesn't shuffle the data, but instead instructs Spark to read multiple partitions as one.
    6. readStream (provide the topic).load(), writeStream (topic, key, value).start(). The watermark ("10 minutes") should be applied on the same column used in the aggregation, and before the aggregation (see the streaming sketch after these notes).
    7. Code executes on the executors only when Spark APIs are used, i.e. operations on RDDs, DataFrames or Datasets. All other code, i.e. code before the SparkSession/context/Spark APIs are used, or operations on collected data, runs on the driver.
    8. Mistakes to avoid:
      1. Wrong calculation of executors - remember to account for the YARN memory overhead of 15-20%.
      2. No Spark shuffle block can be greater than 2 GB.
      3. The default number of partitions when a shuffle is involved is 200, so use repartition or coalesce wisely (spark.sql.shuffle.partitions).
        1. How many partitions? ~256 MB per partition.
        2. Remember the number 2000 (a Spark bookkeeping limit): if the number of partitions is close to 2000, bump it past that.
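A PySpark sketch of the readStream → withWatermark → aggregate → writeStream flow from the streaming note above; the Kafka brokers, topics and column names are placeholders, and the spark-sql-kafka connector is assumed to be on the classpath:

from pyspark.sql import functions as F

# Read a Kafka topic as a stream and keep the record key plus its timestamp.
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder brokers
    .option("subscribe", "input_topic")                # placeholder topic
    .load()
    .selectExpr("CAST(key AS STRING) AS key", "timestamp AS event_time")
)

# Watermark on the same column used in the window, applied before the aggregation.
counts = (
    events
    .withWatermark("event_time", "10 minutes")
    .groupBy(F.window("event_time", "10 minutes"), "key")
    .count()
)

# Write the aggregated counts back to Kafka.
query = (
    counts
    .selectExpr("CAST(key AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "output_topic")                   # placeholder output topic
    .option("checkpointLocation", "/tmp/checkpoints/stream-example")
    .outputMode("update")
    .start()
)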
    References:



Wednesday, 8 August 2018

Movie Recommendation engine using SQL

Data Model









Query to get the recommendations - a movie is marked 'Y' for a user when at least half of the friends who gave feedback on it recommended it:

SELECT f.ID, MVE_ID, IF(AVG(IF(Recommended='Y',1,0))>=0.5,'Y','N')
FROM friend_tbl f LEFT JOIN feedback_tbl fb ON f.Frnd_ID = fb.VIEWER_ID GROUP BY f.ID,MVE_ID;

Tuesday, 17 July 2018

When to use RDD, Dataframe and Dataset

When to use RDDs?


  • you want low-level transformation and actions and control on your dataset;
  • your data is unstructured, such as media streams or streams of text;
  • you want to manipulate your data with functional programming constructs rather than domain-specific expressions;
  • you don’t care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column; and
  • you can forgo some optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data.

When should I use DataFrames or Datasets?


  • If you want rich semantics, high-level abstractions, and domain specific APIs, use DataFrame or Dataset.
  • If your processing demands high-level expressions, filters, maps, aggregation, averages, sum, SQL queries, columnar access and use of lambda functions on semi-structured data, use DataFrame or Dataset.
  • If you want higher degree of type-safety at compile time, want typed JVM objects, take advantage of Catalyst optimization, and benefit from Tungsten’s efficient code generation, use Dataset.
  • If you want unification and simplification of APIs across Spark Libraries, use DataFrame or Dataset.
  • If you are an R user, use DataFrames.
  • If you are a Python user, use DataFrames and resort back to RDDs if you need more control.
Reference:

Thursday, 22 February 2018

Convert single column into multiple in Spark

Consider the sample JSON below, which has a nested Address column containing Address1, Address2 and City.
Let's expand this nested column into multiple columns dynamically.

{"Name":"Hanu","Address":{"Address1":"11213","Address2":"N TEST BLVD","City":"MIAMI"},"State":"FL"}

Below code will expand the nested Address struct into multiple columns (the trailing .* selects every field of the struct):

import spark.implicits._  // for the $"..." and 'symbol column syntax (already in scope in spark-shell)

val sample = spark.read.json("/myHome/sample.json")
sample.select('Name, $"Address.*", 'State).show



A good reference for shell scripting: https://linuxcommand.org/lc3_writing_shell_scripts.php