Thursday, 1 January 2026

The Mindset Behind Reliable Data Systems

I’ve been in data engineering long enough to see the stack change many times over.

Tools come and go.
Architectures get renamed.
What was “best practice” five years ago quietly disappears.

What hasn’t changed is how experienced data engineers think.

Here are a few lessons that only became obvious to me after years of building and fixing data systems at scale.


Start with the decision, not the design.

Early in my career, I focused on building impressive pipelines. Over time, I learned that a pipeline without a clear consumer is just movement, not progress. Every system should exist to support a decision. If you can’t name that decision, you’re not ready to design anything yet.


Assume the data will change — and plan for it.

Schemas evolve. Definitions drift. Upstream teams refactor without telling you. This isn’t bad engineering; it’s reality. Strong data systems don’t rely on stability — they’re built to adapt. Validation, observability, and recovery matter more than perfect assumptions.


Treat data like a product, not a side effect.

A pipeline that runs on schedule but produces confusing or untrusted data has failed. Data engineers don’t just deliver tables; they deliver clarity. If your users don’t know what the data means or when to trust it, the work isn’t done.


Optimize for understanding before performance.

I’ve debugged enough production issues to know that the fastest system isn’t always the best one. When something breaks, the ability to trace, explain, and fix the problem quickly matters far more than shaving milliseconds off a job. Simple systems survive longer.


Be honest about tradeoffs.

There is no perfect architecture. Every choice trades cost, latency, flexibility, or correctness. Mature engineers don’t hide these tradeoffs — they communicate them clearly so the business can make informed decisions.


Build for the next engineer, not just yourself.

You won’t always be there to explain what you built. Clear structure, naming, and documentation aren’t nice-to-haves — they’re how systems scale beyond individuals. If a new engineer can understand your work without a meeting, you’ve done your job well.


After nearly two decades in this field, one thing is clear to me:

Data engineering isn’t about moving data faster.
It’s about making complexity manageable and decisions dependable.

Everything else is just an implementation detail.


Wednesday, 20 May 2020

Airflow SparkSubmitOperator example

Creating a connection
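The connection can be created from the Airflow UI (Admin > Connections) or programmatically. Below is a minimal sketch, assuming Airflow 1.10.x; the connection id, host and paths are placeholders:

# Sketch: create a Spark connection programmatically.
# conn_id, host and the values in "extra" are placeholders.
import json

from airflow import settings
from airflow.models import Connection

spark_conn = Connection(
    conn_id="spark_default",               # referenced by SparkSubmitOperator via conn_id
    conn_type="spark",
    host="yarn",                           # master: yarn / local[*] / spark://host:port
    extra=json.dumps({
        "queue": "default",
        "deploy-mode": "cluster",          # overrides the default client mode
        "spark-home": "/usr/local/spark",  # helpful when multiple Spark installs exist
    }),
)

session = settings.Session()
session.add(spark_conn)
session.commit()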




Note:
  • Config values passed from SparkSubmitOperator take precedence over the connection parameters.
  • Setting spark-home in the extra properties works across versions when multiple Spark installations are present.
  • The default deploy mode is client; adding 'deploy-mode': 'cluster' to the extra properties changes it to cluster mode.
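A minimal SparkSubmitOperator sketch that uses the connection above (Airflow 1.10.x assumed; the DAG id, application path, conf values and arguments are placeholders). Anything passed to the operator, such as conf, takes precedence over the connection's extra properties:

# Sketch: submit a Spark job through the connection defined above.
# dag_id, application path, conf values and arguments are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

with DAG(
    dag_id="spark_submit_example",
    start_date=datetime(2020, 5, 1),
    schedule_interval=None,
) as dag:

    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        conn_id="spark_default",               # the connection created above
        application="/path/to/app.py",         # placeholder application
        name="spark_submit_example",
        conf={"spark.executor.memory": "4g"},  # overrides matching connection settings
        application_args=["2020-05-20"],       # placeholder argument
    )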

Thursday, 23 April 2020

Reflect function in Hive and Spark SQL

Reflect function:

A Java class and method often exists to handle the exact function a user would like to use in Hive. Rather than having to write a wrapper UDF to call this method, the majority of these methods can be called using reflect UDF. Reflect uses Java reflection to instantiate and call methods of objects; it can also call static functions. The method must return a primitive type or a type that Hive knows how to serialize.


SELECT reflect("java.lang.String", "valueOf", 1),
       reflect("java.lang.String", "isEmpty"),
       reflect("java.lang.Math", "max", 2, 3),
       reflect("java.lang.Math", "min", 2, 3),
       reflect("java.lang.Math", "round", 2.5),
       reflect("java.lang.Math", "exp", 1.0),
       reflect("java.lang.Math", "floor", 1.9)
FROM src LIMIT 1;
1   true    3   2   3   2.7182818284590455  1.0

SHA-256 hashing using Java's DigestUtils via the reflect method in Hive:


hive> SELECT Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex', 'HANU');
OK
72102548c156fe16ed7ff108def7ddf19332d510b0afc29749a83dfd47787077
If we pass a NULL value to the above method, it throws an exception:

hive> SELECT Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex', NULL);
OK
Failed with exception java.io.IOException:
org.apache.hadoop.hive.ql.metadata.HiveException: UDFReflect getMethod
20/04/23 11:00:05 ERROR CliDriver: Failed with exception 
java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: UDFReflect getMethod
        at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:154)
        at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1693)
        at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
        at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
        at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
        at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:148)

Surprisingly, Hive handles it if the NULL is wrapped in CAST(NULL AS STRING); it logs the reflection error and returns NULL:

hive> select Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex',
cast(null as string));
OK
UDFReflect evaluate java.lang.reflect.InvocationTargetException
method = public static java.lang.String org.apache.commons.codec.digest.DigestUtils.sha256Hex(java.lang.String)
args = [null]
NULL

Spark SQL, however, fails even with the cast; the same call throws a NullPointerException:

spark.sql(" select Reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex',cast(null as string))").show()
[Stage 427:>                                                        (0 + 1) / 1]20/04/23 11:25:43 WARN TaskSetManager: Lost task 0.0 in stage 427.0 469): java.lang.reflect.InvocationTargetException
Caused by: java.lang.NullPointerException
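One way to avoid the failure in Spark SQL is to guard the reflect call so it is only evaluated for non-NULL inputs. A minimal PySpark sketch, assuming an existing SparkSession named spark; the data and column names are placeholders:

# Register a tiny DataFrame with a NULL value to demonstrate the guard.
spark.createDataFrame([("HANU",), (None,)], ["id"]).createOrReplaceTempView("ids")

spark.sql("""
    SELECT id,
           CASE
             WHEN id IS NULL THEN NULL
             ELSE reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex', id)
           END AS id_sha256
    FROM ids
""").show(truncate=False)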

Tuesday, 5 November 2019

Spark Notes

    1. Spark is a distributed, (mostly) in-memory computing framework/processing engine designed for batch and streaming data, featuring SQL queries, graph processing and machine learning.
    2. MapReduce is a two-stage framework; Spark is a multi-stage framework operating on a DAG.
    3. Why immutable? To avoid modifying the source/raw data, and so that multiple threads can't change the data. An RDD can be recreated at any time, which helps with failure recovery, caching, sharing and replication.
    4. Spark deployment modes - local, client and cluster.
      • Client vs cluster - in client mode the driver program runs outside the YARN cluster (i.e. at the client).
      • In cluster mode the driver program runs alongside the Application Master inside the YARN cluster.
    5. RDD vs DataFrame vs DataSet:
      1. Use RDDs for
        1. low-level transformations/actions
        2. when a schema is not necessary, e.g. you don't need to access data attributes by name or in a columnar format
        3. when data is unstructured, like media streams or streamed text (Spark Streaming is available now)

    DataFrame (DF):
      • A distributed collection of objects of type Row
      • Not type-safe
      • Available in Scala, Java, Python and R

    DataSet (DS):
      • Allows users to assign a Java class to the records inside a DF
      • Type-safe (compile-time error checks)
      • Available in Scala and Java
      • Leverages Tungsten's fast in-memory encoding
      • Encoders are highly optimized and use runtime code generation to build custom serde; as a result, serialization is faster than Java/Kryo serialization
      • Encoded data is comparatively smaller, which improves network transfer speeds
      • Single interface usable in Java & Scala
    1. rdd.toDebugString prints the RDD lineage graph.
    1. Tuning -
      1. Data serialization: use Kryo. It doesn't support all serializable types and requires registering the classes. If the objects are large, increase spark.kryoserializer.buffer. If a class is not registered, Kryo will still work, but it will store the full class name with each object, which is wasteful.
      2. Memory tuning:
        1. Avoid using String (roughly 40 bytes of overhead over the raw data) and common collection classes like HashMap, LinkedList, etc. (they keep a wrapper object for each entry).
        2. Prefer arrays of objects and primitive types - the fastutil library helps here.
        3. If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers four bytes instead of eight.
    Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times before a task completes, there isn't enough memory available for executing tasks.
    Use -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to collect GC stats and tune GC accordingly (see the sketch below).
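    A minimal sketch of the serialization and GC settings above, assuming Spark 2.x with PySpark; the class name, buffer size and flags are placeholders, not recommendations:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tuning-sketch")
    # Kryo instead of Java serialization; register classes, otherwise Kryo
    # stores the full class name with every object.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.classesToRegister", "com.example.MyRecord")  # placeholder class
    .config("spark.kryoserializer.buffer", "64k")                    # increase for large objects
    # Print GC details on executors to spot frequent full GCs.
    .config("spark.executor.extraJavaOptions",
            "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
    .getOrCreate()
)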
    1. Level of parallelism: 2-3 tasks per CPU core is recommended.
    2. Memory usage of reduce tasks: persist RDDs in serialized form if possible. Try to reduce shuffles (prefer reduceByKey over groupByKey, coalesce over repartition, etc.).
    3. Broadcasting large variables: broadcast large lookup variables, since an RDD lookup is O(m) while a broadcast lookup is O(1).
    4. Data Locality:
    5. Other considerations:
      1. Use Datasets whenever possible. Avoid UDFs/UDAFs; if a UDF is genuinely needed, implement it by extending Catalyst's expressions so the optimizer can still reason about it.
      2. akka.frameSize - the frame size for Spark's Akka-based messaging used for data transfer over the network (Spark 1.x).
      3. Check the time taken for execution from stages tab
      4. Check the cache utilization from storage tab.
      5. In Spark's JDBC overwrite mode, the actions below are performed, which is not advisable as we lose metadata like column constraints:
        1. drop the table, removing all metadata like indexes, PK, FK, etc.
        2. create the table (column definitions only)
        3. write the data
    Better approaches (see the JDBC write sketch below):
    • collect the data to the driver and use standard JDBC tools like ScalikeJDBC to perform the required operation
    • use Spark's truncate=true option along with overwrite mode
    • first truncate the table using standard JDBC tools, then use append mode
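    A minimal sketch of the truncate=true approach, assuming PySpark 2.1+ with an existing DataFrame df; the JDBC URL, table and credentials are placeholders. Note that truncate only takes effect in overwrite mode and keeps the existing table definition:

(df.write
   .format("jdbc")
   .mode("overwrite")
   .option("truncate", "true")   # truncate instead of drop + recreate, keeping constraints
   .option("url", "jdbc:postgresql://dbhost:5432/analytics")  # placeholder
   .option("dbtable", "public.results")                       # placeholder
   .option("user", "etl_user")                                # placeholder
   .option("password", "***")
   .option("driver", "org.postgresql.Driver")
   .save())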
    1. Partition calculation: number of executors * number of cores per executor.
    2. Executors & memory calculation: https://mylearninginbigdata.blogspot.com/2018/06/number-of-executors-and-memory.html
    memoryOverhead = MAX(driver/executor memory * 0.1, 384 MB) is the default calculation (worked example below).
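    A quick worked example of the default overhead calculation (numbers are illustrative only):

executor_memory_mb = 10 * 1024                                    # 10 GB requested per executor
memory_overhead_mb = max(int(executor_memory_mb * 0.1), 384)      # = 1024 MB
total_per_executor_mb = executor_memory_mb + memory_overhead_mb   # = 11264 MB requested from YARN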
    1. spark.driver.supervise can be used to restart the driver automatically if it fails with a non-zero exit code. It is supported in standalone and Mesos cluster modes only.
    1. spark.memory.fraction : The lower this is, the more frequently spills and cached data eviction occur.
    2. spark.shuffle.memoryFraction:  If spills are often, consider increasing this value at the expense of spark.storage.memoryFraction.
    1. File formats - which one to use and when: see the File Formats post.
    2. Spark v1 vs v2 -
      1. Unified DataFrame and DataSet API
      2. Standard SQL support
      3. SparkSession is the entry point and it subsumes SQLContext and HiveContext.
      4. Improved performance in Accumulator
      5. DF based ML APIs
      6. ML pipeline persistence - Users can now save and load machine learning pipelines and models across all programming languages supported by Spark
      7. Distributing algorithms in R - Added support for Generalized Linear Models (GLM), Naive Bayes, Survival Regression, and K-Means in R.
      8. User-defined functions (UDFs) in R - Added support for running partition level UDFs (dapply and gapply) and hyper-parameter tuning (lapply).
      9. Improved Catalyst optimizer, introduced Tungsten tuning (encode, decode, whole-stage code generation as a single function, etc.) and vectorized Parquet reading.
      10. Spark Structured Streaming - rich integration with batch processing.
    3. Spark catalog is used to manage views/tables.
      1. Ex: spark.catalog.listTables.show (It will list local tables/views)
      2. Ex: spark.catalog.listTables("global_temp").show (It will list global&local tables/views)
    4. In CSV reads, the options for mode are PERMISSIVE, DROPMALFORMED and FAILFAST.
    5. coalesce can be used to reduce the number of partitions; it doesn't shuffle the data, but instead instructs Spark to read multiple partitions as one.
    6. readStream (provide the topic).load(), writeStream(topic, key, value).start(); withWatermark should be applied on the same event-time column used in the aggregation, and before the aggregation (e.g. a "10 minutes" delay).
    7. Code executes on the executors only when Spark APIs are used, i.e. operations on RDDs, DataFrames or Datasets. All other code, i.e. anything before the SparkSession/context/Spark APIs are used, or operations on collected data, executes on the driver.
    8. Mistakes to avoid:
      1. Wrong calculation of executors - remember to account for yarn memory overhead (15-20%).
      2. No Spark shuffle block can be greater than 2 GB.
      3. The default number of partitions when a shuffle is involved is 200, so use repartition or coalesce wisely (spark.sql.shuffle.partitions); see the sketch after this list.
        1. How many partitions? Aim for roughly 256 MB per partition.
        2. Remember the number 2000 (a Spark bookkeeping limit): if the number of partitions is close to 2000, bump it past that.
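    A small sketch of the shuffle-partition notes above, assuming an existing SparkSession named spark; paths, counts and sizes are placeholders, not recommendations:

# Override the default of 200 shuffle partitions; e.g. ~100 GB of shuffle data
# at ~256 MB per partition suggests roughly 400 partitions.
spark.conf.set("spark.sql.shuffle.partitions", "400")

events = spark.read.parquet("/data/events")        # placeholder input path
counts = events.groupBy("event_type").count()      # this shuffle uses the setting above

# Reduce the number of output files without a full shuffle before writing.
counts.coalesce(32).write.mode("overwrite").parquet("/data/event_counts")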
    References:



Wednesday, 8 August 2018

Movie Recommendation engine using SQL

Data Model (diagram): based on the query below, friend_tbl holds (ID, Frnd_ID) pairs and feedback_tbl holds (VIEWER_ID, MVE_ID, Recommended).

Query to get the recommendation (a movie is marked 'Y' for a user when at least half of that user's friends' feedback entries recommend it):

SELECT f.ID,
       fb.MVE_ID,
       IF(AVG(IF(fb.Recommended = 'Y', 1, 0)) >= 0.5, 'Y', 'N') AS Recommended
FROM friend_tbl f
LEFT JOIN feedback_tbl fb ON f.Frnd_ID = fb.VIEWER_ID
GROUP BY f.ID, fb.MVE_ID;

Tuesday, 17 July 2018

When to use RDD, Dataframe and Dataset

When to use RDDs?


  • you want low-level transformations and actions and control over your dataset;
  • your data is unstructured, such as media streams or streams of text;
  • you want to manipulate your data with functional programming constructs rather than domain-specific expressions;
  • you don’t care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column; and
  • you can forgo some of the optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data.

When should I use DataFrames or Datasets?


  • If you want rich semantics, high-level abstractions, and domain specific APIs, use DataFrame or Dataset.
  • If your processing demands high-level expressions, filters, maps, aggregation, averages, sum, SQL queries, columnar access and use of lambda functions on semi-structured data, use DataFrame or Dataset.
  • If you want a higher degree of type-safety at compile time, want typed JVM objects, and want to take advantage of Catalyst optimization and Tungsten’s efficient code generation, use Dataset.
  • If you want unification and simplification of APIs across Spark Libraries, use DataFrame or Dataset.
  • If you are an R user, use DataFrames.
  • If you are a Python user, use DataFrames and fall back to RDDs if you need more control.
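As a small PySpark sketch of the contrast (the data, names and session setup are placeholders), here is the same aggregation written with RDD functional constructs versus DataFrame domain-specific expressions:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("rdd-vs-df").getOrCreate()

rows = [("fruit", "apple"), ("fruit", "mango"), ("veg", "okra")]  # placeholder data

# RDD: low-level functional constructs; no schema, no Catalyst optimization.
rdd_counts = (spark.sparkContext.parallelize(rows)
              .map(lambda kv: (kv[0], 1))
              .reduceByKey(lambda a, b: a + b))
print(rdd_counts.collect())

# DataFrame: domain-specific expressions over a schema; optimized by Catalyst/Tungsten.
# (The typed Dataset variant is Scala/Java only.)
df = spark.createDataFrame(rows, ["category", "item"])
df.groupBy("category").agg(F.count("*").alias("n")).show()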
Reference:
