Thursday, 19 October 2017

Number of executors and memory calculation in Spark

Consider the following cluster configuration:

Number of nodes - 6
Cores per node - 16
Memory per node - 64 GB

The number of executors can be scaled dynamically with usage by setting the property spark.dynamicAllocation.enabled=true.

If you need a fixed number of resources, a good way to size them is:

1. Reserve 1 core and 1 GB of memory on each node for the AM (Application Master).
==> Each node is left with 15 cores. It is good practice to cap each executor at 5 cores.
==> Executors per node = 15/5 = 3
==> Total executors = 3 x 6 nodes = 18, i.e. --num-executors 18 --executor-cores=5
2. Memory available on each node = 63 GB (after excluding the 1 GB reserved for the Application Master).
==> Memory per executor = 63/3 = 21 GB
==> YARN overhead per executor = 0.07 x 21 GB = 1.47 GB, rounded up to 2 GB
==> Final memory per executor = 21 - 2 = 19 GB, i.e. --executor-memory=19G
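
The arithmetic above can be sketched as a small Python helper. This is only an illustration of the steps in this post: the function name size_executors and its parameter names are made up for the example, and the 5-core cap and 0.07 overhead factor are the assumptions stated above, not values read from any cluster.

```python
import math


def size_executors(nodes, cores_per_node, mem_per_node_gb,
                   reserved_cores=1, reserved_mem_gb=1,
                   cores_per_executor=5, overhead_fraction=0.07):
    """Hypothetical helper that reproduces the sizing steps from this post."""
    # Step 1: set aside the reserved core, then pack executors of 5 cores each.
    usable_cores = cores_per_node - reserved_cores
    executors_per_node = usable_cores // cores_per_executor

    # Step 2: split the remaining memory evenly between the executors on a node.
    usable_mem_gb = mem_per_node_gb - reserved_mem_gb
    mem_per_executor_gb = usable_mem_gb / executors_per_node

    # YARN overhead is rounded up to a whole GB, as in the post (1.47 -> 2).
    overhead_gb = math.ceil(mem_per_executor_gb * overhead_fraction)

    return {
        "num_executors": executors_per_node * nodes,
        "executor_cores": cores_per_executor,
        "executor_memory_gb": int(mem_per_executor_gb - overhead_gb),
    }


plan = size_executors(nodes=6, cores_per_node=16, mem_per_node_gb=64)
print(plan)
# {'num_executors': 18, 'executor_cores': 5, 'executor_memory_gb': 19}
```

Changing the inputs (for example a different node count or memory size) shows how the three flags move together, which is easier than redoing the division by hand.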

Wednesday, 18 October 2017

Loading data from multiple sources that are not in sync into the final table

--day 1 data (three sources side by side, "-" means no row arrived)--
pk a | pk b | pk c
1  x | 1  y | -  -
--stage_tbl1
pk a    b    c
1  x    null null
1  null y    null
----- union the sources, then group by pk and take max of each column (max ignores nulls)
--INSERT OVERWRITE TABLE stage_tbl SELECT pk, max(a) AS a, max(b) AS b, max(c) AS c FROM stage_tbl1 GROUP BY pk;
--stage_tbl
pk a b c
1  x y null
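
To make the "group by pk, max on each column" step concrete, here is a minimal Python sketch that applies the same idea to the day 1 rows. It is not the Hive job itself: collapse_by_pk is a hypothetical helper, rows are plain dicts, and None stands in for SQL null.

```python
def collapse_by_pk(rows, columns=("a", "b", "c")):
    """Collapse several partial rows per pk into one, like max() per column."""
    merged = {}
    for row in rows:
        # Start each pk with all columns set to None (null).
        target = merged.setdefault(row["pk"], {col: None for col in columns})
        for col in columns:
            value = row.get(col)
            # Like SQL max(), nulls are skipped and the largest value wins.
            if value is not None and (target[col] is None or value > target[col]):
                target[col] = value
    return merged


# Day 1 staging rows: each source filled in only its own column.
stage_tbl1 = [
    {"pk": 1, "a": "x", "b": None, "c": None},
    {"pk": 1, "a": None, "b": "y", "c": None},
]

stage_tbl = collapse_by_pk(stage_tbl1)
print(stage_tbl)
# {1: {'a': 'x', 'b': 'y', 'c': None}}
```

Because each source contributes at most one non-null value per column and pk, max here simply picks that value; it only acts as a real tie-breaker if two sources send the same column.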

--First time load to Final table
--final_tbl
pk a b c    timestamp
1  x y null 2017-10-18 11:00:04

--day 2 data (three sources side by side, "-" means no row arrived)--
pk a | pk b | pk c
-  - | -  - | 1  z
2  a | 2  b | 2  c
3  p | 1  o | 3  r

--stage_tbl1
pk a    b    c
1  null o    null
1  null null z
2  a    null null
2  null b    null
2  null null c
3  p    null null
3  null null r
----- union the sources, then group by pk and take max of each column (max ignores nulls)
--INSERT OVERWRITE TABLE stage_tbl SELECT pk, max(a) AS a, max(b) AS b, max(c) AS c FROM stage_tbl1 GROUP BY pk;
--stage_tbl
pk a    b    c
1  null o    z
2  a    b    c
3  p    null r

---delta load to final table

Set1: SELECT * FROM final_tbl f WHERE f.pk IN (SELECT pk FROM stage_tbl)
Set2: SELECT s.pk, COALESCE(s.a, f.a) AS a, COALESCE(s.b, f.b) AS b, COALESCE(s.c, f.c) AS c FROM stage_tbl s LEFT JOIN Set1 f ON s.pk = f.pk;
Set3: SELECT * FROM final_tbl f WHERE f.pk NOT IN (SELECT pk FROM stage_tbl)

INSERT OVERWRITE TABLE final_tbl SELECT pk, a, b, c, current_timestamp() FROM Set2 UNION SELECT pk, a, b, c, current_timestamp() FROM Set3;
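
The delta load can be traced by hand with a short Python sketch over the day 2 data. This is an illustration only, under the same assumptions as the queries above: delta_merge is a hypothetical function, tables are dicts keyed by pk, None stands in for null, and the load_time value is invented for the example.

```python
def delta_merge(final_tbl, stage_tbl, load_time, columns=("a", "b", "c")):
    """Merge staged rows into the final table, keeping old values where stage is null."""
    result = {}

    # Set2: every staged pk, with COALESCE(stage value, existing final value).
    for pk, staged in stage_tbl.items():
        existing = final_tbl.get(pk, {})
        row = {
            col: staged[col] if staged[col] is not None else existing.get(col)
            for col in columns
        }
        row["timestamp"] = load_time
        result[pk] = row

    # Set3: final rows whose pk did not arrive in this load are carried over.
    # As in the query above, they are also stamped with the current load time.
    for pk, existing in final_tbl.items():
        if pk not in stage_tbl:
            row = {col: existing[col] for col in columns}
            row["timestamp"] = load_time
            result[pk] = row

    return result


final_tbl = {1: {"a": "x", "b": "y", "c": None, "timestamp": "2017-10-18 11:00:04"}}
stage_tbl = {
    1: {"a": None, "b": "o", "c": "z"},
    2: {"a": "a", "b": "b", "c": "c"},
    3: {"a": "p", "b": None, "c": "r"},
}

new_final = delta_merge(final_tbl, stage_tbl, load_time="2017-10-19 11:00:00")
for pk in sorted(new_final):
    print(pk, new_final[pk])
# 1 {'a': 'x', 'b': 'o', 'c': 'z', 'timestamp': '2017-10-19 11:00:00'}
# 2 {'a': 'a', 'b': 'b', 'c': 'c', 'timestamp': '2017-10-19 11:00:00'}
# 3 {'a': 'p', 'b': None, 'c': 'r', 'timestamp': '2017-10-19 11:00:00'}
```

Row 1 shows the point of the COALESCE: column a keeps its day 1 value x because day 2 sent nothing for it, while b and c take the new values.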

Note: Consider partitioning and bucketing the tables on attributes/columns such as category for better performance.
