Thursday, 19 October 2017

Number of executors and memory calculation in spark

Consider below cluster configuration:

No.of nodes - 6
No.of cores on each node - 16
Memory on each node - 64GB

Number of executors can be considered dynamically based on the usage with the property spark.dynamicAllocation.enabled=true;

If need fixed no.of resources,Good way to design is:

1. Each node should have 1 core and 1GB memory for AM.
==>each node will have 15 cores. It is good to have max of 5 cores for each executor.
==>total no. of executors per node = 15/5 = 3
                      ==>--num-executors 3*6 =18 --executor-cores=5
2. Total memory available in each node - 63GB (As we excluded 1GB for Application Master)
==> Total Memory per each executor = 63/3=21GB
==>0.07*Total memory for Yarn overhead for each executor i.e 0.07*21 ~=2GB
                       ==>Final memory per executor=21-2=19GB i.e --executor-memory=19G

Wednesday, 18 October 2017

Loading data from multiple sources which are not in sync to Final table

--day 1 data--
pk a pk b pk c
1  x 1 y -  -
--stage table1
pk a b c
1 x null null
1 null y null
----- union and group by on pk, get max on each col
--INSERT OVERWRITE TABLE stage_tbl SELECT pk,max(a),max(b),max(c) FROM stage_tbl1 group by pk;
pk a b c
1 x y null

--First time load to Final table
pk a b c timestamp
1 x y null 2017-10-18 11:00:04

--day 2 data--
pk a pk b pk c
- - - - 1  z
2 a 2 b 2 c
3 p 1 o 3 r

--stage table1
pk a b c
1 null o null
1 null null z
2 a null null
2 null b null
2 null null c
3 p null null
3 null null r
----- union and group by on pk, get max on each col
--INSERT OVERWRITE TABLE stage_tbl SELECT pk,max(a),max(b),max(c) FROM stage_tbl1 group by pk;
pk a b c
1 null o z
2 a b c
3 p null r

---delta load to final table

Set1 : select * from final_tbl f where pk in (select pk from stage_tbl)
Set2 : select pk, COALESCE(s.a,f.a) as a,  COALESCE(s.b,f.b) as b,  COALESCE(s.c,f.c) as c from stage_tbl s left join  Set1 f on;
Set3 : select * from final_tbl f where pk not in (select pk from stage_tbl)

insert overwrite table final_tbl (select pk,a,b,c,current_time() from Set2 UNION select pk,a,b,c,current_time() from Set3);

Note: Consider utilizing partitions and buckets on attributes/cols like category for improved performance

 A good reference for Shell scripting