Tuesday 5 November 2019

Spark Notes

    1. Spark is a distributed, (mostly) in-memory computing framework/processing engine designed for batch and streaming data, featuring SQL queries, graph processing and machine learning.
    2. MapReduce is a 2-stage framework. Spark is a multi-stage framework operating on a DAG.
    3. Why are RDDs immutable? --> to avoid modifying the source/raw data, and multiple threads can't change the data. An RDD can be recreated at any time, which helps with failures, caching, sharing and replication.
    4. Spark deployment modes - Local, client and Cluster.
      • Client vs Cluster - in client mode, the driver program runs outside the YARN cluster (i.e. at the client).
      • In cluster mode, the driver program runs alongside the Application Master inside the YARN cluster.
    5. RDD vs DataFrame vs DataSet:
      1. Use RDDs for
        1. low-level transformations/actions
        2. when a schema is not necessary, e.g. you don't need a columnar format or to access fields by name
        3. when data is unstructured, like media streams or streamed text (Spark Streaming is available now)

      2. DataFrame (DF) vs DataSet (DS):
        1. A DF is a distributed collection of objects of type Row; a DS lets users assign a Java/Scala class to the records inside a DF.
        2. DF is not type-safe; DS is type-safe (compile-time error checks).
        3. DF is available in Scala, Java, Python and R; DS only in Scala and Java.
        4. Both leverage Tungsten's fast in-memory encoding.
        5. DS encoders are highly optimized and use runtime code generation to build custom serde; as a result they are faster than Java/Kryo serialization.
        6. Encoded DS objects are comparatively smaller, which improves network transfer speeds.
        7. DS gives a single interface usable in both Java and Scala.
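    A minimal sketch of the same filter written against all three APIs, to make the type-safety difference concrete (the Person case class, app name and local master are assumptions for illustration):

      import org.apache.spark.sql.SparkSession

      case class Person(name: String, age: Int)   // hypothetical record type

      val spark = SparkSession.builder().appName("rdd-df-ds").master("local[*]").getOrCreate()
      import spark.implicits._

      val people = Seq(Person("a", 30), Person("b", 15))

      // RDD: low-level, no schema, no Catalyst optimization
      val adultsRdd = spark.sparkContext.parallelize(people).filter(_.age >= 18)

      // DataFrame: Dataset[Row]; a typo in the column name only fails at runtime
      val adultsDf = people.toDF().filter($"age" >= 18)

      // Dataset: typed; a typo in the field name fails at compile time
      val adultsDs = people.toDS().filter(_.age >= 18)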
    1. Rdd.toDebugString to get RDD lineage graph
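    For example (continuing the sketch above), the lineage of a small chain of transformations:

      val nums = spark.sparkContext.parallelize(1 to 100)
      // toDebugString prints each RDD in the dependency chain, e.g.
      // MapPartitionsRDD <- MapPartitionsRDD <- ParallelCollectionRDD
      println(nums.map(_ * 2).filter(_ % 3 == 0).toDebugString)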
    1. Tuning -
      1. Data Serialization: use Kryo. But it doesn't support all serializable types and requires registering the classes. If the objects are large, increase spark.kryoserializer.buffer. If a class is not registered, Kryo still works, but it stores the full class name with each object, which is wasteful.
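      A sketch of enabling Kryo and registering classes (the Event/UserSession classes and buffer sizes are illustrative assumptions, not recommendations):

        import org.apache.spark.SparkConf
        import org.apache.spark.sql.SparkSession

        case class Event(id: Long, payload: Array[Byte])          // hypothetical
        case class UserSession(user: String, events: Seq[Event])  // hypothetical

        val conf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          // registered classes are written as small IDs instead of full class names
          .registerKryoClasses(Array(classOf[Event], classOf[UserSession]))
          // raise the per-object buffer only if individual objects are large
          .set("spark.kryoserializer.buffer", "1m")
          .set("spark.kryoserializer.buffer.max", "128m")

        val spark = SparkSession.builder().config(conf).appName("kryo-demo").getOrCreate()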
      2. Memory Tuning:
        1. Avoid using String (adds ~40 bytes of overhead over the raw data) and common collection classes like HashMap, LinkedList etc. (they create a wrapper object for each entry).
        2. Prefer arrays of objects and primitive types - e.g. the fastutil library.
        3. If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers be four bytes instead of eight
    Check whether there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times before a task completes, there isn't enough memory available for executing tasks.
    Use -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to collect GC stats and tune GC accordingly.
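    A sketch of attaching those flags to executors; the GC lines then show up in each executor's log. (Driver-side JVM flags normally go in spark-defaults.conf or on spark-submit, since the driver JVM is already running by the time application code builds a SparkConf.)

      import org.apache.spark.SparkConf

      val gcConf = new SparkConf()
        // GC logging plus compressed oops (useful when the executor heap is < 32 GB)
        .set("spark.executor.extraJavaOptions",
             "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops")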
    1. Level of parallelism: 2-3 tasks per CPU core is recommended
    2. Memory usage of reduce tasks: persist RDDs serialized if possible. Try to reduce shuffles (prefer reduceByKey over groupByKey, coalesce over repartition, etc.).
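    For example (word-count style pairs; an existing SparkSession named spark is assumed):

      import org.apache.spark.storage.StorageLevel

      val pairs = spark.sparkContext.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1))

      // reduceByKey combines values on the map side before shuffling
      val counts = pairs.reduceByKey(_ + _)
      // groupByKey would ship every value over the network first:
      // pairs.groupByKey().mapValues(_.sum)

      // serialized persist trades CPU for a much smaller memory footprint
      counts.persist(StorageLevel.MEMORY_ONLY_SER)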
    3. Broadcasting large variables: broadcast large lookup variables - an RDD lookup is O(m) while a broadcast lookup is O(1).
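    A sketch of the broadcast-lookup pattern (the lookup table and data are made up; spark is an existing session):

      val countryNames = Map("IN" -> "India", "US" -> "United States")
      val bCountries = spark.sparkContext.broadcast(countryNames)

      val orders = spark.sparkContext.parallelize(Seq(("o1", "IN"), ("o2", "US")))
      // each task reads the broadcast value locally - no shuffle, no join against a lookup RDD
      val withNames = orders.map { case (id, cc) => (id, bCountries.value.getOrElse(cc, "unknown")) }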
    4. Data Locality:
    5. Other considerations:
      1. Use Datasets whenever possible. Avoid UDFs/UDAFs; if a UDF is really needed, implement it by extending Catalyst's expressions so the optimizer can still reason about it.
      2. akka.frameSize - frame size for Spark's Akka-based message broker used for data transfer over the network (only relevant on older Spark versions that still use Akka).
      3. Check the time taken for execution from stages tab
      4. Check the cache utilization from storage tab.
      5. In Spark's JDBC overwrite mode, the actions below are performed (not advisable, since we lose metadata like column constraints):
        1. drop the table, removing all metadata like indexes, PK, FK etc.
        2. recreate the table (column definitions only)
        3. write the data
    Better approach(es):
    • collect the data to the driver and use standard JDBC tools like scalikejdbc to perform the required operation
    • use Spark's truncate=true option along with overwrite mode (see the sketch below)
    • first truncate the table using standard JDBC tools, then use append mode
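    A sketch of the truncate=true approach (resultDf, the JDBC URL, table name and credentials are placeholders):

      import org.apache.spark.sql.SaveMode

      resultDf.write
        .mode(SaveMode.Overwrite)
        .option("truncate", "true")      // keep the existing table definition, only delete the rows
        .format("jdbc")
        .option("url", "jdbc:postgresql://dbhost:5432/appdb")
        .option("dbtable", "reports.daily_summary")
        .option("user", "report_user")
        .option("password", sys.env.getOrElse("DB_PASSWORD", ""))
        .save()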
    1. Partition calculation: no. of executors * no. of cores per executor
    2. Executors & Memory calculation: https://mylearninginbigdata.blogspot.com/2018/06/number-of-executors-and-memory.html
    memoryOverhead = MAX(driver/executor memory * 0.1, 384 MB) --> default calculation.
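    A worked example along the lines of the linked post, assuming 16-core / 64 GB nodes (all numbers here are illustrative):

      val coresPerNode      = 16 - 1                    // leave 1 core for OS / Hadoop daemons
      val memPerNodeGb      = 64 - 1                    // leave ~1 GB for the OS
      val coresPerExecutor  = 5                         // keep <= 5 for healthy HDFS throughput
      val executorsPerNode  = coresPerNode / coresPerExecutor          // = 3
      val memPerExecutorGb  = memPerNodeGb / executorsPerNode          // = 21
      val overheadGb        = math.max(memPerExecutorGb * 0.1, 0.384)  // = 2.1
      val executorMemoryGb  = memPerExecutorGb - overheadGb            // ~= 18.9 -> request ~19g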
    1. spark.driver.supervise can be used to restart the driver automatically if it fails with a non-zero exit code. It is supported in standalone and Mesos cluster modes only.
    1. spark.memory.fraction: the lower this is, the more frequently spills and cached-data eviction occur.
    2. spark.shuffle.memoryFraction: if spills happen often, consider increasing this value at the expense of spark.storage.memoryFraction.
    1. File formats - which one and when --> see the File Formats notes
    2. Spark v1 vs v2 -
      1. Unified DataFrame and DataSet API
      2. Standard SQL support
      3. SparkSession is the entry point and subsumes SQLContext and HiveContext (see the sketch after this list).
      4. Improved Accumulator performance
      5. DF based ML APIs
      6. ML pipeline persistence - Users can now save and load machine learning pipelines and models across all programming languages supported by Spark
      7. Distributing algorithms in R - Added support for Generalized Linear Models (GLM), Naive Bayes, Survival Regression, and K-Means in R.
      8. User-defined functions (UDFs) in R - Added support for running partition level UDFs (dapply and gapply) and hyper-parameter tuning (lapply).
      9. Improved Catalyst optimizer, Tungsten improvements (encoders, whole-stage code generation that fuses a stage into a single function, etc.) and a vectorized Parquet reader
      10. Spark Structured Streaming - rich integration with batch processing
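    The sketch referenced in item 3 above - SparkSession as the single entry point (enableHiveSupport only if Hive classes and a metastore are available):

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
        .appName("v2-entry-point")
        .enableHiveSupport()          // optional; subsumes the old HiveContext
        .getOrCreate()

      // the old entry points are still reachable for legacy code
      val sc = spark.sparkContext
      val sqlContext = spark.sqlContext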
    3. Spark catalog is used to manage views/tables.
      1. Ex: spark.catalog.listTables.show (lists session-local tables/views)
      2. Ex: spark.catalog.listTables("global_temp").show (lists global and local temp views)
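      A small sketch showing where local vs global temp views land in the catalog (view names are made up):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().appName("catalog-demo").master("local[*]").getOrCreate()
        import spark.implicits._

        val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
        df.createOrReplaceTempView("local_view")            // session-scoped
        df.createOrReplaceGlobalTempView("shared_view")     // stored in the global_temp database

        spark.catalog.listTables().show()                   // shows local_view
        spark.catalog.listTables("global_temp").show()      // shows shared_view and local_view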
    4. In CSV read, the options for mode are PERMISSIVE, DROPMALFORMED and FAILFAST.
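    For example (the input path is a placeholder; an existing session spark is assumed):

      val permissive = spark.read.option("header", "true")
        .option("mode", "PERMISSIVE")      // default: keep bad rows, null out unparsable fields
        .csv("/data/input.csv")

      val dropBad = spark.read
        .option("mode", "DROPMALFORMED")   // silently drop rows that fail to parse
        .csv("/data/input.csv")

      val strict = spark.read
        .option("mode", "FAILFAST")        // throw as soon as a malformed row is seen
        .csv("/data/input.csv")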
    5. coalesce can be used to reduce the number of partitions; it doesn't shuffle the data but instead instructs Spark to read multiple partitions as one.
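    For example (partition counts are arbitrary; spark is an existing session):

      val df = spark.range(0, 1000000).toDF("id")
      println(df.rdd.getNumPartitions)

      val fewer      = df.coalesce(4)       // narrow: merges existing partitions, no full shuffle
      val rebalanced = df.repartition(50)   // full shuffle: use to increase partitions or fix skew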
    6. readStream (provide the topic).load(); writeStream(topic, key, value).start(). withWatermark("10 minutes") must be applied to the same event-time column that is used in the aggregation, and before the aggregation.
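    A sketch of that flow against a Kafka source (broker, topic and window sizes are placeholders; assumes the spark-sql-kafka connector is on the classpath):

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions._

      val spark = SparkSession.builder().appName("stream-demo").getOrCreate()
      import spark.implicits._

      val events = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "events")
        .load()
        .select($"timestamp", $"value".cast("string").as("body"))

      // watermark on the same event-time column as the window, declared before the aggregation
      val counts = events
        .withWatermark("timestamp", "10 minutes")
        .groupBy(window($"timestamp", "5 minutes"))
        .count()

      val query = counts.writeStream
        .outputMode("update")
        .format("console")
        .start()
      query.awaitTermination()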
    7. Code executes on executors only when Spark APIs are used, i.e. operations on RDDs, DFs or DSs. All other code - anything before the SparkSession/context/Spark APIs are used, or operations on collected data - executes on the driver.
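    A small illustration of where each line runs (assuming a local session):

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("where-it-runs").master("local[*]").getOrCreate()

      val threshold = 100                                    // driver
      val data = spark.sparkContext.parallelize(1 to 1000)   // definition happens on the driver

      val big = data.filter(_ > threshold)                   // the closure runs on executors

      val local = big.collect()                              // results pulled back to the driver
      println(local.length)                                  // driver again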
    8. Mistakes to avoid:
      1. Wrong calculation of executors - remember to consider yarn memory overhead (15-20%)
      2. No Spark shuffle block can be greater than 2 GB
      3. The default number of partitions when a shuffle is involved is 200, so use repartition or coalesce wisely (spark.sql.shuffle.partitions)
        1. How many partitions? ~256MB per partition
        2. Remember the number 2000 (a Spark bookkeeping limit); if the number of partitions is close to 2000, bump it past 2000
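        A rough sizing sketch for spark.sql.shuffle.partitions (the 600 GB shuffle size is a made-up figure; spark is an existing session):

          val shuffleInputGb    = 600.0
          val targetPartitionMb = 256.0
          val partitions = math.ceil(shuffleInputGb * 1024 / targetPartitionMb).toInt   // = 2400
          // 2400 is already past the ~2000 bookkeeping threshold; if the estimate had landed
          // just under 2000, bump it above 2000 rather than sitting at the boundary
          spark.conf.set("spark.sql.shuffle.partitions", partitions.toString)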
    References:



 A good reference for Shell scripting  https://linuxcommand.org/lc3_writing_shell_scripts.php