Spark SQL Component Source Code Analysis
Features
Spark's newly released Spark SQL component gives Spark a different kind of SQL support from Shark's Hive-based approach. According to the official manual, it covers three areas:
First, you can write SQL inside Scala code, with simple SQL syntax checking, and register an RDD as a Table. A DSL covering part of the SQL syntax is also supported.
Second, it supports reading and writing Parquet files, preserving the schema.
Third, Scala code can access Hive metadata and execute Hive statements, retrieving the results as RDDs.
The first point, SQL support, relies mainly on Catalyst, the new query optimization framework (a brief introduction to Catalyst follows below). After parsing the SQL into a logical plan, it uses classes and interfaces from the catalyst package to perform some simple plan optimizations, and finally turns the plan into RDD computation. Although the current sql parser is fairly simple and the plan optimization fairly generic, the code still has reference value, so I read through it. The PR was merged into master yesterday; the implementation lives in the sql module, and Catalyst's code in the catalyst module. The Spark SQL module's implementation is described in detail below.
I will not look at the second point, Parquet support, since our use cases do not involve columnar storage like Parquet; the applicable scenarios differ.
As for the third point, this way of integrating with Hive brings no fundamental progress. Shark depends on Hive's Metastore, parser, and so on to turn HQL execution into Spark computation; by comparison, the present integration is essentially no different from pulling the Hive libraries into your code and executing HQL directly. It merely makes the interaction between Hive HQL data and RDDs friendlier.
Introducing Catalyst
See the Spark Summit material on Catalyst: Catalyst: A Query Optimization Framework for Spark and Shark.
Query optimization can greatly improve both the productivity of developers and the performance of the queries that they write. A good query optimizer is capable of automatically rewriting relational queries to execute more efficiently, using techniques such as filtering data early, utilizing available indexes, and even ensuring different data sources are joined in the most efficient order. By performing these transformations, the optimizer not only improves the execution times of relational queries, but also frees the developer to focus on the semantics of their application instead of its performance. Unfortunately, building an optimizer is an incredibly complex engineering task and thus many open source systems perform only very simple optimizations. Past research [1][2] has attempted to combat this complexity by providing frameworks that allow the creators of optimizers to write possible optimizations as a set of declarative rules. However, the use of such frameworks has required the creation and maintenance of special "optimizer compilers" and forced the burden of learning a complex domain specific language upon those wishing to add features to the optimizer. Instead, we propose Catalyst, a query optimization framework embedded in Scala. Catalyst takes advantage of Scala's powerful language features such as pattern matching and runtime metaprogramming to allow developers to concisely specify complex relational optimizations. In this talk I will describe the framework and how it allows developers to express complex query transformations in very few lines of code. I will also describe our initial efforts at improving the execution time of Shark queries by greatly improving its query optimization capabilities.
Overall, Catalyst is an implementation-agnostic framework for manipulating trees of relational operators and expressions. It consists of three main parts:
- a TreeNode library for transforming trees that are expressed as Scala case classes,
- a logical plan representation for relational operators,
- an expression library.
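The TreeNode idea is easy to illustrate standalone. The following is a minimal sketch in the same spirit, not Catalyst's actual API (all names here are hypothetical): trees are immutable Scala case classes, and a transformation is written as a pattern match that rebuilds the tree.

```scala
// Toy expression tree in the Catalyst style: nodes are immutable
// case classes, so rewrites construct new trees rather than mutate.
sealed trait Expression
case class Literal(value: Int) extends Expression
case class Attribute(name: String) extends Expression
case class EqualTo(left: Expression, right: Expression) extends Expression
case class Not(child: Expression) extends Expression

// A rewrite expressed as a pattern match: double-negation elimination,
// applied recursively down the tree.
def simplify(e: Expression): Expression = e match {
  case Not(Not(child)) => simplify(child)
  case Not(child)      => Not(simplify(child))
  case EqualTo(l, r)   => EqualTo(simplify(l), simplify(r))
  case leaf            => leaf
}

val expr = Not(Not(EqualTo(Attribute("age"), Literal(19))))
// simplify(expr) == EqualTo(Attribute("age"), Literal(19))
```

Because case classes come with structural equality and pattern matching for free, an optimization like this is a few lines of ordinary Scala, which is exactly the selling point of embedding the framework in the host language.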
Analysis
Using the official Spark SQL example, let's analyze the internal implementation.
```scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new SQLContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._

// Define the schema using a case class.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people: RDD[Person] =
  sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt))
people.registerAsTable("people")

val teenagers = sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")

// The results of SQL queries are themselves RDDs and support all the normal operations
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```
SQLContext is the overall execution environment of the SQL module. The line

```scala
val people: RDD[Person] =
  sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt))
```

would normally produce a people RDD through sc's file reading followed by two MappedRDD transforms. But `import sqlContext._` brings the implicit function createSchemaRDD into scope:

```scala
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
  new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
```

Since people is declared as RDD[Person], an RDD whose element type is a case class, it is converted into a SchemaRDD.
SchemaRDD is an RDD implementation class added by the SQL module. Constructing a SchemaRDD requires two things, an SQLContext and a logical plan:

```scala
class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val logicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil)
```

Inside createSchemaRDD, ExistingRdd.fromProductRdd(rdd) does the following with the people RDD:
```scala
def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
  ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
}
```
First, from A, i.e. the Person case class, Scala reflection extracts the class's attributes; from a table's perspective these are its columns. Second, productToRowRdd converts the RDD into an RDD[Row]:
```scala
def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
  // TODO: Reuse the row, don't use map on the product iterator. Maybe code gen?
  data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
}
```

After these two steps, what we get, built on the people RDD, is an instance of the case class

```scala
case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row])
```

where output corresponds to the sequence of columns.
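The per-record conversion is easy to reproduce without Spark: every Scala case class is a Product, so its fields can be walked generically via productIterator, which is the same trick productToRowRdd uses to build a GenericRow for each record. A minimal standalone sketch (toRow is a hypothetical helper, not Spark API):

```scala
// Every case class is a Product; productIterator yields its fields in order.
case class Person(name: String, age: Int)

// Flatten any case class instance into a generic "row", the way
// productToRowRdd builds one GenericRow per record.
def toRow(p: Product): Array[Any] = p.productIterator.toArray

val row = toRow(Person("alice", 14))
// row(0) == "alice", row(1) == 14
```

This is why createSchemaRDD is bounded by `A <: Product`: any case class qualifies, and no per-class conversion code has to be written.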
SparkLogicalPlan is a simple class; its implementation:

```scala
/**
 * Allows already planned SparkQueries to be linked into logical query plans.
 *
 * Note that in general it is not valid to use this class to link multiple copies of the same
 * physical operator into the same query plan as this violates the uniqueness of expression ids.
 * Special handling exists for ExistingRdd as these are already leaf operators and thus we can just
 * replace the output attributes with new copies of themselves without breaking any attribute
 * linking.
 */
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
  extends logical.LogicalPlan with MultiInstanceRelation {

  def output = alreadyPlanned.output
  def references = Set.empty
  def children = Nil

  override final def newInstance: this.type = {
    SparkLogicalPlan(
      alreadyPlanned match {
        case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
        case _ => sys.error("Multiple instance of the same relation detected.")
      }).asInstanceOf[this.type]
  }
}
```
Everything so far happens inside the single line val people: RDD[Person] = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt)); the result is a SchemaRDD.
Next, people.registerAsTable("people") is a SchemaRDD method; the actual table registration still relies on SQLContext:
```scala
def registerAsTable(tableName: String): Unit = {
  sqlContext.registerRDDAsTable(this, tableName)
}
```
```scala
/**
 * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
 * during the lifetime of this instance of SQLContext.
 *
 * @group userf
 */
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
  catalog.registerTable(None, tableName, rdd.logicalPlan)
}
```

When registering a table, SQLContext relies on an implementation of the Catalog trait, the SimpleCatalog class:
```scala
/**
 * An interface for looking up relations by name. Used by an [[Analyzer]].
 */
trait Catalog {
  def lookupRelation(
    databaseName: Option[String],
    tableName: String,
    alias: Option[String] = None): LogicalPlan

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
}

class SimpleCatalog extends Catalog {
  val tables = new mutable.HashMap[String, LogicalPlan]()

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
    tables += ((tableName, plan))
  }

  def dropTable(tableName: String) = tables -= tableName

  def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String] = None): LogicalPlan = {
    val table = tables.get(tableName).getOrElse(sys.error(s"Table Not Found: $tableName"))
    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a.toLowerCase, table)).getOrElse(table)
  }
}
```

The code above belongs to the Catalyst package. Catalog is a trait for maintaining table information: it stores newly registered tables and supports looking up and dropping existing ones. SimpleCatalog implements it by keeping tableName-to-logicalPlan pairs in a HashMap.
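Stripped of Spark types, SimpleCatalog boils down to a few lines. Here is a runnable toy version (ToyCatalog is hypothetical; LogicalPlan is replaced by a plain String so the sketch stands alone):

```scala
import scala.collection.mutable

// Toy stand-in for SimpleCatalog: a mutable map from table name to plan.
// The "plan" is just a String here, purely for illustration.
class ToyCatalog {
  private val tables = new mutable.HashMap[String, String]()

  def registerTable(name: String, plan: String): Unit = tables += ((name, plan))
  def dropTable(name: String): Unit = tables -= name
  def lookupRelation(name: String): String =
    tables.getOrElse(name, sys.error(s"Table Not Found: $name"))
}

val catalog = new ToyCatalog
catalog.registerTable("people", "ExistingRdd(name, age)")
// catalog.lookupRelation("people") == "ExistingRdd(name, age)"
// catalog.lookupRelation("missing") throws "Table Not Found: missing"
```

Note the register/lookup asymmetry: registration stores the logical plan of the SchemaRDD, and later queries resolve table names back into that plan, which is how the analyzer connects a table name in the SQL text to actual data.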
With the RDD converted and stored as a table, it is time to execute the SQL: val teenagers = sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")
The sql method is likewise defined on SQLContext:
```scala
/**
 * Executes a SQL query using Spark, returning the result as a SchemaRDD.
 *
 * @group userf
 */
def sql(sqlText: String): SchemaRDD = {
  val result = new SchemaRDD(this, parseSql(sqlText))
  // We force query optimization to happen right away instead of letting it happen lazily like
  // when using the query DSL. This is so DDL commands behave as expected. This is only
  // generates the RDD lineage for DML queries, but do not perform any execution.
  result.queryExecution.toRdd
  result
}
```

Skipping the generation of result for now, note that before result is returned, result.queryExecution.toRdd is invoked. Before toRdd runs, none of the earlier work, the RDD conversion, logical plan generation, analysis, and optimization, has actually touched any data; only once toRdd is called does this chain of plans really execute. In the current sql() implementation, the computation is completed right here.
The parseSql(sqlText) step parses the SQL with a simple sql parser. This Scala parser is the SqlParser provided by the Catalyst package; its source comment reads:
```scala
/**
 * A very simple SQL parser. Based loosly on:
 * https://github.com/stephentu/scala-sql-parser/blob/master/src/main/scala/parser.scala
 *
 * Limitations:
 *  - Only supports a very limited subset of SQL.
 *  - Keywords must be capital.
 *
 * This is currently included mostly for illustrative purposes. Users wanting more complete support
 * for a SQL like language should checkout the HiveQL support in the sql/hive subproject.
 */
class SqlParser extends StandardTokenParsers
```

It appears to borrow from someone else's Scala SQL parser, supports only a small subset of SQL, and requires keywords to be uppercase. Judging from the implementation, the following keywords are currently supported:
```scala
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val AVG = Keyword("AVG")
protected val BY = Keyword("BY")
protected val CAST = Keyword("CAST")
protected val COUNT = Keyword("COUNT")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val FALSE = Keyword("FALSE")
protected val FIRST = Keyword("FIRST")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IF = Keyword("IF")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIMIT = Keyword("LIMIT")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
protected val OUTER = Keyword("OUTER")
protected val RIGHT = Keyword("RIGHT")
protected val SELECT = Keyword("SELECT")
protected val STRING = Keyword("STRING")
protected val SUM = Keyword("SUM")
protected val TRUE = Keyword("TRUE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")
```
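In practice the uppercase-keyword limitation means queries like the following differ only in case but behave very differently (the queries themselves are hypothetical examples; only the keyword list and the uppercase rule come from the source):

```scala
// Accepted by this early parser: uppercase keywords, basic SELECT/WHERE/AND.
sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")

// Rejected: lowercase words are not recognized as keywords, so parsing fails.
// sql("select name from people where age >= 10 and age <= 19")
```

Anyone needing fuller SQL coverage is pointed by the source comment to the HiveQL support in the sql/hive subproject instead.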
Now we come to the climax. The most important line inside sql() is result.queryExecution: queryExecution is where SQLContext actually processes the logical plan:
```scala
/**
 * A lazily computed query execution workflow. All other RDD operations are passed
 * through to the RDD that is produced by this workflow.
 *
 * We want this to be lazy because invoking the whole query optimization pipeline can be
 * expensive.
 */
@transient
protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
```

It does so by generating a QueryExecution:
```scala
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
  new this.QueryExecution { val logical = plan }
```

QueryExecution declares several lazy vals; all of the analysis, optimization, and other processing is triggered only when toRdd is called. Let's look at the QueryExecution code:
```scala
/**
 * The primary workflow for executing relational queries using Spark. Designed to allow easy
 * access to the intermediate phases of query execution for developers.
 */
protected abstract class QueryExecution {
  def logical: LogicalPlan

  lazy val analyzed = analyzer(logical)
  lazy val optimizedPlan = optimizer(analyzed)
  // TODO: Don't just pick the first one...
  lazy val sparkPlan = planner(optimizedPlan).next()
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[Row] = executedPlan.execute()
}
```
The workflow has four main steps: analyzer -> optimizer -> planner -> prepareForExecution.
Before walking through these four steps, a brief word on LogicalPlan, because until the conversion to a physical plan, most of the classes and interfaces discussed here operate on logical plans, i.e. the LogicalPlan class. LogicalPlan itself is abstract, with roughly the implementations below. It is a key class in the Catalyst code and is, in essence, a syntax-tree structure:
```scala
/**
 * A logical plan node with no children.
 */
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  self: Product =>

  // Leaf nodes by definition cannot reference any input attributes.
  def references = Set.empty
}

/**
 * A logical node that represents a non-query command to be executed by the system. For example,
 * commands can be used by parsers to represent DDL operations.
 */
abstract class Command extends LeafNode {
  self: Product =>
  def output = Seq.empty
}

/**
 * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
 * commands that are passed directly to another system.
 */
case class NativeCommand(cmd: String) extends Command

/**
 * Returned by a parser when the users only wants to see what query plan would be executed, without
 * actually performing the execution.
 */
case class ExplainCommand(plan: LogicalPlan) extends Command

/**
 * A logical plan node with single child.
 */
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  self: Product =>
}

/**
 * A logical plan node with a left and right child.
 */
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  self: Product =>
}
```
The inheritance structure of Catalyst's QueryPlan is shown in the figure below. SparkPlan is an implementation that lives in the SQL package; it is a SparkPlan that was passed in earlier when constructing SparkLogicalPlan.
Step 1: analyzer
SQLContext holds an analyzer, which comes from the Catalyst package; the catalog inside it is the same object that stores the table information:
```scala
@transient
protected[sql] lazy val analyzer: Analyzer =
  new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)
```

Judging from its implementation, the analyzer's job is to take unresolved attributes and relations and pin them all down, using the catalog and a set of adapter-like rules:
```scala
/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
 * a [[FunctionRegistry]].
 */
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )
}
```

The analyzer's batches register a number of strategies, or processing adapters, each of which is implemented internally. For example:
```scala
/**
 * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
 */
object ResolveRelations extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(databaseName, name, alias) =>
      catalog.lookupRelation(databaseName, name, alias)
  }
}
```

Because Analyzer extends RuleExecutor, each of its processing steps extends Rule. In RuleExecutor, the apply() method walks the logical plan recursively, applying the processing adapters registered in batches:
```scala
/**
 * Executes the batches of rules defined by the subclass. The batches are executed serially
 * using the defined execution strategy. Within each batch, rules are also executed serially.
 */
def apply(plan: TreeType): TreeType = {
  var curPlan = plan

  batches.foreach { batch =>
    var iteration = 1
    var lastPlan = curPlan
    curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) }

    // Run until fix point (or the max number of iterations as specified in the strategy.
    while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
      lastPlan = curPlan
      curPlan = batch.rules.foldLeft(curPlan) {
        case (curPlan, rule) =>
          val result = rule(curPlan)
          if (!result.fastEquals(curPlan)) {
            logger.debug(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(curPlan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          result
      }
      iteration += 1
    }
  }

  curPlan
}
```
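The mechanics of that loop can be shown without any Spark dependency. Below is a toy version of the same idea (all names hypothetical): rules are tree-to-tree functions, and a batch is folded over the plan until it stops changing or a maximum iteration count is reached.

```scala
// Toy expression tree, in the spirit of Catalyst's case-class trees.
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

// A "rule" is just a function on the tree; this one folds constant additions.
def constantFold(e: Expr): Expr = e match {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
  case Add(l, r)           => Add(constantFold(l), constantFold(r))
  case other               => other
}

// Run rules to a fixed point, mirroring RuleExecutor's batch loop:
// keep folding the rules over the plan until it no longer changes.
def fixedPoint(e: Expr, rules: Seq[Expr => Expr], maxIter: Int = 100): Expr = {
  var cur = e
  var last: Expr = null
  var i = 0
  while (i < maxIter && cur != last) {
    last = cur
    cur = rules.foldLeft(cur) { (p, rule) => rule(p) }
    i += 1
  }
  cur
}

val plan = Add(Add(Lit(1), Lit(2)), Lit(3))
// fixedPoint(plan, Seq(constantFold)) == Lit(6)
```

The fixed-point strategy matters because one rule application can expose new opportunities for another (here, folding the inner Add exposes an outer Add of two literals), which is exactly why the analyzer registers its Resolution batch with FixedPoint(100) rather than Once.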
Step 2: optimizer
The optimizer's implementation and processing style closely resemble the analyzer's; they simply sit at different stages and carry different responsibilities. The optimizer also extends RuleExecutor and implements a batch of rules which, like the analyzer's, are applied recursively to the input plan:
```scala
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once,
      EliminateSubqueries) ::
    Batch("ConstantFolding", Once,
      ConstantFolding,
      BooleanSimplification,
      SimplifyCasts) ::
    Batch("Filter Pushdown", Once,
      EliminateSubqueries,
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughInnerJoin) :: Nil
}
```
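To make a rule name like PushPredicateThroughProject concrete, here is a standalone sketch of the concept using toy plan types (not Catalyst's): move a filter below a projection so that rows are discarded before further operators see them. The toy version simply assumes the predicate only references columns the projection keeps.

```scala
// Toy logical-plan nodes, for illustration only.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan
case class Filter(pred: String, child: Plan) extends Plan

// Push a Filter below a Project, recursing through the rest of the tree.
// (Real pushdown must first check the predicate's column references.)
def pushDown(p: Plan): Plan = p match {
  case Filter(pred, Project(cols, child)) => Project(cols, Filter(pred, pushDown(child)))
  case Project(cols, child)               => Project(cols, pushDown(child))
  case Filter(pred, child)                => Filter(pred, pushDown(child))
  case leaf                               => leaf
}

val before = Filter("age >= 10", Project(Seq("name", "age"), Scan("people")))
// pushDown(before) ==
//   Project(Seq("name", "age"), Filter("age >= 10", Scan("people")))
```

ConstantFolding and BooleanSimplification are rules of the same shape: pattern matches that rewrite a subtree into a cheaper equivalent one.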
Step 3: planner
The planner inside SQLContext is its own implementation, SparkPlanner:

```scala
protected[sql] class SparkPlanner extends SparkStrategies {
  val sparkContext = self.sparkContext

  val strategies: Seq[Strategy] =
    TopK ::
    PartialAggregation ::
    SparkEquiInnerJoin ::
    BasicOperators ::
    CartesianProduct ::
    BroadcastNestedLoopJoin :: Nil
}
```
It contains a sequence of strategies. SparkStrategies is a subclass implementation of QueryPlanner from the Catalyst package; QueryPlanner's apply() method looks like this:
```scala
def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...
  val iter = strategies.view.flatMap(_(plan)).toIterator
  assert(iter.hasNext, s"No plan for $plan")
  iter
}
```

For SparkPlanner, this means sweeping strategies such as TopK and BasicOperators across the plan in a single pass to produce physical plans.
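The planner's contract is simple to sketch with toy types (all names below are hypothetical stand-ins): each strategy maps a logical plan to zero or more candidate physical plans, and as the QueryExecution code noted with its "Don't just pick the first one..." TODO, the caller currently just takes the first candidate produced.

```scala
// Toy stand-ins for logical and physical plans.
case class Logical(desc: String)
case class Physical(desc: String)

type Strategy = Logical => Seq[Physical]

// Mirrors QueryPlanner.apply: lazily flatMap the strategies over the plan,
// assert at least one candidate exists, and return the iterator.
def plan(strategies: Seq[Strategy])(p: Logical): Iterator[Physical] = {
  val iter = strategies.view.flatMap(s => s(p)).toIterator
  assert(iter.hasNext, s"No plan for $p")
  iter
}

// A specialized strategy fires only when it applies; a general one always does.
val topK: Strategy = l =>
  if (l.desc.contains("LIMIT")) Seq(Physical("topk:" + l.desc)) else Nil
val basicOperators: Strategy = l => Seq(Physical("exec:" + l.desc))

// plan(Seq(topK, basicOperators))(Logical("scan people")).next()
//   == Physical("exec:scan people")
```

Ordering the strategy list from specialized (TopK) to general (BasicOperators) means the first match is also the most specific applicable physical plan.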
Step 4: prepareForExecution
prepareForExecution is once again an implementation of Catalyst's RuleExecutor:

```scala
/**
 * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
 * inserting shuffle operations as needed.
 */
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
  val batches =
    Batch("Add exchange", Once, AddExchange) ::
    Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
}
```

It registers two batches of processing.
All of the steps above are implementations of Catalyst's RuleExecutor; they differ only in their division of labor and in what each one accomplishes. Only when the final call

```scala
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = executedPlan.execute()
```

is made are all of the earlier lazy plans triggered and executed. Let's review the phases in the QueryExecution code once more:
```scala
/**
 * The primary workflow for executing relational queries using Spark. Designed to allow easy
 * access to the intermediate phases of query execution for developers.
 */
protected abstract class QueryExecution {
  def logical: LogicalPlan

  lazy val analyzed = analyzer(logical)
  lazy val optimizedPlan = optimizer(analyzed)
  // TODO: Don't just pick the first one...
  lazy val sparkPlan = planner(optimizedPlan).next()
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[Row] = executedPlan.execute()
}
```
Finally, sql() returns an RDD, so afterwards the usual RDD transformations and actions can be applied for further processing.
The treatment of the analyzer and optimizer here is admittedly brief; they mostly come down to a set of custom rules, which I have not yet read through carefully. The goal was just to get the overall flow straight.
Summary
To summarize the Spark SQL flow:
Through the new context class SQLContext, we can register an RDD as a table. That RDD is a SchemaRDD carrying a case class, which serves as the SQL table's schema; the column information is reflected out of the case class. Once the RDD is registered as a table, its information is held in the Catalog, with a lifetime limited to the current context. Later, when writing sql(), this SchemaRDD can be used. Before execution, the query passes through several stages: a simple sql parser turns the SQL text into a logical plan, and between the logical plan and the physical plan, the analyzer, optimizer, and planner each do further processing. These stages are all, in essence, implementations of Catalyst's RuleExecutor; each defines and registers its own rule sequence and applies it recursively over the logical plan. Everything up to that point is lazy: only calling toRdd triggers real execution and returns an RDD. That RDD is an ordinary Spark RDD that can be processed further, closing the loop from RDD to table, through SQL processing, and back to RDD. The execution and optimization of the whole pipeline rest entirely on Catalyst, the new query optimization framework.
This post focuses on the sql part of the Spark SQL component; the Hive and Parquet support is not analyzed. I plan to read the Catalyst package's code next and see whether there is anything worth borrowing there. Spark core's new tool components keep appearing one after another; we have some freedom in choosing among them, and it is not clear which direction Databricks will push hardest. Perhaps they are probing on all fronts themselves. Catalyst apparently will also find its way into Shark to improve it, and Spark SQL should be one of the highlights of Spark 1.0, though the API will probably still change substantially. Either way, we will keep watching and learning, borrowing what is worth borrowing.
The end :)