Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Supporting CTAS queries for Hive to Spark query translations #324

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

nimesh1601
Copy link

The approach and details are mentioned in this Doc for reference: Supporting CTAS queries for Hive to Spark

Unit tests are WIP

Comment on lines 59 to 68
if(tableRowFormat != null){
writer.keyword("ROW FORMAT DELIMITED FIELDS TERMINATED BY");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition sounds loose. See all options of the row format: https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-RowFormats&SerDe and https://spark.apache.org/docs/latest/sql-ref-syntax-hive-format.html.

It is okay to support only ROW FORMAT DELIMITED FIELDS TERMINATED BY, but the condition should reflect that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting having a conditional check if entered values for the row format are correct or something else?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am saying ROW FORMAT implies a few things (e.g., either SERDE or DELIMITED). Somehow you take tableRowFormat to reflect a lot of things together: ROW FORMAT DELIMITED FIELDS TERMINATED BY.

Comment on lines 15 to 17
private final @Nullable SqlNode tableSerializer;
private final @Nullable SqlNodeList tableFileFormat;
private final @Nullable SqlCharStringLiteral tableRowFormat;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using both serializer and row format is confusing since serializer/deserializer is folded under row format. ROW FORMAT SERDE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, when referencing serializer, can we just use serDe?
You can also remove the table prefix since it is already implied.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting using a separate custom SqlNode for RowFormat which will contain information like serDe, fieldDelim, colDelim, etc? I think having a custom SqlNode will later help in supporting other syntaxes of row formats in a better way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am saying ROW FORMAT concept encompasses both SERDE and DELIMITED concepts/keywords. The code structure should reflect the syntax structure.

Comment on lines 64 to 77
if(tableFileFormat != null){
if(tableFileFormat.size() == 1){
writer.keyword("STORED AS");
tableFileFormat.get(0).unparse(writer, 0, 0);
writer.newlineAndIndent();
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the above, can we make the conditions stricter?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, are you suggesting to have a check, if the file format is valid, like ORC, Parquet, etc? I think unparsing logic shouldn't worry about these.

import java.util.List;
import java.util.Objects;

public class SqlCreateTable extends SqlCreate {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please depict the SqlNode tree in the comment, or describe what each node is supposed to have? Please see these two examples [1, 2].

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us get back to this, but it might depend on the discussion above.

@@ -64,7 +64,7 @@ public ResponseEntity translate(@RequestBody TranslateRequestBody translateReque
else if (fromLanguage.equalsIgnoreCase("hive")) {
// To Spark
if (toLanguage.equalsIgnoreCase("spark")) {
translatedSql = translateHiveToSpark(query);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for this change? @ljfgem FYI.

@@ -33,4 +36,11 @@ public static String translateHiveToSpark(String query) {
CoralSpark coralSpark = CoralSpark.create(relNode);
return coralSpark.getSparkSql();
}

public static String translateHiveQueryToSparkSql(String query){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will get back to this after resolving the above.

public static CoralSpark create(SqlNode sqlNode, Function<SqlNode, RelNode> convertor){
SparkRelInfo sparkRelInfo;
//apply RelNode transformations for sqlNode eligible for transformation.
if(sqlNode instanceof SqlCreate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding an interface SqlCommand, and have SqlCreateTable extend SqlCreate and implement SqlCommand? SqlCommand defines getSelectQueries() and setSelectQueries(). Those are just list versions of what you have now in SqlCreateTable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this sounds good

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that getSelectQueries() and setSelectQueries() will be used for CREATE TABLE/VIEW....AS. If there's no other use case, can we rename SqlCommand -> SqlCreateCommand?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but I think commands like INSERT OVERWRITE TABLE tmp select * FROM foo, might also leverage this

*
* @return [[CoralSparkInfo]]
*/
public static CoralSpark create(SqlNode sqlNode, Function<SqlNode, RelNode> convertor){
Copy link
Contributor

@wmoustafa wmoustafa Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we are assuming this is the Coral IR SqlNode. Assuming that Coral IR SqlNode has array indexes start from 1 (see standardizeRel()), I am pretty sure the array index in this tree will start from 0. For consistency, somehow we need a SELECT node here that:
1- is aSqlNode
2- has array indexes starting from 1.
I think this implies that the caller to this method must have taken the query already, converted the SELECT SqlNode to RelNode then back to SELECT SqlNode. The former SELECT will have array indexes start from 0, and the latter will have them start from 1. What remains after the caller has done this step is that the latter Coral IR SqlNode is simply converted to Spark SQL string (using the Spark dialect and after applying theSparkSqlRewriter (on the latter SqlNode)), without having to go through RelNode here.
The caller can call the RelNode-based create on the obtained RelNode in the caller. It is best to code this logic in a test case. @aastha25 @ljfgem FYI.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, currently SqlNode given as input to this function has 0-based indexing. Now as you have mentioned that we need a SELECTnode here should have array indexes starting from 1, so it means all RelNodebased transformations, would have been done by the caller of this function. It means the caller should be able to execute the below transformations on theSELECT `node before calling this new create function.

    if(sqlNode instanceof SqlCommand) {
      SqlNode selectNode = ((SqlCreateTable) sqlNode).getSelectQuery();
      sparkRelInfo = IRRelToSparkRelTransformer.transform(convertor.apply(selectNode));
      selectNode = new CoralRelToSqlNodeConverter().convert(sparkRelInfo.getSparkRelNode());
      ((SqlCreateTable) sqlNode).setSelectQuery(selectNode);
    } else {
      sparkRelInfo = IRRelToSparkRelTransformer.transform(convertor.apply(sqlNode));
      sqlNode = new CoralRelToSqlNodeConverter().convert(sparkRelInfo.getSparkRelNode());
    }

Should we create a separate function in CoralSpark to handle this SqlNode->RelNode->SqlNode transformation?
I don't think, we can call RelNode-based create as currently, it will convert RelNode to SQL directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the translation path:
Language1 (hiveSQL) --> CoralSqlNode (LHS) --> CoralRelNode --> CoralSqlNode (RHS) --> Language 2 (spark SQL)

The coral-spark class is responsible for generating sparkSQL from CoralIR. It currently accepts the CoralRelNode and applies the RHS side of transformations to generate a spark-compatible SQL. In your current implementation of create(..), you are passing CoralSqlNode (LHS) and using convertor.apply(selectNode) to generate CoralRelNode. Could you please refactor to apply the LHS side of transformations outside this class and then invoke existing method create(RelNode irRelNode){..} to generate the updated selectNode?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aastha25 do you mean the entry point of CTAS statement translation should be in another class? I think it's fine to put it in CoralSpark. And looks like create(RelNode irRelNode) can't generate updated selectNode, it would return Spark SQL directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nimesh1601 could you summarize the offline discussion/agreement here?

Copy link
Contributor

@aastha25 aastha25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @nimesh1601 for the PR!

*
* @return [[CoralSparkInfo]]
*/
public static CoralSpark create(SqlNode sqlNode, Function<SqlNode, RelNode> convertor){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the argument sqlNode is of type SqlCreate, the query's semantics are not getting validated. For example:
for a table foo with multiple columns, if there are input queries like create table t1 as select * as col0 from foo or create table t1 (a int) as select * from foo, I'm not sure if Coral will throw an exception right now.

The validation work can be incremental, but maybe we can setup a class like ddlSqlValidator in coral-common / coral-hive modules which can validate the different types of ddl statements and invoke it here.

We should validate that the rowtype is compatible for SqlCreateTable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queries like create table t1 as select * as col0 from foo will fail query parsing, but yes, we would need validation for cases like create table t1 (a int) as select * from foo. Do we have anything similar for select queries?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for select queries, we use the hiveSqlValidator to validate the CoralSqlNode and then generate the relNode representation. This is done here.

if create DDL (and other DDL) are not flowing through this translation path, we should validate them separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aastha25 I do not think we should create two parallel paths. Can we unify them using common APIs?

break;
}
}
return new SqlCreateTable(ZERO, false, ctOptions.ifNotExists != null ? ctOptions.ifNotExists : false, ctOptions.name,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we introduce a utility class like calcite does here https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/sql/ddl/SqlDdlNodes.java to manage all ddl sqlNode creation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aastha25 Why does DDL SqlNode creation warrant a separate path? Why cannot it be done through generic SqlNode generation?

public static CoralSpark create(SqlNode sqlNode, Function<SqlNode, RelNode> convertor){
SparkRelInfo sparkRelInfo;
//apply RelNode transformations for sqlNode eligible for transformation.
if(sqlNode instanceof SqlCreate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that getSelectQueries() and setSelectQueries() will be used for CREATE TABLE/VIEW....AS. If there's no other use case, can we rename SqlCommand -> SqlCreateCommand?

Copy link
Collaborator

@ljfgem ljfgem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @nimesh1601 for this PR!
Please run ./gradlew spotlessApply to fix the format issues.

Comment on lines 142 to 143
if(sqlNode instanceof SqlCommand) {
SqlNode selectNode = ((SqlCreateTable) sqlNode).getSelectQuery();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we cast sqlNode to SqlCreateTable if sqlNode instanceof SqlCommand? Will there be more classes implementing SqlCommand?

*
* @return [[CoralSparkInfo]]
*/
public static CoralSpark create(SqlNode sqlNode, Function<SqlNode, RelNode> convertor){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aastha25 do you mean the entry point of CTAS statement translation should be in another class? I think it's fine to put it in CoralSpark. And looks like create(RelNode irRelNode) can't generate updated selectNode, it would return Spark SQL directly.

Comment on lines 125 to 126
* Internally Appropriate parts of Sql RelNode is converted to Spark RelNode, Spark RelNode is converted back
* to SqlNode and SqlNode to SparkSQL.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not following Internally Appropriate parts of Sql RelNode, could you clarify it?

* 2) Base tables
* 3) Spark UDF information objects, ie. List of {@link SparkUDFInfo}
*
* @param sqlNode CoralNode which will be translated to SparkSQL.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to clarify sqlNode here, it's CoralSqlNode (LHS) in the translation chain Language1 (hiveSQL) --> CoralSqlNode (LHS) --> CoralRelNode --> CoralSqlNode (RHS) --> Language 2 (spark SQL), which is the SqlNode parsed from the source SQL.

* 3) Spark UDF information objects, ie. List of {@link SparkUDFInfo}
*
* @param sqlNode CoralNode which will be translated to SparkSQL.
* @param convertor Functional Interface to convert SqlNode to appropriate RelNode
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think appropriate RelNode can be replaced by CoralRelNode.

@nimesh1601 nimesh1601 requested review from aastha25 and removed request for wmoustafa November 14, 2022 14:38
Copy link
Contributor

@aastha25 aastha25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes!! Left some comments about the structuring, will continue reviewing.

Also, could you please add javadoc for all public classes and methods :)

@@ -52,6 +54,7 @@ public class HiveToRelConverter extends ToRelConverter {
// The validator must be reused
SqlValidator sqlValidator = new HiveSqlValidator(getOperatorTable(), getCalciteCatalogReader(),
((JavaTypeFactory) getRelBuilder().getTypeFactory()), HIVE_SQL);
DdlSqlValidator ddlSqlValidator = new HiveDdlSqlValidator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you also make this private final?

}

private void validateCreateTable(SqlNode sqlNode) {
//Todo need to add appropriate validations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this going to be implemented as part of this PR?

*
* @return [[SqlNode]]
*/
public static SqlNode getCoralSqlNode(RelNode irRelNode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unclear why this is needed.

CoralSpark API should take Coral intermediate representation as input and return Spark SQL / SparkSQLNode.

You could create a

 String convert(SqlCommand sqlCommand, RelNode coralRelNode) {
// generate CoralSqlNode (RHS) for coralRelNode
// set the updated CoralSqlNode (RHS) in sqlCommand
// send this (combined) SqlNode to CoralSqlNodeToSparkSqlNodeConverter and then SparkSqlRewriter
}

RelNode relNode = new HiveToRelConverter(hiveMetastoreClient).convertSql(query);
CoralSpark coralSpark = CoralSpark.create(relNode);
return coralSpark.getSparkSql();
HiveToRelConverter hiveToRelConverter = new HiveToRelConverter(hiveMetastoreClient);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Pl read the comment on CoralSpark.getCoralSqlNode(..) before this comment)

I think this class could use a bit of refactoring.
For SqlNode sqlNode = hiveToRelConverter.toSqlNode(query) : hiveToRelConverter could return a coralSqlNode. Currently, on the LHS side, hiveSqlNode is equivalent CoralSqlNode (LHS).

Inside the if condition, you could extract CoralSelectSqlNode from coralSqlNode, and hand it over first to hiveToRelConverter to generate CoralRelNode. And then hand over sqlCommand + CoralRelNode to CoralSpark.

RelRoot root = getSqlToRelConverter().convertQuery(sqlNode, true, true);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary empty line?

Comment on lines 59 to 68
if(tableRowFormat != null){
writer.keyword("ROW FORMAT DELIMITED FIELDS TERMINATED BY");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am saying ROW FORMAT implies a few things (e.g., either SERDE or DELIMITED). Somehow you take tableRowFormat to reflect a lot of things together: ROW FORMAT DELIMITED FIELDS TERMINATED BY.

Comment on lines 15 to 17
private final @Nullable SqlNode tableSerializer;
private final @Nullable SqlNodeList tableFileFormat;
private final @Nullable SqlCharStringLiteral tableRowFormat;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am saying ROW FORMAT concept encompasses both SERDE and DELIMITED concepts/keywords. The code structure should reflect the syntax structure.

import java.util.List;
import java.util.Objects;

public class SqlCreateTable extends SqlCreate {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us get back to this, but it might depend on the discussion above.

*
* @return [[CoralSparkInfo]]
*/
public static CoralSpark create(SqlNode sqlNode, Function<SqlNode, RelNode> convertor){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nimesh1601 could you summarize the offline discussion/agreement here?

*
* @return [[CoralSparkInfo]]
*/
public static CoralSpark create(SqlNode sqlNode, Function<SqlNode, RelNode> convertor){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aastha25 I do not think we should create two parallel paths. Can we unify them using common APIs?

break;
}
}
return new SqlCreateTable(ZERO, false, ctOptions.ifNotExists != null ? ctOptions.ifNotExists : false, ctOptions.name,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aastha25 Why does DDL SqlNode creation warrant a separate path? Why cannot it be done through generic SqlNode generation?

Comment on lines +11 to +14
public interface DdlSqlValidator {

void validate(SqlNode ddlSqlNode);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is supposed to call this (external calling code or internally inside some Coral step)? How can we unify it with the existing pipeline for validation? @aastha25 please let me know your thoughts.

Comment on lines +17 to +25
public class SqlDdlNodes {

/** Creates a CREATE TABLE. */
public static SqlCreateTable createTable(SqlParserPos pos, boolean replace, boolean ifNotExists, SqlIdentifier name,
SqlNodeList columnList, SqlNode query, SqlNode tableSerializer, SqlNodeList tableFileFormat,
SqlCharStringLiteral tableRowFormat) {
return new SqlCreateTable(pos, replace, ifNotExists, name, columnList, query, tableSerializer, tableFileFormat,
tableRowFormat);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds redundant? @aastha25

Comment on lines +73 to +80
/**
* Users use this function to get CoralSqlNode from CoralRelNode
* This should be used when user need to get CoralSqlNode from CoralRelNode by applying
* spark specific transformations on CoralRelNode
* with Coral-schema output schema
*
* @return [[SqlNode]]
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc needs to be more elaborate, and reflect the relationship with existing methods. @aastha25 could help with that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants